实现MQTT协议的中间件,目前使用的是Apache-Apollo服务器。
接上篇,本文介绍如何在SpringBoot上集成MQTT消息订阅功能。
SpringBoot
使用idea
新建Spring Initializr
工程,过程省略,使用Maven
对项目依赖进行管理,配置过程省略。
完成后,在pom
文件中加入以下依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
|
然后配置SpringBoot的文件application.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| spring: mqtt: subscribe: username: admin password: password url: tcp://127.0.0.1:61613,tcp://192.168.2.133:61614 client: id: mqttId-1 default: topic: hello,world completionTimeout: 3000
|
在配置文件中,我们设置两个订阅主题hello
和world
SpringBoot配置类
配置MQTT的消息订阅配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
| import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException;
import java.util.Arrays; import java.util.List;
@Configuration @IntegrationComponentScan public class MqttSubscribeConfig {
@Value("${spring.mqtt.subscribe.username}") private String username;
@Value("${spring.mqtt.subscribe.password}") private String password;
@Value("${spring.mqtt.subscribe.url}") private String hostUrl;
@Value("${spring.mqtt.subscribe.client.id}") private String clientId;
@Value("${spring.mqtt.subscribe.default.topic}") private String defaultTopic;
@Value("${spring.mqtt.subscribe.completionTimeout}") private int completionTimeout ;
private Logger logger = LoggerFactory.getLogger(getClass());
@Bean public MqttConnectOptions getReceiverMqttConnectOptions(){ MqttConnectOptions mqttConnectOptions=new MqttConnectOptions(); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); List<String> hostList = Arrays.asList(hostUrl.trim().split(",")); String[] serverURIs = new String[hostList.size()]; hostList.toArray(serverURIs); mqttConnectOptions.setServerURIs(serverURIs); mqttConnectOptions.setKeepAliveInterval(2); return mqttConnectOptions; } @Bean public MqttPahoClientFactory receiverMqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getReceiverMqttConnectOptions()); return factory; }
@Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); }
@Bean public MessageProducer inbound() { List<String> topicList = Arrays.asList(defaultTopic.trim().split(",")); String[] topics = new String[topicList.size()]; topicList.toArray(topics);
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, receiverMqttClientFactory(), topics); adapter.setCompletionTimeout(completionTimeout); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; }
@Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); String msg = message.getPayload().toString(); logger.info("\n----------------------------START---------------------------\n" + "接收到订阅消息:\ntopic:" + topic + "\nmessage:" + msg + "\n-----------------------------END----------------------------"); } }; } }
|
Service
配置MqttGateway消息推送接口类,提供发布特定主题消息的能力。
1 2 3 4 5 6 7 8 9
| import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header;
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { void publishMqttMessageWithTopic(String data, @Header(MqttHeaders.TOPIC) String topic); }
|
测试
启动SpringBootApplication
。
使用Postman
发送一条topic
为test
的消息和一条topic
为hello
的消息。
控制台中,打印出以下info

Apollo
后台,我们可以看到以下记录

多客户端
将SpringBoot
项目打成jar
包,然后运行。
之后使用Postman
发送一条topic
为hello
的消息。可以看到两个订阅端都收到了消息

Apollo
后台,我们可以看到consumer的数量已经变更为2

大功告成!
本文标题:SpringBoot之MQTT消息订阅
文章作者:梁通
发布时间:2019-03-16
最后更新:2020-09-16
原始链接:http://www.liangtong.site/2019/03/16/java_20190316_springboot_mqtt_subscribe/
版权声明:Copyright© 2016-2020 liangtong 版权所有