600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > 源码剖析RocketMQ延时消息原理

源码剖析RocketMQ延时消息原理

时间:2022-06-13 13:11:37

相关推荐

源码剖析RocketMQ延时消息原理

一、前言

RocketMQ版本4.8.0,本文中相关源码注释见GitHub中:RocketMQ:release-4.8.0。上一篇文章我们分析了RocketMQ的的消费超时/失败重试机制,最终会发送一个延时消息到Broker,本篇接着分析RockeTMQ延时消息的实现机制;

1、消息延时级别

消息的延时级别level一共有18级,分别为:

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

level有以下三种情况:

level == 0,消息为非延迟消息1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1slevel > maxLevel,则level== maxLevel,例如level==20,延迟2h

2、定时消息(延迟消息)简介

定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic;

定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。注意:定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间;

3、定时消息的使用

public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("producer_saint");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();Message msg = new Message("consumer-timeout", "msg-body-001".getBytes(StandardCharsets.UTF_8));msg.setTags("msg-body-001");// 设置消息延时级别msg.setDelayTimeLevel(3);List<Message> list = new ArrayList<>();list.add(msg);SendResult send = producer.send(list);System.out.println("sendResult: " + send);}}

二、源码分析

1、整体实现流程

整体流程如下图,executeOnTimeup()方法部分代码太多,流程图中采用文字说明;

2、入口

ScheduleMessageService#start()

在Broker启动时会间接执行ScheduleMessageService#start(),执行启动延时消息服务操作;下面我们从Broker的核心类BrokerController中开始看起;

1> BrokerStartup#start()

这里是Broker启动的核心;关于Broker的启动流程,请参考这篇文章:RocketMQ:深度剖析Broker启动流程原理、源码

紧接着进入到BrokerController#start();在这里会启动消息持久化服务MessageStore

2> DefaultMessageStore#start()

MessageStore是一个接口,下面会进入到它的实现类DefaultMessageStore中;在DefaultMessageStore#start()方法中会判断Broker是否开启了DLegerCommitLog,如果没有并且Broker的角色不是Slave,才会开启延时消息服务

ScheduleMessageService#start()是开启延时消息服务的核心,下面我们接着看;

3、核心逻辑

ScheduleMessageService#start()中主要做两件事:

为每个延时级别都分别开启一个定时任务,每秒执行一次发送延迟消息到真实Topic的操作;延时10s为每个延时级别都分别开启一个定时任务,每10s做一次延时队列中消息偏移量的持久化;

public void start() {if (pareAndSet(false, true)) {this.timer = new Timer("ScheduleMessageTimerThread", true);// delayLevelTable中存放着每个延时级别和其对应的消息offsetfor (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {Integer level = entry.getKey();Long timeDelay = entry.getValue();Long offset = this.offsetTable.get(level);if (null == offset) {offset = 0L;}if (timeDelay != null) {// 1、每秒从"SCHEDULE_TOPIC_XXXX" topic中取数据this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);}}// 2、延时10s启动,并且每10s把每一个延迟队列的最大消息偏移量写入到磁盘中this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {if (started.get()) ScheduleMessageService.this.persist();} catch (Throwable e) {log.error("scheduleAtFixedRate flush exception", e);}}}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());}}

1> ScheduleMessageService.this.persist()

我们先看ScheduleMessageService.this.persist(),点进去会进去到ConfigManager#persist()方法中;这里的操作就单纯的持久化delayQueue的offset到delayOffset.json文件中;

public synchronized void persist() {// 读取offsetTable缓存的延迟队列的值String jsonString = this.encode(true);if (jsonString != null) {// 读取delayOffset.json的文件地址String fileName = this.configFilePath();try {// 持久化到delayOffset.json文件中MixAll.string2File(jsonString, fileName);} catch (IOException e) {log.error("persist file " + fileName + " exception", e);}}}

2> DeliverDelayedMessageTimerTask

DeliverDelayedMessageTimerTaskTimerTask的子类,表示一个线程任务;其主要作用是扫描延迟消息队列(SCHEDULE_TOPIC_XXXX)的消息,将该延迟消息转换为真实topic的消息。

这里真实消息的topic有几个特殊之处:

对于(并发消费模式下)消费超时重试的消息而言,真实的topic是%RETRY%+consumerGroup;

1)DeliverDelayedMessageTimerTask#run()

我们接着看看这个线程任务内部,执行了executeOnTimeup()方法;这是判断延时消息是否到应该转发到真实topic的核心逻辑。

2)executeOnTimeup()

由于代码篇幅过长,这里我们从五个问题着手分析;

根据延时级别获取在SCHEDULE_TOPIC_XXXX主题下queueId为delayLevel - 1的延时队列是否存在? 延时队列不存在怎么处理?延时消息是否到期?到期后怎么处理?延时消息到期后写入CommitLog失败怎么处理?延时消息没到期怎么处理?延时队列中的消息处理完怎么处理?

先看第一问题:

(1)根据延时级别获取在SCHEDULE_TOPIC_XXXX主题下queueId为delayLevel - 1的延时队列是否存在? 延时队列不存在怎么办?

判断当前延时级别对应在SCHEDULE_TOPIC_XXXX主题下的queue是否存在,如果存在进入第二个问题,否则隔0.1s再次开启当前TimerTask

再来看第二个问题:

(2)延时消息是否到期?到期后怎么处理?

在第一问题成立之后,我们已经获取到延时级别对应的延时队列,接下来首先要根据offset从ConsumeQueue中获取到延时消息的部分信息(offset、size、到期时间);接着再判断消息是否到期,并计算出下一个延时消息在延时队列中的offset

如果消息到期,再从commitlog中根据ofsetPy取出完整的消息,解析出消息的真实Topic和Queue,并清除消息的延时属性,然后将消息写入到CommitLog中;

对于如何判断消息过期,我们再跟一下correctDeliverTimestamp()方法;

从消息tagsCode属性中解析出消息应当被投递的时间,然后与当前时间做比较,判断是否应该进行投递(消息是否到期);

再来看第三个问题:

(3)延时消息到期后写入CommitLog失败怎么办?

如果写入CommitLog失败,则延时10s重新开启当前TimerTask,持久化delayOffset;

再来看第四个问题:

(4)延时消息没到期怎么处理?

如果消息未到期,则延时countdown(countdown为延时队列中第一个消息的剩余到期时间),开启一个TimerTask,并持久化delayOffset;

最后再看第五个问题:

(5)延时队列中的消息处理完怎么办?

遍历完相应延时级别的延时队列后,更新下一次开始读取延迟队列的offset,然后延时0.1s开启当前TimerTask,并持久化delayOffset;最后退出当前方法

executeOnTimeup()方法完成代码如下:

public void executeOnTimeup() {// 根据延迟级别找到topic为SCHEDULE_TOPIC_XXXX的队列(队列的ID 为延时级别 - 1)ConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));long failScheduleOffset = offset;// 找到延时级别对应的队列if (cq != null) {SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);if (bufferCQ != null) {try {long nextOffset = offset;int i = 0;ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {// 消息的commitLog物理偏移量long offsetPy = bufferCQ.getByteBuffer().getLong();// 消息大小int sizePy = bufferCQ.getByteBuffer().getInt();// 延迟结束时间,在消息写入到CommitLog之后会分发到consumeQueue;// 对于延迟消息而言,tagsCode存储的是消息的延迟到期时间long tagsCode = bufferCQ.getByteBuffer().getLong();if (cq.isExtAddr(tagsCode)) {if (cq.getExt(tagsCode, cqExtUnit)) {tagsCode = cqExtUnit.getTagsCode();} else {//can't find ext content.So re compute tags code.log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",tagsCode, offsetPy, sizePy);long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);}}long now = System.currentTimeMillis();// 计算是否到消息投递时间long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);// 定时任务下一次开始读取延迟队列的offsetnextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);// 剩余的延时时间long countdown = deliverTimestamp - now;if (countdown <= 0) {// 根据CommitLog物理偏移量找到msgMessageExt msgExt =ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);if (msgExt != null) {try {// 解析消息体,取出真实的topic和queue(多为%RETRY% + consumerGroup)MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",msgInner.getTopic(), msgInner);continue;}// 将消息写入到commitLog中PutMessageResult putMessageResult =ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);// 写入成功,跳过该次循环判断下一条延迟消息是否达到到期时间if (putMessageResult != null&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {log.info("send msg to real topic: {} from schedule topic: {}", msgInner.getTopic() ,msgExt.getTopic());continue;} else {// XXX: warn and notify melog.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",msgExt.getTopic(), msgExt.getMsgId());// 写入消息失败,则延时10s重新执行TimerTaskScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,nextOffset), DELAY_FOR_A_PERIOD);ScheduleMessageService.this.updateOffset(this.delayLevel,nextOffset);return;}} catch (Exception e) {/** XXX: warn and notify me*/log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="+ offsetPy + ",sizePy=" + sizePy, e);}}} else {// 这里说明,延时队列中最小到期的那条消息都还没到延迟时间// 重新提交一个TimerTask,延迟执行时间为延时队列中第一个消息剩余的延时时间ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),countdown);// 更新延时队列已消费的消息偏移量ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;}} // end of for// 定时任务下一次开始读取延迟队列的offsetnextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);// 开始一个延时100ms执行的定时任务 消费延时队列ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);// 将下一次读取延迟队列的offset存放到一个缓存map中ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;} finally {bufferCQ.release();}} // end of if (bufferCQ != null)else {long cqMinOffset = cq.getMinOffsetInQueue();if (offset < cqMinOffset) {failScheduleOffset = cqMinOffset;log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="+ cqMinOffset + ", queueId=" + cq.getQueueId());}}} // end of if (cq != null)ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,failScheduleOffset), DELAY_FOR_A_WHILE);}

源码分析完了,我们来总结一下实现原理。

三、实现原理

消息的发送:

producer设置消息的delayLevel延迟级别,并在消息属性DELAY中存储对应的延时级别;broker端收到消息后,判断延时消息延迟级别,如果大于0,则备份消息原始的topic和queueId,并将消息topic改为延时消息队列特定topic(SCHEDULE_TOPIC_XXXX),queueId改为(延时级别-1)

消息的处理:

MQ服务端(Broker)的ScheduleMessageService中,为每一个延迟级别分别开启一个定时器,定时(每隔1秒)从延迟级别对应的的ConsumeQueue消费队列中拉取消息;然后根据消费偏移量offset从commitLog中解析出对应的消息;从消息tagsCode属性中解析出消息应当被投递的时间,然后与当前时间做比较,判断是否应该进行投递(消息是否到期);若到达了投递时间(消息到期),则构建一个新的消息,从源消息属性中解析出出真实的topic和queueId,并清除消息的延迟属性;将其写入到CommitLog中

四、对延时消息机制的思考

优点:

设计简单,把所有相同延迟时间的消息都先放到一个队列中,做定时扫描,可以保证消息消费的有序性;延时队列中的消息时按消息到期时间进行递增排序,也就是说队列中消息越靠前的到期时间越早;

缺点:

延时消息机制所有的定时任务都在一个定时器中,定时器采用的java.util.Timer,而Timer是单线程运行的;如果延迟消息的数量很大大的话,可能单线程处理不过来,也就会造成消息到期后没有及时发送出去的现象,甚至会造成消息拥堵;

可能的改进点:

为每个延迟队列上分别采用一个Timer;或者说仅使用Timer开始定时任务做扫描,而消息处理的核心逻辑使用线程池处理,进而提高消息处理的效率;

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。