Back-end/spring boot

MQTT - subscribe 시 retained data 받아오기 (how to set subscribe) [spring boot]

은성 개발자 2025. 1. 25. 22:16
728x90

MQTTv5 에서는 내가 알던 기능보다 다양한 부분이 있었다.

 

Retained Data

retained data는 mqtt에서 retained (유지한) 값을 그 topic에 맞게 저장하는 개념이다.

즉, retained를 지정해서 publish를 해놓으면 그 후에 해당 topic에 subscribe를 하면 바로 마지막에 publish된 값을 받을 수 있다.


gradle

// https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.mqttv5.client
implementation("org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5")

 

publish

// publish는.. 까먹음  
// publish( topic 명, 전달할 메시지, qos, retained 한다면 1/안하면 0)
client.publish("topic","message",1,1);

 

subscribe

import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PreDestroy;

@Configuration
public class MqttConfig {

    private final String broker = "tcp://localhost:1883"; // mqtt broker 설정
    private final String clientId = "client";
    private final String TOPIC = "test";
    private final int connectTimeout = 1000;
    private final int keepAlive = 30;

    private MqttClient mqttClient;
    private final MqttMessageHandler mqttMessageHandler;

    public MqttConfig(MqttMessageHandler mqttMessageHandler) {
        this.mqttMessageHandler = mqttMessageHandler;
    }

    @Bean
    public MqttConnectionOptions mqttConnectionOptions() {
        MqttConnectionOptions options = new MqttConnectionOptions();
        options.setServerURIs(new String[]{broker});
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(connectTimeout);
        options.setKeepAliveInterval(keepAlive);
        options.setCleanStart(false); // false해야 retained 됨
        return options;
    }

    @Bean(destroyMethod = "")
    public MqttClient mqttClient() {
        try {
            mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());
            mqttClient.connect(mqttConnectionOptions());

            mqttClient.subscribe(RESPONSE_TOPIC,1);

            // Subscription 설정
            MqttSubscription subscription = new MqttSubscription(TOPIC, 1);
            subscription.setRetainHandling(1); // 1: 구독 시 retained 메시지 수신

            // 구독 수행
            mqttClient.subscribe(new MqttSubscription[]{subscription}, new IMqttMessageListener[] {
                new IMqttMessageListener() {
                    @Override
                    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                        // 여기서 바로 subscribe된 후에 retained 값 있으면 가져와서 처리해줘야 함
                        String message = new String(mqttMessage.getPayload());
                    }
                }
            });

            return mqttClient;
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }
    }

    @PreDestroy
    public void close() {
        try {
            if (mqttClient != null && mqttClient.isConnected()) {
                mqttClient.disconnect();
                mqttClient.close();
            }
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }
    }
}

 

 


publish하는 법은 인터넷에서 많이 나와있지만 subscribe하는 방법은 알기가 어려워서 cursor를 통해 subscribe를 할 수 있었다.

 

고마워 따봉 커서야.

 

728x90
반응형