Netty系列(三):说说NioEventLoop
前言
本來想先寫下NioServerSocketChannel以及NioSocketChannel的注冊(cè)流程的,但是最后發(fā)現(xiàn)始終離不開NioEventLoop這個(gè)類,所以在這之前必須得先講解下NioEventLoop這個(gè)類到底是用來做啥的。其實(shí)在第一篇文章里面有提及到它的,但是沒有詳細(xì)的去講解,接下來會(huì)對(duì)它分析一波。
設(shè)計(jì)模型
在進(jìn)入正文之前,先簡單的了解下NioEventLoop的工作模型(服務(wù)端):
假設(shè)一個(gè)NioEventLoopGroup(這里服務(wù)端會(huì)用兩個(gè)Group)里面有4個(gè)NioEventLoop,那么netty中的實(shí)際工作模型就如上圖所示,服務(wù)端會(huì)用默認(rèn)的選擇規(guī)則從Group1中選擇出一個(gè)NioEventLoop注冊(cè)ServerChannel,并綁定一個(gè)OP_ACCEPT用于監(jiān)聽客戶端發(fā)起的連接請(qǐng)求,一旦有新的連接進(jìn)來,服務(wù)端則會(huì)從Group2中按一定的規(guī)則選出一個(gè)NioEventLoop來注冊(cè)SocketChannel,并綁定OP_READ興趣事件,這里注意,一個(gè)NioEventLoop可以綁定多個(gè)SocketChannel。具體的注冊(cè)流程我會(huì)在下一篇文章中寫出來。
下面進(jìn)入正題。
構(gòu)造流程
NioEventLoop具體的構(gòu)造流程大家可以去我的Netty系列(一):NioEventLoopGroup源碼解析中去看一下,里面說的還算蠻詳細(xì)的。下面是其調(diào)用的構(gòu)造函數(shù),咱們可以觀察到其身上會(huì)綁定一個(gè)選擇器Selector,供后期channel注冊(cè)的時(shí)候使用的,這一塊是JAVA NIO相關(guān)的知識(shí)點(diǎn)。
內(nèi)部還維護(hù)著一個(gè)executor去開啟執(zhí)行線程的,以及taskQueue任務(wù)隊(duì)列和一個(gè)tailTasks尾部隊(duì)列(這個(gè)隊(duì)列里面的任務(wù)是在每次執(zhí)行taskQueue任務(wù)隊(duì)列中的任務(wù)結(jié)束后都會(huì)去調(diào)用的,不多介紹)。上面介紹的三個(gè)四個(gè)內(nèi)部結(jié)構(gòu)Selector,executor,taskQueue,tailTasks會(huì)在后面多次提起。
下圖是NioEventLoop的簡單的層級(jí)結(jié)構(gòu)(下圖取之于Netty in Action):
NioEventLoop.execute
這里我們先看一下NioEventLoop的execute方法。實(shí)際上這個(gè)方法是在其父類SingleThreadEventExecutor中。這個(gè)方法的功能就是將任務(wù)丟到taskQueue中。
1????public?void?execute(Runnable?task)?{2????????if?(task?==?null)?{
3????????????throw?new?NullPointerException("task");
4????????}
5
6????????boolean?inEventLoop?=?inEventLoop();
7????????addTask(task);
8????????if?(!inEventLoop)?{
9????????????//?開啟工作線程,實(shí)際上也就是執(zhí)行NioEventLoop中的run方法,下面會(huì)介紹
10????????????startThread();
11????????????if?(isShutdown())?{
12????????????????boolean?reject?=?false;
13????????????????try?{
14????????????????????if?(removeTask(task))?{
15????????????????????????reject?=?true;
16????????????????????}
17????????????????}?catch?(UnsupportedOperationException?e)?{
18????????????????????//?The?task?queue?does?not?support?removal?so?the?best?thing?we?can?do?is?to?just?move?on?and
19????????????????????//?hope?we?will?be?able?to?pick-up?the?task?before?its?completely?terminated.
20????????????????????//?In?worst?case?we?will?log?on?termination.
21????????????????}
22????????????????if?(reject)?{
23????????????????????reject();
24????????????????}
25????????????}
26????????}
27
28????????if?(!addTaskWakesUp?&&?wakesUpForTask(task))?{
29????????????wakeup(inEventLoop);
30????????}
31????}
復(fù)制代碼
NioEventLoop的工作模式實(shí)際上就是開啟一個(gè)單線程跑一個(gè)死循環(huán),然后一直輪詢taskQueue隊(duì)列是否有任務(wù)添加進(jìn)來,然后就去處理任務(wù),還有就是如果注冊(cè)在selector上的channel有興趣事件進(jìn)來,也會(huì)去處理selectorKeys,這一塊下面會(huì)做介紹。
NioEventLoop.run
現(xiàn)在看看NioEventLoop中的run方法
1????protected?void?run()?{2????????for?(;;)?{
3????????????try?{
4????????????????try?{
5????????????????????//?按默認(rèn)配置的話要么返回select.selectNow(),
6????????????????????//要么返回SelectStrategy.SELECT
7???????????????????switch?(selectStrategy.calculateStrategy(selectNowSupplier,?hasTasks()))?{
8????????????????????case?SelectStrategy.CONTINUE:
9????????????????????????continue;
10
11????????????????????case?SelectStrategy.BUSY_WAIT:
12????????????????????case?SelectStrategy.SELECT:
13????????????????????????select(wakenUp.getAndSet(false));
14
15????????????????????????if?(wakenUp.get())?{
16????????????????????????????selector.wakeup();
17????????????????????????}
18????????????????????????//?fall?through
19????????????????????default:
20????????????????????}
21????????????????}?catch?(IOException?e)?{
22????????????????????rebuildSelector0();
23????????????????????handleLoopException(e);
24????????????????????continue;
25????????????????}
26
27????????????????cancelledKeys?=?0;
28????????????????needsToSelectAgain?=?false;
29????????????????final?int?ioRatio?=?this.ioRatio;
30????????????????if?(ioRatio?==?100)?{
31????????????????????try?{
32????????????????????????//?IO操作,根據(jù)selectedKeys去處理
33????????????????????????processSelectedKeys();
34????????????????????}?finally?{
35????????????????????????//?保證執(zhí)行完所有的任務(wù)
36????????????????????????runAllTasks();
37????????????????????}
38????????????????}?else?{
39????????????????????final?long?ioStartTime?=?System.nanoTime();
40????????????????????try?{
41????????????????????????//?IO操作,根據(jù)selectedKeys去處理
42????????????????????????processSelectedKeys();
43????????????????????}?finally?{
44????????????????????????//?按一定的比例去處理任務(wù),有可能遺留一部分任務(wù)下次進(jìn)行處理
45????????????????????????final?long?ioTime?=?System.nanoTime()?-?ioStartTime;
46????????????????????????runAllTasks(ioTime?*?(100?-?ioRatio)?/?ioRatio);
47????????????????????}
48????????????????}
49????????????}?catch?(Throwable?t)?{
50????????????????handleLoopException(t);
51????????????}
52????????????//?Always?handle?shutdown?even?if?the?loop?processing?threw?an?exception.
53????????????try?{
54????????????????//?釋放資源,將注冊(cè)的channel全部關(guān)閉掉。
55????????????????if?(isShuttingDown())?{
56????????????????????closeAll();
57????????????????????if?(confirmShutdown())?{
58????????????????????????return;
59????????????????????}
60????????????????}
61????????????}?catch?(Throwable?t)?{?
62????????????????handleLoopException(t);
63????????????}
64????????}
65????}
復(fù)制代碼
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT
1.有任務(wù)便會(huì)直接返回select.selectNow(),則會(huì)直接去跑任務(wù)或者是去處理selectorKeys;
2.若沒任務(wù),則會(huì)走select(wakenUp.getAndSet(false))方法,里面會(huì)有一個(gè)timeout超時(shí)處理,selector.select(timeoutMillis),超時(shí)后也會(huì)去跑任務(wù)或者是去處理selectorKeys;
這一塊具體細(xì)節(jié)也很多,只是說一下處理流程。
這里的IO操作就是processSelectedKeys()方法,代碼雖然很長,但是干的活就是根據(jù)不同的興趣事件干不同的活,里面有對(duì)OP_READ OP_ACCEPT OP_WRITE OP_CONNECT等等不同興趣事件的不同處理方法,這一塊應(yīng)該是JAVA NIO里面的相關(guān)知識(shí)點(diǎn)。感興趣的朋友可以debug針對(duì)某個(gè)觸發(fā)事件研究一下。
runAllTasks
執(zhí)行任務(wù)的代碼如下(下面是runAllTasks的代碼):
1????protected?boolean?runAllTasks()?{2????????assert?inEventLoop();
3????????boolean?fetchedAll;
4????????boolean?ranAtLeastOne?=?false;
5
6????????do?{
7????????????//?這里會(huì)從定時(shí)任務(wù)隊(duì)列中將達(dá)到執(zhí)行事件的task丟到taskQueue中去
8????????????fetchedAll?=?fetchFromScheduledTaskQueue();
9????????????//?執(zhí)行taskQueue中所有的task
10????????????if?(runAllTasksFrom(taskQueue))?{
11????????????????ranAtLeastOne?=?true;
12????????????}
13????????}?while?(!fetchedAll);?//?keep?on?processing?until?we?fetched?all?scheduled?tasks.
14
15????????if?(ranAtLeastOne)?{
16????????????lastExecutionTime?=?ScheduledFutureTask.nanoTime();
17????????}
18????????//?這個(gè)是執(zhí)行上面所說的tailTasks中的task
19????????afterRunningAllTasks();
20????????return?ranAtLeastOne;
21????}
復(fù)制代碼
這一塊的邏輯是:
先執(zhí)行fetchFromScheduledTaskQueue方法,將到期的定時(shí)任務(wù)丟到taskQueue隊(duì)列中,這個(gè)fetchFromScheduledTaskQueue方法里面有個(gè)小細(xì)節(jié),當(dāng)taskQueue隊(duì)列滿了之后,它就會(huì)重新塞到scheduledTaskQueue隊(duì)列中,然后再外圈循環(huán),taskQueue隊(duì)列消費(fèi)完畢,則繼續(xù)執(zhí)行fetchFromScheduledTaskQueue方法,直到把所有到期的任務(wù)都丟到taskQueue隊(duì)列中執(zhí)行完畢為止。如下圖所示:
這一部分到這里就結(jié)束了,下一篇會(huì)對(duì)NioServerSocketChannel的注冊(cè)以及服務(wù)端創(chuàng)建NioSocketChannel進(jìn)行分析。
End
總結(jié)
以上是生活随笔為你收集整理的Netty系列(三):说说NioEventLoop的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: UiBot新版本即将上线!添加Java程
- 下一篇: LeetCode-56-Merge In