本文使用的 MQTT 协议中间件(Broker)是 Apache Apollo 服务器,并通过 Gateway 绑定的方式进行消息发送。
SpringBoot
使用 IDEA 新建 Spring Initializr 工程(过程省略),并使用 Maven 对项目依赖进行管理(配置过程省略)。
完成后,在 pom.xml 文件中加入以下依赖:
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
```
然后配置SpringBoot的文件application.yml
```yaml
spring:
  mqtt:
    publish:
      username: admin
      password: password
      url: tcp://127.0.0.1:61613
      client:
        id: mqttId
      default:
        topic: topic
```
SpringBoot配置类
配置MQTT的消息推送配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler;
import java.util.Arrays; import java.util.List;
/**
 * Spring configuration for the publishing (outbound) side of the MQTT integration.
 *
 * <p>Reads the broker credentials, URL(s), client id and default topic from the
 * {@code spring.mqtt.publish.*} properties and wires an outbound message handler
 * to the {@code mqttOutboundChannel} channel.
 */
@Configuration
@IntegrationComponentScan
public class MqttPublishConfig {

    /**
     * Keep-alive interval, in seconds, handed to the Paho connect options.
     * NOTE(review): 2 s is very short and causes frequent pings — confirm this is intended.
     */
    private static final int KEEP_ALIVE_SECONDS = 2;

    @Value("${spring.mqtt.publish.username}")
    private String username;

    @Value("${spring.mqtt.publish.password}")
    private String password;

    /** One broker URI, or several separated by commas. */
    @Value("${spring.mqtt.publish.url}")
    private String hostUrl;

    @Value("${spring.mqtt.publish.client.id}")
    private String clientId;

    @Value("${spring.mqtt.publish.default.topic}")
    private String defaultTopic;

    /**
     * Builds the Paho connection options from the configured credentials and broker URI(s).
     *
     * @return connect options carrying user name, password, keep-alive interval and server URIs
     * @throws IllegalArgumentException if the configured URL contains no usable broker URI
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setKeepAliveInterval(KEEP_ALIVE_SECONDS);
        options.setServerURIs(parseServerUris(hostUrl));
        return options;
    }

    /**
     * Creates the client factory that uses {@link #getMqttConnectOptions()}.
     *
     * @return the Paho client factory used by the outbound handler
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /**
     * Outbound handler subscribed to {@code mqttOutboundChannel}; it is configured
     * as asynchronous and with the configured default topic.
     *
     * @return the message handler that publishes to the broker
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
        handler.setAsync(true);
        handler.setDefaultTopic(defaultTopic);
        return handler;
    }

    /**
     * Channel on which messages to be published are placed.
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Splits a comma-separated URI list into individual URIs, trimming whitespace
     * around each entry and dropping empty entries (e.g. from a trailing comma),
     * so that {@code "tcp://a:1883, tcp://b:1883"} yields two clean URIs.
     *
     * @param commaSeparatedUris the raw property value
     * @return the non-empty, trimmed URIs in their original order
     * @throws IllegalArgumentException if no URI remains after trimming
     */
    private static String[] parseServerUris(String commaSeparatedUris) {
        String[] uris = Arrays.stream(commaSeparatedUris.split(","))
                .map(String::trim)
                .filter(uri -> !uri.isEmpty())
                .toArray(String[]::new);
        if (uris.length == 0) {
            throw new IllegalArgumentException(
                    "spring.mqtt.publish.url must contain at least one broker URI");
        }
        return uris;
    }
}
|
Service
配置MqttGateway消息推送接口类,提供发布特定主题消息的能力。
1 2 3 4 5 6 7 8 9
| import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header;
/**
 * Messaging gateway whose requests are sent to the {@code mqttOutboundChannel} channel.
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * Sends {@code data} as the message payload, with {@code topic} carried in the
     * {@link MqttHeaders#TOPIC} header.
     *
     * @param data  the message payload
     * @param topic the value for the MQTT topic header
     */
    void publishMqttMessageWithTopic(String data, @Header(MqttHeaders.TOPIC) String topic);
}
|
RestController
配置 HTTP 接口,用于触发(或测试)消息的发送。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import com.xxxxxxxxxxxxxxxxxxxxx.mqtt.service.MqttGateway; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.MimeTypeUtils; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;
@RestController @RequestMapping("/mqtt") public class MessageApiController { @Autowired private MqttGateway mqttGateway;
private Logger logger = LoggerFactory.getLogger(getClass());
@RequestMapping(value = {"publish"}, method = {RequestMethod.POST}, produces = MimeTypeUtils.APPLICATION_JSON_VALUE) public void postPublishMessage(@RequestParam String message, @RequestParam(value = "topic", required = false) String topic) { logger.info("\n----------------------------START---------------------------\n" + "接收到发布请求:\ntopic:" + topic + "\nmessage:" + message + "\n-----------------------------END----------------------------");
if (topic == null || topic.isEmpty()) { topic = "topic"; }
mqttGateway.publishMqttMessageWithTopic(message, topic); } }
|
测试
启动 SpringBootApplication。
使用 Postman 发送一条 topic 为 test-topic 的消息。

控制台中,打印出以下info

在 Apollo 后台,我们可以看到一个主题为 test-topic 的记录。

大功告成!下一篇介绍SpringBoot集成MQTT消息订阅功能!
本文标题:SpringBoot之MQTT消息发送
文章作者:梁通
发布时间:2019-03-15
最后更新:2020-09-15
原始链接:http://www.liangtong.site/2019/03/15/java_20190315_springboot_mqtt_publish/
版权声明:Copyright© 2016-2020 liangtong 版权所有