RocketMQ 生产端与消费端
参考:
en_oc:
田守枝(rebalance):https://cloud.tencent.com/developer/article/1554950
官方文档:https://rocketmq.apache.org/zh/docs/
发送消息
RocketMQ中定义了如下三种消息通信的方式:
SYNC:同步发送,生产端会阻塞等待发送结果;- 应用场景:这种方式应用场景非常广泛,如重要业务事件通知。
ASYNC:异步发送,生产端调用发送API之后,立刻返回,在拿到Broker的响应结果后,触发对应的SendCallback回调;- 应用场景:一般用于链路耗时较长,对 RT 较为敏感的业务场景;
ONEWAY:单向发送,发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。 此方式发送消息的过程耗时非常短,一般在微秒级别;- 应用场景:适用于耗时非常短,对可靠性要求不高的场景,如日志收集。
Producer 队列选择
RocketMq中所有关于生产者和消费者的代码都在client包下。打开源码,可以看到Procuder下有个selector包
RocketMq提供了3种不同的选择队列方式:
- SelectMessageQueueByHash
- SelectMessageQueueByMachineRoom
- SelectMessageQueueByRandom
它们都实现了 MessageQueueSelector 接口,可以自己实现这个接口,定义自己的队列选择方式

那么默认机制是哪一种呢?
默认选择方式是轮询
消费端负载均衡策略
在 Rocket MQ 5.0 之前,仅支持队列粒度的负载均衡
-
消息粒度负载均衡:5.0 及以后 PushConsumer和SimpleConsumer默认且仅使用消息粒度负载均衡
-
队列粒度负载均衡:5.0 及以后的 PullConsumer默认负载策略。对于历史版本(服务端4.x/3.x版本)的消费者,包括PullConsumer、DefaultPushConsumer、DefaultPullConsumer、LitePullConsumer等,默认且仅能使用队列粒度负载均衡策略。
队列粒度负载均衡
同一消费者分组内的多个消费者将按照队列粒度消费消息,即每个队列只能被一个消费者消费(每个消费者可消费多个队列)。

如上图所示,主题中的三个队列Queue1、Queue2、Queue3被分配给消费者分组中的两个消费者,每个队列只能分配给一个消费者消费,该示例中由于队列数大于消费者数,因此,消费者A2被分配了两个队列。若队列数小于消费者数量,可能会出现部分消费者无绑定队列的情况。
队列粒度的负载均衡,基于队列数量、消费者数量等运行数据进行统一的算法分配,将每个队列绑定到特定的消费者,然后每个消费者按照取消息>提交消费位点>持久化消费位点的消费语义处理消息,取消息过程不提交消费状态,因此,为了避免消息被多个消费者重复消费,每个队列仅支持被一个消费者消费。
队列粒度负载均衡策略保证同一个队列仅被一个消费者处理,该策略的实现依赖消费者和服务端的信息协商机制(Rebal),Apache RocketMQ 并不能保证协商结果完全强一致。因此,在消费者数量、队列数量发生变化时,可能会出现短暂的队列分配结果不一致,从而导致少量消息被重复处理。
策略特点
相对于消息粒度负载均衡策略,队列粒度负载均衡策略分配粒度较大,不够灵活。但该策略在流式处理场景下有天然优势,能够保证同一队列的消息被相同的消费者处理,对于批量处理、聚合处理更友好。
适用场景
队列粒度负载均衡策略适用于流式计算、数据聚合等需要明确对消息进行聚合、批处理的场景。
使用示例
队列粒度负载均衡策略不需要额外设置,对于历史版本(服务端4.x/3.x版本)的消费者类型PullConsumer默认启用。
Rebalance (消费端队列分配)
Rebalance(再均衡)机制指的是:将一个Topic下的多个队列(或称之为分区),在同一个消费者组(consumer group)下的多个消费者实例(consumer instance)之间进行重新分配。
消费消息
Consumer主要提了下面三种消费策略
- CONSUME_FROM_LAST_OFFSET
这是Consumer默认的消费策略,它分为两种情况,如果Broker的磁盘消息未过期且未被删除,则从最小偏移量开始消费。如果磁盘已过期,并被删除,则从最大偏移量开始消费。
- CONSUME_FROM_FIRST_OFFSET
从最早可用的消息开始消费
- CONSUME_FROM_TIMESTAMP
从指定的时间戳开始消费,这意味着在consumeTimestamp之前生成的消息将被忽略
DefaultLitePullConsumer(拉)
Assign