alibaba sentinel限流组件 源码分析
如何使用?
maven引入:
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.5.1</version>
</dependency>
該組件是保護資源用,什么資源呢?Conceptually, physical or logical resource,明白了吧。
web應用中大部分情況都是用于保護接口,防止負載過大,支持限流、降級處理,規(guī)則可配。
入門級使用方法如下:
public void foo() {Entry entry = null;try {entry = SphU.entry("abc");// resource that need protection} catch (BlockException blockException) {// when goes there, it is blocked// add blocked handle logic here} catch (Throwable bizException) {// business exception Tracer.trace(bizException);} finally {// ensure finally be executedif (entry != null){entry.exit();}}}或者
public void foo() {
if (SphO.entry("abc")) {
try {
// business logic
} finally {
SphO.exit(); // must exit()
}
} else {
// failed to enter the protected resource.
}
}
入口為com.alibaba.csp.sentinel.SphU和com.alibaba.csp.sentinel.SphO。兩者的區(qū)別從使用就可以看出,限流發(fā)生時,SphU是通過異常的形式反饋出來,SphO是通過entry的返回值反饋出來的。
查看SphO的代碼,如下圖,其實是內(nèi)部捕獲了異常,然后返回boolean類型。所以,我們就從SphU開始分析吧。
SphU
最終會調(diào)用到com.alibaba.csp.sentinel.CtSph.entryWithPriority(ResourceWrapper, int, boolean, Object...)這個方法。
里面的核心為調(diào)用com.alibaba.csp.sentinel.CtSph.lookProcessChain(ResourceWrapper)獲取對應的處理鏈,方法如下:
從上面代碼可知,一個chain關聯(lián)一個資源,這一點很重要,后面分析node節(jié)點結構時會用到。
public static ProcessorSlotChain newSlotChain() {if (builder != null) {return builder.build();}// 構建chainBuilder,具體說明見下面代碼resolveSlotChainBuilder();if (builder == null) {RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");builder = new DefaultSlotChainBuilder();}return builder.build();}private static void resolveSlotChainBuilder() {List<SlotChainBuilder> list = new ArrayList<SlotChainBuilder>();boolean hasOther = false;// java.util.ServiceLoader 方式擴展功能,這是jdk的,簡稱spi擴展,sentinel有很多地方用到這種擴展方式
// 關于ServiceLoader擴展的詳解不在這里討論,后面有時間可以單獨拎出來分析下
for (SlotChainBuilder builder : LOADER) {if (builder.getClass() != DefaultSlotChainBuilder.class) {hasOther = true;list.add(builder);}}if (hasOther) {builder = list.get(0);} else {// No custom builder, using default.builder = new DefaultSlotChainBuilder();}RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "+ builder.getClass().getCanonicalName());}
通過上面代碼可知,一般情況下采用的默認chainBuilder,那我們就來看看這個build:
通過上面com.alibaba.csp.sentinel.CtSph.entryWithPriority(ResourceWrapper, int, boolean, Object...)方法可知,最終調(diào)用方式如下:
通過entry調(diào)用把chain中的所有ProcessorSlot串起來,挨個調(diào)用。
主體已經(jīng)清晰了,重點其實也就是chain中添加的那些ProcessorSlot,下面我們就一個個看下這些slot到底干了啥,劃下重點,StatisticSlot這個slot是核心,用于統(tǒng)計各種數(shù)量。
?NodeSelectorSlot
構建調(diào)用資源調(diào)用路徑,最終在內(nèi)存中會形成一個樹狀結構。
ContextUtil.enter("entrance1", "appA");Entry nodeA = SphU.entry("nodeA");if (nodeA != null) {nodeA.exit();}ContextUtil.exit();上述代碼對象結構如下:
machine-root
/
/
EntranceNode1
/
/
DefaultNode(nodeA)- - - - - -> ClusterNode(nodeA);
上述代碼會形成結構如下:
? ? ? ? ? ? ? ? machine-root
? ? ? ? ? ? ? ? ? /? ? ? ? ? ? \
? ? ? ? ? ? ? ? /? ? ? ? ? ? ? ? \
? ?EntranceNode1? ? ?EntranceNode2
? ? ? ? ? ? ?/? ? ? ? ? ? ? ? ? ? ? ?\
/ ? ? ? ? \
DefaultNode(nodeA) DefaultNode(nodeA)
| ?|
+- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA);
上述代碼會形成結構如下:
? ? ? ? ? ? ? ? machine-root
? ? ? ? ? ? ? ? ? /? ? ? ??
? ? ? ? ? ? ? ? /? ? ? ? ??
? ?EntranceNode1?
? ? ? ? ? ? ?/? ? ? ? ??
/
DefaultNode(nodeA)?- - - - - -> ClusterNode(nodeA);
/
/
DefaultNode(nodeB)- - - - - -> ClusterNode(nodeB);
說明:
a.一條路徑對應一個Context;如果是入門的那種使用方法,即沒有顯式使用ContextUtil.enter("entrance1", "appA")方式創(chuàng)建context的話,會默認使用MyContextUtil.myEnter創(chuàng)建一個名為sentinel_default_context的context;
源碼釋義:
一次資源的訪問都會走到com.alibaba.csp.sentinel.CtSph.entryWithPriority(ResourceWrapper, int, boolean, Object...),方法開始處通過ContextUtil.getContext()從threadlocal中獲取context,ContextUtil.enter會檢查threadlocal有沒有context,沒有就創(chuàng)建,有就直接返回。如果限流時沒有調(diào)用ContextUtil.enter顯式開啟context,就會走到下面MyContextUtil.myEnter處創(chuàng)建默認名為
sentinel_default_context的context。最終都會調(diào)用到com.alibaba.csp.sentinel.context.ContextUtil.trueEnter(String, String),創(chuàng)建context部分代碼如下,每一個context會先創(chuàng)建一個EntranceNode入口node,然后掛到Constants.ROOT下,結構見上面的樹狀圖。?
b.一次資源的調(diào)用會根據(jù)threadlocal獲取Context,同一線程下如果要切換context必須調(diào)用ContextUtil.exit();結束上一個context,再ContextUtil.enter("entrance2", "appA")開啟新的context;
源碼釋義:
context的獲取見上面一條釋義,這里看下ContextUtil.exit()方法,要想真正的將threadlocal清空,還得將context的CurEntry清空,怎么做?就是按照entry的調(diào)用順序反向依次調(diào)用com.alibaba.csp.sentinel.CtEntry.exit(int, Object...):
c.一個DefaultNode對應同一個資源調(diào)用在某一個Context下的統(tǒng)計數(shù)據(jù);
源碼釋義:
首先,一次資源的entry調(diào)用會先去尋找ProcessorSlotChain,查看代碼可知,chain是以ResourceWrapper為key緩存在map中的,由DefaultSlotChainBuilder可以對應到一次entry調(diào)用對應一個NodeSelectorSlot實例,而NodeSelectorSlot中緩存了一個以contextName為key,DefaultNode為value的map,一句話總結下就是先去com.alibaba.csp.sentinel.context.ContextUtil.trueEnter(String, String)方法中通過contextNameNodeMap獲取以contextName緩存的該name對應的EntranceNode節(jié)點并以該EntranceNode為參數(shù)new一個context對象返回,然后在該資源對應的slotChain中的NodeSelectorSlot獲取一contextName為key緩存的DefaultNode。
這里解釋下DefaultNode第一次是怎么掛到context的EntranceNode下的。
代碼如下,重點是紅框中的代碼,獲取context中最末尾的節(jié)點,并把當前節(jié)點掛在后面。
我們再看下context的getLastNode節(jié)點,代碼如下:
由上面代碼自然會想的curEntry是什么時候設置的呢?如下圖所示,是在com.alibaba.csp.sentinel.CtSph.entryWithPriority(ResourceWrapper, int, boolean, Object...)中構建CtEntry時設置的,entry之間的關系在context中是以雙向鏈表結構維護的:
回到上一步,我們看下entry的getLastNode方法,如下圖所示,從parentEntry中獲取curNode,如果是第一次調(diào)用的話parent肯定為null,回到上一步的話就是返回該context的entranceNode。
NodeSelectorSlot中就會將當次entry在該context下首次調(diào)用創(chuàng)建的DefaultNode掛到剛剛獲取到的entranceNode節(jié)點下,也就是該context的入口節(jié)點下,如果該context下之前有過別的資源調(diào)用,就會順序掛到那次調(diào)用產(chǎn)生的DefaultNode下面,形成上面描述的樹狀結構圖。
DefaultNode處理完后會將context的CurNode設置為該DefaultNode,如下圖所示:
實際上是設置的context中當前entry對應的CurNode,如下圖所示,所以上面getLastNode是從parenEntry中獲取的CurNode。
d.ClusterNode對應同一個資源在所有context下的統(tǒng)計數(shù)據(jù);
源碼釋義:
ClusterBuilderSlot中維護的一個實例變量實際就是對應一個資源,原因上面已經(jīng)分析了,一個資源對應一個slotChain,然后將該clusterNode設置到本次調(diào)用對應的DefaultNode中,在DefaultNode做加法操作時,會同時調(diào)用clusterNode的加法操作,這樣,分布在不同context下的同一個資源對應的所有DefaultNode都會去調(diào)用clusterNode去匯總統(tǒng)計相同資源的統(tǒng)計數(shù)據(jù)。
e.一個資源對應一個ProcessChain,自然就對應一個NodeSelectorSlot實例,所以在NodeSelectorSlot里面DefaultNode node = map.get(context.getName());這行代碼的一維是資源,二維才是context,如果搞反了可能看到這行會蒙圈;
源碼釋義:
見c的分析。
ClusterBuilderSlot
構建ClusterNode并設置到上面slot選中的node中,如果context中有設置調(diào)用來源Origin就創(chuàng)建一個StatisticNode針對具體某個調(diào)用方統(tǒng)計數(shù)據(jù)
Note that 'origin' usually is Service Consumer's app name:
StatisticSlot
核心slot,用于統(tǒng)計各種數(shù)據(jù)的地方,然后被后面的slot應用。
先調(diào)用fireEntry執(zhí)行后面的slot,檢查本次請求能否通過,如果通過,就給對應的node做加法操作。
先給自身node做加法,在給ClusterBuilderSlot中創(chuàng)建并傳入的closterNode做加法。
最終調(diào)用ArrayMetric的addPass:
需要申明的是,sentinel統(tǒng)計采用的是滑動窗口的實現(xiàn)方式,這里的重點是com.alibaba.csp.sentinel.slots.statistic.base.LeapArray.currentWindow(long)方法,我們看下如何獲取當前窗口。
// 根據(jù)當前時間戳計算當前窗口的索引private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
// 索引從0開始,整除后得到的結果就是當前時間戳所在的窗口,實際上我們的窗口只有array長度的固定幾個,
// 可以想象一下這是在一個無限延長的虛擬時間線窗口,再對array的length取余數(shù)就得到了實際所在的窗口索引long timeId = timeMillis / windowLengthInMs;// Calculate current index so we can map the timestamp to the leap array.return (int)(timeId % array.length());}protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
// 計算虛擬窗口對應的開始時間,當前時間減去超出當前窗口的那一段時間就得到開始時間return timeMillis - timeMillis % windowLengthInMs;}/*** Get bucket item at provided timestamp.** @param timeMillis a valid timestamp in milliseconds* @return current bucket item at provided timestamp if the time is valid; null if time is invalid*/public WindowWrap<T> currentWindow(long timeMillis) {if (timeMillis < 0) {return null;}// 獲取當前窗口索引int idx = calculateTimeIdx(timeMillis);// Calculate current bucket start time.
// 獲取窗口開始時間
long windowStart = calculateWindowStart(timeMillis);/** Get bucket item at given time from the array.** (1) Bucket is absent, then just create a new bucket and CAS update to circular array.* (2) Bucket is up-to-date, then just return the bucket.* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.*/while (true) {WindowWrap<T> old = array.get(idx);
// 如果索引對應的窗口還沒有創(chuàng)建就新建窗口對象if (old == null) {/** B0 B1 B2 NULL B4* ||_______|_______|_______|_______|_______||___* 200 400 600 800 1000 1200 timestamp* ^* time=888* bucket is empty, so create new and update** If the old bucket is absent, then we create a new bucket at {@code windowStart},* then try to update circular array via a CAS operation. Only one thread can* succeed to update, while other threads yield its time slice.*/WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
// 原子操作,如果設置成功就返回,如果設置失敗就讓出cpu,等待再次被翻牌if (array.compareAndSet(idx, null, window)) {// Successfully updated, return the created bucket.return window;} else {// Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield();}
// 如果當前窗口時間一致,說明當前窗口還未過期,就返回該窗口對象} else if (windowStart == old.windowStart()) {/** B0 B1 B2 B3 B4* ||_______|_______|_______|_______|_______||___* 200 400 600 800 1000 1200 timestamp* ^* time=888* startTime of Bucket 3: 800, so it's up-to-date** If current {@code windowStart} is equal to the start timestamp of old bucket,* that means the time is within the bucket, so directly return the bucket.*/return old;
// 如果窗口開始時間過期了,就重置當前窗口的開始時間為最新的時間} else if (windowStart > old.windowStart()) {/** (old)* B0 B1 B2 NULL B4* |_______||_______|_______|_______|_______|_______||___* ... 1200 1400 1600 1800 2000 2200 timestamp* ^* time=1676* startTime of Bucket 2: 400, deprecated, should be reset** If the start timestamp of old bucket is behind provided time, that means* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.* Note that the reset and clean-up operations are hard to be atomic,* so we need a update lock to guarantee the correctness of bucket update.** The update lock is conditional (tiny scope) and will take effect only when* bucket is deprecated, so in most cases it won't lead to performance loss.*/if (updateLock.tryLock()) {try {// Successfully get the update lock, now we reset the bucket.return resetWindowTo(old, windowStart);} finally {updateLock.unlock();}} else {// Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield();}
// 正常情況下不會走這里,因為時間是往前走的} else if (windowStart < old.windowStart()) {// Should not go through here, as the provided time is already behind.return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));}}}
最終是在MetricBucket中用LongAdder做的原子加,LongAdder是jdk8的特性,這里sentinel直接挪了過來,避免要求sentinel用戶必須使用jdk8.為什么是LongAdder而不是AtomicLong,因為前者在并發(fā)下表現(xiàn)更優(yōu)異,具體區(qū)別請自行了解。
SystemSlot
通過之前統(tǒng)計節(jié)點中Constants.ENTRY_NODE這個node中的數(shù)據(jù)檢查全局qps等是否滿足要求。
AuthoritySlot
黑白名單匹配。通過com.alibaba.csp.sentinel.slots.block.authority.AuthorityRuleManager.loadRules(List<AuthorityRule>)設置規(guī)則。
FlowSlot
流量控制。通過com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager.loadRules(List<FlowRule>)設置規(guī)則。
DegradeSlot
降級處理。通過com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager.loadRules(List<DegradeRule>)設置規(guī)則。
轉載于:https://www.cnblogs.com/restart30/p/10796725.html
總結
以上是生活随笔為你收集整理的alibaba sentinel限流组件 源码分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 项目Alpha冲刺--5/10
- 下一篇: 广告小程序后端开发(4.导入地区数据,修