薛定谔的风口猪

站在巨人的肩膀上学习,猪都能看得很远

挑战大型系统的缓存设计——应对一致性问题

在真实的业务场景中,我们业务的数据——例如订单、会员、支付等——都是持久化到数据库中的,因为数据库能有很好的事务保证、持久化保证。但是,正因为数据库要能够满足这么多优秀的功能特性,使得数据库在设计上通常难以兼顾到性能,因此往往不能满足大型流量下的性能要求,像是 MySQL 数据库只能承担“千”这个级别的 QPS,否则很可能会不稳定,进而导致整个系统的故障。

但是客观上,我们的业务规模很可能要求着更高的 QPS,有些业务的规模本身就非常大,也有些业务会遇到一些流量高峰,比如电商会遇到大促的情况。

而这时候大部分的流量实际上都是读请求,而且大部分数据也是没有那么多变化的,如热门商品信息、微博的内容等常见数据就是如此。此时,缓存就是我们应对此类场景的利器

缓存的意义

所谓缓存,实际上就是用空间换时间,准确地说是用更高速的空间来换时间,从而整体提升读的性能

何为更高速的空间呢?

  1. 更快的存储介质。通常情况下,如果说数据库的速度慢,就得用更快的存储介质去替代它,目前最常见的就是Redis。Redis 单实例的读 QPS 可以高达 10w/s,90% 的场景下只需要正确使用 Redis 就能应对。
  2. 就近使用本地内存。就像 CPU 也有高速缓存一样,缓存也可以分为一级缓存、二级缓存。即便 Redis 本身性能已经足够高了,但访问一次 Redis 毕竟也需要一次网络 IO,而使用本地内存无疑有更快的速度。不过单机的内存是十分有限的,所以这种一级缓存只能存储非常少量的数据,通常是最热点的那些 key 对应的数据。这就相当于额外消耗宝贵的服务内存去换取高速的读取性能。

引入缓存后的一致性挑战

用空间换时间,意味着数据同时存在于多个空间。最常见的场景就是数据同时存在于 Redis 与 MySQL 上(为了问题的普适性,后面举例中若没有特别说明,缓存均指 Redis 缓存)。

实际上,最权威最全的数据还是在 MySQL 里的,只要 Redis 数据没有得到及时的更新而导致最新数据没有同步到 Redis 中,就出现了数据不一致。

大部分情况下,只要使用了缓存,就必然会有不一致的情况出现,只是说这个不一致的时间窗口是否能做到足够的小。有些不合理的设计可能会导致数据持续不一致,这是我们需要改善设计去避免的。

缓存不一致性无法客观地完全消灭

为什么我们几乎没办法做到缓存和数据库之间的强一致呢?

正常情况下,我们需要在数据库更新完后,把对应的最新数据同步到缓存中,以便在读请求的时候,能读到新的数据而不是旧的数据(脏数据)。但是很可惜,由于数据库和 Redis 之间是没有事务保证的,所以我们无法确保写入数据库成功后,写入 Redis 也是一定成功的;即便 Redis 写入能成功,在数据库写入成功后到 Redis 写入成功前的这段时间里,Redis 数据也肯定是和 MySQL 不一致的。如下图:

图片

图片 所以说这个时间窗口是没办法完全消灭的,除非我们付出极大的代价,使用分布式事务等各种手段去维持强一致,但是这样会使得系统的整体性能大幅度下降,甚至比不用缓存还慢,这样不就与我们使用缓存的目标背道而驰了吗?

不过虽然无法做到强一致,但是我们能做到的是缓存与数据库达到最终一致,而且不一致的时间窗口我们能做到尽可能短,按照经验来说,如果能将时间优化到 1ms 之内,这个一致性问题带来的影响我们就可以忽略不计。

图片

更新缓存的手段

通常情况下,我们在处理查询请求的时候,使用缓存的逻辑如下:

1
2
3
4
5
6
7
data = queryDataRedis(key);
if (data ==null) {
     data = queryDataMySQL(key); //缓存查询不到,从MySQL做查询
     if (data!=null) {
         updateRedis(key, data);//查询完数据后更新到MySQL
     }
}

也就是说优先查询缓存,查询不到才查询数据库。如果这时候数据库查到数据了,就将缓存的数据进行更新。 这样的逻辑是正确的,而一致性的问题一般不来源于此,而是出现在处理写请求的时候。所以我们简化成最简单的写请求的逻辑,此时你可能会面临多个选择,究竟是直接更新缓存,还是失效缓存?而无论是更新缓存还是失效缓存,都可以选择在更新数据库之前,还是之后操作。

这样就演变出 4 个策略:更新数据库后更新缓存、更新数据库前更新缓存、更新数据库后删除缓存、更新数据库前删除缓存下面我们来分别讲述。

更新数据库后更新缓存的不一致问题

一种常见的操作是,设置一个过期时间,让写请求以数据库为准,过期后,读请求同步数据库中的最新数据给缓存。那么在加入了过期时间后,是否就不会有问题了呢?并不是这样。

大家设想一下这样的场景。

假如这里有一个计数器,把数据库自减 1,原始数据库数据是 100,同时有两个写请求申请计数减一,假设线程 A 先减数据库成功,线程 B 后减数据库成功。那么这时候数据库的值是 98,缓存里正确的值应该也要是 98。

但是特殊场景下,你可能会遇到这样的情况:

  1. 线程 A 和线程 B 同时更新这个数据
  2. 更新数据库的顺序是先 A 后 B
  3. 更新缓存时顺序是先 B 后 A 如果我们的代码逻辑还是更新数据库后立刻更新缓存的数据,那么——
1
2
updateMySQL();
updateRedis(key, data);

就可能出现:数据库的值是 100->99->98,但是缓存的数据却是 100->98->99,也就是数据库与缓存的不一致。而且这个不一致只能等到下一次数据库更新或者缓存失效才可能修复。

时间 线程A(写请求) 线程B(写请求) 问题
T1 更新数据库为99
T2 更新数据库为98
T3 更新缓存数据为98
T4 更新缓存数据为99 此时缓存的值被显式更新为99,但是实际上数据库的值已经是98,数据不一致

更新数据库前更新缓存的不一致问题

那你可能会想,这是否表示,我应该先让缓存更新,之后再去更新数据库呢?类似这样:

1
2
updateRedis(key, data);//先更新缓存
updateMySQL();//再更新数据库

这样操作产生的问题更是显而易见的,因为我们无法保证数据库的更新成功,万一数据库更新失败了,你缓存的数据就不只是脏数据,而是错误数据了。 你可能会想,是否我在更新数据库失败的时候做 Redis 回滚的操作能够解决呢?这其实也是不靠谱的,因为我们也不能保证这个回滚的操作 100% 被成功执行。

图片

同时,在写写并发的场景下,同样有类似的一致性问题,请看以下情况:

  1. 线程 A 和线程 B 同时更新同这个数据
  2. 更新缓存的顺序是先 A 后 B
  3. 更新数据库的顺序是先 B 后 A 举个例子。线程 A 希望把计数器置为 0,线程 B 希望置为 1。而按照以上场景,缓存确实被设置为 1,但数据库却被设置为 0。
时间 线程A(写请求) 线程B(写请求) 问题
T1 更新缓存为0
T2 更新缓存为1
T3 更新数据库为1
T4 更新数据库数据为0 此时缓存的值被显式更新为1,但是实际上数据库的值是0,数据不一致

所以通常情况下,更新缓存再更新数据库是我们应该避免使用一种手段

更新数据库前删除缓存的问题

那如果采取删除缓存的策略呢?也就是说我们在更新数据库的时候失效对应的缓存,让缓存在下次触发读请求时进行更新,是否会更好呢?同样地,针对在更新数据库前和数据库后这两个删除时机,我们来比较下其差异。

最直观的做法,我们可能会先让缓存失效,然后去更新数据库,代码逻辑如下:

1
2
deleteRedis(key);//先删除缓存让缓存失效
updateMySQL();//再更新数据库

这样的逻辑看似没有问题,毕竟删除缓存后即便数据库更新失败了,也只是缓存上没有数据而已。然后并发两个写请求过来,无论怎么样的执行顺序,缓存最后的值也都是会被删除的,也就是说在并发写写的请求下这样的处理是没问题的。 然而,这种处理在读写并发的场景下却存在着隐患。

还是刚刚更新计数的例子。例如现在缓存的数据是 100,数据库也是 100,这时候需要对此计数减 1,减成功后,数据库应该是 99。如果这之后触发读请求,缓存如果有效的话,里面应该也要被更新为 99 才是正确的。

那么思考下这样的请求情况:

  1. 线程 A 更新这个数据的同时,线程 B 读取这个数据
  2. 线程 A 成功删除了缓存里的老数据,这时候线程 B 查询数据发现缓存失效
  3. 线程 A 更新数据库成功
时间 线程A(写请求) 线程B(读请求) 问题
T1 删除缓存值
T2 1.读取缓存数据,缓存缺失,从数据库读取数据100
T3 更新数据库中的数据X的值为99
T4 将数据100的值写入缓存 此时缓存的值被显式更新为100,但是实际上数据库的值已经是99了

可以看到,在读写并发的场景下,一样会有不一致的问题。

针对这种场景,有个做法是所谓的“延迟双删策略”,就是说,既然可能因为读请求把一个旧的值又写回去,那么我在写请求处理完之后,等到差不多的时间延迟再重新删除这个缓存值。

时间 线程A(写请求) 线程C(新的读请求) 线程D(新的读请求) 问题
T5 sleep(N) 缓存存在,读取到缓存旧值100 其他线程可能在双删成功前读到脏数据
T6 删除缓存值
T7 缓存缺失,从数据库读取数据的最新值(99)

这种解决思路的关键在于对 N 的时间的判断,如果 N 时间太短,线程 A 第二次删除缓存的时间依旧早于线程 B 把脏数据写回缓存的时间,那么相当于做了无用功。而 N 如果设置得太长,那么在触发双删之前,新请求看到的都是脏数据。

更新数据库后删除缓存

那如果我们把更新数据库放在删除缓存之前呢,问题是否解决?我们继续从读写并发的场景看下去,有没有类似的问题。

时间 线程A(写请求) 线程B(读请求) 线程C(读请求) 潜在问题
T1 更新主库 X = 99(原值 X = 100)
T2 读取数据,查询到缓存还有数据,返回100 线程C实际上读取到了和数据库不一致的数据
T3 删除缓存
T4 查询缓存,缓存缺失,查询数据库得到当前值99
T5 将99写入缓存

可以看到,大体上,采取先更新数据库再删除缓存的策略是没有问题的,仅在更新数据库成功到缓存删除之间的时间差内,可能会被别的线程读取到老值。

而在开篇的时候我们说过,缓存不一致性的问题无法在客观上完全消灭,因为我们无法保证数据库和缓存的操作是一个事务里的,而我们能做到的只是尽量缩短不一致的时间窗口。

在更新数据库后删除缓存这个场景下,不一致窗口仅仅是 T2 到 T3 的时间,大概是 1ms 左右,在大部分业务场景下我们都可以忽略不计。

但是真实场景下,还是会有一个情况存在不一致的可能性,这个场景是读线程发现缓存不存在,于是读写并发时,读线程回写进去老值。并发情况如下:

时间 线程A(写请求) 线程B(读请求–缓存不存在场景) 潜在问题
T1 查询缓存,缓存缺失,查询数据库得到当前值100
T2 更新主库 X = 99(原值 X = 100)
T3 删除缓存
T4 将100写入缓存 此时缓存的值被显式更新为100,但是实际上数据库的值已经是99了

总的来说,这个不一致场景出现条件非常严格,因为并发量很大时,缓存不太可能不存在;如果并发很大,而缓存真的不存在,那么很可能是这时的写场景很多,因为写场景会删除缓存。所以待会我们会提到,写场景很多时候实际上并不适合采取删除策略。

总结四种更新策略

终上所述,我们对比了四个更新缓存的手段,做一个总结对比,如下图:

策略 并发场景 潜在问题 应对方案
更新数据库+更新缓存 写+读 线程A未更新完缓存之前,线程B的读请求会短暂读到旧值 可以忽略
写+写 更新数据库的顺序是先A后B,但更新缓存时顺序是先B后A,数据库和缓存数据不一致 分布式锁(操作重)
更新缓存+更新数据库 无并发 线程A还未更新完缓存但是更新数据库可能失败 利用MQ确认数据库更新成功(较复杂)
写+写 更新缓存的顺序是先A后B,但更新数据库时顺序是先B后A 分布式锁(操作很重)
删除缓存值+更新数据库 写+读 写请求的线程A删除了缓存在更新数据库之前,这时候读请求线程B到来,因为缓存缺失,则把当前数据读取出来放到缓存,而后线程A更新成功了数据库 延迟双删(但是延迟的时间不好估计,且延迟的过程中依旧有不一致的时间窗口)
更新数据库+删除缓存值 写+读(缓存命中) 线程A完成数据库更新成功后,尚未删除缓存,线程B有并发读请求会读到旧的脏数据
可以忽略
写+读(缓存不命中) 读请求不命中缓存,写请求处理完之后读请求才回写缓存,此时缓存不一致 分布式锁(操作重)

从一致性的角度来看,采取更新数据库后删除缓存值,是更为适合的策略因为出现不一致的场景的条件更为苛刻,概率相比其他方案更低。

那么是否更新缓存这个策略就一无是处呢?不是的!

删除缓存值意味着对应的 key 会失效,那么这时候读请求都会打到数据库。如果这个数据的写操作非常频繁,就会导致缓存的作用变得非常小。而如果这时候某些 Key 还是非常大的热 key,就可能因为扛不住数据量而导致系统不可用。

如下图所示:

图片

所以做个简单总结,足以适应绝大部分的互联网开发场景的决策:

  • 针对大部分读多写少场景,建议选择更新数据库后删除缓存的策略。

  • 针对读写相当或者写多读少的场景,建议选择更新数据库后更新缓存的策略。

最终一致性如何保证?

缓存设置过期时间

第一个方法便是我们上面提到的,当我们无法确定 MySQL 更新完成后,缓存的更新/删除一定能成功,例如 Redis 挂了导致写入失败了,或者当时网络出现故障,更常见的是服务当时刚好发生重启了,没有执行这一步的代码。

这些时候 MySQL 的数据就无法刷到 Redis 了。为了避免这种不一致性永久存在,使用缓存的时候,我们必须要给缓存设置一个过期时间,例如 1 分钟,这样即使出现了更新 Redis 失败的极端场景,不一致的时间窗口最多也只是 1 分钟。

这是我们最终一致性的兜底方案,万一出现任何情况的不一致问题,最后都能通过缓存失效后重新查询数据库,然后回写到缓存,来做到缓存与数据库的最终一致。

如何减少缓存删除/更新的失败?

万一删除缓存这一步因为服务重启没有执行,或者 Redis 临时不可用导致删除缓存失败了,就会有一个较长的时间(缓存的剩余过期时间)是数据不一致的。

那我们有没有什么手段来减少这种不一致的情况出现呢?这时候借助一个可靠的消息中间件就是一个不错的选择。

因为消息中间件有 ATLEAST-ONCE 的机制,如下图所示。

图片

我们把删除 Redis 的请求以消费 MQ 消息的手段去失效对应的 Key 值,如果 Redis 真的存在异常导致无法删除成功,我们依旧可以依靠 MQ 的重试机制来让最终 Redis 对应的 Key 失效。

而你们或许会问,极端场景下,是否存在更新数据库后 MQ 消息没发送成功,或者没机会发送出去机器就重启的情况?

这个场景的确比较麻烦,如果 MQ 使用的是 RocketMQ,我们可以借助 RocketMQ 的事务消息,来让删除缓存的消息最终一定发送出去。而如果你没有使用 RocketMQ,或者你使用的消息中间件并没有事务消息的特性,则可以采取消息表的方式让更新数据库和发送消息一起成功。事实上这个话题比较大了,我们不在这里展开。

如何处理复杂的多缓存场景?

有些时候,真实的缓存场景并不是数据库中的一个记录对应一个 Key 这么简单,有可能一个数据库记录的更新会牵扯到多个 Key 的更新。还有另外一个场景是,更新不同的数据库的记录时可能需要更新同一个 Key 值,这常见于一些 App 首页数据的缓存。

我们以一个数据库记录对应多个 Key 的场景来举例。

假如系统设计上我们缓存了一个粉丝的主页信息、主播打赏榜 TOP10 的粉丝、单日 TOP 100 的粉丝等多个信息。如果这个粉丝注销了,或者这个粉丝触发了打赏的行为,上面多个 Key 可能都需要更新。只是一个打赏的记录,你可能就要做:

1
2
3
4
updateMySQL();//更新数据库一条记录
deleteRedisKey1();//失效主页信息的缓存
updateRedisKey2();//更新打赏榜TOP10
deleteRedisKey3();//更新单日打赏榜TOP100

这就涉及多个 Redis 的操作,每一步都可能失败,影响到后面的更新。甚至从系统设计上,更新数据库可能是单独的一个服务,而这几个不同的 Key 的缓存维护却在不同的 3 个微服务中,这就大大增加了系统的复杂度和提高了缓存操作失败的可能性。最可怕的是,操作更新记录的地方很大概率不只在一个业务逻辑中,而是散发在系统各个零散的位置。 针对这个场景,解决方案和上文提到的保证最终一致性的操作一样,就是把更新缓存的操作以 MQ 消息的方式发送出去,由不同的系统或者专门的一个系统进行订阅,而做聚合的操作。如下图:

图片

图片

通过订阅 MySQL binlog 的方式处理缓存

上面讲到的 MQ 处理方式需要业务代码里面显式地发送 MQ 消息。还有一种优雅的方式便是订阅 MySQL 的 binlog,监听数据的真实变化情况以处理相关的缓存。

例如刚刚提到的例子中,如果粉丝又触发打赏了,这时候我们利用 binlog 表监听是能及时发现的,发现后就能集中处理了,而且无论是在什么系统什么位置去更新数据,都能做到集中处理。

目前业界类似的产品有 Canal,具体的操作图如下:

图片

到这里,针对大型系统缓存设计如何保证最终一致性,我们已经从策略、场景、操作方案等角度进行了细致的讲述,这些是我根据多年开发经验进行总结的,希望能对你起到帮助。

为什么在一段时间内RocketMQ的队列同时分配给了两个消费者?详细剖析消费者负载均衡中的坑(上)

之前的文章有提到过,消费者大概是怎么做负载均衡的(集群模式),如下图所示:

消费者负载均衡

集群模式下,每个消费者实例会被分配到若干条队列。正因为消费者拿到了明确的队列,所以它们才能针对对应的队列做循环拉取消息的处理,以下是消费者客户端和broker通信的部分代码,可以看到通信的参数里有一个重要的参数,就是queueId

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
        PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
        requestHeader.setConsumerGroup(this.consumerGroup);
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setQueueId(mq.getQueueId());//消息拉取必须显示的告诉broker拉取哪个queue的消息
        requestHeader.setQueueOffset(offset);
        requestHeader.setMaxMsgNums(maxNums);
        requestHeader.setSysFlag(sysFlagInner);
        requestHeader.setCommitOffset(commitOffset);
        requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
        requestHeader.setSubscription(subExpression);
        requestHeader.setSubVersion(subVersion);
        requestHeader.setExpressionType(expressionType);

        String brokerAddr = findBrokerResult.getBrokerAddr();
        if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
            brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
        }

        PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
            brokerAddr,
            requestHeader,
            timeoutMillis,
            communicationMode,
            pullCallback);

这侧面也再次印证,RocketMQ的消费模型是Pull模式。

同时,对于每个消费者实例来说,在每个消息拉取之前,实际上都是确定了队列的(不会轻易发生改变),如下图控制台所示:

消费者负载均衡控制台示例

本文尝试对RocketMQ负载均衡(哪个消费者消费哪些队列)的原理进行解析,希望能让大家对其中的基本原理进行了解,并对部分问题能作出合理解析和正确规避。

所谓Rebalance到底在解决什么问题

RocketMQ每次分配队列的过程,代码里叫Relalance,本文在某些场景下也称为重排,实际上是一个负载均衡的过程。之所以说分配队列的过程就是负载均衡的过程的原因是,RocketMQ是负载均衡分配的就是队列,而不是消息。如果这个过程RocketMQ给了较高负载高,其实并不肯定意味着你能接受更多的消息(虽然绝大部分场景你可以这样理解),而只是说我给你分配了更多的队列。为什么说有更多的队列可能并不代表你有更多消息消费呢?

例如我们举一个例子,两个消费者一个消费者实例A获得了1个队列q0,一个消费者实例B获得了两个队列,这个负载均衡的过程分配了给B更多的”负载”(队列),但是假设消费者B获得的两个队列q1 q2中的q2本身是不可写的(topic可以配置读队列数量,写队列数量,所以是可能存在一些队列可读,但是不可写的情况),又或者生产者手动的选择了发送topic的queue目标(利用selector),这个过程从来都不选择q2,只有q0,和q1在做发送,甚至大部分情况下都往q0发,这时候消费者B实例其实都没有真正意义上的更高负载。

总结一下:就是所谓的消费者Rebalance,其实是分配队列的过程,它本质上希望解决的是一个消费者的负载问题,但是实际的工作其并不直接改变一个消费者实例的真实负载(消息),而是间接的决定的——通过管理分配队列的数量。而平时我们绝大部分可以认为队列的负载就是真实的消息负载的原因是基于这样一个前提:消息的分布基本是均匀分配在不同的队列上的,所以在这个前提下,获得了更多的队列实际上就是获得了更多的消息负载。

Relance具体是如何决定分配的数量的

RocketMQ的Rebalance实际上是无中心的,这和Kafka有本质区别,Kafka虽然也是客户端做的负载均衡,但是Kafka在做负载均衡之前会选定一个Leader,由这个Leader全局把控分配的过程,而后再把每个消费者对partion的分配结果广播给各个消费者。

而RocketMQ实际上没有人做这个统一分配的,而是每个消费者自己”有秩序地”计算出自己应该获取哪些队列,你可能会觉得很神奇,到底为啥大家能如此有秩序而不打架呢?我们下面来看看。

你可能知道RocketMQ是支持很多负载均衡的算法的,甚至还支持用户自己实现一个负载均衡算法。具体的这个分配算法需要实现以下接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/** * Strategy Algorithm for message allocating between consumers */public interface AllocateMessageQueueStrategy {


    /**     
    * Allocating by consumer id     
    *     
    * @param consumerGroup current consumer group     
    * @param currentCID current consumer id     
    * @param mqAll message queue set in current topic     
    * @param cidAll consumer set in current consumer group     
    * @return The allocate result of given strategy     */
    List<MessageQueue> allocate(final String consumerGroup,final String currentCID,        final List<MessageQueue> mqAll, final List<String> cidAll);


    /** * Algorithm name    
    *     * @return The strategy name     
    */
    String getName();}

这个接口的getName()只是一个唯一标识,用以标识该消费者实例是用什么负载均衡算法去分配队列。

关键在于allocate这个方法,这个方法的出参就是这次Rebalace的结果——本消费者实例应该去获取的队列列表。

其余四个入参分别是:

1.消费者组名

2.当前的消费者实例的唯一ID,实际上就是client 的ip@instanceName。

3.全局这个消费者组可以分配的队列集合

4.当前这个消费者组消费者集合(值是消费者实例的唯一id)

试想下,假设要你去做一个分配队列的算法,实际上最关键的就是两个视图:1.这个topic下全局当前在线的消费者列表,2.topic在全局下有哪些队列。

例如,你知道当前有4个消费者 c1 c2 c3 c4在线,也知道topic 下有 8个队列 q0,q1,q2,q3,q4,…q6,那么8/4=2,你就能知道每个消费者应该获取两个队列。例如: c1–>q0,q1, c2–>q2,q3, c3–>q4,q5, c4–>q5,q6。

实际上,这就是rocketmq默认的分配方案。

但现在唯一的问题在于,我们刚刚说的,我们没有一个中心节点统一地做分配,所以RocketMQ需要做一定的修改。如对于C1:

“我是C1,我知道当前有4个消费者 c1 c2 c3 c4在线,也知道topic 下有 8个队列 q0,q1,q2,q3,q4,…q6,那么8/4=2,我就能知道每个消费者应该获取两个队列,而我算出来我要的队列是c1–>q0,q1”。

同理对于C2:

“我是C2,我知道当前有4个消费者 c1 c2 c3 c4在线,也知道topic 下有 8个队列 q0,q1,q2,q3,q4,…q6,那么8/4=2,我就能知道每个消费者应该获取两个队列,而我算出来我要的队列是c2–>q2,q3。

要做到无中心的完成这个目标,唯一需要增加的输入项就是“我是C1”,”我是C2”这样的入参,所以上文提到的allocate方法下面当前的消费者实例的唯一ID就是干这个事用的。以下是一个默认的策略,本人添加了中文注释,以达到的就是上文例子中的分配结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {

    //START: 一些前置的判断
    if (currentCID == null || currentCID.length() < 1) {
        throw new IllegalArgumentException("currentCID is empty");
    }
    if (mqAll == null || mqAll.isEmpty()) {
        throw new IllegalArgumentException("mqAll is null or mqAll empty");
    }
    if (cidAll == null || cidAll.isEmpty()) {
        throw new IllegalArgumentException("cidAll is null or cidAll empty");
    }

    List<MessageQueue> result = new ArrayList<MessageQueue>();
    if (!cidAll.contains(currentCID)) {
        log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
            consumerGroup,
            currentCID,
            cidAll);
        return result;
    }
    //END: 一些前置的判断

  //核心分配逻辑开始
    int index = cidAll.indexOf(currentCID);
    int mod = mqAll.size() % cidAll.size();
    int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());//平均分配,每个cid分配多少队列
    int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; //从哪里开始分配,分配的位点index是什么。
    int range = Math.min(averageSize, mqAll.size() - startIndex);//真正分配的数量,避免除不尽的情况(实际上,有除不尽的情况)

    //开始分配本cid应该拿的队列列表
    for (int i = 0; i < range; i++) {
        result.add(mqAll.get((startIndex + i) % mqAll.size()));
    }
    return result;
}

Rebalance是怎么对多Topic做分配

细心地你可能会提一个问题,上面的提到的策略分配接口里,没有Topic的订阅关系的信息,那么如果一个消费者组订阅了topic1也订阅了topic2,topic下的队列数量可能是不一样的,那么最后分配的结果肯定也是不同的,那么怎么分配的呢?

答案是:一次topic的分配就单独调用一次分配接口,每次rebalance,实际上都会被RebalanceImpl里的rebalanceByTopic调用,而每订阅一个topic就会调用rebalanceByTopic,从而触发一次上文讲到的分配策略

Rebalance什么时候触发

其实看完上文,我们已经知道RocketMQ客户端是怎么无中心地做队列分配的了。现在还有一个问题,就是这个触发时机是什么时候?

为什么触发时机很重要呢?试想一下,突然间假设有一个消费者实例扩容了,从4个变成5个。如果有一个实例以5个去做负载均衡,其他四个老消费者以为在线的消费者还是只有四个,最后分配的结果肯定是会有重复的(某些情况甚至会漏分配),所以这个“节奏”很重要。

简单地来说,RocketMQ有三个时机会触发负载均衡:

  1. 启动的时候,会立即触发

  2. 有消费实例数量的变更的时候。broker在接受到消费者的心跳包的时候如果发现这个实例是新的实例的时候,会广播一个消费者数量变更的事件给所有消费者实例;同理,当发现一个消费者实例的连接断了,也会广播这样的一个事件

  3. 定期触发(默认20秒)。

第一个时机很好理解。启动的时候,消费者需要需要知道自己要分配什么队列,所以要触发Rebalance。

第二个时机实际也很好理解。因为有实例的数量变更,所以分配的结果肯定也需要调整的,这时候就要广播给各消费者。

第三点定期触发的原因实际上是一个补偿机制,为了避免第二点广播的时候因为网络异常等原因丢失了重分配的信号,或者还有别的场景实际上也需要重新计算分配结果(例如队列的数量变化、权限变化),所以需要一个定时任务做补偿。

从以上的触发时机可以看出,大部分情况下,消费者实例应该都是“节奏一致的”,如果出现异常场景或某些特殊场景,也会因为定时任务的补偿而达到最终一致的状态。所以如果你发现消费者分配有重复/漏分,很有可能这个消费者有短暂异常,没有及时地触发Rebalance,这个也可以从客户端日志中看出问题以便具体排查:如果一个消费者负载均衡后发现自己的分配的队列发生了变化:会有类似的日志(每一个Topic都会单独打印):

1
rebalanced result changed. allocateMessageQueueStrategyName=AVG, group=my-consumer, topic=topic_event_repay, clientId=10.22.224.39@114452, mqAllSize=9, cidAllSize=1, rebalanceResultSize=9, rebalanceResultSet=[MessageQueue [topic=topic_event_repay, brokerName=broker-1, queueId=2], MessageQueue [topic=topic_event_repay, brokerName=broker-1, queueId=1], MessageQueue [topic=topic_event_repay, brokerName=broker-2, queueId=2], MessageQueue [topic=topic_event_repay, brokerName=broker-3, queueId=0], MessageQueue [topic=topic_event_repay, brokerName=broker-1, queueId=0], MessageQueue [topic=topic_event_repay, brokerName=broker-2, queueId=1], MessageQueue [topic=topic_event_repay, brokerName=broker-3, queueId=2], MessageQueue [topic=topic_event_repay, brokerName=broker-2, queueId=0], MessageQueue [topic=topic_event_repay, brokerName=broker-3, queueId=1]]

从而判断是否及时地触发了负载均衡。

注:虽然每次Rebalance都会触发,但是如果重新分配后发现和原来已分配的队列是一致的,并不会有实际的重排动作。如:上次分配的是q0,q1,这次分配的也是q0,q1意味着整体的外部状态并没有修改,是不会有真正的重排动作的,这时候在日志上并不会有所表现。

Rebalance可能会到来消息的重复

实际上,Rebalance如果真的发现前后有变化(重排),这是一个很重的操作。因为它需要drop掉当前分配的队列以及其中的任务,还需要同步消费进度等。而由于这个过程比较长,且很可能每个消费者实际drop队列和分配队列是不一致的,所以通常情况下,重排都意味着有消息的重复投递。所以消费者端必须要做好消费的幂等。

我们不妨假设这样一个分配过程:A1本来拥有q0,这次重排需要拿q1,A2本来拥有q1,这次重排不需要q1了。那么对于A2来说,他首先要做的是:把q1的任务中断(drop队列),然后在合适的时机把q1的消费进度同步一下,再重新分配(这个例子这里不太重要),同样的A1也是要经历一样的过程:把q0的任务中断(drop队列),然后在合适的时机把q0的消费进度同步一下,然后重新分配——拿到q1。

我们假设A1的过程比A2要快,这里有两个可能:

1.一种情况是A1在A2把q1队列drop掉之前,A1就又拿到了q1,所以在这个时间窗口上观察,你会发现q1短暂地同时分配给了A1和A2。而由于RocketMQ的消费模型是Pull模式,所以A1、A2会同时拉取消息,消息就重复了。

2.另一种情况可能性更大,A2的确drop掉了队列不拉取了,但是消费进度(假设为OF1)还没及时同步到broker。那么A1拿到了q1之后,他需要第一时间知道自己从哪里(位点)拉取消息,所以他会询问一次broker,而broker这时候他的信息也是落后的,就会返回一个较老的消息位点OF2,那么[OF2,OF1]之间的消息就会重复。

可以看到,光负载均衡的这个实现原理,就会导致RocketMQ消息重复比一般的消息中间件概率要大,而且严重不少(消息是批量重复的)。

消息幂等(去重)通用解决方案,RocketMQ

消息中间件是分布式系统常用的组件,无论是异步化、解耦、削峰等都有广泛的应用价值。我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢失,即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件最基本的特性之一,也就是我们常说的“AT LEAST ONCE”,即消息至少会被“成功消费一遍”。

举个例子,一个消息M发送到了消息中间件,消息投递到了消费程序A,A接受到了消息,然后进行消费,但在消费到一半的时候程序重启了,这时候这个消息并没有标记为消费成功,这个消息还会继续投递给这个消费者,直到其消费成功了,消息中间件才会停止投递。

然而这种可靠的特性导致,消息可能被多次地投递。举个例子,还是刚刚这个例子,程序A接受到这个消息M并完成消费逻辑之后,正想通知消息中间件“我已经消费成功了”的时候,程序就重启了,那么对于消息中间件来说,这个消息并没有成功消费过,所以他还会继续投递。这时候对于应用程序A来说,看起来就是这个消息明明消费成功了,但是消息中间件还在重复投递。

这在RockectMQ的场景来看,就是同一个messageId的消息重复投递下来了。

基于消息的投递可靠(消息不丢)是优先级更高的,所以消息不重的任务就会转移到应用程序自我实现,这也是为什么RocketMQ的文档里强调的,消费逻辑需要自我实现幂等。背后的逻辑其实就是:不丢和不重是矛盾的(在分布式场景下),但消息重复是有解决方案的,而消息丢失是很麻烦的。

简单的消息去重解决方案

例如:假设我们业务的消息消费逻辑是:插入某张订单表的数据,然后更新库存:

1
2
insert into t_order values .....
update t_inv set count = count-1 where good_id = 'good123';

要实现消息的幂等,我们可能会采取这样的方案:

1
2
3
4
5
6
7
select * from t_order where order_no = 'order123'

if(order  != null) {

    return ;//消息重复,直接返回

}

这对于很多情况下,的确能起到不错的效果,但是在并发场景下,还是会有问题。

并发重复消息

假设这个消费的所有代码加起来需要1秒,有重复的消息在这1秒内(假设100毫秒)内到达(例如生产者快速重发,Broker重启等),那么很可能,上面去重代码里面会发现,数据依然是空的(因为上一条消息还没消费完,还没成功更新订单状态),

那么就会穿透掉检查的挡板,最后导致重复的消息消费逻辑进入到非幂等安全的业务代码中,从而引发重复消费的问题(如主键冲突抛出异常、库存被重复扣减而没释放等)

并发去重的解决方案之一

要解决上面并发场景下的消息幂等问题,一个可取的方案是开启事务把select 改成 select for update语句,把记录进行锁定。

1
2
3
4
select * from t_order where order_no = 'THIS_ORDER_NO' for update  //开启事务
if(order.status != null) {
    return ;//消息重复,直接返回
}

但这样消费的逻辑会因为引入了事务包裹而导致整个消息消费可能变长,并发度下降。

当然还有其他更高级的解决方案,例如更新订单状态采取乐观锁,更新失败则消息重新消费之类的。但这需要针对具体业务场景做更复杂和细致的代码开发、库表设计,不在本文讨论的范围。

但无论是select for update, 还是乐观锁这种解决方案,实际上都是基于业务表本身做去重,这无疑增加了业务开发的复杂度, 一个业务系统里面很大部分的请求处理都是依赖MQ的,如果每个消费逻辑本身都需要基于业务本身而做去重/幂等的开发的话,这是繁琐的工作量。本文希望探索出一个通用的消息幂等处理的方法,从而抽象出一定的工具类用以适用各个业务场景。

Exactly Once

在消息中间件里,有一个投递语义的概念,而这个语义里有一个叫”Exactly Once”,即消息肯定会被成功消费,并且只会被消费一次。以下是阿里云里对Exactly Once的解释:

Exactly-Once 是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费一次。

在我们业务消息幂等处理的领域内,可以认为业务消息的代码肯定会被执行,并且只被执行一次,那么我们可以认为是Exactly Once。

但这在分布式的场景下想找一个通用的方案几乎是不可能的。不过如果是针对基于数据库事务的消费逻辑,实际上是可行的。

基于关系数据库事务插入消息表

假设我们业务的消息消费逻辑是:更新MySQL数据库的某张订单表的状态:

1
update t_order set status = 'SUCCESS' where order_no= 'order123';

要实现Exaclty Once即这个消息只被消费一次(并且肯定要保证能消费一次),我们可以这样做:在这个数据库中增加一个消息消费记录表,把消息插入到这个表,并且把原来的订单更新和这个插入的动作放到同一个事务中一起提交,就能保证消息只会被消费一遍了。

  1. 开启事务
  2. 插入消息表(处理好主键冲突的问题)
  3. 更新订单表(原消费逻辑)
  4. 提交事务

说明:

  1. 这时候如果消息消费成功并且事务提交了,那么消息表就插入成功了,这时候就算RocketMQ还没有收到消费位点的更新再次投递,也会插入消息失败而视为已经消费过,后续就直接更新消费位点了。这保证我们消费代码只会执行一次。

  2. 如果事务提交之前服务挂了(例如重启),对于本地事务并没有执行所以订单没有更新,消息表也没插入成功;而对于RocketMQ服务端来说,消费位点也没更新,所以消息还会继续投递下来,投递下来发现这个消息插入消息表也是成功的,所以可以继续消费。这保证了消息不丢失。

事实上,阿里云ONS的EXACTLY-ONCE语义的实现上,就是类似这个方案基于数据库的事务特性实现的。更多详情可参考:https://help.aliyun.com/document_detail/102777.html

基于这种方式,的确这是有能力拓展到不同的应用场景,因为他的实现方案与具体业务本身无关——而是依赖一个消息表。

但是这里有它的局限性

  1. 消息的消费逻辑必须是依赖于关系型数据库事务。如果消费的消费过程中还涉及其他数据的修改,例如Redis这种不支持事务特性的数据源,则这些数据是不可回滚的。
  2. 数据库的数据必须是在一个库,跨库无法解决

注:业务上,消息表的设计不应该以消息ID作为标识,而应该以业务的业务主键作为标识更为合理,以应对生产者的重发。阿里云上的消息去重只是RocketMQ的messageId,在生产者因为某些原因手动重发(例如上游针对一个交易重复请求了)的场景下起不到去重/幂等的效果(因消息id不同)。

更复杂的业务场景

如上所述,这种方式Exactly Once语义的实现,实际上有很多局限性,这种局限性使得这个方案基本不具备广泛应用的价值。并且由于基于事务,可能导致锁表时间过长等性能问题。

例如我们以一个比较常见的一个订单申请的消息来举例,可能有以下几步(以下统称为步骤X):

  1. 检查库存(RPC)

  2. 锁库存(RPC)

  3. 开启事务,插入订单表(MySQL)

  4. 调用某些其他下游服务(RPC)

  5. 更新订单状态

  6. commit 事务(MySQL)

这种情况下,我们如果采取消息表+本地事务的实现方式,消息消费过程中很多子过程是不支持回滚的,也就是说就算我们加了事务,实际上这背后的操作并不是原子性的。怎么说呢,就是说有可能第一条小在经历了第二步锁库存的时候,服务重启了,这时候实际上库存是已经在另外的服务里被锁定了,这并不能被回滚。当然消息还会再次投递下来,要保证消息能至少消费一遍,换句话说,锁库存的这个RPC接口本身依旧要支持“幂等”。

再者,如果在这个比较耗时的长链条场景下加入事务的包裹,将大大的降低系统的并发。所以通常情况下,我们处理这种场景的消息去重的方法还是会使用一开始说的业务自己实现去重逻辑的方式,如前面加select for update,或者使用乐观锁。

那我们有没有方法抽取出一个公共的解决方案,能兼顾去重、通用、高性能呢?

拆解消息执行过程

其中一个思路是把上面的几步,拆解成几个不同的子消息,例如:

  1. 库存系统消费A:检查库存并做锁库存,发送消息B给订单服务

  2. 订单系统消费消息B:插入订单表(MySQL),发送消息C给自己(下游系统)消费

  3. 下游系统消费消息C:处理部分逻辑,发送消息D给订单系统

  4. 订单系统消费消息D:更新订单状态

注:上述步骤需要保证本地事务和消息是一个事务的(至少是最终一致性的),这其中涉及到分布式事务消息相关的话题,不在本文论述。

可以看到这样的处理方法会使得每一步的操作都比较原子,而原子则意味着是小事务,小事务则意味着使用消息表+事务的方案显得可行。

然而,这太复杂了!这把一个本来连续的代码逻辑割裂成多个系统多次消息交互!那还不如业务代码层面上加锁实现呢。

更通用的解决方案

上面消息表+本地事务的方案之所以有其局限性和并发的短板,究其根本是因为它依赖于关系型数据库的事务,且必须要把事务包裹于整个消息消费的环节。

如果我们能不依赖事务而实现消息的去重,那么方案就能推广到更复杂的场景例如:RPC、跨库等。

例如,我们依旧使用消息表,但是不依赖事务,而是针对消息表增加消费状态,是否可以解决问题呢?

基于消息幂等表的非事务方案

dedup-solution-01

以上是去事务化后的消息幂等方案的流程,可以看到,此方案是无事务的,而是针对消息表本身做了状态的区分:消费中、消费完成。只有消费完成的消息才会被幂等处理掉。而对于已有消费中的消息,后面重复的消息会触发延迟消费(在RocketMQ的场景下即发送到RETRY TOPIC),之所以触发延迟消费是为了控制并发场景下,第二条消息在第一条消息没完成的过程中,去控制消息不丢(如果直接幂等,那么会丢失消息(同一个消息id的话),因为上一条消息如果没有消费完成的时候,第二条消息你已经告诉broker成功了,那么第一条消息这时候失败broker也不会重新投递了)

上面的流程不再细说,后文有github源码的地址,读者可以参考源码的实现,这里我们回头看看我们一开始想解决的问题是否解决了:

  1. 消息已经消费成功了,第二条消息将被直接幂等处理掉(消费成功)。
  2. 并发场景下的消息,依旧能满足不会出现消息重复,即穿透幂等挡板的问题。
  3. 支持上游业务生产者重发的业务重复的消息幂等问题。

关于第一个问题已经很明显已经解决了,在此就不讨论了。

关于第二个问题是如何解决的?主要是依靠插入消息表的这个动作做控制的,假设我们用MySQL作为消息表的存储媒介(设置消息的唯一ID为主键),那么插入的动作只有一条消息会成功,后面的消息插入会由于主键冲突而失败,走向延迟消费的分支,然后后面延迟消费的时候就会变成上面第一个场景的问题。

关于第三个问题,只要我们设计去重的消息键让其支持业务的主键(例如订单号、请求流水号等),而不仅仅是messageId即可。所以也不是问题。

此方案是否有消息丢失的风险?

如果细心的读者可能会发现这里实际上是有逻辑漏洞的,问题出在上面聊到的个三问题中的第2个问题(并发场景),在并发场景下我们依赖于消息状态是做并发控制使得第2条消息重复的消息会不断延迟消费(重试)。但如果这时候第1条消息也由于一些异常原因(例如机器重启了、外部异常导致消费失败)没有成功消费成功呢?也就是说这时候延迟消费实际上每次下来看到的都是消费中的状态,最后消费就会被视为消费失败而被投递到死信Topic中(RocketMQ默认可以重复消费16次)。

有这种顾虑是正确的!对于此,我们解决的方法是,插入的消息表必须要带一个最长消费过期时间,例如10分钟,意思是如果一个消息处于消费中超过10分钟,就需要从消息表中删除(需要程序自行实现)。所以最后这个消息的流程会是这样的:

dedup-solution-01

更灵活的消息表存储媒介

我们这个方案实际上没有事务的,只需要一个存储的中心媒介,那么自然我们可以选择更灵活的存储媒介,例如Redis。使用Redis有两个好处:

  1. 性能上损耗更低
  2. 上面我们讲到的超时时间可以直接利用Redis本身的ttl实现

当然Redis存储的数据可靠性、一致性等方面是不如MySQL的,需要用户自己取舍。

源码:RocketMQDedupListener

以上方案针对RocketMQ的Java实现已经开源放到Github中,具体的使用文档可以参考https://github.com/Jaskey/RocketMQDedupListener

以下仅贴一个Readme中利用Redis去重的使用样例,用以意业务中如果使用此工具加入消息去重幂等的是多么简单:

1
2
3
4
5
6
7
8
9
10
11
        //利用Redis做幂等表
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TEST-APP1");
        consumer.subscribe("TEST-TOPIC", "*");

        String appName = consumer.getConsumerGroup();// 大部分情况下可直接使用consumer group名
        StringRedisTemplate stringRedisTemplate = null;// 这里省略获取StringRedisTemplate的过程
        DedupConfig dedupConfig = DedupConfig.enableDedupConsumeConfig(appName, stringRedisTemplate);
        DedupConcurrentListener messageListener = new SampleListener(dedupConfig);

        consumer.registerMessageListener(messageListener);
        consumer.start();

以上代码大部分是原始RocketMQ的必须代码,唯一需要修改的仅仅是创建一个DedupConcurrentListener示例,在这个示例中指明你的消费逻辑和去重的业务键(默认是messageId)。

更多使用详情请参考Github上的说明。

这种实现是否一劳永逸?

实现到这里,似乎方案挺完美的,所有的消息都能快速的接入去重,且与具体业务实现也完全解耦。那么这样是否就完美的完成去重的所有任务呢?

很可惜,其实不是的。原因很简单:因为要保证消息至少被成功消费一遍,那么消息就有机会消费到一半的时候失败触发消息重试的可能。还是以上面的订单流程X:

  1. 检查库存(RPC)

  2. 锁库存(RPC)

  3. 开启事务,插入订单表(MySQL)

  4. 调用某些其他下游服务(RPC)

  5. 更新订单状态

  6. commit 事务(MySQL)

当消息消费到步骤3的时候,我们假设MySQL异常导致失败了,触发消息重试。因为在重试前我们会删除幂等表的记录,所以消息重试的时候就会重新进入消费代码,那么步骤1和步骤2就会重新再执行一遍。如果步骤2本身不是幂等的,那么这个业务消息消费依旧没有做好完整的幂等处理。

本实现方式的价值?

那么既然这个并不能完整的完成消息幂等,还有什么价值呢?价值可就大了!虽然这不是解决消息幂等的银弹(事实上,软件工程领域里基本没有银弹),但是他能以便捷的手段解决:

1.各种由于Broker、负载均衡等原因导致的消息重投递的重复问题

2.各种上游生产者导致的业务级别消息重复问题

3.重复消息并发消费的控制窗口问题,就算重复,重复也不可能同一时间进入消费逻辑

一些其他的消息去重的建议

也就是说,使用这个方法能保证正常的消费逻辑场景下(无异常,无异常退出),消息的幂等工作全部都能解决,无论是业务重复,还是rocketmq特性带来的重复。

事实上,这已经能解决99%的消息重复问题了,毕竟异常的场景肯定是少数的。那么如果希望异常场景下也能处理好幂等的问题,可以做以下工作降低问题率:

  1. 消息消费失败做好回滚处理。如果消息消费失败本身是带回滚机制的,那么消息重试自然就没有副作用了。
  2. 消费者做好优雅退出处理。这是为了尽可能避免消息消费到一半程序退出导致的消息重试。
  3. 一些无法做到幂等的操作,至少要做到终止消费并告警。例如锁库存的操作,如果统一的业务流水锁成功了一次库存,再触发锁库存,如果做不到幂等的处理,至少要做到消息消费触发异常(例如主键冲突导致消费异常等)
  4. 在#3做好的前提下,做好消息的消费监控,发现消息重试不断失败的时候,手动做好#1的回滚,使得下次重试消费成功。

记一次因索引合并导致的MySQL死锁分析过程

生产上偶现这段代码会出现死锁,死锁日志如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
*** (1) TRANSACTION:
TRANSACTION 424487272, ACTIVE 0 sec fetching rows
mysql tables in use 3, locked 3
LOCK WAIT 6 lock struct(s), heap size 1184, 4 row lock(s)
MySQL thread id 3205005, OS thread handle 0x7f39c21c8700, query id 567774892 10.14.34.30 finance Searching rows for update
update repay_plan_info_1
     SET actual_pay_period_amount = 38027,
        actual_pay_principal_amount = 36015,
        actual_pay_interest_amount = 1980,
        actual_pay_fee = 0,
        actual_pay_fine = 32,
        actual_discount_amount = 0,
        repay_status = 'PAYOFF',
        repay_type = 'OVERDUE',
        actual_repay_time = '2019-08-12 15:48:15.025'

     WHERE (  user_id = '938467411690006528'
                  and loan_order_no = 'LN201907120655461690006528458116'
                  and seq_no = 1
                  and repay_status <> 'PAYOFF' )

*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 3680 page no 30 n bits 136 index `PRIMARY` of table `db_loan_core_2`.`repay_plan_info_1` trx id 424487272 lock_mode X locks rec but not gap waiting
Record lock, heap no 64 PHYSICAL RECORD: n_fields 33; compact format; info bits 0
 0: len 8; hex 800000000000051e; asc         ;;
 1: len 6; hex 0000193d35df; asc    =5 ;;
 2: len 7; hex 06000001d402e7; asc        ;;
 3: len 30; hex 323031393036313332303532303634323936303534323130353730303030; asc 201906132052064296054210570000; (total 32 bytes);
 4: len 30; hex 4c4e32303139303631333031323934303136393030303635323831373534; asc LN2019061301294016900065281754; (total 32 bytes);
 5: len 4; hex 80000002; asc     ;;
 6: len 18; hex 393338343637343131363930303036353238; asc 938467411690006528;;
 7: len 4; hex 80000003; asc     ;;
 8: len 4; hex 80000258; asc    X;;
 9: len 3; hex 646179; asc day;;
 10: SQL NULL;
 11: SQL NULL;
 12: len 8; hex 8000000000005106; asc       Q ;;
 13: len 8; hex 8000000000000000; asc         ;;
 14: len 8; hex 8000000000004e1e; asc       N ;;
 15: len 8; hex 8000000000000000; asc         ;;
 16: len 8; hex 80000000000002d6; asc         ;;
 17: len 8; hex 8000000000000000; asc         ;;
 18: len 8; hex 8000000000000000; asc         ;;
 19: len 8; hex 8000000000000000; asc         ;;
 20: len 8; hex 8000000000000012; asc         ;;
 21: len 8; hex 8000000000000000; asc         ;;
 22: len 8; hex 8000000000000000; asc         ;;
 23: len 8; hex 8000000000000000; asc         ;;
 24: len 8; hex 3230313930383131; asc 20190811;;
 25: len 7; hex 4f564552445545; asc OVERDUE;;
 26: SQL NULL;
 27: len 1; hex 59; asc Y;;
 28: SQL NULL;
 29: len 5; hex 99a35a1768; asc   Z h;;
 30: len 4; hex 5d503dd8; asc ]P= ;;
 31: SQL NULL;
 32: len 5; hex 99a3d80281; asc      ;;

*** (2) TRANSACTION:
TRANSACTION 424487271, ACTIVE 0 sec fetching rows
mysql tables in use 3, locked 3
5 lock struct(s), heap size 1184, 3 row lock(s)
MySQL thread id 3204980, OS thread handle 0x7f3db0cf6700, query id 567774893 10.14.34.30 finance Searching rows for update
update repay_plan_info_1
     SET actual_pay_period_amount = 20742,
        actual_pay_principal_amount = 19998,
        actual_pay_interest_amount = 726,
        actual_pay_fee = 0,
        actual_pay_fine = 18,
        actual_discount_amount = 0,
        repay_status = 'PAYOFF',
        repay_type = 'OVERDUE',
        actual_repay_time = '2019-08-12 15:48:15.025'


     WHERE (  user_id = '938467411690006528'
                  and loan_order_no = 'LN201906130129401690006528175485'
                  and seq_no = 2
                  and repay_status <> 'PAYOFF' )
*** (2) HOLDS THE LOCK(S):
RECORD LOCKS space id 3680 page no 30 n bits 136 index `PRIMARY` of table `db_loan_core_2`.`repay_plan_info_1` trx id 424487271 lock_mode X locks rec but not gap
Record lock, heap no 64 PHYSICAL RECORD: n_fields 33; compact format; info bits 0
 0: len 8; hex 800000000000051e; asc         ;;
 1: len 6; hex 0000193d35df; asc    =5 ;;
 2: len 7; hex 06000001d402e7; asc        ;;
 3: len 30; hex 323031393036313332303532303634323936303534323130353730303030; asc 201906132052064296054210570000; (total 32 bytes);
 4: len 30; hex 4c4e32303139303631333031323934303136393030303635323831373534; asc LN2019061301294016900065281754; (total 32 bytes);
 5: len 4; hex 80000002; asc     ;;
 6: len 18; hex 393338343637343131363930303036353238; asc 938467411690006528;;
 7: len 4; hex 80000003; asc     ;;
 8: len 4; hex 80000258; asc    X;;
 9: len 3; hex 646179; asc day;;
 10: SQL NULL;
 11: SQL NULL;
 12: len 8; hex 8000000000005106; asc       Q ;;
 13: len 8; hex 8000000000000000; asc         ;;
 14: len 8; hex 8000000000004e1e; asc       N ;;
 15: len 8; hex 8000000000000000; asc         ;;
 16: len 8; hex 80000000000002d6; asc         ;;
 17: len 8; hex 8000000000000000; asc         ;;
 18: len 8; hex 8000000000000000; asc         ;;
 19: len 8; hex 8000000000000000; asc         ;;
 20: len 8; hex 8000000000000012; asc         ;;
 21: len 8; hex 8000000000000000; asc         ;;
 22: len 8; hex 8000000000000000; asc         ;;
 23: len 8; hex 8000000000000000; asc         ;;
 24: len 8; hex 3230313930383131; asc 20190811;;
 25: len 7; hex 4f564552445545; asc OVERDUE;;
 26: SQL NULL;
 27: len 1; hex 59; asc Y;;
 28: SQL NULL;
 29: len 5; hex 99a35a1768; asc   Z h;;
 30: len 4; hex 5d503dd8; asc ]P= ;;
 31: SQL NULL;
 32: len 5; hex 99a3d80281; asc      ;;

*** (2) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 3680 page no 137 n bits 464 index `idx_user_id` of table `db_loan_core_2`.`repay_plan_info_1` trx id 424487271 lock_mode X locks rec but not gap waiting
Record lock, heap no 161 PHYSICAL RECORD: n_fields 2; compact format; info bits 0
 0: len 18; hex 393338343637343131363930303036353238; asc 938467411690006528;;
 1: len 8; hex 800000000000051e; asc         ;;

*** WE ROLL BACK TRANSACTION (2)

代码定位

按照死锁的update sql语句,我们先定位这个死锁SQL中代码是哪个代码片段导致的。后面我们定位到,是如下代码片段导致的:

deadlock-codedeadlock-code

实际上一眼看上去,这段代码有一个很典型的业务开发场景问题:开启事务在for循环写SQL。

注:这在实际的问题定位过程中并不容易,因为死锁日志并不能反向直接定位到方法的对账、线程名等,如果一个库被多个服务同时连接,甚至定位是哪个服务都不容易。

死锁分析(1)——猜测可能消息重发

按照死锁的必要条件:循环等待条件。即 T1事务应该持有了某把锁L1,然后去申请锁L2,而这时候发现T2事务已经持有了L2,而T2事务又去申请L1,这时候就发生循环等待而死锁。

一开始会猜测,是否我们更新表的顺序在两个事务里面是反方向的,即T1事务更新ta、tb表,锁ta表的记录,准备去拿tb表记录的锁;T2事务更新tb、ta表,锁了tb记录准备去拿ta的锁,这是比较常见的死锁情况。但是从SQL看,我们死锁的SQL是同一张表的,即同一张表不同的记录。

而且从死锁日志中可以发现,两个死锁的SQL居然是“一样”的,也就是说是“同一条”SQL/同一段代码(不同的where条件参数)导致的,。即上图代码中的这段for循环更新还款计划的代码。

但是光这段For循环来看,如果要发生死锁,有可能同一批请求,更新记录的顺序是反过来的,然后又并发执行的时候,可能出现。

一开始会猜测上游触发了两条一样的请求(我们这个场景是MQ重发),出现了并发,两条消息分在两个事务中并发执行。但是如果是MQ导致的原因,FOR循环更新的记录顺序是一样的,一样的顺序意味着一样的一样的加锁顺序,一样的加锁顺序意味着最多出现获取锁超时,不会满足【循环等待】的条件,不可能死锁。所以排除MQ重发的可能。

死锁分析(2)

仔细阅读出现问题的两条SQL,可以发现一个规律,这里面都带一个相同的where条件:userId= 938467411690006528,意味着这两个事务的请求都来自一个用户发起的,然后从actual_repay_time = '2019-08-12 15:48:15.025' 来看,的确是瞬间一起执行的两个事务,但是却是不一样的两个借据。对应到真实的用户的操作上,用户的确有可能发起两个借据的同时还款,例如同时结清多笔借据。

通过出现了几次的死锁,总结出了其相同的规律:每次的死锁SQL条件都有一样的特征——相同的userId+不同的借据+并发。基本可以断定,相同的用户在同时还款多笔的时候,可能会发现死锁,但很可惜,测试环境、生产环境我们模拟这个场景都无法复现死锁的情况。

只能靠技术手段分析原因了。

思路:这是了两个完全不同的借据环境计划,操作完全不一样的数据记录,为什么会发生死锁呢?是不是锁的不是行而是锁了表?

死锁日志分析

从事务1中的

1
2
3
*** (1) WAITING FOR THIS LOCK TO BE GRANTED:

RECORD LOCKS space id 3680 page no 30 n bits 136 index `PRIMARY` of table `db_loan_core_2`.`repay_plan_info_1` trx id 424487272 lock_mode X locks rec but not gap waiting

事务2中的

1
2
3
*** (2) HOLDS THE LOCK(S):

RECORD LOCKS space id 3680 page no 30 n bits 136 index `PRIMARY` of table `db_loan_core_2`.`repay_plan_info_1` trx id 424487271 lock_mode X locks rec but not gap

从RECORD LOCKS的标示可知,的确锁的是行锁不是表锁。且从”but not gap”的信息来看,也不存在间隙锁(注:我们线上隔离级别是read committed,本来就不存在间隙锁问题)。所以锁的位置应该的确是我们操作的行记录才对。但是非常奇怪的是,实际业务上操作的记录的确是完全隔离的(因为是不同的借据,记录没有交集),为什么会冲突呢?

再细节阅读死锁日志从事务2中获取到了一点线索:

1
2
3
*** (2) WAITING FOR THIS LOCK TO BE GRANTED:

RECORD LOCKS space id 3680 page no 137 n bits 464 index `idx_user_id` of table `db_loan_core_2`.`repay_plan_info_1` trx id 424487271 lock_mode X locks rec but not gap waiting Record lock, heap no 161 PHYSICAL RECORD: n_fields 2; compact format; info bits 0

这个索引很奇怪,是userid的索引?

分析之前,我们先看先看锁持有情况:

T1等待锁space id 3680 page no 30

T2持有锁space id 3680 page no 30

T2等待锁space id 3680 page no 137

最后回滚了T2

可以推断space id 3680 page no 137应该被T1持有了,但是日志中没有显示出来。

  • 3680 page no 30这个锁是一个主键索引PRIMARY导致的,实际上我们没有用到我们的自增主键,是非聚集索引,所以这是先锁的非主键索引最后找到的主键去加锁。

  • 3680 page no 137这个锁就比较奇怪了,他锁在了idx_user_id这个索引,这个索引是加在userId上的,也就是T2他正在尝试锁所有这个用户的还款计划的记录!

如果是这样,问题就解释通了:

T1: 锁了某行记录X(具体怎么锁的,从死锁日志中未能获取),然后准备去获取LN201907120655461690006528458116,SEQ=1的记录的锁。

T2: 锁了LN201907120655461690006528458116,SEQ=1的锁,而他想去锁所有userId=938467411690006528的记录,这里面肯定包含了记录X,所以他无法获得X的锁。

这样就造成死锁了,因为X已经被T1持有了,而T1又在等T2释放LN201907120655461690006528458116,SEQ=1这个锁。

至于为什么T2明明准备操作LN201906130129401690006528175485,SEQ=2的记录,却之前持有了LN201907120655461690006528458116,SEQ=1的锁,大概率不是因为之前的SQL真的操作LN201907120655461690006528458116,SEQ=1的记录,也是因为他之前本想持有别的记录(从锁的详细信息上猜,可能是LN2019061301294016900065281754的相关记录),但是因为这个idx_user_id的索引问题,顺带锁着了LN201907120655461690006528458116,SEQ=1,因为都属于一个userId。

所以从时间线上分析,顺序应该是:

  1. T1锁了某记录X

  2. T2锁了某记录Y(从hold this lock的日志细节中推断,是LN2019061301294016900065281754),然后准备锁LN201906130129401690006528175485,SEQ=2,这时候的这条SQL触发了idx_user_id,连带一起锁锁住了LN201907120655461690006528458116,SEQ=1并准备锁其它同用户记录

  3. T1 执行下一条sql,准备获取LN201907120655461690006528458116,SEQ=1的锁,发现被T2获取了,等待。

  4. T2在锁其它记录的过程中发现了X,但是锁不住,发现X被T1持有。而自己又持有了LN201907120655461690006528458116,SEQ=1这行记录的锁。

这时候循环等待,死锁!

所以根源是为什么SQL会使用idx_user_id这个索引呢?

索引信息

1
2
3
4
5
PRIMARY KEY (`id`),
UNIQUE KEY `uk_repay_order` (`loan_order_no`,`seq_no`),
UNIQUE KEY `uk_repay_plan_no` (`repay_plan_no`),
KEY `idx_user_id` (`user_id`),
KEY `idx_create_time` (`create_time`)

从唯一主键是UNIQUE KEY uk_repay_order (loan_order_no,seq_no)

从我们SQL上看loan_order_no+seq_no是唯一主键,应该肯定能唯一定位一行记录,索引应该使用这个是最优的才对。

这时候我们去看T2的那条SQL的执行计划得知:其没有使用索引uk_repay_order,而使用了一个type: index_merge,走的索引是uk_repay_order_,idx_user_id,也就是他居然两个索引同时生效了。

deadlock-codedeadlock-code

解决方案

实际上,由于index merge,客观上就会增加update语句的死锁可能性,相关bug连接如下:https://bugs.mysql.com/bug.php?id=77209

而其实如果出现了index merge,在很多情况下意味着我们索引的建立可能并不合理。

解决方案有两个:

  1. 建立联合索引,以避免index merge,让联合索引生效则不会因此锁住所有该userId的记录
  2. 取消index merge的优化

遗留问题

什么时候才会触发index merge,这个在文档中似乎并没有很明确的触发实际,从这些死锁的SQL来看,某些SQL在事后explain的时候,并没有走index merge,而有些却走了。从本案例来看,事务1的SQL并没有走index merge,但是事务2这样类似的SQL却走了。

只查到一个必要条件是:

Intersect和Union都需要使用的索引是ROR的,也就时ROWID ORDERED,即针对不同的索引扫描出来的数据必须是同时按照ROWID排序的,这里的 ROWID其实也就是InnoDB的主键(如果不定义主键,InnoDB会隐式添加ROWID列作为主键)。只有每个索引是ROR的,才能进行归并排序,你懂的。 当然你可能会有疑惑,查不记录后内部进行一次sort不一样么,何必必须要ROR呢,不错,所以有了SORT-UNION。SORT-UNION就是每个非ROR的索引 排序后再进行Merge – 来自 http://www.cnblogs.com/nocode/archive/2013/01/28/2880654.html

为此我在stackoverflow 提了一个问题看后续有结论再更新:https://stackoverflow.com/questions/57987713/why-mysql-decide-to-use-index-merge-though-i-have-already-use-a-unique-key-index

Elastic Job从单点到高可用、同城主备、同城双活

在使用Elastic Job Lite做定时任务的时候,我发现很多开发的团队都是直接部署单点,这对于一些离线的非核心业务(如对账、监控等)或许无关紧要,但对于一些高可用补偿、核心数据定时修改(如金融场景的利息更新等),单点部署则“非常危险”。实际上,Elastic Job Lite是支持高可用的。网上关于Elastic Job的较高级的博文甚少,本文试图结合自身实践的一些经验,大致讲解其方案原理,并延伸至同城双机房的架构实践。

注:本文所有讨论均基于开源版本的Elastic Job Lite, 不涉及Elastic Job Cloud部分。

单点部署到高可用

如本文开头所说,很多系统的部署是采取以下部署架构:

esjob-single

原因是开发者担心定时任务在同一时刻被触发多次,导致业务有问题。实际上这是对于框架最基本的原理不了解。在官方文档的功能列表里http://elasticjob.io/docs/elastic-job-lite/00-overview/ 就已说明其最基本的功能之一就是:

作业分片一致性,保证同一分片在分布式环境中仅一个执行实例

Elastic Job会依赖zookeeper选举出对应的实例做sharding,从而保证只有一个实例在执行同一个分片(如果任务没有采取分片(即分片数是0),就意味着这个任务只有一个实例在执行)

elastic-job-架构

所以如下图所示的部署架构是完全没问题的——一来,服务只会被一个实例调用,二来,如果某个服务挂了,其他实例也能接管继续提供服务从而实现高可用。

esjob-single

双机房高可用

随着互联网业务的发展,慢慢地,对架构的高可用会有更高的要求。下一步可能就是需要同城两机房部署,那这时候为了保证定时服务在两个机房的高可用,我们架构上可能会变成这样的:

esjob-single

这样如果A机房的定时任务全部不可用了,B机房的确也能接手提供服务。而且由于集群是一个,Elastic Job能保证同一个分片在两个机房也只有一个实例运行。看似挺完美的。

注:本文不讨论zookeeper如何实现双机房的高可用,实际上从zookeeper的原理来看,仅仅两个机房组成一个大集群并不可以实现双机房高可用。

优先级调度?

以上的架构解决了定时任务在两个机房都可用的问题,但是实际的生产中,定时任务很可能是依赖存储的数据源的。而这个数据源,通常是有主备之分(这里不考虑单元化的架构的情况):例如主在A机房,备在B机房做实时同步。

如果这个定时任务只有读操作,可能没问题,因为只要配置数据源连接同机房的数据源即可。但是如果是要写入的,就有一个问题——如果所有任务都在B机房被调度了,那么这些数据的写入都会跨机房地往A机房写入,这样延迟就大大提升了,如下图所示。

esjob-single

如图所示,如果Elastic Job把任务都调度到了B机房,那么流量就一直跨机房写了,这样对于性能来说是不好的事情。

那么有没有办法达到如下效果了:

  1. 保证两个机房都随时可用,也就是一个机房的服务如果全部不可用了,另外一个机房能提供对等的服务
  2. 但一个任务可以优先指定A机房执行

Elastic Job分片策略

在回答这个问题之前,我们需要了解下Elastic Job的分片策略,根据官网的说明(http://elasticjob.io/docs/elastic-job-lite/02-guide/job-sharding-strategy/ ) ,Elastic Job是内置了一些分片策略可选的,其中有平均分配算法,作业名的哈希值奇偶数决定IP升降序算法和作业名的哈希值对服务器列表进行轮转;同时也是支持自定义的策略,实现实现JobShardingStrategy接口并实现sharding方法即可。

1
public Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount)

假设我们可以实现这一的自定义策略:让做分片的时候知道哪些实例是A机房的,哪些是B机房的,然后我们知道A机房是优先的,在做分片策略的时候先把B机房的实例踢走,再复用原来的策略做分配。这不就解决我们的就近接入问题(接近数据源)了吗?

以下是利用装饰器模式自定义的一个装饰器类(抽象类,由子类判断哪些实例属于standby的实例),读者可以结合自身业务场景配合使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public abstract class JobShardingStrategyActiveStandbyDecorator implements JobShardingStrategy {

    //内置的分配策略采用原来的默认策略:平均
    private JobShardingStrategy inner = new AverageAllocationJobShardingStrategy();


    /**
     * 判断一个实例是否是备用的实例,在每次触发sharding方法之前会遍历所有实例调用此方法。
     * 如果主备实例同时存在于列表中,那么备实例将会被剔除后才进行sharding
     * @param jobInstance
     * @return
     */
    protected abstract boolean isStandby(JobInstance jobInstance, String jobName);

    @Override
    public Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount) {

        List<JobInstance> jobInstancesCandidates = new ArrayList<>(jobInstances);
        List<JobInstance> removeInstance = new ArrayList<>();

        boolean removeSelf = false;
        for (JobInstance jobInstance : jobInstances) {
            boolean isStandbyInstance = false;
            try {
                isStandbyInstance = isStandby(jobInstance, jobName);
            } catch (Exception e) {
                log.warn("isStandBy throws error, consider as not standby",e);
            }

            if (isStandbyInstance) {
                if (IpUtils.getIp().equals(jobInstance.getIp())) {
                    removeSelf = true;
                }
                jobInstancesCandidates.remove(jobInstance);
                removeInstance.add(jobInstance);
            }
        }

        if (jobInstancesCandidates.isEmpty()) {//移除后发现没有实例了,就不移除了,用原来的列表(后备)的顶上
            jobInstancesCandidates = jobInstances;
            log.info("[{}] ATTENTION!! Only backup job instances exist, but do sharding with them anyway {}", jobName, JSON.toJSONString(jobInstancesCandidates));
        }

        if (!jobInstancesCandidates.equals(jobInstances)) {
            log.info("[{}] remove backup before really do sharding, removeSelf :{} , remove instances: {}", jobName, removeSelf, JSON.toJSONString(removeInstance));
            log.info("[{}] after remove backups :{}", jobName, JSON.toJSONString(jobInstancesCandidates));
        } else {//全部都是master或者全部都是slave
            log.info("[{}] job instances just remain the same {}", jobName, JSON.toJSONString(jobInstancesCandidates));
        }

        //保险一点,排序一下,保证每个实例拿到的列表肯定是一样的
        jobInstancesCandidates.sort((o1, o2) -> o1.getJobInstanceId().compareTo(o2.getJobInstanceId()));

        return inner.sharding(jobInstancesCandidates, jobName, shardingTotalCount);

    }

利用自定义策略实现同城双机房下的优先级调度

以下是一个很简单的就近接入的例子: 指定在ip白名单的,就是优先执行的,不在的都认为是备用的。我们看如何实现。

一、继承此装饰器策略,指定哪些实例是standby实例

1
2
3
4
5
6
7
8
9
10
public class ActiveStandbyESJobStrategy extends JobShardingStrategyActiveStandbyDecorator{

    @Override
    protected boolean isStandby(JobInstance jobInstance, String jobName) {
        String activeIps = "10.10.10.1,10.10.10.2";//只有这两个ip的实例才是优先执行的,其他都是备用的
        String ss[] = activeIps.split(",");
        return !Arrays.asList(ss).contains(jobInstance.getIp());//不在active名单的就是后备
    }

}

很简单吧!这样实现之后,就能达到以下类似的效果

esjob-single

二、 在任务启动前,指定使用这个策略

以下以Java的方式示意,

1
2
3
4
5
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build();
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(simpleCoreConfig, jobClass.getCanonicalName());
return LiteJobConfiguration.newBuilder(simpleJobConfiguration)
        .jobShardingStrategyClass("com.xxx.yyy.job.ActiveStandbyESJobStrategy")//使用主备的分配策略,分主备实例(输入你的实现类类名)
        .build();

这样就大功告成了。

同城双活模式

以上这样改造后,针对定时任务就已经解决了两个问题:

1、定时任务能实现在两个机房下的高可用

2、任务能优先调度到指定机房

这种模式下,对于定时任务来说,B机房其实只是个备机房——因为A机房永远都是优先调度的。

对于B机房是否有一些实际问题其实我们可能是不知道的(常见的例如数据库权限没申请),由于没有流量的验证,这时候真的出现容灾问题,B机房是否能安全接受其实并不是100%稳妥的。

我们能否再进一步做到同城双活呢?也就是,B机房也会承担一部分的流量?例如10%?

回到自定义策略的sharding接口:

1
 public Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount)

在做分配的时候,是能拿到一个任务实例的全景图(所有实例列表),当前的任务名,和分片数。

基于此其实是可以做一些事情把流量引流到B机房实例的,例如:

  1. 指定任务的主机房让其是B机房优先调度(例如挑选部分只读任务,占10%的任务数)
  2. 对于分片的分配,把末尾(如1/10)的分片优先分配给B机房。

以上两种方案都能实现让A、B两个机房都有流量(有任务在被调度),从而实现所谓的双活。

以下针对上面抛出来的方案一,给出一个双活的示意代码和架构。

假设我们定时任务有两个任务,TASK_A_FIRST,TASK_B_FIRST,其中TASK_B_FIRST是一个只读的任务,那么我们可以让他配置读B机房的备库让他优先运行在B机房,而TASK_A_FIRST是一个更为频繁的任务,而且带有写操作,我们则优先运行在A机房,从而实现双机房均有流量。

注:这里任意一个机房不可用了,任务均能在另外一个机房调度,这里增强的只是对于不同任务做针对性的优先调度实现双活

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ActiveStandbyESJobStrategy extends JobShardingStrategyActiveStandbyDecorator{

    @Override
    protected boolean isStandby(JobInstance jobInstance, String jobName) {
         String activeIps = "10.10.10.1,10.10.10.2";//默认只有这两个ip的实例才是优先执行的,其他都是备用的
        if ("TASK_B_FIRST".equals(jobName)){//选择这个任务优先调度到B机房
           activeIps = "10.11.10.1,10.11.10.2";
        }

        String ss[] = activeIps.split(",");
        return !Arrays.asList(ss).contains(jobInstance.getIp());//不在active名单的就是后备
    }

}

esjob-single

[DUBBO] ReferenceConfig(null) Is Not DESTROYED When FINALIZE分析及解决

最近发现经常有类似告警:

​ [DUBBO] ReferenceConfig(null) is not DESTROYED when FINALIZE,dubbo version 2.6.2

在此记录一下分析的过程和解决方案。

从日志定位源码位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@SuppressWarnings("unused")
private final Object finalizerGuardian = new Object() {
    @Override
    protected void finalize() throws Throwable {
        super.finalize();

        if (!ReferenceConfig.this.destroyed) {
            logger.warn("ReferenceConfig(" + url + ") is not DESTROYED when FINALIZE");

            /* don't destroy for now
            try {
                ReferenceConfig.this.destroy();
            } catch (Throwable t) {
                    logger.warn("Unexpected err when destroy invoker of ReferenceConfig(" + url + ") in finalize method!", t);
            }
            */
        }
    }
};

通过日志搜索源码,发现这个日志是ReferenceConfig类中一个finalizerGuardian的实例变量下复写了finalize而打印出来的。而从其中的源码来看,原本应该是希望做到:在发现原本的对象没有被释放资源的时候,手动回收资源。但是后面缺代码被注释了(猜测是有巨坑),就只保留了一个告警。所以实际上这个对象现在只起到了WARN日志提示的作用,并无实际作用。

注:复写finalize方法会导致一定的GC回收的性能问题,因为一个对象如果其中finalize被复写(哪怕只是写一个分号空实现),在垃圾回收的时候都会以单独的方法回收,简单说就是有一条独立的Finalizer线程(优先级很低)单独回收,如果对象分配频繁,会引起一定的性能问题。回到Dubbo的场景,假设在高并发的场景下不断创建ReferenceConfig对象,会影响这些对象的回收效率(并且这个过程中会产生一些java.lang.ref.Finalizer对象)甚至OOM,现在只是打印一个日志是一个不好的实践。对于finalize的原理和其对垃圾回收的影响可以参考https://blog.heaphero.io/2018/04/13/heaphero-user-manual-2/#ObjFin ,这里摘抄其中一段供参考:

Objects that have finalize() method are treated differently during garbage collection process than the ones which don’t have. During garbage collection phase, objects with finalize() method aren’t immediately evicted from the memory. Instead, as the first step, those objects are added to an internal queue of java.lang.ref.Finalizer. For entire JVM, there is only one low priority JVM thread by name ‘Finalizer’ that executes finalize() method of each object in the queue. Only after the execution of finalize() method, object becomes eligible for Garbage Collection. Assume if your application is producing a lot of objects which has finalize() method and low priority “Finalizer” thread isn’t able to keep up with executing finalize() method, then significant amount unfinalized objects will start to build up in the internal queue of java.lang.ref.Finalize, which would result in significant amount of memory wastage.

问题分析

从以上的源码上,这句日志打印的充分必要条件是:

1.ReferenceConfig被垃圾回收

2.垃圾回收的时候ReferenceConfig没有调用过destroy方法,即!ReferenceConfig.this.destroyed

代码始末

基于以上的分析,需要知道哪里我们会创建ReferenceConfig对象。通常情况下,这个对象我们是不会代码显示创建的,因为正常都是Dubbo基于我们的配置(注解或者配置文件)去管理内部的对象,只有我们在泛化调用的时候,可能会手动创建。

Dubbo官方文档里http://dubbo.apache.org/zh-cn/docs/user/demos/generic-reference.html ,关于泛化调用有类似的代码,其中就会手动创建ReferenceConfig对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 引用远程服务 
// 该实例很重量,里面封装了所有与注册中心及服务提供方连接,请缓存
ReferenceConfig<GenericService> reference = new ReferenceConfig<GenericService>();
// 弱类型接口名
reference.setInterface("com.xxx.XxxService");
reference.setVersion("1.0.0");
// 声明为泛化接口 
reference.setGeneric(true);

// 用org.apache.dubbo.rpc.service.GenericService可以替代所有接口引用  
GenericService genericService = reference.get();

// 基本类型以及Date,List,Map等不需要转换,直接调用 
Object result = genericService.$invoke("sayHello", new String[] {"java.lang.String"}, new Object[] {"world"});

由于以上代码会存在很容易导致连接等相关资源泄露等问题,详见:http://dubbo.apache.org/zh-cn/docs/user/demos/reference-config-cache.html ,所以正常的泛化调用的使用方式则变成这样:

1
2
3
4
5
6
7
8
9
10
ReferenceConfig<XxxService> reference = new ReferenceConfig<XxxService>();
reference.setInterface(XxxService.class);
reference.setVersion("1.0.0");
......
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
// cache.get方法中会缓存 Reference对象,并且调用ReferenceConfig.get方法启动ReferenceConfig
XxxService xxxService = cache.get(reference);
// 注意! Cache会持有ReferenceConfig,不要在外部再调用ReferenceConfig的destroy方法,导致Cache内的ReferenceConfig失效!
// 使用xxxService对象
xxxService.sayHello();

Dubbo官方对于相关建议的解释是:

ReferenceConfig 实例很重,封装了与注册中心的连接以及与提供者的连接,需要缓存。否则重复生成 ReferenceConfig 可能造成性能问题并且会有内存和连接泄漏。在 API 方式编程时,容易忽略此问题。

但是,实际上这句话和其API的实际设计上存在一定的误解。Dubbo认为ReferenceConfig 实例很重,所以应该缓存这个对象,所以设计了一个ReferenceConfigCache类,这个类实际上可以认为就是一个Map,当第一次调用cache.get(reference)的时候,实际上会把这个ReferenceConfig 放到里面的Map中:

1
2
3
4
5
6
7
8
9
10
11
12
public <T> T get(ReferenceConfig<T> referenceConfig) {
    String key = generator.generateKey(referenceConfig);

    ReferenceConfig<?> config = cache.get(key);
    if (config != null) {
        return (T) config.get();
    }

    cache.putIfAbsent(key, referenceConfig);
    config = cache.get(key);
    return (T) config.get();
}

config.get()的时候,实际上会调用内部的各种初始化的代码。

但是,这里接口设计有一个自相矛盾的地方。怎么讲了,因为“ReferenceConfig 实例很重”,所以,ReferenceConfigCache帮我们做了缓存,但是使用的时候,接口的设计却只能接受一个ReferenceConfig 对象,那这个对象从何而来呢?也就是说,这个缓存其实只能给Dubbo内部使用——用户给一个ReferenceConfig 对象给Dubbo,Dubbo判断这个对象是不是和以前的对象等价,等价的话我就不用用户传递的,用以前创建好的(因为这个对象各种资源都创建好了,没必要重复创建)。

原因呼之欲出

分析到这里,其实这个问题的原因已经呼之欲出了:因为泛化调用而创建了ReferenceConfig 对象。实际上,要复现这个问题,只需要模拟不断创建临时ReferenceConfig 变量然后触发GC即可:

1
2
3
4
for (int i = 0; i<100000;i++) {
    new ReferenceConfig<>();
}
System.gc();//手动触发一下GC,确保上面创建的ReferenceConfig能触发GC回收

运行以上代码,你能发现和本文开头一模一样的告警日志。

为什么是ReferenceConfig(null),即URL为什么是null?

你可能会问,上面的分析原因是清楚了,但是为什么ReferenceConfig打印的时候,url参数总是显示null? 其实上面的分析已经回答这个问题了。上面章节我们提到“而config.get()的时候,实际上会调用内部的各种初始化的代码。”而这个初始化的过程之一就是构建合适的url,所以当我们使用`ReferenceConfigCache做泛化调用的时候,除了第一次创建的ReferenceConfig被ReferenceConfigCache缓存起来并初始化了,其他的对象其实都没有初始化过,那自然URL就是空了,同时又因为没有被缓存(所以对象在方法运行结束后不可达了)必然后面会被触发GC,那日志看起来就是ReferenceConfig(null)。

实际上分析到这里,我们可以看出这里Dubbo有两个处理不好的地方

  1. API设计不合理——认为ReferenceConfig很重需要缓存,但是使用的时候必须要提供一个对象
  2. ReferenceConfig回收的告警上,其实是存在优化空间的,像这种没有初始化过的对象,其实没必要打WARN日志,毕竟他没有初始化过就自然没有什么可destroy的

解决方案

从这里分析可以看到这行告警其实是”误报“,只要日志里url显示是null,并没有什么特殊的实际影响(在不考虑上文讲的GC问题的前提下)

1
[DUBBO] ReferenceConfig(null) is not DESTROYED when FINALIZEdubbo version 2.6.2

那如果要修复这个告警,则可考虑显示的缓存ReferenceConfig对象,不要每次泛化调用的时候都创建一个,可参考以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private synchronized ReferenceConfig<GenericService> getOrNewReferenceConfig(String interfaceClass) {
    String refConfigCacheKey = interfaceClass;
    WeakReference<ReferenceConfig<GenericService>> referenceConfigWeakReference = refConfigCache.get(refConfigCacheKey);

    if (referenceConfigWeakReference != null) {//缓存有弱引用
        ReferenceConfig<GenericService> referenceConfigFromWR = referenceConfigWeakReference.get();
        if (referenceConfigFromWR == null) {//证明没人引用自己被GC了,需要重建
            ReferenceConfig<GenericService> referenceConfig = newRefConifg(interfaceClass);
            refConfigCache.put(refConfigCacheKey, new WeakReference<>(referenceConfig));//放入缓存中,用弱应用hold住,不影响该有GC
            return referenceConfig;
        } else {
            return referenceConfigFromWR;
        }

    } else {//缓存没有,则创建
        ReferenceConfig<GenericService> referenceConfig = newRefConifg(interfaceClass);
        refConfigCache.put(refConfigCacheKey, new WeakReference<>(referenceConfig));//放入缓存中,用弱应用hold住,不影响该有GC
        return referenceConfig;
    }
}

注:

  1. 其中newRefConifg即为原先的创建ReferenceConfig的代码
  2. 之所以使用WeakReference是为了保证这个缓存的对象不会影响GC——即该回收的时候还是得回收

优雅地处理重复请求(并发请求)——附Java实现

对于一些用户请求,在某些情况下是可能重复发送的,如果是查询类操作并无大碍,但其中有些是涉及写入操作的,一旦重复了,可能会导致很严重的后果,例如交易的接口如果重复请求可能会重复下单。

重复的场景有可能是:

  1. 黑客拦截了请求,重放
  2. 前端/客户端因为某些原因请求重复发送了,或者用户在很短的时间内重复点击了。
  3. 网关重发
  4. ….

本文讨论的是如果在服务端优雅地统一处理这种情况,如何禁止用户重复点击等客户端操作不在本文的讨论范畴。

利用唯一请求编号去重

你可能会想到的是,只要请求有唯一的请求编号,那么就能借用Redis做这个去重——只要这个唯一请求编号在redis存在,证明处理过,那么就认为是重复的

代码大概如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
    String KEY = "REQ12343456788";//请求唯一编号
    long expireTime =  1000;// 1000毫秒过期,1000ms内的重复请求会认为重复
    long expireAt = System.currentTimeMillis() + expireTime;
    String val = "expireAt@" + expireAt;

    //redis key还存在的话要就认为请求是重复的
    Boolean firstSet = stringRedisTemplate.execute((RedisCallback<Boolean>) connection -> connection.set(KEY.getBytes(), val.getBytes(), Expiration.milliseconds(expireTime), RedisStringCommands.SetOption.SET_IF_ABSENT));

    final boolean isConsiderDup;
    if (firstSet != null && firstSet) {// 第一次访问
        isConsiderDup = false;
    } else {// redis值已存在,认为是重复了
        isConsiderDup = true;
    }

业务参数去重

上面的方案能解决具备唯一请求编号的场景,例如每次写请求之前都是服务端返回一个唯一编号给客户端,客户端带着这个请求号做请求,服务端即可完成去重拦截。

但是,很多的场景下,请求并不会带这样的唯一编号!那么我们能否针对请求的参数作为一个请求的标识呢?

先考虑简单的场景,假设请求参数只有一个字段reqParam,我们可以利用以下标识去判断这个请求是否重复。 用户ID:接口名:请求参数

1
String KEY = "dedup:U="+userId + "M=" + method + "P=" + reqParam;

那么当同一个用户访问同一个接口,带着同样的reqParam过来,我们就能定位到他是重复的了。

但是问题是,我们的接口通常不是这么简单,以目前的主流,我们的参数通常是一个JSON。那么针对这种场景,我们怎么去重呢?

计算请求参数的摘要作为参数标识

假设我们把请求参数(JSON)按KEY做升序排序,排序后拼成一个字符串,作为KEY值呢?但这可能非常的长,所以我们可以考虑对这个字符串求一个MD5作为参数的摘要,以这个摘要去取代reqParam的位置。

1
String KEY = "dedup:U="+userId + "M=" + method + "P=" + reqParamMD5;

这样,请求的唯一标识就打上了!

注:MD5理论上可能会重复,但是去重通常是短时间窗口内的去重(例如一秒),一个短时间内同一个用户同样的接口能拼出不同的参数导致一样的MD5几乎是不可能的。

继续优化,考虑剔除部分时间因子

上面的问题其实已经是一个很不错的解决方案了,但是实际投入使用的时候可能发现有些问题:某些请求用户短时间内重复的点击了(例如1000毫秒发送了三次请求),但绕过了上面的去重判断(不同的KEY值)。

原因是这些请求参数的字段里面,是带时间字段的,这个字段标记用户请求的时间,服务端可以借此丢弃掉一些老的请求(例如5秒前)。如下面的例子,请求的其他参数是一样的,除了请求时间相差了一秒:

1
2
3
4
5
6
7
8
9
10
11
12
    //两个请求一样,但是请求时间差一秒
    String req = "{\n" +
            "\"requestTime\" :\"20190101120001\",\n" +
            "\"requestValue\" :\"1000\",\n" +
            "\"requestKey\" :\"key\"\n" +
            "}";

    String req2 = "{\n" +
            "\"requestTime\" :\"20190101120002\",\n" +
            "\"requestValue\" :\"1000\",\n" +
            "\"requestKey\" :\"key\"\n" +
            "}";

这种请求,我们也很可能需要挡住后面的重复请求。所以求业务参数摘要之前,需要剔除这类时间字段。还有类似的字段可能是GPS的经纬度字段(重复请求间可能有极小的差别)。

请求去重工具类,Java实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ReqDedupHelper {

    /**
     *
     * @param reqJSON 请求的参数,这里通常是JSON
     * @param excludeKeys 请求参数里面要去除哪些字段再求摘要
     * @return 去除参数的MD5摘要
     */
    public String dedupParamMD5(final String reqJSON, String... excludeKeys) {
        String decreptParam = reqJSON;

        TreeMap paramTreeMap = JSON.parseObject(decreptParam, TreeMap.class);
        if (excludeKeys!=null) {
            List<String> dedupExcludeKeys = Arrays.asList(excludeKeys);
            if (!dedupExcludeKeys.isEmpty()) {
                for (String dedupExcludeKey : dedupExcludeKeys) {
                    paramTreeMap.remove(dedupExcludeKey);
                }
            }
        }

        String paramTreeMapJSON = JSON.toJSONString(paramTreeMap);
        String md5deDupParam = jdkMD5(paramTreeMapJSON);
        log.debug("md5deDupParam = {}, excludeKeys = {} {}", md5deDupParam, Arrays.deepToString(excludeKeys), paramTreeMapJSON);
        return md5deDupParam;
    }

    private static String jdkMD5(String src) {
        String res = null;
        try {
            MessageDigest messageDigest = MessageDigest.getInstance("MD5");
            byte[] mdBytes = messageDigest.digest(src.getBytes());
            res = DatatypeConverter.printHexBinary(mdBytes);
        } catch (Exception e) {
            log.error("",e);
        }
        return res;
    }
}

下面是一些测试日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void main(String[] args) {
    //两个请求一样,但是请求时间差一秒
    String req = "{\n" +
            "\"requestTime\" :\"20190101120001\",\n" +
            "\"requestValue\" :\"1000\",\n" +
            "\"requestKey\" :\"key\"\n" +
            "}";

    String req2 = "{\n" +
            "\"requestTime\" :\"20190101120002\",\n" +
            "\"requestValue\" :\"1000\",\n" +
            "\"requestKey\" :\"key\"\n" +
            "}";

    //全参数比对,所以两个参数MD5不同
    String dedupMD5 = new ReqDedupHelper().dedupParamMD5(req);
    String dedupMD52 = new ReqDedupHelper().dedupParamMD5(req2);
    System.out.println("req1MD5 = "+ dedupMD5+" , req2MD5="+dedupMD52);

    //去除时间参数比对,MD5相同
    String dedupMD53 = new ReqDedupHelper().dedupParamMD5(req,"requestTime");
    String dedupMD54 = new ReqDedupHelper().dedupParamMD5(req2,"requestTime");
    System.out.println("req1MD5 = "+ dedupMD53+" , req2MD5="+dedupMD54);

}

日志输出:

1
2
req1MD5 = 9E054D36439EBDD0604C5E65EB5C8267 , req2MD5=A2D20BAC78551C4CA09BEF97FE468A3F
req1MD5 = C2A36FED15128E9E878583CAAAFEFDE9 , req2MD5=C2A36FED15128E9E878583CAAAFEFDE9

日志说明:

  • 一开始两个参数由于requestTime是不同的,所以求去重参数摘要的时候可以发现两个值是不一样的
  • 第二次调用的时候,去除了requestTime再求摘要(第二个参数中传入了”requestTime”),则发现两个摘要是一样的,符合预期。

总结

至此,我们可以得到完整的去重解决方案,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
String userId= "12345678";//用户
String method = "pay";//接口名
String dedupMD5 = new ReqDedupHelper().dedupParamMD5(req,"requestTime");//计算请求参数摘要,其中剔除里面请求时间的干扰
String KEY = "dedup:U=" + userId + "M=" + method + "P=" + dedupMD5;

long expireTime =  1000;// 1000毫秒过期,1000ms内的重复请求会认为重复
long expireAt = System.currentTimeMillis() + expireTime;
String val = "expireAt@" + expireAt;

// NOTE:直接SETNX不支持带过期时间,所以设置+过期不是原子操作,极端情况下可能设置了就不过期了,后面相同请求可能会误以为需要去重,所以这里使用底层API,保证SETNX+过期时间是原子操作
Boolean firstSet = stringRedisTemplate.execute((RedisCallback<Boolean>) connection -> connection.set(KEY.getBytes(), val.getBytes(), Expiration.milliseconds(expireTime),
        RedisStringCommands.SetOption.SET_IF_ABSENT));

final boolean isConsiderDup;
if (firstSet != null && firstSet) {
    isConsiderDup = false;
} else {
    isConsiderDup = true;
}

Dubbo Provider中获取调用者的应用名与IP

在Dubbo做微服务的架构后,对于应用请求的追踪是尤为重要的。试想一下你有一个服务在告警,但你却不知道你的请求是从哪个服务/ip上过来的,这对于问题的定位会造成极大的困难。这对于一个上游调用方多、实例多的系统来说,问题尤为明显。

本文仅讨论如何简单地用日志的形式做到追踪调用方的的应用名与IP,详细的调用链追踪是一个系统的话题,不在本文讨论。

要无缝的获取调用方的相关信息,我们可以借助Dubbo的Filter。通过在Provider端增加一个Filter做一个打印。但具体怎么获取呢?

IP

IP的获取比较简单,我们可以在Provier端直接使用如下代码获取:

1
String clientIp = RpcContext.getContext().getRemoteHost();//这次请求来自哪个ip

应用名

应用名则没那么容易,或许你看到过url中是有一个application的参数的,那我们是否可以使用以下代码来获取呢?

1
2
3
String applicationFromContextUrl = RpcContext.getContext().getUrl().getParameter("application");//得到的是本应用的名字
String applicationFromInvokerURL = invoker.getUrl().getParameter(Constants.APPLICATION_KEY);//得到的也是本应用的名字
LOG.info("applicationFromUrl = {}, applicationFromInvokerURL= {}", applicationFromContextUrl, applicationFromInvokerURL);

答案是否定的,事实上,无论是Provider还是Consumer,当你使用这段代码获取的时候,拿到的都是本应用。

所以需要获取调用方的应用名,我们需要显示的设置进来,这里就需要增加一个Consumer的Filter,获取consumer的应用名放入attachment中带到Provider,Provider在filter中从attachment中获取即可。

Consumer Filter中传入应用名至attachment中:

1
2
3
4
5
//手动设置consumer的应用名进attachment
String application = invoker.getUrl().getParameter(Constants.APPLICATION_KEY);
if (application != null) {
      RpcContext.getContext().setAttachment("dubboApplication", application);
}

Provider Filter中从其中获取调用方的应用名:

1
String application = RpcContext.getContext().getAttachment("dubboApplication");

一对Trace Filter示意

以下是一对消费者和生产者的Filter示意,实现了以下功能:

  1. Provider端记录了打印了调用方的IP和应用名

  2. Consumer端打印了服务提供方的IP即本次调用的耗时

Consumer Filter:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Activate(group = Constants.CONSUMER)
public class LogTraceConsumerFilter implements Filter {

    private static final Logger LOG = LoggerFactory.getLogger(LogTraceConsumerFilter.class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        //手动设置consumer的应用名进attachment
        String application = invoker.getUrl().getParameter(Constants.APPLICATION_KEY);
        if (application != null) {
            RpcContext.getContext().setAttachment("dubboApplication", application);
        }

        Result result = null;
        String serverIp = null;
        long startTime = System.currentTimeMillis();
        try {
            result = invoker.invoke(invocation);
            serverIp = RpcContext.getContext().getRemoteHost();//这次返回结果是哪个ip
            return result;
        } finally {
            Throwable throwable = (result == null) ? null : result.getException();
            Object resultObj = (result == null) ? null : result.getValue();
            long costTime = System.currentTimeMillis() - startTime;
            LOG.info("[TRACE] Call {}, {}.{}() param:{}, return:{}, exception:{}, cost:{} ms!", serverIp, invoker.getInterface(), invocation.getMethodName(), invocation.getArguments(), resultObj, throwable, costTime);
        }
    }

}

Provider Filter:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Activate(group = Constants.PROVIDER)
public class LogTraceProviderFilter implements Filter {

    private static final Logger LOG = LoggerFactory.getLogger(LogTraceProviderFilter.class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        //上游如果手动设置了consumer的应用名进attachment,则取出来打印
        String clientIp = RpcContext.getContext().getRemoteHost();//这次请求来自哪个ip
        String application = RpcContext.getContext().getAttachment("dubboApplication");
        String from = clientIp;
        if (!StringUtils.isEmpty(application)) {
            from = application+"("+clientIp+")";
        }

        LOG.warn("[Trace]From {}, {}.{}() param:{}", from, invoker.getInterface(), invocation.getMethodName(), invocation.getArguments());
        return invoker.invoke(invocation);
    }
}

Filter 文件中配置启用(注:替换对应的包名):

1
2
logTraceProviderFilter=xxxx.LogTraceProviderFilter
logTraceConsumerFilter=xxxx.LogTraceConsumerFilter

利用Java进程名进行jstat -gc

需要实时观看GC的情况,我们可以类似如下命令进行监控

1
 jstat -gc $pid 100 10

但是这里需要一个进程号,很麻烦,每个Java进程在不同机器或者启动不一样就会不一样,对于自动监控脚本或者是如果需要定位应用刚开始启动时候gc的问题时,当你手动敲完命令拿到pid的时候,可能都凉了。

对此写了一个简单的shell脚本可以传入进程名去执行jstat

gcstat.sh:

1
2
3
4
5
6
7
8
9
#! /bin/bash

process=$1
interval=$2
count=$3
pid=$(ps -ef | grep java | grep $process | grep -v grep | awk '{print $2}')
echo $pid
echo $interval
echo $count

使用:

1
./gcstat.sh  processName 1000 5

自定义ShardingSphere的加解密器

默认的Sharding Sphere 支持AES和MD5两种加密器。有些时候可能需要自定义使用自己的加解密算法,如AES的具体实现不一样等。网上公开的并没有直接的指引,通过部分源码的阅读,找到了可行的方式。需要三步:

1.实现自定义解密器 (实现ShardingEncryptor 接口)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class TestShardingEncryptor implements ShardingEncryptor {
        private Properties properties = new Properties();

         @Override
         public String getType() {
                return "TEST";
          }


          @Override
         public void init() {

         }

         @Override
         public String encrypt(final Object plaintext) {
             return "TEST-"+String.valueOf(plaintext);
         }

         @Override
        public Object decrypt(final String ciphertext) {
             return ciphertext.replaceAll("TEST-","");
         }
}

其中getType返回的字符串(本例为”TEST”)即为本加解密器的类型(后续使用的时候会使用)

2.创建org.apache.shardingsphere.spi.encrypt.ShardingEncryptor 文件

需要创建一个文件名为org.apache.shardingsphere.spi.encrypt.ShardingEncryptor放入resources路径下的\META-INF\services

sharding-encryptor-file-path

文件的内容就是类名全称,如:

com.yourcompany.TestShardingEncryptor

3.配置使用此自定义类

Java配置模式:

如果未使用Spring Boot,需要显示用代码配置

1
EncryptorRuleConfiguration encryptorConfig = new EncryptorRuleConfiguration("TEST", props);

Spring Boot配置模式:

如果使用的是Spring Boot配置模式,则需要如下配置

1
spring.shardingsphere.encrypt.encryptors.my_encryptor.type=TEST