Consumer也是RocketMQ中的很重要一环,RocketMQ提供了Consumer相关接口,Consumer在上层系统调用。RocketMQ的一些功能特性需要Consumer配合才能真正有效,因此Consumer的使用方法是很有必要去了解的。
消费模式
在使用层面上,RocketMQ提供了Push/Pull两种方式让Consumer消费数据。
DefaultMQPushConsumer
先来看下Push方式下的Consumer使用,RocketMQ提供了DefaultPushConsumer,实现,
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SampleGroup");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("SampleTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
在Push模式下,调用方注册MessageListener,消息的消费逻辑在MessageListner的接口中被异步调用。
DefaultMQPullConsumer
RocketMQ中的DefaultMQPullConsumer是默认的Pull方式实现,
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("SampleGroup");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("SampleToic");
for (MessageQueue mq : mqs) {
while (true) {
try {
long offset = consumer.fetchConsumeOffset(mq, false);
PullResult pullResult = consumer.pull(mq, "*", offset, MAX_PULL_NUM);
if (pullResult.getPullStatus() != PullStatus.FOUND) {
break;
}
for (MessageExt msg : pullResult.getMsgFoundList()) {
System.out.println("Msg pulled");
}
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
consumer.getDefaultMQPullConsumerImpl().updateConsumeOffsetToBroker(mq, pullResult.getNextBeginOffset(), true);
} catch (Exception e) {
logger.error("handlePull", e);
break;
}
}
}
单纯从上面的使用方式来看,Pull方式用起来略微繁琐一些。两者的差异有,
- Push方式下消息消费逻辑时被异步调用的,Pull方式下是在调用方线程直接调用
- Push方式消息消费有专门的线程池进行处理,处理效率更高
- Push方式下消费进度信息自动进行了记录,Pull方式下需要手动调用相关接口更新消费偏移
- Push方式不能细粒度控制,Pull方式可以自主选择拉取消息的时机以及处理方式
- Push方式下消息消费失败重试走默认流程,Pull方式可以根据需求主动控制是重新拉取、等待还是放弃
各项参数
消费模式
当存在多个Consumer时,消息消费还可以通过设置消费模式,RocketMQ默认为CLUSTERING模式,
// Consumer
consumer.setMessageModel(MessageModel.BROADCASTING);
定义 | 描述 |
---|---|
MessageModel.BROADCASTING | 广播模式, 一个ConsumerGroup的每一个Consumer都会收到消息 |
MessageModel.CLUSTERING | 集群模式, 一个ConsumerGroup下只有一个订阅者会收到消息 |
消费起点
上面样例代码中需要注意setConsumeFromWhere这个调用,这个方法设置了Consumer从个消息开始消费,ConsumeFramWhere定义了支持的几种方式,
public enum ConsumeFromWhere {
CONSUME_FROM_LAST_OFFSET,
CONSUME_FROM_FIRST_OFFSET,
CONSUME_FROM_TIMESTAMP,
}
定义 | 描述 |
---|---|
CONSUME_FROM_LAST_OFFSET | 只消费Consumer启动之后新来的消息 |
CONSUME_FROM_FIRST_OFFSET | 消费当前已经存在的所有消息 |
CONSUME_FROM_TIMESTAMP | 消费某个时间之后的消息 |
顺序消费/并行消费
Consumer在消费消息的时候需要通过registerMessageListener设置消息处理函数,存在两种方式,
定义 | 描述 |
---|---|
MessageListenerOrderly | 顺序消费消息 |
MessageListenerConcurrently | 并行消费消息 |
非顺序消息的话都应该使用MessageListenerConcurrently。顺序消息只能使用MessageListenerOrderly,如果顺序消息下用错了MessageListener,实际消费到的消息是没有顺序可言的。
消息过滤
Consumer通过subscribe方法订阅消息topic,subscribe订阅时可以进行过滤,存在几种过滤定义方式,
根据表达式过滤tag,表达式支持||
操作,例如tag1 || tag2 || tag3
,以及*
,
public class DefaultMQPushConsumer {
public void subscribe(String topic, String subExpression) throws MQClientException {...}
}
提供MessageFilter的实现类进行过滤,
public class DefaultMQPushConsumer {
public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {...}
}
通过MessageSelector进行过滤,MessageSelector支持两种表达式,一是SQL92,另外则是tag过滤,
public class DefaultMQPushConsumer {
public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {...}
}
如无特别复杂需求的话,使用Tag过滤最好,ConsumeQueue的条目中记录了Tag的Hash值,可以更高效的进行过滤。