薛定谔的风口猪

站在巨人的肩膀上学习,猪都能看得很远

Java GC垃圾收集器这点小事

​ 对于大多数的应用来说,其实默认的JVM设置就够用了,但当你意识到有GC引起的性能问题、并且仅仅加大堆内存空间也解决不了的时候,那你就应该考虑下GC的调优了。但对于大多数程序员来说,这是很麻烦的,因为调优需要很多耐心,并且需要知道垃圾回收器的运作原理及背后对应用的影响。本文是high-level层面的关于Java垃圾回收器的总览,并以例子的形式去讲解性能定位的一些问题。

​ 正文开始。

​ Java提供了很多种垃圾回收器,会在gc运行的线程中搭配着不同的算法。不同的回收器工作原理不一样,优缺点也不同。最重要的是无论哪一种回收器都会”stop the world”。就是说,在收集和回收的过程中,你的应用程序(或多或少)都会处于暂停状态,只不过不同算法的stop the world的表现有所不同。有些算法一直都会闲置不工作直到必须要垃圾收集才开始工作,然后就暂停应用程序很长的时间;而有一些则能和应用程序同步的进行所以在“stop the world”阶段就会有更少的暂停。选择最合适的算法要取决于你的目标:你是想优化整体的吞吐量即便不时地就会长时间暂停也是可以接受的,还是说你是想得到低延迟的优化通过分散各个时间以得到每时每刻都低延迟。

​ 为了增强垃圾回收的过程,Java(准确的说是 HotSpot JVM)把堆分成了两个代,年轻代和年老代(还有一个叫永久代的区域不在我们本文讨论范围)

hotspot-heap

​ 年轻代是一些“年轻”的对象存放的地方,而年轻代还会继续分为以下三个区域:

  1. 伊甸区(Eden Space)
  2. 幸存区1(Survivor Space 1)
  3. 幸存区2(Survivor Space 2)

​ 默认情况下,伊甸区是大于两个幸存者区的总和的。例如在我的Mac OS X上的64位HotSpot JVM上,伊甸区占了大概年轻代76%的区域。所有的对象一开始都会在伊甸区创建,当伊甸区满了之后,就会触发一次次要的垃圾回收(minor gc),期间新对象会快速地被检查是否可以进行垃圾回收。一旦发现那些对象已经死亡(dead),也就是说没有再被其他对象引用了(这里先简单忽略掉引用的具体类型带来的一些差异,不在本文讨论),就会被标记为死亡然后被垃圾回收掉。而其中“幸存”的对象就会被移到其中的一个空的Survivor Space。你可能会问,具体移动到哪一个幸存区呢?要回答这个问题,首先我们先聊一下幸存区的设计。

​ 之所以设计两个幸存区,是为了避免内存碎片。假设只有一个幸存区(然后我们把幸存区想象成一个内存中连续的数组),当年轻代的gc在这个数组上运行了一遍后,会标记一些死亡对象然后删除掉,这样的话势必会在内存中留下一些空洞的区域(原来的对象存活的位置),那么就有必要做压缩了。所以为了避免做压缩,HotSpot JVM就从一个幸存者区复制所有幸存的对象到另外一个(空的)幸存者区里,这样就没有空洞了。这里我们讲到了压缩,顺便提一下年老代的垃圾回收器(除了CMS)在进行年老代垃圾回收的时候都会进行压缩以避免内存碎片。

​ 简单地说,次要的垃圾回收(当伊甸区满的时候)就会把存活的对象从伊甸区和其中一个幸存区(gc日志中以“from”呈现)左右捣腾地搬到另外一个幸存区(又叫“to”)。这样会一直的持续下去直到以下的条件发生:

  1. 对象达到了最大的晋升时间阈值(maximum tenuring threshold),就是说在年轻代被左右捣腾得足够久了,媳妇熬成婆。
  2. 幸存区已经没有空间去接受新生的对象了(后面会讲到)

​ 以上条件发生后,对象就会被移动到年老代了。下面用一个具体的例子来理解下。假设我们有以下的应用程序,它会在初始化的时候创建一些长期存活的对象,也会在运行的过程中不断的创建很多存活时间很短的对象(例如我们的web服务器程序在处理请求的时候会不断分配存活时间很短的对象)

1
2
3
4
5
6
7
8
9
10
11
12
13
private static void createFewLongLivedAndManyShortLivedObjects() {
        HashSet<Double> set = new HashSet<Double>();

        long l = 0;
        for (int i=0; i < 100; i++) {
            Double longLivedDouble = new Double(l++);
            set.add(longLivedDouble);  // 加到集合里,让这些对象能持续的存活
        }

        while(true) { // 不断地创建一些存活时间短的对象(这里在实际代码中比较极端,仅为演示用)
            Double shortLivedDouble = new Double(l++);
        }
}

在运行这个程序的过程中我们启用GC的部分日志参数:

1
2
3
4
5
6
-Xmx100m                     // 分配100MV的堆内存
-XX:-PrintGC                 // 开启GC日志打印
-XX:+PrintHeapAtGC           // 开启GC日志打印堆信息
-XX:MaxTenuringThreshold=15  // 为了让对象能在年轻代呆久一点
-XX:+UseConcMarkSweepGC      // 暂时先忽略这个配置,后面会讲到
-XX:+UseParNewGC             // 暂时先忽略这个配置,后面会讲到

gc 日志会显示垃圾收集前后的情况如下:

1
2
3
4
5
6
7
8
9
10
11
12
Heap <b>before</b> GC invocations=5 (full 0):
 par new (<u>young</u>) generation total 30720K, used 28680K
  eden space 27328K,   <b>100%</b> used
  from space 3392K,   <b>39%</b> used
  to   space 3392K,   0% used
 concurrent mark-sweep (<u>old</u>) generation total 68288K, used <b>0K</b> <br/>
Heap <b>after</b> GC invocations=6 (full 0):
 par new generation (<u>young</u>) total 30720K, used 1751K
  eden space 27328K,   <b>0%</b> used
  from space 3392K,   <b>51%</b> used
  to   space 3392K,   0% used
 concurrent mark-sweep (<u>old</u>) generation total 68288K, used <b>0K</b>

​ 从这个日志里我们能得到以下信息。第一,在这次gc之前,已经发生了5次的minor gc了(所以这次是第6次)。第二,伊甸区占用了100%所以触发了这次的gc。第三,其中一个幸存区域已经使用了39%的空间(还有不少可用空间)。而这次垃圾收集结束后,我们能看到伊甸区就被清空了(0%)然后幸存者区域上升到51%。这意味着伊甸区和其中一个幸存区里存活的对象已经被移动到另外一个幸存区了,然后死亡的对象已经被垃圾回收了。怎么推断的死亡对象被回收了呢?我们看到伊甸区原来是比幸存区要大的(27328K vs 3392K),而后面幸存区的空间大小仅仅是轻微的上升(伊甸区被清空了),所以大量的对象肯定是被垃圾回收了。而我们再看看年老代,年老代是一直都是空的,无论是这次垃圾回收前还是后(回想一下,我们设置了晋升阈值为15)。

​ 下面我们再试另外一个实验。这次用多线程不断的创建存活时间很短的对象。直觉上判断,依旧应该没有对象会上升到年老代才对,因为minor gc就应该可以把这些对象清理干净。我们来看看实际情况如何

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static void createManyShortLivedObjects() {
        final int NUMBER_OF_THREADS = 100;
        final int NUMBER_OF_OBJECTS_EACH_TIME = 1000000;

        for (int i=0; i<NUMBER_OF_THREADS; i++) {
            new Thread(() -> {
                    while(true) {
                        for (int i=0; i<NUMBER_OF_OBJECTS_EACH_TIME; i++) {
                            Double shortLivedDouble = new Double(1.0d);
                        }
                        sleepMillis(1);
                    }
                }
            }).start();
        }
    }
}

这次,我们只给10MB的内存,然后看看GC日志

1
2
3
4
5
6
7
8
9
10
11
12
Heap <b>before</b> GC invocations=0 (full 0):
 par new (<u>young</u>) generation total 3072K, used 2751K
  eden space 2752K,  99% used
  from space 320K,   0% used
  to   space 320K,   0% used
 concurrent mark-sweep (<u>old</u>) generation total 6848K, used <b>0K</b> <br/>
Heap <b>after</b> GC invocations=1 (full 0):
 par new generation  (<u>young</u>)  total 3072K, used 318K
  eden space 2752K,   0% used
  from space 320K,  99% used
  to   space 320K,   0% used
 concurrent mark-sweep (<u>old</u>) generation total 6848K, used <b>76K</b>

​ 从日志上看,并不如我们一开始想的那样。这次,老年代在第一次minor gc之后,接受了一些对象。实际上这些对象都是存活时间很短的对象,并且我们设置了晋升阈值是15次,再而且日志里显示的gc只是第一次垃圾收集。这个现象背后实际上是这样的:应用程序创建了大量的对象在伊甸区,minor gc启动的时候尝试去回收,但是大多数的这些存活时间很短的对象实际上都是active的(被一个运行中的线程引用着)。那么年轻代的垃圾收集器就只好把这些对象移动到年老代了。这其实是一个不好的现象,因为这些被移到到年老代的对象其实是过早衰老了(prematurely aged),它们只有在老年代的major gc才能被回收,而major gc通常会耗时更长。对于某些垃圾算法例如CMS,major gc会在年老代70%内存占据后出发。这个值可以通过参数修改-XX:CMSInitiatingOccupancyFraction=70

​ 怎么样防止这些短暂存活的对象过早衰老呢?有几个方法,其中一个理论上可行的方法是估计这些活跃的短暂存活对象的数量,然后设置合理的年轻代大小。我们下面来试试:

  • 年轻代默认是整个堆大小的1/3,这次我们通过 -XX:NewRatio=1 来修改其大小让他内存更大些(大约3.4MB,原来是3MB)
  • 同时调整幸存者区的大小:-XX:SurvivorRatio=1 (大约1.6MB一个区,原来是0.3MB)

问题就解决了。经过8次的minor gc,年老代依旧是空的

1
2
3
4
5
6
7
8
9
10
11
12
Heap <b>before</b> GC invocations=7 (full 0):
 par new generation   total 3456K, used 2352K
  eden space 1792K,  99% used
  from space 1664K,  33% used
  to   space 1664K,   0% used
 concurrent mark-sweep generation total 5120K, used <b>0K</b> <br/>
Heap <b>after</b> GC invocations=8 (full 0):
 par new generation   total 3456K, used 560K
  eden space 1792K,   0% used
  from space 1664K,  33% used
  to   space 1664K,   0% used [
 concurrent mark-sweep generation total 5120K, used <b>0K</b>

​ 对于GC调优,没有银弹。这里只是简单地示意。对于实际的应用,需要不断的修改配置试错来找到最佳配置。例如,这次其实我们也可以将堆的总大小调大一倍来解决此问题。

垃圾回收算法

​ 接下来我们来看看具体的垃圾回收算法。Hotspot JVM针对年轻代和年老代有多个不同的算法。从宏观层面上看,有三种类型的垃圾回收算法,每一类都有单独的性能特性:

serial collector :使用一条线程进行所有的垃圾回收工作,相对来说也是高效的因为没有线程之间的通信。适用于单处理器的机器。使用-XX:+UseSerialGC.启用

parallel collector (同时也称作吞吐回收器) :使用多线程进行垃圾回收,这样能显著的降低垃圾回收的负荷。设计来适用于这样的应用:拥有中等或大数据集的,运行在多核处理器或多线程的硬件

concurrent collector: 大部分的垃圾回收工作会同步的进行(不阻塞应用的运行)以维持短暂的GC暂停时间。它是设计给中等或大数据集的、响应时间比整体的吞吐量要更重要的应用,因为用这种算法去降低GC的停顿会一定程度降低应用的性能。

gc-compared

​ HotSpot JVM可以让我们选择不同的GC算法去回收年轻代和年老代,但是某些算法是需要配套的使用才兼容的。例如,你不能选择Parallel Scavenge去回收年轻代的同时,使用CMS收集器去回收年老代因为这两个收集器是不兼容的。以下是兼容的收集器的示意图

gc-collectors-pairing

  1. “Serial”是一个stop-the-world,复制算法的垃圾收集器,使用一条GC线程。
  2. “Parallel Scavenge”是一个stop-the-world、采用复制算法的垃圾收集器,但是使用多条GC线程。
  3. ParNew是一个stop-the-world,复制算法的收集器,使用多条GC线程。它和Parallel Scavenge的区别是它做了一些增强以适应搭配CMS使用。例如ParNew会做必要的同步(synchronization )以便它能在CMS的同步阶段运行。
  4. Serial Old 是一个stop-the-world,采用标记-清除-压缩算法的回收器,使用一条GC线程
  5. CMS(Concurrent Mark Sweep)是一个同步能力最强、低延迟的垃圾回收器
  6. Parallel Old是一个压缩算法的垃圾回收器,使用多个GC线程。

​ 对于服务端的应用程序(需要处理客户端请求)来说,使用CMS+ParNew是不错的选择。

我在大概10GB堆内存的程序中使用过也能保持响应时间稳定和短暂的GC暂停时间。我认识的一些开发者使用Parallel collectors (Parallel Scavenge + Parallel Old) ,效果也不错。

​ 其中一件需要注意的事是CMS已经宣布废弃了,会被Oralce推荐使用一个新的同步收集器取代, Garbage-First 简称 G1, 一个最先由Java推出的垃圾收集器

​ G1是一个服务端类型(server-style)的垃圾回收器,针对多处理器、大内存的计算机使用。它能尽可能地满足一个GC延迟时间的目标,同时也有很高的吞吐量

G1 会同时在年轻代和年老代进行工作。它针对大堆有专门的优化(>10GB)。我没有亲身尝试过G1,我团队里的开发者仍然使用的CMS,所以我还不能对两者进行比较。但通过快速的搜索之后,我找到了一个性能对比说CMS会比G1更好(CMS outperforming G1)。我倾向于谨慎,但G1应该是不错的。我们能靠以下参数启动

1
-XX:+UseG1GC

注:以上由本人摘选翻译自https://codeahoy.com/2017/08/06/basics-of-java-garbage-collection/

基于Sharding Sphere实现数据“一键脱敏”

在真实业务场景中,数据库中经常需要存储某些客户的关键性敏感信息如:身份证号、银行卡号、姓名、手机号码等,此类信息按照合规要求,通常需要实现加密存储以满足合规要求。

痛点一:

通常的解决方案是我们书写SQL的时候,把对应的加密字段手动进行加密再进行插入,在查询的时候使用之前再手动进行解密。此方法固然可行,但是使用起来非常不便捷且繁琐,使得日常的业务开发与存储合规的细节紧耦合

痛点二:

对于一些为了快速上线而一开始没有实现合规脱敏的系统,如何比较快速的使得已有业务满足合规要求的同时,尽量减少对原系统的改造。(通常的这个过程至少包括:1.新增脱敏列的存储 2.同时做数据迁移 3.业务的代码做兼容逻辑等)。

Apache ShardingSphere下面存在一个数据脱敏模块,此模块集成的常用的数据脱敏的功能。其基本原理是对用户输入的SQL进行解析拦截,并依靠用户的脱敏配置进行SQL的改写,从而实现对原文字段的加密及加密字段的解密。最终实现对用户无感的加解密存储、查询。

脱敏配置Quick Start——Spring 显示配置:

以下介绍基于Spring如何快速让系统支持脱敏配置。

1.引入依赖

<!-- for spring namespace -->
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-jdbc-spring-namespace</artifactId>
    <version>${sharding-sphere.version}</version>
</dependency>

2.创建脱敏配置规则对象

在创建数据源之前,需要准备一个EncryptRuleConfiguration进行脱敏的配置,以下是一个例子,对于同一个数据源里两张表card_info,pay_order的不同字段进行AES的加密

private EncryptRuleConfiguration getEncryptRuleConfiguration() {
Properties props = new Properties();

//自带aes算法需要
props.setProperty("aes.key.value", aeskey);
EncryptorRuleConfiguration encryptorConfig = new EncryptorRuleConfiguration("AES", props);

//自定义算法
//props.setProperty("qb.finance.aes.key.value", aeskey);
//EncryptorRuleConfiguration encryptorConfig = new EncryptorRuleConfiguration("QB-FINANCE-AES", props);

EncryptRuleConfiguration encryptRuleConfig = new EncryptRuleConfiguration();
encryptRuleConfig.getEncryptors().put("aes", encryptorConfig);

//START: card_info 表的脱敏配置
{
    EncryptColumnRuleConfiguration columnConfig1 = new EncryptColumnRuleConfiguration("", "name", "", "aes");
    EncryptColumnRuleConfiguration columnConfig2 = new EncryptColumnRuleConfiguration("", "id_no", "", "aes");
    EncryptColumnRuleConfiguration columnConfig3 = new EncryptColumnRuleConfiguration("", "finshell_card_no", "", "aes");
    Map<String, EncryptColumnRuleConfiguration> columnConfigMaps = new HashMap<>();
    columnConfigMaps.put("name", columnConfig1);
    columnConfigMaps.put("id_no", columnConfig2);
    columnConfigMaps.put("finshell_card_no", columnConfig3);
    EncryptTableRuleConfiguration tableConfig = new EncryptTableRuleConfiguration(columnConfigMaps);
    encryptRuleConfig.getTables().put("card_info", tableConfig);
}
//END: card_info 表的脱敏配置

//START: pay_order 表的脱敏配置
{
    EncryptColumnRuleConfiguration columnConfig1 = new EncryptColumnRuleConfiguration("", "card_no", "", "aes");
    Map<String, EncryptColumnRuleConfiguration> columnConfigMaps = new HashMap<>();
    columnConfigMaps.put("card_no", columnConfig1);
    EncryptTableRuleConfiguration tableConfig = new EncryptTableRuleConfiguration(columnConfigMaps);
    encryptRuleConfig.getTables().put("pay_order", tableConfig);
}

log.info("脱敏配置构建完成:{} ", encryptRuleConfig);
return encryptRuleConfig;

}

说明:

  1. 创建 EncryptColumnRuleConfiguration 的时候有四个参数,前两个参数分表叫plainColumn、cipherColumn,其意思是数据库存储里面真实的两个列(名文列、脱敏列),对于新的系统,只需要设置脱敏列即可,所以以上示例为plainColumn为”“。
  2. 创建EncryptTableRuleConfiguration 的时候需要传入一个map,这个map存的value即#1中说明的EncryptColumnRuleConfiguration ,而其key则是一个逻辑列,对于新系统,此逻辑列即为真实的脱敏列。Sharding Shpere在拦截到SQL改写的时候,会按照用户的配置,把逻辑列映射为名文列或者脱敏列(默认)如下的示例

shardings sphere basic

3.使用Sharding Sphere的数据源进行管理

把原始的数据源包装一层

1
2
3
4
5
@Bean("tradePlatformDataSource")
public DataSource dataSource(@Qualifier("druidDataSource") DataSource ds) throws SQLException {
    return EncryptDataSourceFactory.createDataSource(ds, getEncryptRuleConfiguration(), new Properties());

}

脱敏配置Quick Start——Spring Boot版:

以下步骤使用Spring Boot管理,可以仅用配置文件解决:

1.引入依赖

<!-- for spring boot -->

<dependency>
<groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
    <version>${sharding-sphere.version}</version>
</dependency>

<!-- for spring namespace -->
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-jdbc-spring-namespace</artifactId>
    <version>${sharding-sphere.version}</version>
</dependency>
  1. Spring 配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

spring.shardingsphere.datasource.name=ds
spring.shardingsphere.datasource.ds.type=com.alibaba.druid.pool.DruidDataSource
spring.shardingsphere.datasource.ds.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds.url=xxxxxxxxxxxxx
spring.shardingsphere.datasource.ds.username=xxxxxxx
spring.shardingsphere.datasource.ds.password=xxxxxxxxxxxx



# 默认的AES加密器
spring.shardingsphere.encrypt.encryptors.encryptor_aes.type=aes
spring.shardingsphere.encrypt.encryptors.encryptor_aes.props.aes.key.value=hkiqAXU6Ur5fixGHaO4Lb2V2ggausYwW

# card_info 姓名 AES加密
spring.shardingsphere.encrypt.tables.card_info.columns.name.cipherColumn=name
spring.shardingsphere.encrypt.tables.card_info.columns.name.encryptor=encryptor_aes

# card_info 身份证 AES加密
spring.shardingsphere.encrypt.tables.card_info.columns.id_no.cipherColumn=id_no
spring.shardingsphere.encrypt.tables.card_info.columns.id_no.encryptor=encryptor_aes

# card_info 银行卡号 AES加密
spring.shardingsphere.encrypt.tables.card_info.columns.finshell_card_no.cipherColumn=finshell_card_no
spring.shardingsphere.encrypt.tables.card_info.columns.finshell_card_no.encryptor=encryptor_aes

# pay_order 银行卡号 AES加密
spring.shardingsphere.encrypt.tables.pay_order.columns.card_no.cipherColumn=card_no
spring.shardingsphere.encrypt.tables.pay_order.columns.card_no.encryptor=encryptor_aes

SpringBoot+Dubbo优雅退出分析及方案

背景:

当我们使用SpringBoot+D做微服务的时候,可能再服务停机的过程,发现在一瞬间出现一些报错,最典型的如比如拿到的数据库连接已经关闭等问题,如下图所示:

img

从日志错误可以看到,停机时还存在正在处理的请求,而此请求需要访问数据源,但数据源的资源被 Spring 容器关闭了,导致获取不到而报错。

但是实际上,无论Dubbo和Spring其实都实现了优雅退出,为什么最后退出还是不那么优雅呢?

要分析这个问题,首先得分析它们两者的优雅退出实现。

Dubbo优雅退出

dubbo框架本身基于ShutdownHook注册了一个优雅退出的钩子,背后会调用其destroyAll来实现自身的优雅关闭。

以下是Dubbo 2.6.2的源码:

img

img

Dubbo发现程序退出的时候,钩子方法会通知注册中心取消自身的注册——以便告知消费者不要调用自己了,然后关闭自身的端口连接——在关闭自身连接的时候还会sleep自旋的方法等待已有的处理请求先完成)

img

但是,Dubbo服务的优雅退出,不代表服务背后的代码是优雅的,也就是说在Dubbo优雅退出的完成前,我们的服务能否能保证可用——背后的资源/服务是否仍然可用。

本文一开始截图的错误,原因就是服务停机的时候,依赖的数据库资源因为某些原因已经回收了,这时候正在处理的请求自然报错而显得不优雅了。

而回收的人并不是别人,就是Spring的优雅退出。

Spring的优雅退出

Spring回收资源也是基于ShutdownHook实现的,Spring在启动的时候会调用refreshContext接口,这个接口默认会帮我们注册优雅退出的钩子方法。

img

img

这个钩子方法最后会销毁Spring容器,其中自然包括其背后的依赖的资源。

因为大部分情况下,我们的Dubbo服务是依赖于Spring的资源的,要真正实现优雅退出,除了双方本身退出的过程是优雅的,还需要保证Dubbo退出的过程中Spring的资源是可用的——也就是退出应该要是有顺序的:Dubbo退出→Spring退出。

但是Java的ShutdownHook背后的退出是并发执行而没有顺序依赖的,这是背后表现不优雅的原因。以下是JDK文档的描述:

img

正是由于本身应该有顺序关系的退出逻辑,在并行的处理,导致部分的流量正在处理过程中,依赖的资源已经释放了,最终导致退出的不优雅。

要解决这个问题,可简单可行的思路是:给Dubbo退出一定的时间去处理,然后再执行Spring容器的关闭。但由于钩子方法的时机并不能程序员控制,那么怎么样才能做到呢——禁用原生Spring的钩子方法,在合适的时机手动销毁Spring容器。

优雅退出方案(简版)——给予固定睡眠时间后才关闭Spring容器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
SpringApplication application = new SpringApplication(Main.class);
application.setRegisterShutdownHook(false);//关闭spring的shutdown hook,后续手动触发
final ConfigurableApplicationContext context = application.run(args);
Runtime.getRuntime().addShutdownHook(new Thread("T_SHUTDOWN_HOOK") {
    public void run() {
        log.info("”====================shutdown App====================“。");
        //....这里可以做其他优雅退出处理,例如回收本地线程池、关闭定时调度器等的操作

        try {
            Thread.sleep(2000);//等待一段时间,这里给时间dubbo的shutdownhook执行,
        } catch (InterruptedException e) {
            log.error("",e);
        }

        //关闭spring容器
        context.close();
    }
});

优雅退出方案(升级版)——动态地等待消费者及生产者连接关闭后才关闭Spring容器:

上面的方案正常情况下也够用,因为大部分时间我们只需要估算一个退出时间,让dubbo处理销毁的工作即可,但是对于一些退出时间相对变化较大(如有动态的消费者),表现出来的结果就是dubbo的退出时间有时候较短,有时候缺比较长。如果直接给一个较大的睡眠时间,可能使得每次程序退出都等很久,就显得不太优雅了。

那么我们就可以使用一些底层的dubbo api去确认消费者和生产者的连接已经关闭,以下是一个方法用以取代上面代码片段中的sleep的语句:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 * 等待Dubbo退出,优雅退出的shutdown hook可使用
 * @param sleepMillis 每次发现Dubbo没退出完就睡眠等待的毫秒数
 * @param sleepMaxTimes 最多睡眠的次数,避免一直dubbo退出太久卡住程序的退出,达到此次数后会不再等待
 */
public static void waitDubboShutdown(long sleepMillis, int sleepMaxTimes) {
    for (int sleepWaitTimes=0; sleepWaitTimes <sleepMaxTimes; sleepWaitTimes++){//如果dubbo的server没有关闭完成,会睡眠等待,最多等待三次
        Collection existingDubboServers = DubboProtocol.getDubboProtocol().getServers();
        Collection existingDubboExporters  = DubboProtocol.getDubboProtocol().getExporters();
        log.info("existing dubbo servers : {}, existing dubbo expoerters {} ,  sleepWaitTimes : {}", existingDubboServers, existingDubboExporters, sleepWaitTimes);
        if (!existingDubboServers.isEmpty() || !existingDubboExporters.isEmpty()) {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            break;
        }
    }

    //优雅退出失败,打印日志
    Collection existingDubboServers = DubboProtocol.getDubboProtocol().getServers();
    if (!existingDubboServers.isEmpty()) {
        log.warn("DUBBO服务Server依然存在,不再等待其销毁,可能会导致优雅退出失败 {}",existingDubboServers);
    }

    Collection existingDubboExporters  = DubboProtocol.getDubboProtocol().getExporters();
    if (!existingDubboExporters.isEmpty()) {
        log.warn("DUBBO服务Exporters依然存在,不再等待其销毁,可能会导致优雅退出失败 {}",existingDubboExporters);
    }
}

注:这个方法用到了DubboProtocol的底层API,所以如果你的协议不是使用”dubbo”而是如HTTP协议、redis协议,则此方法不可用。关于协议的部分,可以参考官方文档:http://dubbo.apache.org/zh-cn/docs/user/references/protocol/introduction.html

那么最后,升级版的优雅退出代码则如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
SpringApplication application = new SpringApplication(Main.class);
application.setRegisterShutdownHook(false);//关闭spring的shutdown hook,后续手动触发
final ConfigurableApplicationContext context = application.run(args);
Runtime.getRuntime().addShutdownHook(new Thread("T_SHUTDOWN_HOOK") {
    public void run() {
        log.info("”====================shutdown App====================“。");
        //....这里可以做其他优雅退出处理,例如回收本地线程池、关闭定时调度器等的操作

        waitDubboShutdown(1000,5);//每次等1000ms,最多等5次;优雅退出时间是动态的(可能1秒就能优雅退出完毕);但如果退出时间大于5秒,那么则放弃优雅退出,直接退出。

        //关闭spring容器
        context.close();
    }
});

监控Spring Boot中的Tomcat性能数据

现在,我们经常会使用Spring Boot以开发Web服务,其内嵌容器的方法的确使得开发效率大大提升。

由于网关层通常是直接面对用户请求的一层,也是微服务里面最上游的一个服务,其请求量通常是所有服务中最大的,如果服务出现了性能上的问题,网关层通常都会出现阻塞、超时等现象,这时候就很可能需要性能的调优,其中最常见的则是参数调优。但如何知道哪些性能参数成为了瓶颈(如容器线程数是否不足等),则是调优的前提条件。

本文总结介绍如何在使用了Spring Boot的前提下,获取运行时的Tomcat性能运行情况。

Spring Boot中有一个Spring Boot actuator的模块,用来监控和管理应用的运行状态,例如健康状况,线程运行情况等。

Maven 依赖:

1
2
3
4
5
6
<dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
</dependencies>

然后当Spring Boot运行之后,Spring Boot会有很多服务暴露在http服务中,这些服务叫EndPoints, 通过 http://{应用路径}/actuator 这个 url 即可访问,例如 http://{应用路径}/actuator/info, http://{应用路径}/actuator/health 这两个endpoints是默认开启的。

其中actuator这个路径可以通过配置修改:

1
management.endpoints.web.base-path=/mypath

以下是获取健康状态的一个例子:

1
$ curl 'http://localhost:8080/actuator/health' -i -X GET

可能会得到类似这样的结果:

1
2
3
{
    "status" : "UP"
}

比较简陋,如果希望这个接口有更多数据,可以尝试这样的配置:

1
management.endpoint.health.show-details=always

结果就会丰富了(我的应用用了Redis):类似

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{
  "status": "UP",
  "details": {
      "diskSpace": {
          "status": "UP",
          "details": {
              "total": 214745214976,
              "free": 174805827584,
              "threshold": 10485760
          }
      },
      "redis": {
          "status": "UP",
          "details": {
              "cluster_size": 3,
              "slots_up": 16384,
              "slots_fail": 0
          }
      }
  }
}

但是这还不够,我们需要详细的容器数据。监控状况只是一部分。而这些我们想要的数据,是在一个叫metric的EndPoint下面。 但是此endpoint 默认没有暴露到http接口的的,需要添加配置:

1
2
#默认只开启info, health 的http暴露,在此增加metric endpoint
management.endpoints.web.exposure.include=info, health,metric

之后我们就能访问这个metric有哪些数据了

$ curl ‘http://localhost:8080/actuator/metric’ -i -X GET

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
{
    "names": [
        "jvm.memory.max",
        "jvm.threads.states",
        "process.files.max",
        "jvm.gc.memory.promoted",
        "tomcat.cache.hit",
        "tomcat.servlet.error",
        "system.load.average.1m",
        "tomcat.cache.access",
        "jvm.memory.used",
        "jvm.gc.max.data.size",
        "jvm.gc.pause",
        "jvm.memory.committed",
        "system.cpu.count",
        "logback.events",
        "tomcat.global.sent",
        "jvm.buffer.memory.used",
        "tomcat.sessions.created",
        "jvm.threads.daemon",
        "system.cpu.usage",
        "jvm.gc.memory.allocated",
        "tomcat.global.request.max",
        "tomcat.global.request",
        "tomcat.sessions.expired",
        "jvm.threads.live",
        "jvm.threads.peak",
        "tomcat.global.received",
        "process.uptime",
        "http.client.requests",
        "tomcat.sessions.rejected",
        "process.cpu.usage",
        "tomcat.threads.config.max",
        "jvm.classes.loaded",
        "http.server.requests",
        "jvm.classes.unloaded",
        "tomcat.global.error",
        "tomcat.sessions.active.current",
        "tomcat.sessions.alive.max",
        "jvm.gc.live.data.size",
        "tomcat.servlet.request.max",
        "tomcat.threads.current",
        "tomcat.servlet.request",
        "process.files.open",
        "jvm.buffer.count",
        "jvm.buffer.total.capacity",
        "tomcat.sessions.active.max",
        "tomcat.threads.busy",
        "process.start.time"
    ]
}

其中列出的是所有可以获取的监控数据,在其中我们发现了我们想要的

1
2
3
"tomcat.threads.config.max"
"tomcat.threads.current"
"tomcat.threads.busy"

那么如何获取其中的值呢?只需要在metric路径下加上希望获取的指标即可: curl ‘http://localhost:8080/actuator/metric/tomcat.threads.busy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
  "name": "tomcat.threads.busy",
  "description": null,
  "baseUnit": "threads",
  "measurements": [{
      "statistic": "VALUE",
      "value": 1.0
  }],
  "availableTags": [{
      "tag": "name",
      "values": ["http-nio-10610"]
  }]
}


在此,基本我们想要的数据都能实时的通过http服务接口的方式获取了,那么在流量峰值的时候,一些实时的状态便可获取到了。

监控数据

但是我们面对的情况是这样的,半个小时前,一个push活动带来了很大的量,但现在流量已经过去了,需要定位当时的性能问题意味着需要采集到过去的数据。所以我们可能需要一个服务定期去监控这些数据。Spring Boot已经考虑到了这种情况,所以其中有一个prometheus的模块,他是一个独立的服务去采集其中的监控数据并可视化,具体的介绍可以参考:https://www.callicoder.com/spring-boot-actuator-metrics-monitoring-dashboard-prometheus-grafana/

以日志形式定期输出监控数据

很多时候,如果有日志的方法去定期输出监控的数据这样已经足够我们分析了。在Spring Boot 2.x里,只需要配置一个Bean

1
2
3
4
5
6
7
@Configuration
class MetricsConfig {
    @Bean
    LoggingMeterRegistry loggingMeterRegistry() {
        return new LoggingMeterRegistry();
    }
}

之所以需要Spring Boot版本2.x,LoggingMeterRegistry是因为是micrometer-core里面的1.10以上才引入的,而Spring Boot 1.x都低于这个版本,如果不想升级Spring Boot版本,可以尝试显示变更此版本:

1
2
3
4
5
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-core</artifactId>
    <version>1.1.3</version>
</dependency>

最后日志的内容就会每一分钟的打印出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
jvm.buffer.count{id=direct} value=26 buffers
jvm.buffer.count{id=mapped} value=0 buffers
jvm.buffer.memory.used{id=direct} value=632.600586 KiB
jvm.buffer.memory.used{id=mapped} value=0 B
jvm.buffer.total.capacity{id=direct} value=632.599609 KiB
jvm.buffer.total.capacity{id=mapped} value=0 B
jvm.classes.loaded{} value=12306 classes
jvm.gc.live.data.size{} value=39.339607 MiB
jvm.gc.max.data.size{} value=2.666992 GiB
jvm.memory.committed{area=nonheap,id=Compressed Class Space} value=8.539062 MiB
jvm.memory.committed{area=nonheap,id=Code Cache} value=26.8125 MiB
jvm.memory.committed{area=heap,id=PS Survivor Space} value=30 MiB
jvm.memory.committed{area=heap,id=PS Eden Space} value=416.5 MiB
jvm.memory.committed{area=heap,id=PS Old Gen} value=242 MiB
jvm.memory.committed{area=nonheap,id=Metaspace} value=66.773438 MiB
jvm.memory.max{area=heap,id=PS Survivor Space} value=30 MiB
jvm.memory.max{area=heap,id=PS Eden Space} value=1.272949 GiB
jvm.memory.max{area=heap,id=PS Old Gen} value=2.666992 GiB
jvm.memory.max{area=nonheap,id=Metaspace} value=-1 B
jvm.memory.max{area=nonheap,id=Compressed Class Space} value=1 GiB
jvm.memory.max{area=nonheap,id=Code Cache} value=240 MiB
jvm.memory.used{area=nonheap,id=Code Cache} value=26.635071 MiB
jvm.memory.used{area=heap,id=PS Survivor Space} value=25.214882 MiB
jvm.memory.used{area=heap,id=PS Eden Space} value=46.910545 MiB
jvm.memory.used{area=heap,id=PS Old Gen} value=39.34742 MiB
jvm.memory.used{area=nonheap,id=Metaspace} value=63.333778 MiB
jvm.memory.used{area=nonheap,id=Compressed Class Space} value=7.947166 MiB
jvm.threads.daemon{} value=52 threads
jvm.threads.live{} value=54 threads
jvm.threads.peak{} value=67 threads
jvm.threads.states{state=terminated} value=0 threads
jvm.threads.states{state=blocked} value=0 threads
jvm.threads.states{state=new} value=0 threads
jvm.threads.states{state=runnable} value=20 threads
jvm.threads.states{state=waiting} value=19 threads
jvm.threads.states{state=timed-waiting} value=15 threads
process.cpu.usage{} value=-1
process.start.time{} value=435900h 48m 53.344s
process.uptime{} value=56m 6.709s
system.cpu.count{} value=8
system.cpu.usage{} value=-1
tomcat.global.request.max{name=http-nio-10610} value=0.597s
tomcat.servlet.request.max{name=dispatcherServlet} value=0.567s
tomcat.sessions.active.current{} value=0 sessions
tomcat.sessions.active.max{} value=0 sessions
tomcat.threads.busy{name=http-nio-10610} value=0 threads
tomcat.threads.config.max{name=http-nio-10610} value=200 threads
tomcat.threads.current{name=http-nio-10610} value=10 threads

如果需要修改打印的频率,可修改LoggingRegistryConfig以更改其打印频率

1
2
3
4
5
6
7
8
9
10
11
12
13
  //下面是单独的配置实现的参考,当需要修改配置时候可以使用
  return new LoggingMeterRegistry(new LoggingRegistryConfig() {
       @Override
     public Duration step() {
         return Duration.ofSeconds(10);//10秒输出一次
       }

       @Override
       public String get(String key) {
            return null;
       }
   }, Clock.SYSTEM);
}

RabbitMQ实现延迟队列

RabbitMQ本身没有延迟队列的支持,但是基于其本身的一些特性,可以做到类似延迟队列的效果:基于死信交换器+TTL。

以下介绍下相关概念及方法

Dead Letter Exchanges

消息在队列满足达到一定的条件,会被认为是死信消息(dead-lettered),这时候,RabbitMQ会重新把这类消息发到另外一个的exchange,这个exchange称为Dead Letter Exchanges.

以下任一条件满足,即可认为是死信:

  • 消息被拒绝消费(basic.reject or basic.nack)并且设置了requeue=fasle
  • 消息的TTL到了(消息过期)
  • 达到了队列的长度限制

需要注意的是,Dead letter exchanges (DLXs) 其实就是普通的exchange,可以和正常的exchange一样的声明或者使用。

死信消息路由

队列中可以设置两个属性:

  • x-dead-letter-exchange
  • x-dead-letter-routing-key

当这个队列里面的消息成为死信之后,就会投递到x-dead-letter-exchange指定的exchange中,其中带着的routing key就是中指定的值x-dead-letter-routing-key。

而如果使用默认的exchange(routing key就是希望指定的队列),则只需要把x-dead-letter-exchange设置为空(不能不设置),类似下面

rabbitmq 延迟队列的配置

死信消息的路由则会根据x-dead-letter-routing-key所指定的进行路由,如果这个值没有指定,则会按照消息一开始发送的时候指定的routing key进行路由

Dead-lettered messages are routed to their dead letter exchange either:

with the routing key specified for the queue they were on; or, if this was not set, with the same routing keys they were originally published with.

例如,如果一开始你对exchange X发送消息,带着routing key “foo”,进入了队列 Q然后消息变死信后,他会被重新发送到 dead letter exchange ,其中发给dead letter exchange带着的routing key 还是foo。 但如果这个队列Q本身是设置了x-dead-letter-routing-key bar, 那么他发送到 dead letter exchange的时候,带着的routing key 就是bar。

需要注意的是,当死信消息重新路由到新的队列的时候,在死信目标队列确认收到这条死信消息之前,原来队列的消息是不会删除的,也就是说在某些异常场景下例如broker突然shutdown,是有机会存在说一个消息既存在于原队列,又存在于死信目标队列。具体可参考官方说明:

Dead-lettered messages are re-published with publisher confirms turned on internally so, the “dead-letter queues” (DLX routing targets) the messages eventually land on must confirm the messages before they are removed from the original queue. In other words, the “publishing” (the one in which messages expired) queue will not remove messages before the dead-letter queues acknowledge receiving them (see Confirms for details on the guarantees made). Note that, in the event of an unclean broker shutdown, the same message may be duplicated on both the original queue and on the dead-lettering destination queues.

Time-To-Live(TTL)

开头我们说过,实现延迟队列除了用死信消息外,还需要利用消息过期的TTL机制,因为只要消息过期了,就会触发死信。

RabbitMQ有两种方法让设置消息的TTL:

直接在消息上设置

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("60000")
.build();
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);

为队列设置消息过期TTL

rabbitmq x-message-ttl

注意,队列还有一个队列TTL,x-expires,这个的意思是队列空置经过一段时间(没有消费者,没有被重新声明,没有人在上面获取消息(basic.get))后,整个队列便会过期删除,不要混淆

如果同时设置了消息的过期和队列消息过期属性,则取两个较小值。

设计延迟队列:

例如,我们需要触发一个推送新闻,30分钟后统计这个新闻的下发情况,我们就需要一个延迟队列,新闻推送后,往延迟队列发送一个消息,这个队列的消息在30分钟后被消费,这时候触发即可统计30分钟的下发情况。我们可以这样设计:

定义一个正常的队列: ARRIVAL_STAT,统计程序监听此队列,进行消费。

定义一个“延迟队列”(RabbitMQ没有这样的队列,这里只是人为的制造一个这样的队列):DELAY_ARRIVAL_STAT,其中设置好对应的x-dead-letter-exchange,x-dead-letter-routing-key。为了简单说明,我使用默认的exchange,那么配置如下:

x-dead-letter-exchange=“”
x-dead-letter-routing-key=“ARRIVAL_STAT”

意思是,消息当这个队列DELAY_ARRIVAL_STAT的消息变死信之后,就会带着routing key “ARRIVAL_STAT”发送默认的空exchange,即队列ARRIVAL_STAT。

并且这个队列不能有消费者消费消息。

这样我们就实现了消息的死信转发。下一步,只需要让消息在这个DELAY_ARRIVAL_STAT在30分钟后过期变死信即可。按照上文所说,有两种方法,我们可以为队列的消息设置30分钟TTL,或者发送消息的时候指定消息的TTL为30分钟即可。

示例如下:

rabbitmq 延迟队列示意

“延迟队列”的堵塞缺陷

由于设置了x-dead-letter-exchange的队列本身也是普通队列,其过期的顺序是按照队列头部顺序的过期的。也就是说,如果你队列头的消息A过期时间是5分钟,后面对这个队列发送消息B的带着过期时间1分钟,那么后面的队列B要等队列A过期了才会触发过期:

Queues that had a per-message TTL applied to them retroactively (when they already had messages) will discard the messages when specific events occur. Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered).

所以,对于此类多延迟时间的,可以考虑设置多级延迟队列。例如1分钟,5分钟,10分钟,20分钟这样多级的延迟队列,使得延迟相近的尽量放到同一个队列中减少拥堵的最坏情况。

rabbitmq 多级延迟队列

RabbitMQ常用命令与配置

以下记录RabbitMQ常用的运维命令和配置


常用命令

启动进程:

sbin/rabbitmq-server -detached

关闭进程:

sbin/rabbitmqctl stop

创建账号:

sbin/rabbitmqctl add_user admin ${mq_password}
sbin/rabbitmqctl set_user_tags admin administrator
sbin/rabbitmqctl set_permissions -p '/' admin '.*' '.*' '.*'

启动监控:

#启动监控后,可以用http访问控制台 ip:监控端口(默认原端口+10000)
sbin/rabbitmq-plugins enable rabbitmq_management

加入集群:

#要先停止应用
sbin/rabbitmqctl stop_app
#加入集群,cluster_name为之前启动的那个集群名称,通常为环境变量文件中配置的RABBITMQ_NODE_IP_ADDRESS
sbin/rabbitmqctl join_cluster ${cluster_name}
#再次启动应用    
sbin/rabbitmqctl start_app

命令文档:https://www.rabbitmq.com/rabbitmqctl.8.html


配置文件

rabbitmq-env.conf

RABBITMQ_NODE_IP_ADDRESS= //IP地址,空串bind所有地址,指定地址bind指定网络接口
RABBITMQ_NODE_PORT=       //TCP端口号,默认是5672
RABBITMQ_NODENAME=        //节点名称。默认是rabbit
RABBITMQ_CONFIG_FILE= //配置文件路径 ,即rabbitmq.config文件路径
RABBITMQ_MNESIA_BASE=     //mnesia所在路径
RABBITMQ_LOG_BASE=        //日志所在路径
RABBITMQ_PLUGINS_DIR=     //插件所在路径

rabbitmq.config

tcp_listerners    #设置rabbimq的监听端口,默认为[5672]。
disk_free_limit     #磁盘低水位线,若磁盘容量低于指定值则停止接收数据,默认值为{mem_relative, 1.0},即与内存相关联1:1,也可定制为多少byte.
vm_memory_high_watermark    #设置内存低水位线,若低于该水位线,则开启流控机制,默认值是0.4,即内存总量的40%。
hipe_compile     #将部分rabbimq代码用High Performance Erlang compiler编译,可提升性能,该参数是实验性,若出现erlang vm segfaults,应关掉。
force_fine_statistics    #该参数属于rabbimq_management,若为true则进行精细化的统计,但会影响性能。
frame_max     #包大小,若包小则低延迟,若包则高吞吐,默认是131072=128K。
heartbeat     #客户端与服务端心跳间隔,设置为0则关闭心跳,默认是600秒。

一台机器启动多个实例

以上如果希望一个机器中启动多个实例,简单需要配置的地方仅有

rabbitmq-env.conf:

#改个名字
RABBITMQ_NODENAME=your_new_node_name
#改个端口    
RABBITMQ_NODE_PORT=5673

rabbitmq.config:

%tcp 监听端口对应修改%
{tcp_listeners, [5673]},

rabbitmq_management下面的监听端口对应修改,建议原端口加10000保持与原来默认的统一

{listener, [{port,     15673}]}

文档:http://www.rabbitmq.com/configure.html#configuration-file

RocketMQ 客户端配置

RocketMQ的客户端和服务端采取完全不一样的配置机制,客户端没有配置文件,所有的配置选项需要开发者使用对应的配置的setter进行设置。

注: 以下带 * 的,表示为重要参数。


ClientConfig

RocketMQ的Producer(DefaultMQProducer)和Consumer(DefaultMQPushConsumerDefaultMQPullConsumer),甚至运维相关的的admin类(DefaultMQAdminExt)都继承自ClientConfig。这意味着,其中的配置无论Producer还是Consumer都可以进行设置,其中大部分都是公用的配置(但由于设计的问题,有些配置只会对消费或生产生效)。

namesrvAddr*

配置说明 默认值
NameServer的地址列表,若是集群,用;作为地址的分隔符。 -D系统参数rocketmq.namesrv.addr或环境变量NAMESRV_ADDR

无论生产者还是消费者,只要是客户端需要和服务器broker进行操作,就需要依赖Name Server进行服务发现。具体请看:RocketMQ——组件

instanceName*

配置说明 默认值
NameServer的地址列表,若是集群,用;作为地址的分隔符。 从-D系统参数rocketmq.client.name获取,否则就是DEFAULT

这个值虽然默认写是DEFAULT,但在启动的时候,如果我们没有显示修改还是维持其DEFAULT的话,RocketMQ会更新为当前的进程号:

public void changeInstanceNameToPID() {
    if (this.instanceName.equals("DEFAULT")) {
        this.instanceName = String.valueOf(UtilAll.getPid());
    }
}

RocketMQ用一个叫ClientID的概念,来唯一标记一个客户端实例,一个客户端实例对于Broker而言会开辟一个Netty的客户端实例。 而ClientID是由ClientIP+InstanceName构成,故如果一个进程中多个实例(无论Producer还是Consumer)ClientIP和InstanceName都一样,他们将公用一个内部实例(同一套网络连接,线程资源等)

此外,此ClientID在对于Consumer负载均衡的时候起到唯一标识的作用,一旦多个实例(无论不同进程、不通机器、还是同一进程)的多个Consumer实例有一样的ClientID,负载均衡的时候必然RocketMQ任然会把两个实例当作一个client(因为同样一个clientID)。 故为了避免不必要的问题,ClientIP+instance Name的组合建议唯一,除非有意需要共用连接、资源。

clientIP

配置说明 默认值
客户端IP RemotingUtil.getLocalAddress()

这个值有两个用处: 1. 对于默认的instanceName(后面说明),如果没有显示设置,会使用ip+进程号,其中的ip便是这里的配置值 2. 对于Producer发送消息的时候,消息本身会存储本值到bornHost,用于标记消息从哪台机器产生的

clientCallbackExecutorThreads

配置说明 默认值
客户端通信层接收到网络请求的时候,处理器的核数 Runtime.getRuntime().availableProcessors()

虽然大部分指令的发起方是客户端而处理方是broker/NameServer端,但客户端有时候也需要处理远端对发送给自己的命令,最常见的是一些运维指令如GET_CONSUMER_RUNNING_INFO,或者消费实例上线/下线的推送指令NOTIFY_CONSUMER_IDS_CHANGED,这些指令的处理都在一个线程池处理,clientCallbackExecutorThreads控制这个线程池的核数。

pollNameServerInterval*

配置说明 默认值
轮询从NameServer获取路由信息的时间间隔 30000,单位毫秒

客户端依靠NameServer做服务发现(具体请看:RocketMQ——组件),这个间隔决定了新服务上线/下线,客户端最长多久能探测得到。默认是30秒,就是说如果做broker扩容,最长需要30秒客户端才能感知得到新broker的存在。

heartbeatBrokerInterval*

配置说明 默认值
定期发送注册心跳到broker的间隔 30000,单位毫秒

客户端依靠心跳告诉broker“我是谁(clientID,ConsumerGroup/ProducerGroup)”,“自己是订阅了什么topic”,”要发送什么topic”。以此,broker会记录并维护这些信息。客户端如果动态更新这些信息,最长则需要这个心跳周期才能告诉broker。

persistConsumerOffsetInterval*

配置说明 默认值
作用于Consumer,持久化消费进度的间隔 5000,单位毫秒

RocketMQ采取的是定期批量ack的机制以持久化消费进度。也就是说每次消费消息结束后,并不会立刻ack,而是定期的集中的更新进度。 由于持久化不是立刻持久化的,所以如果消费实例突然退出(如断点)、或者触发了负载均衡分consue queue重排,有可能会有已经消费过的消费进度没有及时更新而导致重新投递。故本配置值越小,重复的概率越低,但同时也会增加网络通信的负担。

vipChannelEnabled

配置说明 默认值
是否启用vip netty通道以发送消息 -D com.rocketmq.sendMessageWithVIPChannel参数的值,若无则是true

broker的netty server会起两个通信服务。两个服务除了服务的端口号不一样,其他都一样。其中一个的端口(配置端口-2)作为vip通道,客户端可以启用本设置项把发送消息此vip通道。


DefaultMQProducer

所有的消息发送都通过DefaultMQProducer作为入口,以下介绍一下单独属于DefaultMQProducer的一些配置项。

producerGroup*

配置说明 默认值
生产组的名称,一类Producer的标识 DEFAULT_PRODUCER

详见 RocketMQ——角色与术语详解

createTopicKey

配置说明 默认值
发送消息的时候,如果没有找到topic,若想自动创建该topic,需要一个key topic,这个值即是key topic的值 TBW102

这是RocketMQ设计非常晦涩的一个概念,整体的逻辑是这样的:

  • 生产者正常的发送消息,都是需要topic预先创建好的
  • 但是RocketMQ服务端是支持,发送消息的时候,如果topic不存在,在发送的同时自动创建该topic
  • 支持的前提是broker 的配置打开autoCreateTopicEnable=true
  • autoCreateTopicEnable=true后,broker会创建一个TBW102的topic,这个就是我们讲的默认的key topic

自动构建topic(以下成为T)的过程:

  1. Producer发送的时候如果发现该T不存在,就会配置有Producer配置的key topic的那个broker发送消息
  2. broker校验客户端的topic key是否在broker存在,且校验其权限最后一位是否是1(topic权限总共有3位,按位存储,分别是读、写、支持自动创建)
  3. 若权限校验通过,先在该broker把T创建,并且权限就是key topic除去最后一位的权限。

为了方便理解,以下贴出broker的具体源码并加入部分注释:

                TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);//key topic的配置信息
                if (defaultTopicConfig != null) {//key topic 存在
                    if (defaultTopic.equals(MixAll.DEFAULT_TOPIC)) {
                        if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
                            defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
                        }
                    }

                    if (PermName.isInherited(defaultTopicConfig.getPerm())) {//检验权限,如果允许自动创建
                        topicConfig = new TopicConfig(topic);//创建topic

                        int queueNums =
                            clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig
                                .getWriteQueueNums() : clientDefaultTopicQueueNums;

                        if (queueNums < 0) {
                            queueNums = 0;
                        }

                        topicConfig.setReadQueueNums(queueNums);
                        topicConfig.setWriteQueueNums(queueNums);
                        int perm = defaultTopicConfig.getPerm();
                        perm &= ~PermName.PERM_INHERIT;//权限按照key topic的来
                        topicConfig.setPerm(perm);
                        topicConfig.setTopicSysFlag(topicSysFlag);
                        topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
                    } else {//权限校验不过,自动创建失败
                        LOG.warn("Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]",
                                defaultTopic, defaultTopicConfig.getPerm(), remoteAddress);
                    }
                } else {//key topic不存在,创建失败
                    LOG.warn("Create new topic failed, because the default topic[{}] not exist. producer:[{}]", defaultTopic, remoteAddress);
                }

             ...//把创建的topic维护起来

总的来说,这个功能设计出来比较晦涩,而从运维的角度上看,topic在大部分场景下也应该预创建,故本特性没有必要的话,也不会用到,这个配置也没有必要特殊的设置。

关于这个TBW102非常不直观的问题,我已经提了issue :https://issues.apache.org/jira/browse/ROCKETMQ-223

defaultTopicQueueNums

配置说明 默认值
自动创建topic的话,默认queue数量是多少 4

sendMsgTimeout

配置说明 默认值
默认的发送超时时间 3000,单位毫秒

若发送的时候不显示指定timeout,则使用此设置的值作为超时时间。

对于异步发送,超时后会进入回调的onException,对于同步发送,超时则会得到一个RemotingTimeoutException

compressMsgBodyOverHowmuch

配置说明 默认值
消息body需要压缩的阈值 1024 * 4,4K

retryTimesWhenSendFailed

配置说明 默认值
同步发送失败的话,rocketmq内部重试多少次 2

retryTimesWhenSendAsyncFailed

配置说明 默认值
异步发送失败的话,rocketmq内部重试多少次 2

retryAnotherBrokerWhenNotStoreOK

配置说明 默认值
发送的结果如果不是SEND_OK状态,是否当作失败处理而尝试重发 false

发送结果总共有4钟:

SEND_OK, //状态成功,无论同步还是存储
FLUSH_DISK_TIMEOUT, // broker刷盘策略为同步刷盘(SYNC_FLUSH)的话时候,等待刷盘的时候超时
FLUSH_SLAVE_TIMEOUT, // master role采取同步复制策略(SYNC_MASTER)的时候,消息尝试同步到slave超时
SLAVE_NOT_AVAILABLE, //slave不可用

注:从源码上看,此配置项只对同步发送有效,异步、oneway(由于无法获取结果,肯定无效)均无效

retryAnotherBrokerWhenNotStoreOK

配置说明 默认值
客户端验证,允许发送的最大消息体大小 1024 * 1024 * 4,4M

若超过此大小,会得到一个响应码13(MESSAGE_ILLEGAL)的MQClientException异常


TransactionMQProducer

事务生产者,截至至4.1,由于暂时事务回查功能缺失,整体并不完全可用,配置暂时忽略,等后面功能完善后补上。

https://issues.apache.org/jira/browse/ROCKETMQ-123


DefaultMQPushConsumer

最常用的消费者,使用push模式(长轮询),封装了各种拉取的方法和返回结果的判断。下面介绍其配置。

consumerGroup*

配置说明 默认值
消费组的名称,用于标识一类消费者 无默认值,必设

详见 RocketMQ——角色与术语详解

messageModel*

配置说明 默认值
消费模式 MessageModel.CLUSTERING

可选值有两个:

  1. CLUSTERING //集群消费模式
  2. BROADCASTING //广播消费模式

两种模式的区别详见:RocketMQ——角色与术语详解

consumeFromWhere*

配置说明 默认值
消费点策略 ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET

可选值有两个:

  1. CONSUME_FROM_LAST_OFFSET //队列尾消费
  2. CONSUME_FROM_FIRST_OFFSET //队列头消费
  3. CONSUME_FROM_TIMESTAMP //按照日期选择某个位置消费

注:此策略只生效于新在线测consumer group,如果是老的已存在的consumer group,都降按照已经持久化的consume offset进行消费

具体说明祥见: RocketMQ——消息ACK机制及消费进度管理

consumeTimestamp:

配置说明 默认值
CONSUME_FROM_LAST_OFFSET的时候使用,从哪个时间点开始消费 半小时前

格式为yyyyMMddhhmmss 如 20131223171201

allocateMessageQueueStrategy*

配置说明 默认值
负载均衡策略算法 AllocateMessageQueueAveragely(取模平均分配)

这个算法可以自行扩展以使用自定义的算法,目前有以下算法可以使用

  • AllocateMessageQueueAveragely //取模平均
  • AllocateMessageQueueAveragelyByCircle //环形平均
  • AllocateMessageQueueByConfig // 按照配置,传入听死的messageQueueList
  • AllocateMessageQueueByMachineRoom //按机房,从源码上看,必须和阿里的某些broker命名一致才行
  • AllocateMessageQueueConsistentHash //一致性哈希算法,本人于4.1提交的特性。用于解决“惊群效应”。

需要自行扩展的算法的,需要实现org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueStrategy

具体分配consume queue的过程祥见: RocketMQ——水平扩展及负载均衡详解

subscription

配置说明 默认值
订阅关系(topic->sub expression) {}

不建议设置,订阅topic建议直接调用subscribe接口

messageListener

配置说明 默认值
消息处理监听器(回调) null

不建议设置,注册监听的时候应调用registerMessageListener

offsetStore

配置说明 默认值
消息消费进度存储器 null

不建议设置,offsetStore 有两个策略:LocalFileOffsetStoreRemoteBrokerOffsetStore

若没有显示设置的情况下,广播模式将使用LocalFileOffsetStore,集群模式将使用RemoteBrokerOffsetStore,不建议修改。

consumeThreadMin*

配置说明 默认值
消费线程池的core size 20

PushConsumer会内置一个消费线程池,这个配置控制此线程池的core size

consumeThreadMax*

配置说明 默认值
消费线程池的max size 64

PushConsumer会内置一个消费线程池,这个配置控制此线程池的max size

adjustThreadPoolNumsThreshold

配置说明 默认值
动态扩线程核数的消费堆积阈值 1000

相关功能以废弃,不建议设置

consumeConcurrentlyMaxSpan

配置说明 默认值
并发消费下,单条consume queue队列允许的最大offset跨度,达到则触发流控 2000

注:只对并发消费(ConsumeMessageConcurrentlyService)生效

更多分析祥见: RocketMQ——消息ACK机制及消费进度管理

pullThresholdForQueue

配置说明 默认值
consume queue流控的阈值 1000

每条consume queue的消息拉取下来后会缓存到本地,消费结束会删除。当累积达到一个阈值后,会触发该consume queue的流控。

更多分析祥见: RocketMQ——消息ACK机制及消费进度管理

截至到4.1,流控级别只能针对consume queue级别,针对topic级别的流控已经提了issue: https://issues.apache.org/jira/browse/ROCKETMQ-106

pullInterval*

配置说明 默认值
拉取的间隔 0,单位毫秒

由于RocketMQ采取的pull的方式进行消息投递,每此会发起一个异步pull请求,得到请求后会再发起下次请求,这个间隔默认是0,表示立刻再发起。在间隔为0的场景下,消息投递的及时性几乎等同用Push实现的机制。

pullBatchSize*

f | 配置说明 | 默认值 | | ——| —— | |一次最大拉取的批量大小|32|

每次发起pull请求到broker,客户端需要指定一个最大batch size,表示这次拉取消息最多批量拉取多少条。

consumeMessageBatchMaxSize

配置说明 默认值
批量消费的最大消息条数 1

你可能发现了,RocketMQ的注册监听器回调的回调方法签名是类似这样的:

ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs, final ConsumeConcurrentlyContext context);

里面的消息是一个集合List而不是单独的msg,这个consumeMessageBatchMaxSize就是控制这个集合的最大大小。

而由于拉取到的一批消息会立刻拆分成N(取决于consumeMessageBatchMaxSize)批消费任务,所以集合中msgs的最大大小是consumeMessageBatchMaxSizepullBatchSize的较小值。

postSubscriptionWhenPull

配置说明 默认值
每次拉取的时候是否更新订阅关系 false

从源码上看,这个值若是true,且不是class fliter模式,则每次拉取的时候会把subExpression带上到pull的指令中,broker发现这个指令会根据这个上传的表达式重新build出注册数据,而不是直接使用读取的缓存数据。

maxReconsumeTimes

配置说明 默认值
一个消息如果消费失败的话,最多重新消费多少次才投递到死信队列 -1

注,这个值默认值虽然是-1,但是实际使用的时候默认并不是-1。按照消费是并行还是串行消费有所不同的默认值。

并行:默认16次

串行:默认无限大(Interge.MAX_VALUE)。由于顺序消费的特性必须等待前面的消息成功消费才能消费后面的,默认无限大即一直不断消费直到消费完成。

suspendCurrentQueueTimeMillis

配置说明 默认值
串行消费使用,如果返回ROLLBACK或者SUSPEND_CURRENT_QUEUE_A_MOMENT,再次消费的时间间隔 1000,单位毫秒

注:如果消费回调中对ConsumeOrderlyContext中的suspendCurrentQueueTimeMillis进行过设置,则使用用户设置的值作为消费间隔。

consumeTimeout

配置说明 默认值
消费的最长超时时间 15, 单位分钟

如果消费超时,RocketMQ会等同于消费失败来处理,更多分析祥见: RocketMQ——消息ACK机制及消费进度管理


DefaultMQPullConsumer

采取主动调用Pull接口的模式的消费者,主动权更大,但是使用难度也相对更大。以下介绍其配置,部分配置和PushConsumer一致。

consumerGroup*

配置说明 默认值
消费组的名称,用于标识一类消费者 无默认值,必设

详见 RocketMQ——角色与术语详解

registerTopics*

配置说明 默认值
消费者需要监听的topic 空集合

由于没有subscribe接口,用户需要自己把想要监听的topic设置到此集合中,RocketMQ内部会依靠此来发送对应心跳数据。

messageModel*

配置说明 默认值
消费模式 MessageModel.CLUSTERING

可选值有两个:

  1. CLUSTERING //集群消费模式
  2. BROADCASTING //广播消费模式

两种模式的区别详见:RocketMQ——角色与术语详解

allocateMessageQueueStrategy*

配置说明 默认值
负载均衡策略算法 AllocateMessageQueueAveragely(取模平均分配)

见DefaultPushConsumer的说明

offsetStore

配置说明 默认值
消息消费进度存储器 null

不建议设置,offsetStore 有两个策略:LocalFileOffsetStoreRemoteBrokerOffsetStore

若没有显示设置的情况下,广播模式将使用LocalFileOffsetStore,集群模式将使用RemoteBrokerOffsetStore,不建议修改。

maxReconsumeTimes

配置说明 默认值
调用sendMessageBack的时候,如果发现重新消费超过这个配置的值,则投递到死信队列 16

由于PullConsumer没有管理消费的线程池和管理器,需要用户自己处理各种消费结果和拉取结果,故需要投递到重试队列或死信队列的时候需要显示调用sendMessageBack

回传消息的时候会带上maxReconsumeTimes的值,broker发现此消息已经消费超过此值,则投递到死信队列,否则投递到重试队列。此逻辑和DefaultPushConsumer是一致的,只是PushConsumer无需用户显示调用。

brokerSuspendMaxTimeMillis

配置说明 默认值
broker在长轮询下,连接最长挂起的时间 20*1000,单位毫秒

长轮询具体逻辑不在本文论述,且RocketMQ不建议修改此值。

consumerTimeoutMillisWhenSuspend

配置说明 默认值
broker在长轮询下,客户端等待broker响应的最长等待超时时间 30*1000,单位毫秒

长轮询具体逻辑不在本文论述,且RocketMQ不建议修改此值,此值一定要大于brokerSuspendMaxTimeMillis

consumerPullTimeoutMillis

配置说明 默认值
pull的socket 超时时间 10*1000,单位毫秒

虽然注释上说是socket超时时间,但是从源码上看,此值的设计是不启动长轮询也不指定timeout的情况下,拉取的超时时间。

messageQueueListener

配置说明 默认值
负载均衡consume queue分配变化的通知监听器 null

由于pull操作需要用户自己去触发,故如果负载均衡发生变化,要有方法告知用户现在分到的新consume queue是什么。使用方可以实现此接口以达到此目的:

/**
 * A MessageQueueListener is implemented by the application and may be specified when a message queue changed
 */
public interface MessageQueueListener {
/**
 * @param topic message topic
 * @param mqAll all queues in this message topic
 * @param mqDivided collection of queues,assigned to the current consumer
 */
void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,final Set<MessageQueue> mqDivided);
}

RocketMQ——消息文件过期原理

RocketMQ——消息ACK机制及消费进度管理 文中提过,所有的消费均是客户端发起Pull请求的,告诉消息的offset位置,broker去查询并返回。但是有一点需要非常明确的是,消息消费后,消息其实并没有物理地被清除,这是一个非常特殊的设计。本文来探索此设计的一些细节。

消费完后的消息去哪里了?

消息的存储是一直存在于CommitLog中的。而由于CommitLog是以文件为单位(而非消息)存在的,CommitLog的设计是只允许顺序写的,且每个消息大小不定长,所以这决定了消息文件几乎不可能按照消息为单位删除(否则性能会极具下降,逻辑也非常复杂)。所以消息被消费了,消息所占据的物理空间并不会立刻被回收。

但消息既然一直没有删除,那RocketMQ怎么知道应该投递过的消息就不再投递?——答案是客户端自身维护——客户端拉取完消息之后,在响应体中,broker会返回下一次应该拉取的位置,PushConsumer通过这一个位置,更新自己下一次的pull请求。这样就保证了正常情况下,消息只会被投递一次。

什么时候清理物理消息文件?

那消息文件到底删不删,什么时候删?

消息存储在CommitLog之后,的确是会被清理的,但是这个清理只会在以下任一条件成立才会批量删除消息文件(CommitLog):

  1. 消息文件过期(默认72小时),且到达清理时点(默认是凌晨4点),删除过期文件。
  2. 消息文件过期(默认72小时),且磁盘空间达到了水位线(默认75%),删除过期文件。
  3. 磁盘已经达到必须释放的上限(85%水位线)的时候,则开始批量清理文件(无论是否过期),直到空间充足。

注:若磁盘空间达到危险水位线(默认90%),出于保护自身的目的,broker会拒绝写入服务。

这样设计带来的好处

消息的物理文件一直存在,消费逻辑只是听客户端的决定而搜索出对应消息进行,这样做,笔者认为,有以下几个好处:

  1. 一个消息很可能需要被N个消费组(设计上很可能就是系统)消费,但消息只需要存储一份,消费进度单独记录即可。这给强大的消息堆积能力提供了很好的支持——一个消息无需复制N份,就可服务N个消费组。

  2. 由于消费从哪里消费的决定权一直都是客户端决定,所以只要消息还在,就可以消费到,这使得RocketMQ可以支持其他传统消息中间件不支持的回溯消费。即我可以通过设置消费进度回溯,就可以让我的消费组重新像放快照一样消费历史消息;或者我需要另一个系统也复制历史的数据,只需要另起一个消费组从头消费即可(前提是消息文件还存在)。

  3. 消息索引服务。只要消息还存在就能被搜索出来。所以可以依靠消息的索引搜索出消息的各种原信息,方便事后排查问题。

注:在消息清理的时候,由于消息文件默认是1GB,所以在清理的时候其实是在删除一个大文件操作,这对于IO的压力是非常大的,这时候如果有消息写入,写入的耗时会明显变高。这个现象可以在凌晨4点(默认删时间时点)后的附近观察得到。

RocketMQ官方建议Linux下文件系统改为Ext4,对于文件删除操作相比Ext3有非常明显的提升。

跳过历史消息的处理

由于消息本身是没有过期的概念,只有文件才有过期的概念。那么对于很多业务场景——一个消息如果太老,是无需要被消费的,是不合适的。

这种需要跳过历史消息的场景,在RocketMQ要怎么实现呢?

对于一个全新的消费组,PushConsumer默认就是跳过以前的消息而从最尾开始消费的,解析请参看RocketMQ——消息ACK机制及消费进度管理相关章节。

但对于已存在的消费组,RocketMQ没有内置的跳过历史消息的实现,但有以下手段可以解决:

  1. 自身的消费代码按照日期过滤,太老的消息直接过滤。如:

         @Override
         public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
             for(MessageExt msg: msgs){
                 if(System.currentTimeMillis()-msg.getBornTimestamp()>60*1000) {//一分钟之前的认为过期
                     continue;//过期消息跳过
                 }
    
                 //do consume here
    
             }
             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         }
    
  2. 自身的消费代码代码判断消息的offset和MAX_OFFSET相差很远,认为是积压了很多,直接return CONSUME_SUCCESS过滤。

         @Override
         public ConsumeConcurrentlyStatus consumeMessage(//
             List<MessageExt> msgs, //
             ConsumeConcurrentlyContext context) {
             long offset = msgs.get(0).getQueueOffset();
             String maxOffset = msgs.get(0).getProperty(MessageConst.PROPERTY_MAX_OFFSET);
             long diff = Long. parseLong(maxOffset) - offset;
             if (diff > 100000) { //消息堆积了10W情况的特殊处理
                 return ConsumeConcurrentlyStatus. CONSUME_SUCCESS;
             }
             //do consume here
             return ConsumeConcurrentlyStatus. CONSUME_SUCCESS;
         }
    
  3. 消费者启动前,先调整该消费组的消费进度,再开始消费。可以人工使用控制台命令resetOffsetByTime把消费进度调整到后面,再启动消费。

  4. 原理同3,但使用代码来控制。代码中调用内部的运维接口,具体代码实例祥见ResetOffsetByTimeCommand.java.

RocketMQ——消息ACK机制及消费进度管理

RokectMQ——水平扩展及负载均衡详解 中剖析过,consumer的每个实例是靠队列分配来决定如何消费消息的。那么消费进度具体是如何管理的,又是如何保证消息成功消费的?(RocketMQ有保证消息肯定消费成功的特性,失败则重试)?

本文将详细解析消息具体是如何ack的,又是如何保证消费肯定成功的。

由于以上工作所有的机制都实现在PushConsumer中,所以本文的原理均只适用于RocketMQ中的PushConsumer即Java客户端中的DefaultPushConsumer。 若使用了PullConsumer模式,类似的工作如何ack,如何保证消费等均需要使用方自己实现。

注:广播消费和集群消费的处理有部分区别,以下均特指集群消费(CLSUTER),广播(BROADCASTING)下部分可能不适用。

保证消费成功

PushConsumer为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。

消费的时候,我们需要注入一个消费回调,具体sample代码如下:

    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
            doMyJob();//执行真正消费
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息(默认是1条)是消费完成的。(具体如何ACK见后面章节)

如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。

为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消息重发回Broker(topic不是原topic而是这个消费租的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。

注:

  1. 如果业务的回调没有处理好而抛出异常,会认为是消费失败当ConsumeConcurrentlyStatus.RECONSUME_LATER处理。
  2. 当使用顺序消费的回调MessageListenerOrderly时,由于顺序消费是要前者消费成功才能继续消费,所以没有RECONSUME_LATER的这个状态,只有SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费。

启动的时候从哪里消费

当新实例启动的时候,PushConsumer会拿到本消费组broker已经记录好的消费进度(consumer offset),按照这个进度发起自己的第一次Pull请求。

如果这个消费进度在Broker并没有存储起来,证明这个是一个全新的消费组,这时候客户端有几个策略可以选择:

CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息
CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前

所以,社区中经常有人问:“为什么我设了CONSUME_FROM_LAST_OFFSET,历史的消息还是被消费了”? 原因就在于只有全新的消费组才会使用到这些策略,老的消费组都是按已经存储过的消费进度继续消费。

对于老消费组想跳过历史消息需要自身做过滤,或者使用先修改消费进度。示例代码请参看:RocketMQ——消息文件过期原理

消息ACK机制

RocketMQ是以consumer group+queue为单位是管理消费进度的,以一个consumer offset标记这个这个消费组在这条queue上的消费进度。

如果某已存在的消费组出现了新消费实例的时候,依靠这个组的消费进度,就可以判断第一次是从哪里开始拉取的。

每次消息成功后,本地的消费进度会被更新,然后由定时器定时同步到broker,以此持久化消费进度。

但是每次记录消费进度的时候,只会把一批消息中最小的offset值为消费进度值,如下图:

message ack

这钟方式和传统的一条message单独ack的方式有本质的区别。性能上提升的同时,会带来一个潜在的重复问题——由于消费进度只是记录了一个下标,就可能出现拉取了100条消息如 2101-2200的消息,后面99条都消费结束了,只有2101消费一直没有结束的情况。

在这种情况下,RocketMQ为了保证消息肯定被消费成功,消费进度职能维持在2101,直到2101也消费结束了,本地的消费进度才能标记2200消费结束了(注:consumerOffset=2201)。

在这种设计下,就有消费大量重复的风险。如2101在还没有消费完成的时候消费实例突然退出(机器断电,或者被kill)。这条queue的消费进度还是维持在2101,当queue重新分配给新的实例的时候,新的实例从broker上拿到的消费进度还是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过还是会投递一次。

对于这个场景,RocketMQ暂时无能为力,所以业务必须要保证消息消费的幂等性,这也是RocketMQ官方多次强调的态度。

实际上,从源码的角度上看,RocketMQ可能是考虑过这个问题的,截止到3.2.6的版本的源码中,可以看到为了缓解这个问题的影响面,DefaultMQPushConsumer中有个配置consumeConcurrentlyMaxSpan

/**
 * Concurrently max span offset.it has no effect on sequential consumption
 */
private int consumeConcurrentlyMaxSpan = 2000;

这个值默认是2000,当RocketMQ发现本地缓存的消息的最大值-最小值差距大于这个值(2000)的时候,会触发流控——也就是说如果头尾都卡住了部分消息,达到了这个阈值就不再拉取消息。

但作用实际很有限,像刚刚这个例子,2101的消费是死循环,其他消费非常正常的话,是无能为力的。一旦退出,在不人工干预的情况下,2101后所有消息全部重复!

Ack卡进度解决方案

实际上对于卡住进度的场景,可以选择弃车保帅的方案:把消息卡住那些消息,先ack掉,让进度前移。但要保证这条消息不会因此丢失,ack之前要把消息sendBack回去,这样这条卡住的消息就会必然重复,但会解决潜在的大量重复的场景。 这也是我们公司自己定制的解决方案。

部分源码如下:

class ConsumeRequestWithUnAck implements Runnable {
    final ConsumeRequest consumeRequest;
    final long resendAfterIfStillUnAck;//n毫秒没有消费完,就重发

    ConsumeRequestWithUnAck(ConsumeRequest consumeRequest,long resendAfterIfStillUnAck) {
        this.consumeRequest = consumeRequest;
        this.resendAfterIfStillUnAck = resendAfterIfStillUnAck;
    }

    @Override
    public void run() {
        //每次消费前,计划延时任务,超时则ack并重发
        final WeakReference<ConsumeRequest> crReff = new WeakReference<>(this.consumeRequest);
        ScheduledFuture scheduledFuture=null;
        if(!ConsumeDispatcher.this.ackAndResendScheduler.isShutdown()) {
            scheduledFuture= ConsumeDispatcher.this.ackAndResendScheduler.schedule(new ConsumeTooLongChecker(crReff),resendAfterIfStillUnAck,TimeUnit.MILLISECONDS);
        }
        try{
            this.consumeRequest.run();//正常执行并更新offset
        }
        finally {
            if (scheduledFuture != null) scheduledFuture.cancel(false);//消费结束后,取消任务
        }
    }

}
  1. 定义了一个装饰器,把原来的ConsumeRequest对象包了一层。
  2. 装饰器中,每条消息消费前都会调度一个调度器,定时触发,触发的时候如果发现消息还存在,就执行sendback并ack的操作。

后来RocketMQ显然也发现了这个问题,RocketMQ在3.5.8之后也是采用这样的方案去解决这个问题。只是实现方式上有所不同(事实上我认为RocketMQ的方案还不够完善)

  1. 在pushConsumer中 有一个consumeTimeout字段(默认15分钟),用于设置最大的消费超时时间。消费前会记录一个消费的开始时间,后面用于比对。
  2. 消费者启动的时候,会定期扫描所有消费的消息,达到这个timeout的那些消息,就会触发sendBack并ack的操作。这里扫描的间隔也是consumeTimeout(单位分钟)的间隔。

核心源码如下:

//ConsumeMessageConcurrentlyService.java
public void start() {
    this.CleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            cleanExpireMsg();
        }

    }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
//ConsumeMessageConcurrentlyService.java
private void cleanExpireMsg() {
    Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
            this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry<MessageQueue, ProcessQueue> next = it.next();
        ProcessQueue pq = next.getValue();
        pq.cleanExpiredMsg(this.defaultMQPushConsumer);
    }
}

//ProcessQueue.java
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
    if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
        return;
    }

    int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
    for (int i = 0; i < loop; i++) {
        MessageExt msg = null;
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
                    msg = msgTreeMap.firstEntry().getValue();
                } else {

                    break;
                }
            } finally {
                this.lockTreeMap.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getExpiredMsg exception", e);
        }

        try {

            pushConsumer.sendMessageBack(msg, 3);
            log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
            try {
                this.lockTreeMap.writeLock().lockInterruptibly();
                try {
                    if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
                        try {
                            msgTreeMap.remove(msgTreeMap.firstKey());
                        } catch (Exception e) {
                            log.error("send expired msg exception", e);
                        }
                    }
                } finally {
                    this.lockTreeMap.writeLock().unlock();
                }
            } catch (InterruptedException e) {
                log.error("getExpiredMsg exception", e);
            }
        } catch (Exception e) {
            log.error("send expired msg exception", e);
        }
    }
}

通过这个逻辑对比我定制的时间,可以看出有几个不太完善的问题:

  1. 消费timeout的时间非常不精确。由于扫描的间隔是15分钟,所以实际上触发的时候,消息是有可能卡住了接近30分钟(15*2)才被清理。
  2. 由于定时器一启动就开始调度了,中途这个consumeTimeout再更新也不会生效。

RocketMQ——水平扩展及负载均衡详解

RocketMQ是一个分布式具有高度可扩展性的消息中间件。本文旨在探索在broker端,生产端,以及消费端是如何做到横向扩展以及负载均衡的。

Broker端水平扩展

Broker负载均衡

Broker是以group为单位提供服务。一个group里面分master和slave,master和slave存储的数据一样,slave从master同步数据(同步双写或异步复制看配置)。

通过nameserver暴露给客户端后,只是客户端关心(注册或发送)一个个的topic路由信息。路由信息中会细化为message queue的路由信息。而message queue会分布在不同的broker group。所以对于客户端来说,分布在不同broker group的message queue为成为一个服务集群,但客户端会把请求分摊到不同的queue。

而由于压力分摊到了不同的queue,不同的queue实际上分布在不同的Broker group,也就是说压力会分摊到不同的broker进程,这样消息的存储和转发均起到了负载均衡的作用。

Broker一旦需要横向扩展,只需要增加broker group,然后把对应的topic建上,客户端的message queue集合即会变大,这样对于broker的负载则由更多的broker group来进行分担。

并且由于每个group下面的topic的配置都是独立的,也就说可以让group1下面的那个topic的queue数量是4,其他group下的topic queue数量是2,这样group1则得到更大的负载。

commit log

虽然每个topic下面有很多message queue,但是message queue本身并不存储消息。真正的消息存储会写在CommitLog的文件,message queue只是存储CommitLog中对应的位置信息,方便通过message queue找到对应存储在CommitLog的消息。

不同的topic,message queue都是写到相同的CommitLog 文件,也就是说CommitLog完全的顺序写。

具体如下图:

broker负载均衡

Producer

Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下,如下图:

生产者负载均衡

Consumer负载均衡

集群模式

在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。

而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。

默认的分配算法是AllocateMessageQueueAveragely,如下图:

消费者负载均衡1

还有另外一种平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条queue,只是以环状轮流分queue的形式,如下图:

消费者负载均衡2

需要注意的是,集群模式下,queue都是只允许分配只一个实例,这是由于如果多个实例同时消费一个queue的消息,由于拉取哪些消息是consumer主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个queue只分给一个consumer实例,一个consumer实例可以允许同时分到不同的queue。

通过增加consumer实例去分摊queue的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的queue将分配到其他实例上继续消费。

但是如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让queue的总数量大于等于consumer的数量。

广播模式

由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。

在实现上,其中一个不同就是在consumer分配queue的时候,会所有consumer都分到所有的queue。

消费者广播模式