RocketMQ中Producer向Topic发送消息,Consumer订阅Topic。消息从属与Topic,因此Topic的创建管理可以认为是最基础的一个功能特性。
创建Topic
RocketMQ中存在不同的Topic创建方式,
自动创建
默认情况下,Topic不用手动创建。在Producer发送消息的时候,如果没有对应的Topic,那么这个时候会创建该Topic并投递消息。
自动创建的开关配置在BrokerConfig中,通过autoCreateTopicEnable字段进行控制。当这个开关打开之后,Broker上会默认创建一个MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC的Topic。Broker启动之后向Name Server注册时,会表明自身创建了该Topic。Producer在发送消息时会向Name Server查询这个消息的投递路径,相关的方法为DefaultMQProducerImpl.tryToFindTopicPublishInfo、MQClientInstance.updateTopicRouteInfoFromNameServer,这个时候会获取到该Topic对应的信息。之后消息会被投递到对应的Broker上。当消息抵达Broker之后,AbstractSendMessageProcessor.msgCheck函数会检查Topic是否存在,当不存在根据是否允许自动创建调用TopicConfigManager.createTopicInSendMessageMethod方法创建。
还有一些特殊Topic也是通过自动创建进行的,这个时候不通过autoCreateTopicEnable判断,比如消息消费失败之后会进入重试队列,重试队列的创建为Broker自动创建,实际创建方法为TopicConfigManager.createTopicInSendMessageBackMethod。
预先创建
实际环境中,Topic自动创建的开关是需要关闭的,否则难以管控集群的运行状态。这种情况下就需要预先创建Topic。RocketMQ中可以通过mqadmin提供的Topic相关命令进行创建。
命令的实现位于UpdateTopicSubCommand,执行,
./mqadmin updateTopic
会输出该命令的各项参数,
usage: mqadmin updateTopic [-b <arg>] [-c <arg>] [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>]
-t <arg> [-u <arg>] [-w <arg>]
-b,--brokerAddr <arg> create topic to which broker
-c,--clusterName <arg> create topic to which cluster
-h,--help Print help
-n,--namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
-o,--order <arg> set topic's order(true|false
-p,--perm <arg> set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]
-r,--readQueueNums <arg> set read queue nums
-s,--hasUnitSub <arg> has unit sub (true|false
-t,--topic <arg> topic name
-u,--unit <arg> is unit topic (true|false
-w,--writeQueueNums <arg> set write queue nums
可以看到Topic可以创建在指定的Cluster或指定的Broker上,同时可以配置Topic的一些相关连参数。
Topic相关命令
Topic相关的命令有,
命令 | 实现类 | 描述 |
---|---|---|
allocateMQ | AllocateMQSubCommand | 申请创建新的MessageQueue |
deleteTopic | DeleteTopicSubCommand | 删除Topic |
topicClusterList | TopicClusterSubCommand | 列出含有该Topic的Cluster列表 |
topicList | TopicListSubCommand | 列出当前集群中的所有Topic |
topicRoute | TopicRouteSubCommand | 列出Topic的路由信息 |
topicStatus | TopicStatusSubCommand | 列出Topic的一些统计数据状态 |
updateOrderConf | UpdateOrderConfSubCommand | 更新Topic顺序相关配置 |
updateTopicPerm | UpdateTopicPermSubCommand | 更新Topic的读写权限 |
updateTopic | UpdateTopicSubCommand | 创建或更新Topic配置 |
其中的一些查询性命令在遇到问题调试的时候能够提供一定帮助,理解创建修改型命令的各项参数,有助于了解Topic相关的功能特性。