600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > MQ延迟队列实现延迟消息

MQ延迟队列实现延迟消息

时间:2020-11-08 23:52:19

相关推荐

MQ延迟队列实现延迟消息

在开发中经常会遇到延时任务的需求,例如在12306购买车票,若生成订单30分钟未支付则自动取消;还有在线商城完成订单后48小时不评价 ,自动5星好评。像这类在某事件触发后一段时间内执行的需求任务我们称之为延时任务。

那么如何实现延迟任务呢?

第一反应是利用cron方案来实现:

启动一个cron定时任务,每隔一段时间执行一次,比如30分钟,找到那些超时的数据,直接更新状态,或者拿出来执行一些操作。如果数据量比较大,需要分页查询,分页update,这将是一个for循环更新操作。

cron方案是很常见的一种方案,但是常见的不一定是最好的,主要有以下几个问题:

当数据量大的时候轮询效率低;时效性不够好,如果每小时轮询一次,最差的情况时间误差会达到1小时;如果通过增加cron轮询频率来减少时间误差,则会出现轮询低效和重复计算的问题;

既然cron方案不是很理想,那就请出我们今天的主角,使用RocketMQ的延时消息解决。在创建订单的时候发送一条延时消息到RocketMQ,30分钟后消费者消费消息去检查订单的状态,如果发现订单未支付则取消订单释放库存。

实现

RocketMQ延迟队列的核心思路是:所有的延迟消息由producer发出之后,都会存放到同一个topic(SCHEDULE_TOPIC_XXXX)下,不同的延迟级别会对应不同的队列序号,当延迟时间到之后,由定时线程读取转换为普通的消息存的真实指定的topic下,此时对于consumer端此消息才可见,从而被consumer消费。

注意:RocketMQ不支持任意时间的延时,只支持以下几个固定的延时等级

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

下面我们结合SprintBoot利用RocketMQ发送延时消息

引入RocketMQ组件

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId></dependency>

增加RocketMQ的配置

rocketmq:

name-server:172.31.0.44:9876

producer:

group:delay-group

编写生产者

@Component@Slf4jpublicclassDelayProduce{@AutowiredprivateRocketMQTemplaterocketMQTemplatet;publicvoidsendDelayMessage(Stringtopic,Stringmessage,intdelayLevel{SendResultsendResult=rocketMQTemplatet.syncSend(topic,MessageBuilder.withPayload(message).build(),2000,delayLevel);log.info("sendtimeis{}",DateTimeFormatter.ofPattern("yyyy年MM月dd 日HH:mm:ss").format(LocalDateTime.now()));log.info("sendResultis{}",sendResult);}}

编写消费者

@Slf4j@Component@RocketMQMessageListener(topic="delay-topic",consumerGroup="delay-group")publicclassDelayConsumerimplementsRocketMQListener<String>{@OverridepublicvoidonMessage(Stringmessage){log.info("receivedmessagetimeis{}",DateTimeFormatter.ofPattern("yyyy年MM月dd日HH:mm:ss").format(LocalDateTime.now()));log.info("receivedmessageis{}",message);}}

测试

@RunWith(SpringRunner.class)@SpringBootTestpublicclassDelayProduceTest{@AutowiredprivateDelayProducedelayProduce;@TestpublicvoidsendDelayMessage(){delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知录",5);}}

这里delayLevel设置成5,对应RocketMQ的延时等级就是1分钟后投递消息。

运行结果

发送时间

消费时间

修改延时级别

RocketMQ的延迟等级可以进行修改,以满足自己的业务需求,可以修改/添加新的level。例如:你想支持1天的延迟,修改最后一个level的值为1d,这个时候依然是18个level;也可以增加一个1d,这个时候总共就有19个level。

打开RocketMQ的配置文件,修改messageDelayLevel属性

brokerClusterName = DefaultClusterbrokerName = broker-abrokerId = 0deleteWhen = 04fileReservedTime = 48brokerRole = ASYNC_MASTERflushDiskType = ASYNC_FLUSHstorePathRootDir = /app/rocketmq/datamessageDelayLevel=90s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

这次将延时等级1修改成了90s,生产者发送消息后需要90s后再进行消息投递。修改完成后重启RocketMQ。

nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf &

使用延时等级1发送消息

publicvoidsendDelayMessage(){delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知录",1);}

测试

发送时间

消费时间

通过比对发送时间与消费时间证明延时等级修改生效。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。