Browse Source

feat: websocket 데이터 전송 방식 변경

- 요청마다 전송(매건마다) -> 특정 시간동안 호출(모아서 보냄)

https://www.notion.so/PAV-KAC-socket-websocket-2ce9fe6ebe1b4015a6136764bae77501?pvs=4
feature/socket
지대한 6 months ago
parent
commit
3074ca6530
  1. 13
      app/kac-socket-app/src/main/java/kr/co/palnet/kac/socket/core/command/impl/AdsbDroneCommandImpl.java
  2. 13
      app/kac-socket-app/src/main/java/kr/co/palnet/kac/socket/core/command/impl/AntosDroneCommandImpl.java
  3. 10
      app/kac-socket-app/src/main/java/kr/co/palnet/kac/socket/core/command/impl/SandboxDroneCommandImpl.java
  4. 41
      app/kac-socket-app/src/main/java/kr/co/palnet/kac/socket/core/storage/DroneStorage.java
  5. 50
      app/kac-socket-app/src/main/java/kr/co/palnet/kac/socket/service/ScheduledService.java
  6. 40
      app/kac-socket-app/src/main/java/kr/co/palnet/kac/socket/service/WebSocketService.java
  7. 1
      app/kac-websocket-app/build.gradle
  8. 18
      app/kac-websocket-app/src/main/java/kr/co/palnet/kac/websocket/controller/SocketReceiverController.java
  9. 119
      app/kac-websocket-app/src/main/java/kr/co/palnet/kac/websocket/service/ControlService.java
  10. 3
      app/kac-websocket-app/src/main/java/kr/co/palnet/kac/websocket/service/ScheduledService.java
  11. 6
      common/config-db/src/main/java/kr/co/palnet/kac/config/db/KacJpaConfig.java
  12. 7
      common/model/src/main/java/kr/co/palnet/kac/common/model/common/DroneDto.java

13
app/kac-socket-app/src/main/java/kr/co/palnet/kac/socket/core/command/impl/AdsbDroneCommandImpl.java

@ -100,19 +100,10 @@ public class AdsbDroneCommandImpl implements DroneCommand {
}
// STEP 2. 이력 생성할 전문 전달 -> DRON의 대한 식별정보만 이력 관리
try {
// 저장 해 놓았다가 한거번에 전송 - 필요한 곳에 전송(HISTORY, UTM)
droneStorage.add(drone);
} catch (Exception e) {
log.error("ERROR : {}", e.getMessage(), e);
}
// STEP 3. 화면에 표출할 정보 WebSocket 전달
try {
if ("PA".equals(drone.getObjectId().substring(0, 2))) {
webSocketService.asyncSendData(drone);
}
// 저장 해 놓았다가 한거번에 전송 - 필요한 곳에 전송(HISTORY, UTM, WEB-SOCKET)
droneStorage.add(drone);
} catch (Exception e) {
log.error("ERROR : {}", e.getMessage(), e);
}

13
app/kac-socket-app/src/main/java/kr/co/palnet/kac/socket/core/command/impl/AntosDroneCommandImpl.java

@ -100,19 +100,10 @@ public class AntosDroneCommandImpl implements DroneCommand {
drone.setRegDt(Instant.now());
}
// STEP 2. 이력 생성할 전문 전달 -> DRON의 대한 식별정보만 이력 관리
try {
// 저장 해 놓았다가 한거번에 전송 - 필요한 곳에 전송(HISTORY, UTM)
droneStorage.add(drone);
} catch (Exception e) {
log.error("ERROR : {}", e.getMessage(), e);
}
// STEP 3. 화면에 표출할 정보 WebSocket 전달
try {
if ("PA".equals(drone.getObjectId().substring(0, 2))) {
webSocketService.asyncSendData(drone);
}
// 저장 해 놓았다가 한거번에 전송 - 필요한 곳에 전송(HISTORY, UTM, WEB-SOCKET)
droneStorage.add(drone);
} catch (Exception e) {
log.error("ERROR : {}", e.getMessage(), e);
}

10
app/kac-socket-app/src/main/java/kr/co/palnet/kac/socket/core/command/impl/SandboxDroneCommandImpl.java

@ -98,16 +98,10 @@ public class SandboxDroneCommandImpl implements DroneCommand {
}
// STEP 2. 이력 생성할 전문 전달 -> DRON의 대한 식별정보만 이력 관리
try {
// 저장 해 놓았다가 한거번에 전송 - 필요한 곳에 전송(HISTORY, UTM)
droneStorage.add(drone);
} catch (Exception e) {
log.error("ERROR : {}", e.getMessage(), e);
}
// STEP 3. 화면에 표출할 정보 WebSocket 전달
try {
webSocketService.asyncSendData(drone);
// 저장 해 놓았다가 한거번에 전송 - 필요한 곳에 전송(HISTORY, UTM, WEB-SOCKET)
droneStorage.add(drone);
} catch (Exception e) {
log.error("ERROR : {}", e.getMessage(), e);
}

41
app/kac-socket-app/src/main/java/kr/co/palnet/kac/socket/core/storage/DroneStorage.java

@ -63,11 +63,7 @@ public class DroneStorage {
if (list == null) {
list = new ArrayList<>();
// list = new CopyOnWriteArrayList<>();
// list = new ConcurrentLinkedQueue<>();
// droneMap.put(drone.getObjectId(), list);
}
// log.info("add :: {}::{}::{}", drone.getObjectId(), droneMap.keySet().size(), list.size());
list.add(drone);
this.droneMap.put(drone.getObjectId(), list);
// list.offer(drone);
@ -93,25 +89,18 @@ public class DroneStorage {
// DroneDto lastDroneDto = list.stream().toList().get(list.size() - 1);
DroneDto lastDroneDto = list.stream().toList().getLast();
if ((lastDroneDto.isSendHistroy() && lastDroneDto.isSendUtm()) || compareTime.isAfter(lastDroneDto.getRegDt())) {
// list.remove(lastDronDto);
if ((lastDroneDto.isSendAll()) || compareTime.isAfter(lastDroneDto.getRegDt())) {
droneMap.remove(objectId);
continue;
}
// 그외 데이터에서 조건에 맞는 데이터들만 삭제
list.removeIf(drone -> (drone.isSendHistroy() && drone.isSendUtm()) || compareTime.isAfter(drone.getRegDt()));
list.removeIf(drone -> (drone.isSendAll()) || compareTime.isAfter(drone.getRegDt()));
}
}
public List<DroneDto> getAllByUtm() {
// 보내지 않은 모든 데이터 추출
// return droneMap.values().stream().reduce((list, result) -> {
// log.info("list: {}, result: {}",list, result);
// List<DroneDto> yetSendData = list.stream().filter(droneDto -> !droneDto.isSendUtm()).toList();
// result.addAll(yetSendData);
// return result;
// }).orElse(new ArrayList<>());
List<DroneDto> resultList = new ArrayList<>();
droneMap.values().forEach(droneDtoList -> {
List<DroneDto> list = droneDtoList.stream().filter(droneDto -> !droneDto.isSendUtm()).toList();
@ -120,14 +109,6 @@ public class DroneStorage {
return resultList;
}
// public List<DroneDto> getAllByHistory() {
// // 보내지 않은 모든 데이터 추출
// return droneMap.values().stream().reduce((list, result) -> {
// List<DroneDto> yetSendData = list.stream().filter(droneDto -> !droneDto.isSendHistroy()).toList();
// result.addAll(yetSendData);
// return result;
// }).orElse(new ArrayList<>());
// }
public Map<String, List<DroneDto>> getAllByHistory() {
// 보내지 않은 모든 데이터 추출
@ -135,8 +116,22 @@ public class DroneStorage {
for (String objectId : droneMap.keySet()) {
List<DroneDto> droneDtoList = droneMap.get(objectId);
if (droneDtoList != null) {
// ConcurrentLinkedQueue<DroneDto> list = droneDtoList.stream().filter(droneDto -> !droneDto.isSendHistroy()).collect(Collectors.toCollection(ConcurrentLinkedQueue::new));
List<DroneDto> list = droneDtoList.stream().filter(droneDto -> !droneDto.isSendHistroy()).toList();
List<DroneDto> list = droneDtoList.stream().filter(droneDto -> !droneDto.isSendHistory()).toList();
if (!list.isEmpty()) {
sendList.put(objectId, list);
}
}
}
return sendList;
}
public Map<String, List<DroneDto>> getAllByWebSocket() {
// 보내지 않은 모든 데이터 추출
Map<String, List<DroneDto>> sendList = new HashMap<>();
for (String objectId : droneMap.keySet()) {
List<DroneDto> droneDtoList = droneMap.get(objectId);
if (droneDtoList != null) {
List<DroneDto> list = droneDtoList.stream().filter(droneDto -> !droneDto.isSendWebSocket()).toList();
if (!list.isEmpty()) {
sendList.put(objectId, list);
}

50
app/kac-socket-app/src/main/java/kr/co/palnet/kac/socket/service/ScheduledService.java

@ -22,6 +22,23 @@ public class ScheduledService {
private final ExternalService externalService;
private final KacAppService kacAppService;
private final WebSocketService webSocketService;
@Scheduled(fixedDelay = 2 * 1000)
public void sendDataByWebSocket() {
DroneStorage droneStorage = DroneStorage.getInstance();
Map<String, List<DroneDto>> droneMap = droneStorage.getAllByWebSocket();
if (droneMap == null || droneMap.isEmpty()) {
log.debug("drone data for websocket is empty.");
return;
}
if (webSocketService.sendDataAll(droneMap)) {
droneMap.values().forEach(droneDtoList -> droneDtoList.forEach(droneDto -> droneDto.setSendWebSocket(true)));
}
}
@Scheduled(fixedDelay = 2 * 1000)
public void sendDataByHistory() {
@ -29,36 +46,32 @@ public class ScheduledService {
DroneStorage droneStorage = DroneStorage.getInstance();
Map<String, List<DroneDto>> history = droneStorage.getAllByHistory();
log.info("sendDataByHistory size : {}", history.keySet().size());
// list 합
history.values().stream().map(List::size).reduce(Integer::sum).ifPresent(s -> log.info("list size sum : {}", s));
if (!history.isEmpty()) {
if (kacAppService.sendDataAll(history)) {
history.values().forEach(droneDtoList -> droneDtoList.forEach(droneDto -> droneDto.setSendHistroy(true)));
}
history.values().forEach(droneDtoList -> droneDtoList.forEach(droneDto -> droneDto.setSendHistroy(true)));
if (history == null || history.isEmpty()) {
log.debug("drone data for history is empty.");
return;
}
if (kacAppService.sendDataAll(history)) {
history.values().forEach(droneDtoList -> droneDtoList.forEach(droneDto -> droneDto.setSendHistory(true)));
}
}
// utm에 전송
@Scheduled(fixedDelay = 2 * 1000)
public void sendDataToUtm() {
// log.debug(">>>> sendDataToUtm <<<<<");
DroneStorage droneStorage = DroneStorage.getInstance();
List<DroneDto> list = droneStorage.getAllByUtm();
try {
List<UtmDto.DroneInfo> droneInfoList = list.stream().map(model -> {
UtmDto.DroneInfo droneInfo = UtmDto.DroneInfo.builder()
.id(model.getObjectId())
.latitude(model.getLat().toString())
.longitude(model.getLon().toString())
.height(model.getElev().toString())
.build();
return droneInfo;
}).collect(Collectors.toList());
List<UtmDto.DroneInfo> droneInfoList = list.stream().map(
model -> UtmDto.DroneInfo.builder()
.id(model.getObjectId())
.latitude(model.getLat().toString())
.longitude(model.getLon().toString())
.height(model.getElev().toString())
.build()
).collect(Collectors.toList());
// 가공
if (droneInfoList.isEmpty()) return;
@ -75,7 +88,6 @@ public class ScheduledService {
// if (externalService.sendDataToUtm(utmDto)) {
// list.forEach(drone -> drone.setSendUtm(true));
// }
// TODO 현재 통신이 안되므로 모두 보낸다는 가정으로 진행
list.forEach(drone -> drone.setSendUtm(true));

40
app/kac-socket-app/src/main/java/kr/co/palnet/kac/socket/service/WebSocketService.java

@ -5,12 +5,17 @@ import kr.co.palnet.kac.util.ObjectMapperUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestClient;
import java.util.List;
import java.util.Map;
@Slf4j
@RequiredArgsConstructor
@Service
@ -18,18 +23,14 @@ public class WebSocketService {
@Value("${app.web-socket-web.host}")
private String webSocketWebHost;
private final String SEND_DRONE_DATA = "/v1/api/ws/drone";
private final String SEND_DRONE = "/v1/api/ws/drone";
private final String SEND_DRONE_ALL = "/v1/api/ws/drone/all";
public void sendData(DroneDto dto) {
RestClient client = RestClient.builder()
.baseUrl(webSocketWebHost)
.messageConverters(converts -> {
converts.removeIf(converter -> converter instanceof MappingJackson2HttpMessageConverter);
converts.add(new MappingJackson2HttpMessageConverter(ObjectMapperUtil.getObjectMapper()));
})
.build();
RestClient client = getRestClient();
client.post()
.uri(SEND_DRONE_DATA)
.uri(SEND_DRONE)
.contentType(MediaType.APPLICATION_JSON)
.body(dto)
.retrieve()
@ -41,5 +42,26 @@ public class WebSocketService {
sendData(dto);
}
public boolean sendDataAll(Map<String, List<DroneDto>> droneDtoList) {
RestClient client = getRestClient();
ResponseEntity<Void> resp = client.post()
.uri(SEND_DRONE_ALL)
.contentType(MediaType.APPLICATION_JSON)
.body(droneDtoList)
.retrieve()
.toBodilessEntity();
return resp.getStatusCode() == HttpStatus.OK;
}
private RestClient getRestClient() {
return RestClient.builder()
.baseUrl(webSocketWebHost)
.messageConverters(converts -> {
converts.removeIf(converter -> converter instanceof MappingJackson2HttpMessageConverter);
converts.add(new MappingJackson2HttpMessageConverter(ObjectMapperUtil.getObjectMapper()));
})
.build();
}
}

1
app/kac-websocket-app/build.gradle

@ -6,4 +6,3 @@ dependencies {
implementation project(":common:model")
}

18
app/kac-websocket-app/src/main/java/kr/co/palnet/kac/websocket/controller/SocketReceiverController.java

@ -13,6 +13,9 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import java.util.Map;
@Slf4j
@RequiredArgsConstructor
@RestController
@ -32,4 +35,19 @@ public class SocketReceiverController {
return ResponseEntity.ok().build();
}
@PostMapping("/drone/all")
public ResponseEntity<Void> receiver(@RequestBody Map<String, List<DroneDto>> droneDtoAll) {
// DRON의 대한 식별정보만 이력 관리
ControlStorage controlCache = ControlStorage.getInstance();
droneDtoAll.values().forEach(droneDtoList -> {
droneDtoList.forEach(droneDto -> {
DroneControlDto history = controlService.dronDtoToControlDtoConvert(droneDto);
controlCache.put(history);
});
});
return ResponseEntity.ok().build();
}
}

119
app/kac-websocket-app/src/main/java/kr/co/palnet/kac/websocket/service/ControlService.java

@ -14,98 +14,61 @@ import java.util.*;
@Service
public class ControlService {
public List<DroneControlDto> getList() {
List<DroneControlDto> list = new ArrayList<>();
public DroneControlDto dronDtoToControlDtoConvert(DroneDto dronDto) {
ControlStorage controlCache = ControlStorage.getInstance();
Map<String, DroneControlDto> allHistory = controlCache.getAll();
if (Objects.nonNull(allHistory)) {
allHistory.forEach((k, v) -> {
/*
int cacheCount = v.getControlCacheCount();
// 데이터가 수신 되지 않고 이전 데이터를 표출하고 있는 경우
if (cacheCount == 1) {
if (v.isControlWarnCd() && v.isControlWarnNotyCd()) {
v.setControlCacheCount(2);
}
}
// 비정상 상황 판별하여 알림 표출 X
if (cacheCount == 2) {
if (v.isControlWarnCd() && v.isControlWarnNotyCd()) {
v.setControlWarnNotyCd(false);
}
}
*/
list.add(v);
});
}
// 기준 : 관제 시작일이 가장 느린순으로 상단에 올린다.
list.sort(Comparator.reverseOrder());
return list;
}
public DroneControlDto dronDtoToControlDtoConvert(DroneDto dronDTO) {
ControlStorage controlCache = ControlStorage.getInstance();
DroneControlDto prevControlDTO = controlCache.get(dronDTO.getObjectId());
DroneControlDto controlDTO = new DroneControlDto();
controlDTO.setObjectId(dronDTO.getObjectId());
controlDTO.setControlId(dronDTO.getControlId());
controlDTO.setControlStartDt(dronDTO.getControlStartDt());
controlDTO.setObjectTypeCd(dronDTO.getObjectType());
controlDTO.setLat(dronDTO.getLat());
controlDTO.setLon(dronDTO.getLon());
controlDTO.setElevType(dronDTO.getElevType());
controlDTO.setElev(dronDTO.getElev());
controlDTO.setSpeedType(dronDTO.getSpeedType());
controlDTO.setSpeed(dronDTO.getSpeed());
controlDTO.setBetteryLevel(dronDTO.getBetteryLevel());
controlDTO.setBetteryVoltage(dronDTO.getBetteryVoltage());
controlDTO.setDroneStatus(dronDTO.getDroneStatus());
controlDTO.setHeading(dronDTO.getHeading());
controlDTO.setMoveDistance(dronDTO.getMoveDistance());
controlDTO.setMoveDistanceType(dronDTO.getMoveDistanceType());
controlDTO.setServerRcvDt(dronDTO.getServerRcvDt());
DroneControlDto prevControlDto = controlCache.get(dronDto.getObjectId());
DroneControlDto controlDto = new DroneControlDto();
controlDto.setObjectId(dronDto.getObjectId());
controlDto.setControlId(dronDto.getControlId());
controlDto.setControlStartDt(dronDto.getControlStartDt());
controlDto.setObjectTypeCd(dronDto.getObjectType());
controlDto.setLat(dronDto.getLat());
controlDto.setLon(dronDto.getLon());
controlDto.setElevType(dronDto.getElevType());
controlDto.setElev(dronDto.getElev());
controlDto.setSpeedType(dronDto.getSpeedType());
controlDto.setSpeed(dronDto.getSpeed());
controlDto.setBetteryLevel(dronDto.getBetteryLevel());
controlDto.setBetteryVoltage(dronDto.getBetteryVoltage());
controlDto.setDroneStatus(dronDto.getDroneStatus());
controlDto.setHeading(dronDto.getHeading());
controlDto.setMoveDistance(dronDto.getMoveDistance());
controlDto.setMoveDistanceType(dronDto.getMoveDistanceType());
controlDto.setServerRcvDt(dronDto.getServerRcvDt());
// 환경 데이터 필드 추가
controlDTO.setSensorCo(dronDTO.getSensorCo());
controlDTO.setSensorSo2(dronDTO.getSensorSo2());
controlDTO.setSensorNo2(dronDTO.getSensorNo2());
controlDTO.setSensorO3(dronDTO.getSensorO3());
controlDTO.setSensorDust(dronDTO.getSensorDust());
controlDto.setSensorCo(dronDto.getSensorCo());
controlDto.setSensorSo2(dronDto.getSensorSo2());
controlDto.setSensorNo2(dronDto.getSensorNo2());
controlDto.setSensorO3(dronDto.getSensorO3());
controlDto.setSensorDust(dronDto.getSensorDust());
// 비정상 상황 식별코드 추가
controlDTO.setControlWarnCd(dronDTO.isControlWarnCd());
controlDto.setControlWarnCd(dronDto.isControlWarnCd());
if (prevControlDTO == null) {
if (controlDTO.isControlWarnCd()) {
controlDTO.setControlWarnNotyCd(true); // 최초 비정상 발생
if (prevControlDto == null) {
if (controlDto.isControlWarnCd()) {
controlDto.setControlWarnNotyCd(true); // 최초 비정상 발생
}
} else {
if (prevControlDTO.isControlWarnCd() && controlDTO.isControlWarnCd()) {
controlDTO.setControlWarnNotyCd(false); // 비정상 -> 비정상
if (prevControlDto.isControlWarnCd() && controlDto.isControlWarnCd()) {
controlDto.setControlWarnNotyCd(false); // 비정상 -> 비정상
}
if (prevControlDTO.isControlWarnCd() && !controlDTO.isControlWarnCd()) {
controlDTO.setControlWarnNotyCd(false); // 비정상 -> 정상
if (prevControlDto.isControlWarnCd() && !controlDto.isControlWarnCd()) {
controlDto.setControlWarnNotyCd(false); // 비정상 -> 정상
}
if (!prevControlDTO.isControlWarnCd() && controlDTO.isControlWarnCd()) {
controlDTO.setControlWarnNotyCd(true); // 정상 -> 비정상상
if (!prevControlDto.isControlWarnCd() && controlDto.isControlWarnCd()) {
controlDto.setControlWarnNotyCd(true); // 정상 -> 비정상상
}
}
controlDTO.setControlCacheCount(1);
controlDTO.setRegDt(dronDTO.getRegDt());
controlDto.setControlCacheCount(1);
controlDto.setRegDt(dronDto.getRegDt());
return controlDTO;
return controlDto;
}
}

3
app/kac-websocket-app/src/main/java/kr/co/palnet/kac/websocket/service/ScheduledService.java

@ -13,6 +13,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.Comparator;
import java.util.List;
@Slf4j
@ -35,6 +36,8 @@ public class ScheduledService {
if (controlDtoList == null || controlDtoList.isEmpty()) return;
controlDtoList.sort(Comparator.reverseOrder());
try {
String json = objectMapper.writeValueAsString(controlDtoList);
channelGroup.forEach(channel -> {

6
common/config-db/src/main/java/kr/co/palnet/kac/config/db/KacJpaConfig.java

@ -35,16 +35,10 @@ public class KacJpaConfig {
this.hibernateProperties = hibernateProperties;
}
@Value("${spring.datasource.pav-kac.jdbc-url}")
private String test;
@Bean(name = "kacDataSource")
@Primary
@ConfigurationProperties(prefix = "spring.datasource.pav-kac")
public DataSource kacDataSource() {
System.out.println("===========================");
System.out.println(test);
System.out.println("===========================");
return DataSourceBuilder.create().build();
}

7
common/model/src/main/java/kr/co/palnet/kac/common/model/common/DroneDto.java

@ -95,6 +95,11 @@ public class DroneDto {
private Instant regDt;
// 큐가 Socket서버에 도착한 시간
private boolean isSendUtm; // 불법드론 전송 여부
private boolean isSendHistroy; // 서버 전송 여부
private boolean isSendHistory; // 서버 전송 여부
private boolean isSendWebSocket; // WebSocket 전송 여부
public boolean isSendAll() {
return isSendHistory && isSendUtm && isSendWebSocket;
}
}

Loading…
Cancel
Save