画廊网站模板,杭州网站建设科技有限公司,php外贸网站源码,杭州网站建设培训深入浅出消息队列----【延迟消息的实现原理】 粗说 RocketMQ 的设计细说 RocketMQ 的设计这样实现是否有什么问题#xff1f; 本文仅是文章笔记#xff0c;整理了原文章中重要的知识点、记录了个人的看法 文章来源#xff1a;编程导航-鱼皮【yes哥深入浅出消息队列专栏】 粗… 深入浅出消息队列----【延迟消息的实现原理】 粗说 RocketMQ 的设计细说 RocketMQ 的设计这样实现是否有什么问题 本文仅是文章笔记整理了原文章中重要的知识点、记录了个人的看法 文章来源编程导航-鱼皮【yes哥深入浅出消息队列专栏】 粗说 RocketMQ 的设计
RocketMQ 约定了一些延迟时间即生产者无法灵活的自定义延迟时间而是固定的几个延迟时间来供生产者选择。 这样延迟消息就有了统一归类和约束便于管理和调配。
虽说归类了延迟消息但是同一个延迟 level 的延迟消息共用一个闹钟也是无法满足需求的。
所以变成专门雇一个“人”每个“人”管一个 level 的延迟消息定时查看是否有到期的消息如果到了立马让消息给消费者消费。
至于复用 commitlog 这一套的问题专门搞个存放延迟消息的 Topic延迟消息先发往这个 Topic消费者并不会订阅这个 Topic因此此时消费者无法消费到这个消息。
等到延迟消息到达时间后Broker 将这个延迟消息发往原 Topic此时消费者就能从原 Topic 消费到这条消息
也就是说 Broker 自己建立一个专门 Topic 用来存放延迟消息此时延迟消息的存储能复用 commitlog 这一套模型消息也会被分发到 consumerQueue。
不同的延迟 level 的消息回存放到这个 Topic 不同的队列中也就是说这个 Topic 一个有 18 个队列对应 18 个 level。 然后会有一个定时线程去每个队列按序检查消息是否都到时间了如果到了就发到消息原先的 Topic 中。 细说 RocketMQ 的设计
延迟消息的发送很简单仅需设置一个 delayTimeLevel 即可
Message message new Message(TestTopic,(Hello scheduled message i).getBytes());
message.setDelayTimeLevel(3);
producer.send(message);Broker 收到这个消息后一看 delayTimeLevel 设置了值那么就知道它是一个延迟消息于是乎直接来个偷梁换柱
把消息的原 Topic 和对应队列 ID 保存在消息扩展属性里面。
然后把这条消息的 Topic 设置成 SCHEDULE_TOPIC_XXXX没错 Topic 的名字就是 SCHEDULE_TOPIC_XXXX哈后面就是 XXXX
并且根据消息的 Level 选择 SCHEDULE_TOPIC_XXXX 下对应的队列。 这样一来延迟消息就存储好了。
然后 Broker 起了一个定时线程池里面一共有 18 个核心线程这个线程池的任务就是定时调度 SCHEDULE_TOPIC_XXXX 下的每个队列的消息一旦有到期的消息就分发到原 Topic 供消费者消费。
具体的做法是在初始时每个队列都会对应被创建一个任务扔到线程池中这些任务的内容就是根据传入的队列 ID得到对应的 consumeQueue当然还有对应的 offset。 Broker 会定时保存 SCHEDULE_TOPIC_XXXX 下 consumeQueue 的消费 offset。
得到 consumeQueue 和 offset对应的就能获取延时消息这时候将延迟时间跟当前时间对比就能判断是否到期。
如果到期了就从消息扩展属性里面获取原 Topic 和对应队列 ID然后投递到原队列中。
上面的图表就是这个意思这里再贴一下 然后再代码上的实现是立马新建一个任务扔到线程池中延迟时间是 1000ms任务的入参会塞入更新后的 offset这样线程就会继续消费后面的消息如此往复循环。
当然如果拿到的对应延迟消息还未到时间那么 offset 不变也立马新建一个任务塞入到线程池中这样 1000s 后又会来看这个消息是否到期。
可以看到整个延迟消息设计就加了一个线程池很巧妙地复用了正常消息的 commitlog 和 consumeQueue 的存储机制且利用发布订阅的特性改变了消息的 Topic 来使得消费者无法消费到未到时间的消息。
到时间了又投递回原 Topic 使得消费者可以消费到到期的消息非常 nice
这样实现是否有什么问题
从实现层面来看大大减少了延迟消息开发的复杂度但是这样的实现对延迟时间来说是不准的。
首先同一个延迟 level 的消息都是入同一个队列然后上一个延迟消息处理完之后继续处理下一个如果同一时刻有大量的同一个 level 的延迟消息产生那么它们都堆积在一个队列里面一个一个处理这样一来即使后面的消息到时间了也得排队等着。
这样的机制就做不到非常实时。
并且从 SHEDULE_TOPIC_XXXX 分发至原 Topic 之后假设原 Topic 本身就已经有很多消息堆积了那么等消费者消费到这条消息的时候时间也有大大的延迟。 当然本身在大流量下对时间的把控是无法做到很准确的不论是什么方法都会有延迟无非是延迟精度多少的问题。
有一种比较好的定时结构就是时间轮了。