分发计数器依赖于redis,分发时支持指定范围优先分发,支持分发阈值上限设置,范围成员每次被分发计数后,从小到大排列,能保证尽量均匀分发。
分发执行器业务主类:
package com.sankuai.grocerywms.logistics.strategy.domain.remeasuretask.basic; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import java.util.*; import java.util.stream.Collectors; /** * Created by zzq on 2021/3/10. */ @Slf4j public abstract class AbstractDistributionCounter<Type> { /** * 锁定member15秒 */ public final String lockPrefix = "distributionCounterLockTag"; public final Integer lockExpireInSeconds = 15; public final Integer offset = 0; public final Integer count = 10; /** * 获取ZSet的key值 * * @return */ protected String getMembersZSetKey() { return "allMember"; } /** * 分发计数时,ZSet正序或倒序 * * @return */ protected Boolean orderASC() { return true; } /** * ==========ZSet操作抽象方法========== */ /** * 将原生对象,转换为框架内对象 * * @return */ protected abstract MemberInfoTuple convertToMemberInfoTuple(Type t); protected abstract void zadd(String allMember, double l, Object memberId); protected abstract void zremrangeByRank(String allMember, long l, long l1); protected abstract Double zincrby(String key, double count, Object memberId); protected abstract Boolean setnx(String key, Object value, int expireInSeconds); protected abstract Boolean delete(String memberId); protected abstract Boolean exists(String memberId); protected abstract Set<Type> zrangeByScoreWithScore(String key, double min, double max, int offset, int count); protected abstract Set<Type> zrevrangeByScoreWithScore(String key, double max, double min, int offset, int count); protected abstract Double zscore(String key, Object memberId); /** * ==========均匀分发业务操作========== */ /** * 初始化ZSet中的members元素 * * @param members */ public void membersRefresh(Collection<String> members) { if (CollectionUtils.isEmpty(members)) { log.info("params is null"); return; } String lockName = lockPrefix.concat(getMembersZSetKey()); Boolean locked = lock(lockName, 30); if (!locked) { log.info("membersRefreshing。。。"); return; } log.info("membersRefreshing-locked=[{}]", lockName); try { removeMembersAllRank(); for (String memberId : members) { addOne(memberId); } } catch (Exception e) { log.error("membersRefreshing invoke fail"); } finally { unLock(lockName); } } /** * 全范围匹配操作 * 当不提供范围信息时,则需要(从小到大排列)进行全范围匹配操作 * * @return */ public Boolean fullRangeExecute(DistributionProcessor distributionProcessor) { MemberInfoSetTuple memberInfoSetTuple = getBatchMembersInfo(distributionProcessor); for (; ; ) { if (memberInfoSetTuple.getTimes() > 1) { memberInfoSetTuple = getBatchMembersInfo(memberInfoSetTuple, distributionProcessor); } List<MemberInfoTuple> membersInfo = memberInfoSetTuple.getMemberInfo(); if (CollectionUtils.isEmpty(membersInfo)) { return false; } Boolean ret = syncDistributionToMember(distributionProcessor, membersInfo, true); if (ret != null) { return ret; } memberInfoSetTuple.increaseTimes(); } } /** * 固定范围匹配操作 * * @param distributionProcessor * @return */ public Boolean rangeExecute(DistributionProcessor distributionProcessor) { List<String> rangeMembers = distributionProcessor.getRangeMembers(); if (CollectionUtils.isEmpty(rangeMembers)) { return false; } List<MemberInfoTuple> membersInfo = getMembersInfoByIds(rangeMembers); if (CollectionUtils.isEmpty(membersInfo)) { return false; } Boolean ret = syncDistributionToMember(distributionProcessor, membersInfo, false); if (ret != null) { return ret; } return false; } /** * 如果范围匹配操作成功,则结束,否则继续使用全范围匹配 * * @param distributionProcessor * @return */ public Boolean execute(DistributionProcessor distributionProcessor) { Boolean step1Ret = rangeExecute(distributionProcessor); if (!step1Ret) { Boolean step2Ret = fullRangeExecute(distributionProcessor); return step2Ret; } return true; } /** * 优先获取分配较少异常商品的大仓集合,分批获取 * * @param memberInfoSetTuple * @return */ private MemberInfoSetTuple getBatchMembersInfo(MemberInfoSetTuple memberInfoSetTuple, DistributionProcessor distributionProcessor) { /** * 每次获取10个,持有问题商品最少的仓库id列表 */ MemberInfoSetTuple ret; Integer offsetTmp; if (memberInfoSetTuple == null) { /** * 首次查询ZSet时会初始化返回对象默认值 */ ret = new MemberInfoSetTuple(); offsetTmp = offset; } else { ret = memberInfoSetTuple; offsetTmp = memberInfoSetTuple.getOffset() + count; } Long maxCount = distributionProcessor.getMaxCount(); if (maxCount == null) { maxCount = 60L; } Long minCount = distributionProcessor.getMinCount(); if (minCount == null) { minCount = 0L; } List<MemberInfoTuple> membersTmp = getRangeMembers(minCount, maxCount, offsetTmp, count); ret.setCount(count); ret.setOffset(offsetTmp); ret.setMemberInfo(membersTmp); return ret; } private MemberInfoSetTuple getBatchMembersInfo(DistributionProcessor distributionProcessor) { return getBatchMembersInfo(null, distributionProcessor); } /** * 锁定一个member后,进行该count值同步操作 * * @param distributionProcessor * @param membersInfo * @return 返回值可以是null ,fullRangeExecute判断如果为null则继续尝试寻找下一批可用的member */ private Boolean syncDistributionToMember(DistributionProcessor distributionProcessor, List<MemberInfoTuple> membersInfo, Boolean fullRangeMembers) { for (MemberInfoTuple item : membersInfo) { if (!geCount(item, distributionProcessor)) { return false; } String memberId = item.getMemberId(); Boolean locked = lock(memberId, lockExpireInSeconds); if (!locked) { continue; } /** * step 1 如果每第一个成员超过了阈值,那么ZSet从小到大排列,则直接返回结束即可,拿到后面的成员count值会更大 */ if (!geCount(item, distributionProcessor)) { return false; } /** * step 2 如果业务处理成功,直接结束流程,否则继续尝试ZSet中的其它成员项 */ log.info("locked success memberId=[{}],lockExpireInSeconds=[{}]", memberId, lockExpireInSeconds); try { if (distributionProcessor.process(memberId, fullRangeMembers)) { increaseOne(memberId); log.info("distributionCounter#process,#increaseOne invoke success | memberId=[{}],currMembers=[{}]", memberId, membersInfo); return true; } } catch (Exception e) { e.printStackTrace(); log.info("distributionCounter#process,#increaseOne invoke fail | memberId=[{}],currMembers=[{}],error=[{}]", memberId, membersInfo, e); } finally { unLock(memberId); log.info("unLocked success memberId=[{}]", memberId); } } return null; } /** * 如果传入的count数大于阈值则返回false * * @param item * @return */ private Boolean geCount(MemberInfoTuple item, DistributionProcessor distributionProcessor) { Long count = item.getCount(); if (count >= distributionProcessor.getMaxCount()) { return false; } return true; } /** * 根据一批成员id,获取member信息列表 * * @param memberIds * @return */ private List<MemberInfoTuple> getMembersInfoByIds(List<String> memberIds) { Long countMin = 0L; List<MemberInfoTuple> ret = new ArrayList<>(); for (int i = 0; i < memberIds.size(); i++) { String memberIdTmp = memberIds.get(i); Long countTmp; try { countTmp = getCountByMember(memberIdTmp); } catch (Exception e) { e.printStackTrace(); log.error("memberId=[{}]在ZSet中不存在,已经从rangeMembers中放弃该数据 error=[{}]", memberIdTmp, e); continue; } MemberInfoTuple mTmp = new MemberInfoTuple(); mTmp.setMemberId(memberIdTmp); mTmp.setCount(countTmp); if (countTmp < countMin) { ret.add(0, mTmp); } else { ret.add(mTmp); } countMin = countTmp; } return ret; } /** * 档期数据优先入推荐仓,该列表由算法提供,优先选择一个已经分配商品最少的仓 * * @param memberIds * @return */ private MemberInfoTuple getMinCountMember(List<String> memberIds) { if (CollectionUtils.isEmpty(memberIds)) { return null; } String memberId = memberIds.get(0); Long count = getCountByMember(memberId); for (int i = 1; i < memberIds.size(); i++) { String memberIdTmp = memberIds.get(i); Long countTmp = getCountByMember(memberIdTmp); if (countTmp < count) { count = countTmp; memberId = memberIdTmp; } } MemberInfoTuple memberInfoTuple = new MemberInfoTuple(); memberInfoTuple.setCount(count); memberInfoTuple.setMemberId(memberId); return memberInfoTuple; } /** * ==========基于ZSet操作========== */ /** * ZSet中初始化一个仓id,用于初始化仓列表 * * @param memberId */ private void addOne(String memberId) { zadd(getMembersZSetKey(), 0L, memberId); } /** * 删除ZSet列表所有的成员 */ private void removeMembersAllRank() { zremrangeByRank(getMembersZSetKey(), 0L, -1L); } /** * ZSet中选定的成员,value数值加一操作 * * @param memberId * @return */ private Long increaseOne(String memberId) { return zincrby(getMembersZSetKey(), 1L, memberId).longValue(); } /** * 锁定member成员,锁定标识为memberId,value=1 * * @param memberId * @return */ private Boolean lock(String memberId, Integer expireInSeconds) { return setnx(getMembersZSetKey().concat(memberId), 1, expireInSeconds); } /** * 解锁member成员,成员不存在直接返回true * * @param memberId * @return */ private Boolean unLock(String memberId) { if (exists(getMembersZSetKey().concat(memberId))) { return delete(getMembersZSetKey().concat(memberId)); } return true; } /** * 按照ZSet成员的value数值范围,获取成员列表并排序,返回有序的List * <p> * orderASC true从小到大排序,false从大到小排序 * * @param minCount * @param maxCount * @param offset * @param count * @return */ private List<MemberInfoTuple> getRangeMembers(Long minCount, Long maxCount, Integer offset, Integer count) { Comparator<MemberInfoTuple> comparing = Comparator.comparing(MemberInfoTuple::getCount); Set<Type> set; if (orderASC()) { set = zrangeByScoreWithScore(getMembersZSetKey(), minCount, maxCount, offset, count); } else { comparing = comparing.reversed(); set = zrevrangeByScoreWithScore(getMembersZSetKey(), minCount, maxCount, offset, count); } return set.stream().map(item -> convertToMemberInfoTuple(item)).sorted(comparing).collect(Collectors.toList()); } /** * 在ZSet中,获取一个当前成员的value数值 * * @param memberId * @return */ private Long getCountByMember(String memberId) { return zscore(getMembersZSetKey(), memberId).longValue(); } }