Browse Source

websocket -> client data share

master
노승철 2 years ago
parent
commit
b90fde0061
  1. 2
      src/main/java/com/palnet/PavWebsocketApplication.java
  2. 40
      src/main/java/com/palnet/comn/model/GPHistoryShareContext.java
  3. 32
      src/main/java/com/palnet/process/message/consumer/MessageConsumer.java
  4. 38
      src/main/java/com/palnet/process/scheduler/GpHistoryScheduler.java
  5. 22
      src/main/java/com/palnet/server/WebServer.java
  6. 11
      src/main/java/com/palnet/server/task/ctr/CtrCntrlTask.java
  7. 66
      src/main/java/com/palnet/server/task/ctr/service/CtrCntrlTaskService.java
  8. 3
      src/main/resources/application.yml

2
src/main/java/com/palnet/PavWebsocketApplication.java

@ -4,8 +4,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class PavWebsocketApplication {
public static void main(String[] args) {

40
src/main/java/com/palnet/comn/model/GPHistoryShareContext.java

@ -0,0 +1,40 @@
package com.palnet.comn.model;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class GPHistoryShareContext {
/*
* GPS Data History ( Websocket <-> Client Data Share )
* key : 식별 장치 번호
* Value : History Object
*
* */
private final Map<String, CtrCntrlModel> maps;
public GPHistoryShareContext() {
this.maps = new ConcurrentHashMap<>();
}
public void putHistory(String key, CtrCntrlModel value) {
this.maps.put(key, value);
}
public CtrCntrlModel getHistory(String key) {
return this.maps.get(key);
}
public void removeHistory(String key) {
this.maps.remove(key);
}
public Map<String, CtrCntrlModel> getAllHistory() {
return this.maps;
}
}

32
src/main/java/com/palnet/process/message/consumer/MessageConsumer.java

@ -3,47 +3,46 @@ package com.palnet.process.message.consumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.palnet.comn.model.CtrCntrlModel;
import com.palnet.comn.utils.JsonUtils;
import com.palnet.comn.model.GPHistoryShareContext;
import com.palnet.comn.model.GPDatabaseModel;
import com.palnet.server.collection.ChannelCollection;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
@Component
@Slf4j
@RequiredArgsConstructor
public class MessageConsumer {
private ChannelCollection cc = new ChannelCollection();
private final ObjectMapper objectMapper;
private final GPHistoryShareContext gpHistoryShareModel;
public MessageConsumer(ObjectMapper objectMapper, GPHistoryShareContext gpHistoryShareModel) {
this.objectMapper = objectMapper;
this.gpHistoryShareModel = gpHistoryShareModel;
}
@RabbitHandler
@RabbitListener(queues = {"websocket.drone.queue"})
public void receiveDroneMessage(final String message) throws JsonProcessingException {
GPDatabaseModel gpDatabaseModel = objectMapper.readValue(message, GPDatabaseModel.class);
ArrayList<CtrCntrlModel> list = this.getList(gpDatabaseModel);
CtrCntrlModel history = this.modelConvert(gpDatabaseModel);
cc.getAllChannels().stream().forEach(c -> { // 접속되어 있는 모든 사용자에게 전달 처리
c.writeAndFlush(new TextWebSocketFrame(JsonUtils.toJson(list)));
});
gpHistoryShareModel.putHistory(gpDatabaseModel.getObjectId(), history);
}
public ArrayList<CtrCntrlModel> getList(final GPDatabaseModel dataInfo) {
ArrayList<CtrCntrlModel> list = new ArrayList<>();
public CtrCntrlModel modelConvert(final GPDatabaseModel dataInfo) {
// List<CtrCntrlModel> list = new ArrayList<>();
CtrCntrlModel model = new CtrCntrlModel();
model.setObjectId(dataInfo.getObjectId());
model.setControlId(dataInfo.getControlId());
model.setControlStartDt(dataInfo.getControlStartDt());
@ -70,13 +69,14 @@ public class MessageConsumer {
model.setSensorO3(dataInfo.getSensorO3());
model.setSensorDust(dataInfo.getSensorDust());
list.add(model);
// list.add(model);
//=== 정렬 처리 ==
//기준 : 관제 시작일이 가장 느린순으로 상단에 올린다.
list.sort(Comparator.reverseOrder());
// list.sort(Comparator.reverseOrder());
return list;
// return list;
return model;
}
}

38
src/main/java/com/palnet/process/scheduler/GpHistoryScheduler.java

@ -0,0 +1,38 @@
package com.palnet.process.scheduler;
import com.palnet.comn.model.CtrCntrlModel;
import com.palnet.comn.model.GPHistoryShareContext;
import com.palnet.comn.utils.DateUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
import java.util.Map;
@Component
public class GpHistoryScheduler {
private final int timeLimit = 5;
private final GPHistoryShareContext gpHistoryShareContext;
public GpHistoryScheduler(GPHistoryShareContext gpHistoryShareContext) {
this.gpHistoryShareContext = gpHistoryShareContext;
}
@Scheduled(fixedDelay = 1000 * 60 * 5)
public void removeHistory() {
Map<String, CtrCntrlModel> allHistory = gpHistoryShareContext.getAllHistory();
// Key 의 존재하는 데이터는 마지막 서버수신 History Data
allHistory.forEach((k, v) -> {
Date serverRcvDt = DateUtils.stringToDate(v.getServerRcvDt());
long diffMinute = DateUtils.diffMinute(serverRcvDt, new Date());
if(diffMinute > timeLimit) {
gpHistoryShareContext.removeHistory(k);
}
});
}
}

22
src/main/java/com/palnet/server/WebServer.java

@ -9,8 +9,10 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@ -21,23 +23,17 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Component
@RequiredArgsConstructor
public class WebServer {
private Logger logger = LoggerFactory.getLogger(getClass());
@Value("${netty.websocket.port}")
private int port;
@Value("${netty.websocket.thread.boss}")
private int threadBoss;
@Value("${netty.websocket.thread.worker}")
private int threadWorker;
@Value("${netty.task.controlinfoTime}")
private int controlinfoTime;
private Channel ch;
private final CtrCntrlTask ctrCntrlTask;
private EventLoopGroup bossGroup ;
private EventLoopGroup workerGroup ;
@ -48,10 +44,8 @@ public class WebServer {
start();
}
@PreDestroy
public void preDestroy(){
stop();
}
@ -80,7 +74,6 @@ public class WebServer {
logger.warn("====== [WEBSOCKET SERVER] Fail ====== ");
logger.error(e.getMessage());
}
}
private void connection() {
@ -102,7 +95,7 @@ public class WebServer {
private void taskProcess() {
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
service.scheduleAtFixedRate(new CtrCntrlTask(),0,controlinfoTime,TimeUnit.MILLISECONDS);
service.scheduleAtFixedRate(ctrCntrlTask,0,controlinfoTime,TimeUnit.MILLISECONDS);
}
@ -110,10 +103,5 @@ public class WebServer {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
logger.warn("====== [WEBSOCKET SERVER] STOP ====== ");
// ch.close().addListener(f->{
// logger.warn("====== [WEBSOCKET SERVER] CLOSE ====== ");
// bossGroup.shutdownGracefully();
// workerGroup.shutdownGracefully();
// });
}
}

11
src/main/java/com/palnet/server/task/ctr/CtrCntrlTask.java

@ -4,26 +4,29 @@ import com.palnet.comn.utils.JsonUtils;
import com.palnet.server.collection.ChannelCollection;
import com.palnet.server.task.ctr.service.CtrCntrlTaskService;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
@Component
@RequiredArgsConstructor
public class CtrCntrlTask implements Runnable{
private ChannelCollection cc = new ChannelCollection();
private Logger logger = LoggerFactory.getLogger(getClass());
private CtrCntrlTaskService service = new CtrCntrlTaskService();
private final CtrCntrlTaskService service;
@Override
public void run() {
try {
cc.getAllChannels().stream().forEach(c -> { // 접속되어 있는 모든 사용자에게 전달 처리
c.writeAndFlush(new TextWebSocketFrame(JsonUtils.toJson(new ArrayList<>())));
c.writeAndFlush(new TextWebSocketFrame(JsonUtils.toJson(service.getList())));
});
}catch(Exception e) {
e.printStackTrace();

66
src/main/java/com/palnet/server/task/ctr/service/CtrCntrlTaskService.java

@ -1,6 +1,8 @@
package com.palnet.server.task.ctr.service;
import com.palnet.comn.model.CtrCntrlModel;
import com.palnet.comn.model.GPHistoryShareContext;
import com.palnet.process.message.consumer.MessageConsumer;
import lombok.extern.slf4j.Slf4j;
@ -8,59 +10,35 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Map;
@Service
@Slf4j
public class CtrCntrlTaskService {
@Autowired
private MessageConsumer messageConsumer;
private GPHistoryShareContext historyShareContext;
/**
* Websocket 통해 전달될 위치 정보를 관리
* @return
*/
// public ArrayList<CtrCntrlModel> getList() {
//
// GPDatabaseModel dataInfo = messageConsumer.receiveDroneMessage();
//
// ArrayList<CtrCntrlModel> list = new ArrayList<>();
// CtrCntrlModel model = new CtrCntrlModel();
//
//
// model.setObjectId(dataInfo.getObjectId());
// model.setControlId(dataInfo.getControlId());
// model.setControlStartDt(dataInfo.getControlStartDt());
// model.setObjectTypeCd(dataInfo.getObjectType());
// model.setLat(dataInfo.getLat());
// model.setLng(dataInfo.getLng());
// model.setElevType(dataInfo.getElevType());
// model.setElev(dataInfo.getElev());
// model.setSpeedType(dataInfo.getSpeedType());
// model.setSpeed(dataInfo.getSpeed());
// model.setBetteryLevel(dataInfo.getBetteryLevel());
// model.setBetteryVoltage(dataInfo.getBetteryVoltage());
// model.setDronStatus(dataInfo.getDronStatus());
// model.setHeading(dataInfo.getHeading());
// model.setMoveDistance(dataInfo.getMoveDistance());
// model.setMoveDistanceType(dataInfo.getMoveDistanceType());
//
// model.setServerRcvDt(dataInfo.getServerRcvDt());
//
// // 환경 데이터 필드 추가
// model.setSensorCo(dataInfo.getSensorCo());
// model.setSensorSo2(dataInfo.getSensorSo2());
// model.setSensorNo2(dataInfo.getSensorNo2());
// model.setSensorO3(dataInfo.getSensorO3());
// model.setSensorDust(dataInfo.getSensorDust());
//
// list.add(model);
//
//
// //=== 정렬 처리 ==
// //기준 : 관제 시작일이 가장 느린순으로 상단에 올린다.
// list.sort(Comparator.reverseOrder());
//
// return list;
// }
public ArrayList<CtrCntrlModel> getList() {
ArrayList<CtrCntrlModel> list = new ArrayList<>();
Map<String, CtrCntrlModel> allHistory = historyShareContext.getAllHistory();
allHistory.forEach((k, v) -> {
list.add(v);
});
//=== 정렬 처리 ==
//기준 : 관제 시작일이 가장 느린순으로 상단에 올린다.
list.sort(Comparator.reverseOrder());
return list;
}
}

3
src/main/resources/application.yml

@ -13,9 +13,6 @@ netty:
controlinfoTime: 2000
websocket:
port: 8081
thread:
boss: 1
worker: 1
message:
app:

Loading…
Cancel
Save