From b90fde0061a8f2006822db6819d1115bebebd227 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?scnoh=28=EB=85=B8=EC=8A=B9=EC=B2=A0=29?= Date: Thu, 14 Jul 2022 11:49:49 +0900 Subject: [PATCH] websocket -> client data share --- .../com/palnet/PavWebsocketApplication.java | 2 + .../comn/model/GPHistoryShareContext.java | 40 +++++++++++ .../message/consumer/MessageConsumer.java | 32 ++++----- .../process/scheduler/GpHistoryScheduler.java | 38 +++++++++++ .../java/com/palnet/server/WebServer.java | 26 ++------ .../palnet/server/task/ctr/CtrCntrlTask.java | 11 ++-- .../task/ctr/service/CtrCntrlTaskService.java | 66 +++++++------------ src/main/resources/application.yml | 3 - 8 files changed, 132 insertions(+), 86 deletions(-) create mode 100644 src/main/java/com/palnet/comn/model/GPHistoryShareContext.java create mode 100644 src/main/java/com/palnet/process/scheduler/GpHistoryScheduler.java diff --git a/src/main/java/com/palnet/PavWebsocketApplication.java b/src/main/java/com/palnet/PavWebsocketApplication.java index 7661aaa..8248ffc 100644 --- a/src/main/java/com/palnet/PavWebsocketApplication.java +++ b/src/main/java/com/palnet/PavWebsocketApplication.java @@ -4,8 +4,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication +@EnableScheduling public class PavWebsocketApplication { public static void main(String[] args) { diff --git a/src/main/java/com/palnet/comn/model/GPHistoryShareContext.java b/src/main/java/com/palnet/comn/model/GPHistoryShareContext.java new file mode 100644 index 0000000..892b75c --- /dev/null +++ b/src/main/java/com/palnet/comn/model/GPHistoryShareContext.java @@ -0,0 +1,40 @@ +package com.palnet.comn.model; + +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Component +public class GPHistoryShareContext { + + /* + * GPS Data History ( Websocket <-> Client Data Share ) + * key : 식별 장치 번호 + * Value : History Object + * + * */ + + private final Map maps; + + public GPHistoryShareContext() { + this.maps = new ConcurrentHashMap<>(); + } + + public void putHistory(String key, CtrCntrlModel value) { + this.maps.put(key, value); + } + + public CtrCntrlModel getHistory(String key) { + return this.maps.get(key); + } + + public void removeHistory(String key) { + this.maps.remove(key); + } + + public Map getAllHistory() { + return this.maps; + } +} diff --git a/src/main/java/com/palnet/process/message/consumer/MessageConsumer.java b/src/main/java/com/palnet/process/message/consumer/MessageConsumer.java index 534cbc3..c0fcabc 100644 --- a/src/main/java/com/palnet/process/message/consumer/MessageConsumer.java +++ b/src/main/java/com/palnet/process/message/consumer/MessageConsumer.java @@ -3,47 +3,46 @@ package com.palnet.process.message.consumer; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.palnet.comn.model.CtrCntrlModel; -import com.palnet.comn.utils.JsonUtils; +import com.palnet.comn.model.GPHistoryShareContext; import com.palnet.comn.model.GPDatabaseModel; import com.palnet.server.collection.ChannelCollection; -import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; -import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; +import java.util.List; @Component @Slf4j -@RequiredArgsConstructor public class MessageConsumer { private ChannelCollection cc = new ChannelCollection(); private final ObjectMapper objectMapper; + private final GPHistoryShareContext gpHistoryShareModel; + + public MessageConsumer(ObjectMapper objectMapper, GPHistoryShareContext gpHistoryShareModel) { + this.objectMapper = objectMapper; + this.gpHistoryShareModel = gpHistoryShareModel; + } @RabbitHandler @RabbitListener(queues = {"websocket.drone.queue"}) public void receiveDroneMessage(final String message) throws JsonProcessingException { GPDatabaseModel gpDatabaseModel = objectMapper.readValue(message, GPDatabaseModel.class); - ArrayList list = this.getList(gpDatabaseModel); + CtrCntrlModel history = this.modelConvert(gpDatabaseModel); - cc.getAllChannels().stream().forEach(c -> { // 접속되어 있는 모든 사용자에게 전달 처리 - c.writeAndFlush(new TextWebSocketFrame(JsonUtils.toJson(list))); - }); + gpHistoryShareModel.putHistory(gpDatabaseModel.getObjectId(), history); } - public ArrayList getList(final GPDatabaseModel dataInfo) { - ArrayList list = new ArrayList<>(); + public CtrCntrlModel modelConvert(final GPDatabaseModel dataInfo) { +// List list = new ArrayList<>(); CtrCntrlModel model = new CtrCntrlModel(); - model.setObjectId(dataInfo.getObjectId()); model.setControlId(dataInfo.getControlId()); model.setControlStartDt(dataInfo.getControlStartDt()); @@ -70,13 +69,14 @@ public class MessageConsumer { model.setSensorO3(dataInfo.getSensorO3()); model.setSensorDust(dataInfo.getSensorDust()); - list.add(model); +// list.add(model); //=== 정렬 처리 == //기준 : 관제 시작일이 가장 느린순으로 상단에 올린다. - list.sort(Comparator.reverseOrder()); +// list.sort(Comparator.reverseOrder()); - return list; +// return list; + return model; } } diff --git a/src/main/java/com/palnet/process/scheduler/GpHistoryScheduler.java b/src/main/java/com/palnet/process/scheduler/GpHistoryScheduler.java new file mode 100644 index 0000000..1da7085 --- /dev/null +++ b/src/main/java/com/palnet/process/scheduler/GpHistoryScheduler.java @@ -0,0 +1,38 @@ +package com.palnet.process.scheduler; + +import com.palnet.comn.model.CtrCntrlModel; +import com.palnet.comn.model.GPHistoryShareContext; +import com.palnet.comn.utils.DateUtils; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.Date; +import java.util.List; +import java.util.Map; + +@Component +public class GpHistoryScheduler { + + private final int timeLimit = 5; + private final GPHistoryShareContext gpHistoryShareContext; + + public GpHistoryScheduler(GPHistoryShareContext gpHistoryShareContext) { + this.gpHistoryShareContext = gpHistoryShareContext; + } + + @Scheduled(fixedDelay = 1000 * 60 * 5) + public void removeHistory() { + Map allHistory = gpHistoryShareContext.getAllHistory(); + + // Key 의 존재하는 데이터는 마지막 서버수신 History Data + allHistory.forEach((k, v) -> { + Date serverRcvDt = DateUtils.stringToDate(v.getServerRcvDt()); + long diffMinute = DateUtils.diffMinute(serverRcvDt, new Date()); + + if(diffMinute > timeLimit) { + gpHistoryShareContext.removeHistory(k); + } + }); + } + +} diff --git a/src/main/java/com/palnet/server/WebServer.java b/src/main/java/com/palnet/server/WebServer.java index e9302dd..9789526 100644 --- a/src/main/java/com/palnet/server/WebServer.java +++ b/src/main/java/com/palnet/server/WebServer.java @@ -9,8 +9,10 @@ import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; +import lombok.RequiredArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -21,23 +23,17 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @Component +@RequiredArgsConstructor public class WebServer { private Logger logger = LoggerFactory.getLogger(getClass()); @Value("${netty.websocket.port}") private int port; - @Value("${netty.websocket.thread.boss}") - private int threadBoss; - - @Value("${netty.websocket.thread.worker}") - private int threadWorker; - @Value("${netty.task.controlinfoTime}") private int controlinfoTime; - - - private Channel ch; + + private final CtrCntrlTask ctrCntrlTask; private EventLoopGroup bossGroup ; private EventLoopGroup workerGroup ; @@ -47,11 +43,9 @@ public class WebServer { public void postConstruct(){ start(); } - @PreDestroy public void preDestroy(){ - stop(); } @@ -76,11 +70,10 @@ public class WebServer { taskProcess(); logger.warn("====== [WEBSOCKET SERVER] Start ====== "); - }catch (Exception e) { + } catch (Exception e) { logger.warn("====== [WEBSOCKET SERVER] Fail ====== "); logger.error(e.getMessage()); } - } private void connection() { @@ -102,7 +95,7 @@ public class WebServer { private void taskProcess() { ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); - service.scheduleAtFixedRate(new CtrCntrlTask(),0,controlinfoTime,TimeUnit.MILLISECONDS); + service.scheduleAtFixedRate(ctrCntrlTask,0,controlinfoTime,TimeUnit.MILLISECONDS); } @@ -110,10 +103,5 @@ public class WebServer { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); logger.warn("====== [WEBSOCKET SERVER] STOP ====== "); -// ch.close().addListener(f->{ -// logger.warn("====== [WEBSOCKET SERVER] CLOSE ====== "); -// bossGroup.shutdownGracefully(); -// workerGroup.shutdownGracefully(); -// }); } } diff --git a/src/main/java/com/palnet/server/task/ctr/CtrCntrlTask.java b/src/main/java/com/palnet/server/task/ctr/CtrCntrlTask.java index 5472f23..4ed14bc 100644 --- a/src/main/java/com/palnet/server/task/ctr/CtrCntrlTask.java +++ b/src/main/java/com/palnet/server/task/ctr/CtrCntrlTask.java @@ -4,26 +4,29 @@ import com.palnet.comn.utils.JsonUtils; import com.palnet.server.collection.ChannelCollection; import com.palnet.server.task.ctr.service.CtrCntrlTaskService; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import lombok.RequiredArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; import java.util.ArrayList; +@Component +@RequiredArgsConstructor public class CtrCntrlTask implements Runnable{ private ChannelCollection cc = new ChannelCollection(); private Logger logger = LoggerFactory.getLogger(getClass()); - - - private CtrCntrlTaskService service = new CtrCntrlTaskService(); + private final CtrCntrlTaskService service; @Override public void run() { try { cc.getAllChannels().stream().forEach(c -> { // 접속되어 있는 모든 사용자에게 전달 처리 - c.writeAndFlush(new TextWebSocketFrame(JsonUtils.toJson(new ArrayList<>()))); + c.writeAndFlush(new TextWebSocketFrame(JsonUtils.toJson(service.getList()))); }); }catch(Exception e) { e.printStackTrace(); diff --git a/src/main/java/com/palnet/server/task/ctr/service/CtrCntrlTaskService.java b/src/main/java/com/palnet/server/task/ctr/service/CtrCntrlTaskService.java index 3edcc48..302d828 100644 --- a/src/main/java/com/palnet/server/task/ctr/service/CtrCntrlTaskService.java +++ b/src/main/java/com/palnet/server/task/ctr/service/CtrCntrlTaskService.java @@ -1,6 +1,8 @@ package com.palnet.server.task.ctr.service; +import com.palnet.comn.model.CtrCntrlModel; +import com.palnet.comn.model.GPHistoryShareContext; import com.palnet.process.message.consumer.MessageConsumer; import lombok.extern.slf4j.Slf4j; @@ -8,59 +10,35 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Map; + @Service @Slf4j public class CtrCntrlTaskService { @Autowired - private MessageConsumer messageConsumer; + private GPHistoryShareContext historyShareContext; /** * Websocket 을 통해 전달될 위치 정보를 관리 * @return */ -// public ArrayList getList() { -// -// GPDatabaseModel dataInfo = messageConsumer.receiveDroneMessage(); -// -// ArrayList list = new ArrayList<>(); -// CtrCntrlModel model = new CtrCntrlModel(); -// -// -// model.setObjectId(dataInfo.getObjectId()); -// model.setControlId(dataInfo.getControlId()); -// model.setControlStartDt(dataInfo.getControlStartDt()); -// model.setObjectTypeCd(dataInfo.getObjectType()); -// model.setLat(dataInfo.getLat()); -// model.setLng(dataInfo.getLng()); -// model.setElevType(dataInfo.getElevType()); -// model.setElev(dataInfo.getElev()); -// model.setSpeedType(dataInfo.getSpeedType()); -// model.setSpeed(dataInfo.getSpeed()); -// model.setBetteryLevel(dataInfo.getBetteryLevel()); -// model.setBetteryVoltage(dataInfo.getBetteryVoltage()); -// model.setDronStatus(dataInfo.getDronStatus()); -// model.setHeading(dataInfo.getHeading()); -// model.setMoveDistance(dataInfo.getMoveDistance()); -// model.setMoveDistanceType(dataInfo.getMoveDistanceType()); -// -// model.setServerRcvDt(dataInfo.getServerRcvDt()); -// -// // 환경 데이터 필드 추가 -// model.setSensorCo(dataInfo.getSensorCo()); -// model.setSensorSo2(dataInfo.getSensorSo2()); -// model.setSensorNo2(dataInfo.getSensorNo2()); -// model.setSensorO3(dataInfo.getSensorO3()); -// model.setSensorDust(dataInfo.getSensorDust()); -// -// list.add(model); -// -// -// //=== 정렬 처리 == -// //기준 : 관제 시작일이 가장 느린순으로 상단에 올린다. -// list.sort(Comparator.reverseOrder()); -// -// return list; -// } + public ArrayList getList() { + ArrayList list = new ArrayList<>(); + + Map allHistory = historyShareContext.getAllHistory(); + + allHistory.forEach((k, v) -> { + list.add(v); + }); + + //=== 정렬 처리 == + //기준 : 관제 시작일이 가장 느린순으로 상단에 올린다. + list.sort(Comparator.reverseOrder()); + + return list; + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index a7bb84d..e112598 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -13,9 +13,6 @@ netty: controlinfoTime: 2000 websocket: port: 8081 - thread: - boss: 1 - worker: 1 message: app: