1.背景
本篇文章主要讲一下RocketMQ中延时消息的实现,主要是为了后面做时间轮改造做下铺垫,之前有一篇讲到了xxl的时间轮实现 时间轮系列-xxl的实现 感兴趣的可以看一下,本文重点在延迟消息的实现上,中间有一些关于消息存储的地方不重点展开,以后的文章中会陆续的更新。
2.流程
打开我的PPT开始画图:
整个的过程分为真假两个阶段:对应图中上半部分
假过程:当Broker发现消息的信息中设置了delayLevel属性并且值大于0,会进行一个假的过程,将当前的信息存储到一个特性的Topic中,修改queue为设置的delayLevl-1,并且把真正要发送的topic放到property的参数重。
真过程:通过自带的内部一个调度器实现类:ScheduleMessageService
把假过程中放到property的真正的topic以及queue进行消息的重新投递。
调度器的详细实现:对应图中的下半部分
MQ中对于每个延迟等级都创建了一个Task去具体的处理,也就是说默认全配置的话是有18个task,每个task处理放到每个队列中的消息。
有两个重要的Map: delayLevelTable , offsetTable,分别表示的是延迟等级对应的延迟时间以及offset。
延迟消息对应的存储文件是 delayOffset.json
图中已经把流程描述的很清楚了,大家可以看一下。
3.后续
下一节会写一下时间轮的算法以及实现,然后第四节会写一下根据时间轮算法改造下RocketMQ支持自定义延迟消息。
相关新闻
◎版权作品,未经中国文化报道网书面授权,严禁转载,违者将被追究法律责任。
Copyright 2015-2019. 中国文化报道网 www.cgia.cn All rights reserved.
违法和不良信息举报邮箱:jubao@cgia.cn
未经过本站允许,请勿将本站内容传播或复制