diff --git a/pom.xml b/pom.xml index 1a57e0f..4ef585e 100644 --- a/pom.xml +++ b/pom.xml @@ -37,6 +37,16 @@ netty-all 4.1.77.Final + + org.springframework.boot + spring-boot-starter-amqp + 2.7.0 + + + org.springframework.amqp + spring-rabbit-test + 2.4.4 + diff --git a/src/main/java/com/palnet/PavWebsocketApplication.java b/src/main/java/com/palnet/PavWebsocketApplication.java index 96c3850..7661aaa 100644 --- a/src/main/java/com/palnet/PavWebsocketApplication.java +++ b/src/main/java/com/palnet/PavWebsocketApplication.java @@ -1,7 +1,9 @@ package com.palnet; +import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; @SpringBootApplication public class PavWebsocketApplication { @@ -10,4 +12,9 @@ public class PavWebsocketApplication { SpringApplication.run(PavWebsocketApplication.class, args); } + @Bean + public ObjectMapper objectMapper() { + return new ObjectMapper(); + } + } diff --git a/src/main/java/com/palnet/comn/model/CtrCntrlModel.java b/src/main/java/com/palnet/comn/model/CtrCntrlModel.java new file mode 100644 index 0000000..1fdd168 --- /dev/null +++ b/src/main/java/com/palnet/comn/model/CtrCntrlModel.java @@ -0,0 +1,122 @@ +package com.palnet.comn.model; + +import com.palnet.comn.utils.DateUtils; +import io.netty.util.internal.StringUtil; +import lombok.Data; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +@Data +public class CtrCntrlModel implements Comparable{ + + private String messageTypeCd; + + private String controlId; + + private String objectTypeCd; + + private String objectId; + + private Double lat; + + private Double lng; + + private String elevType; + + private Double elev; + + private String speedType; + + private Double speed; + + private Double betteryLevel; + + private Double betteryVoltage; + + private String takeOffPositon; + + private String dronStatus; + + private Double heading; + + private String terminalRcvDt; + + private String serverRcvDt; + + private String controlStartDt; + + private Double moveDistance; + + private String moveDistanceType; + + // 환경센서 필드 + private Double sensorCo; + private Double sensorSo2; + private Double sensorNo2; + private Double sensorO3; + private Double sensorDust; + + private List> lastHistory; + + @Override + public int compareTo(CtrCntrlModel o) { + + if(!StringUtil.isNullOrEmpty(o.getControlStartDt()) && !StringUtil.isNullOrEmpty(controlStartDt)) { + long targetStartDt = Long.parseLong(o.getControlStartDt()); + long startDt = Long.parseLong(controlStartDt); + + if(startDt == targetStartDt) { + return 0; + }else if(startDt > targetStartDt) { + return 1; + }else if(startDt < targetStartDt) { + return -1; + } + } + + return 0; + + } + + public static void main(String [] args) { + List list = new ArrayList(); + + try { + for(int i = 0 ; i < 5 ; i++) { + Thread.sleep(1000); + System.out.println(">>>" + DateUtils.getCurrentTime()); + CtrCntrlModel model = new CtrCntrlModel(); + model.setControlStartDt(DateUtils.getCurrentTime()); + list.add(model); + } + + list.sort(Comparator.naturalOrder()); + + System.out.println("오름차순 정렬"); + for(CtrCntrlModel data :list) { + + System.out.println(data.getControlStartDt()); + } + + + list.sort(Comparator.reverseOrder()); + System.out.println("내림차순 정렬"); + for(CtrCntrlModel data :list) { + + System.out.println(data.getControlStartDt()); + } + }catch(Exception e) { + e.printStackTrace(); + } + + + + + } + + + +} diff --git a/src/main/java/com/palnet/comn/model/GPDatabaseModel.java b/src/main/java/com/palnet/comn/model/GPDatabaseModel.java new file mode 100644 index 0000000..0d1a6f9 --- /dev/null +++ b/src/main/java/com/palnet/comn/model/GPDatabaseModel.java @@ -0,0 +1,65 @@ +package com.palnet.comn.model; + +import lombok.Data; + +@Data +public class GPDatabaseModel { + + private String typeCd; // 01 : 최초 들어온 데이터 , 99 : 종료 시킬 데이터 + + private String messageType; + + private String terminalId; + + private Double moveDistance = 0.0; + + private String moveDistanceType; + + private String prodNumber; + + private String controlId; + + private String objectType; + + private String objectId; + + private Double lat = 0.0; + + private Double lng = 0.0; + + private String elevType; + + private Double elev = 0.0; + + private String speedType; + + private Double speed = 0.0; + + private Double betteryLevel = 0.0; + + private Double betteryVoltage = 0.0; + + private String dronStatus; + + private Double heading = 0.0; + + private String terminalRcvDt; + + private String serverRcvDt; + + private String controlStartDt; + + private String controlEndDt; + + private String areaTrnsYn; + + private String endTypeCd; // 01: 일정시간 위치 정보가 들어오지 않는 경우 , 02 : 데이터 베이스에 종료 시점이 명시 되지 않은 경우 + + // 환경센서 필드 + private Double sensorCo = 0.0; + private Double sensorSo2 = 0.0; + private Double sensorNo2 = 0.0; + private Double sensorO3 = 0.0; + private Double sensorDust = 0.0; + +} diff --git a/src/main/java/com/palnet/comn/model/GPHistoryModel.java b/src/main/java/com/palnet/comn/model/GPHistoryModel.java new file mode 100644 index 0000000..7262a45 --- /dev/null +++ b/src/main/java/com/palnet/comn/model/GPHistoryModel.java @@ -0,0 +1,15 @@ +package com.palnet.comn.model; + +import lombok.Data; + +@Data +public class GPHistoryModel { + + private String objectId; + + private Double lat = 0.0; + + private Double lng = 0.0; + + +} diff --git a/src/main/java/com/palnet/comn/model/GPModel.java b/src/main/java/com/palnet/comn/model/GPModel.java new file mode 100644 index 0000000..77f5cc8 --- /dev/null +++ b/src/main/java/com/palnet/comn/model/GPModel.java @@ -0,0 +1,70 @@ +package com.palnet.comn.model; + +import lombok.Data; + +import java.util.List; + +@Data +public class GPModel { + + private String typeCd; // 01 : 최초 들어온 데이터 , 99 : 종료 시킬 데이터 + + private String messageType; + + private String terminalId; + + private Double moveDistance = 0.0; + + private String moveDistanceType; + + private String controlId; // 처음 위치 데이터가 들어 왔을때 생성 함 + + private String objectType; + + private String objectId; + + private Double lat = 0.0; + + private Double lng = 0.0; + + private String elevType; + + private Double elev = 0.0; + + private String speedType; + + private Double speed = 0.0; + + private Double betteryLevel = 0.0; + + private Double betteryVoltage = 0.0; + + private String dronStatus; + + private Double heading = 0.0; + + private String terminalRcvDt; + + private String serverRcvDt; + + private String controlStartDt; + + private String controlEndDt; + + private String areaTrnsYn; + + // 환경센서 필드 + private Double sensorCo = 0.0; + private Double sensorSo2 = 0.0; + private Double sensorNo2 = 0.0; + private Double sensorO3 = 0.0; + private Double sensorDust = 0.0; + + //최근 5건만 저장 + private List recentPositionHistory; + + // 전체 히스토리 저장 + private List postionHistory; + + +} diff --git a/src/main/java/com/palnet/comn/utils/DateUtils.java b/src/main/java/com/palnet/comn/utils/DateUtils.java new file mode 100644 index 0000000..3a1f867 --- /dev/null +++ b/src/main/java/com/palnet/comn/utils/DateUtils.java @@ -0,0 +1,153 @@ +package com.palnet.comn.utils; + +import java.text.SimpleDateFormat; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.temporal.ChronoField; +import java.util.Date; + +public class DateUtils { + + public static String getCurrentTime(){ + return new SimpleDateFormat("yyyyMMddHHmmss").format(System.currentTimeMillis()); + } + + public static String stringToFormat(String str) { + Date date = stringToDatetime(str); + + return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date); + } + + + public static Date stringToDatetime(String date) { + + SimpleDateFormat transFormat = new SimpleDateFormat("yyyyMMddHHmmss"); + Date result = null; + try { + if(date.length() == 14) { + result = transFormat.parse(date); + } + }catch(Exception e) { + e.printStackTrace(); + } + + return result; + } + + public static Date stringToDate(String date) { + SimpleDateFormat transFormat = new SimpleDateFormat("yyyy-MM-dd"); + SimpleDateFormat transFormat2 = new SimpleDateFormat("yyyyMMdd"); + Date result = null; + try { + if(date.length() == 10) { + result = transFormat.parse(date); + }else if(date.length() == 8) { + result = transFormat2.parse(date); + } + }catch(Exception e) { + e.printStackTrace(); + } + + return result; + } + + public static LocalDateTime stringToLocalDateTime(String date) { + + DateTimeFormatter transFormat = DateTimeFormatter.ofPattern("yyyyMMddHHmmss"); + LocalDateTime localDateTime = LocalDateTime.parse(date , transFormat); + + return localDateTime; + + } + + + + public static LocalDateTime stringToLocalDate(String date) { + DateTimeFormatter transFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + + DateTimeFormatter DATEFORMATTER = new DateTimeFormatterBuilder().append(transFormat) + .parseDefaulting(ChronoField.HOUR_OF_DAY, 0) + .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0) + .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0) + .toFormatter(); + + + LocalDateTime localDateTime = LocalDateTime.parse(date , DATEFORMATTER); + + return localDateTime; + } + + + public static Date nowDate() { + return new Date(System.currentTimeMillis()); + } + + public static long diffSecond(Date firstDate , Date secondDate) { + + long diffDate = secondDate.getTime() - firstDate.getTime(); + long diffTime = diffDate / (1000); + + return diffTime; + + } + + public static long diffMinute(Date firstDate , Date secondDate) { + + long diffDate = secondDate.getTime() - firstDate.getTime(); + long diffTime = diffDate / (1000 * 60); + + return diffTime; + + } + + + +// public static void main(String[] args) { +// +// Date firstDate = stringToDate("20210630142014"); +// +// Date secondDate = stringToDate("20210630142914"); +// +// LocalDateTime dateTime = stringToLocalDate("2021-07-20"); +// +// System.out.println(">>>>" + dateTime); +// +// +// System.out.println("두 날짜의 차이 분: "+diffMinute(firstDate , secondDate)); +// +// System.out.println(">>>" + stringToFormat("20210630142014")); +// +// +// +// +// +//// List> list = new ArrayList>(); +//// +//// +//// for( double i= 0 ; i < 1000 ; i ++) { +//// Map position = new HashMap(); +//// position.put("lat", 11111.0); +//// position.put("lng", 11111.0); +//// position.put("date", i); +//// list.add(position); +//// } +//// +//// int limitSize = 1000; +//// if(list.size() < 1000) { +//// limitSize = list.size(); +//// } +//// +//// List> lastThreeThings = list.subList(list.size() - limitSize, list.size()); +//// +//// //뒤집기 +//// Collections.reverse(lastThreeThings); +//// +//// for(Map data : lastThreeThings) { +//// +//// +//// System.out.println(data); +//// } +// +// } +} diff --git a/src/main/java/com/palnet/comn/utils/JsonUtils.java b/src/main/java/com/palnet/comn/utils/JsonUtils.java new file mode 100644 index 0000000..6b28ef2 --- /dev/null +++ b/src/main/java/com/palnet/comn/utils/JsonUtils.java @@ -0,0 +1,80 @@ +package com.palnet.comn.utils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; + +@Slf4j +public class JsonUtils { + + private static ObjectMapper getObjectMapper() { + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return mapper; + } + + public static String toJson(Object object) { + String json = null; + + try { + json = getObjectMapper().writeValueAsString(object); + } catch (JsonProcessingException e) { + log.error("", e); + } + + return json; + } + + public static T fromJson(String val, Class clazz){ + T object = null; + + try { + object = getObjectMapper().readValue(val, clazz); + } catch (IOException e) { + log.error("", e); + } + + return object; + } + + public static T fromJson(byte[] bytes, Class clazz){ + T object = null; + + try { + object = getObjectMapper().readValue(bytes, clazz); + } catch (IOException e) { + log.error("", e); + } + + return object; + } + + public static T fromJson(String val, TypeReference type){ + T object = null; + + try { + object = getObjectMapper().readValue(val, type); + } catch (IOException e) { + log.error("", e); + } + + return object; + } + + public static T bytesToJson(byte[] bytes, TypeReference type){ + T object = null; + + try { + object = getObjectMapper().readValue(bytes, type); + } catch (IOException e) { + log.error("", e); + } + + return object; + } + +} diff --git a/src/main/java/com/palnet/process/message/config/MessageConfig.java b/src/main/java/com/palnet/process/message/config/MessageConfig.java new file mode 100644 index 0000000..24f06f0 --- /dev/null +++ b/src/main/java/com/palnet/process/message/config/MessageConfig.java @@ -0,0 +1,79 @@ +package com.palnet.process.message.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.env.Environment; + +@Configuration +@Slf4j +public class MessageConfig { + + private final Environment env; + + public MessageConfig(Environment env) { + this.env = env; + } + +// @Bean +// public CachingConnectionFactory cachingConnectionFactory() { +// return new CachingConnectionFactory(); +// } + + @Bean + public Jackson2JsonMessageConverter converter() { + return new Jackson2JsonMessageConverter(); + } + + @Bean + public DirectExchange appDroneExchange() { + return new DirectExchange(env.getProperty("message.app.exchange-name")); + } + + @Bean + public DirectExchange websocketDroneExchange() { + return new DirectExchange(env.getProperty("message.websocket.exchange-name")); + } + + @Bean + public Queue appDroneQueue() { + return new Queue(env.getProperty("message.app.queue-name"), false); + } + + @Bean + public Queue websocketDroneQueue() { + return new Queue(env.getProperty("message.websocket.queue-name"), false); + } + + @Bean + public Binding appDroneBinding(Queue appDroneQueue, DirectExchange appDroneExchange) { + return BindingBuilder.bind(appDroneQueue) + .to(appDroneExchange) + .with(env.getProperty("message.app.routing-key")); + } + + @Bean + public Binding websocketDroneBinding(Queue websocketDroneQueue, DirectExchange websocketDroneExchange) { + return BindingBuilder.bind(websocketDroneQueue) + .to(websocketDroneExchange) + .with(env.getProperty("message.websocket.routing-key")); + } + + @Bean + public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) { + RabbitTemplate rabbitTemplate = new RabbitTemplate(); + + rabbitTemplate.setConnectionFactory(connectionFactory); + rabbitTemplate.setMessageConverter(converter); + + return rabbitTemplate; + } + +} diff --git a/src/main/java/com/palnet/process/message/consumer/MessageConsumer.java b/src/main/java/com/palnet/process/message/consumer/MessageConsumer.java new file mode 100644 index 0000000..cbfb56a --- /dev/null +++ b/src/main/java/com/palnet/process/message/consumer/MessageConsumer.java @@ -0,0 +1,79 @@ +package com.palnet.process.message.consumer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.palnet.comn.model.CtrCntrlModel; +import com.palnet.comn.utils.JsonUtils; +import com.palnet.comn.model.GPDatabaseModel; +import com.palnet.server.collection.ChannelCollection; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; + +@Component +@Slf4j +@RequiredArgsConstructor +public class MessageConsumer { + + private ChannelCollection cc = new ChannelCollection(); + + private final ObjectMapper objectMapper; + + @RabbitHandler + @RabbitListener(queues = {"websocket.drone.queue"}) + public void receiveDroneMessage(final GPDatabaseModel model) { + ArrayList list = this.getList(model); + + cc.getAllChannels().stream().forEach(c -> { // 접속되어 있는 모든 사용자에게 전달 처리 + c.writeAndFlush(new TextWebSocketFrame(JsonUtils.toJson(list))); + }); + } + + public ArrayList getList(final GPDatabaseModel dataInfo) { + ArrayList list = new ArrayList<>(); + CtrCntrlModel model = new CtrCntrlModel(); + + + model.setObjectId(dataInfo.getObjectId()); + model.setControlId(dataInfo.getControlId()); + model.setControlStartDt(dataInfo.getControlStartDt()); + model.setObjectTypeCd(dataInfo.getObjectType()); + model.setLat(dataInfo.getLat()); + model.setLng(dataInfo.getLng()); + model.setElevType(dataInfo.getElevType()); + model.setElev(dataInfo.getElev()); + model.setSpeedType(dataInfo.getSpeedType()); + model.setSpeed(dataInfo.getSpeed()); + model.setBetteryLevel(dataInfo.getBetteryLevel()); + model.setBetteryVoltage(dataInfo.getBetteryVoltage()); + model.setDronStatus(dataInfo.getDronStatus()); + model.setHeading(dataInfo.getHeading()); + model.setMoveDistance(dataInfo.getMoveDistance()); + model.setMoveDistanceType(dataInfo.getMoveDistanceType()); + + model.setServerRcvDt(dataInfo.getServerRcvDt()); + + // 환경 데이터 필드 추가 + model.setSensorCo(dataInfo.getSensorCo()); + model.setSensorSo2(dataInfo.getSensorSo2()); + model.setSensorNo2(dataInfo.getSensorNo2()); + model.setSensorO3(dataInfo.getSensorO3()); + model.setSensorDust(dataInfo.getSensorDust()); + + list.add(model); + + + //=== 정렬 처리 == + //기준 : 관제 시작일이 가장 느린순으로 상단에 올린다. + list.sort(Comparator.reverseOrder()); + + return list; + } +} diff --git a/src/main/java/com/palnet/server/WebServer.java b/src/main/java/com/palnet/server/WebServer.java new file mode 100644 index 0000000..e9302dd --- /dev/null +++ b/src/main/java/com/palnet/server/WebServer.java @@ -0,0 +1,119 @@ +package com.palnet.server; + +import com.palnet.server.initializer.WebInitializer; +import com.palnet.server.task.ctr.CtrCntrlTask; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +@Component +public class WebServer { + private Logger logger = LoggerFactory.getLogger(getClass()); + + @Value("${netty.websocket.port}") + private int port; + + @Value("${netty.websocket.thread.boss}") + private int threadBoss; + + @Value("${netty.websocket.thread.worker}") + private int threadWorker; + + @Value("${netty.task.controlinfoTime}") + private int controlinfoTime; + + + private Channel ch; + + private EventLoopGroup bossGroup ; + private EventLoopGroup workerGroup ; + private ServerBootstrap b = new ServerBootstrap(); + + @PostConstruct + public void postConstruct(){ + start(); + } + + + @PreDestroy + public void preDestroy(){ + + stop(); + } + + public void start(){ + try { + bossGroup = new NioEventLoopGroup(); + workerGroup =new NioEventLoopGroup(); + + b.group(bossGroup , workerGroup) // bossGroup과 workerGroup은 NioEventLoopGroup의 인스턴스입니다.이 때 스레드 개수를 설정할 수 있음 각각 1, Runtime.getRuntime().availableProcessors() * 2로 설정했습니다. + .channel(NioServerSocketChannel.class) +// .handler(new LoggingHandler(LogLevel.valueOf(logBootstrap))) +// .handler(bootstrapHandler) // 채널이 활성화되면 소켓 서버와 소켓 클라이언트를 구동하게 되는 핸들러를 등록합니다. + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) + .option(ChannelOption.SO_BACKLOG, 1000) // 동시에 수용할 수 있는 소켓 연결 요청 수입니다. + .childOption(ChannelOption.TCP_NODELAY, true) // 반응속도를 높이기 위해 Nagle 알고리즘을 비활성화 합니다 + .childOption(ChannelOption.SO_LINGER, 0) // 소켓이 close될 때 신뢰성있는 종료를 위해 4way-handshake가 발생하고 이때 TIME_WAIT로 리소스가 낭비됩니다. 이를 방지하기 위해 0으로 설정합니다. + .childOption(ChannelOption.SO_KEEPALIVE, true) // Keep-alive를 켭니다. + .childOption(ChannelOption.SO_REUSEADDR, true) // SO_LINGER설정이 있으면 안해도 되나 혹시나병(!)으로 TIME_WAIT걸린 포트를 재사용할 수 있도록 설정합니다. + .childHandler(new WebInitializer()); + + connection(); + taskProcess(); + + logger.warn("====== [WEBSOCKET SERVER] Start ====== "); + }catch (Exception e) { + logger.warn("====== [WEBSOCKET SERVER] Fail ====== "); + logger.error(e.getMessage()); + } + + } + + private void connection() { + //서버는 Listen상태로 기다려야하는데, 톰캣이 Timeout이 발생함 + //이를 방지하기 위해 Thread로 처리한다. + new Thread(new Runnable() { + @Override + public void run() { + try { + ChannelFuture future = b.bind(port).sync(); + future.channel().closeFuture().sync(); + + } catch (InterruptedException e) { + logger.error("InterruptedException", e); + } + } + }).start(); + } + + private void taskProcess() { + ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); + service.scheduleAtFixedRate(new CtrCntrlTask(),0,controlinfoTime,TimeUnit.MILLISECONDS); + } + + + public void stop(){ + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + logger.warn("====== [WEBSOCKET SERVER] STOP ====== "); +// ch.close().addListener(f->{ +// logger.warn("====== [WEBSOCKET SERVER] CLOSE ====== "); +// bossGroup.shutdownGracefully(); +// workerGroup.shutdownGracefully(); +// }); + } +} diff --git a/src/main/java/com/palnet/server/codec/WebPayLoadDecoder.java b/src/main/java/com/palnet/server/codec/WebPayLoadDecoder.java new file mode 100644 index 0000000..983d854 --- /dev/null +++ b/src/main/java/com/palnet/server/codec/WebPayLoadDecoder.java @@ -0,0 +1,22 @@ +package com.palnet.server.codec; + +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +@Sharable +public class WebPayLoadDecoder extends MessageToMessageDecoder { + + private Logger logger = LoggerFactory.getLogger(getClass()); + @Override + protected void decode(ChannelHandlerContext ctx, String msg, List out) throws Exception { + logger.info("MSG ::" + msg); + + } + + +} diff --git a/src/main/java/com/palnet/server/codec/WebPayLoadEncorder.java b/src/main/java/com/palnet/server/codec/WebPayLoadEncorder.java new file mode 100644 index 0000000..a0f57fa --- /dev/null +++ b/src/main/java/com/palnet/server/codec/WebPayLoadEncorder.java @@ -0,0 +1,26 @@ +package com.palnet.server.codec; + +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageEncoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +@Sharable +public class WebPayLoadEncorder extends MessageToMessageEncoder { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + + @Override + protected void encode(ChannelHandlerContext ctx, String msg, List out) throws Exception { + logger.info("MSG ::" + msg); + + } + + + + +} diff --git a/src/main/java/com/palnet/server/collection/ChannelCollection.java b/src/main/java/com/palnet/server/collection/ChannelCollection.java new file mode 100644 index 0000000..6524690 --- /dev/null +++ b/src/main/java/com/palnet/server/collection/ChannelCollection.java @@ -0,0 +1,26 @@ +package com.palnet.server.collection; + +import io.netty.channel.Channel; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.util.concurrent.GlobalEventExecutor; + + +public class ChannelCollection { + + private static final long serialVersionUID = 1L; + + //접속 되어있는 모든 Channel + private static final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + + + public void setAllChannels(Channel ch) { + allChannels.add(ch); + } + + public ChannelGroup getAllChannels() { + return allChannels; + } + + +} diff --git a/src/main/java/com/palnet/server/handler/WebHandler.java b/src/main/java/com/palnet/server/handler/WebHandler.java new file mode 100644 index 0000000..3afac0b --- /dev/null +++ b/src/main/java/com/palnet/server/handler/WebHandler.java @@ -0,0 +1,80 @@ +package com.palnet.server.handler; + +import com.palnet.server.collection.ChannelCollection; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WebHandler extends SimpleChannelInboundHandler{ + + private Logger logger = LoggerFactory.getLogger(getClass()); + + ChannelCollection cc = new ChannelCollection(); + + + @Override + protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { + + logger.info("==================== [ channelRead0 ] ==================== "); + + logger.info("Channel Size [" + cc.getAllChannels().size() + "]"); + + cc.getAllChannels().stream().forEach(c -> { + logger.info(":" + c); + c.writeAndFlush(new TextWebSocketFrame("test")); +// command.execute(c, data, result); + }); + + + + +// ctx.writeAndFlush(new TextWebSocketFrame("Test")); + + } + +// private class HeartBeatTask implements Runnable{ +// +// private final ChannelHandlerContext ctx; +// +// public HeartBeatTask(final ChannelHandlerContext ctx){ +// this.ctx = ctx; +// } +// +// public void run() { +// ctx.writeAndFlush(new TextWebSocketFrame("test")); +// +// System.out.println("Client send heart beat message to server : ---> "); +// } +// +// } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { +// SocketAddress remoteAddress = ctx.channel().remoteAddress(); + cc.setAllChannels(ctx.channel()); +// ctx.executor().scheduleAtFixedRate(new HeartBeatTask(ctx),0,500, +// TimeUnit.MILLISECONDS); + logger.info("==================== [ channelActive ] ==================== "); + + + + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + logger.info("==================== [ channelInactive ] ==================== "); + } + + + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + //2 Triggered when a connection is established with the selector of a thread in the worker thread group (channel is registered to EventLoop) + logger.info("==================== [ channelRegistered ] ==================== "); + } + + +} diff --git a/src/main/java/com/palnet/server/initializer/WebInitializer.java b/src/main/java/com/palnet/server/initializer/WebInitializer.java new file mode 100644 index 0000000..0334c0d --- /dev/null +++ b/src/main/java/com/palnet/server/initializer/WebInitializer.java @@ -0,0 +1,36 @@ +package com.palnet.server.initializer; + +import com.palnet.server.codec.WebPayLoadDecoder; +import com.palnet.server.codec.WebPayLoadEncorder; +import com.palnet.server.handler.WebHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; +import io.netty.handler.timeout.IdleStateHandler; + +public class WebInitializer extends ChannelInitializer{ + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new HttpServerCodec()) + .addLast(new HttpObjectAggregator(65536)) + .addLast(new WebSocketServerCompressionHandler()) + .addLast(new WebSocketServerProtocolHandler("/ws", null, true)) + .addLast(new IdleStateHandler(0, 0, 180)) // [5] +// .addLast(idleHandler) // [6] +// .addLast(healthEndpointHandler) // [7] + .addLast(new WebPayLoadDecoder() , new WebPayLoadEncorder()) +// .addLast(new WebCommandHandler()) + .addLast(new WebHandler()); + +// .addLast(authHandler) // [9] +// .addLast(commandHandler); // [10] + + } + +} diff --git a/src/main/java/com/palnet/server/task/ctr/CtrCntrlTask.java b/src/main/java/com/palnet/server/task/ctr/CtrCntrlTask.java new file mode 100644 index 0000000..35d1544 --- /dev/null +++ b/src/main/java/com/palnet/server/task/ctr/CtrCntrlTask.java @@ -0,0 +1,33 @@ +package com.palnet.server.task.ctr; + +import com.palnet.comn.utils.JsonUtils; +import com.palnet.server.collection.ChannelCollection; +import com.palnet.server.task.ctr.service.CtrCntrlTaskService; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CtrCntrlTask implements Runnable{ + + private ChannelCollection cc = new ChannelCollection(); + + private Logger logger = LoggerFactory.getLogger(getClass()); + + + private CtrCntrlTaskService service = new CtrCntrlTaskService(); + + + @Override + public void run() { +// try { +// cc.getAllChannels().stream().forEach(c -> { // 접속되어 있는 모든 사용자에게 전달 처리 +// c.writeAndFlush(new TextWebSocketFrame(JsonUtils.toJson(service.getTest()))); +// }); +// }catch(Exception e) { +// e.printStackTrace(); +// } + + } + + +} diff --git a/src/main/java/com/palnet/server/task/ctr/service/CtrCntrlTaskService.java b/src/main/java/com/palnet/server/task/ctr/service/CtrCntrlTaskService.java new file mode 100644 index 0000000..3edcc48 --- /dev/null +++ b/src/main/java/com/palnet/server/task/ctr/service/CtrCntrlTaskService.java @@ -0,0 +1,66 @@ +package com.palnet.server.task.ctr.service; + + +import com.palnet.process.message.consumer.MessageConsumer; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +public class CtrCntrlTaskService { + + @Autowired + private MessageConsumer messageConsumer; + + + /** + * Websocket 을 통해 전달될 위치 정보를 관리 + * @return + */ +// public ArrayList getList() { +// +// GPDatabaseModel dataInfo = messageConsumer.receiveDroneMessage(); +// +// ArrayList list = new ArrayList<>(); +// CtrCntrlModel model = new CtrCntrlModel(); +// +// +// model.setObjectId(dataInfo.getObjectId()); +// model.setControlId(dataInfo.getControlId()); +// model.setControlStartDt(dataInfo.getControlStartDt()); +// model.setObjectTypeCd(dataInfo.getObjectType()); +// model.setLat(dataInfo.getLat()); +// model.setLng(dataInfo.getLng()); +// model.setElevType(dataInfo.getElevType()); +// model.setElev(dataInfo.getElev()); +// model.setSpeedType(dataInfo.getSpeedType()); +// model.setSpeed(dataInfo.getSpeed()); +// model.setBetteryLevel(dataInfo.getBetteryLevel()); +// model.setBetteryVoltage(dataInfo.getBetteryVoltage()); +// model.setDronStatus(dataInfo.getDronStatus()); +// model.setHeading(dataInfo.getHeading()); +// model.setMoveDistance(dataInfo.getMoveDistance()); +// model.setMoveDistanceType(dataInfo.getMoveDistanceType()); +// +// model.setServerRcvDt(dataInfo.getServerRcvDt()); +// +// // 환경 데이터 필드 추가 +// model.setSensorCo(dataInfo.getSensorCo()); +// model.setSensorSo2(dataInfo.getSensorSo2()); +// model.setSensorNo2(dataInfo.getSensorNo2()); +// model.setSensorO3(dataInfo.getSensorO3()); +// model.setSensorDust(dataInfo.getSensorDust()); +// +// list.add(model); +// +// +// //=== 정렬 처리 == +// //기준 : 관제 시작일이 가장 느린순으로 상단에 올린다. +// list.sort(Comparator.reverseOrder()); +// +// return list; +// } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index a42a57e..a7bb84d 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -7,9 +7,6 @@ spring: port: 5672 username: guest password: guest - graphql: - websocket: - connection-init-timeout: netty: task: @@ -20,3 +17,12 @@ netty: boss: 1 worker: 1 +message: + app: + queue-name: app.drone.queue + exchange-name: app.drone.exchange + routing-key: app.drone.routing.# + websocket: + queue-name: websocket.drone.queue + exchange-name: websocket.drone.exchange + routing-key: websocket.drone.routing.# \ No newline at end of file