RocketMQ中提供了DefaultMQPushConsumer、DefaultMQPullConsumer,这里主要分析下DefaultMQPushConsumer实现中一些需要注意的地方。

消息获取

DefaultMQPushConsumer对外层暴露的接口像是推送模式,但实际阅读代码就会发现,其消息获取逻辑也还是由Consumer向Broker去进行拉取而获得的。

PullMessageService是用于拉取消息的独立线程,在Consumer启动时该线程会启动。PullMessageService实例位于MQClientInstance中,一个进程中的多个Consumer Group会共享同一个PullMessageService。

PullMessageService会一直发起拉取消息请求,如果顺利拉取到消息,将会将消息存放到ProcessQueue中等待消费。不过预先拉取的消息存在一定限制,

  • DefaultMQPushConsumer.pullThresholdForQueue,限制预取的消息条数,默认1000条
  • DefaultMQPushConsumer.pullThresholdSizeForQueue,限制预取的消息大小,默认100MB

预取缓存在本地是基于MessageQueue进行区分的,因此同一个Consumer订阅不同Topic之间不会产生互相影响。

均衡处理

RocketMQ Consumer实现中包含了对消息队列消费的负载均衡处理。RebalancePushImpl、RebalancePullImpl分别对应Push、Pull下的均衡处理。

RebalanceService是Consumer中的一个独立线程,按照固定等待间隔,持续触发RebalanceImpl的doRebalance接口,进行均衡处理。

具体均衡逻辑操作在ReplaceImpl.rebalanceByTopic中,均衡的处理也是基于MessageQueue。均衡的目的在于将Topic下的MessageQueue按照策略分配给不同的Consumer实例进行消费处理。

RocketMQ中提供的MessageQueue分配策略实现有,

  • AllocateMachineRoomNearby,根据机房进行分配
  • AllocateMessageQueueAveragely,均分哈希策略
  • AllocateMessageQueueAveragelyByCircle,环形哈希策略
  • AllocateMessageQueueByConfig,根据配置进行分配
  • AllocateMessageQueueByMachineRoom,机房哈希
  • AllocateMessageQueueConsistentHash,一致性哈希

默认的策略是AllocateMessageQueueAveragely。

从这个均衡处理可以看到一个MessageQueue只会有一个Consumer进行消费。因此在实际运行环境中,Topic下面MessageQueue的数量值得关注,MessageQueue数量少而Consumer多的话,很多Consumer会消费不到消息,造成性能损失。

OffsetStore

OffsetStore这个概念用于表达Consumer消费位点的存储。不同消费模式下游不同的OffsetStore实现,

  • LocalFileOffsetStore,存储在本地,对应BROADCASTING消费模式
  • RemoteBrokerOffsetStore,存储在Broker,对应CLUSTERING消费模式

CLUSTERING模式下一条消息只会发送给一个Consumer,因此记录在Broker。BROADCASTING则是发送给所有Consumer,因此记录在各Consumer端。BROADCASTING模式需要注意一些,因为本地增加了一个有状态的记录文件,如果文件丢失则会再重新消费。