博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
52.源代码解读-RocketMQ消息写入机制
阅读量:5871 次
发布时间:2019-06-19

本文共 2302 字,大约阅读时间需要 7 分钟。

一. 前言

RocketMQ采用内存和磁盘存储来存储消息。那现在来分析一下消息存储的流程

二. 代码流程

在Broker启动的时候会拉起相关服务

流程如下:

52.源代码解读-RocketMQ消息写入机制

流程图引用网址

三. 代码流程

由于是Broker来存储消息,那么消息入口的代码应该是在Broker里面,而Broker的入口是BrokerStartup,以及重要的BrokerController。

具体流程可以参考Broker启动源代码分析。

以发送消息为例

1. Broker启动注册发送消息处理器

Broker启动的时候,会注册一个SendMessageProcesser来响应netty的发送消息请求,如下:

public void registerProcessor() {        /**         * SendMessageProcessor         */        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);        sendProcessor.registerSendMessageHook(sendMessageHookList);        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);        this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);}

2. 消息处理器处理发送者发送过来的消息

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {    @Override    public RemotingCommand proce***equest(ChannelHandlerContext ctx,        RemotingCommand request) throws RemotingCommandException {        SendMessageContext mqtraceContext;                ...        switch (request.getCode()) {            response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);        }    }}

继续看sendMessage..

private RemotingCommand sendMessage(final ChannelHandlerContext ctx,        ...        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);}

调用MessageStore.putMessage(msgInner)

转载于:https://blog.51cto.com/483181/2086030

你可能感兴趣的文章
[老老实实学WCF] 第八篇 实例化
查看>>
前端自动化构建工具webpack (一)之webpack安装 和 设置webpack.confi.js
查看>>
js数据类型转换大全
查看>>
java面向对象高级分层实例_实体类
查看>>
python ide
查看>>
Servlet--传参和接参
查看>>
UI之UIAlertView--提示框
查看>>
Guice 练习 constructorbindings demo
查看>>
model 弹出框放到一个html中
查看>>
jQuery easyui中获取datagrid某一列的值之和
查看>>
20190104 你懂得
查看>>
dedecms调用子栏目及文章列表
查看>>
jsoup抓取网页+具体解说
查看>>
关于ping以及TTL的分析
查看>>
uboot下netconsole的原理及用法
查看>>
有用PHP依赖管理工具Composer新手教程
查看>>
Hibernate与 MyBatis的比较
查看>>
网页选项卡的应用
查看>>
PLSQL Developer安装(Oracle11g+win7_64bit)
查看>>
创建和使用虚拟专用目录
查看>>