概述

当Java程序的CPU使用率达到100%的情况,可能是代码中存在死循环、频繁的I/O操作、不合理的锁(CAS)等问题导致。
排查问题的思路是先通过Linux命令top查看进程和线程的ID,然后把线程ID转换成16进制,最后通过java的工具jstack把栈信息打印出来,拿着16进制的线程ID找到具体的线程代码这样就能定位到具体导致cpu100%的代码了。

java程序通过docker运行,下面的命令都是基于这种环境,如果是直接部署的原理是一样的,并且下面的都是例子并不是cpu是100%

占用cpu的Java进程

如何通过top命令找到占用cpu最高的线程ID?

  1. 通过top找到java程序的进程ID是24:docker exec -it nacos top
    截屏2023-04-02 20.57.47.png
  2. 通过进程ID(24)找到进程中消耗cpu最高的线程ID(100):docker exec -it nacos top -H -p 24
    截屏2023-04-02 21.04.35.png

现在我们假如的java程序24对应的100号线程导致了cpu过高。

把线程号对应的16进制找到

使用命令printf:docker exec -it nacos printf "0x%x\n" 100 得到结果是0x64(因为jstack中的nid信息是有0x开头的所以我们直接加上了这个0x。

root@debian:~# docker exec -it nacos printf "0x%x\n" 100
0x64

最后通过jstack找到具体的进程所在的代码

注意jstack的时候参数是java主线程的ID,这样可以把所有的线程信息输出出来。

docker exec -it nacos jstack 24 | grep -C 10 0x64
// grep -C 10 是显示0x64所在行的上下10行

截屏2023-04-02 21.17.40.png

建议

Java程序中的每个线程最好都要创建线程名称,这样的话可以不通过jstack直接看到线程名字,这样在代码中可以直接找到对应的代码。

概述

本文将深入RocketMQ,详细描述RocketMQ是如何存储消息的。包括存储的文件格式、0拷贝技术、文件的顺序写、hash槽、索引文件、消费文件、刷盘方式等等。

文章回答以下问题

  • RocketMQ的存储的架构设计和存储模型是什么样的?

    存储架构图、存储文件在磁盘上的组织方式、一条消息的存储过程、消息的存储模型

  • RocketMQ中如何保证消息的高效读写的?

    0拷贝、顺序写、NIO中的直接内存映射、IndexFile索引、异步刷盘

  • RocketMQ中消息存储的刷盘方式?

    同步、异步

存储的架构设计和存储模型

通过存储的架构描述消息的整体存储的过程全貌,这张图可以知道存储中使用了哪些技术以及他们是如何配合使用的。然后针对存储文件在磁盘上的组织方式做一个详细说明,通过这个过程就知道消息的数据文件有哪些,在磁盘上是怎么组织的。最后通过描述一条消息的存储过程每个核心组件以及组件内容格式做一个详细说明,每个文件的详细内容是什么。

存储架构

消息通过生产者发送给broker,broker会存储到CommitLog、IndexFile、ConsumeQueue,然后通过mmap写入内存,最后刷盘到磁盘(注意:异步刷盘就是定期触发的)。

rocketmq存储架构

在磁盘上存储文件的组织方式

CommitLog是消息的存储文件,顺序写,随机读。IndexFile是消息的索引文件,方便快速查询消息。ConsumeQueue是消息的消费信息文件。

rocketmq存储目录结构.png

  • CommitLog:CommitLog用于存储消息的文件。它是由多个固定大小的文件组成,每个文件的大小默认为1G(原因是Java中MappedByteBuffer最大是1.5G)。如果当前文件已经写满,则会新建一个文件来存储新的消息。

  • IndexFile:每个IndexFile的大小在源代码中是这么计算的:

    this.fileTotalSize = IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
    // IndexHeader.INDEX_HEADER_SIZE = 40
    // hashSlotNum = 5000000(500万)
    // hashSlotSize = 4
    // indexNum = 5000000 * 4
    // indexSize = 20
    // 所以默认大小是:(40 + (5000000*4) + (5000000 * 4 * 20))/1024/1024 = 381.4697265625M
    • Index Header: 固定为40字节,包括4字节的magic code("Index"),4字节的版本号,4字节的索引项个数,4字节的第一个索引项的偏移量,4字节的最后一个索引项的偏移量,4字节的最后一个索引项的时间戳,16字节的保留字段。

    • Hash slot: 固定为4字节,表示hash桶的索引值。

    • Index Item: 固定为20字节,包括8字节的消息物理偏移量,4字节的消息长度,4字节的key长度,4字节的tag长度。

  • ConsumQueue:每个topic对应的MessageQueue对应一个文件。默认30W条消息。

一条消息到存储过程

会按照这个顺序写入:CommitLog->IndexFile->(如果有消费)ConsumeQueue->(更新)CommitLog

1、当broker收到消息会先存储到CommitLog。

  • 同步:同步消息会触发刷盘保证写入到MmapFile的同时写入到磁盘,写入磁盘失败就返回失败。如果有从节点,同时也需要写入从节点成功。
  • 异步:异步消息写入到内存MmapFile中后直接返回成功。只需要主节点写入成功就返回成功。如果5次写入失败就通知失败(生产者和broker会保持一个长连接接收通知)。
  • OneWay:和异步的逻辑一样,只是不需要保持长连接等待结果。

2、存入CommitLog后会继续写入IndexFile,如果CommitLog写入成功但是IndeFile写入失败,重启的时候会重建,但是没有重启的时候会导致找不到消息,因为都是通过Mmap写入的所以写入失败的机率可以忽略不计。同步、异步、OneWay刷盘的逻辑和CommitLog一样。

  注意:如果在写入CommitLog成功但写入IndexFile时失败,RocketMQ会尝试将失败的IndexFile写入操作存储在内存中,然后将消息发送到消费者。在后续操作中,RocketMQ会不断尝试将失败的IndexFile写入到磁盘中,直到写入成功。在这个过程中,如果Broker崩溃了,已经写入CommitLog但没有写入IndexFile的消息将无法被消费者消费。RocketMQ在处理写入失败的情况下,会尝试最大限度地保证数据的安全性,但不能保证100%。

3、如果消费者消费了消息,会写入到ConsumeQueue,并且会去CommitLog更新消费状态。

消息的存储模型

CommitLog

CommitLog的每个条目是按照如下格式存储的:

序号 字段名称 字段长度 说明
1 TotalSize 4字节 消息条目总长度
2 MagicCode 4字节 固定值为0xAABBCCDD的消息标识码
3 BodyCRC 4字节 消息体的CRC校验码
4 QueueId 4字节 消息所属的队列ID
5 Flag 4字节 消息的Flag
6 QueueOffset 8字节 消息在队列中的偏移量
7 PhysicalOffset 8字节 消息在CommitLog文件中的物理偏移量
8 SysFlag 4字节 系统标识位
9 BornTimestamp 8字节 消息的生产时间戳
10 BornHost 可变 消息的生产者IP地址
11 StoreTimestamp 8字节 消息的存储时间戳
12 StoreHostAddress 可变 消息存储服务器的IP地址
13 ReconsumeTimes 4字节 消息被重新消费的次数
14 PreparedTransactionOffset 8字节 事务消息的准备事务偏移量
15 Body 可变 消息体
16 Topic 可变 消息主题
17 Properties 可变 消息属性

IndexFile

IndexFile由IndexHeader、Hash Slot、IndexItem组成。

例如查询主题是T中的M消息的物理偏移量:

  1. 计算hash slot:hashCode(T#M)% 500万,计算的槽是1000
  2. 槽1000中存的IndexItem位置是1。(哪个槽应该对应哪个IndexItem?下一个IndexItem位置=IndexHeader+HashSlot+IndexCount)
  3. 把槽1000中所有的IndexItem查询出来就是物理偏移量了
  4. (最后把物理偏移量拿到CommitLog中查询出来对比消息ID或者KEY就能找到唯一的消息)

IndexFile的存储结构.png

保证消息的高效读写

RocketMQ中会通过一个MmapFileQueue队列来维护所有存储文件的MmapFile,也就是CommitLog、IndexFile、ConsumeQueue文件在程序中都是通过MmapFile读写的。MmapFile默认是4K,是通过Java的NIO中的MapedByteBuffer直接把文件映射到内存中,通过MmapFile直接读写文件,并且写入的时候是顺序写入的,所以效率会很高。另外RocketMQ中的IndexFile可以提高消息的查询速度,异步刷盘也能提高消息的吞吐量。

0拷贝

RocketMQ中的0拷贝技术是通过MappedByteBuffer实现的,实际上只是减少了一次从用户空间到内核空间的拷贝。也就是读写的4次拷贝变成了3次拷贝。

具体来说,当生产者发送消息时,首先将消息写入MappedFile,同时生成一条消息索引信息,并将索引信息写入IndexFile。在写入MappedFile的过程中,RocketMQ使用ByteBuffer类来映射MappedFile中的一块内存区域,通过这个ByteBuffer对象来直接操作内存,避免了消息数据的拷贝。当消费者拉取消息时,RocketMQ也使用ByteBuffer来映射MappedFile中的消息数据,避免了数据在内存中的复制。这样做可以有效减少系统开销,提高消息存储和读取的效率,同时也有利于降低系统的延迟。

顺序写和IndexFile

RocketMQ中,消息的存储是采用顺序写的方式,也就是按照消息到达的顺序,将消息写入CommitLog,确保消息按照产生的顺序被存储。具体来说,RocketMQ的顺序写包括以下几个步骤:

  1. 消息被发送到Broker,Broker收到消息后将其放入内存缓存队列中。
  2. 当内存缓存队列的消息数量达到一定阈值或者一定时间间隔到达时,会将内存缓存队列中的消息刷写到磁盘中的CommitLog文件中,这个过程是顺序写的过程。
  3. 在将消息写入CommitLog之前,RocketMQ会对消息进行序列化和压缩处理,并在消息的前面添加一些元数据,如消息长度、消息类型、消息主题等信息,这样可以方便后续的消息读取和解析。
  4. 在消息写入CommitLog之后,会将消息的索引信息写入到IndexFile中,IndexFile中维护了消息索引信息和CommitLog的文件偏移量的映射关系,方便后续的消息查询和检索。

顺序写的好处在于可以避免随机写的开销,降低磁盘的寻道次数,提高磁盘的写入效率,从而实现高吞吐量和低延迟的存储。此外,RocketMQ还通过一些优化措施,如使用内存缓存队列、对消息进行批量处理等方式进一步提升存储性能和效率。

异步刷盘

RocketMQ中的异步刷盘是指在消息写入CommitLog之后,不立即将消息内容同步到磁盘中,而是将消息内容缓存在内存中,定期批量异步刷盘到磁盘中,以提高消息写入的性能和效率。默认情况下会每500ms定时刷盘。同时,为了保证数据不丢失,RocketMQ还会对刷盘进行一些安全机制的保证,在同步刷盘失败时进行重试,默认重试5次。

异步刷盘相较于同步刷盘,可以显著提高消息的写入性能和吞吐量,但也存在一定的风险,因为如果在消息写入后未及时刷盘就出现了机器宕机等异常情况,可能会造成数据的丢失。因此,在实际应用中,需要根据实际需求和数据安全等级选择是否使用异步刷盘。

消息存储的刷盘方式

RocketMQ中有2种刷盘方式:同步、异步。发送同步消息的时候会同步刷盘,异步消息的时候会异步刷盘,当数据大小超过阈值或者定期会触发刷盘。

同步

当用同步方式发送消息的时候,消息写入到MmapFile的时候会马上触发刷盘,如果有主从复制则还需要从节点也刷盘成功,最后才返回消息发送成功,因为过程很多所以同步消息的效率会非常低。

异步

RocketMQ的异步消息有2中触发方式:定期触发、内存缓存达到一定的大小阈值。默认每500ms会触发一次刷盘或者内存缓存达到阈值也会触发刷盘。

异步发送消息的时候,Broker会在消息成功写入CommitLog后,通过长连接通知Producer消息发送成功。具体来说,当Broker收到异步发送消息的请求后,会立即返回发送成功的响应,同时将消息保存到内存的写缓存中。随后,Broker会启动一个单独的线程,将写缓存中的消息刷写到磁盘的commitlog文件中,并更新IndexFile。当消息成功写入CommitLog后,Broker会将写入的消息的物理偏移量和消息的存储时间等信息构成响应对象,通过长连接返回给Producer。Producer可以通过设置回调函数来处理这个响应对象,以便在消息成功写入Broker后进行进一步处理。当然,如果异步刷盘失败,Broker也会通过回调函数将失败的信息通知给Producer,以便Producer进行处理。