源码阅读(一)
搭建环境
rocketmq的java-sdk并没有什么特殊的构建的,直接加载maven即可。
节点部署:
Docker 部署 RocketMQ | RocketMQ
面板部署:
apache/rocketmq-dashboard
hhh,遵从官网部署,点点点就可以了。
生产者初识
阅读源码首先要明确的目标,或者说我们想要了解什么?
在我看来生产者的职责分为以下几个
- 1,与
namesrv
沟通,维护本地的broker
通信队列
- 2,发送
topic
消息到远程broker
- 3,根据响应结果判断是否发送成功,以及失败情况下的策略
每一个生产者对于broker
节点来说是消息的来源,但是broker
并不关心究竟有多少个生产者在为它提供服务,对于消费者来说同样如此!
首先定位到源码示例

示例的逻辑并不复杂,但问题是怎么处理的?尝试追溯DefaultMQProducer
,找到客户端的sdk模块rocketmq-client
观察DefaultMQProducer
的父类路径发现extends ClientConfig implements MQProducer
,对于MQProducer
没啥好说的,定义了生产者方法规范,而且没有注释。小白一只的我将目光对准ClientConfig
,
private String clientIP = NetworkUtil.getLocalAddress();
寻找ip的方法。
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
生产者名称
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| # 客户端id,这个要注意与生产者名称区分,在我看来是类似与多元组一样的, ip+instanceName+...... public String buildMQClientId() { StringBuilder sb = new StringBuilder(); sb.append(this.getClientIP());
sb.append("@"); sb.append(this.getInstanceName()); if (!UtilAll.isBlank(this.unitName)) { sb.append("@"); sb.append(this.unitName); }
if (enableStreamRequestType) { sb.append("@"); sb.append(RequestType.STREAM); }
return sb.toString(); }
|
需要注意的是对于修改clentIP
可以直接通过producer.setClientIP(DEFAULT_NAMESRVADDR);
,clientID
则并没有这个字段,他是在生产者启动时自动生成的的一个标识符,或许可以这么说 ip+instanceName
表示了这个主机上的所有rocketmq的消费者和生产者。
追溯buildMQClientId
即可发现,start的调用链路中存在这个方法。
与namesrv
沟通,维护本地的broker
通信队列
DefaultMQProducer#start() -> DefaultMQProducerImpl#start() -> MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHoo) -> MQClientInstance # new MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook)
追溯到MQClientInstance
的初始化,在这里类中可以看到非常重要的几个属性
1 2 3 4 5 6 7 8 9 10 11 12
| ## broker节点列表 ket为id,value为 ip地址 private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = new ConcurrentHashMap<>();
## 心跳检查开关,源码中显示用于管理broker节点的连接是否实效 private boolean enableHeartbeatChannelEventListener = true;
## 消费者状态代理 this.consumerStatsManager
## netty连接客户端配置 private final NettyClientConfig nettyClientConfig;
|
断点查看参数后发现new MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook)
初始化的是远程namesrv的客户端。
![[Pasted image 20250409182100.png]]
namesrv的配置
结论: 存在不一致则直接覆盖更新,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| NettyRemotingClient{ private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<>();//这个原子引用在方法中充当了互斥锁
public void updateNameServerAddressList(List<String> addrs) { List<String> old = this.namesrvAddrList.get(); boolean update = false; // 1,如果存在不一致,则全部更新 if (!addrs.isEmpty()) { if (null == old) {//olg不存在 update = true; } else if (addrs.size() != old.size()) {//数量不一致 update = true; } else { for (String addr : addrs) { if (!old.contains(addr)) {//存在差异 update = true; break; } } } if (update) { this.namesrvAddrList.set(addrs); } } }
}
|
此处引申出两个新问题,
1,如何复用MQClientInstance
?
调用链路中直接上级是
MQClientManager
,关键成员变量为
1 2
| private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<>();
|
2,namesrv
存在更新,看上去是MQClientInstance 重新初始化的缘故?那么为什么会重新初始化?namesrv
应当写死在配置文件当中,但此处似乎允许动态更新?
追溯ClientConfig
信息的来源发现在DefaultMQProducerImpl
成员变量
1 2 3 4
| private final DefaultMQProducer defaultMQProducer; public class DefaultMQProducer extends ClientConfig implements MQProducer { .... }
|
从接口角度上来说,DefaultMQProducer确实是生产者实现,但可以将DefaultMQProducerImpl
理解为生产者的启动类,而并非实现类。
此外这个实现类会默认使用环境变量(或者是其他配置),然后使用编码角度的上层配置覆盖,这也是常规的编码>环境变量的实现。
维护broker队列
追溯发送消息的方法找到关键方法sendKernelImpl
1 2 3 4 5 6 7 8 9 10 11 12 13
| class DefaultMQProducerImpl{ private SendResult sendKernelImpl(。。。 ) { String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq); String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
if (null == brokerAddr) { tryToFindTopicPublishInfo(mq.getTopic()); brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName); } }
|
1,获取brokerAddr地址时并不会检验是否有效,只要有对应缓存即可
2, tryToFindTopicPublishInfo(mq.getTopic());
会在本地没有对应topic条目时更新
基于以上两点可以看出,默认实现里对于broker的维护似乎是通过发送消息错误时重新拉取实现的?
1 2 3 4
| DefaultMQProducerImpl#sendKernelImpl -》 this.mQClientFactory.getMQClientAPIImpl().sendMessage -》
|
在sendmessgae
中发送消息有三种走向,通过switch
区分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| switch (communicationMode) { case ONEWAY: //单向发送 this.remotingClient.invokeOneway(addr, request, timeoutMillis);
case ASYNC: //异步发送 this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer); case SYNC: //同步发送 return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request); default: assert false; break; }
|
单向发送: 不会判断是否发送成功
异步发送/同步发送:会根据ack或者响应超时等其他指标判断消息是否发送成功
因此需要注意,假设使用了单向发送,那么先前的推论根据相应判断broker是否有效
就不成立,或者说不会单单根据这个判断broker是否有效。
心跳模式
重新阅读源码,追溯brokerAddrTable
调用发现心跳模式,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| if (clientConfig.isEnableHeartbeatChannelEventListener()) { channelEventListener = new ChannelEventListener() { private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = MQClientInstance.this.brokerAddrTable;
@Override public void onChannelActive(String remoteAddr, Channel channel) { for (Map.Entry<String, HashMap<Long, String>> addressEntry : brokerAddrTable.entrySet()) { for (Map.Entry<Long, String> entry : addressEntry.getValue().entrySet()) { String addr = entry.getValue(); if (addr.equals(remoteAddr)) { long id = entry.getKey(); String brokerName = addressEntry.getKey(); if (sendHeartbeatToBroker(id, brokerName, addr, false)) { rebalanceImmediately(); } break; } } } } }
|
核心判断 if (addr.equals(remoteAddr)) {。。。 break;}
结合心跳模式的判断,
1,此处应当为borker地址的监听判断,或者说维护。
2,这里仍然不是拉去broker的地址
定时任务: 清除离线broker
再一次追溯调用找到定时任务,每秒执行一次
1 2 3 4 5 6 7 8 9
| MQClientInstance#startScheduledTask this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { MQClientInstance.this.cleanOfflineBroker(); MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); } catch (Throwable t) { log.error("ScheduledTask sendHeartbeatToAllBroker exception", t); } }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
|
1 2
| MQClientInstance.this.cleanOfflineBroker(); -》 this.isBrokerAddrExistInTopicRouteTable(addr)
|
该方法内部依赖TopicRouteData
的维护,并且仅仅判断是否存在对应的broker
地址,而不关心是否有对应topic
1 2 3 4 5 6 7 8 9 10 11 12
| private boolean isBrokerAddrExistInTopicRouteTable(final String addr) { for (Entry<String, TopicRouteData> entry : this.topicRouteTable.entrySet()) { TopicRouteData topicRouteData = entry.getValue(); List<BrokerData> bds = topicRouteData.getBrokerDatas(); for (BrokerData bd : bds) { if (bd.getBrokerAddrs() != null) { boolean exist = bd.getBrokerAddrs().containsValue(addr); if (exist) return true; } } }
|
根据方法的注释,可以判断TopicRouteData
的数据是需要作为依据存在,准确性要比brokerAddrTable
高。
updateTopicRouteInfoFromNameServer
更新路由拉区信息,无同步
1 2
| (final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer)
|
因此该方法用于更新指定生产者下的topic的地址,
调用时机:
- 1, sendKernelImpl下没有topic对应的
MASTER_ID
的节点时。
阅读源码发现,生产者作用似乎仅仅是指定了队列数量,但是其他数量都使用了当前MQClientInstance实例的数据,那么就引申出一个新问题,就是生产者和实例的对应关系?MQClientInstance似乎可以复用,在前面的方法调用当中存在getOrCreateMQClientInstance
,仅仅从方法名称中也可以发现似乎存在某个变量维护着一个公共的`MQClientInstance``?
MQClientManager和MQClientInstance
首先说结论,这两对象都是能复用就复用,不会做额外的判断。
MQClientManager在调用时,直接使用的是静态成员变量
private static MQClientManager instance = new MQClientManager();
MQClientInstance通过上面的静态成员变量维护了一个成员变量,
1 2
| private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<>();
|
回顾clientiD的创建方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public String buildMQClientId() { StringBuilder sb = new StringBuilder(); sb.append(this.getClientIP());
sb.append("@"); sb.append(this.getInstanceName()); if (!UtilAll.isBlank(this.unitName)) { sb.append("@"); sb.append(this.unitName); }
if (enableStreamRequestType) { sb.append("@"); sb.append(RequestType.STREAM); }
return sb.toString(); }
|
四元组为: ip: 实例名称: 单元名称: 流式请求
尝试使用相同clientId后成功启动。
TopicRouteInfo
目前来讲brokerAddrTable有只有一个直接来源topicRouteTable
追溯调用找到关键方法MQClientInstance#updateTopicRouteInfoFromNameServer
1 2 3 4 5
| public boolean updateTopicRouteInfoFromNameServer(,,,){ topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(clientConfig.getMqClientApiTimeout()); // 中间省略的操作大致作用为old、young合并 this.topicRouteTable.put(topic, cloneTopicRouteData); }
|
1 2 3 4 5
| 追溯找到路由信息的请求定义 InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader(); requestHeader.setTopic(topic); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
|
注意核心请求code
1 2 3
| RequestCode.GET_ROUTEINFO_BY_TOPIC,
public static final int GET_ROUTEINFO_BY_TOPIC = 105;
|
发送消息失败
发送消息失败,会抛出自定义错误RemotingException e
交给DefaultMQProducerImpl#sendDefaultImpl
的catch处理,断点查看得知,在默认情况下,会尝试隔离broker
1 2 3 4 5 6 7 8 9 10
| } catch (RemotingException e) { endTimestamp = System.currentTimeMillis(); if (this.mqFaultStrategy.isStartDetectorEnable()) { // Set this broker unreachable when detecting schedule task is running for RemotingException. this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, false); } else { // Otherwise, isolate this broker. 在内部方法的调用链路中,发现他会重新检测broker的状态, this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, true); }
|
总结
生产者网络通信: remotingClient
生产者通信api封装: MQClientAPIImpl
生产者动态信息保存: MQClientInstance
生产者动态信息- 经理:MQClientManager
默认生产者实现:DefaultMQProducerImpl
默认生产者的定义: DefaultMQProducer
初始化remotingClient
会以当前编码配置为主。
1 2 3 4 5 6 7 8 9 10 11 12 13
| // 1,如果存在不一致,则全部更新 if (!addrs.isEmpty()) { if (null == old) {//olg不存在 update = true; } else if (addrs.size() != old.size()) {//数量不一致 update = true; } else { for (String addr : addrs) { if (!old.contains(addr)) {//存在差异 update = true; break; } } }
|
broker的更新和维护
1,心跳模式
2,定时任务根据路由信息更新
3,不存在对应的topic信息时,会发送code为105的请求到namesrv中
4,默认情况下,发送消息失败会尝试隔离对应的broker

客户端所用的code类
org.apache.rocketmq.remoting.protocol.RequestCode