RocketMQ系统中的消息来自于Producer,Producer的功能就是发送消息,这里简要了解下发送消息的一些需要注意的地方。
调用流程
RocketMQ提供的Producer实现为DefaultMQProducer。Producer在创建的时候必须指定Producer Group Name,在正式发送消息之前需要调用start方法初始化Producer,
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.start();
Producer创建完成之后则可以发送消息,消息能够发送的前提是存在对应的路由信息,基本发送流程可以简化为,
消息发送方式
Producer发送消息存在三种模式,分别为:Sync、Async、Oneway,
同步发送(Sync)
同步发送,在调用发送结果之后会等待发送结果,对于调用方来说式同步使用方式,
Message msg = new Message("SampleTopic", "SampleTag", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult result = producer.send(msg);
异步发送(Async)
异步发送,调用方在发送消息之后立即返回,消息发送的结果处理在回调中进行,
Message msg = new Message("SampleTopic", "SampleTag", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult result) {
System.out.printf("send success %s", result.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("send fail %s", e);
e.printStackTrace();
}
});
发送且无返回结果(One Way)
同步发送、异步发送都有一个发送结果的处理,但是对应Oneway来说,就是管发不管结果,成功失败与否发送方表示不在意,
Message msg = new Message("SampleTopic", "SampleTag", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(msg);
三种方式的差异与适用场景
其实Producer这三种发送方式从代码实现角度来说,根本的差别都在NettyRemotingAbstract中。RocketMQ各构件之间都是通过Netty进行网络通信的。同步发送最终的调用为,NettyRemotingAbstract.invokeSyncImpl,在写入数据之后直接等待结果返回,
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) {
...
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
...
}
发送超时的配置为DefaultMQProducer.sendMsgTimeout,通过set接口进行配置。
对于异步、Oneway发送来说,两者在发送时的处理是类似的,分别通过semaphoreAsync、semaphoreOneway控制同时发送的数量。这两个参数在NettyClientConfig中进行配置,默认值为,
public class NettyClientConfig {
private int clientOnewaySemaphoreValue = NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE;
private int clientAsyncSemaphoreValue = NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE;
}
在使用场景方面,如果发送方对消息发送要立即掌控发送成功与否,那么比较适合用同步发送。如果是批量发送很多消息,那么异步发送比较合适。Oneway理论上比较适合发送那些可以丢弃的消息,但实际中估计比较少用。
消息功能特性
RocketMQ针对消息提供了一些功能特写,大部分都是Producer/Consumer互相配合着来的,这里主要看下Producer的处理,
消息过滤处理
Producer发送消息可以设置Tags,这样在Consumer消费的时候可以根据Tags区进行过滤。
根据Key查询消息
RocketMQ中的消息可以通过MsgId进行查询,也可以通过Topic + Key的方式进行查询,这里的Key,就是通过Message.setKeys设置的,也是在Producer发送消息之前进行配置的。
延时消息
RocketMQ支持延时消息,但是延时消息的级别是固定的,只能选择预定义的延时级别,默认的级别配置在MessageStoreConfig中定义,
public class MessageStoreConfig {
...
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
...
}
在消息发送时通过Message.setDelayTimeLevel设置对应级别。
顺序消息
RocketMQ也提供了顺序消息支持。对于Producer来说,想使用顺序消息特性就需要将消息有序的发送到同一个MessageQueue当中,这样才能利用RocketMQ的存取特性实现顺序消息功能。
Producer提供了发送指定MessageQueue的接口,对于发送方来说,需要显示定义MessageQueue的选择规则,
Message msg = new Message("SampleTopic", "SampleTag", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult result = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get(((Integer) arg) % mqs.size());
}
}, orderId);
顺序消息是需要Producer/Consumer配合实现,严格顺序则在RocketMQ的运维部署上也存在一定要求。
批量发送
Producer提供了批量发送接口,
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 3; i++) {
String key = String.format("ID%d", i);
byte[] body = String.format("Hello %d", i).getBytes();
messages.add(new Message("SampleTopic", "SampleTag", key, body));
}
producer.send(messages);
文档上描述一次发送的批量消息大小不能超过1MB,否则需要分批发送。不过实际代码中默认的最大消息大小在DefaultMQProducer.maxMessageSize中定义,默认是配成了4MB。
如果以Message列表形式进行传参只有同步发送方式,如果想异步Batch发送消息,可以先构建org.apache.rocketmq.common.message.MessageBatch,然后再走异步发送消息接口。实际上在Producer内部也是从Message列表构建MessageBatch来进行处理的。
发送Hook
RocketMQ在发送流程上提供了注册Hook相关接口。可以设置两种Hook,一种是RPCHook,另外是SendMessageHook。
RPCHook是在每次有Netty网络操作时调用。SendMessageHook是在发送消息前、发送消息后分别调用其接口。这些Hook里面可以做一些Hack的处理,或者是简单的日志统计。
注意事项
Producer提供的接口实际是比较简单的,要说需要注意的话就是要关注发送的日志。日志相关配置在ClientLogger中,默认写入的位置是~/logs/rocketmqlogs/rocketmq_client.log。
如果出现Producer发送消息,但Consumer无法消费到消息。那么首先要排查下Producer是否真正发出,那么一是要看rocketmq_client.log中是否存在异常日志,二是需要到Broker中去看消息的发送统计,最后再看是否Consumer那一方是否存在异常。