RocketMQ系统中的消息来自于Producer,Producer的功能就是发送消息,这里简要了解下发送消息的一些需要注意的地方。

调用流程

RocketMQ提供的Producer实现为DefaultMQProducer。Producer在创建的时候必须指定Producer Group Name,在正式发送消息之前需要调用start方法初始化Producer,

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.start();

Producer创建完成之后则可以发送消息,消息能够发送的前提是存在对应的路由信息,基本发送流程可以简化为,

消息发送方式

Producer发送消息存在三种模式,分别为:Sync、Async、Oneway,

同步发送(Sync)

同步发送,在调用发送结果之后会等待发送结果,对于调用方来说式同步使用方式,

Message msg = new Message("SampleTopic", "SampleTag", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult result = producer.send(msg);

异步发送(Async)

异步发送,调用方在发送消息之后立即返回,消息发送的结果处理在回调中进行,

Message msg = new Message("SampleTopic", "SampleTag", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult result) {
        System.out.printf("send success %s", result.getMsgId());
    }
    @Override
    public void onException(Throwable e) {
        System.out.printf("send fail %s", e);
        e.printStackTrace();
    }
});

发送且无返回结果(One Way)

同步发送、异步发送都有一个发送结果的处理,但是对应Oneway来说,就是管发不管结果,成功失败与否发送方表示不在意,


Message msg = new Message("SampleTopic", "SampleTag", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(msg);

三种方式的差异与适用场景

其实Producer这三种发送方式从代码实现角度来说,根本的差别都在NettyRemotingAbstract中。RocketMQ各构件之间都是通过Netty进行网络通信的。同步发送最终的调用为,NettyRemotingAbstract.invokeSyncImpl,在写入数据之后直接等待结果返回,

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) {
    ...
    RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
    ...
}

发送超时的配置为DefaultMQProducer.sendMsgTimeout,通过set接口进行配置。

对于异步、Oneway发送来说,两者在发送时的处理是类似的,分别通过semaphoreAsync、semaphoreOneway控制同时发送的数量。这两个参数在NettyClientConfig中进行配置,默认值为,

public class NettyClientConfig {
    private int clientOnewaySemaphoreValue = NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE;
    private int clientAsyncSemaphoreValue = NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE;
}

在使用场景方面,如果发送方对消息发送要立即掌控发送成功与否,那么比较适合用同步发送。如果是批量发送很多消息,那么异步发送比较合适。Oneway理论上比较适合发送那些可以丢弃的消息,但实际中估计比较少用。

消息功能特性

RocketMQ针对消息提供了一些功能特写,大部分都是Producer/Consumer互相配合着来的,这里主要看下Producer的处理,

消息过滤处理

Producer发送消息可以设置Tags,这样在Consumer消费的时候可以根据Tags区进行过滤。

根据Key查询消息

RocketMQ中的消息可以通过MsgId进行查询,也可以通过Topic + Key的方式进行查询,这里的Key,就是通过Message.setKeys设置的,也是在Producer发送消息之前进行配置的。

延时消息

RocketMQ支持延时消息,但是延时消息的级别是固定的,只能选择预定义的延时级别,默认的级别配置在MessageStoreConfig中定义,

public class MessageStoreConfig {
    ...
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    ...
}

在消息发送时通过Message.setDelayTimeLevel设置对应级别。

顺序消息

RocketMQ也提供了顺序消息支持。对于Producer来说,想使用顺序消息特性就需要将消息有序的发送到同一个MessageQueue当中,这样才能利用RocketMQ的存取特性实现顺序消息功能。

Producer提供了发送指定MessageQueue的接口,对于发送方来说,需要显示定义MessageQueue的选择规则,

Message msg = new Message("SampleTopic", "SampleTag", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult result = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        return mqs.get(((Integer) arg) % mqs.size());
    }
}, orderId);

顺序消息是需要Producer/Consumer配合实现,严格顺序则在RocketMQ的运维部署上也存在一定要求。

批量发送

Producer提供了批量发送接口,

List<Message> messages = new ArrayList<>();
for (int i = 0; i < 3; i++) {
    String key = String.format("ID%d", i);
    byte[] body = String.format("Hello %d", i).getBytes();
    messages.add(new Message("SampleTopic", "SampleTag", key, body));
}
producer.send(messages);

文档上描述一次发送的批量消息大小不能超过1MB,否则需要分批发送。不过实际代码中默认的最大消息大小在DefaultMQProducer.maxMessageSize中定义,默认是配成了4MB。

如果以Message列表形式进行传参只有同步发送方式,如果想异步Batch发送消息,可以先构建org.apache.rocketmq.common.message.MessageBatch,然后再走异步发送消息接口。实际上在Producer内部也是从Message列表构建MessageBatch来进行处理的。

发送Hook

RocketMQ在发送流程上提供了注册Hook相关接口。可以设置两种Hook,一种是RPCHook,另外是SendMessageHook。

RPCHook是在每次有Netty网络操作时调用。SendMessageHook是在发送消息前、发送消息后分别调用其接口。这些Hook里面可以做一些Hack的处理,或者是简单的日志统计。

注意事项

Producer提供的接口实际是比较简单的,要说需要注意的话就是要关注发送的日志。日志相关配置在ClientLogger中,默认写入的位置是~/logs/rocketmqlogs/rocketmq_client.log。

如果出现Producer发送消息,但Consumer无法消费到消息。那么首先要排查下Producer是否真正发出,那么一是要看rocketmq_client.log中是否存在异常日志,二是需要到Broker中去看消息的发送统计,最后再看是否Consumer那一方是否存在异常。