Consumer也是RocketMQ中的很重要一环,RocketMQ提供了Consumer相关接口,Consumer在上层系统调用。RocketMQ的一些功能特性需要Consumer配合才能真正有效,因此Consumer的使用方法是很有必要去了解的。

消费模式

在使用层面上,RocketMQ提供了Push/Pull两种方式让Consumer消费数据。

DefaultMQPushConsumer

先来看下Push方式下的Consumer使用,RocketMQ提供了DefaultPushConsumer,实现,

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SampleGroup");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("SampleTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

在Push模式下,调用方注册MessageListener,消息的消费逻辑在MessageListner的接口中被异步调用。

DefaultMQPullConsumer

RocketMQ中的DefaultMQPullConsumer是默认的Pull方式实现,

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("SampleGroup");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.start();

Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("SampleToic");
for (MessageQueue mq : mqs) {
    while (true) {
        try {
            long offset = consumer.fetchConsumeOffset(mq, false);
            PullResult pullResult = consumer.pull(mq, "*", offset, MAX_PULL_NUM);
            if (pullResult.getPullStatus() != PullStatus.FOUND) {
                break;
            }
            for (MessageExt msg : pullResult.getMsgFoundList()) {
                System.out.println("Msg pulled");
            }
            consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
            consumer.getDefaultMQPullConsumerImpl().updateConsumeOffsetToBroker(mq, pullResult.getNextBeginOffset(), true);
        } catch (Exception e) {
            logger.error("handlePull", e);
            break;
        }
    }
}

单纯从上面的使用方式来看,Pull方式用起来略微繁琐一些。两者的差异有,

  • Push方式下消息消费逻辑时被异步调用的,Pull方式下是在调用方线程直接调用
  • Push方式消息消费有专门的线程池进行处理,处理效率更高
  • Push方式下消费进度信息自动进行了记录,Pull方式下需要手动调用相关接口更新消费偏移
  • Push方式不能细粒度控制,Pull方式可以自主选择拉取消息的时机以及处理方式
  • Push方式下消息消费失败重试走默认流程,Pull方式可以根据需求主动控制是重新拉取、等待还是放弃

各项参数

消费模式

当存在多个Consumer时,消息消费还可以通过设置消费模式,RocketMQ默认为CLUSTERING模式,

// Consumer
consumer.setMessageModel(MessageModel.BROADCASTING);
定义描述
MessageModel.BROADCASTING广播模式, 一个ConsumerGroup的每一个Consumer都会收到消息
MessageModel.CLUSTERING集群模式, 一个ConsumerGroup下只有一个订阅者会收到消息

消费起点

上面样例代码中需要注意setConsumeFromWhere这个调用,这个方法设置了Consumer从个消息开始消费,ConsumeFramWhere定义了支持的几种方式,

public enum ConsumeFromWhere {
    CONSUME_FROM_LAST_OFFSET,
    CONSUME_FROM_FIRST_OFFSET,
    CONSUME_FROM_TIMESTAMP,
} 
定义描述
CONSUME_FROM_LAST_OFFSET只消费Consumer启动之后新来的消息
CONSUME_FROM_FIRST_OFFSET消费当前已经存在的所有消息
CONSUME_FROM_TIMESTAMP消费某个时间之后的消息

顺序消费/并行消费

Consumer在消费消息的时候需要通过registerMessageListener设置消息处理函数,存在两种方式,

定义描述
MessageListenerOrderly顺序消费消息
MessageListenerConcurrently并行消费消息

非顺序消息的话都应该使用MessageListenerConcurrently。顺序消息只能使用MessageListenerOrderly,如果顺序消息下用错了MessageListener,实际消费到的消息是没有顺序可言的。

消息过滤

Consumer通过subscribe方法订阅消息topic,subscribe订阅时可以进行过滤,存在几种过滤定义方式,

根据表达式过滤tag,表达式支持||操作,例如tag1 || tag2 || tag3,以及*

public class DefaultMQPushConsumer {
    public void subscribe(String topic, String subExpression) throws MQClientException {...}
}

提供MessageFilter的实现类进行过滤,

public class DefaultMQPushConsumer {
    public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {...}
}

通过MessageSelector进行过滤,MessageSelector支持两种表达式,一是SQL92,另外则是tag过滤,

public class DefaultMQPushConsumer {
    public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {...}
}

如无特别复杂需求的话,使用Tag过滤最好,ConsumeQueue的条目中记录了Tag的Hash值,可以更高效的进行过滤。