网站ftp地址是什么google永久免费的服务器
RocketMQ的架构理论及底层原理
- 一:生产消息
- 1.消息生产过程
- 2.Queue选择算法
- 二:存储消息
- 2.1存储介质
- 2.2消息的存储和发送
- 2.3消息存储结构
- 2.4刷盘机制
- 三:消费消息
- 1 获取消费类型
- 2 消费模式
- 3 Rebalance机制
- 4.Queue分配算法
- 四:消息清理
- 五:订阅关系的一致性
- 1.正确订阅关系
- 2.错误订阅关系
- 六:消费幂等
- 1.什么是消费幂等
- 2.消息重复的场景
- 3.通用解决方法
- 七:消息堆积与消费延迟
- 1.概念
- 2.产生原因分析
- 3.消费耗时
- 4.消费并发度
- 5.避免方法
- 八:Rocket mq的高可用
- 1.消息消费高可用
- 2.消息发送高可用
- 3.消息主从复制
- 九:负载均衡
- 1.Producer负载均衡
- 2.Consumer负载均衡
- 十:消息重试
- 1.顺序消息的重试
- 2.无序消息的重试
- 十一:死信队列
一:生产消息
1.消息生产过程
Producer可以将消息写入到某Broker中的某Queue中,过程如下:
- Producer发送消息之前,会先向NameServer发出获取消息Topic的路由信息的请求
- NameServer返回该Topic的路由表及Broker列表
- Producer根据代码中指定的Queue选择策略,从Queue列表中选出一个队列,用于后续存储消息
- Produer对消息做一些特殊处理,例如,消息本身超过4M,则会对其进行压缩
- Producer向选择出的Queue所在的Broker发出RPC请求,将消息发送到选择出的Queue
路由表:实际是一个Map,key为Topic名称,value是一个QueueData实例列表。QueueData并不是一个Queue对应一个QueueData,而是一个Broker中该Topic的所有Queue对应一个QueueData。即,只要涉及到该Topic的Broker,一个Broker对应一个QueueData。QueueData中包含brokerName。简单来说,路由表的key为Topic名称,value则为所有涉及该Topic的BrokerName列表。
Broker列表:其实际也是一个Map。key为brokerName,value为BrokerData。不是一个Broker对应一个BrokerData实例。而是一套brokerName名称相同的Master-Slave小集群对应一个BrokerData。BrokerData中包含brokerName及一个map。该map的key为brokerId,value为该broker对应的地址。brokerId为0表示该broker为Master,非0表示Slave。
2.Queue选择算法
对于无序消息,其Queue选择算法,也称为消息投递算法,常见的有两种:
- 轮询算法
默认选择算法。该算法保证了每个Queue中可以均匀的获取到消息。
该算法存在一个问题:在某些Broker上的Queue可能投递延迟较严重。从而导致Producer的缓存队列中出现较大的消息积压,影响消息的投递性能。
- 最小投递延迟算法
该算法会统计每次消息投递的时间延迟,然后根据统计出的结果将消息投递到时间延迟最小的Queue。如果延迟相同,则采用轮询算法投递。该算法可以有效提升消息的投递性能。
该算法也存在一个问题:消息在Queue上的分配不均匀。投递延迟小的Queue其可能会存在大量的消息。而对该Queue的消费者压力会增大,降低消息的消费能力,可能会导致MQ中消息的堆积。
二:存储消息
分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。RocketMQ中的消息存储在本地文件系统中,这些相关文件默认在当前用户主目录下的store目录中。
- abort:该文件在Broker启动后会自动创建,正常关闭Broker,该文件会自动消失。
- checkpoint:其中存储着commitlog、consumequeue、index文件的最后刷盘时间戳
- commitlog:其中存放着commitlog文件,而消息是写在commitlog文件中的
- config:存放着Broker运行期间的一些配置数据
- consumequeue:其中存放着consumequeue文件,队列就存放在这个目录中
- index:其中存放着消息索引文件indexFile
- lock:运行期间使用到的全局资源锁
- 消息生成者发送消息
- MQ收到消息,将消息进行持久化,在存储中新增一条记录
- 返回ACK给生产者
- MQ push 消息给对应的消费者,然后等待消费者返回ACK
- 如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中删除消息,即执行第6步;如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新push消息,重复执行4、5、6步骤
- MQ删除消息
2.1存储介质
- 关系型数据库DB
ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈,若DB库宕机,MQ数据将无法存储丢失。 - 文件系统
RocketMQ采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。
文件系统>关系型数据库DB
2.2消息的存储和发送
1)消息存储
目前的高性能磁盘,顺序写速度可以达到600MB/s。但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ的消息用顺序写,保证了消息存储的速度。
2)消息发送
一台服务器 把本机磁盘文件的内容发送到客户端,一般进行了4 次数据复制:
- 从磁盘复制数据到内核态内存;
- 从内核态内存复 制到用户态内存;
- 然后从用户态 内存复制到网络驱动的内核态内存;
- 最后是从网络驱动的内核态内存复 制到网卡中进行传输。
通过使用mmap的方式,可以省去向用户态的内存复制,提高速度。这种机制在Java中是通过MappedByteBuffer实现的
采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因
2.3消息存储结构
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列