分类 RocketMQ 下的文章

因为RocketMQ的消息存储在Broker中的CommitLog中,由索引文件ConsumerQueue来查询消息,ConsumerQueue中用来定位消息的就是CommitLog的Offset,所以消费者或者Broker通过管理Offset来管理消费进度。

消费模式

  • 广播消费

    
    ConsumerGroup中的所有Consumer都可以消费Topic的所有消息,各自的消费进度没有交集,所以offset是consumer本地管理
    
  • 集群消费

    
    ConsumerGroup中的所有Consumer互斥消费Topic的消息,即一个消息只能被一个Consumer消费,ConsumerGroup下所有Consumer共同维护一个消费进度,所以offset是在Broker端远程管理
    
    

Offset的提交方式

在集群消费模式下,Consumer向Broker提交offset的方式有两种:

  • 同步提交

    • Consumer处理完消息后会向Broker提交offset,然后等待Broker的成功响应,在超时时间内获取到响应则进行下一批消息获取,否则重新提交offset,直到获取响应,这个过程中Consumer是阻塞的
  • 异步提交

    • Consumer处理完消息后会向Broker提交offset,不等待Broker响应,直接获取下一批消息,提高了消费者的吞吐量,但是一旦Broker宕机了,offset没有持久化,当Broker重启后就会重复消费

Producer负载均衡

  • 容错机制:开启容错机制后,会进行失败隔离和超时隔离,规避失败的Broker
  • Broker选择:如果随机到了已经发送过的Broker会重新进行选择

Consumer负载均衡

  • Consumer会定时向所有的Broker发送心跳,心跳数据包括消费组名称、订阅关系集合、消息的通信模式、消费者ID等,Broker接收到心跳包后会维护在ConsumerManager中,当Broker监听到Consumer变化之后,会通知Consumer进行ReBalance
  • 在Consumer客户端,也会有一个定时任务,周期性进行Rebalance

Rebalance

Rebalance是一个由消费端平衡queue和consumer的过程,实现原理:

  • 触发时机:

    • 在Consuner启动或者停止的时候,此时消费者数量变化,Consumer会主动发起一次Rebalance
    • 收到Broker的Rebalance通知时
    • 消费者每执行定时任务,每20s触发一次Rebalance
  • 向服务端获取TopicA最新的Queue列表和Consumer列表,根据这些信息重新分配消费关系
  • 分配策略:

    • rocket提供多个消费策略,消费机器不同但是内部代码相同,所以会选择相同的消费策略,保证不同的Consumer节点执行Rebalance后得到相同的分配结果
  • Rebalance之后,消费者会对比消费关系是否变化,有新增的queue则加入pullRequests请求队列中,如果一个queue被移除将被标记drop在后续流程中将会从pullRequests请求队列剔除

Rebalance的缺点

  • 暂停消费

    • 当有新增的consumer时,原来的queue需要分一部分到新的consumer消费,在分配好之前这部分queue的消费会被暂停
  • 重复消费

    • 在切换分配关系的间隙,可能queue的原来consumer还未提交offset就切换到新的consumer了,此时会导致queue的消息被多个Consumer消费了,所以需要在业务上考虑幂等

顺序消息的Rebalance:

因为顺序消息为了严格保证顺序,是不能被多个Consumer消费的,所以顺序消息的Rebalance会直接在queue上加锁,上锁成功后才可以进行消费关系调整

CommitLog

Producer发送过来的消息是存储在CommitLog中,Producer端每次发送过来的消息是不等长的,每写满1G就会新建一个CommitLog文件继续写入。

那Consumer消费的时候,怎么获取到消息呢?直接从CommitLog中查询消息的话是低效的,RocketMQ作为一个高性能、高吞吐的消息中间件肯定不会采用这个方案,而是使用ConsumerQueue。

ConsumerQueue

ConsumerQueue也是文件,当Broker收到一条消息,写入CommitLog的同时,也会一条ConsumerQueue记录,保存这条消息在CommitLog中的offset、消息大小、对应的Tag的hash值。

每个MessageQueue都会对应一个ConsumerQueue文件,每个ConsumerQueue文件可写30W条消息,超过之后再新建ConsumerQueue文件继续写入。

主从复制

Broker中有两种角色:Master、Slave,Master用于处理生产者、消费者的请求以及消息的存储,Slave从Master中同步所有数据,有以下两个作用:

  1. 保证Broker高可用:作为Master的数据副本,当Master宕机后,消费者可以连接Slave继续消费,避免单点故障
  2. 保证Broker高性能:当Master负载高的时候,Master会建议消费者从Slave上拉取消息,分担Master压力

Broker主从同步的方式有两种:

  1. 同步复制:生产者发送消息到Master,然后Master将消息同步发送到Slave才算发送完成
  2. 异步复制:生产者发送消息到Master即完成,再由异步线程异步发送到Slave

Broker主从同步的数据主要是配置数据和消息数据

  1. 检测消息的合法性:

    • topic名称是否包含非法字符
    • topic名称长度是否超过限制(127个字符)
    • message是否为null是否为空消息
    • message消息体是否大于最大长度(4M)
  2. 获取topic详情

    • 首先获取内存中维护的一份Map获取,如果没有的话会从NameServer的接口获取数据,然后更新到map中
    • 获取的topic元数据包含:topic的路由信息,topic下broker相关信息
  3. 选择具体的MessageQueue

    • MessageQueue选择机制:获取一个随机数与MessageQueue数量取模,决定选择哪个queue,再找到这个queue所在的broker
    • 容错机制下的选择逻辑:如果开启了路由发现的Broker失败隔离和Broker超时隔离,则会过滤掉不可用的Broker再进行选择
    • 正常情况下的选择逻辑:每次发送都会将queue的broker与上一次选的进行对比,保证选择和上一次不一样的Broker,避免数据倾斜
  4. 发送流程

    选择到了具体的MessageQueue就会底层调用Netty接口将消息发送出去

RocketMQ支持事务消息来提供分布式事务功能,事务消息分为两个部分:

  1. 事务的发送和提交,即两阶段提交
  2. 事务状态回查

事务消息
其中,事务的发送和提交的步骤如下:

  1. Producer发送一个half消息给MQ Sever,half消息是对消费者不可见的,用来检测MQ Sever是否可用
  2. MQ Sever对该half消息返回一个响应表示half消息接收成功
  3. Producer确认Sever可用之后,发起本地事务执行
  4. 本地事务执行成功或者失败之后,Producer提交一个二次确认消息
  5. MQ Sever收到二次确认消息后,如果是这个消息是commit,将half消息状态修改为Consumer可见,如果消息是rollback,则将half消息删除

而在第4步因为某些异常导致执行失败了,则half消息将无法得到确认,所以RocketMQ还提供了一个回查机制:

  1. MQ Sever在half消息等待一定时间未确认后,会对Producer发起回查请求
  2. 此时Producer再去检查本地事务状态,然后重新提交commit或者rollback二次确认

事务消息回查时,事务可能由3种状态:

  1. 提交事务
  2. 回滚事务
  3. 未知事务:未确定状态,需要再次回查

回查是有限制的,默认只能回查不超过15次,超过15次还不能确认消息状态的则默认当作回滚

事务消息的 缺点:

  1. 不支持延时消息和批量消息
  2. 回查机制可能导致消息堆积