728x90
MQTTv5 에서는 내가 알던 기능보다 다양한 부분이 있었다.
Retained Data
retained data는 mqtt에서 retained (유지한) 값을 그 topic에 맞게 저장하는 개념이다.
즉, retained를 지정해서 publish를 해놓으면 그 후에 해당 topic에 subscribe를 하면 바로 마지막에 publish된 값을 받을 수 있다.
gradle
// https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.mqttv5.client
implementation("org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5")
publish
// publish는.. 까먹음
// publish( topic 명, 전달할 메시지, qos, retained 한다면 1/안하면 0)
client.publish("topic","message",1,1);
subscribe
import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PreDestroy;
@Configuration
public class MqttConfig {
private final String broker = "tcp://localhost:1883"; // mqtt broker 설정
private final String clientId = "client";
private final String TOPIC = "test";
private final int connectTimeout = 1000;
private final int keepAlive = 30;
private MqttClient mqttClient;
private final MqttMessageHandler mqttMessageHandler;
public MqttConfig(MqttMessageHandler mqttMessageHandler) {
this.mqttMessageHandler = mqttMessageHandler;
}
@Bean
public MqttConnectionOptions mqttConnectionOptions() {
MqttConnectionOptions options = new MqttConnectionOptions();
options.setServerURIs(new String[]{broker});
options.setAutomaticReconnect(true);
options.setConnectionTimeout(connectTimeout);
options.setKeepAliveInterval(keepAlive);
options.setCleanStart(false); // false해야 retained 됨
return options;
}
@Bean(destroyMethod = "")
public MqttClient mqttClient() {
try {
mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());
mqttClient.connect(mqttConnectionOptions());
mqttClient.subscribe(RESPONSE_TOPIC,1);
// Subscription 설정
MqttSubscription subscription = new MqttSubscription(TOPIC, 1);
subscription.setRetainHandling(1); // 1: 구독 시 retained 메시지 수신
// 구독 수행
mqttClient.subscribe(new MqttSubscription[]{subscription}, new IMqttMessageListener[] {
new IMqttMessageListener() {
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
// 여기서 바로 subscribe된 후에 retained 값 있으면 가져와서 처리해줘야 함
String message = new String(mqttMessage.getPayload());
}
}
});
return mqttClient;
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
@PreDestroy
public void close() {
try {
if (mqttClient != null && mqttClient.isConnected()) {
mqttClient.disconnect();
mqttClient.close();
}
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
}
publish하는 법은 인터넷에서 많이 나와있지만 subscribe하는 방법은 알기가 어려워서 cursor를 통해 subscribe를 할 수 있었다.
고마워 따봉 커서야.
728x90
반응형