RocketMQ中消息的结构定义虽然简单,不过还是有必要了解下。

Message

RocketMQ消息定义相关的代码位于org.apache.rocketmq.common.message下,Producer发送的消息定义为Message类,其字段定义为,

topic

Message都有Topic这一属性,Producer发送指定Topic的消息,Consumer订阅Topic下的消息。通过Topic字段,Producer会获取消息投递的路由信息,决定发送给哪个Broker。

flag

网络通信层标记。在代码中可能与之相关的使用代码有,

// org.apache.rocketmq.remoting.protocal.RemotingCommand

class RemotingCommand {
    @JSONField(serialize = false)
    public boolean isOnewayRPC() {
        int bits = 1 << RPC_ONEWAY;
        return (this.flag & bits) == bits;
    }

    @JSONField(serialize = false)
    public boolean isResponseType() {
        int bits = 1 << RPC_TYPE;
        return (this.flag & bits) == bits;
    }
}

翻阅代码之后只找到上面这一出与该字段相关的逻辑,并没有特别明确该字段的作用。

body

Producer要发送的实际消息内容,以字节数组形式进行存储。Message消息有一定大小限制。

transactionId

RocketMQ 4.3.0引入的事务消息相关的事务编号。

properties

该字段为一个HashMap,存储了Message其余各项参数,比如tag、key等关键的消息属性。RocketMQ预定义了一组内置属性,除了内置属性之外,还可以设置任意自定义属性。当然属性的数量也是有限的,消息序列化之后的大小不能超过预设的最大消息大小。

系统内置属性定义于org.apache.rocketmq.common.message.MessageConst

对于一些关键属性,Message类提供了一组set接口来进行设置,

class Message {
    public void setTags(String tags) {...}
    public void setKeys(Collection<String> keys) {...}
    public void setDelayTimeLevel(int level) {...}
    public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {...}
    public void setBuyerId(String buyerId) {...}
}

这几个set接口对应的作用分别为为,

属性接口用途
MessageConst.PROPERTY_TAGSsetTags在消费消息时可以通过tag进行消息过滤判定
MessageConst.PROPERTY_KEYSsetKeys可以设置业务相关标识,用于消费处理判定,或消息追踪查询
MessageConst.PROPERTY_DELAY_TIME_LEVELsetDelayTimeLevel消息延迟处理级别,不同级别对应不同延迟时间
MessageConst.PROPERTY_WAIT_STORE_MSG_OKsetWaitStoreMsgOK在同步刷盘情况下是否需要等待数据落地才认为消息发送成功
`MessageConst.PROPERTY_BUYER_IDsetBuyerId没有在代码中找到使用的地方,所以暂不明白其用处

这几个字段为什么用属性定义,而不是单独用一个字段进行表示?方便之处可能在于消息数据存盘结构早早定义,一些后期添加上的字段功能为了适应之前的存储结构,以属性形式存储在一个动态字段更为方便,自然兼容。

MessageExt

对于发送方来说,上述Message的定义以足够。但对于RocketMQ的整个处理流程来说,还需要更多的字段信息用以记录一些必要内容,比如消息的id、创建时间、存储时间等等。在同package下可以找到与之相关的其余类定义。首先就是MessageExt,

字段用途
queueId记录MessageQueue编号,消息会被发送到Topic下的MessageQueue
storeSize记录消息在Broker存盘大小
queueOffset记录在ConsumeQueue中的偏移
sysFlag记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识
bornTimestamp消息创建时间,在Producer发送消息时设置
storeHost记录存储该消息的Broker地址
msgId消息Id
commitLogOffset记录在Broker中存储便宜
bodyCRC消息内容CRC校验值
reconsumeTimes消息重试消费次数
preparedTransactionOffset事务详细相关字段

Message还有一个名为MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX的属性,在消息发送时由Producer生成创建。上面表格中的msgId则是消息在Broker端进行存储时通过MessageDecoder.createMessageId方法生成的,其构成为,

这个MsgId是在Broker生成的,Producer在发送消息时没有该信息,Consumer在消费消息时则能获取到该值。RocketMQ也提供了相关命令,

命令实现类描述
queryMsgByIdQueryMsgByIdSubCommand根据MsgId查询消息