Browse Source

websocket consumer

master
노승철 2 years ago
parent
commit
dcb0064ad2
  1. 49
      src/main/java/com/palnet/process/message/config/MessageConfig.java
  2. 7
      src/main/java/com/palnet/process/message/consumer/MessageConsumer.java
  3. 16
      src/main/java/com/palnet/server/task/ctr/CtrCntrlTask.java

49
src/main/java/com/palnet/process/message/config/MessageConfig.java

@ -5,6 +5,7 @@ import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
@ -15,59 +16,19 @@ import org.springframework.core.env.Environment;
@Configuration
@Slf4j
public class MessageConfig {
private final Environment env;
public MessageConfig(Environment env) {
this.env = env;
@Bean
public CachingConnectionFactory cachingConnectionFactory() {
return new CachingConnectionFactory();
}
// @Bean
// public CachingConnectionFactory cachingConnectionFactory() {
// return new CachingConnectionFactory();
// }
@Bean
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public DirectExchange appDroneExchange() {
return new DirectExchange(env.getProperty("message.app.exchange-name"));
}
@Bean
public DirectExchange websocketDroneExchange() {
return new DirectExchange(env.getProperty("message.websocket.exchange-name"));
}
@Bean
public Queue appDroneQueue() {
return new Queue(env.getProperty("message.app.queue-name"), false);
}
@Bean
public Queue websocketDroneQueue() {
return new Queue(env.getProperty("message.websocket.queue-name"), false);
}
@Bean
public Binding appDroneBinding(Queue appDroneQueue, DirectExchange appDroneExchange) {
return BindingBuilder.bind(appDroneQueue)
.to(appDroneExchange)
.with(env.getProperty("message.app.routing-key"));
}
@Bean
public Binding websocketDroneBinding(Queue websocketDroneQueue, DirectExchange websocketDroneExchange) {
return BindingBuilder.bind(websocketDroneQueue)
.to(websocketDroneExchange)
.with(env.getProperty("message.websocket.routing-key"));
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);

7
src/main/java/com/palnet/process/message/consumer/MessageConsumer.java

@ -1,5 +1,6 @@
package com.palnet.process.message.consumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.palnet.comn.model.CtrCntrlModel;
import com.palnet.comn.utils.JsonUtils;
@ -28,8 +29,10 @@ public class MessageConsumer {
@RabbitHandler
@RabbitListener(queues = {"websocket.drone.queue"})
public void receiveDroneMessage(final GPDatabaseModel model) {
ArrayList<CtrCntrlModel> list = this.getList(model);
public void receiveDroneMessage(final String message) throws JsonProcessingException {
GPDatabaseModel gpDatabaseModel = objectMapper.readValue(message, GPDatabaseModel.class);
ArrayList<CtrCntrlModel> list = this.getList(gpDatabaseModel);
cc.getAllChannels().stream().forEach(c -> { // 접속되어 있는 모든 사용자에게 전달 처리
c.writeAndFlush(new TextWebSocketFrame(JsonUtils.toJson(list)));

16
src/main/java/com/palnet/server/task/ctr/CtrCntrlTask.java

@ -7,6 +7,8 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
public class CtrCntrlTask implements Runnable{
private ChannelCollection cc = new ChannelCollection();
@ -19,13 +21,13 @@ public class CtrCntrlTask implements Runnable{
@Override
public void run() {
// try {
// cc.getAllChannels().stream().forEach(c -> { // 접속되어 있는 모든 사용자에게 전달 처리
// c.writeAndFlush(new TextWebSocketFrame(JsonUtils.toJson(service.getTest())));
// });
// }catch(Exception e) {
// e.printStackTrace();
// }
try {
cc.getAllChannels().stream().forEach(c -> { // 접속되어 있는 모든 사용자에게 전달 처리
c.writeAndFlush(new TextWebSocketFrame(JsonUtils.toJson(new ArrayList<>())));
});
}catch(Exception e) {
e.printStackTrace();
}
}

Loading…
Cancel
Save