实现MQTT协议的中间件,目前使用的是Apache-Apollo服务器。
接上篇,本文介绍如何在SpringBoot上集成MQTT消息订阅功能。

SpringBoot

使用idea新建Spring Initializr工程,过程省略,使用Maven对项目依赖进行管理,配置过程省略。
完成后,在pom文件中加入以下依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<!--MQTT Start-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--MQTT End-->

然后配置SpringBoot的文件application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
spring:
mqtt:
# subscribe 订阅
subscribe:
#订阅 - 用户名
username: admin
#订阅 - 密码
password: password
#订阅 - 服务器连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:61613,tcp://192.168.2.133:61613
url: tcp://127.0.0.1:61613,tcp://192.168.2.133:61614
client:
#订阅 - 连接服务器默认客户端ID
id: mqttId-1
default:
#订阅 - 默认的消息推送主题,实际可在调用接口时指定
topic: hello,world
#订阅 - 连接超时
completionTimeout: 3000

在配置文件中,我们设置两个订阅主题helloworld

SpringBoot配置类

配置MQTT的消息订阅配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

import java.util.Arrays;
import java.util.List;

/**
* mqtt subscribe
*/
@Configuration
@IntegrationComponentScan
public class MqttSubscribeConfig {

@Value("${spring.mqtt.subscribe.username}")
private String username;

@Value("${spring.mqtt.subscribe.password}")
private String password;

@Value("${spring.mqtt.subscribe.url}")
private String hostUrl;

@Value("${spring.mqtt.subscribe.client.id}")
private String clientId;

@Value("${spring.mqtt.subscribe.default.topic}")
private String defaultTopic;

@Value("${spring.mqtt.subscribe.completionTimeout}")
private int completionTimeout ; //连接超时

private Logger logger = LoggerFactory.getLogger(getClass());

@Bean
public MqttConnectOptions getReceiverMqttConnectOptions(){
MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
List<String> hostList = Arrays.asList(hostUrl.trim().split(","));
String[] serverURIs = new String[hostList.size()];
hostList.toArray(serverURIs);
mqttConnectOptions.setServerURIs(serverURIs);
mqttConnectOptions.setKeepAliveInterval(2);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory receiverMqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getReceiverMqttConnectOptions());
return factory;
}

//接收通道
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}

//配置client,监听的topic
@Bean
public MessageProducer inbound() {
List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));
String[] topics = new String[topicList.size()];
topicList.toArray(topics);

MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(clientId,
receiverMqttClientFactory(),
topics);
adapter.setCompletionTimeout(completionTimeout);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}

//通过通道获取数据
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String msg = message.getPayload().toString();
logger.info("\n----------------------------START---------------------------\n" +
"接收到订阅消息:\ntopic:" + topic + "\nmessage:" + msg +
"\n-----------------------------END----------------------------");
}
};
}
}

Service

配置MqttGateway消息推送接口类,提供发布特定主题消息的能力。

1
2
3
4
5
6
7
8
9
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
void publishMqttMessageWithTopic(String data,
@Header(MqttHeaders.TOPIC) String topic);
}

测试

启动SpringBootApplication
使用Postman发送一条topictest的消息和一条topichello的消息。
控制台中,打印出以下info

console.png

Apollo后台,我们可以看到以下记录

apollo.png

多客户端

SpringBoot项目打成jar包,然后运行。
之后使用Postman发送一条topichello的消息。可以看到两个订阅端都收到了消息

multi-client-console.png

Apollo后台,我们可以看到consumer的数量已经变更为2

multi-client-apollo.png

大功告成!