publicabstractclassJobShardingStrategyActiveStandbyDecoratorimplementsJobShardingStrategy{//内置的分配策略采用原来的默认策略:平均privateJobShardingStrategyinner=newAverageAllocationJobShardingStrategy();/** * 判断一个实例是否是备用的实例,在每次触发sharding方法之前会遍历所有实例调用此方法。 * 如果主备实例同时存在于列表中,那么备实例将会被剔除后才进行sharding * @param jobInstance * @return */protectedabstractbooleanisStandby(JobInstancejobInstance,StringjobName);@OverridepublicMap<JobInstance,List<Integer>>sharding(List<JobInstance>jobInstances,StringjobName,intshardingTotalCount){List<JobInstance>jobInstancesCandidates=newArrayList<>(jobInstances);List<JobInstance>removeInstance=newArrayList<>();booleanremoveSelf=false;for(JobInstancejobInstance:jobInstances){booleanisStandbyInstance=false;try{isStandbyInstance=isStandby(jobInstance,jobName);}catch(Exceptione){log.warn("isStandBy throws error, consider as not standby",e);}if(isStandbyInstance){if(IpUtils.getIp().equals(jobInstance.getIp())){removeSelf=true;}jobInstancesCandidates.remove(jobInstance);removeInstance.add(jobInstance);}}if(jobInstancesCandidates.isEmpty()){//移除后发现没有实例了,就不移除了,用原来的列表(后备)的顶上jobInstancesCandidates=jobInstances;log.info("[{}] ATTENTION!! Only backup job instances exist, but do sharding with them anyway {}",jobName,JSON.toJSONString(jobInstancesCandidates));}if(!jobInstancesCandidates.equals(jobInstances)){log.info("[{}] remove backup before really do sharding, removeSelf :{} , remove instances: {}",jobName,removeSelf,JSON.toJSONString(removeInstance));log.info("[{}] after remove backups :{}",jobName,JSON.toJSONString(jobInstancesCandidates));}else{//全部都是master或者全部都是slavelog.info("[{}] job instances just remain the same {}",jobName,JSON.toJSONString(jobInstancesCandidates));}//保险一点,排序一下,保证每个实例拿到的列表肯定是一样的jobInstancesCandidates.sort((o1,o2)->o1.getJobInstanceId().compareTo(o2.getJobInstanceId()));returninner.sharding(jobInstancesCandidates,jobName,shardingTotalCount);}