薛定谔的风口猪

站在巨人的肩膀上学习,猪都能看得很远

消息幂等(去重)通用解决方案,RocketMQ

消息中间件是分布式系统常用的组件,无论是异步化、解耦、削峰等都有广泛的应用价值。我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢失,即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件最基本的特性之一,也就是我们常说的“AT LEAST ONCE”,即消息至少会被“成功消费一遍”。

举个例子,一个消息M发送到了消息中间件,消息投递到了消费程序A,A接受到了消息,然后进行消费,但在消费到一半的时候程序重启了,这时候这个消息并没有标记为消费成功,这个消息还会继续投递给这个消费者,直到其消费成功了,消息中间件才会停止投递。

然而这种可靠的特性导致,消息可能被多次地投递。举个例子,还是刚刚这个例子,程序A接受到这个消息M并完成消费逻辑之后,正想通知消息中间件“我已经消费成功了”的时候,程序就重启了,那么对于消息中间件来说,这个消息并没有成功消费过,所以他还会继续投递。这时候对于应用程序A来说,看起来就是这个消息明明消费成功了,但是消息中间件还在重复投递。

这在RockectMQ的场景来看,就是同一个messageId的消息重复投递下来了。

基于消息的投递可靠(消息不丢)是优先级更高的,所以消息不重的任务就会转移到应用程序自我实现,这也是为什么RocketMQ的文档里强调的,消费逻辑需要自我实现幂等。背后的逻辑其实就是:不丢和不重是矛盾的(在分布式场景下),但消息重复是有解决方案的,而消息丢失是很麻烦的。

简单的消息去重解决方案

例如:假设我们业务的消息消费逻辑是:插入某张订单表的数据,然后更新库存:

1
2
insert into t_order values .....
update t_inv set count = count-1 where good_id = 'good123';

要实现消息的幂等,我们可能会采取这样的方案:

1
2
3
4
5
6
7
select * from t_order where order_no = 'order123'

if(order  != null) {

    return ;//消息重复,直接返回

}

这对于很多情况下,的确能起到不错的效果,但是在并发场景下,还是会有问题。

并发重复消息

假设这个消费的所有代码加起来需要1秒,有重复的消息在这1秒内(假设100毫秒)内到达(例如生产者快速重发,Broker重启等),那么很可能,上面去重代码里面会发现,数据依然是空的(因为上一条消息还没消费完,还没成功更新订单状态),

那么就会穿透掉检查的挡板,最后导致重复的消息消费逻辑进入到非幂等安全的业务代码中,从而引发重复消费的问题(如主键冲突抛出异常、库存被重复扣减而没释放等)

并发去重的解决方案之一

要解决上面并发场景下的消息幂等问题,一个可取的方案是开启事务把select 改成 select for update语句,把记录进行锁定。

1
2
3
4
select * from t_order where order_no = 'THIS_ORDER_NO' for update  //开启事务
if(order.status != null) {
    return ;//消息重复,直接返回
}

但这样消费的逻辑会因为引入了事务包裹而导致整个消息消费可能变长,并发度下降。

当然还有其他更高级的解决方案,例如更新订单状态采取乐观锁,更新失败则消息重新消费之类的。但这需要针对具体业务场景做更复杂和细致的代码开发、库表设计,不在本文讨论的范围。

但无论是select for update, 还是乐观锁这种解决方案,实际上都是基于业务表本身做去重,这无疑增加了业务开发的复杂度, 一个业务系统里面很大部分的请求处理都是依赖MQ的,如果每个消费逻辑本身都需要基于业务本身而做去重/幂等的开发的话,这是繁琐的工作量。本文希望探索出一个通用的消息幂等处理的方法,从而抽象出一定的工具类用以适用各个业务场景。

Exactly Once

在消息中间件里,有一个投递语义的概念,而这个语义里有一个叫”Exactly Once”,即消息肯定会被成功消费,并且只会被消费一次。以下是阿里云里对Exactly Once的解释:

Exactly-Once 是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费一次。

在我们业务消息幂等处理的领域内,可以认为业务消息的代码肯定会被执行,并且只被执行一次,那么我们可以认为是Exactly Once。

但这在分布式的场景下想找一个通用的方案几乎是不可能的。不过如果是针对基于数据库事务的消费逻辑,实际上是可行的。

基于关系数据库事务插入消息表

假设我们业务的消息消费逻辑是:更新MySQL数据库的某张订单表的状态:

1
update t_order set status = 'SUCCESS' where order_no= 'order123';

要实现Exaclty Once即这个消息只被消费一次(并且肯定要保证能消费一次),我们可以这样做:在这个数据库中增加一个消息消费记录表,把消息插入到这个表,并且把原来的订单更新和这个插入的动作放到同一个事务中一起提交,就能保证消息只会被消费一遍了。

  1. 开启事务
  2. 插入消息表(处理好主键冲突的问题)
  3. 更新订单表(原消费逻辑)
  4. 提交事务

说明:

  1. 这时候如果消息消费成功并且事务提交了,那么消息表就插入成功了,这时候就算RocketMQ还没有收到消费位点的更新再次投递,也会插入消息失败而视为已经消费过,后续就直接更新消费位点了。这保证我们消费代码只会执行一次。

  2. 如果事务提交之前服务挂了(例如重启),对于本地事务并没有执行所以订单没有更新,消息表也没插入成功;而对于RocketMQ服务端来说,消费位点也没更新,所以消息还会继续投递下来,投递下来发现这个消息插入消息表也是成功的,所以可以继续消费。这保证了消息不丢失。

事实上,阿里云ONS的EXACTLY-ONCE语义的实现上,就是类似这个方案基于数据库的事务特性实现的。更多详情可参考:https://help.aliyun.com/document_detail/102777.html

基于这种方式,的确这是有能力拓展到不同的应用场景,因为他的实现方案与具体业务本身无关——而是依赖一个消息表。

但是这里有它的局限性

  1. 消息的消费逻辑必须是依赖于关系型数据库事务。如果消费的消费过程中还涉及其他数据的修改,例如Redis这种不支持事务特性的数据源,则这些数据是不可回滚的。
  2. 数据库的数据必须是在一个库,跨库无法解决

注:业务上,消息表的设计不应该以消息ID作为标识,而应该以业务的业务主键作为标识更为合理,以应对生产者的重发。阿里云上的消息去重只是RocketMQ的messageId,在生产者因为某些原因手动重发(例如上游针对一个交易重复请求了)的场景下起不到去重/幂等的效果(因消息id不同)。

更复杂的业务场景

如上所述,这种方式Exactly Once语义的实现,实际上有很多局限性,这种局限性使得这个方案基本不具备广泛应用的价值。并且由于基于事务,可能导致锁表时间过长等性能问题。

例如我们以一个比较常见的一个订单申请的消息来举例,可能有以下几步(以下统称为步骤X):

  1. 检查库存(RPC)

  2. 锁库存(RPC)

  3. 开启事务,插入订单表(MySQL)

  4. 调用某些其他下游服务(RPC)

  5. 更新订单状态

  6. commit 事务(MySQL)

这种情况下,我们如果采取消息表+本地事务的实现方式,消息消费过程中很多子过程是不支持回滚的,也就是说就算我们加了事务,实际上这背后的操作并不是原子性的。怎么说呢,就是说有可能第一条小在经历了第二步锁库存的时候,服务重启了,这时候实际上库存是已经在另外的服务里被锁定了,这并不能被回滚。当然消息还会再次投递下来,要保证消息能至少消费一遍,换句话说,锁库存的这个RPC接口本身依旧要支持“幂等”。

再者,如果在这个比较耗时的长链条场景下加入事务的包裹,将大大的降低系统的并发。所以通常情况下,我们处理这种场景的消息去重的方法还是会使用一开始说的业务自己实现去重逻辑的方式,如前面加select for update,或者使用乐观锁。

那我们有没有方法抽取出一个公共的解决方案,能兼顾去重、通用、高性能呢?

拆解消息执行过程

其中一个思路是把上面的几步,拆解成几个不同的子消息,例如:

  1. 库存系统消费A:检查库存并做锁库存,发送消息B给订单服务

  2. 订单系统消费消息B:插入订单表(MySQL),发送消息C给自己(下游系统)消费

  3. 下游系统消费消息C:处理部分逻辑,发送消息D给订单系统

  4. 订单系统消费消息D:更新订单状态

注:上述步骤需要保证本地事务和消息是一个事务的(至少是最终一致性的),这其中涉及到分布式事务消息相关的话题,不在本文论述。

可以看到这样的处理方法会使得每一步的操作都比较原子,而原子则意味着是小事务,小事务则意味着使用消息表+事务的方案显得可行。

然而,这太复杂了!这把一个本来连续的代码逻辑割裂成多个系统多次消息交互!那还不如业务代码层面上加锁实现呢。

更通用的解决方案

上面消息表+本地事务的方案之所以有其局限性和并发的短板,究其根本是因为它依赖于关系型数据库的事务,且必须要把事务包裹于整个消息消费的环节。

如果我们能不依赖事务而实现消息的去重,那么方案就能推广到更复杂的场景例如:RPC、跨库等。

例如,我们依旧使用消息表,但是不依赖事务,而是针对消息表增加消费状态,是否可以解决问题呢?

基于消息幂等表的非事务方案

dedup-solution-01

以上是去事务化后的消息幂等方案的流程,可以看到,此方案是无事务的,而是针对消息表本身做了状态的区分:消费中、消费完成。只有消费完成的消息才会被幂等处理掉。而对于已有消费中的消息,后面重复的消息会触发延迟消费(在RocketMQ的场景下即发送到RETRY TOPIC),之所以触发延迟消费是为了控制并发场景下,第二条消息在第一条消息没完成的过程中,去控制消息不丢(如果直接幂等,那么会丢失消息(同一个消息id的话),因为上一条消息如果没有消费完成的时候,第二条消息你已经告诉broker成功了,那么第一条消息这时候失败broker也不会重新投递了)

上面的流程不再细说,后文有github源码的地址,读者可以参考源码的实现,这里我们回头看看我们一开始想解决的问题是否解决了:

  1. 消息已经消费成功了,第二条消息将被直接幂等处理掉(消费成功)。
  2. 并发场景下的消息,依旧能满足不会出现消息重复,即穿透幂等挡板的问题。
  3. 支持上游业务生产者重发的业务重复的消息幂等问题。

关于第一个问题已经很明显已经解决了,在此就不讨论了。

关于第二个问题是如何解决的?主要是依靠插入消息表的这个动作做控制的,假设我们用MySQL作为消息表的存储媒介(设置消息的唯一ID为主键),那么插入的动作只有一条消息会成功,后面的消息插入会由于主键冲突而失败,走向延迟消费的分支,然后后面延迟消费的时候就会变成上面第一个场景的问题。

关于第三个问题,只要我们设计去重的消息键让其支持业务的主键(例如订单号、请求流水号等),而不仅仅是messageId即可。所以也不是问题。

此方案是否有消息丢失的风险?

如果细心的读者可能会发现这里实际上是有逻辑漏洞的,问题出在上面聊到的个三问题中的第2个问题(并发场景),在并发场景下我们依赖于消息状态是做并发控制使得第2条消息重复的消息会不断延迟消费(重试)。但如果这时候第1条消息也由于一些异常原因(例如机器重启了、外部异常导致消费失败)没有成功消费成功呢?也就是说这时候延迟消费实际上每次下来看到的都是消费中的状态,最后消费就会被视为消费失败而被投递到死信Topic中(RocketMQ默认可以重复消费16次)。

有这种顾虑是正确的!对于此,我们解决的方法是,插入的消息表必须要带一个最长消费过期时间,例如10分钟,意思是如果一个消息处于消费中超过10分钟,就需要从消息表中删除(需要程序自行实现)。所以最后这个消息的流程会是这样的:

dedup-solution-01

更灵活的消息表存储媒介

我们这个方案实际上没有事务的,只需要一个存储的中心媒介,那么自然我们可以选择更灵活的存储媒介,例如Redis。使用Redis有两个好处:

  1. 性能上损耗更低
  2. 上面我们讲到的超时时间可以直接利用Redis本身的ttl实现

当然Redis存储的数据可靠性、一致性等方面是不如MySQL的,需要用户自己取舍。

源码:RocketMQDedupListener

以上方案针对RocketMQ的Java实现已经开源放到Github中,具体的使用文档可以参考https://github.com/Jaskey/RocketMQDedupListener

以下仅贴一个Readme中利用Redis去重的使用样例,用以意业务中如果使用此工具加入消息去重幂等的是多么简单:

1
2
3
4
5
6
7
8
9
10
11
        //利用Redis做幂等表
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TEST-APP1");
        consumer.subscribe("TEST-TOPIC", "*");

        String appName = consumer.getConsumerGroup();// 大部分情况下可直接使用consumer group名
        StringRedisTemplate stringRedisTemplate = null;// 这里省略获取StringRedisTemplate的过程
        DedupConfig dedupConfig = DedupConfig.enableDedupConsumeConfig(appName, stringRedisTemplate);
        DedupConcurrentListener messageListener = new SampleListener(dedupConfig);

        consumer.registerMessageListener(messageListener);
        consumer.start();

以上代码大部分是原始RocketMQ的必须代码,唯一需要修改的仅仅是创建一个DedupConcurrentListener示例,在这个示例中指明你的消费逻辑和去重的业务键(默认是messageId)。

更多使用详情请参考Github上的说明。

这种实现是否一劳永逸?

实现到这里,似乎方案挺完美的,所有的消息都能快速的接入去重,且与具体业务实现也完全解耦。那么这样是否就完美的完成去重的所有任务呢?

很可惜,其实不是的。原因很简单:因为要保证消息至少被成功消费一遍,那么消息就有机会消费到一半的时候失败触发消息重试的可能。还是以上面的订单流程X:

  1. 检查库存(RPC)

  2. 锁库存(RPC)

  3. 开启事务,插入订单表(MySQL)

  4. 调用某些其他下游服务(RPC)

  5. 更新订单状态

  6. commit 事务(MySQL)

当消息消费到步骤3的时候,我们假设MySQL异常导致失败了,触发消息重试。因为在重试前我们会删除幂等表的记录,所以消息重试的时候就会重新进入消费代码,那么步骤1和步骤2就会重新再执行一遍。如果步骤2本身不是幂等的,那么这个业务消息消费依旧没有做好完整的幂等处理。

本实现方式的价值?

那么既然这个并不能完整的完成消息幂等,还有什么价值呢?价值可就大了!虽然这不是解决消息幂等的银弹(事实上,软件工程领域里基本没有银弹),但是他能以便捷的手段解决:

1.各种由于Broker、负载均衡等原因导致的消息重投递的重复问题

2.各种上游生产者导致的业务级别消息重复问题

3.重复消息并发消费的控制窗口问题,就算重复,重复也不可能同一时间进入消费逻辑

一些其他的消息去重的建议

也就是说,使用这个方法能保证正常的消费逻辑场景下(无异常,无异常退出),消息的幂等工作全部都能解决,无论是业务重复,还是rocketmq特性带来的重复。

事实上,这已经能解决99%的消息重复问题了,毕竟异常的场景肯定是少数的。那么如果希望异常场景下也能处理好幂等的问题,可以做以下工作降低问题率:

  1. 消息消费失败做好回滚处理。如果消息消费失败本身是带回滚机制的,那么消息重试自然就没有副作用了。
  2. 消费者做好优雅退出处理。这是为了尽可能避免消息消费到一半程序退出导致的消息重试。
  3. 一些无法做到幂等的操作,至少要做到终止消费并告警。例如锁库存的操作,如果统一的业务流水锁成功了一次库存,再触发锁库存,如果做不到幂等的处理,至少要做到消息消费触发异常(例如主键冲突导致消费异常等)
  4. 在#3做好的前提下,做好消息的消费监控,发现消息重试不断失败的时候,手动做好#1的回滚,使得下次重试消费成功。

记一次因索引合并导致的MySQL死锁分析过程

生产上偶现这段代码会出现死锁,死锁日志如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
*** (1) TRANSACTION:
TRANSACTION 424487272, ACTIVE 0 sec fetching rows
mysql tables in use 3, locked 3
LOCK WAIT 6 lock struct(s), heap size 1184, 4 row lock(s)
MySQL thread id 3205005, OS thread handle 0x7f39c21c8700, query id 567774892 10.14.34.30 finance Searching rows for update
update repay_plan_info_1
     SET actual_pay_period_amount = 38027,
        actual_pay_principal_amount = 36015,
        actual_pay_interest_amount = 1980,
        actual_pay_fee = 0,
        actual_pay_fine = 32,
        actual_discount_amount = 0,
        repay_status = 'PAYOFF',
        repay_type = 'OVERDUE',
        actual_repay_time = '2019-08-12 15:48:15.025'

     WHERE (  user_id = '938467411690006528'
                  and loan_order_no = 'LN201907120655461690006528458116'
                  and seq_no = 1
                  and repay_status <> 'PAYOFF' )

*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 3680 page no 30 n bits 136 index `PRIMARY` of table `db_loan_core_2`.`repay_plan_info_1` trx id 424487272 lock_mode X locks rec but not gap waiting
Record lock, heap no 64 PHYSICAL RECORD: n_fields 33; compact format; info bits 0
 0: len 8; hex 800000000000051e; asc         ;;
 1: len 6; hex 0000193d35df; asc    =5 ;;
 2: len 7; hex 06000001d402e7; asc        ;;
 3: len 30; hex 323031393036313332303532303634323936303534323130353730303030; asc 201906132052064296054210570000; (total 32 bytes);
 4: len 30; hex 4c4e32303139303631333031323934303136393030303635323831373534; asc LN2019061301294016900065281754; (total 32 bytes);
 5: len 4; hex 80000002; asc     ;;
 6: len 18; hex 393338343637343131363930303036353238; asc 938467411690006528;;
 7: len 4; hex 80000003; asc     ;;
 8: len 4; hex 80000258; asc    X;;
 9: len 3; hex 646179; asc day;;
 10: SQL NULL;
 11: SQL NULL;
 12: len 8; hex 8000000000005106; asc       Q ;;
 13: len 8; hex 8000000000000000; asc         ;;
 14: len 8; hex 8000000000004e1e; asc       N ;;
 15: len 8; hex 8000000000000000; asc         ;;
 16: len 8; hex 80000000000002d6; asc         ;;
 17: len 8; hex 8000000000000000; asc         ;;
 18: len 8; hex 8000000000000000; asc         ;;
 19: len 8; hex 8000000000000000; asc         ;;
 20: len 8; hex 8000000000000012; asc         ;;
 21: len 8; hex 8000000000000000; asc         ;;
 22: len 8; hex 8000000000000000; asc         ;;
 23: len 8; hex 8000000000000000; asc         ;;
 24: len 8; hex 3230313930383131; asc 20190811;;
 25: len 7; hex 4f564552445545; asc OVERDUE;;
 26: SQL NULL;
 27: len 1; hex 59; asc Y;;
 28: SQL NULL;
 29: len 5; hex 99a35a1768; asc   Z h;;
 30: len 4; hex 5d503dd8; asc ]P= ;;
 31: SQL NULL;
 32: len 5; hex 99a3d80281; asc      ;;

*** (2) TRANSACTION:
TRANSACTION 424487271, ACTIVE 0 sec fetching rows
mysql tables in use 3, locked 3
5 lock struct(s), heap size 1184, 3 row lock(s)
MySQL thread id 3204980, OS thread handle 0x7f3db0cf6700, query id 567774893 10.14.34.30 finance Searching rows for update
update repay_plan_info_1
     SET actual_pay_period_amount = 20742,
        actual_pay_principal_amount = 19998,
        actual_pay_interest_amount = 726,
        actual_pay_fee = 0,
        actual_pay_fine = 18,
        actual_discount_amount = 0,
        repay_status = 'PAYOFF',
        repay_type = 'OVERDUE',
        actual_repay_time = '2019-08-12 15:48:15.025'


     WHERE (  user_id = '938467411690006528'
                  and loan_order_no = 'LN201906130129401690006528175485'
                  and seq_no = 2
                  and repay_status <> 'PAYOFF' )
*** (2) HOLDS THE LOCK(S):
RECORD LOCKS space id 3680 page no 30 n bits 136 index `PRIMARY` of table `db_loan_core_2`.`repay_plan_info_1` trx id 424487271 lock_mode X locks rec but not gap
Record lock, heap no 64 PHYSICAL RECORD: n_fields 33; compact format; info bits 0
 0: len 8; hex 800000000000051e; asc         ;;
 1: len 6; hex 0000193d35df; asc    =5 ;;
 2: len 7; hex 06000001d402e7; asc        ;;
 3: len 30; hex 323031393036313332303532303634323936303534323130353730303030; asc 201906132052064296054210570000; (total 32 bytes);
 4: len 30; hex 4c4e32303139303631333031323934303136393030303635323831373534; asc LN2019061301294016900065281754; (total 32 bytes);
 5: len 4; hex 80000002; asc     ;;
 6: len 18; hex 393338343637343131363930303036353238; asc 938467411690006528;;
 7: len 4; hex 80000003; asc     ;;
 8: len 4; hex 80000258; asc    X;;
 9: len 3; hex 646179; asc day;;
 10: SQL NULL;
 11: SQL NULL;
 12: len 8; hex 8000000000005106; asc       Q ;;
 13: len 8; hex 8000000000000000; asc         ;;
 14: len 8; hex 8000000000004e1e; asc       N ;;
 15: len 8; hex 8000000000000000; asc         ;;
 16: len 8; hex 80000000000002d6; asc         ;;
 17: len 8; hex 8000000000000000; asc         ;;
 18: len 8; hex 8000000000000000; asc         ;;
 19: len 8; hex 8000000000000000; asc         ;;
 20: len 8; hex 8000000000000012; asc         ;;
 21: len 8; hex 8000000000000000; asc         ;;
 22: len 8; hex 8000000000000000; asc         ;;
 23: len 8; hex 8000000000000000; asc         ;;
 24: len 8; hex 3230313930383131; asc 20190811;;
 25: len 7; hex 4f564552445545; asc OVERDUE;;
 26: SQL NULL;
 27: len 1; hex 59; asc Y;;
 28: SQL NULL;
 29: len 5; hex 99a35a1768; asc   Z h;;
 30: len 4; hex 5d503dd8; asc ]P= ;;
 31: SQL NULL;
 32: len 5; hex 99a3d80281; asc      ;;

*** (2) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 3680 page no 137 n bits 464 index `idx_user_id` of table `db_loan_core_2`.`repay_plan_info_1` trx id 424487271 lock_mode X locks rec but not gap waiting
Record lock, heap no 161 PHYSICAL RECORD: n_fields 2; compact format; info bits 0
 0: len 18; hex 393338343637343131363930303036353238; asc 938467411690006528;;
 1: len 8; hex 800000000000051e; asc         ;;

*** WE ROLL BACK TRANSACTION (2)

代码定位

按照死锁的update sql语句,我们先定位这个死锁SQL中代码是哪个代码片段导致的。后面我们定位到,是如下代码片段导致的:

deadlock-codedeadlock-code

实际上一眼看上去,这段代码有一个很典型的业务开发场景问题:开启事务在for循环写SQL。

注:这在实际的问题定位过程中并不容易,因为死锁日志并不能反向直接定位到方法的对账、线程名等,如果一个库被多个服务同时连接,甚至定位是哪个服务都不容易。

死锁分析(1)——猜测可能消息重发

按照死锁的必要条件:循环等待条件。即 T1事务应该持有了某把锁L1,然后去申请锁L2,而这时候发现T2事务已经持有了L2,而T2事务又去申请L1,这时候就发生循环等待而死锁。

一开始会猜测,是否我们更新表的顺序在两个事务里面是反方向的,即T1事务更新ta、tb表,锁ta表的记录,准备去拿tb表记录的锁;T2事务更新tb、ta表,锁了tb记录准备去拿ta的锁,这是比较常见的死锁情况。但是从SQL看,我们死锁的SQL是同一张表的,即同一张表不同的记录。

而且从死锁日志中可以发现,两个死锁的SQL居然是“一样”的,也就是说是“同一条”SQL/同一段代码(不同的where条件参数)导致的,。即上图代码中的这段for循环更新还款计划的代码。

但是光这段For循环来看,如果要发生死锁,有可能同一批请求,更新记录的顺序是反过来的,然后又并发执行的时候,可能出现。

一开始会猜测上游触发了两条一样的请求(我们这个场景是MQ重发),出现了并发,两条消息分在两个事务中并发执行。但是如果是MQ导致的原因,FOR循环更新的记录顺序是一样的,一样的顺序意味着一样的一样的加锁顺序,一样的加锁顺序意味着最多出现获取锁超时,不会满足【循环等待】的条件,不可能死锁。所以排除MQ重发的可能。

死锁分析(2)

仔细阅读出现问题的两条SQL,可以发现一个规律,这里面都带一个相同的where条件:userId= 938467411690006528,意味着这两个事务的请求都来自一个用户发起的,然后从actual_repay_time = '2019-08-12 15:48:15.025' 来看,的确是瞬间一起执行的两个事务,但是却是不一样的两个借据。对应到真实的用户的操作上,用户的确有可能发起两个借据的同时还款,例如同时结清多笔借据。

通过出现了几次的死锁,总结出了其相同的规律:每次的死锁SQL条件都有一样的特征——相同的userId+不同的借据+并发。基本可以断定,相同的用户在同时还款多笔的时候,可能会发现死锁,但很可惜,测试环境、生产环境我们模拟这个场景都无法复现死锁的情况。

只能靠技术手段分析原因了。

思路:这是了两个完全不同的借据环境计划,操作完全不一样的数据记录,为什么会发生死锁呢?是不是锁的不是行而是锁了表?

死锁日志分析

从事务1中的

1
2
3
*** (1) WAITING FOR THIS LOCK TO BE GRANTED:

RECORD LOCKS space id 3680 page no 30 n bits 136 index `PRIMARY` of table `db_loan_core_2`.`repay_plan_info_1` trx id 424487272 lock_mode X locks rec but not gap waiting

事务2中的

1
2
3
*** (2) HOLDS THE LOCK(S):

RECORD LOCKS space id 3680 page no 30 n bits 136 index `PRIMARY` of table `db_loan_core_2`.`repay_plan_info_1` trx id 424487271 lock_mode X locks rec but not gap

从RECORD LOCKS的标示可知,的确锁的是行锁不是表锁。且从”but not gap”的信息来看,也不存在间隙锁(注:我们线上隔离级别是read committed,本来就不存在间隙锁问题)。所以锁的位置应该的确是我们操作的行记录才对。但是非常奇怪的是,实际业务上操作的记录的确是完全隔离的(因为是不同的借据,记录没有交集),为什么会冲突呢?

再细节阅读死锁日志从事务2中获取到了一点线索:

1
2
3
*** (2) WAITING FOR THIS LOCK TO BE GRANTED:

RECORD LOCKS space id 3680 page no 137 n bits 464 index `idx_user_id` of table `db_loan_core_2`.`repay_plan_info_1` trx id 424487271 lock_mode X locks rec but not gap waiting Record lock, heap no 161 PHYSICAL RECORD: n_fields 2; compact format; info bits 0

这个索引很奇怪,是userid的索引?

分析之前,我们先看先看锁持有情况:

T1等待锁space id 3680 page no 30

T2持有锁space id 3680 page no 30

T2等待锁space id 3680 page no 137

最后回滚了T2

可以推断space id 3680 page no 137应该被T1持有了,但是日志中没有显示出来。

  • 3680 page no 30这个锁是一个主键索引PRIMARY导致的,实际上我们没有用到我们的自增主键,是非聚集索引,所以这是先锁的非主键索引最后找到的主键去加锁。

  • 3680 page no 137这个锁就比较奇怪了,他锁在了idx_user_id这个索引,这个索引是加在userId上的,也就是T2他正在尝试锁所有这个用户的还款计划的记录!

如果是这样,问题就解释通了:

T1: 锁了某行记录X(具体怎么锁的,从死锁日志中未能获取),然后准备去获取LN201907120655461690006528458116,SEQ=1的记录的锁。

T2: 锁了LN201907120655461690006528458116,SEQ=1的锁,而他想去锁所有userId=938467411690006528的记录,这里面肯定包含了记录X,所以他无法获得X的锁。

这样就造成死锁了,因为X已经被T1持有了,而T1又在等T2释放LN201907120655461690006528458116,SEQ=1这个锁。

至于为什么T2明明准备操作LN201906130129401690006528175485,SEQ=2的记录,却之前持有了LN201907120655461690006528458116,SEQ=1的锁,大概率不是因为之前的SQL真的操作LN201907120655461690006528458116,SEQ=1的记录,也是因为他之前本想持有别的记录(从锁的详细信息上猜,可能是LN2019061301294016900065281754的相关记录),但是因为这个idx_user_id的索引问题,顺带锁着了LN201907120655461690006528458116,SEQ=1,因为都属于一个userId。

所以从时间线上分析,顺序应该是:

  1. T1锁了某记录X

  2. T2锁了某记录Y(从hold this lock的日志细节中推断,是LN2019061301294016900065281754),然后准备锁LN201906130129401690006528175485,SEQ=2,这时候的这条SQL触发了idx_user_id,连带一起锁锁住了LN201907120655461690006528458116,SEQ=1并准备锁其它同用户记录

  3. T1 执行下一条sql,准备获取LN201907120655461690006528458116,SEQ=1的锁,发现被T2获取了,等待。

  4. T2在锁其它记录的过程中发现了X,但是锁不住,发现X被T1持有。而自己又持有了LN201907120655461690006528458116,SEQ=1这行记录的锁。

这时候循环等待,死锁!

所以根源是为什么SQL会使用idx_user_id这个索引呢?

索引信息

1
2
3
4
5
PRIMARY KEY (`id`),
UNIQUE KEY `uk_repay_order` (`loan_order_no`,`seq_no`),
UNIQUE KEY `uk_repay_plan_no` (`repay_plan_no`),
KEY `idx_user_id` (`user_id`),
KEY `idx_create_time` (`create_time`)

从唯一主键是UNIQUE KEY uk_repay_order (loan_order_no,seq_no)

从我们SQL上看loan_order_no+seq_no是唯一主键,应该肯定能唯一定位一行记录,索引应该使用这个是最优的才对。

这时候我们去看T2的那条SQL的执行计划得知:其没有使用索引uk_repay_order,而使用了一个type: index_merge,走的索引是uk_repay_order_,idx_user_id,也就是他居然两个索引同时生效了。

deadlock-codedeadlock-code

解决方案

实际上,由于index merge,客观上就会增加update语句的死锁可能性,相关bug连接如下:https://bugs.mysql.com/bug.php?id=77209

而其实如果出现了index merge,在很多情况下意味着我们索引的建立可能并不合理。

解决方案有两个:

  1. 建立联合索引,以避免index merge,让联合索引生效则不会因此锁住所有该userId的记录
  2. 取消index merge的优化

遗留问题

什么时候才会触发index merge,这个在文档中似乎并没有很明确的触发实际,从这些死锁的SQL来看,某些SQL在事后explain的时候,并没有走index merge,而有些却走了。从本案例来看,事务1的SQL并没有走index merge,但是事务2这样类似的SQL却走了。

只查到一个必要条件是:

Intersect和Union都需要使用的索引是ROR的,也就时ROWID ORDERED,即针对不同的索引扫描出来的数据必须是同时按照ROWID排序的,这里的 ROWID其实也就是InnoDB的主键(如果不定义主键,InnoDB会隐式添加ROWID列作为主键)。只有每个索引是ROR的,才能进行归并排序,你懂的。 当然你可能会有疑惑,查不记录后内部进行一次sort不一样么,何必必须要ROR呢,不错,所以有了SORT-UNION。SORT-UNION就是每个非ROR的索引 排序后再进行Merge – 来自 http://www.cnblogs.com/nocode/archive/2013/01/28/2880654.html

为此我在stackoverflow 提了一个问题看后续有结论再更新:https://stackoverflow.com/questions/57987713/why-mysql-decide-to-use-index-merge-though-i-have-already-use-a-unique-key-index

Elastic Job从单点到高可用、同城主备、同城双活

在使用Elastic Job Lite做定时任务的时候,我发现很多开发的团队都是直接部署单点,这对于一些离线的非核心业务(如对账、监控等)或许无关紧要,但对于一些高可用补偿、核心数据定时修改(如金融场景的利息更新等),单点部署则“非常危险”。实际上,Elastic Job Lite是支持高可用的。网上关于Elastic Job的较高级的博文甚少,本文试图结合自身实践的一些经验,大致讲解其方案原理,并延伸至同城双机房的架构实践。

注:本文所有讨论均基于开源版本的Elastic Job Lite, 不涉及Elastic Job Cloud部分。

单点部署到高可用

如本文开头所说,很多系统的部署是采取以下部署架构:

esjob-single

原因是开发者担心定时任务在同一时刻被触发多次,导致业务有问题。实际上这是对于框架最基本的原理不了解。在官方文档的功能列表里http://elasticjob.io/docs/elastic-job-lite/00-overview/ 就已说明其最基本的功能之一就是:

作业分片一致性,保证同一分片在分布式环境中仅一个执行实例

Elastic Job会依赖zookeeper选举出对应的实例做sharding,从而保证只有一个实例在执行同一个分片(如果任务没有采取分片(即分片数是0),就意味着这个任务只有一个实例在执行)

elastic-job-架构

所以如下图所示的部署架构是完全没问题的——一来,服务只会被一个实例调用,二来,如果某个服务挂了,其他实例也能接管继续提供服务从而实现高可用。

esjob-single

双机房高可用

随着互联网业务的发展,慢慢地,对架构的高可用会有更高的要求。下一步可能就是需要同城两机房部署,那这时候为了保证定时服务在两个机房的高可用,我们架构上可能会变成这样的:

esjob-single

这样如果A机房的定时任务全部不可用了,B机房的确也能接手提供服务。而且由于集群是一个,Elastic Job能保证同一个分片在两个机房也只有一个实例运行。看似挺完美的。

注:本文不讨论zookeeper如何实现双机房的高可用,实际上从zookeeper的原理来看,仅仅两个机房组成一个大集群并不可以实现双机房高可用。

优先级调度?

以上的架构解决了定时任务在两个机房都可用的问题,但是实际的生产中,定时任务很可能是依赖存储的数据源的。而这个数据源,通常是有主备之分(这里不考虑单元化的架构的情况):例如主在A机房,备在B机房做实时同步。

如果这个定时任务只有读操作,可能没问题,因为只要配置数据源连接同机房的数据源即可。但是如果是要写入的,就有一个问题——如果所有任务都在B机房被调度了,那么这些数据的写入都会跨机房地往A机房写入,这样延迟就大大提升了,如下图所示。

esjob-single

如图所示,如果Elastic Job把任务都调度到了B机房,那么流量就一直跨机房写了,这样对于性能来说是不好的事情。

那么有没有办法达到如下效果了:

  1. 保证两个机房都随时可用,也就是一个机房的服务如果全部不可用了,另外一个机房能提供对等的服务
  2. 但一个任务可以优先指定A机房执行

Elastic Job分片策略

在回答这个问题之前,我们需要了解下Elastic Job的分片策略,根据官网的说明(http://elasticjob.io/docs/elastic-job-lite/02-guide/job-sharding-strategy/ ) ,Elastic Job是内置了一些分片策略可选的,其中有平均分配算法,作业名的哈希值奇偶数决定IP升降序算法和作业名的哈希值对服务器列表进行轮转;同时也是支持自定义的策略,实现实现JobShardingStrategy接口并实现sharding方法即可。

1
public Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount)

假设我们可以实现这一的自定义策略:让做分片的时候知道哪些实例是A机房的,哪些是B机房的,然后我们知道A机房是优先的,在做分片策略的时候先把B机房的实例踢走,再复用原来的策略做分配。这不就解决我们的就近接入问题(接近数据源)了吗?

以下是利用装饰器模式自定义的一个装饰器类(抽象类,由子类判断哪些实例属于standby的实例),读者可以结合自身业务场景配合使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public abstract class JobShardingStrategyActiveStandbyDecorator implements JobShardingStrategy {

    //内置的分配策略采用原来的默认策略:平均
    private JobShardingStrategy inner = new AverageAllocationJobShardingStrategy();


    /**
     * 判断一个实例是否是备用的实例,在每次触发sharding方法之前会遍历所有实例调用此方法。
     * 如果主备实例同时存在于列表中,那么备实例将会被剔除后才进行sharding
     * @param jobInstance
     * @return
     */
    protected abstract boolean isStandby(JobInstance jobInstance, String jobName);

    @Override
    public Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount) {

        List<JobInstance> jobInstancesCandidates = new ArrayList<>(jobInstances);
        List<JobInstance> removeInstance = new ArrayList<>();

        boolean removeSelf = false;
        for (JobInstance jobInstance : jobInstances) {
            boolean isStandbyInstance = false;
            try {
                isStandbyInstance = isStandby(jobInstance, jobName);
            } catch (Exception e) {
                log.warn("isStandBy throws error, consider as not standby",e);
            }

            if (isStandbyInstance) {
                if (IpUtils.getIp().equals(jobInstance.getIp())) {
                    removeSelf = true;
                }
                jobInstancesCandidates.remove(jobInstance);
                removeInstance.add(jobInstance);
            }
        }

        if (jobInstancesCandidates.isEmpty()) {//移除后发现没有实例了,就不移除了,用原来的列表(后备)的顶上
            jobInstancesCandidates = jobInstances;
            log.info("[{}] ATTENTION!! Only backup job instances exist, but do sharding with them anyway {}", jobName, JSON.toJSONString(jobInstancesCandidates));
        }

        if (!jobInstancesCandidates.equals(jobInstances)) {
            log.info("[{}] remove backup before really do sharding, removeSelf :{} , remove instances: {}", jobName, removeSelf, JSON.toJSONString(removeInstance));
            log.info("[{}] after remove backups :{}", jobName, JSON.toJSONString(jobInstancesCandidates));
        } else {//全部都是master或者全部都是slave
            log.info("[{}] job instances just remain the same {}", jobName, JSON.toJSONString(jobInstancesCandidates));
        }

        //保险一点,排序一下,保证每个实例拿到的列表肯定是一样的
        jobInstancesCandidates.sort((o1, o2) -> o1.getJobInstanceId().compareTo(o2.getJobInstanceId()));

        return inner.sharding(jobInstancesCandidates, jobName, shardingTotalCount);

    }

利用自定义策略实现同城双机房下的优先级调度

以下是一个很简单的就近接入的例子: 指定在ip白名单的,就是优先执行的,不在的都认为是备用的。我们看如何实现。

一、继承此装饰器策略,指定哪些实例是standby实例

1
2
3
4
5
6
7
8
9
10
public class ActiveStandbyESJobStrategy extends JobShardingStrategyActiveStandbyDecorator{

    @Override
    protected boolean isStandby(JobInstance jobInstance, String jobName) {
        String activeIps = "10.10.10.1,10.10.10.2";//只有这两个ip的实例才是优先执行的,其他都是备用的
        String ss[] = activeIps.split(",");
        return !Arrays.asList(ss).contains(jobInstance.getIp());//不在active名单的就是后备
    }

}

很简单吧!这样实现之后,就能达到以下类似的效果

esjob-single

二、 在任务启动前,指定使用这个策略

以下以Java的方式示意,

1
2
3
4
5
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build();
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(simpleCoreConfig, jobClass.getCanonicalName());
return LiteJobConfiguration.newBuilder(simpleJobConfiguration)
        .jobShardingStrategyClass("com.xxx.yyy.job.ActiveStandbyESJobStrategy")//使用主备的分配策略,分主备实例(输入你的实现类类名)
        .build();

这样就大功告成了。

同城双活模式

以上这样改造后,针对定时任务就已经解决了两个问题:

1、定时任务能实现在两个机房下的高可用

2、任务能优先调度到指定机房

这种模式下,对于定时任务来说,B机房其实只是个备机房——因为A机房永远都是优先调度的。

对于B机房是否有一些实际问题其实我们可能是不知道的(常见的例如数据库权限没申请),由于没有流量的验证,这时候真的出现容灾问题,B机房是否能安全接受其实并不是100%稳妥的。

我们能否再进一步做到同城双活呢?也就是,B机房也会承担一部分的流量?例如10%?

回到自定义策略的sharding接口:

1
 public Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount)

在做分配的时候,是能拿到一个任务实例的全景图(所有实例列表),当前的任务名,和分片数。

基于此其实是可以做一些事情把流量引流到B机房实例的,例如:

  1. 指定任务的主机房让其是B机房优先调度(例如挑选部分只读任务,占10%的任务数)
  2. 对于分片的分配,把末尾(如1/10)的分片优先分配给B机房。

以上两种方案都能实现让A、B两个机房都有流量(有任务在被调度),从而实现所谓的双活。

以下针对上面抛出来的方案一,给出一个双活的示意代码和架构。

假设我们定时任务有两个任务,TASK_A_FIRST,TASK_B_FIRST,其中TASK_B_FIRST是一个只读的任务,那么我们可以让他配置读B机房的备库让他优先运行在B机房,而TASK_A_FIRST是一个更为频繁的任务,而且带有写操作,我们则优先运行在A机房,从而实现双机房均有流量。

注:这里任意一个机房不可用了,任务均能在另外一个机房调度,这里增强的只是对于不同任务做针对性的优先调度实现双活

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ActiveStandbyESJobStrategy extends JobShardingStrategyActiveStandbyDecorator{

    @Override
    protected boolean isStandby(JobInstance jobInstance, String jobName) {
         String activeIps = "10.10.10.1,10.10.10.2";//默认只有这两个ip的实例才是优先执行的,其他都是备用的
        if ("TASK_B_FIRST".equals(jobName)){//选择这个任务优先调度到B机房
           activeIps = "10.11.10.1,10.11.10.2";
        }

        String ss[] = activeIps.split(",");
        return !Arrays.asList(ss).contains(jobInstance.getIp());//不在active名单的就是后备
    }

}

esjob-single

[DUBBO] ReferenceConfig(null) Is Not DESTROYED When FINALIZE分析及解决

最近发现经常有类似告警:

​ [DUBBO] ReferenceConfig(null) is not DESTROYED when FINALIZE,dubbo version 2.6.2

在此记录一下分析的过程和解决方案。

从日志定位源码位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@SuppressWarnings("unused")
private final Object finalizerGuardian = new Object() {
    @Override
    protected void finalize() throws Throwable {
        super.finalize();

        if (!ReferenceConfig.this.destroyed) {
            logger.warn("ReferenceConfig(" + url + ") is not DESTROYED when FINALIZE");

            /* don't destroy for now
            try {
                ReferenceConfig.this.destroy();
            } catch (Throwable t) {
                    logger.warn("Unexpected err when destroy invoker of ReferenceConfig(" + url + ") in finalize method!", t);
            }
            */
        }
    }
};

通过日志搜索源码,发现这个日志是ReferenceConfig类中一个finalizerGuardian的实例变量下复写了finalize而打印出来的。而从其中的源码来看,原本应该是希望做到:在发现原本的对象没有被释放资源的时候,手动回收资源。但是后面缺代码被注释了(猜测是有巨坑),就只保留了一个告警。所以实际上这个对象现在只起到了WARN日志提示的作用,并无实际作用。

注:复写finalize方法会导致一定的GC回收的性能问题,因为一个对象如果其中finalize被复写(哪怕只是写一个分号空实现),在垃圾回收的时候都会以单独的方法回收,简单说就是有一条独立的Finalizer线程(优先级很低)单独回收,如果对象分配频繁,会引起一定的性能问题。回到Dubbo的场景,假设在高并发的场景下不断创建ReferenceConfig对象,会影响这些对象的回收效率(并且这个过程中会产生一些java.lang.ref.Finalizer对象)甚至OOM,现在只是打印一个日志是一个不好的实践。对于finalize的原理和其对垃圾回收的影响可以参考https://blog.heaphero.io/2018/04/13/heaphero-user-manual-2/#ObjFin ,这里摘抄其中一段供参考:

Objects that have finalize() method are treated differently during garbage collection process than the ones which don’t have. During garbage collection phase, objects with finalize() method aren’t immediately evicted from the memory. Instead, as the first step, those objects are added to an internal queue of java.lang.ref.Finalizer. For entire JVM, there is only one low priority JVM thread by name ‘Finalizer’ that executes finalize() method of each object in the queue. Only after the execution of finalize() method, object becomes eligible for Garbage Collection. Assume if your application is producing a lot of objects which has finalize() method and low priority “Finalizer” thread isn’t able to keep up with executing finalize() method, then significant amount unfinalized objects will start to build up in the internal queue of java.lang.ref.Finalize, which would result in significant amount of memory wastage.

问题分析

从以上的源码上,这句日志打印的充分必要条件是:

1.ReferenceConfig被垃圾回收

2.垃圾回收的时候ReferenceConfig没有调用过destroy方法,即!ReferenceConfig.this.destroyed

代码始末

基于以上的分析,需要知道哪里我们会创建ReferenceConfig对象。通常情况下,这个对象我们是不会代码显示创建的,因为正常都是Dubbo基于我们的配置(注解或者配置文件)去管理内部的对象,只有我们在泛化调用的时候,可能会手动创建。

Dubbo官方文档里http://dubbo.apache.org/zh-cn/docs/user/demos/generic-reference.html ,关于泛化调用有类似的代码,其中就会手动创建ReferenceConfig对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 引用远程服务 
// 该实例很重量,里面封装了所有与注册中心及服务提供方连接,请缓存
ReferenceConfig<GenericService> reference = new ReferenceConfig<GenericService>();
// 弱类型接口名
reference.setInterface("com.xxx.XxxService");
reference.setVersion("1.0.0");
// 声明为泛化接口 
reference.setGeneric(true);

// 用org.apache.dubbo.rpc.service.GenericService可以替代所有接口引用  
GenericService genericService = reference.get();

// 基本类型以及Date,List,Map等不需要转换,直接调用 
Object result = genericService.$invoke("sayHello", new String[] {"java.lang.String"}, new Object[] {"world"});

由于以上代码会存在很容易导致连接等相关资源泄露等问题,详见:http://dubbo.apache.org/zh-cn/docs/user/demos/reference-config-cache.html ,所以正常的泛化调用的使用方式则变成这样:

1
2
3
4
5
6
7
8
9
10
ReferenceConfig<XxxService> reference = new ReferenceConfig<XxxService>();
reference.setInterface(XxxService.class);
reference.setVersion("1.0.0");
......
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
// cache.get方法中会缓存 Reference对象,并且调用ReferenceConfig.get方法启动ReferenceConfig
XxxService xxxService = cache.get(reference);
// 注意! Cache会持有ReferenceConfig,不要在外部再调用ReferenceConfig的destroy方法,导致Cache内的ReferenceConfig失效!
// 使用xxxService对象
xxxService.sayHello();

Dubbo官方对于相关建议的解释是:

ReferenceConfig 实例很重,封装了与注册中心的连接以及与提供者的连接,需要缓存。否则重复生成 ReferenceConfig 可能造成性能问题并且会有内存和连接泄漏。在 API 方式编程时,容易忽略此问题。

但是,实际上这句话和其API的实际设计上存在一定的误解。Dubbo认为ReferenceConfig 实例很重,所以应该缓存这个对象,所以设计了一个ReferenceConfigCache类,这个类实际上可以认为就是一个Map,当第一次调用cache.get(reference)的时候,实际上会把这个ReferenceConfig 放到里面的Map中:

1
2
3
4
5
6
7
8
9
10
11
12
public <T> T get(ReferenceConfig<T> referenceConfig) {
    String key = generator.generateKey(referenceConfig);

    ReferenceConfig<?> config = cache.get(key);
    if (config != null) {
        return (T) config.get();
    }

    cache.putIfAbsent(key, referenceConfig);
    config = cache.get(key);
    return (T) config.get();
}

config.get()的时候,实际上会调用内部的各种初始化的代码。

但是,这里接口设计有一个自相矛盾的地方。怎么讲了,因为“ReferenceConfig 实例很重”,所以,ReferenceConfigCache帮我们做了缓存,但是使用的时候,接口的设计却只能接受一个ReferenceConfig 对象,那这个对象从何而来呢?也就是说,这个缓存其实只能给Dubbo内部使用——用户给一个ReferenceConfig 对象给Dubbo,Dubbo判断这个对象是不是和以前的对象等价,等价的话我就不用用户传递的,用以前创建好的(因为这个对象各种资源都创建好了,没必要重复创建)。

原因呼之欲出

分析到这里,其实这个问题的原因已经呼之欲出了:因为泛化调用而创建了ReferenceConfig 对象。实际上,要复现这个问题,只需要模拟不断创建临时ReferenceConfig 变量然后触发GC即可:

1
2
3
4
for (int i = 0; i<100000;i++) {
    new ReferenceConfig<>();
}
System.gc();//手动触发一下GC,确保上面创建的ReferenceConfig能触发GC回收

运行以上代码,你能发现和本文开头一模一样的告警日志。

为什么是ReferenceConfig(null),即URL为什么是null?

你可能会问,上面的分析原因是清楚了,但是为什么ReferenceConfig打印的时候,url参数总是显示null? 其实上面的分析已经回答这个问题了。上面章节我们提到“而config.get()的时候,实际上会调用内部的各种初始化的代码。”而这个初始化的过程之一就是构建合适的url,所以当我们使用`ReferenceConfigCache做泛化调用的时候,除了第一次创建的ReferenceConfig被ReferenceConfigCache缓存起来并初始化了,其他的对象其实都没有初始化过,那自然URL就是空了,同时又因为没有被缓存(所以对象在方法运行结束后不可达了)必然后面会被触发GC,那日志看起来就是ReferenceConfig(null)。

实际上分析到这里,我们可以看出这里Dubbo有两个处理不好的地方

  1. API设计不合理——认为ReferenceConfig很重需要缓存,但是使用的时候必须要提供一个对象
  2. ReferenceConfig回收的告警上,其实是存在优化空间的,像这种没有初始化过的对象,其实没必要打WARN日志,毕竟他没有初始化过就自然没有什么可destroy的

解决方案

从这里分析可以看到这行告警其实是”误报“,只要日志里url显示是null,并没有什么特殊的实际影响(在不考虑上文讲的GC问题的前提下)

1
[DUBBO] ReferenceConfig(null) is not DESTROYED when FINALIZEdubbo version 2.6.2

那如果要修复这个告警,则可考虑显示的缓存ReferenceConfig对象,不要每次泛化调用的时候都创建一个,可参考以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private synchronized ReferenceConfig<GenericService> getOrNewReferenceConfig(String interfaceClass) {
    String refConfigCacheKey = interfaceClass;
    WeakReference<ReferenceConfig<GenericService>> referenceConfigWeakReference = refConfigCache.get(refConfigCacheKey);

    if (referenceConfigWeakReference != null) {//缓存有弱引用
        ReferenceConfig<GenericService> referenceConfigFromWR = referenceConfigWeakReference.get();
        if (referenceConfigFromWR == null) {//证明没人引用自己被GC了,需要重建
            ReferenceConfig<GenericService> referenceConfig = newRefConifg(interfaceClass);
            refConfigCache.put(refConfigCacheKey, new WeakReference<>(referenceConfig));//放入缓存中,用弱应用hold住,不影响该有GC
            return referenceConfig;
        } else {
            return referenceConfigFromWR;
        }

    } else {//缓存没有,则创建
        ReferenceConfig<GenericService> referenceConfig = newRefConifg(interfaceClass);
        refConfigCache.put(refConfigCacheKey, new WeakReference<>(referenceConfig));//放入缓存中,用弱应用hold住,不影响该有GC
        return referenceConfig;
    }
}

注:

  1. 其中newRefConifg即为原先的创建ReferenceConfig的代码
  2. 之所以使用WeakReference是为了保证这个缓存的对象不会影响GC——即该回收的时候还是得回收

优雅地处理重复请求(并发请求)——附Java实现

对于一些用户请求,在某些情况下是可能重复发送的,如果是查询类操作并无大碍,但其中有些是涉及写入操作的,一旦重复了,可能会导致很严重的后果,例如交易的接口如果重复请求可能会重复下单。

重复的场景有可能是:

  1. 黑客拦截了请求,重放
  2. 前端/客户端因为某些原因请求重复发送了,或者用户在很短的时间内重复点击了。
  3. 网关重发
  4. ….

本文讨论的是如果在服务端优雅地统一处理这种情况,如何禁止用户重复点击等客户端操作不在本文的讨论范畴。

利用唯一请求编号去重

你可能会想到的是,只要请求有唯一的请求编号,那么就能借用Redis做这个去重——只要这个唯一请求编号在redis存在,证明处理过,那么就认为是重复的

代码大概如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
    String KEY = "REQ12343456788";//请求唯一编号
    long expireTime =  1000;// 1000毫秒过期,1000ms内的重复请求会认为重复
    long expireAt = System.currentTimeMillis() + expireTime;
    String val = "expireAt@" + expireAt;

    //redis key还存在的话要就认为请求是重复的
    Boolean firstSet = stringRedisTemplate.execute((RedisCallback<Boolean>) connection -> connection.set(KEY.getBytes(), val.getBytes(), Expiration.milliseconds(expireTime), RedisStringCommands.SetOption.SET_IF_ABSENT));

    final boolean isConsiderDup;
    if (firstSet != null && firstSet) {// 第一次访问
        isConsiderDup = false;
    } else {// redis值已存在,认为是重复了
        isConsiderDup = true;
    }

业务参数去重

上面的方案能解决具备唯一请求编号的场景,例如每次写请求之前都是服务端返回一个唯一编号给客户端,客户端带着这个请求号做请求,服务端即可完成去重拦截。

但是,很多的场景下,请求并不会带这样的唯一编号!那么我们能否针对请求的参数作为一个请求的标识呢?

先考虑简单的场景,假设请求参数只有一个字段reqParam,我们可以利用以下标识去判断这个请求是否重复。 用户ID:接口名:请求参数

1
String KEY = "dedup:U="+userId + "M=" + method + "P=" + reqParam;

那么当同一个用户访问同一个接口,带着同样的reqParam过来,我们就能定位到他是重复的了。

但是问题是,我们的接口通常不是这么简单,以目前的主流,我们的参数通常是一个JSON。那么针对这种场景,我们怎么去重呢?

计算请求参数的摘要作为参数标识

假设我们把请求参数(JSON)按KEY做升序排序,排序后拼成一个字符串,作为KEY值呢?但这可能非常的长,所以我们可以考虑对这个字符串求一个MD5作为参数的摘要,以这个摘要去取代reqParam的位置。

1
String KEY = "dedup:U="+userId + "M=" + method + "P=" + reqParamMD5;

这样,请求的唯一标识就打上了!

注:MD5理论上可能会重复,但是去重通常是短时间窗口内的去重(例如一秒),一个短时间内同一个用户同样的接口能拼出不同的参数导致一样的MD5几乎是不可能的。

继续优化,考虑剔除部分时间因子

上面的问题其实已经是一个很不错的解决方案了,但是实际投入使用的时候可能发现有些问题:某些请求用户短时间内重复的点击了(例如1000毫秒发送了三次请求),但绕过了上面的去重判断(不同的KEY值)。

原因是这些请求参数的字段里面,是带时间字段的,这个字段标记用户请求的时间,服务端可以借此丢弃掉一些老的请求(例如5秒前)。如下面的例子,请求的其他参数是一样的,除了请求时间相差了一秒:

1
2
3
4
5
6
7
8
9
10
11
12
    //两个请求一样,但是请求时间差一秒
    String req = "{\n" +
            "\"requestTime\" :\"20190101120001\",\n" +
            "\"requestValue\" :\"1000\",\n" +
            "\"requestKey\" :\"key\"\n" +
            "}";

    String req2 = "{\n" +
            "\"requestTime\" :\"20190101120002\",\n" +
            "\"requestValue\" :\"1000\",\n" +
            "\"requestKey\" :\"key\"\n" +
            "}";

这种请求,我们也很可能需要挡住后面的重复请求。所以求业务参数摘要之前,需要剔除这类时间字段。还有类似的字段可能是GPS的经纬度字段(重复请求间可能有极小的差别)。

请求去重工具类,Java实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ReqDedupHelper {

    /**
     *
     * @param reqJSON 请求的参数,这里通常是JSON
     * @param excludeKeys 请求参数里面要去除哪些字段再求摘要
     * @return 去除参数的MD5摘要
     */
    public String dedupParamMD5(final String reqJSON, String... excludeKeys) {
        String decreptParam = reqJSON;

        TreeMap paramTreeMap = JSON.parseObject(decreptParam, TreeMap.class);
        if (excludeKeys!=null) {
            List<String> dedupExcludeKeys = Arrays.asList(excludeKeys);
            if (!dedupExcludeKeys.isEmpty()) {
                for (String dedupExcludeKey : dedupExcludeKeys) {
                    paramTreeMap.remove(dedupExcludeKey);
                }
            }
        }

        String paramTreeMapJSON = JSON.toJSONString(paramTreeMap);
        String md5deDupParam = jdkMD5(paramTreeMapJSON);
        log.debug("md5deDupParam = {}, excludeKeys = {} {}", md5deDupParam, Arrays.deepToString(excludeKeys), paramTreeMapJSON);
        return md5deDupParam;
    }

    private static String jdkMD5(String src) {
        String res = null;
        try {
            MessageDigest messageDigest = MessageDigest.getInstance("MD5");
            byte[] mdBytes = messageDigest.digest(src.getBytes());
            res = DatatypeConverter.printHexBinary(mdBytes);
        } catch (Exception e) {
            log.error("",e);
        }
        return res;
    }
}

下面是一些测试日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void main(String[] args) {
    //两个请求一样,但是请求时间差一秒
    String req = "{\n" +
            "\"requestTime\" :\"20190101120001\",\n" +
            "\"requestValue\" :\"1000\",\n" +
            "\"requestKey\" :\"key\"\n" +
            "}";

    String req2 = "{\n" +
            "\"requestTime\" :\"20190101120002\",\n" +
            "\"requestValue\" :\"1000\",\n" +
            "\"requestKey\" :\"key\"\n" +
            "}";

    //全参数比对,所以两个参数MD5不同
    String dedupMD5 = new ReqDedupHelper().dedupParamMD5(req);
    String dedupMD52 = new ReqDedupHelper().dedupParamMD5(req2);
    System.out.println("req1MD5 = "+ dedupMD5+" , req2MD5="+dedupMD52);

    //去除时间参数比对,MD5相同
    String dedupMD53 = new ReqDedupHelper().dedupParamMD5(req,"requestTime");
    String dedupMD54 = new ReqDedupHelper().dedupParamMD5(req2,"requestTime");
    System.out.println("req1MD5 = "+ dedupMD53+" , req2MD5="+dedupMD54);

}

日志输出:

1
2
req1MD5 = 9E054D36439EBDD0604C5E65EB5C8267 , req2MD5=A2D20BAC78551C4CA09BEF97FE468A3F
req1MD5 = C2A36FED15128E9E878583CAAAFEFDE9 , req2MD5=C2A36FED15128E9E878583CAAAFEFDE9

日志说明:

  • 一开始两个参数由于requestTime是不同的,所以求去重参数摘要的时候可以发现两个值是不一样的
  • 第二次调用的时候,去除了requestTime再求摘要(第二个参数中传入了”requestTime”),则发现两个摘要是一样的,符合预期。

总结

至此,我们可以得到完整的去重解决方案,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
String userId= "12345678";//用户
String method = "pay";//接口名
String dedupMD5 = new ReqDedupHelper().dedupParamMD5(req,"requestTime");//计算请求参数摘要,其中剔除里面请求时间的干扰
String KEY = "dedup:U=" + userId + "M=" + method + "P=" + dedupMD5;

long expireTime =  1000;// 1000毫秒过期,1000ms内的重复请求会认为重复
long expireAt = System.currentTimeMillis() + expireTime;
String val = "expireAt@" + expireAt;

// NOTE:直接SETNX不支持带过期时间,所以设置+过期不是原子操作,极端情况下可能设置了就不过期了,后面相同请求可能会误以为需要去重,所以这里使用底层API,保证SETNX+过期时间是原子操作
Boolean firstSet = stringRedisTemplate.execute((RedisCallback<Boolean>) connection -> connection.set(KEY.getBytes(), val.getBytes(), Expiration.milliseconds(expireTime),
        RedisStringCommands.SetOption.SET_IF_ABSENT));

final boolean isConsiderDup;
if (firstSet != null && firstSet) {
    isConsiderDup = false;
} else {
    isConsiderDup = true;
}

Dubbo Provider中获取调用者的应用名与IP

在Dubbo做微服务的架构后,对于应用请求的追踪是尤为重要的。试想一下你有一个服务在告警,但你却不知道你的请求是从哪个服务/ip上过来的,这对于问题的定位会造成极大的困难。这对于一个上游调用方多、实例多的系统来说,问题尤为明显。

本文仅讨论如何简单地用日志的形式做到追踪调用方的的应用名与IP,详细的调用链追踪是一个系统的话题,不在本文讨论。

要无缝的获取调用方的相关信息,我们可以借助Dubbo的Filter。通过在Provider端增加一个Filter做一个打印。但具体怎么获取呢?

IP

IP的获取比较简单,我们可以在Provier端直接使用如下代码获取:

1
String clientIp = RpcContext.getContext().getRemoteHost();//这次请求来自哪个ip

应用名

应用名则没那么容易,或许你看到过url中是有一个application的参数的,那我们是否可以使用以下代码来获取呢?

1
2
3
String applicationFromContextUrl = RpcContext.getContext().getUrl().getParameter("application");//得到的是本应用的名字
String applicationFromInvokerURL = invoker.getUrl().getParameter(Constants.APPLICATION_KEY);//得到的也是本应用的名字
LOG.info("applicationFromUrl = {}, applicationFromInvokerURL= {}", applicationFromContextUrl, applicationFromInvokerURL);

答案是否定的,事实上,无论是Provider还是Consumer,当你使用这段代码获取的时候,拿到的都是本应用。

所以需要获取调用方的应用名,我们需要显示的设置进来,这里就需要增加一个Consumer的Filter,获取consumer的应用名放入attachment中带到Provider,Provider在filter中从attachment中获取即可。

Consumer Filter中传入应用名至attachment中:

1
2
3
4
5
//手动设置consumer的应用名进attachment
String application = invoker.getUrl().getParameter(Constants.APPLICATION_KEY);
if (application != null) {
      RpcContext.getContext().setAttachment("dubboApplication", application);
}

Provider Filter中从其中获取调用方的应用名:

1
String application = RpcContext.getContext().getAttachment("dubboApplication");

一对Trace Filter示意

以下是一对消费者和生产者的Filter示意,实现了以下功能:

  1. Provider端记录了打印了调用方的IP和应用名

  2. Consumer端打印了服务提供方的IP即本次调用的耗时

Consumer Filter:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Activate(group = Constants.CONSUMER)
public class LogTraceConsumerFilter implements Filter {

    private static final Logger LOG = LoggerFactory.getLogger(LogTraceConsumerFilter.class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        //手动设置consumer的应用名进attachment
        String application = invoker.getUrl().getParameter(Constants.APPLICATION_KEY);
        if (application != null) {
            RpcContext.getContext().setAttachment("dubboApplication", application);
        }

        Result result = null;
        String serverIp = null;
        long startTime = System.currentTimeMillis();
        try {
            result = invoker.invoke(invocation);
            serverIp = RpcContext.getContext().getRemoteHost();//这次返回结果是哪个ip
            return result;
        } finally {
            Throwable throwable = (result == null) ? null : result.getException();
            Object resultObj = (result == null) ? null : result.getValue();
            long costTime = System.currentTimeMillis() - startTime;
            LOG.info("[TRACE] Call {}, {}.{}() param:{}, return:{}, exception:{}, cost:{} ms!", serverIp, invoker.getInterface(), invocation.getMethodName(), invocation.getArguments(), resultObj, throwable, costTime);
        }
    }

}

Provider Filter:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Activate(group = Constants.PROVIDER)
public class LogTraceProviderFilter implements Filter {

    private static final Logger LOG = LoggerFactory.getLogger(LogTraceProviderFilter.class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        //上游如果手动设置了consumer的应用名进attachment,则取出来打印
        String clientIp = RpcContext.getContext().getRemoteHost();//这次请求来自哪个ip
        String application = RpcContext.getContext().getAttachment("dubboApplication");
        String from = clientIp;
        if (!StringUtils.isEmpty(application)) {
            from = application+"("+clientIp+")";
        }

        LOG.warn("[Trace]From {}, {}.{}() param:{}", from, invoker.getInterface(), invocation.getMethodName(), invocation.getArguments());
        return invoker.invoke(invocation);
    }
}

Filter 文件中配置启用(注:替换对应的包名):

1
2
logTraceProviderFilter=xxxx.LogTraceProviderFilter
logTraceConsumerFilter=xxxx.LogTraceConsumerFilter

利用Java进程名进行jstat -gc

需要实时观看GC的情况,我们可以类似如下命令进行监控

1
 jstat -gc $pid 100 10

但是这里需要一个进程号,很麻烦,每个Java进程在不同机器或者启动不一样就会不一样,对于自动监控脚本或者是如果需要定位应用刚开始启动时候gc的问题时,当你手动敲完命令拿到pid的时候,可能都凉了。

对此写了一个简单的shell脚本可以传入进程名去执行jstat

gcstat.sh:

1
2
3
4
5
6
7
8
9
#! /bin/bash

process=$1
interval=$2
count=$3
pid=$(ps -ef | grep java | grep $process | grep -v grep | awk '{print $2}')
echo $pid
echo $interval
echo $count

使用:

1
./gcstat.sh  processName 1000 5

自定义ShardingSphere的加解密器

默认的Sharding Sphere 支持AES和MD5两种加密器。有些时候可能需要自定义使用自己的加解密算法,如AES的具体实现不一样等。网上公开的并没有直接的指引,通过部分源码的阅读,找到了可行的方式。需要三步:

1.实现自定义解密器 (实现ShardingEncryptor 接口)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class TestShardingEncryptor implements ShardingEncryptor {
        private Properties properties = new Properties();

         @Override
         public String getType() {
                return "TEST";
          }


          @Override
         public void init() {

         }

         @Override
         public String encrypt(final Object plaintext) {
             return "TEST-"+String.valueOf(plaintext);
         }

         @Override
        public Object decrypt(final String ciphertext) {
             return ciphertext.replaceAll("TEST-","");
         }
}

其中getType返回的字符串(本例为”TEST”)即为本加解密器的类型(后续使用的时候会使用)

2.创建org.apache.shardingsphere.spi.encrypt.ShardingEncryptor 文件

需要创建一个文件名为org.apache.shardingsphere.spi.encrypt.ShardingEncryptor放入resources路径下的\META-INF\services

sharding-encryptor-file-path

文件的内容就是类名全称,如:

com.yourcompany.TestShardingEncryptor

3.配置使用此自定义类

Java配置模式:

如果未使用Spring Boot,需要显示用代码配置

1
EncryptorRuleConfiguration encryptorConfig = new EncryptorRuleConfiguration("TEST", props);

Spring Boot配置模式:

如果使用的是Spring Boot配置模式,则需要如下配置

1
spring.shardingsphere.encrypt.encryptors.my_encryptor.type=TEST

Java GC垃圾收集器这点小事

​ 对于大多数的应用来说,其实默认的JVM设置就够用了,但当你意识到有GC引起的性能问题、并且仅仅加大堆内存空间也解决不了的时候,那你就应该考虑下GC的调优了。但对于大多数程序员来说,这是很麻烦的,因为调优需要很多耐心,并且需要知道垃圾回收器的运作原理及背后对应用的影响。本文是high-level层面的关于Java垃圾回收器的总览,并以例子的形式去讲解性能定位的一些问题。

​ 正文开始。

​ Java提供了很多种垃圾回收器,会在gc运行的线程中搭配着不同的算法。不同的回收器工作原理不一样,优缺点也不同。最重要的是无论哪一种回收器都会”stop the world”。就是说,在收集和回收的过程中,你的应用程序(或多或少)都会处于暂停状态,只不过不同算法的stop the world的表现有所不同。有些算法一直都会闲置不工作直到必须要垃圾收集才开始工作,然后就暂停应用程序很长的时间;而有一些则能和应用程序同步的进行所以在“stop the world”阶段就会有更少的暂停。选择最合适的算法要取决于你的目标:你是想优化整体的吞吐量即便不时地就会长时间暂停也是可以接受的,还是说你是想得到低延迟的优化通过分散各个时间以得到每时每刻都低延迟。

​ 为了增强垃圾回收的过程,Java(准确的说是 HotSpot JVM)把堆分成了两个代,年轻代和年老代(还有一个叫永久代的区域不在我们本文讨论范围)

hotspot-heap

​ 年轻代是一些“年轻”的对象存放的地方,而年轻代还会继续分为以下三个区域:

  1. 伊甸区(Eden Space)
  2. 幸存区1(Survivor Space 1)
  3. 幸存区2(Survivor Space 2)

​ 默认情况下,伊甸区是大于两个幸存者区的总和的。例如在我的Mac OS X上的64位HotSpot JVM上,伊甸区占了大概年轻代76%的区域。所有的对象一开始都会在伊甸区创建,当伊甸区满了之后,就会触发一次次要的垃圾回收(minor gc),期间新对象会快速地被检查是否可以进行垃圾回收。一旦发现那些对象已经死亡(dead),也就是说没有再被其他对象引用了(这里先简单忽略掉引用的具体类型带来的一些差异,不在本文讨论),就会被标记为死亡然后被垃圾回收掉。而其中“幸存”的对象就会被移到其中的一个空的Survivor Space。你可能会问,具体移动到哪一个幸存区呢?要回答这个问题,首先我们先聊一下幸存区的设计。

​ 之所以设计两个幸存区,是为了避免内存碎片。假设只有一个幸存区(然后我们把幸存区想象成一个内存中连续的数组),当年轻代的gc在这个数组上运行了一遍后,会标记一些死亡对象然后删除掉,这样的话势必会在内存中留下一些空洞的区域(原来的对象存活的位置),那么就有必要做压缩了。所以为了避免做压缩,HotSpot JVM就从一个幸存者区复制所有幸存的对象到另外一个(空的)幸存者区里,这样就没有空洞了。这里我们讲到了压缩,顺便提一下年老代的垃圾回收器(除了CMS)在进行年老代垃圾回收的时候都会进行压缩以避免内存碎片。

​ 简单地说,次要的垃圾回收(当伊甸区满的时候)就会把存活的对象从伊甸区和其中一个幸存区(gc日志中以“from”呈现)左右捣腾地搬到另外一个幸存区(又叫“to”)。这样会一直的持续下去直到以下的条件发生:

  1. 对象达到了最大的晋升时间阈值(maximum tenuring threshold),就是说在年轻代被左右捣腾得足够久了,媳妇熬成婆。
  2. 幸存区已经没有空间去接受新生的对象了(后面会讲到)

​ 以上条件发生后,对象就会被移动到年老代了。下面用一个具体的例子来理解下。假设我们有以下的应用程序,它会在初始化的时候创建一些长期存活的对象,也会在运行的过程中不断的创建很多存活时间很短的对象(例如我们的web服务器程序在处理请求的时候会不断分配存活时间很短的对象)

1
2
3
4
5
6
7
8
9
10
11
12
13
private static void createFewLongLivedAndManyShortLivedObjects() {
        HashSet<Double> set = new HashSet<Double>();

        long l = 0;
        for (int i=0; i < 100; i++) {
            Double longLivedDouble = new Double(l++);
            set.add(longLivedDouble);  // 加到集合里,让这些对象能持续的存活
        }

        while(true) { // 不断地创建一些存活时间短的对象(这里在实际代码中比较极端,仅为演示用)
            Double shortLivedDouble = new Double(l++);
        }
}

在运行这个程序的过程中我们启用GC的部分日志参数:

1
2
3
4
5
6
-Xmx100m                     // 分配100MV的堆内存
-XX:-PrintGC                 // 开启GC日志打印
-XX:+PrintHeapAtGC           // 开启GC日志打印堆信息
-XX:MaxTenuringThreshold=15  // 为了让对象能在年轻代呆久一点
-XX:+UseConcMarkSweepGC      // 暂时先忽略这个配置,后面会讲到
-XX:+UseParNewGC             // 暂时先忽略这个配置,后面会讲到

gc 日志会显示垃圾收集前后的情况如下:

1
2
3
4
5
6
7
8
9
10
11
12
Heap <b>before</b> GC invocations=5 (full 0):
 par new (<u>young</u>) generation total 30720K, used 28680K
  eden space 27328K,   <b>100%</b> used
  from space 3392K,   <b>39%</b> used
  to   space 3392K,   0% used
 concurrent mark-sweep (<u>old</u>) generation total 68288K, used <b>0K</b> <br/>
Heap <b>after</b> GC invocations=6 (full 0):
 par new generation (<u>young</u>) total 30720K, used 1751K
  eden space 27328K,   <b>0%</b> used
  from space 3392K,   <b>51%</b> used
  to   space 3392K,   0% used
 concurrent mark-sweep (<u>old</u>) generation total 68288K, used <b>0K</b>

​ 从这个日志里我们能得到以下信息。第一,在这次gc之前,已经发生了5次的minor gc了(所以这次是第6次)。第二,伊甸区占用了100%所以触发了这次的gc。第三,其中一个幸存区域已经使用了39%的空间(还有不少可用空间)。而这次垃圾收集结束后,我们能看到伊甸区就被清空了(0%)然后幸存者区域上升到51%。这意味着伊甸区和其中一个幸存区里存活的对象已经被移动到另外一个幸存区了,然后死亡的对象已经被垃圾回收了。怎么推断的死亡对象被回收了呢?我们看到伊甸区原来是比幸存区要大的(27328K vs 3392K),而后面幸存区的空间大小仅仅是轻微的上升(伊甸区被清空了),所以大量的对象肯定是被垃圾回收了。而我们再看看年老代,年老代是一直都是空的,无论是这次垃圾回收前还是后(回想一下,我们设置了晋升阈值为15)。

​ 下面我们再试另外一个实验。这次用多线程不断的创建存活时间很短的对象。直觉上判断,依旧应该没有对象会上升到年老代才对,因为minor gc就应该可以把这些对象清理干净。我们来看看实际情况如何

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static void createManyShortLivedObjects() {
        final int NUMBER_OF_THREADS = 100;
        final int NUMBER_OF_OBJECTS_EACH_TIME = 1000000;

        for (int i=0; i<NUMBER_OF_THREADS; i++) {
            new Thread(() -> {
                    while(true) {
                        for (int i=0; i<NUMBER_OF_OBJECTS_EACH_TIME; i++) {
                            Double shortLivedDouble = new Double(1.0d);
                        }
                        sleepMillis(1);
                    }
                }
            }).start();
        }
    }
}

这次,我们只给10MB的内存,然后看看GC日志

1
2
3
4
5
6
7
8
9
10
11
12
Heap <b>before</b> GC invocations=0 (full 0):
 par new (<u>young</u>) generation total 3072K, used 2751K
  eden space 2752K,  99% used
  from space 320K,   0% used
  to   space 320K,   0% used
 concurrent mark-sweep (<u>old</u>) generation total 6848K, used <b>0K</b> <br/>
Heap <b>after</b> GC invocations=1 (full 0):
 par new generation  (<u>young</u>)  total 3072K, used 318K
  eden space 2752K,   0% used
  from space 320K,  99% used
  to   space 320K,   0% used
 concurrent mark-sweep (<u>old</u>) generation total 6848K, used <b>76K</b>

​ 从日志上看,并不如我们一开始想的那样。这次,老年代在第一次minor gc之后,接受了一些对象。实际上这些对象都是存活时间很短的对象,并且我们设置了晋升阈值是15次,再而且日志里显示的gc只是第一次垃圾收集。这个现象背后实际上是这样的:应用程序创建了大量的对象在伊甸区,minor gc启动的时候尝试去回收,但是大多数的这些存活时间很短的对象实际上都是active的(被一个运行中的线程引用着)。那么年轻代的垃圾收集器就只好把这些对象移动到年老代了。这其实是一个不好的现象,因为这些被移到到年老代的对象其实是过早衰老了(prematurely aged),它们只有在老年代的major gc才能被回收,而major gc通常会耗时更长。对于某些垃圾算法例如CMS,major gc会在年老代70%内存占据后出发。这个值可以通过参数修改-XX:CMSInitiatingOccupancyFraction=70

​ 怎么样防止这些短暂存活的对象过早衰老呢?有几个方法,其中一个理论上可行的方法是估计这些活跃的短暂存活对象的数量,然后设置合理的年轻代大小。我们下面来试试:

  • 年轻代默认是整个堆大小的1/3,这次我们通过 -XX:NewRatio=1 来修改其大小让他内存更大些(大约3.4MB,原来是3MB)
  • 同时调整幸存者区的大小:-XX:SurvivorRatio=1 (大约1.6MB一个区,原来是0.3MB)

问题就解决了。经过8次的minor gc,年老代依旧是空的

1
2
3
4
5
6
7
8
9
10
11
12
Heap <b>before</b> GC invocations=7 (full 0):
 par new generation   total 3456K, used 2352K
  eden space 1792K,  99% used
  from space 1664K,  33% used
  to   space 1664K,   0% used
 concurrent mark-sweep generation total 5120K, used <b>0K</b> <br/>
Heap <b>after</b> GC invocations=8 (full 0):
 par new generation   total 3456K, used 560K
  eden space 1792K,   0% used
  from space 1664K,  33% used
  to   space 1664K,   0% used [
 concurrent mark-sweep generation total 5120K, used <b>0K</b>

​ 对于GC调优,没有银弹。这里只是简单地示意。对于实际的应用,需要不断的修改配置试错来找到最佳配置。例如,这次其实我们也可以将堆的总大小调大一倍来解决此问题。

垃圾回收算法

​ 接下来我们来看看具体的垃圾回收算法。Hotspot JVM针对年轻代和年老代有多个不同的算法。从宏观层面上看,有三种类型的垃圾回收算法,每一类都有单独的性能特性:

serial collector :使用一条线程进行所有的垃圾回收工作,相对来说也是高效的因为没有线程之间的通信。适用于单处理器的机器。使用-XX:+UseSerialGC.启用

parallel collector (同时也称作吞吐回收器) :使用多线程进行垃圾回收,这样能显著的降低垃圾回收的负荷。设计来适用于这样的应用:拥有中等或大数据集的,运行在多核处理器或多线程的硬件

concurrent collector: 大部分的垃圾回收工作会同步的进行(不阻塞应用的运行)以维持短暂的GC暂停时间。它是设计给中等或大数据集的、响应时间比整体的吞吐量要更重要的应用,因为用这种算法去降低GC的停顿会一定程度降低应用的性能。

gc-compared

​ HotSpot JVM可以让我们选择不同的GC算法去回收年轻代和年老代,但是某些算法是需要配套的使用才兼容的。例如,你不能选择Parallel Scavenge去回收年轻代的同时,使用CMS收集器去回收年老代因为这两个收集器是不兼容的。以下是兼容的收集器的示意图

gc-collectors-pairing

  1. “Serial”是一个stop-the-world,复制算法的垃圾收集器,使用一条GC线程。
  2. “Parallel Scavenge”是一个stop-the-world、采用复制算法的垃圾收集器,但是使用多条GC线程。
  3. ParNew是一个stop-the-world,复制算法的收集器,使用多条GC线程。它和Parallel Scavenge的区别是它做了一些增强以适应搭配CMS使用。例如ParNew会做必要的同步(synchronization )以便它能在CMS的同步阶段运行。
  4. Serial Old 是一个stop-the-world,采用标记-清除-压缩算法的回收器,使用一条GC线程
  5. CMS(Concurrent Mark Sweep)是一个同步能力最强、低延迟的垃圾回收器
  6. Parallel Old是一个压缩算法的垃圾回收器,使用多个GC线程。

​ 对于服务端的应用程序(需要处理客户端请求)来说,使用CMS+ParNew是不错的选择。

我在大概10GB堆内存的程序中使用过也能保持响应时间稳定和短暂的GC暂停时间。我认识的一些开发者使用Parallel collectors (Parallel Scavenge + Parallel Old) ,效果也不错。

​ 其中一件需要注意的事是CMS已经宣布废弃了,会被Oralce推荐使用一个新的同步收集器取代, Garbage-First 简称 G1, 一个最先由Java推出的垃圾收集器

​ G1是一个服务端类型(server-style)的垃圾回收器,针对多处理器、大内存的计算机使用。它能尽可能地满足一个GC延迟时间的目标,同时也有很高的吞吐量

G1 会同时在年轻代和年老代进行工作。它针对大堆有专门的优化(>10GB)。我没有亲身尝试过G1,我团队里的开发者仍然使用的CMS,所以我还不能对两者进行比较。但通过快速的搜索之后,我找到了一个性能对比说CMS会比G1更好(CMS outperforming G1)。我倾向于谨慎,但G1应该是不错的。我们能靠以下参数启动

1
-XX:+UseG1GC

注:以上由本人摘选翻译自https://codeahoy.com/2017/08/06/basics-of-java-garbage-collection/

基于Sharding Sphere实现数据“一键脱敏”

在真实业务场景中,数据库中经常需要存储某些客户的关键性敏感信息如:身份证号、银行卡号、姓名、手机号码等,此类信息按照合规要求,通常需要实现加密存储以满足合规要求。

痛点一:

通常的解决方案是我们书写SQL的时候,把对应的加密字段手动进行加密再进行插入,在查询的时候使用之前再手动进行解密。此方法固然可行,但是使用起来非常不便捷且繁琐,使得日常的业务开发与存储合规的细节紧耦合

痛点二:

对于一些为了快速上线而一开始没有实现合规脱敏的系统,如何比较快速的使得已有业务满足合规要求的同时,尽量减少对原系统的改造。(通常的这个过程至少包括:1.新增脱敏列的存储 2.同时做数据迁移 3.业务的代码做兼容逻辑等)。

Apache ShardingSphere下面存在一个数据脱敏模块,此模块集成的常用的数据脱敏的功能。其基本原理是对用户输入的SQL进行解析拦截,并依靠用户的脱敏配置进行SQL的改写,从而实现对原文字段的加密及加密字段的解密。最终实现对用户无感的加解密存储、查询。

脱敏配置Quick Start——Spring 显示配置:

以下介绍基于Spring如何快速让系统支持脱敏配置。

1.引入依赖

<!-- for spring namespace -->
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-jdbc-spring-namespace</artifactId>
    <version>${sharding-sphere.version}</version>
</dependency>

2.创建脱敏配置规则对象

在创建数据源之前,需要准备一个EncryptRuleConfiguration进行脱敏的配置,以下是一个例子,对于同一个数据源里两张表card_info,pay_order的不同字段进行AES的加密

private EncryptRuleConfiguration getEncryptRuleConfiguration() {
Properties props = new Properties();

//自带aes算法需要
props.setProperty("aes.key.value", aeskey);
EncryptorRuleConfiguration encryptorConfig = new EncryptorRuleConfiguration("AES", props);

//自定义算法
//props.setProperty("qb.finance.aes.key.value", aeskey);
//EncryptorRuleConfiguration encryptorConfig = new EncryptorRuleConfiguration("QB-FINANCE-AES", props);

EncryptRuleConfiguration encryptRuleConfig = new EncryptRuleConfiguration();
encryptRuleConfig.getEncryptors().put("aes", encryptorConfig);

//START: card_info 表的脱敏配置
{
    EncryptColumnRuleConfiguration columnConfig1 = new EncryptColumnRuleConfiguration("", "name", "", "aes");
    EncryptColumnRuleConfiguration columnConfig2 = new EncryptColumnRuleConfiguration("", "id_no", "", "aes");
    EncryptColumnRuleConfiguration columnConfig3 = new EncryptColumnRuleConfiguration("", "finshell_card_no", "", "aes");
    Map<String, EncryptColumnRuleConfiguration> columnConfigMaps = new HashMap<>();
    columnConfigMaps.put("name", columnConfig1);
    columnConfigMaps.put("id_no", columnConfig2);
    columnConfigMaps.put("finshell_card_no", columnConfig3);
    EncryptTableRuleConfiguration tableConfig = new EncryptTableRuleConfiguration(columnConfigMaps);
    encryptRuleConfig.getTables().put("card_info", tableConfig);
}
//END: card_info 表的脱敏配置

//START: pay_order 表的脱敏配置
{
    EncryptColumnRuleConfiguration columnConfig1 = new EncryptColumnRuleConfiguration("", "card_no", "", "aes");
    Map<String, EncryptColumnRuleConfiguration> columnConfigMaps = new HashMap<>();
    columnConfigMaps.put("card_no", columnConfig1);
    EncryptTableRuleConfiguration tableConfig = new EncryptTableRuleConfiguration(columnConfigMaps);
    encryptRuleConfig.getTables().put("pay_order", tableConfig);
}

log.info("脱敏配置构建完成:{} ", encryptRuleConfig);
return encryptRuleConfig;

}

说明:

  1. 创建 EncryptColumnRuleConfiguration 的时候有四个参数,前两个参数分表叫plainColumn、cipherColumn,其意思是数据库存储里面真实的两个列(名文列、脱敏列),对于新的系统,只需要设置脱敏列即可,所以以上示例为plainColumn为”“。
  2. 创建EncryptTableRuleConfiguration 的时候需要传入一个map,这个map存的value即#1中说明的EncryptColumnRuleConfiguration ,而其key则是一个逻辑列,对于新系统,此逻辑列即为真实的脱敏列。Sharding Shpere在拦截到SQL改写的时候,会按照用户的配置,把逻辑列映射为名文列或者脱敏列(默认)如下的示例

shardings sphere basic

3.使用Sharding Sphere的数据源进行管理

把原始的数据源包装一层

1
2
3
4
5
@Bean("tradePlatformDataSource")
public DataSource dataSource(@Qualifier("druidDataSource") DataSource ds) throws SQLException {
    return EncryptDataSourceFactory.createDataSource(ds, getEncryptRuleConfiguration(), new Properties());

}

脱敏配置Quick Start——Spring Boot版:

以下步骤使用Spring Boot管理,可以仅用配置文件解决:

1.引入依赖

<!-- for spring boot -->

<dependency>
<groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
    <version>${sharding-sphere.version}</version>
</dependency>

<!-- for spring namespace -->
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-jdbc-spring-namespace</artifactId>
    <version>${sharding-sphere.version}</version>
</dependency>
  1. Spring 配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

spring.shardingsphere.datasource.name=ds
spring.shardingsphere.datasource.ds.type=com.alibaba.druid.pool.DruidDataSource
spring.shardingsphere.datasource.ds.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds.url=xxxxxxxxxxxxx
spring.shardingsphere.datasource.ds.username=xxxxxxx
spring.shardingsphere.datasource.ds.password=xxxxxxxxxxxx



# 默认的AES加密器
spring.shardingsphere.encrypt.encryptors.encryptor_aes.type=aes
spring.shardingsphere.encrypt.encryptors.encryptor_aes.props.aes.key.value=hkiqAXU6Ur5fixGHaO4Lb2V2ggausYwW

# card_info 姓名 AES加密
spring.shardingsphere.encrypt.tables.card_info.columns.name.cipherColumn=name
spring.shardingsphere.encrypt.tables.card_info.columns.name.encryptor=encryptor_aes

# card_info 身份证 AES加密
spring.shardingsphere.encrypt.tables.card_info.columns.id_no.cipherColumn=id_no
spring.shardingsphere.encrypt.tables.card_info.columns.id_no.encryptor=encryptor_aes

# card_info 银行卡号 AES加密
spring.shardingsphere.encrypt.tables.card_info.columns.finshell_card_no.cipherColumn=finshell_card_no
spring.shardingsphere.encrypt.tables.card_info.columns.finshell_card_no.encryptor=encryptor_aes

# pay_order 银行卡号 AES加密
spring.shardingsphere.encrypt.tables.pay_order.columns.card_no.cipherColumn=card_no
spring.shardingsphere.encrypt.tables.pay_order.columns.card_no.encryptor=encryptor_aes