Browse Source

remove rabbit websocket and socket

feature/remove-rabbit
지대한 1 year ago
parent
commit
35fbb41664
  1. 20
      pom.xml
  2. 7
      src/main/java/com/palnet/PavWebsocketApplication.java
  3. 44
      src/main/java/com/palnet/process/message/Receiver.java
  4. 98
      src/main/java/com/palnet/process/message/config/MessageConfig.java
  5. 198
      src/main/java/com/palnet/process/message/consumer/MessageConsumer.java
  6. 3
      src/main/java/com/palnet/process/scheduler/GpHistoryScheduler.java
  7. 54
      src/main/java/com/palnet/server/controller/WebSocketReceiverController.java
  8. 15
      src/main/java/com/palnet/server/handler/WebHandler.java
  9. 1
      src/main/java/com/palnet/server/task/ctr/CtrCntrlTask.java
  10. 3
      src/main/java/com/palnet/server/task/ctr/service/CtrCntrlTaskService.java
  11. 86
      src/main/resources/application.yml

20
pom.xml

@ -37,16 +37,16 @@
<artifactId>netty-all</artifactId>
<version>4.1.77.Final</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<version>2.4.4</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-amqp</artifactId>-->
<!-- <version>2.7.0</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.springframework.amqp</groupId>-->
<!-- <artifactId>spring-rabbit-test</artifactId>-->
<!-- <version>2.4.4</version>-->
<!-- </dependency>-->
</dependencies>
<build>

7
src/main/java/com/palnet/PavWebsocketApplication.java

@ -1,10 +1,7 @@
package com.palnet;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@ -15,9 +12,5 @@ public class PavWebsocketApplication {
SpringApplication.run(PavWebsocketApplication.class, args);
}
// @Bean
// public ObjectMapper objectMapper() {
// return new ObjectMapper();
// }
}

44
src/main/java/com/palnet/process/message/Receiver.java

@ -1,22 +1,22 @@
package com.palnet.process.message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.CountDownLatch;
@Component
@Slf4j
public class Receiver {
private CountDownLatch latch = new CountDownLatch(1);
public void receiveMessage(String message) {
log.info("Received <" + message);
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
}
//package com.palnet.process.message;
//
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.stereotype.Component;
//
//import java.util.concurrent.CountDownLatch;
//
//@Component
//@Slf4j
//public class Receiver {
//
// private CountDownLatch latch = new CountDownLatch(1);
//
// public void receiveMessage(String message) {
// log.info("Received <" + message);
// latch.countDown();
// }
//
// public CountDownLatch getLatch() {
// return latch;
// }
//}

98
src/main/java/com/palnet/process/message/config/MessageConfig.java

@ -1,53 +1,53 @@
package com.palnet.process.message.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
@Configuration
@Slf4j
public class MessageConfig {
private final Environment env;
public MessageConfig(Environment env) {
this.env = env;
}
//package com.palnet.process.message.config;
//
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.amqp.core.Binding;
//import org.springframework.amqp.core.BindingBuilder;
//import org.springframework.amqp.core.DirectExchange;
//import org.springframework.amqp.core.Queue;
//import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
//import org.springframework.amqp.rabbit.connection.ConnectionFactory;
//import org.springframework.amqp.rabbit.core.RabbitTemplate;
//import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.core.env.Environment;
//
//@Configuration
//@Slf4j
//public class MessageConfig {
//
// private final Environment env;
//
// public MessageConfig(Environment env) {
// this.env = env;
// }
//// @Bean
//// public CachingConnectionFactory cachingConnectionFactory() {
//// CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
////
//// connectionFactory.setHost(env.getProperty("spring.rabbitmq.host"));
//// connectionFactory.setPort(Integer.parseInt(env.getProperty("spring.rabbitmq.port")));
//// connectionFactory.setUsername(env.getProperty("spring.rabbitmq.username"));
//// connectionFactory.setPassword(env.getProperty("spring.rabbitmq.password"));
////
//// return connectionFactory;
//// }
//
// @Bean
// public CachingConnectionFactory cachingConnectionFactory() {
// CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
// public Jackson2JsonMessageConverter converter() {
// return new Jackson2JsonMessageConverter();
// }
//
//
// connectionFactory.setHost(env.getProperty("spring.rabbitmq.host"));
// connectionFactory.setPort(Integer.parseInt(env.getProperty("spring.rabbitmq.port")));
// connectionFactory.setUsername(env.getProperty("spring.rabbitmq.username"));
// connectionFactory.setPassword(env.getProperty("spring.rabbitmq.password"));
// @Bean
// public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
// RabbitTemplate rabbitTemplate = new RabbitTemplate();
//
// return connectionFactory;
// rabbitTemplate.setConnectionFactory(connectionFactory);
// rabbitTemplate.setMessageConverter(converter);
//
// return rabbitTemplate;
// }
@Bean
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setMessageConverter(converter);
return rabbitTemplate;
}
}
//
//}

198
src/main/java/com/palnet/process/message/consumer/MessageConsumer.java

@ -1,99 +1,99 @@
package com.palnet.process.message.consumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.palnet.comn.model.CtrCntrlModel;
import com.palnet.comn.model.CtrHistoryShareContext;
import com.palnet.comn.model.GPModel;
import com.palnet.comn.utils.JsonUtils;
import com.palnet.server.collection.ChannelCollection;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MessageConsumer {
private ChannelCollection cc = new ChannelCollection();
private final ObjectMapper objectMapper;
private final CtrHistoryShareContext gpHistoryShareModel;
private final Environment env;
public MessageConsumer(CtrHistoryShareContext gpHistoryShareModel, Environment env) {
this.objectMapper = JsonUtils.getObjectMapper();
this.gpHistoryShareModel = gpHistoryShareModel;
this.env = env;
}
@RabbitHandler
@RabbitListener(queues = {"websocket.drone.queue"}, concurrency = "8")
public void receiveDroneMessage(final String message) throws JsonProcessingException {
log.info("websocket message : {}", message);
GPModel model = objectMapper.readValue(message, GPModel.class);
CtrCntrlModel history = this.convertModel(model);
// DRON의 대한 식별정보만 이력 관리
gpHistoryShareModel.putHistory(model.getObjectId(), history);
}
public CtrCntrlModel convertModel(final GPModel dataInfo) {
CtrCntrlModel model = new CtrCntrlModel();
CtrCntrlModel prevModel = gpHistoryShareModel.getHistory(dataInfo.getObjectId());
model.setObjectId(dataInfo.getObjectId());
model.setControlId(dataInfo.getControlId());
model.setTrmnlId(dataInfo.getTerminalId());
model.setControlStartDt(dataInfo.getControlStartDt());
model.setObjectTypeCd(dataInfo.getObjectType());
model.setLat(dataInfo.getLat());
model.setLng(dataInfo.getLng());
model.setElevType(dataInfo.getElevType());
model.setElev(dataInfo.getElev());
model.setSpeedType(dataInfo.getSpeedType());
model.setSpeed(dataInfo.getSpeed());
model.setBetteryLevel(dataInfo.getBetteryLevel());
model.setBetteryVoltage(dataInfo.getBetteryVoltage());
model.setDronStatus(dataInfo.getDronStatus());
model.setHeading(dataInfo.getHeading());
model.setMoveDistance(dataInfo.getMoveDistance());
model.setMoveDistanceType(dataInfo.getMoveDistanceType());
model.setServerRcvDt(dataInfo.getServerRcvDt());
// 환경 데이터 필드 추가
model.setSensorCo(dataInfo.getSensorCo());
model.setSensorSo2(dataInfo.getSensorSo2());
model.setSensorNo2(dataInfo.getSensorNo2());
model.setSensorO3(dataInfo.getSensorO3());
model.setSensorDust(dataInfo.getSensorDust());
// 비정상 상황 식별코드 추가
model.setControlWarnCd(dataInfo.isControlWarnCd());
// if(prevModel == null) {
// if(model.isControlWarnCd()) {
// model.setControlWarnNotyCd(true); // 최초 비정상 발생
// }
// } else {
// if(prevModel.isControlWarnCd() && model.isControlWarnCd()) {
// model.setControlWarnNotyCd(false); // 비정상 -> 비정상
// }
// if(prevModel.isControlWarnCd() && !model.isControlWarnCd()) {
// model.setControlWarnNotyCd(false); // 비정상 -> 정상
// }
// if(!prevModel.isControlWarnCd() && model.isControlWarnCd()) {
// model.setControlWarnNotyCd(true); // 정상 -> 비정상상
// }
// }
//
// model.setControlCacheCount(1);
return model;
}
}
//package com.palnet.process.message.consumer;
//
//import com.fasterxml.jackson.core.JsonProcessingException;
//import com.fasterxml.jackson.databind.ObjectMapper;
//import com.palnet.comn.model.CtrCntrlModel;
//import com.palnet.comn.model.CtrHistoryShareContext;
//import com.palnet.comn.model.GPModel;
//import com.palnet.comn.utils.JsonUtils;
//import com.palnet.server.collection.ChannelCollection;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.amqp.rabbit.annotation.RabbitHandler;
//import org.springframework.amqp.rabbit.annotation.RabbitListener;
//import org.springframework.core.env.Environment;
//import org.springframework.stereotype.Component;
//
//@Component
//@Slf4j
//public class MessageConsumer {
//
// private ChannelCollection cc = new ChannelCollection();
//
// private final ObjectMapper objectMapper;
// private final CtrHistoryShareContext gpHistoryShareModel;
// private final Environment env;
//
// public MessageConsumer(CtrHistoryShareContext gpHistoryShareModel, Environment env) {
// this.objectMapper = JsonUtils.getObjectMapper();
// this.gpHistoryShareModel = gpHistoryShareModel;
// this.env = env;
// }
//
// @RabbitHandler
// @RabbitListener(queues = {"websocket.drone.queue"}, concurrency = "8")
// public void receiveDroneMessage(final String message) throws JsonProcessingException {
// log.info("websocket message : {}", message);
//
// GPModel model = objectMapper.readValue(message, GPModel.class);
//
// CtrCntrlModel history = this.convertModel(model);
//
// // DRON의 대한 식별정보만 이력 관리
// gpHistoryShareModel.putHistory(model.getObjectId(), history);
// }
//
// public CtrCntrlModel convertModel(final GPModel dataInfo) {
// CtrCntrlModel model = new CtrCntrlModel();
// CtrCntrlModel prevModel = gpHistoryShareModel.getHistory(dataInfo.getObjectId());
//
// model.setObjectId(dataInfo.getObjectId());
// model.setControlId(dataInfo.getControlId());
// model.setTrmnlId(dataInfo.getTerminalId());
// model.setControlStartDt(dataInfo.getControlStartDt());
// model.setObjectTypeCd(dataInfo.getObjectType());
// model.setLat(dataInfo.getLat());
// model.setLng(dataInfo.getLng());
// model.setElevType(dataInfo.getElevType());
// model.setElev(dataInfo.getElev());
// model.setSpeedType(dataInfo.getSpeedType());
// model.setSpeed(dataInfo.getSpeed());
// model.setBetteryLevel(dataInfo.getBetteryLevel());
// model.setBetteryVoltage(dataInfo.getBetteryVoltage());
// model.setDronStatus(dataInfo.getDronStatus());
// model.setHeading(dataInfo.getHeading());
// model.setMoveDistance(dataInfo.getMoveDistance());
// model.setMoveDistanceType(dataInfo.getMoveDistanceType());
//
// model.setServerRcvDt(dataInfo.getServerRcvDt());
//
// // 환경 데이터 필드 추가
// model.setSensorCo(dataInfo.getSensorCo());
// model.setSensorSo2(dataInfo.getSensorSo2());
// model.setSensorNo2(dataInfo.getSensorNo2());
// model.setSensorO3(dataInfo.getSensorO3());
// model.setSensorDust(dataInfo.getSensorDust());
//
// // 비정상 상황 식별코드 추가
// model.setControlWarnCd(dataInfo.isControlWarnCd());
//
//// if(prevModel == null) {
//// if(model.isControlWarnCd()) {
//// model.setControlWarnNotyCd(true); // 최초 비정상 발생
//// }
//// } else {
//// if(prevModel.isControlWarnCd() && model.isControlWarnCd()) {
//// model.setControlWarnNotyCd(false); // 비정상 -> 비정상
//// }
//// if(prevModel.isControlWarnCd() && !model.isControlWarnCd()) {
//// model.setControlWarnNotyCd(false); // 비정상 -> 정상
//// }
//// if(!prevModel.isControlWarnCd() && model.isControlWarnCd()) {
//// model.setControlWarnNotyCd(true); // 정상 -> 비정상상
//// }
//// }
////
//// model.setControlCacheCount(1);
//
// return model;
// }
//}

3
src/main/java/com/palnet/process/scheduler/GpHistoryScheduler.java

@ -3,6 +3,7 @@ package com.palnet.process.scheduler;
import com.palnet.comn.model.CtrCntrlModel;
import com.palnet.comn.model.CtrHistoryShareContext;
import com.palnet.comn.utils.DateUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@ -10,6 +11,7 @@ import java.util.Date;
import java.util.Map;
import java.util.Objects;
@Slf4j
@Component
public class GpHistoryScheduler {
@ -22,6 +24,7 @@ public class GpHistoryScheduler {
@Scheduled(fixedDelay = 1000 * 10)
public void removeHistory() {
log.info(">>> removeHistory :: ");
Map<String, CtrCntrlModel> allHistory = gpHistoryShareContext.getAllHistory();
// Key 의 존재하는 데이터는 마지막 서버수신 History Data

54
src/main/java/com/palnet/server/controller/WebSocketReceiverController.java

@ -0,0 +1,54 @@
package com.palnet.server.controller;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.palnet.comn.model.CtrCntrlModel;
import com.palnet.comn.model.CtrHistoryShareContext;
import com.palnet.comn.model.GPModel;
import com.palnet.server.task.ctr.service.CtrCntrlTaskService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.*;
/**
* packageName : com.palnet.server.controller
* fileName : WebSocketReceiverController
* author : dhji
* date : 2023-08-28(028)
* description :
* ===========================================================
* DATE AUTHOR NOTE
* -----------------------------------------------------------
* 2023-08-28(028) dhji 최초 생성
*/
@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/ws")
public class WebSocketReceiverController {
private final CtrCntrlTaskService service;
private final CtrHistoryShareContext historyShareContext;
private final ObjectMapper objectMapper;
@PostMapping("/receiver")
public String receiver(@RequestBody String message) {
log.info("websocket message : {}", message);
GPModel model = null;
try {
model = objectMapper.readValue(message, GPModel.class);
} catch (JsonProcessingException e) {
log.error("ERROR : {}\n{}", e.getMessage(), e.getStackTrace());
throw new RuntimeException(e);
}
CtrCntrlModel history = service.modelConvert(model);
// DRON의 대한 식별정보만 이력 관리
historyShareContext.putHistory(model.getObjectId(), history);
return "SUCCESS";
}
}

15
src/main/java/com/palnet/server/handler/WebHandler.java

@ -1,28 +1,18 @@
package com.palnet.server.handler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.palnet.comn.model.CtrCntrlModel;
import com.palnet.comn.model.CtrHistoryShareContext;
import com.palnet.comn.model.GPModel;
import com.palnet.comn.utils.ContextUtils;
import com.palnet.comn.utils.JsonUtils;
import com.palnet.server.codec.WebPayLoad;
import com.palnet.server.codec.WebPayLoadResponse;
import com.palnet.server.collection.ChannelCollection;
import com.palnet.server.task.ctr.service.CtrCntrlTaskService;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.nio.charset.Charset;
@ -108,13 +98,16 @@ public class WebHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
cc.setAllChannels(ctx.channel());
logger.info("==================== [ channelActive ] ==================== ");
logger.info(">>> channelActive : cc : {}", cc.getAllChannels().size());
logger.info(">>> channelActive : cc : {}", cc.getAllChannels());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info(">>> channelInactive : cc : {}", cc.getAllChannels().size());
logger.info(">>> channelInactive : cc : {}", cc.getAllChannels());
logger.info("==================== [ channelInactive ] ==================== ");
}

1
src/main/java/com/palnet/server/task/ctr/CtrCntrlTask.java

@ -25,6 +25,7 @@ public class CtrCntrlTask implements Runnable{
@Override
public void run() {
try {
logger.info(">>> run : {}", cc.getAllChannels());
cc.getAllChannels().stream().forEach(c -> { // 접속되어 있는 모든 사용자에게 전달 처리
c.writeAndFlush(new TextWebSocketFrame(JsonUtils.toJson(service.getList())));
});

3
src/main/java/com/palnet/server/task/ctr/service/CtrCntrlTaskService.java

@ -15,8 +15,8 @@ import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
@Service
@Slf4j
@Service
@RequiredArgsConstructor
public class CtrCntrlTaskService {
@ -88,6 +88,7 @@ public class CtrCntrlTaskService {
ArrayList<CtrCntrlModel> list = new ArrayList<>();
Map<String, CtrCntrlModel> allHistory = historyShareContext.getAllHistory();
log.info(">>> getList :: {}", allHistory);
if(Objects.nonNull(allHistory)) {
allHistory.forEach((k, v) -> {

86
src/main/resources/application.yml

@ -2,12 +2,12 @@ spring:
config:
activate:
on-profile: local
rabbitmq:
host: 192.168.0.26
port: 5672
username: palnet
password: palnet!234
virtual-host: /
# rabbitmq:
# host: 192.168.0.26
# port: 5672
# username: palnet
# password: palnet!234
# virtual-host: /
server:
port: 8181
@ -18,15 +18,15 @@ netty:
websocket:
port: 8081
message:
app:
queue-name: app.drone.queue
exchange-name: app.drone.exchange
routing-key: app.drone.routing.#
websocket:
queue-name: websocket.drone.queue
exchange-name: websocket.drone.exchange
routing-key: websocket.drone.routing.#
#message:
# app:
# queue-name: app.drone.queue
# exchange-name: app.drone.exchange
# routing-key: app.drone.routing.#
# websocket:
# queue-name: websocket.drone.queue
# exchange-name: websocket.drone.exchange
# routing-key: websocket.drone.routing.#
app:
host: http://127.0.0.1:8080/
@ -37,11 +37,11 @@ spring:
config:
activate:
on-profile: prod
rabbitmq:
host: 211.253.38.218
port: 5672
username: palnet
password: palnet1234
# rabbitmq:
# host: 211.253.38.218
# port: 5672
# username: palnet
# password: palnet1234
server:
port: 8181
@ -52,15 +52,15 @@ netty:
websocket:
port: 8081
message:
app:
queue-name: app.drone.queue
exchange-name: app.drone.exchange
routing-key: app.drone.routing.#
websocket:
queue-name: websocket.drone.queue
exchange-name: websocket.drone.exchange
routing-key: websocket.drone.routing.#
#message:
# app:
# queue-name: app.drone.queue
# exchange-name: app.drone.exchange
# routing-key: app.drone.routing.#
# websocket:
# queue-name: websocket.drone.queue
# exchange-name: websocket.drone.exchange
# routing-key: websocket.drone.routing.#
logging:
pattern:
@ -80,11 +80,11 @@ spring:
config:
activate:
on-profile: prod2
rabbitmq:
host: 211.253.38.218
port: 5672
username: palnet
password: palnet1234
# rabbitmq:
# host: 211.253.38.218
# port: 5672
# username: palnet
# password: palnet1234
server:
port: 8181
@ -95,15 +95,15 @@ netty:
websocket:
port: 8081
message:
app:
queue-name: app.drone.queue
exchange-name: app.drone.exchange
routing-key: app.drone.routing.#
websocket:
queue-name: websocket.drone.queue
exchange-name: websocket.drone.exchange
routing-key: websocket.drone.routing.#
#message:
# app:
# queue-name: app.drone.queue
# exchange-name: app.drone.exchange
# routing-key: app.drone.routing.#
# websocket:
# queue-name: websocket.drone.queue
# exchange-name: websocket.drone.exchange
# routing-key: websocket.drone.routing.#
logging:
pattern:

Loading…
Cancel
Save