首页 > 新闻频道 > 滚动  
使用RabbitMQ的死信队列实现延迟消息
2021-12-13 21:37:33  来源:中国文化报道网  作者:  分享:

全栈进阶那些事 一只爱技术的程序猿,想把分享变成一种习惯! 20篇原创内容 公众号

使用场景

  • 订单下单30分钟后,如果用户没有付款,则系统自动取消订单。

  • 会议开始前10分钟,推送消息提醒用户。

  • 自定义某个操作的执行时间,如果设置文章在明早9点发布。

除了上一篇《使用RabbitMQ插件实现延迟队列》外,使用死信队列也是一种方案.

  • Time To Live:可以在发送消息时设置过期时间,也可以设置整个队列的过期时间,如果两个同时设置已最早过期时间为准。

  • Dead Letter Exchanges:可以通过绑定队列的死信交换器来实现死信队列。

x-dead-letter-exchange:绑定死信交换器(其实也是普通交换器,与类型无关)
x-dead-letter-routing-key:绑定死信队列的路由键(可选)
x-message-ttl:绑定队列消息的过期时间(可选)

 死信队列设计思路:

生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> DLX交换机 -->队列 --> 消费者

进入消息队列:
1. 消息被拒绝,并且requeue= false
2. 消息ttl过期
3. 队列达到最大的长度

做延迟队列需要创建一个没有消费者的队列,用来存储消息。然后创建一个真正的消费队列,用来做具体的业务逻辑。当带有TTL的消息到达绑定死信交换器的队列,因为没有消费者所以会一直等到消息过期,然后消息被投递到死信队列也就是真正的消费队列。

具体代码:

package com.lyt.rabbitmq.config;
import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.stereotype.Component;
import java.util.HashMap;import java.util.Map;
/** * @Description 利用死信队列和过期时间模拟延迟队列,没有消费者,所以不能用注解形式 * Time To Live(TTL) * 1. 可以在发送消息时设置过期时间(message.getMessageProperties().setExpiration("5000");) * 2. 也可以设置整个队列的过期时间(args.put("x-message-ttl",10000);) * 3. 如果两个同时设置已最早过期时间为准 * Dead Letter Exchanges(DLX) * @Date 2019-03-10 10:25:30 */@Componentpublic class MQDelayConfig {
/** * @Description 定义支付交换器 * @Author lyt * @Date 2021-04-02 14:39:31 */ @Bean private DirectExchange directPayExchange() { return new DirectExchange("direct.pay.exchange"); }
/** * @Description 定义支付队列 绑定死信队列(其实是绑定的交换器,然后通过交换器路由键绑定队列) 设置过期时间 * @Author lyt * @Date 2021-04-02 14:40:24 */ @Bean private Queue directPayQueue() { Map<String, Object> args = new HashMap<>(3); //声明死信交换器 args.put("x-dead-letter-exchange", "direct.delay.exchange"); //声明死信路由键 args.put("x-dead-letter-routing-key", "DelayKey"); //声明队列消息过期时间 args.put("x-message-ttl", 10000); return new Queue("direct.pay.queue", true, false, false, args); }
/** * @Description 定义支付绑定 * @Author lyt * @Date 2021-04-02 14:46:10 */ @Bean private Binding bindingOrderDirect() { return BindingBuilder.bind(directPayQueue()).to(directPayExchange()).with("OrderPay"); }}

带有过期时间且绑定死信交换器的队列

生产者,为消息设置过期时间setExpiration("15000");

/** * @Description 支付队列、绑定死信队列,测试消息延迟功能 * @Author lyt * @Date 2021-04-02 14:07:25 */@RequestMapping(value = "/directDelayMQ", method = {RequestMethod.GET})public List<User> directDelayMQ() {    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");    List<User> users = userService.getUserList(null);    for (User user : users) {        CorrelationData correlationData = new CorrelationData(String.valueOf(user.getId()));        rabbitTemplate.convertAndSend("direct.pay.exchange", "OrderPay", user,                message -> {                    // 设置5秒过期                    message.getMessageProperties().setExpiration("15000");                    return message;                },                correlationData);        System.out.println(user.getName() + ":" + sdf.format(new Date()));    }    return users;}

消费者,声明真正消费的队列、交换器、绑定

/** * @Description 延迟队列 * @Author lyt * @Date 2021-04-04 16:34:28 */@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "direct.delay.queue"), exchange = @Exchange(value = "direct.delay.exchange"), key = {"DelayKey"})})public void getDLMessage(User user, Channel channel, Message message) throws InterruptedException, IOException {    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");    // 模拟执行任务    System.out.println("这是延迟队列消费:" + user.getName() + ":" + sdf.format(new Date()));    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}

思考: 如果先放入一条A消息过期时间是10秒,再放入一个b消息过期时间是5秒,那延迟队列是否可以先消费b消息?

答案是否定的,因为队列就会遵循先进先出的规则,b消息会等a消息过期后,一起消费,这就是所谓的队列阻塞。由这个问题可以用我们之前介绍的插件方式解决。



扫描二维码

关注我们

了解更多全栈技能




 

相关新闻

    无相关信息

◎版权作品,未经中国文化报道网书面授权,严禁转载,违者将被追究法律责任。




关于我们 - 媒体合作 - 广告服务 - 版权声明 - 联系我们 - 友情链接 - 网站地图

Copyright 2015-2019. 中国文化报道网 www.cgia.cn All rights reserved.

违法和不良信息举报邮箱:jubao@cgia.cn

未经过本站允许,请勿将本站内容传播或复制