RocketMQ中消息的结构定义虽然简单,不过还是有必要了解下。
Message
RocketMQ消息定义相关的代码位于org.apache.rocketmq.common.message
下,Producer发送的消息定义为Message类,其字段定义为,
topic
Message都有Topic这一属性,Producer发送指定Topic的消息,Consumer订阅Topic下的消息。通过Topic字段,Producer会获取消息投递的路由信息,决定发送给哪个Broker。
flag
网络通信层标记。在代码中可能与之相关的使用代码有,
// org.apache.rocketmq.remoting.protocal.RemotingCommand
class RemotingCommand {
@JSONField(serialize = false)
public boolean isOnewayRPC() {
int bits = 1 << RPC_ONEWAY;
return (this.flag & bits) == bits;
}
@JSONField(serialize = false)
public boolean isResponseType() {
int bits = 1 << RPC_TYPE;
return (this.flag & bits) == bits;
}
}
翻阅代码之后只找到上面这一出与该字段相关的逻辑,并没有特别明确该字段的作用。
body
Producer要发送的实际消息内容,以字节数组形式进行存储。Message消息有一定大小限制。
transactionId
RocketMQ 4.3.0引入的事务消息相关的事务编号。
properties
该字段为一个HashMap,存储了Message其余各项参数,比如tag、key等关键的消息属性。RocketMQ预定义了一组内置属性,除了内置属性之外,还可以设置任意自定义属性。当然属性的数量也是有限的,消息序列化之后的大小不能超过预设的最大消息大小。
系统内置属性定义于org.apache.rocketmq.common.message.MessageConst
,
对于一些关键属性,Message类提供了一组set接口来进行设置,
class Message {
public void setTags(String tags) {...}
public void setKeys(Collection<String> keys) {...}
public void setDelayTimeLevel(int level) {...}
public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {...}
public void setBuyerId(String buyerId) {...}
}
这几个set接口对应的作用分别为为,
属性 | 接口 | 用途 |
---|---|---|
MessageConst.PROPERTY_TAGS | setTags | 在消费消息时可以通过tag进行消息过滤判定 |
MessageConst.PROPERTY_KEYS | setKeys | 可以设置业务相关标识,用于消费处理判定,或消息追踪查询 |
MessageConst.PROPERTY_DELAY_TIME_LEVEL | setDelayTimeLevel | 消息延迟处理级别,不同级别对应不同延迟时间 |
MessageConst.PROPERTY_WAIT_STORE_MSG_OK | setWaitStoreMsgOK | 在同步刷盘情况下是否需要等待数据落地才认为消息发送成功 |
`MessageConst.PROPERTY_BUYER_ID | setBuyerId | 没有在代码中找到使用的地方,所以暂不明白其用处 |
这几个字段为什么用属性定义,而不是单独用一个字段进行表示?方便之处可能在于消息数据存盘结构早早定义,一些后期添加上的字段功能为了适应之前的存储结构,以属性形式存储在一个动态字段更为方便,自然兼容。
MessageExt
对于发送方来说,上述Message的定义以足够。但对于RocketMQ的整个处理流程来说,还需要更多的字段信息用以记录一些必要内容,比如消息的id、创建时间、存储时间等等。在同package下可以找到与之相关的其余类定义。首先就是MessageExt,
字段 | 用途 |
---|---|
queueId | 记录MessageQueue编号,消息会被发送到Topic下的MessageQueue |
storeSize | 记录消息在Broker存盘大小 |
queueOffset | 记录在ConsumeQueue中的偏移 |
sysFlag | 记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识 |
bornTimestamp | 消息创建时间,在Producer发送消息时设置 |
storeHost | 记录存储该消息的Broker地址 |
msgId | 消息Id |
commitLogOffset | 记录在Broker中存储便宜 |
bodyCRC | 消息内容CRC校验值 |
reconsumeTimes | 消息重试消费次数 |
preparedTransactionOffset | 事务详细相关字段 |
Message还有一个名为MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX的属性,在消息发送时由Producer生成创建。上面表格中的msgId则是消息在Broker端进行存储时通过MessageDecoder.createMessageId方法生成的,其构成为,
这个MsgId是在Broker生成的,Producer在发送消息时没有该信息,Consumer在消费消息时则能获取到该值。RocketMQ也提供了相关命令,
命令 | 实现类 | 描述 |
---|---|---|
queryMsgById | QueryMsgByIdSubCommand | 根据MsgId查询消息 |