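My previous two articles only had flowcharts, and the response was lukewarm: they barely got any reads. So this time I'm also writing down how I actually read the source code, and walking through the main logic in detail.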
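Under the hood, Sentinel is essentially one big try/catch. StatisticSlot records the various metrics, and the other slots check them against the rules. If a check passes, the call moves on to the next slot. If it fails, an exception is thrown. That is how flow control (rate limiting) and degradation (circuit breaking) kick in once a threshold is reached.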
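To make the "one big try/catch" idea concrete, here is a minimal plain-Java sketch. It is not Sentinel code. BlockedException, Check, GuardDemo and maxQps are hypothetical names standing in for BlockException and the checking slots. Each check either returns normally or throws, and a single catch at the top turns the first failure into a "blocked" result.

```java
import java.util.List;

// Hypothetical stand-in for Sentinel's BlockException.
class BlockedException extends Exception {
    BlockedException(String rule) {
        super(rule);
    }
}

// Hypothetical check: like a slot, it either returns normally or throws.
interface Check {
    void check(int qps) throws BlockedException;
}

class GuardDemo {
    // The "big try/catch": run every check in order; the first failure aborts the rest.
    static String guard(List<Check> checks, int qps) {
        try {
            for (Check c : checks) {
                c.check(qps);
            }
            return "business logic executed";
        } catch (BlockedException e) {
            return "blocked by " + e.getMessage();
        }
    }

    // A check that rejects when the observed QPS exceeds a fixed limit.
    static Check maxQps(int limit) {
        return qps -> {
            if (qps > limit) {
                throw new BlockedException("flow rule (limit " + limit + ")");
            }
        };
    }
}
```

For example, `GuardDemo.guard(List.of(GuardDemo.maxQps(10)), 11)` returns `"blocked by flow rule (limit 10)"`.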
First, a flowchart of the source-code walkthrough:
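1. Sentinel's annotation support is built on AOP. The annotation is @SentinelResource, so we start by finding the aspect that handles it.
2. The handling lives in the invokeResourceWithSentinel method of the SentinelResourceAspect class: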
public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
    Method originMethod = resolveMethod(pjp);
    SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
    if (annotation == null) {
        // Should not happen.
        throw new IllegalStateException("Wrong state for SentinelResource annotation");
    }
    String resourceName = getResourceName(annotation.value(), originMethod);
    EntryType entryType = annotation.entryType();
    int resourceType = annotation.resourceType();
    Entry entry = null;
    try {
        // Run the slot chain; a BlockException means some rule rejected the call.
        entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
        // All checks passed: invoke the real business method.
        return pjp.proceed();
    } catch (BlockException ex) {
        // Rejected by a rule: delegate to the blockHandler.
        return handleBlockException(pjp, annotation, ex);
    } catch (Throwable ex) {
        Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
        // The ignore list is checked first: ignored exceptions propagate untouched.
        if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
            throw ex;
        }
        if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
            // Record the exception in the statistics, then delegate to the fallback.
            traceException(ex, annotation);
            return handleFallback(pjp, annotation, ex);
        }
        // No fallback applies, so rethrow.
        throw ex;
    } finally {
        if (entry != null) {
            entry.exit(1, pjp.getArgs());
        }
    }
}
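The order of the two exception lists in the catch block is easy to misread, so here is a small plain-Java sketch of just that decision. Outcome, ExceptionRouting, belongsTo and route are hypothetical names. The sketch uses lists instead of the annotation's arrays. A type matches if the exception is an instance of it or of a subclass, which is the same idea as exceptionBelongsTo.

```java
import java.util.List;

// Hypothetical result type: what the aspect does with a business exception.
enum Outcome { RETHROW_IGNORED, FALLBACK, RETHROW }

class ExceptionRouting {
    // Same idea as exceptionBelongsTo: is ex an instance of any listed type (or a subclass)?
    static boolean belongsTo(Throwable ex, List<Class<? extends Throwable>> types) {
        return types.stream().anyMatch(t -> t.isInstance(ex));
    }

    // Ignore list first, then trace list; anything else propagates.
    static Outcome route(Throwable ex,
                         List<Class<? extends Throwable>> toIgnore,
                         List<Class<? extends Throwable>> toTrace) {
        if (!toIgnore.isEmpty() && belongsTo(ex, toIgnore)) {
            return Outcome.RETHROW_IGNORED;
        }
        if (belongsTo(ex, toTrace)) {
            return Outcome.FALLBACK;
        }
        return Outcome.RETHROW;
    }
}
```

Passing `Throwable.class` as the trace list matches the annotation's default for exceptionsToTrace. With that default, every exception that is not explicitly ignored goes to the fallback.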
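3. The aspect calls SphU.entry. Following it step by step leads to the actual implementation, the entryWithPriority method of the CtSph class.
There, the slot chain for the resource is obtained: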
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // Entry size limit.
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }
                chain = SlotChainProvider.newSlotChain();
                Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                    chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}
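lookProcessChain combines double-checked locking with a copy-on-write map. Readers never take the lock. Writers build a new map and publish it with a single field write. Here is a generic sketch of that pattern under my own names: CopyOnWriteCache, get, maxSize and factory are hypothetical. The map field must be volatile so that readers see a fully built map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch of the lookProcessChain pattern: lock-free reads, copy-on-write updates.
class CopyOnWriteCache<K, V> {
    private final Object lock = new Object();
    private final int maxSize;
    private final Function<K, V> factory;
    // volatile so readers always see the latest fully built map
    private volatile Map<K, V> map = new HashMap<>();

    CopyOnWriteCache(int maxSize, Function<K, V> factory) {
        this.maxSize = maxSize;
        this.factory = factory;
    }

    V get(K key) {
        V value = map.get(key);              // fast path: no locking
        if (value == null) {
            synchronized (lock) {
                value = map.get(key);        // re-check under the lock
                if (value == null) {
                    if (map.size() >= maxSize) {
                        return null;         // size limit reached, like MAX_SLOT_CHAIN_SIZE
                    }
                    value = factory.apply(key);
                    Map<K, V> copy = new HashMap<>(map);
                    copy.put(key, value);
                    map = copy;              // publish the new map with one write
                }
            }
        }
        return value;
    }

    int size() {
        return map.size();
    }
}
```

The trade-off is that every new key copies the whole map. That is acceptable here because the set of resources is small and rarely changes, while lookups happen on every call.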
public static ProcessorSlotChain newSlotChain() {
    if (builder != null) {
        return builder.build();
    }
    resolveSlotChainBuilder();
    if (builder == null) {
        RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
        builder = new DefaultSlotChainBuilder();
    }
    return builder.build();
}
public class DefaultSlotChainBuilder implements SlotChainBuilder {
    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());
        return chain;
    }
}
The logic above creates eight slots and links them into a chain, one chain per resource. This slot chain is the core of Sentinel.
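newSlotChain looks for a custom builder once, falls back to the default builder when none is found, and then reuses that builder. Sentinel resolves the custom SlotChainBuilder through an SPI lookup, and the exact loader differs between versions. This sketch swaps in the JDK's java.util.ServiceLoader. ChainBuilder, DefaultChainBuilder and ChainProvider are hypothetical names, and the slots are reduced to their class names.

```java
import java.util.Iterator;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical builder interface, standing in for SlotChainBuilder.
interface ChainBuilder {
    List<String> build();
}

// Default builder: the eight slot names in the order DefaultSlotChainBuilder adds them.
class DefaultChainBuilder implements ChainBuilder {
    public List<String> build() {
        return List.of("NodeSelectorSlot", "ClusterBuilderSlot", "LogSlot", "StatisticSlot",
                "SystemSlot", "AuthoritySlot", "FlowSlot", "DegradeSlot");
    }
}

class ChainProvider {
    private static volatile ChainBuilder builder;

    static List<String> newChain() {
        if (builder == null) {
            // Look for a custom builder registered under META-INF/services.
            // Two threads may race here; at worst both resolve a builder, which is harmless.
            Iterator<ChainBuilder> it = ServiceLoader.load(ChainBuilder.class).iterator();
            // Fall back to the default when no provider is registered.
            builder = it.hasNext() ? it.next() : new DefaultChainBuilder();
        }
        return builder.build();
    }
}
```

Without a provider file on the classpath, `ChainProvider.newChain()` yields the eight default slots in order.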
Back in entryWithPriority, execution continues with:
try {
    chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
    e.exit(count, args);
    throw e1;
} catch (Throwable e1) {
    // This should not happen, unless there are errors existing in Sentinel internal.
    RecordLog.info("Sentinel unexpected exception", e1);
}
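From here the call goes to DefaultProcessorSlotChain.entry, then to AbstractLinkedProcessorSlot.fireEntry, and then to transformEntry.
transformEntry calls the slot's entry method, so the slots created above run one after another. If every configured metric is within its limit, the chain runs to completion. If a limit is exceeded, the corresponding exception is thrown.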
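The hand-off from one slot to the next can be sketched as a singly linked list in which each slot does its own work and then calls fireEntry. In Sentinel, fireEntry calls next.transformEntry, which casts the node argument and calls that slot's entry. This sketch has no generic node parameter, so fireEntry calls next.entry directly. LinkedSlot, LoggingSlot, LimitSlot, SlotChain and SlotBlockedException are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical exception, standing in for BlockException.
class SlotBlockedException extends RuntimeException {
    SlotBlockedException(String msg) {
        super(msg);
    }
}

// Each slot does its own work, then forwards to the next one (like fireEntry).
abstract class LinkedSlot {
    private LinkedSlot next;

    void setNext(LinkedSlot next) {
        this.next = next;
    }

    abstract void entry(List<String> trace, int count);

    // Forward to the next slot, if there is one.
    protected void fireEntry(List<String> trace, int count) {
        if (next != null) {
            next.entry(trace, count);
        }
    }
}

// Records its name and passes the call on.
class LoggingSlot extends LinkedSlot {
    private final String name;

    LoggingSlot(String name) {
        this.name = name;
    }

    void entry(List<String> trace, int count) {
        trace.add(name);
        fireEntry(trace, count);
    }
}

// Throws when count exceeds the limit, which stops the rest of the chain.
class LimitSlot extends LinkedSlot {
    private final int limit;

    LimitSlot(int limit) {
        this.limit = limit;
    }

    void entry(List<String> trace, int count) {
        if (count > limit) {
            throw new SlotBlockedException("count " + count + " exceeds " + limit);
        }
        trace.add("LimitSlot");
        fireEntry(trace, count);
    }
}

class SlotChain {
    private LinkedSlot first;
    private LinkedSlot last;

    SlotChain addLast(LinkedSlot slot) {
        if (first == null) {
            first = slot;
        } else {
            last.setNext(slot);
        }
        last = slot;
        return this;
    }

    List<String> entry(int count) {
        List<String> trace = new ArrayList<>();
        first.entry(trace, count);
        return trace;
    }
}
```

A chain of `LoggingSlot("A")`, `LimitSlot(5)` and `LoggingSlot("B")` visits A, LimitSlot and B for a count of 3. For a count of 9 it throws after A, so B never runs.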
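For flow control and degradation (circuit breaking), the slots that matter most are StatisticSlot, FlowSlot and DegradeSlot, so we focus on these three.
StatisticSlot is the class that records the metrics. The data ends up in Sentinel's sliding time window, so let's look at how that is implemented.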
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
    try {
        // Run the remaining slots (the actual rule checks) first.
        this.fireEntry(context, resourceWrapper, node, count, prioritized, args);
        // All checks passed: record thread count and pass count.
        node.increaseThreadNum();
        node.addPassRequest(count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseThreadNum();
            context.getCurEntry().getOriginNode().addPassRequest(count);
        }
        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Global statistics for inbound traffic.
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (PriorityWaitException ex) {
        // The request was let through by occupying a future window: only the thread count is increased.
        node.increaseThreadNum();
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseThreadNum();
        }
        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseThreadNum();
        }
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (BlockException e) {
        // Rejected by a rule: record the block, then rethrow.
        context.getCurEntry().setError(e);
        node.increaseBlockQps(count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseBlockQps(count);
        }
        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseBlockQps(count);
        }
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onBlocked(e, context, resourceWrapper, node, count, args);
        }
        throw e;
    } catch (Throwable e) {
        // Unexpected error: record it as an exception, then rethrow.
        context.getCurEntry().setError(e);
        node.increaseExceptionQps(count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseExceptionQps(count);
        }
        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseExceptionQps(count);
        }
        throw e;
    }
}
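The code shows that the slot first calls this.fireEntry to run the rest of the chain. If that succeeds, it increments the success metrics (pass count and thread count). If a BlockException is thrown, it increments the block count. For any other exception it increments the exception count. In both failure cases it rethrows.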
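The "check first, count afterwards" shape of StatisticSlot can be reduced to a few counters around a call. In this sketch CountingWrapper is a hypothetical name. The `rest` callback plays the role of fireEntry, and IllegalStateException is a hypothetical stand-in for BlockException. The sketch leaves out the exit side, where Sentinel decreases the thread count again.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.function.IntConsumer;

// Sketch of StatisticSlot's "run the checks first, count afterwards" pattern.
class CountingWrapper {
    final LongAdder pass = new LongAdder();
    final LongAdder block = new LongAdder();
    final LongAdder exception = new LongAdder();
    final LongAdder threads = new LongAdder();

    // rest plays the role of fireEntry: the remaining slots' checks.
    void entry(int count, IntConsumer rest) {
        try {
            rest.accept(count);
            threads.increment();   // only counted once every check has passed
            pass.add(count);
        } catch (IllegalStateException blocked) {   // stand-in for BlockException
            block.add(count);
            throw blocked;
        } catch (RuntimeException e) {
            exception.add(count);
            throw e;
        }
    }
}
```

Because the counters are only updated after `rest` returns or throws, a blocked request never shows up as a pass. That is also why StatisticSlot sits before FlowSlot and DegradeSlot in the chain but records its numbers after them.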
Here we need to take a detour into the sliding time window implementation.
StatisticNode creates a field named rollingCounterInSecond:
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
    IntervalProperty.INTERVAL);
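SampleCountProperty.SAMPLE_COUNT defaults to 2.
IntervalProperty.INTERVAL defaults to 1000 (milliseconds).
ArrayMetric stores its data in a LeapArray. Here is the LeapArray constructor: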
public LeapArray(int sampleCount, int intervalInMs) {
    AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
    AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
    AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
    this.windowLengthInMs = intervalInMs / sampleCount;
    this.intervalInMs = intervalInMs;
    this.sampleCount = sampleCount;
    this.array = new AtomicReferenceArray<>(sampleCount);
}
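With the defaults, this creates a circular array of two buckets (sampleCount = 2). Each bucket is windowLengthInMs = 1000 / 2 = 500 ms long, so together they cover a 1-second interval.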
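Which bucket a timestamp lands in comes down to two lines of arithmetic, as in LeapArray's calculateTimeIdx and calculateWindowStart. BucketMath, timeIdx and windowStart are my own names for this worked example with the default 500 ms buckets.

```java
// Worked example of LeapArray's bucket arithmetic (sampleCount = 2, intervalInMs = 1000).
class BucketMath {
    static int timeIdx(long timeMillis, int windowLengthInMs, int sampleCount) {
        long timeId = timeMillis / windowLengthInMs;   // which 500 ms slot since time 0
        return (int) (timeId % sampleCount);           // wrap around the circular array
    }

    static long windowStart(long timeMillis, int windowLengthInMs) {
        return timeMillis - timeMillis % windowLengthInMs;  // round down to the slot boundary
    }
}
```

With 500 ms buckets, t = 888 maps to index 1 with start 500, t = 1200 maps to index 0 with start 1000, and t = 1676 maps to index 1 with start 1500. So 888 and 1676 share array slot 1. When 1676 arrives, that slot still holds a bucket starting at 500. Its start is older than 1500, so the bucket is stale and must be reset. That is the third branch of currentWindow.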
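Back in StatisticSlot, node.addPassRequest(count) records the pass on the DefaultNode, and the DefaultNode also forwards it to its ClusterNode:
node.addPassRequest(count);
this.clusterNode.addPassRequest(count);
Both calls end up in StatisticNode.addPassRequest, which writes to the per-second and per-minute counters. ArrayMetric.addPass then adds the count to the current bucket: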
@Override
public void addPassRequest(int count) {
    rollingCounterInSecond.addPass(count);
    rollingCounterInMinute.addPass(count);
}

@Override
public void addPass(int count) {
    WindowWrap<MetricBucket> wrap = data.currentWindow();
    wrap.value().addPass(count);
}
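This is the method that finds the bucket (time window) for a timestamp. data.currentWindow() calls it with the current time: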
/**
 * Get bucket item at provided timestamp.
 *
 * @param timeMillis a valid timestamp in milliseconds
 * @return current bucket item at provided timestamp if the time is valid; null if time is invalid
 */
public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }
    int idx = calculateTimeIdx(timeMillis);
    // Calculate current bucket start time.
    long windowStart = calculateWindowStart(timeMillis);
    /*
     * Get bucket item at given time from the array.
     *
     * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
     * (2) Bucket is up-to-date, then just return the bucket.
     * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
     */
    while (true) {
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
            /*
             *     B0      B1      B2     NULL     B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *            bucket is empty, so create new and update
             *
             * If the old bucket is absent, then we create a new bucket at {@code windowStart},
             * then try to update circular array via a CAS operation. Only one thread can
             * succeed to update, while other threads yield its time slice.
             */
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            if (array.compareAndSet(idx, null, window)) {
                // Successfully updated, return the created bucket.
                return window;
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {
            /*
             *     B0      B1      B2      B3      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *            startTime of Bucket 3: 800, so it's up-to-date
             *
             * If current {@code windowStart} is equal to the start timestamp of old bucket,
             * that means the time is within the bucket, so directly return the bucket.
             */
            return old;
        } else if (windowStart > old.windowStart()) {
            /*
             *                            (old)
             *             B0      B1      B2     NULL     B4
             * |_______||_______|_______|_______|_______|_______||___
             * ...     1200    1400    1600    1800    2000    2200  timestamp
             *                             ^
             *                         time=1676
             *            startTime of Bucket 2: 400, deprecated, should be reset
             *
             * If the start timestamp of old bucket is behind provided time, that means
             * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
             * Note that the reset and clean-up operations are hard to be atomic,
             * so we need a update lock to guarantee the correctness of bucket update.
             *
             * The update lock is conditional (tiny scope) and will take effect only when
             * bucket is deprecated, so in most cases it won't lead to performance loss.
             */
            if (updateLock.tryLock()) {
                try {
                    // Successfully get the update lock, now we reset the bucket.
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            // Should not go through here, as the provided time is already behind.
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}
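To see the three branches of currentWindow working together, here is a cut-down sliding-window pass counter. It is a sketch under my own assumptions, not LeapArray itself. SlidingWindowCounter, Bucket, addPass and passInInterval are hypothetical names, and time is passed in explicitly instead of being read from a clock. It differs from LeapArray in one way: it replaces a stale bucket with a new object, and re-checks the slot under the lock, instead of resetting the bucket in place with resetWindowTo.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;

// Simplified LeapArray-style sliding window that only counts passes.
class SlidingWindowCounter {
    // One bucket: its start time and the passes counted in it.
    static final class Bucket {
        final long windowStart;
        final LongAdder pass = new LongAdder();

        Bucket(long windowStart) {
            this.windowStart = windowStart;
        }
    }

    private final int windowLengthInMs;
    private final int intervalInMs;
    private final AtomicReferenceArray<Bucket> array;
    private final ReentrantLock updateLock = new ReentrantLock();

    SlidingWindowCounter(int sampleCount, int intervalInMs) {
        if (sampleCount <= 0 || intervalInMs <= 0 || intervalInMs % sampleCount != 0) {
            throw new IllegalArgumentException("invalid window configuration");
        }
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }

    Bucket currentWindow(long now) {
        int idx = (int) ((now / windowLengthInMs) % array.length());
        long windowStart = now - now % windowLengthInMs;
        while (true) {
            Bucket old = array.get(idx);
            if (old == null) {
                // (1) Empty slot: install a new bucket with CAS; the losers retry.
                Bucket created = new Bucket(windowStart);
                if (array.compareAndSet(idx, null, created)) {
                    return created;
                }
                Thread.yield();
            } else if (old.windowStart == windowStart) {
                // (2) Bucket is current.
                return old;
            } else if (old.windowStart < windowStart) {
                // (3) Stale bucket: replace it under a small lock.
                if (updateLock.tryLock()) {
                    try {
                        // Re-check: another thread may have replaced it already.
                        if (array.get(idx) == old) {
                            Bucket fresh = new Bucket(windowStart);
                            array.set(idx, fresh);
                            return fresh;
                        }
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    Thread.yield();
                }
            } else {
                // (4) Time went backwards: hand back a throwaway bucket.
                return new Bucket(windowStart);
            }
        }
    }

    void addPass(long now, int count) {
        currentWindow(now).pass.add(count);
    }

    // Sum the passes of buckets that still fall inside the last intervalInMs.
    long passInInterval(long now) {
        long total = 0;
        for (int i = 0; i < array.length(); i++) {
            Bucket b = array.get(i);
            if (b != null && now - b.windowStart < intervalInMs) {
                total += b.pass.sum();
            }
        }
        return total;
    }
}
```

With `new SlidingWindowCounter(2, 1000)`, the following calls show the reset at work:

- Adding 1 pass at t = 100 and 2 passes at t = 600 gives 3 passes at t = 900.
- Adding 4 passes at t = 1100 resets slot 0, which is stale because it started at 0. The count at t = 1100 is then 6.
- By t = 1600 only the newest bucket, worth 4 passes, is still inside the 1-second interval.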
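Next, let's look at the FlowSlot implementation.
It takes the statistics from the time window and compares them with the configured thresholds:
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
The decision itself is delegated to the rule's traffic shaping controller:
rule.getRater().canPass(selectedNode, acquireCount, prioritized)
There are three main implementations, one per control behavior: direct rejection once the threshold is reached (DefaultController), warm-up based on a token bucket (WarmUpController), and uniform queueing in leaky-bucket style (RateLimiterController).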
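The uniform-queueing (leaky-bucket style) behavior is the least obvious of the three, so here is a sketch of it. The idea follows RateLimiterController: requests are spaced costTime = acquireCount / QPS × 1000 ms apart, and a request whose wait would exceed the maximum queueing time is rejected. PacingLimiter, its single-threaded design and the explicit clock are my own simplification. Sentinel's version keeps latestPassedTime in an AtomicLong and actually sleeps the caller instead of returning the wait time.

```java
// Leaky-bucket style pacing, simplified: single-threaded, explicit clock, no sleeping.
class PacingLimiter {
    private final double permitsPerSecond;
    private final long maxQueueingTimeMs;
    // Time at which the most recent request was (or is scheduled to be) let through.
    private long latestPassedTime;
    private boolean anyPassed = false;

    PacingLimiter(double permitsPerSecond, long maxQueueingTimeMs) {
        this.permitsPerSecond = permitsPerSecond;
        this.maxQueueingTimeMs = maxQueueingTimeMs;
    }

    // Returns how many ms the caller should wait before proceeding, or -1 if rejected.
    long acquire(long now, int acquireCount) {
        // Spacing between requests: acquireCount / QPS seconds, in ms.
        long costTime = Math.round(acquireCount / permitsPerSecond * 1000);
        if (!anyPassed || latestPassedTime + costTime <= now) {
            anyPassed = true;
            latestPassedTime = now;
            return 0;
        }
        long waitTime = latestPassedTime + costTime - now;
        if (waitTime > maxQueueingTimeMs) {
            return -1;
        }
        latestPassedTime += costTime;   // reserve the next slot in the queue
        return waitTime;
    }
}
```

At 10 QPS with a 250 ms queueing limit, four requests arriving together at t = 0 get waits of 0, 100 and 200 ms, and the fourth is rejected. A request at t = 300 passes immediately. Unlike DefaultController, which simply compares the current QPS with the threshold and rejects the excess, this smooths a burst into an even stream.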
Finally, the DegradeSlot implementation.
It is also a series of threshold checks, and a DegradeException is thrown when a check fails:
DegradeRuleManager.checkDegrade(resourceWrapper, context, node, count);
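To show what "threshold check plus recovery window" means for degradation, here is a minimal exception-ratio sketch. RatioBreaker and its parameters are hypothetical. It only models one flavor of check. The rules behind DegradeRuleManager.checkDegrade also cover other strategies, such as response time and exception count. The sketch has no half-open probing state: once tripped, it rejects everything until the time window has passed.

```java
// Minimal exception-ratio degrade check with a fixed recovery window (no half-open state).
class RatioBreaker {
    private final double maxExceptionRatio;
    private final long minRequestAmount;
    private final long timeWindowMs;
    // While now < openUntil, every call is rejected.
    private long openUntil = Long.MIN_VALUE;

    RatioBreaker(double maxExceptionRatio, long minRequestAmount, long timeWindowMs) {
        this.maxExceptionRatio = maxExceptionRatio;
        this.minRequestAmount = minRequestAmount;
        this.timeWindowMs = timeWindowMs;
    }

    // total and exceptions would come from the sliding-window statistics of the resource.
    boolean tryPass(long now, long total, long exceptions) {
        if (now < openUntil) {
            return false;   // still degraded
        }
        // Only judge the ratio once there are enough requests to be meaningful.
        if (total >= minRequestAmount && (double) exceptions / total > maxExceptionRatio) {
            openUntil = now + timeWindowMs;
            return false;
        }
        return true;
    }
}
```

With a 0.5 ratio threshold, at least 5 requests and a 1000 ms window, 6 exceptions out of 10 requests at t = 100 trip the breaker. Calls are then rejected until t = 1100. Below 5 requests the ratio is never evaluated, which prevents a single failed call from tripping it.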