From e287bc6f377566ac2c901f32b1ff43efb638e467 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?scnoh=28=EB=85=B8=EC=8A=B9=EC=B2=A0=29?= Date: Wed, 14 Sep 2022 19:17:52 +0900 Subject: [PATCH] =?UTF-8?q?Socket=20=EA=B5=AC=EC=A1=B0=20=EC=88=98?= =?UTF-8?q?=EC=A0=95=20=EC=9E=91=EC=97=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../palnet/comn/collection/GPCollection.java | 34 ++++- .../java/com/palnet/server/SocketServer.java | 44 ++---- .../palnet/server/command/SocketCommand.java | 142 ++++++++++++------ .../palnet/server/handler/SocketHandler.java | 58 ++++--- src/main/resources/application.yml | 6 + 5 files changed, 175 insertions(+), 109 deletions(-) diff --git a/src/main/java/com/palnet/comn/collection/GPCollection.java b/src/main/java/com/palnet/comn/collection/GPCollection.java index 7e1e8cf..c8f4c31 100644 --- a/src/main/java/com/palnet/comn/collection/GPCollection.java +++ b/src/main/java/com/palnet/comn/collection/GPCollection.java @@ -2,15 +2,26 @@ package com.palnet.comn.collection; import com.palnet.comn.utils.ContextUtils; import com.palnet.comn.utils.DateUtils; +import com.palnet.comn.utils.JsonUtils; import com.palnet.process.message.producer.MessageProducer; import com.palnet.comn.model.GPDatabaseModel; import com.palnet.comn.model.GPHistoryModel; import com.palnet.comn.model.GPModel; +import io.netty.channel.AbstractChannel; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeanUtils; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.charset.StandardCharsets; import java.util.*; /** @@ -51,12 +62,25 @@ public class GPCollection { data.setPostionHistory(historyList); /* Message Queue Server 전달 */ -// convertModel(data); messageProducer.sendToAppServerMessage(data); + + try { + Socket socket = new Socket(); + SocketAddress address = new InetSocketAddress("192.168.0.24", 4355); + socket.connect(address); + + String gpsJson = JsonUtils.toJson(data); + + OutputStream outputStream = socket.getOutputStream(); + outputStream.write(gpsJson.getBytes(StandardCharsets.UTF_8)); + outputStream.flush(); + + socket.close(); + + } catch (IOException e) { + e.printStackTrace(); + } } } - -// public void convertModel(GPModel model) { -// messageProducer.sendToAppServerMessage(model); -// } + } diff --git a/src/main/java/com/palnet/server/SocketServer.java b/src/main/java/com/palnet/server/SocketServer.java index 8e544a9..f12fb56 100644 --- a/src/main/java/com/palnet/server/SocketServer.java +++ b/src/main/java/com/palnet/server/SocketServer.java @@ -21,19 +21,13 @@ public class SocketServer { private Logger logger = LoggerFactory.getLogger(getClass()); @Value("${netty.socket.port}") - private int port; - - @Value("${netty.socket.thread.boss}") - private int threadBoss; - - @Value("${netty.socket.thread.worker}") - private int threadWorker; - - - private Channel ch; + private int socketPort; + + @Value("${netty.websocket.port}") + private int websocketPort; - private EventLoopGroup bossGroup ; - private EventLoopGroup workerGroup ; + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; private ServerBootstrap b = new ServerBootstrap(); // @Autowired @@ -73,8 +67,6 @@ public class SocketServer { .childHandler(new SocketInitializer()); connection(); -// taskProcess(); -// client.start(); logger.warn("====== [SOCKET SERVER] Start ====== "); }catch (Exception e) { @@ -85,20 +77,16 @@ public class SocketServer { } private void connection() { - //서버는 Listen상태로 기다려야하는데, 톰캣이 Timeout이 발생함 - //이를 방지하기 위해 Thread로 처리한다. - new Thread(new Runnable() { - @Override - public void run() { - try { - ChannelFuture future = b.bind(port).sync(); - future.channel().closeFuture().sync(); - - } catch (InterruptedException e) { - logger.error("InterruptedException", e); - } - } - }).start(); + //서버는 Listen 상태로 기다려야하는데, 톰캣이 Timeout 발생함, 이를 방지하기 위해서 별도의 thread 로 처리한다. + new Thread(() -> { + try { + ChannelFuture socketFuture = b.bind(socketPort).sync(); + socketFuture.channel().closeFuture().sync(); + + } catch (InterruptedException e) { + logger.error("InterruptedException", e); + } + }).start(); } diff --git a/src/main/java/com/palnet/server/command/SocketCommand.java b/src/main/java/com/palnet/server/command/SocketCommand.java index 36f0bca..d724100 100644 --- a/src/main/java/com/palnet/server/command/SocketCommand.java +++ b/src/main/java/com/palnet/server/command/SocketCommand.java @@ -1,11 +1,25 @@ package com.palnet.server.command; import com.palnet.comn.collection.GPCollection; +import com.palnet.comn.model.GPHistoryModel; import com.palnet.comn.model.GPModel; +import com.palnet.comn.utils.ContextUtils; +import com.palnet.comn.utils.DateUtils; +import com.palnet.comn.utils.JsonUtils; +import com.palnet.process.message.producer.MessageProducer; import com.palnet.server.codec.SocketPayload; +import com.palnet.server.collection.ChannelCollection; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.core.env.Environment; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -13,27 +27,28 @@ import java.util.List; public class SocketCommand { private Logger logger = LoggerFactory.getLogger(getClass()); + private MessageProducer messageProducer; + private Environment env; + + public SocketCommand() { + this.messageProducer = (MessageProducer) ContextUtils.getBean("messageProducer"); + this.env = env; + } + GPCollection collection = new GPCollection(); - public void openSkyCollectionPut(SocketPayload payload) { - + /** Socket Body **/ + List> resultList = (ArrayList)payload.getBody(); + /** 전문 설정 ***/ - String messageType = "OPENSKY"; - String objectType = "FLIGHT"; - - List> resultList = new ArrayList>(); - + final String messageType = "OPENSKY"; + final String objectType = "FLIGHT"; + List dataList = new ArrayList(); - - logger.debug("BODY TYPE :: " + payload.getBody().getClass()); - - resultList = (ArrayList)payload.getBody(); - - GPModel model; - + for(LinkedHashMap obj : resultList) { - model = new GPModel(); + GPModel model = new GPModel(); model.setObjectType(objectType); model.setMessageType(messageType); @@ -66,45 +81,38 @@ public class SocketCommand { * @param payload */ public void sandboxCollectionPut(SocketPayload payload) { - + List> resultList = (ArrayList)payload.getBody(); + /** 전문 설정 ***/ - String messageType = "LTEM"; - String objectType = "DRON"; - - List> resultList = new ArrayList>(); - - List dataList = new ArrayList(); - - logger.debug("BODY TYPE :: " + payload.getBody().getClass()); - - resultList = (ArrayList)payload.getBody(); - - GPModel model; - - for(LinkedHashMap obj : resultList) { + final String messageType = "LTEM"; + final String objectType = "DRON"; + +// List dataList = new ArrayList(); + /** 데이터 모델링 **/ + for(LinkedHashMap obj : resultList) { //위,경도 좌표가 0으로 들어오는 것은 무시 처리 if((Double)obj.get("lat") > 0 && (Double)obj.get("lon") > 0) { - model = new GPModel(); -// logger.debug(">>>>" + obj.toString()); + GPModel model = new GPModel(); + model.setObjectType(objectType); model.setMessageType(messageType); model.setObjectId(obj.get("objectId").toString().trim()); model.setTerminalId(payload.getTerminalId()); + model.setTerminalRcvDt(obj.get("terminalRcvDt").toString().trim()); + model.setSpeedType((String)obj.get("speedType")); + model.setElevType((String)obj.get("elevType")); + model.setMoveDistanceType((String)obj.get("moveDistanceType")); + model.setDronStatus((String)obj.get("dronStatus")); + if(obj.get("lat") != null) model.setLat(Double.valueOf(obj.get("lat").toString())); if(obj.get("lon") != null) model.setLng(Double.valueOf(obj.get("lon").toString())); if(obj.get("elev") != null) model.setElev(Double.valueOf(obj.get("elev").toString())); - model.setTerminalRcvDt(obj.get("terminalRcvDt").toString().trim()); if(obj.get("speed") != null) model.setSpeed(Double.valueOf(obj.get("speed").toString())); if(obj.get("heading") != null) model.setHeading(Double.valueOf(obj.get("heading").toString())); - model.setSpeedType((String)obj.get("speedType")); - model.setElevType((String)obj.get("elevType")); - model.setMoveDistanceType((String)obj.get("moveDistanceType")); if(obj.get("moveDistance") != null) model.setMoveDistance(Double.valueOf(obj.get("moveDistance").toString())); - if(obj.get("betteryLevel") != null) model.setBetteryLevel(Double.valueOf(obj.get("betteryLevel").toString())); if(obj.get("betteryVoltage") != null) model.setBetteryVoltage(Double.valueOf(obj.get("betteryVoltage").toString())); - model.setDronStatus((String)obj.get("dronStatus")); // 환경 데이터 필드 추가 if(obj.get("sensorCo") != null) model.setSensorCo(Double.valueOf(obj.get("sensorCo").toString())); @@ -113,13 +121,59 @@ public class SocketCommand { if(obj.get("sensorO3") != null) model.setSensorO3(Double.valueOf(obj.get("sensorO3").toString())); if(obj.get("sensorDust") != null) model.setSensorDust(Double.valueOf(obj.get("sensorDust").toString())); - dataList.add(model); - }else { - logger.info("lat , lon No data "); + // 서버 수신 시간 정보 + model.setServerRcvDt(DateUtils.getCurrentTime()); + + // 관제 이력 정보 + List hisList; + + GPHistoryModel history = new GPHistoryModel(); + history.setObjectId(model.getObjectId()); + history.setLat(model.getLat()); + history.setLng(model.getLng()); + + if(model.getPostionHistory() != null) { + hisList = model.getPostionHistory(); + } else { + hisList = new ArrayList<>(); + } + + hisList.add(history); + model.setPostionHistory(hisList); + + // STEP 1. Control ID 발급 -> Application Server Http 통신 + + + + // STEP 2. 이력 생성할 전문 전달 + messageProducer.sendToAppServerMessage(model); + + // STEP 3. 화면에 표출할 정보 WebSocket 전달 + try { + Socket socket = new Socket(); + SocketAddress address = new InetSocketAddress( + env.getProperty("netty.websocket.host"), + Integer.parseInt(env.getProperty("netty.websocket.port")) + ); + socket.connect(address); + + String gpsJson = JsonUtils.toJson(model); + + OutputStream outputStream = socket.getOutputStream(); + outputStream.write(gpsJson.getBytes(StandardCharsets.UTF_8)); + outputStream.flush(); + + socket.close(); + } catch (IOException e) { + logger.error(""); + } +// dataList.add(model); + + } else { + logger.error("좌표 정보가 존재하지 않습니다."); + throw new IllegalArgumentException("좌표 정보가 존재하지 않습니다."); } } - collection.putData(dataList); +// collection.putData(dataList); } - - } diff --git a/src/main/java/com/palnet/server/handler/SocketHandler.java b/src/main/java/com/palnet/server/handler/SocketHandler.java index f5ed142..329fcf3 100644 --- a/src/main/java/com/palnet/server/handler/SocketHandler.java +++ b/src/main/java/com/palnet/server/handler/SocketHandler.java @@ -5,6 +5,8 @@ import com.palnet.comn.utils.JsonUtils; import com.palnet.server.codec.SocketPayload; import com.palnet.server.codec.SocketPayloadResponse; import com.palnet.server.command.SocketCommand; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.slf4j.Logger; @@ -31,29 +33,27 @@ public class SocketHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, SocketPayload payload){ - + try { - logger.debug("==================== [SocketHandler channelRead0 ] ==================== "); - logger.debug("AUTH KEY :: ==== > " +payload.getAuthKey()); -// logger.info(auth.checkAuthkey(payload.getAuthKey().toString().trim())); - - if(StringUtils.isEmpty(payload.getAuthKey())) { - // 인증키가 없을 경우 - logger.debug("AUTH : no key " ); + logger.debug("AUTH KEY ===> {}", payload.getAuthKey()); + + if(payload.getAuthKey().isEmpty()) { + logger.info("====== auth key is empty ======"); + res.setRspCode("-2000"); - res.setRspMessage("inValidate AuthKey"); - }else if(!auth.checkAuthkey(payload.getAuthKey().toString().trim())) { - // 인증키가 틀린 경우 - logger.debug("AUTH : error key " ); - logger.debug(payload.getAuthKey() ); -// logger.info(auth.checkAuthkey(payload.getAuthKey().toString().trim())); + res.setRspMessage("auth key is empty !"); + + } else if(!auth.checkAuthkey(payload.getAuthKey().trim())) { + logger.info("===== auth key is invalid ====="); + res.setRspCode("-2000"); - res.setRspMessage("inValidate AuthKey"); - }else{ - logger.warn("MSG :: ==== > " + JsonUtils.toJson(payload)); - - switch(payload.getCommand().trim()) { + res.setRspMessage("Invalid auth key !"); + + } else { + logger.info("====> Socket Payload : {}", JsonUtils.toJson(payload)); + + switch (payload.getCommand().trim()) { case "OPENSKY" : command.openSkyCollectionPut(payload); break; @@ -63,28 +63,25 @@ public class SocketHandler extends SimpleChannelInboundHandler { default: break; } - + res.setRspCode("0"); res.setRspMessage("SUCCESS"); - } - - - }catch(Exception e) { + + } catch(Exception e) { res.setRspCode("-9999"); res.setRspMessage("Etc error"); - + e.printStackTrace(); - }finally { + } finally { logger.debug("res >>>" + JsonUtils.toJson(res)); ctx.writeAndFlush(res); } } - - - @Override + + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { logger.info("=========== [exceptionCaught ] ===================="); res.setRspCode("-9999"); @@ -94,7 +91,4 @@ public class SocketHandler extends SimpleChannelInboundHandler { cause.printStackTrace(); ctx.close(); } - - - } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 5a6bb59..10076ce 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -17,6 +17,9 @@ netty: thread: boss: 1 worker: 1 + websocket: + host: 127.0.0.1 + port: 8081 server: port: 8182 @@ -85,6 +88,9 @@ netty: thread: boss: 1 worker: 1 + websocket: + host: pav.palntour.com + port: 8081 server: port: 8182