RocketMQ中Producer向Topic发送消息,Consumer订阅Topic。消息从属与Topic,因此Topic的创建管理可以认为是最基础的一个功能特性。

创建Topic

RocketMQ中存在不同的Topic创建方式,

自动创建

默认情况下,Topic不用手动创建。在Producer发送消息的时候,如果没有对应的Topic,那么这个时候会创建该Topic并投递消息。

自动创建的开关配置在BrokerConfig中,通过autoCreateTopicEnable字段进行控制。当这个开关打开之后,Broker上会默认创建一个MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC的Topic。Broker启动之后向Name Server注册时,会表明自身创建了该Topic。Producer在发送消息时会向Name Server查询这个消息的投递路径,相关的方法为DefaultMQProducerImpl.tryToFindTopicPublishInfo、MQClientInstance.updateTopicRouteInfoFromNameServer,这个时候会获取到该Topic对应的信息。之后消息会被投递到对应的Broker上。当消息抵达Broker之后,AbstractSendMessageProcessor.msgCheck函数会检查Topic是否存在,当不存在根据是否允许自动创建调用TopicConfigManager.createTopicInSendMessageMethod方法创建。

还有一些特殊Topic也是通过自动创建进行的,这个时候不通过autoCreateTopicEnable判断,比如消息消费失败之后会进入重试队列,重试队列的创建为Broker自动创建,实际创建方法为TopicConfigManager.createTopicInSendMessageBackMethod。

预先创建

实际环境中,Topic自动创建的开关是需要关闭的,否则难以管控集群的运行状态。这种情况下就需要预先创建Topic。RocketMQ中可以通过mqadmin提供的Topic相关命令进行创建。

命令的实现位于UpdateTopicSubCommand,执行,

./mqadmin updateTopic

会输出该命令的各项参数,

usage: mqadmin updateTopic [-b <arg>] [-c <arg>] [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>]
       -t <arg> [-u <arg>] [-w <arg>]
 -b,--brokerAddr <arg>       create topic to which broker
 -c,--clusterName <arg>      create topic to which cluster
 -h,--help                   Print help
 -n,--namesrvAddr <arg>      Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
 -o,--order <arg>            set topic's order(true|false
 -p,--perm <arg>             set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]
 -r,--readQueueNums <arg>    set read queue nums
 -s,--hasUnitSub <arg>       has unit sub (true|false
 -t,--topic <arg>            topic name
 -u,--unit <arg>             is unit topic (true|false
 -w,--writeQueueNums <arg>   set write queue nums

可以看到Topic可以创建在指定的Cluster或指定的Broker上,同时可以配置Topic的一些相关连参数。

Topic相关命令

Topic相关的命令有,

命令实现类描述
allocateMQAllocateMQSubCommand申请创建新的MessageQueue
deleteTopicDeleteTopicSubCommand删除Topic
topicClusterListTopicClusterSubCommand列出含有该Topic的Cluster列表
topicListTopicListSubCommand列出当前集群中的所有Topic
topicRouteTopicRouteSubCommand列出Topic的路由信息
topicStatusTopicStatusSubCommand列出Topic的一些统计数据状态
updateOrderConfUpdateOrderConfSubCommand更新Topic顺序相关配置
updateTopicPermUpdateTopicPermSubCommand更新Topic的读写权限
updateTopicUpdateTopicSubCommand创建或更新Topic配置

其中的一些查询性命令在遇到问题调试的时候能够提供一定帮助,理解创建修改型命令的各项参数,有助于了解Topic相关的功能特性。