hhhzua 发布的文章

因为RocketMQ的消息存储在Broker中的CommitLog中,由索引文件ConsumerQueue来查询消息,ConsumerQueue中用来定位消息的就是CommitLog的Offset,所以消费者或者Broker通过管理Offset来管理消费进度。

消费模式

  • 广播消费

    
    ConsumerGroup中的所有Consumer都可以消费Topic的所有消息,各自的消费进度没有交集,所以offset是consumer本地管理
    
  • 集群消费

    
    ConsumerGroup中的所有Consumer互斥消费Topic的消息,即一个消息只能被一个Consumer消费,ConsumerGroup下所有Consumer共同维护一个消费进度,所以offset是在Broker端远程管理
    
    

Offset的提交方式

在集群消费模式下,Consumer向Broker提交offset的方式有两种:

  • 同步提交

    • Consumer处理完消息后会向Broker提交offset,然后等待Broker的成功响应,在超时时间内获取到响应则进行下一批消息获取,否则重新提交offset,直到获取响应,这个过程中Consumer是阻塞的
  • 异步提交

    • Consumer处理完消息后会向Broker提交offset,不等待Broker响应,直接获取下一批消息,提高了消费者的吞吐量,但是一旦Broker宕机了,offset没有持久化,当Broker重启后就会重复消费

Producer负载均衡

  • 容错机制:开启容错机制后,会进行失败隔离和超时隔离,规避失败的Broker
  • Broker选择:如果随机到了已经发送过的Broker会重新进行选择

Consumer负载均衡

  • Consumer会定时向所有的Broker发送心跳,心跳数据包括消费组名称、订阅关系集合、消息的通信模式、消费者ID等,Broker接收到心跳包后会维护在ConsumerManager中,当Broker监听到Consumer变化之后,会通知Consumer进行ReBalance
  • 在Consumer客户端,也会有一个定时任务,周期性进行Rebalance

Rebalance

Rebalance是一个由消费端平衡queue和consumer的过程,实现原理:

  • 触发时机:

    • 在Consuner启动或者停止的时候,此时消费者数量变化,Consumer会主动发起一次Rebalance
    • 收到Broker的Rebalance通知时
    • 消费者每执行定时任务,每20s触发一次Rebalance
  • 向服务端获取TopicA最新的Queue列表和Consumer列表,根据这些信息重新分配消费关系
  • 分配策略:

    • rocket提供多个消费策略,消费机器不同但是内部代码相同,所以会选择相同的消费策略,保证不同的Consumer节点执行Rebalance后得到相同的分配结果
  • Rebalance之后,消费者会对比消费关系是否变化,有新增的queue则加入pullRequests请求队列中,如果一个queue被移除将被标记drop在后续流程中将会从pullRequests请求队列剔除

Rebalance的缺点

  • 暂停消费

    • 当有新增的consumer时,原来的queue需要分一部分到新的consumer消费,在分配好之前这部分queue的消费会被暂停
  • 重复消费

    • 在切换分配关系的间隙,可能queue的原来consumer还未提交offset就切换到新的consumer了,此时会导致queue的消息被多个Consumer消费了,所以需要在业务上考虑幂等

顺序消息的Rebalance:

因为顺序消息为了严格保证顺序,是不能被多个Consumer消费的,所以顺序消息的Rebalance会直接在queue上加锁,上锁成功后才可以进行消费关系调整

CommitLog

Producer发送过来的消息是存储在CommitLog中,Producer端每次发送过来的消息是不等长的,每写满1G就会新建一个CommitLog文件继续写入。

那Consumer消费的时候,怎么获取到消息呢?直接从CommitLog中查询消息的话是低效的,RocketMQ作为一个高性能、高吞吐的消息中间件肯定不会采用这个方案,而是使用ConsumerQueue。

ConsumerQueue

ConsumerQueue也是文件,当Broker收到一条消息,写入CommitLog的同时,也会一条ConsumerQueue记录,保存这条消息在CommitLog中的offset、消息大小、对应的Tag的hash值。

每个MessageQueue都会对应一个ConsumerQueue文件,每个ConsumerQueue文件可写30W条消息,超过之后再新建ConsumerQueue文件继续写入。

主从复制

Broker中有两种角色:Master、Slave,Master用于处理生产者、消费者的请求以及消息的存储,Slave从Master中同步所有数据,有以下两个作用:

  1. 保证Broker高可用:作为Master的数据副本,当Master宕机后,消费者可以连接Slave继续消费,避免单点故障
  2. 保证Broker高性能:当Master负载高的时候,Master会建议消费者从Slave上拉取消息,分担Master压力

Broker主从同步的方式有两种:

  1. 同步复制:生产者发送消息到Master,然后Master将消息同步发送到Slave才算发送完成
  2. 异步复制:生产者发送消息到Master即完成,再由异步线程异步发送到Slave

Broker主从同步的数据主要是配置数据和消息数据

客户端通过注册中心知道所有的服务实例,这就需要注册中心提供服务注册、服务剔除、服务发现这些基本功能。

NameServer

NameServer的特点是简单、无状态、可横向扩展、节点间互不通信,相对于kafka来说,kafka使用Zookeeper来Master选举、分布式锁等功能保证强一致性,RocketMQ使用的NameServer达到的是最终一致性。根据CAP理论,Zookeeper是CP,NameServer是AP:

一致性(Consistency):NameServer节点间互不通信,意味着某一时刻,不同节点可能存在数据不一致

可用性(Availability):只要不是NameServer都挂掉,就可以对外提供服务

分区容错性(Partiton Tolerance):分布式架构跨网络的情况下,部分网络不可达是不可避免的,只要做好跨机房容灾,就可以达到分区容错性

最终一致性

前面说到RocketMQ使用的NameServer达到的是最终一致性,这个是怎么做到的呢?从路由注册、路由剔除、路由发现三个角度来说。

  • 路由注册

    像Zookeeper这样的强一致性组件,数据只要写到了主节点,内部就会通过状态机将数据复制到其他节点,但是NameServer节点间无状态且互不通信的,RockerMQ采取的策略是Broker轮询NameServer列表,与每个NameServer节点建立长链接发起注册请求,每30s发送一个心跳包,将自己的最新信息上报给NameServer,包含BrockerId、Brocker名称、Broker地址、Broker所属集群名称等等,然后NameServer收到心跳包之后,会更新所维护的Broker路由表,记录Broker最新存活时间,而且这个Broker路由表的操作引入了ReadWriteLock,允许并发读

  • 路由剔除

    • 正常情况下,Broker关闭时,会与NameSerer断开长连接,此时NameServer会将这个Broker的信息剔除掉
    • 异常情况下,NameServer有一个定时任务,每10s扫一遍Broker表,如果Broker最新的存活时间已经距当前超过120s,则将Broker移除
  • 路由发现

    路由发现是客户端的行为,包括生产者和消费者都需要依赖NamerServer进行路由发现

    对于生产者来说,因为可以发送任意Topic的消息,所以只有在发送消息的时候,才会根据Topic从NameServer获取路由消息

    对于消费者来说,一般是固定Topic,所以启动的时候就会从NameServer获取路由消息

    从路由注册、路由剔除的环节可以看出来,NameServer并不会主动推送路由到客户端,所以需要客户端自己拉取,RocketMQ客户端提供了定时拉取机制,默认是每隔30s会从NameServer拉取一次,也就意味着,在这30s内,客户端无法感知到Broker的宕机,依然会向宕机的Broker发送或者消费消息,那这个问题是怎么解决的呢?

    靠的是客户端重试机制

生产者重试机制

首先我们要知道消息按有序性有3种类型:普通消息、普通有序消息、严格有序消息

  • 普通消息

    
    消息是无序的,发送到任意队列都可以
    
  • 普通有序消息

    
    消息发送的时候会动态选择队列,正常情况下同一类消息指定一个队列,异常情况下发送到其他队列
    
  • 严格有序消息

    
    消息必须发送到同一个队列,一旦发生异常情况,也不允许发送到其他队列
    
    

当发送的是普通消息时,一旦发送失败,会自动重试选择其他Broker下的Queue,而普通有序消息,发送失败后,RocketMQ不会自动进行重试,如果需要重试需要在业务代码上编写重试,但是严格有序消息,因为不会更换MessageQueue,所以重试也不会成功,直到宕机的Broker恢复。

即使是普通消息的自动重试,也存在一个问题,即每条消息都会先失败然后重新选择其他的Broker,必然每次消息发送的耗时都会增加,所以RocketMQ提供了以下两个功能:

  • 失败隔离:如果消息往某个Broker发送失败了,则后续的消息发送都不选择该Broker
  • 超时隔离:如果消息往某个Broker发生了超时,则按照超时时间来制定一定时间范围内,后续的消息不选择该Broker

  1. 检测消息的合法性:

    • topic名称是否包含非法字符
    • topic名称长度是否超过限制(127个字符)
    • message是否为null是否为空消息
    • message消息体是否大于最大长度(4M)
  2. 获取topic详情

    • 首先获取内存中维护的一份Map获取,如果没有的话会从NameServer的接口获取数据,然后更新到map中
    • 获取的topic元数据包含:topic的路由信息,topic下broker相关信息
  3. 选择具体的MessageQueue

    • MessageQueue选择机制:获取一个随机数与MessageQueue数量取模,决定选择哪个queue,再找到这个queue所在的broker
    • 容错机制下的选择逻辑:如果开启了路由发现的Broker失败隔离和Broker超时隔离,则会过滤掉不可用的Broker再进行选择
    • 正常情况下的选择逻辑:每次发送都会将queue的broker与上一次选的进行对比,保证选择和上一次不一样的Broker,避免数据倾斜
  4. 发送流程

    选择到了具体的MessageQueue就会底层调用Netty接口将消息发送出去