600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > rocketmq原理_消息中间件漫谈:RocketMQ延时消息应用及原理剖析

rocketmq原理_消息中间件漫谈:RocketMQ延时消息应用及原理剖析

时间:2021-10-28 09:35:35

相关推荐

rocketmq原理_消息中间件漫谈:RocketMQ延时消息应用及原理剖析

业务背景

延时任务是非常普遍的业务场景之一,即系统某一动作触发后,经过一定时间的延时后再触发其他一个或多个动作。以订单系统为例:

下单后10分钟未支付发送支付提醒下单30分钟内未支付订单自动取消

业界对延时任务的实现有不通过的解决方案,例如基于定时任务扫库/Redis ZSet/Rabbit MQ死信队列等等,本文对分布式延时任务的解决方案不做展开探讨,而是聚焦于基于RocketMQ的处理方案。

RocketMQ支持的延时消息机制允许生产者发送的消息在一定时间周期之后才能被消费者消费,基于该特性可以便捷的实现延时任务功能。

// 发送延时消息示例代码public void sendSheduleMessage() { DefaultMQProducer producer = new DefaultMQProducer("Group"); producer.start(); Message message = new Message("topicname","content".getBytes()); // 设置延时级别为3,则消息在10秒之后投递给消费端 message.setDelayTimeLevel(3); producer.send(message);}

延时消息的基本原理

RocketMQ延时消息实现的基本原理是:

如何保证延时消息发送到Broker后不会被立即消费?

Producer发送消息到达RockerMQ Broker,Broker根据延时级别判定是否为延时消息,如果是,则将该消息的原始目标Topic和目标队列备份到消息属性中,并将其替换为与该消息延时级别相对应的Topic和消费队列,然后执行消息存储的其他逻辑。由于消息投递的Topic和消费队列发生变化,所以该消息不会被客户端消费到。

如何保证达到延时周期后消费者能及时消费到消息?

RocketMQ内部定义了用于实现延时消息的统一Topic,并为每个延时级别定义各一个消费队列。同时,为每个延时级别初始化一个定时任务,通过定时任务扫描出满足投递条件的消息,然后将这些消息的目标Topic和消费队列变更为原来的Topic和消费队列并投递,此时,客户端就可以消费到延时消息了。

源码剖析

从延时消息的基本原理可以看出,实现延时消息涉及的几个关键因素:

延时级别定义及初始化延时消息的Topic及消费队列转移延时消息的定时调度

延时级别定义及初始化

延时消息级别定义

// 类:org.apache.rocketmq.store.config.MessageStoreConfigprivate String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

内置延时级别的定义与加载

// org.apache.rocketmq.store.schedule.ScheduleMessageService// 延时级别和延时时间的映射关系:delayLevelTableprivate final ConcurrentMap delayLevelTable = new ConcurrentHashMap(32);public boolean parseDelayLevel() { // 省略...... String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel(); // 按照空格分隔延时级别字符串 String[] levelArray = levelString.split(" "); // 存入映射表 this.delayLevelTable.put(level, delayTimeMillis);}

Broker接收延时消息的处理逻辑:转存Topic/Queue

// CommitLog.putMessage()// Delay Deliveryif (msg.getDelayTimeLevel() > 0) { // 如果发送的消息延时级别大于RocketMQ定义的最大值,则直接设置为最大值 if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } // 获取内置的延时消息固定Topic:常量名SCHEDULE_TOPIC topic = ScheduleMessageService.SCHEDULE_TOPIC; // 根据延时级别获取消费队列ID:leve - 1 queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // 备份消息的原Topic和原队列ID MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); // 更改消息Topic为内置的延时消息Topic msg.setTopic(topic); // 更改消息队列为内置的延时消息队列 msg.setQueueId(queueId);}

根据延时级别和存储时间计算消息投递时间

// 根据延时级别和存储时间计算消息投递时间public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) { Long time = this.delayLevelTable.get(delayLevel); if (time != null) {return time + storeTimestamp; } return storeTimestamp + 1000;}

延时消息到期的自动投递:定时任务调度

延时队列定时任务初始化:

public void start() { // 省略...... for (Map.Entry entry : this.delayLevelTable.entrySet()) { if (timeDelay != null) {// 为每个延时级别创建定时任务this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } // ...... // 为每个延时级别创建用于持久化的定时任务,默认10S持久化 this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() { if (started.get()) ScheduleMessageService.this.persist();} }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); }}

扫描是否满足投递条件

定时任务的主要处理类是DeliverDelayedMessageTimerTask

// DeliverDelayedMessageTimerTask// 定时任务的入口@Overridepublic void run() { if (isStarted()) { this.executeOnTimeup(); }}// 判定延时消息是否到期的主要逻辑public void executeOnTimeup() { // 获取消费延时级别对应的消费队列 ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); // ...... long now = System.currentTimeMillis(); long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); long countdown = deliverTimestamp - now; if (countdown <= 0) { MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset( offsetPy, sizePy); if (msgExt != null) { try { MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore.putMessage(msgInner); }// ......

最后

延时消息是解决延时任务的方式之一,并不是所有的消息中间件都支持延时消息。RocketMQ的延时消息实现基于消息主题转换+定时调度实现,巧妙的利用了其自身的消息模型。另外,RocketMQ的延时级别是有限制的,默认支持18种,从1S到2H,实际项目使用时可能需要配合其他设计来完成具体的业务场景。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。