本文共 2302 字,大约阅读时间需要 7 分钟。
RocketMQ采用内存和磁盘存储来存储消息。那现在来分析一下消息存储的流程
在Broker启动的时候会拉起相关服务
流程如下:流程图引用网址
由于是Broker来存储消息,那么消息入口的代码应该是在Broker里面,而Broker的入口是BrokerStartup,以及重要的BrokerController。
具体流程可以参考Broker启动源代码分析。以发送消息为例
Broker启动的时候,会注册一个SendMessageProcesser来响应netty的发送消息请求,如下:
public void registerProcessor() { /** * SendMessageProcessor */ SendMessageProcessor sendProcessor = new SendMessageProcessor(this); sendProcessor.registerSendMessageHook(sendMessageHookList); sendProcessor.registerConsumeMessageHook(consumeMessageHookList); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);}
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { @Override public RemotingCommand proce***equest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { SendMessageContext mqtraceContext; ... switch (request.getCode()) { response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); } }}
继续看sendMessage..
private RemotingCommand sendMessage(final ChannelHandlerContext ctx, ... PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);}
调用MessageStore.putMessage(msgInner)
转载于:https://blog.51cto.com/483181/2086030