Browse Source

websocket 분리 작업

master
노승철 2 years ago
parent
commit
191c9ac033
  1. 10
      pom.xml
  2. 7
      src/main/java/com/palnet/PavWebsocketApplication.java
  3. 122
      src/main/java/com/palnet/comn/model/CtrCntrlModel.java
  4. 65
      src/main/java/com/palnet/comn/model/GPDatabaseModel.java
  5. 15
      src/main/java/com/palnet/comn/model/GPHistoryModel.java
  6. 70
      src/main/java/com/palnet/comn/model/GPModel.java
  7. 153
      src/main/java/com/palnet/comn/utils/DateUtils.java
  8. 80
      src/main/java/com/palnet/comn/utils/JsonUtils.java
  9. 79
      src/main/java/com/palnet/process/message/config/MessageConfig.java
  10. 79
      src/main/java/com/palnet/process/message/consumer/MessageConsumer.java
  11. 119
      src/main/java/com/palnet/server/WebServer.java
  12. 22
      src/main/java/com/palnet/server/codec/WebPayLoadDecoder.java
  13. 26
      src/main/java/com/palnet/server/codec/WebPayLoadEncorder.java
  14. 26
      src/main/java/com/palnet/server/collection/ChannelCollection.java
  15. 80
      src/main/java/com/palnet/server/handler/WebHandler.java
  16. 36
      src/main/java/com/palnet/server/initializer/WebInitializer.java
  17. 33
      src/main/java/com/palnet/server/task/ctr/CtrCntrlTask.java
  18. 66
      src/main/java/com/palnet/server/task/ctr/service/CtrCntrlTaskService.java
  19. 12
      src/main/resources/application.yml

10
pom.xml

@ -37,6 +37,16 @@
<artifactId>netty-all</artifactId>
<version>4.1.77.Final</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<version>2.4.4</version>
</dependency>
</dependencies>
<build>

7
src/main/java/com/palnet/PavWebsocketApplication.java

@ -1,7 +1,9 @@
package com.palnet;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class PavWebsocketApplication {
@ -10,4 +12,9 @@ public class PavWebsocketApplication {
SpringApplication.run(PavWebsocketApplication.class, args);
}
@Bean
public ObjectMapper objectMapper() {
return new ObjectMapper();
}
}

122
src/main/java/com/palnet/comn/model/CtrCntrlModel.java

@ -0,0 +1,122 @@
package com.palnet.comn.model;
import com.palnet.comn.utils.DateUtils;
import io.netty.util.internal.StringUtil;
import lombok.Data;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@Data
public class CtrCntrlModel implements Comparable<CtrCntrlModel>{
private String messageTypeCd;
private String controlId;
private String objectTypeCd;
private String objectId;
private Double lat;
private Double lng;
private String elevType;
private Double elev;
private String speedType;
private Double speed;
private Double betteryLevel;
private Double betteryVoltage;
private String takeOffPositon;
private String dronStatus;
private Double heading;
private String terminalRcvDt;
private String serverRcvDt;
private String controlStartDt;
private Double moveDistance;
private String moveDistanceType;
// 환경센서 필드
private Double sensorCo;
private Double sensorSo2;
private Double sensorNo2;
private Double sensorO3;
private Double sensorDust;
private List<Map<String , Double>> lastHistory;
@Override
public int compareTo(CtrCntrlModel o) {
if(!StringUtil.isNullOrEmpty(o.getControlStartDt()) && !StringUtil.isNullOrEmpty(controlStartDt)) {
long targetStartDt = Long.parseLong(o.getControlStartDt());
long startDt = Long.parseLong(controlStartDt);
if(startDt == targetStartDt) {
return 0;
}else if(startDt > targetStartDt) {
return 1;
}else if(startDt < targetStartDt) {
return -1;
}
}
return 0;
}
public static void main(String [] args) {
List<CtrCntrlModel> list = new ArrayList<CtrCntrlModel>();
try {
for(int i = 0 ; i < 5 ; i++) {
Thread.sleep(1000);
System.out.println(">>>" + DateUtils.getCurrentTime());
CtrCntrlModel model = new CtrCntrlModel();
model.setControlStartDt(DateUtils.getCurrentTime());
list.add(model);
}
list.sort(Comparator.naturalOrder());
System.out.println("오름차순 정렬");
for(CtrCntrlModel data :list) {
System.out.println(data.getControlStartDt());
}
list.sort(Comparator.reverseOrder());
System.out.println("내림차순 정렬");
for(CtrCntrlModel data :list) {
System.out.println(data.getControlStartDt());
}
}catch(Exception e) {
e.printStackTrace();
}
}
}

65
src/main/java/com/palnet/comn/model/GPDatabaseModel.java

@ -0,0 +1,65 @@
package com.palnet.comn.model;
import lombok.Data;
@Data
public class GPDatabaseModel {
private String typeCd; // 01 : 최초 들어온 데이터 , 99 : 종료 시킬 데이터
private String messageType;
private String terminalId;
private Double moveDistance = 0.0;
private String moveDistanceType;
private String prodNumber;
private String controlId;
private String objectType;
private String objectId;
private Double lat = 0.0;
private Double lng = 0.0;
private String elevType;
private Double elev = 0.0;
private String speedType;
private Double speed = 0.0;
private Double betteryLevel = 0.0;
private Double betteryVoltage = 0.0;
private String dronStatus;
private Double heading = 0.0;
private String terminalRcvDt;
private String serverRcvDt;
private String controlStartDt;
private String controlEndDt;
private String areaTrnsYn;
private String endTypeCd; // 01: 일정시간 위치 정보가 들어오지 않는 경우 , 02 : 데이터 베이스에 종료 시점이 명시 되지 않은 경우
// 환경센서 필드
private Double sensorCo = 0.0;
private Double sensorSo2 = 0.0;
private Double sensorNo2 = 0.0;
private Double sensorO3 = 0.0;
private Double sensorDust = 0.0;
}

15
src/main/java/com/palnet/comn/model/GPHistoryModel.java

@ -0,0 +1,15 @@
package com.palnet.comn.model;
import lombok.Data;
@Data
public class GPHistoryModel {
private String objectId;
private Double lat = 0.0;
private Double lng = 0.0;
}

70
src/main/java/com/palnet/comn/model/GPModel.java

@ -0,0 +1,70 @@
package com.palnet.comn.model;
import lombok.Data;
import java.util.List;
@Data
public class GPModel {
private String typeCd; // 01 : 최초 들어온 데이터 , 99 : 종료 시킬 데이터
private String messageType;
private String terminalId;
private Double moveDistance = 0.0;
private String moveDistanceType;
private String controlId; // 처음 위치 데이터가 들어 왔을때 생성 함
private String objectType;
private String objectId;
private Double lat = 0.0;
private Double lng = 0.0;
private String elevType;
private Double elev = 0.0;
private String speedType;
private Double speed = 0.0;
private Double betteryLevel = 0.0;
private Double betteryVoltage = 0.0;
private String dronStatus;
private Double heading = 0.0;
private String terminalRcvDt;
private String serverRcvDt;
private String controlStartDt;
private String controlEndDt;
private String areaTrnsYn;
// 환경센서 필드
private Double sensorCo = 0.0;
private Double sensorSo2 = 0.0;
private Double sensorNo2 = 0.0;
private Double sensorO3 = 0.0;
private Double sensorDust = 0.0;
//최근 5건만 저장
private List<GPHistoryModel> recentPositionHistory;
// 전체 히스토리 저장
private List<GPHistoryModel> postionHistory;
}

153
src/main/java/com/palnet/comn/utils/DateUtils.java

@ -0,0 +1,153 @@
package com.palnet.comn.utils;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.util.Date;
public class DateUtils {
public static String getCurrentTime(){
return new SimpleDateFormat("yyyyMMddHHmmss").format(System.currentTimeMillis());
}
public static String stringToFormat(String str) {
Date date = stringToDatetime(str);
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date);
}
public static Date stringToDatetime(String date) {
SimpleDateFormat transFormat = new SimpleDateFormat("yyyyMMddHHmmss");
Date result = null;
try {
if(date.length() == 14) {
result = transFormat.parse(date);
}
}catch(Exception e) {
e.printStackTrace();
}
return result;
}
public static Date stringToDate(String date) {
SimpleDateFormat transFormat = new SimpleDateFormat("yyyy-MM-dd");
SimpleDateFormat transFormat2 = new SimpleDateFormat("yyyyMMdd");
Date result = null;
try {
if(date.length() == 10) {
result = transFormat.parse(date);
}else if(date.length() == 8) {
result = transFormat2.parse(date);
}
}catch(Exception e) {
e.printStackTrace();
}
return result;
}
public static LocalDateTime stringToLocalDateTime(String date) {
DateTimeFormatter transFormat = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
LocalDateTime localDateTime = LocalDateTime.parse(date , transFormat);
return localDateTime;
}
public static LocalDateTime stringToLocalDate(String date) {
DateTimeFormatter transFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd");
DateTimeFormatter DATEFORMATTER = new DateTimeFormatterBuilder().append(transFormat)
.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
.parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
.parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
.toFormatter();
LocalDateTime localDateTime = LocalDateTime.parse(date , DATEFORMATTER);
return localDateTime;
}
public static Date nowDate() {
return new Date(System.currentTimeMillis());
}
public static long diffSecond(Date firstDate , Date secondDate) {
long diffDate = secondDate.getTime() - firstDate.getTime();
long diffTime = diffDate / (1000);
return diffTime;
}
public static long diffMinute(Date firstDate , Date secondDate) {
long diffDate = secondDate.getTime() - firstDate.getTime();
long diffTime = diffDate / (1000 * 60);
return diffTime;
}
// public static void main(String[] args) {
//
// Date firstDate = stringToDate("20210630142014");
//
// Date secondDate = stringToDate("20210630142914");
//
// LocalDateTime dateTime = stringToLocalDate("2021-07-20");
//
// System.out.println(">>>>" + dateTime);
//
//
// System.out.println("두 날짜의 차이 분: "+diffMinute(firstDate , secondDate));
//
// System.out.println(">>>" + stringToFormat("20210630142014"));
//
//
//
//
//
//// List<Map<String, Double>> list = new ArrayList<Map<String, Double>>();
////
////
//// for( double i= 0 ; i < 1000 ; i ++) {
//// Map<String, Double> position = new HashMap<String, Double>();
//// position.put("lat", 11111.0);
//// position.put("lng", 11111.0);
//// position.put("date", i);
//// list.add(position);
//// }
////
//// int limitSize = 1000;
//// if(list.size() < 1000) {
//// limitSize = list.size();
//// }
////
//// List<Map<String, Double>> lastThreeThings = list.subList(list.size() - limitSize, list.size());
////
//// //뒤집기
//// Collections.reverse(lastThreeThings);
////
//// for(Map<String , Double> data : lastThreeThings) {
////
////
//// System.out.println(data);
//// }
//
// }
}

80
src/main/java/com/palnet/comn/utils/JsonUtils.java

@ -0,0 +1,80 @@
package com.palnet.comn.utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@Slf4j
public class JsonUtils {
private static ObjectMapper getObjectMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return mapper;
}
public static String toJson(Object object) {
String json = null;
try {
json = getObjectMapper().writeValueAsString(object);
} catch (JsonProcessingException e) {
log.error("", e);
}
return json;
}
public static <T> T fromJson(String val, Class<T> clazz){
T object = null;
try {
object = getObjectMapper().readValue(val, clazz);
} catch (IOException e) {
log.error("", e);
}
return object;
}
public static <T> T fromJson(byte[] bytes, Class<T> clazz){
T object = null;
try {
object = getObjectMapper().readValue(bytes, clazz);
} catch (IOException e) {
log.error("", e);
}
return object;
}
public static <T> T fromJson(String val, TypeReference<T> type){
T object = null;
try {
object = getObjectMapper().readValue(val, type);
} catch (IOException e) {
log.error("", e);
}
return object;
}
public static <T> T bytesToJson(byte[] bytes, TypeReference<T> type){
T object = null;
try {
object = getObjectMapper().readValue(bytes, type);
} catch (IOException e) {
log.error("", e);
}
return object;
}
}

79
src/main/java/com/palnet/process/message/config/MessageConfig.java

@ -0,0 +1,79 @@
package com.palnet.process.message.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
@Configuration
@Slf4j
public class MessageConfig {
private final Environment env;
public MessageConfig(Environment env) {
this.env = env;
}
// @Bean
// public CachingConnectionFactory cachingConnectionFactory() {
// return new CachingConnectionFactory();
// }
@Bean
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public DirectExchange appDroneExchange() {
return new DirectExchange(env.getProperty("message.app.exchange-name"));
}
@Bean
public DirectExchange websocketDroneExchange() {
return new DirectExchange(env.getProperty("message.websocket.exchange-name"));
}
@Bean
public Queue appDroneQueue() {
return new Queue(env.getProperty("message.app.queue-name"), false);
}
@Bean
public Queue websocketDroneQueue() {
return new Queue(env.getProperty("message.websocket.queue-name"), false);
}
@Bean
public Binding appDroneBinding(Queue appDroneQueue, DirectExchange appDroneExchange) {
return BindingBuilder.bind(appDroneQueue)
.to(appDroneExchange)
.with(env.getProperty("message.app.routing-key"));
}
@Bean
public Binding websocketDroneBinding(Queue websocketDroneQueue, DirectExchange websocketDroneExchange) {
return BindingBuilder.bind(websocketDroneQueue)
.to(websocketDroneExchange)
.with(env.getProperty("message.websocket.routing-key"));
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setMessageConverter(converter);
return rabbitTemplate;
}
}

79
src/main/java/com/palnet/process/message/consumer/MessageConsumer.java

@ -0,0 +1,79 @@
package com.palnet.process.message.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.palnet.comn.model.CtrCntrlModel;
import com.palnet.comn.utils.JsonUtils;
import com.palnet.comn.model.GPDatabaseModel;
import com.palnet.server.collection.ChannelCollection;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
@Component
@Slf4j
@RequiredArgsConstructor
public class MessageConsumer {
private ChannelCollection cc = new ChannelCollection();
private final ObjectMapper objectMapper;
@RabbitHandler
@RabbitListener(queues = {"websocket.drone.queue"})
public void receiveDroneMessage(final GPDatabaseModel model) {
ArrayList<CtrCntrlModel> list = this.getList(model);
cc.getAllChannels().stream().forEach(c -> { // 접속되어 있는 모든 사용자에게 전달 처리
c.writeAndFlush(new TextWebSocketFrame(JsonUtils.toJson(list)));
});
}
public ArrayList<CtrCntrlModel> getList(final GPDatabaseModel dataInfo) {
ArrayList<CtrCntrlModel> list = new ArrayList<>();
CtrCntrlModel model = new CtrCntrlModel();
model.setObjectId(dataInfo.getObjectId());
model.setControlId(dataInfo.getControlId());
model.setControlStartDt(dataInfo.getControlStartDt());
model.setObjectTypeCd(dataInfo.getObjectType());
model.setLat(dataInfo.getLat());
model.setLng(dataInfo.getLng());
model.setElevType(dataInfo.getElevType());
model.setElev(dataInfo.getElev());
model.setSpeedType(dataInfo.getSpeedType());
model.setSpeed(dataInfo.getSpeed());
model.setBetteryLevel(dataInfo.getBetteryLevel());
model.setBetteryVoltage(dataInfo.getBetteryVoltage());
model.setDronStatus(dataInfo.getDronStatus());
model.setHeading(dataInfo.getHeading());
model.setMoveDistance(dataInfo.getMoveDistance());
model.setMoveDistanceType(dataInfo.getMoveDistanceType());
model.setServerRcvDt(dataInfo.getServerRcvDt());
// 환경 데이터 필드 추가
model.setSensorCo(dataInfo.getSensorCo());
model.setSensorSo2(dataInfo.getSensorSo2());
model.setSensorNo2(dataInfo.getSensorNo2());
model.setSensorO3(dataInfo.getSensorO3());
model.setSensorDust(dataInfo.getSensorDust());
list.add(model);
//=== 정렬 처리 ==
//기준 : 관제 시작일이 가장 느린순으로 상단에 올린다.
list.sort(Comparator.reverseOrder());
return list;
}
}

119
src/main/java/com/palnet/server/WebServer.java

@ -0,0 +1,119 @@
package com.palnet.server;
import com.palnet.server.initializer.WebInitializer;
import com.palnet.server.task.ctr.CtrCntrlTask;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Component
public class WebServer {
private Logger logger = LoggerFactory.getLogger(getClass());
@Value("${netty.websocket.port}")
private int port;
@Value("${netty.websocket.thread.boss}")
private int threadBoss;
@Value("${netty.websocket.thread.worker}")
private int threadWorker;
@Value("${netty.task.controlinfoTime}")
private int controlinfoTime;
private Channel ch;
private EventLoopGroup bossGroup ;
private EventLoopGroup workerGroup ;
private ServerBootstrap b = new ServerBootstrap();
@PostConstruct
public void postConstruct(){
start();
}
@PreDestroy
public void preDestroy(){
stop();
}
public void start(){
try {
bossGroup = new NioEventLoopGroup();
workerGroup =new NioEventLoopGroup();
b.group(bossGroup , workerGroup) // bossGroup과 workerGroup은 NioEventLoopGroup의 인스턴스입니다.이 때 스레드 개수를 설정할 수 있음 각각 1, Runtime.getRuntime().availableProcessors() * 2로 설정했습니다.
.channel(NioServerSocketChannel.class)
// .handler(new LoggingHandler(LogLevel.valueOf(logBootstrap)))
// .handler(bootstrapHandler) // 채널이 활성화되면 소켓 서버와 소켓 클라이언트를 구동하게 되는 핸들러를 등록합니다.
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.SO_BACKLOG, 1000) // 동시에 수용할 수 있는 소켓 연결 요청 수입니다.
.childOption(ChannelOption.TCP_NODELAY, true) // 반응속도를 높이기 위해 Nagle 알고리즘을 비활성화 합니다
.childOption(ChannelOption.SO_LINGER, 0) // 소켓이 close될 때 신뢰성있는 종료를 위해 4way-handshake가 발생하고 이때 TIME_WAIT로 리소스가 낭비됩니다. 이를 방지하기 위해 0으로 설정합니다.
.childOption(ChannelOption.SO_KEEPALIVE, true) // Keep-alive를 켭니다.
.childOption(ChannelOption.SO_REUSEADDR, true) // SO_LINGER설정이 있으면 안해도 되나 혹시나병(!)으로 TIME_WAIT걸린 포트를 재사용할 수 있도록 설정합니다.
.childHandler(new WebInitializer());
connection();
taskProcess();
logger.warn("====== [WEBSOCKET SERVER] Start ====== ");
}catch (Exception e) {
logger.warn("====== [WEBSOCKET SERVER] Fail ====== ");
logger.error(e.getMessage());
}
}
private void connection() {
//서버는 Listen상태로 기다려야하는데, 톰캣이 Timeout이 발생함
//이를 방지하기 위해 Thread로 처리한다.
new Thread(new Runnable() {
@Override
public void run() {
try {
ChannelFuture future = b.bind(port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
logger.error("InterruptedException", e);
}
}
}).start();
}
private void taskProcess() {
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
service.scheduleAtFixedRate(new CtrCntrlTask(),0,controlinfoTime,TimeUnit.MILLISECONDS);
}
public void stop(){
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
logger.warn("====== [WEBSOCKET SERVER] STOP ====== ");
// ch.close().addListener(f->{
// logger.warn("====== [WEBSOCKET SERVER] CLOSE ====== ");
// bossGroup.shutdownGracefully();
// workerGroup.shutdownGracefully();
// });
}
}

22
src/main/java/com/palnet/server/codec/WebPayLoadDecoder.java

@ -0,0 +1,22 @@
package com.palnet.server.codec;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
@Sharable
public class WebPayLoadDecoder extends MessageToMessageDecoder<String> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
logger.info("MSG ::" + msg);
}
}

26
src/main/java/com/palnet/server/codec/WebPayLoadEncorder.java

@ -0,0 +1,26 @@
package com.palnet.server.codec;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
@Sharable
public class WebPayLoadEncorder extends MessageToMessageEncoder<String> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
protected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
logger.info("MSG ::" + msg);
}
}

26
src/main/java/com/palnet/server/collection/ChannelCollection.java

@ -0,0 +1,26 @@
package com.palnet.server.collection;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
public class ChannelCollection {
private static final long serialVersionUID = 1L;
//접속 되어있는 모든 Channel
private static final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
public void setAllChannels(Channel ch) {
allChannels.add(ch);
}
public ChannelGroup getAllChannels() {
return allChannels;
}
}

80
src/main/java/com/palnet/server/handler/WebHandler.java

@ -0,0 +1,80 @@
package com.palnet.server.handler;
import com.palnet.server.collection.ChannelCollection;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WebHandler extends SimpleChannelInboundHandler<WebSocketFrame>{
private Logger logger = LoggerFactory.getLogger(getClass());
ChannelCollection cc = new ChannelCollection();
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
logger.info("==================== [ channelRead0 ] ==================== ");
logger.info("Channel Size [" + cc.getAllChannels().size() + "]");
cc.getAllChannels().stream().forEach(c -> {
logger.info(":" + c);
c.writeAndFlush(new TextWebSocketFrame("test"));
// command.execute(c, data, result);
});
// ctx.writeAndFlush(new TextWebSocketFrame("Test"));
}
// private class HeartBeatTask implements Runnable{
//
// private final ChannelHandlerContext ctx;
//
// public HeartBeatTask(final ChannelHandlerContext ctx){
// this.ctx = ctx;
// }
//
// public void run() {
// ctx.writeAndFlush(new TextWebSocketFrame("test"));
//
// System.out.println("Client send heart beat message to server : ---> ");
// }
//
// }
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// SocketAddress remoteAddress = ctx.channel().remoteAddress();
cc.setAllChannels(ctx.channel());
// ctx.executor().scheduleAtFixedRate(new HeartBeatTask(ctx),0,500,
// TimeUnit.MILLISECONDS);
logger.info("==================== [ channelActive ] ==================== ");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info("==================== [ channelInactive ] ==================== ");
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
//2 Triggered when a connection is established with the selector of a thread in the worker thread group (channel is registered to EventLoop)
logger.info("==================== [ channelRegistered ] ==================== ");
}
}

36
src/main/java/com/palnet/server/initializer/WebInitializer.java

@ -0,0 +1,36 @@
package com.palnet.server.initializer;
import com.palnet.server.codec.WebPayLoadDecoder;
import com.palnet.server.codec.WebPayLoadEncorder;
import com.palnet.server.handler.WebHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.timeout.IdleStateHandler;
public class WebInitializer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(65536))
.addLast(new WebSocketServerCompressionHandler())
.addLast(new WebSocketServerProtocolHandler("/ws", null, true))
.addLast(new IdleStateHandler(0, 0, 180)) // [5]
// .addLast(idleHandler) // [6]
// .addLast(healthEndpointHandler) // [7]
.addLast(new WebPayLoadDecoder() , new WebPayLoadEncorder())
// .addLast(new WebCommandHandler())
.addLast(new WebHandler());
// .addLast(authHandler) // [9]
// .addLast(commandHandler); // [10]
}
}

33
src/main/java/com/palnet/server/task/ctr/CtrCntrlTask.java

@ -0,0 +1,33 @@
package com.palnet.server.task.ctr;
import com.palnet.comn.utils.JsonUtils;
import com.palnet.server.collection.ChannelCollection;
import com.palnet.server.task.ctr.service.CtrCntrlTaskService;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CtrCntrlTask implements Runnable{
private ChannelCollection cc = new ChannelCollection();
private Logger logger = LoggerFactory.getLogger(getClass());
private CtrCntrlTaskService service = new CtrCntrlTaskService();
@Override
public void run() {
// try {
// cc.getAllChannels().stream().forEach(c -> { // 접속되어 있는 모든 사용자에게 전달 처리
// c.writeAndFlush(new TextWebSocketFrame(JsonUtils.toJson(service.getTest())));
// });
// }catch(Exception e) {
// e.printStackTrace();
// }
}
}

66
src/main/java/com/palnet/server/task/ctr/service/CtrCntrlTaskService.java

@ -0,0 +1,66 @@
package com.palnet.server.task.ctr.service;
import com.palnet.process.message.consumer.MessageConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class CtrCntrlTaskService {
@Autowired
private MessageConsumer messageConsumer;
/**
* Websocket 통해 전달될 위치 정보를 관리
* @return
*/
// public ArrayList<CtrCntrlModel> getList() {
//
// GPDatabaseModel dataInfo = messageConsumer.receiveDroneMessage();
//
// ArrayList<CtrCntrlModel> list = new ArrayList<>();
// CtrCntrlModel model = new CtrCntrlModel();
//
//
// model.setObjectId(dataInfo.getObjectId());
// model.setControlId(dataInfo.getControlId());
// model.setControlStartDt(dataInfo.getControlStartDt());
// model.setObjectTypeCd(dataInfo.getObjectType());
// model.setLat(dataInfo.getLat());
// model.setLng(dataInfo.getLng());
// model.setElevType(dataInfo.getElevType());
// model.setElev(dataInfo.getElev());
// model.setSpeedType(dataInfo.getSpeedType());
// model.setSpeed(dataInfo.getSpeed());
// model.setBetteryLevel(dataInfo.getBetteryLevel());
// model.setBetteryVoltage(dataInfo.getBetteryVoltage());
// model.setDronStatus(dataInfo.getDronStatus());
// model.setHeading(dataInfo.getHeading());
// model.setMoveDistance(dataInfo.getMoveDistance());
// model.setMoveDistanceType(dataInfo.getMoveDistanceType());
//
// model.setServerRcvDt(dataInfo.getServerRcvDt());
//
// // 환경 데이터 필드 추가
// model.setSensorCo(dataInfo.getSensorCo());
// model.setSensorSo2(dataInfo.getSensorSo2());
// model.setSensorNo2(dataInfo.getSensorNo2());
// model.setSensorO3(dataInfo.getSensorO3());
// model.setSensorDust(dataInfo.getSensorDust());
//
// list.add(model);
//
//
// //=== 정렬 처리 ==
// //기준 : 관제 시작일이 가장 느린순으로 상단에 올린다.
// list.sort(Comparator.reverseOrder());
//
// return list;
// }
}

12
src/main/resources/application.yml

@ -7,9 +7,6 @@ spring:
port: 5672
username: guest
password: guest
graphql:
websocket:
connection-init-timeout:
netty:
task:
@ -20,3 +17,12 @@ netty:
boss: 1
worker: 1
message:
app:
queue-name: app.drone.queue
exchange-name: app.drone.exchange
routing-key: app.drone.routing.#
websocket:
queue-name: websocket.drone.queue
exchange-name: websocket.drone.exchange
routing-key: websocket.drone.routing.#
Loading…
Cancel
Save