上一章介绍了RabbitMQ
中死信队列的使用,接下来介绍RabbitMQ中延时队列的使用,包括队列TTL,消息TTL和RabbitMQ延迟插件。

使用场景
延时队列在需要延时处理的场景下非常有用,队列中的元素都是带有时间属性的, 普通队列的元素按照队列的次序依次取出处理;延时队列中的元素则按照指定时间被取出和处理。
- 订单10分钟内未支付则自动取消。
- 店铺10天没有更新,发消息提醒。
- 账单一周内未支付,自动支付。
- 新用户注册3天内没登录,短信提醒。
- 预约会议后,在预定的时间开始前15分钟通知参会人员。
- 发送定时消息。
这些场景都有一个共同点,就是跟时间有关。理想情况下,通过定时任务轮询数据可以实现功能;但随着数据量的增加,轮询会带来巨大的性能问题,同时轮询的方式处理起来很不优雅。这时候延时队列就派上用场了。
RabbitMQ中的TTL
TTL是RabbitMQ中一个消息或者队列的属性,表明一条消息或者队列中的所有消息的最大存活时间(毫秒)。
即如果一条消息设置了TTL,或者进入了设置TTL属性的队列,那么这条消息如果在TTL时限内未被消费,则会成为死信;如果两者同时设置了TTL,那么较小TTL会被使用。
在消息属性上设置TTL的方式,消息可能并不会按时“死亡“,RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,索引如果第一个消息的延时时长很长,而第二个消息的延时时长很短,则第二个消息并不会优先得到执行。
队列TTL的设置,实在创建队列的时候,设置队列的x-message-ttl
属性,例如:
1 2 3
| Map<String, Object> args = new HashMap<>(3); args.put("x-message-ttl", 60000); return QueueBuilder.durable(QUEUE_DELAY_FIX_NAME).withArguments(args).build();
|
队列TTL + 死信队列
通过给队列配置TTL,配合死信队列来实现消息的延时。因为队列的TTL固定,所以这种如果有新的时间需求,则需要增加队列。 这种适用于固定延时时间的场景。
配置队列
声明常量
1 2 3 4 5 6 7 8 9 10 11
| public class MQConstant {
public static final String QUEUE_DELAY_FIX_1_NAME = "queue.demo.fix.1.delay"; public static final String QUEUE_DEAD_NAME = "queue.demo.dead"; }
|
队列路由与交换器绑定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| import static org.liangtong.example.rabbit.constant.MQConstant.*;
@Configuration public class RabbitMQConfig {
@Bean("fixDelayExchange") public DirectExchange fixDelayExchange() { return new DirectExchange(EXCHANGE_DELAY_FIX_NAME); } @Bean("deadExchange") public DirectExchange deadExchange() { return new DirectExchange(EXCHANGE_DEAD_NAME); }
@Bean("fixDelayQueue1") public Queue fixDelayQueue1() { Map<String, Object> args = new HashMap<>(3); args.put("x-dead-letter-exchange", EXCHANGE_DEAD_NAME); args.put("x-dead-letter-routing-key", ROUTING_KEY_DEAD_NAME); args.put("x-message-ttl", 60000); return QueueBuilder.durable(QUEUE_DELAY_FIX_1_NAME).withArguments(args).build(); } @Bean("deadQueue") public Queue deadQueue() { return new Queue(QUEUE_DEAD_NAME); }
@Bean public Binding fixDelay1Binding(@Qualifier("fixDelayQueue1") Queue queue, @Qualifier("fixDelayExchange") DirectExchange exchange) { return BindingBuilder.bind(queue) .to(exchange) .with(ROUTING_KEY_DELAY_FIX_1_NAME); } @Bean public Binding deadBinding(@Qualifier("deadQueue") Queue queue, @Qualifier("deadExchange") DirectExchange exchange) { return BindingBuilder.bind(queue) .to(exchange) .with(ROUTING_KEY_DEAD_NAME); } }
|
死信消费者
此处不再配置延时消费者,而是例如TTL过期直接进消费队列机制,仅配置死信消息消费者
1 2 3 4 5 6 7 8 9 10
| @Slf4j @Component public class DeadConsumer { @RabbitListener(queues = QUEUE_DEAD_NAME) public void receiveDead(Message message, Channel channel) throws IOException { log.info("收到死信消息: " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
|
消息生产者
消息生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Slf4j @Component public class MsgProducer { @Autowired private RabbitTemplate rabbitTemplate;
public void sendFixDelayMsg1(String msg){ log.info("发送延时消息:{}", msg); rabbitTemplate.convertAndSend(EXCHANGE_DELAY_FIX_NAME, ROUTING_KEY_DELAY_FIX_1_NAME, msg, new CorrelationData() ); } }
|
测试
创建Webcontroller来进行功能测试,代码如下:
1 2 3 4 5 6 7 8 9 10 11
| @RequestMapping("rabbitmq") @RestController public class RabbitMsgController { @Autowired private MsgProducer sender;
@RequestMapping("fixDelay1") public void sendFixDelay1Msg(String msg){ sender.sendFixDelayMsg1(msg); } }
|
启动SpringBoot项目,然后浏览器输入http://localhost:8081/rabbitmq/fixDelay1?msg=Hello
和http://localhost:8081/rabbitmq/fixDelay1?msg=梁通
测试:
1 2 3 4
| 2020-09-16 12:45:59.686 INFO 84248 --- [nio-8081-exec-5] o.l.example.rabbit.producer.MsgProducer : 发送延时消息:Hello 2020-09-16 12:46:02.521 INFO 84248 --- [nio-8081-exec-6] o.l.example.rabbit.producer.MsgProducer : 发送延时消息:梁通 2020-09-16 12:46:59.690 INFO 84248 --- [ntContainer 2020-09-16 12:47:02.524 INFO 84248 --- [ntContainer
|
可以看到,消息在经过60秒后,被消费掉。
消息TTL + 死信队列
在队列上设置TTL时,延时时间固定。还有一种方案,就是设置任意延时时长的消息,然后设置消息的TTL。
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class MQConstant {
public static final String QUEUE_DELAY_FIX_1_NAME = "queue.demo.fix.1.delay"; public static final String QUEUE_DELAY_FIX_2_NAME = "queue.demo.fix.2.delay"; public static final String QUEUE_DEAD_NAME = "queue.demo.dead"; }
|
队列配置
不设置队列TTL相关代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Bean("fixDelayQueue2") public Queue fixDelayQueue2() { Map<String, Object> args = new HashMap<>(3); args.put("x-dead-letter-exchange", EXCHANGE_DEAD_NAME); args.put("x-dead-letter-routing-key", ROUTING_KEY_DEAD_NAME); return QueueBuilder.durable(QUEUE_DELAY_FIX_2_NAME).withArguments(args).build(); }
@Bean public Binding fixDelay2Binding(@Qualifier("fixDelayQueue2") Queue queue, @Qualifier("fixDelayExchange") DirectExchange exchange) { return BindingBuilder.bind(queue) .to(exchange) .with(ROUTING_KEY_DELAY_FIX_2_NAME); }
|
消息生产者
设置消息过期时间
1 2 3 4 5 6 7 8 9 10 11
| public void sendFixDelayMsg2(String msg, Integer delay){ log.info("发送延时消息:{} 延时{}秒", msg, delay); rabbitTemplate.convertAndSend(EXCHANGE_DELAY_FIX_NAME, ROUTING_KEY_DELAY_FIX_2_NAME, msg, message -> { message.getMessageProperties().setExpiration(String.valueOf(delay * 1000) ); return message; } ); }
|
测试代码
1 2 3 4
| @RequestMapping("fixDelay2") public void sendFixDelay2Msg(String msg, Integer delay){ sender.sendFixDelayMsg2(msg, delay); }
|
启动SpringBoot项目,然后浏览器输入http://localhost:8081/rabbitmq/fixDelay2?测试10&delay=10
和http://localhost:8081/rabbitmq/fixDelay2?测试20&delay=20
和http://localhost:8081/rabbitmq/fixDelay2?测试30&delay=30
测试:
1 2 3 4 5 6 7 8 9
| 2020-09-16 13:38:13.371 INFO 94573 --- [nio-8081-exec-5] o.l.example.rabbit.producer.MsgProducer : 发送延时消息:测试10 延时10秒 2020-09-16 13:38:18.863 INFO 94573 --- [nio-8081-exec-6] o.l.example.rabbit.producer.MsgProducer : 发送延时消息:测试20 延时20秒 2020-09-16 13:38:23.374 INFO 94573 --- [ntContainer 2020-09-16 13:38:24.206 INFO 94573 --- [nio-8081-exec-7] o.l.example.rabbit.producer.MsgProducer : 发送延时消息:测试30 延时30秒 2020-09-16 13:38:24.379 INFO 94573 --- [ntContainer 2020-09-16 13:38:38.866 INFO 94573 --- [ntContainer 2020-09-16 13:38:39.870 INFO 94573 --- [ntContainer 2020-09-16 13:38:54.209 INFO 94573 --- [ntContainer 2020-09-16 13:38:55.211 INFO 94573 --- [ntContainer
|
可以看到,发送的消息,再经过特定的ttl时间后,被消费掉。
别高兴太早,前边介绍过,在消息上添加TTL时,消息可能不会按时消亡。RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,索引如果第一个消息的延时时长很长,而第二个消息的延时时长很短,则第二个消息并不会优先得到执行。然后我们测试下:
浏览器输入http://localhost:8081/rabbitmq/fixDelay2?测试30&delay=30
和http://localhost:8081/rabbitmq/fixDelay2?测试10&delay=10
测试:
1 2 3 4
| 2020-09-16 13:44:35.203 INFO 94573 --- [io-8081-exec-10] o.l.example.rabbit.producer.MsgProducer : 发送延时消息:测试30 延时30秒 2020-09-16 13:44:39.832 INFO 94573 --- [nio-8081-exec-1] o.l.example.rabbit.producer.MsgProducer : 发送延时消息:测试10 延时10秒 2020-09-16 13:45:05.206 INFO 94573 --- [ntContainer 2020-09-16 13:45:05.207 INFO 94573 --- [ntContainer
|
结果显示,即使第二条消息的TTL较短(10秒),消息在第一条消息成为死信后才被消费。(惊不惊喜,意不意外!)
延时插件
从上边测试可以看出,RabbitMQ在消息粒度上的延迟无法满足个性化的延时需求。那么这种消息粒度上的延时问题如何解决呢。
接下来介绍通过安装RabbitMQ插件的形式来完美解决。
插件下载地址:https://www.rabbitmq.com/community-plugins.html

这里,我们需要找到 rabbitmq_delayed_message_exchange
插件,下载后,将rabbitmq_delayed_message_exchange-3.8.0.ez
文件放到sbin
同级的plugins
目录下,然后切换到sbin
目录,安装插件并重启服务
1 2 3
| > ./rabbitmq-plugins enable rabbitmq_delayed_message_exchange > ./rabbitmqctl stop > ./rabbitmq-server
|

配置队列
设置常量
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public class MQConstant {
public static final String EXCHANGE_DELAY_NAME = "exchange.demo.delay"; public static final String EXCHANGE_DEAD_NAME = "exchange.demo.dead";
public static final String ROUTING_KEY_DELAY_NAME = "routingkey.demo.delay"; public static final String ROUTING_KEY_DEAD_NAME = "routingkey.demo.dead";
public static final String QUEUE_DELAY_NAME = "queue.demo.delay"; public static final String QUEUE_DEAD_NAME = "queue.demo.dead"; }
|
队列路由与交换器绑定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| @Configuration public class RabbitMQConfig {
@Bean("delayExchange") public CustomExchange delayExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(EXCHANGE_DELAY_NAME, "x-delayed-message", true, false, args); }
@Bean("deadExchange") public DirectExchange deadExchange() { return new DirectExchange(EXCHANGE_DEAD_NAME); }
@Bean("delayQueue") public Queue delayQueue() { Map<String, Object> args = new HashMap<>(2); args.put("x-dead-letter-exchange", EXCHANGE_DEAD_NAME); args.put("x-dead-letter-routing-key", ROUTING_KEY_DEAD_NAME); return QueueBuilder.durable(QUEUE_DELAY_NAME).withArguments(args).build(); }
@Bean("deadQueue") public Queue deadQueue() { return new Queue(QUEUE_DEAD_NAME); }
@Bean public Binding delayBinding(@Qualifier("delayQueue") Queue queue, @Qualifier("delayExchange") CustomExchange exchange) { return BindingBuilder.bind(queue) .to(exchange) .with(ROUTING_KEY_DELAY_NAME).noargs(); }
@Bean public Binding deadBinding(@Qualifier("deadQueue") Queue queue, @Qualifier("deadExchange") DirectExchange exchange) { return BindingBuilder.bind(queue) .to(exchange) .with(ROUTING_KEY_DEAD_NAME); } }
|
消费者
死信消费者不变,此处增加延时队列消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Slf4j @Component public class DelayConsumer {
@RabbitListener(queues = QUEUE_DELAY_NAME) public void receiveDelay(Message message, Channel channel) throws IOException { log.info("收到延时消息: " + message);
boolean ack = true; Exception exception = null; try { } catch (Exception e) { ack = false; exception = e; } finally {
} if (!ack) { log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } else { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } }
|
生产者
生产者设置延时时间
1 2 3 4 5 6 7 8 9 10 11
| public void delayMsg(String msg, Integer delay){ log.info("发送延时消息:{} 延时{}秒", msg, delay); rabbitTemplate.convertAndSend(EXCHANGE_DELAY_NAME, ROUTING_KEY_DELAY_NAME, msg, message -> { message.getMessageProperties().setDelay(delay * 1000); return message; } ); }
|
测试代码
1 2 3 4
| @RequestMapping("delay") public void sendDelay(String msg, Integer delay){ sender.delayMsg(msg, delay); }
|
启动SpringBoot项目,然后浏览器输入http://localhost:8081/rabbitmq/delay?测试30&delay=30
、http://localhost:8081/rabbitmq/delay?测试20&delay=20
、http://localhost:8081/rabbitmq/delay?测试50&delay=50
和http://localhost:8081/rabbitmq/delay?测试10&delay=10
测试:
1 2 3 4 5 6 7 8
| 2020-09-16 14:09:35.903 INFO 2955 --- [nio-8081-exec-1] o.l.example.rabbit.producer.MsgProducer : 发送延时消息:测试30 延时30秒 2020-09-16 14:09:43.152 INFO 2955 --- [nio-8081-exec-2] o.l.example.rabbit.producer.MsgProducer : 发送延时消息:测试20 延时20秒 2020-09-16 14:09:48.832 INFO 2955 --- [nio-8081-exec-3] o.l.example.rabbit.producer.MsgProducer : 发送延时消息:测试50 延时50秒 2020-09-16 14:09:53.086 INFO 2955 --- [nio-8081-exec-4] o.l.example.rabbit.producer.MsgProducer : 发送延时消息:测试10 延时10秒 2020-09-16 14:10:03.095 INFO 2955 --- [ntContainer 2020-09-16 14:10:04.103 INFO 2955 --- [ntContainer 2020-09-16 14:10:09.114 INFO 2955 --- [ntContainer 2020-09-16 14:10:38.836 INFO 2955 --- [ntContainer
|
可以看到,借助于RabbitMQ延迟插件,延时队列按照消息粒度进行了消费!(此处应该有掌声👏)
至此,RabbitMQ 关于普通消息、死信队列和延迟队列相关内容已结束。
RabbitMQ相关的Demo代码已上传至Github,有需要的话可自行下载查阅。
地址:https://github.com/liangtongdev/demo-springboot-rabbitmq
后续:接下来将介绍利用RabbitMQ实现即时消息的消费,配合Redis实现延时发送与取消。
本文标题:RabbitMQ 延时队列
文章作者:梁通
发布时间:2020-09-16
最后更新:2020-10-23
原始链接:http://www.liangtong.site/2020/09/16/java_20200916_springboot_rabbitmq_delay/
版权声明:Copyright© 2016-2020 liangtong 版权所有