nio框架中的多个Selector结构
隨著并發(fā)數(shù)量的提高,傳統(tǒng)nio框架采用一個Selector來支撐大量連接事件的管理和觸發(fā)已經(jīng)遇到瓶頸,因此現(xiàn)在各種nio框架的新版本都采用多個Selector并存的結(jié)構(gòu),由多個Selector均衡地去管理大量連接。這里以Mina和Grizzly的實(shí)現(xiàn)為例。
?? 在Mina 2.0中,Selector的管理是由org.apache.mina.transport.socket.nio.NioProcessor來處理,每個NioProcessor對象保存一個Selector,負(fù)責(zé)具體的select、wakeup、channel的注冊和取消、讀寫事件的注冊和判斷、實(shí)際的IO讀寫操作等等,核心代碼如下:
????????super(executor);
????????try?{
????????????//?Open?a?new?selector
????????????selector?=?Selector.open();
????????}?catch?(IOException?e)?{
????????????throw?new?RuntimeIoException("Failed?to?open?a?selector.",?e);
????????}
????}
????protected?int?select(long?timeout)?throws?Exception?{
????????return?selector.select(timeout);
????}
??
????protected?boolean?isInterestedInRead(NioSession?session)?{
????????SelectionKey?key?=?session.getSelectionKey();
????????return?key.isValid()?&&?(key.interestOps()?&?SelectionKey.OP_READ)?!=?0;
????}
????protected?boolean?isInterestedInWrite(NioSession?session)?{
????????SelectionKey?key?=?session.getSelectionKey();
????????return?key.isValid()?&&?(key.interestOps()?&?SelectionKey.OP_WRITE)?!=?0;
????}
????protected?int?read(NioSession?session,?IoBuffer?buf)?throws?Exception?{
????????return?session.getChannel().read(buf.buf());
????}
????protected?int?write(NioSession?session,?IoBuffer?buf,?int?length)?throws?Exception?{
????????if?(buf.remaining()?<=?length)?{
????????????return?session.getChannel().write(buf.buf());
????????}?else?{
????????????int?oldLimit?=?buf.limit();
????????????buf.limit(buf.position()?+?length);
????????????try?{
????????????????return?session.getChannel().write(buf.buf());
????????????}?finally?{
????????????????buf.limit(oldLimit);
????????????}
????????}
????}
?? 這些方法的調(diào)用都是通過AbstractPollingIoProcessor來處理,這個類里可以看到一個nio框架的核心邏輯,注冊、select、派發(fā),具體因?yàn)榕c本文主題不合,不再展開。NioProcessor的初始化是在NioSocketAcceptor的構(gòu)造方法中調(diào)用的:
????????super(new?DefaultSocketSessionConfig(),?NioProcessor.class);
????????((DefaultSocketSessionConfig)?getSessionConfig()).init(this);
????}
?? 直接調(diào)用了父類AbstractPollingIoAcceptor的構(gòu)造函數(shù),在其中我們可以看到,默認(rèn)是啟動了一個SimpleIoProcessorPool來包裝NioProcessor:
????????????Class<??extends?IoProcessor<T>>?processorClass)?{
????????this(sessionConfig,?null,?new?SimpleIoProcessorPool<T>(processorClass),
????????????????true);
????}
?? 這里其實(shí)是一個組合模式,SimpleIoProcessorPool和NioProcessor都實(shí)現(xiàn)了Processor接口,一個是組合形成的Processor池,而另一個是單獨(dú)的類。調(diào)用的SimpleIoProcessorPool的構(gòu)造函數(shù)是這樣:
??? public?SimpleIoProcessorPool(Class<??extends?IoProcessor<T>>?processorType)?{
????????this(processorType,?null,?DEFAULT_SIZE);
????}
??? 可以看到,默認(rèn)的池大小是cpu個數(shù)+1,也就是創(chuàng)建了cpu+1個的Selector對象。它的重載構(gòu)造函數(shù)里是創(chuàng)建了一個數(shù)組,啟動一個CachedThreadPool來運(yùn)行NioProcessor,通過反射創(chuàng)建具體的Processor對象,這里就不再列出了。
??? Mina當(dāng)有一個新連接建立的時候,就創(chuàng)建一個NioSocketSession,并且傳入上面的SimpleIoProcessorPool,當(dāng)連接初始化的時候?qū)ession加入SimpleIoProcessorPool:
????????????ServerSocketChannel?handle)?throws?Exception?{
????????SelectionKey?key?=?handle.keyFor(selector);
????????
????????if?((key?==?null)?||?(!key.isValid())?||?(!key.isAcceptable())?)?{
????????????return?null;
????????}
????????//?accept?the?connection?from?the?client
????????SocketChannel?ch?=?handle.accept();
????????
????????if?(ch?==?null)?{
????????????return?null;
????????}
????????return?new?NioSocketSession(this,?processor,?ch);
????}
????????
????????private?void?processHandles(Iterator<H>?handles)?throws?Exception?{
????????????while?(handles.hasNext())?{
????????????????H?handle?=?handles.next();
????????????????handles.remove();
????????????????//?Associates?a?new?created?connection?to?a?processor,
????????????????//?and?get?back?a?session
????????????????T?session?=?accept(processor,?handle);
????????????????
????????????????if?(session?==?null)?{
????????????????????break;
????????????????}
????????????????initSession(session,?null,?null);
????????????????//?add?the?session?to?the?SocketIoProcessor
????????????????session.getProcessor().add(session);
????????????}
????????}
??? 加入的操作是遞增一個整型變量并且模數(shù)組大小后對應(yīng)的NioProcessor注冊到session里:
????private?IoProcessor<T>?nextProcessor()?{
????????checkDisposal();
????????return?pool[Math.abs(processorDistributor.getAndIncrement())?%?pool.length];
????}
??? if?(p?==?null)?{
????????????p?=?nextProcessor();
????????????IoProcessor<T>?oldp?=
????????????????(IoProcessor<T>)?session.setAttributeIfAbsent(PROCESSOR,?p);
????????????if?(oldp?!=?null)?{
????????????????p?=?oldp;
????????????}
??? }
??? 這樣一來,每個連接都關(guān)聯(lián)一個NioProcessor,也就是關(guān)聯(lián)一個Selector對象,避免了所有連接共用一個Selector負(fù)載過高導(dǎo)致server響應(yīng)變慢的后果。但是注意到NioSocketAcceptor也有一個Selector,這個Selector用來干什么的呢?那就是集中處理OP_ACCEPT事件的Selector,主要用于連接的接入,不跟處理讀寫事件的Selector混在一起,因此Mina的默認(rèn)open的Selector是cpu+2個。
??? 看完mina2.0之后,我們來看看Grizzly2.0是怎么處理的,Grizzly還是比較保守,它默認(rèn)就是啟動兩個Selector,其中一個專門負(fù)責(zé)accept,另一個負(fù)責(zé)連接的IO讀寫事件的管理。Grizzly 2.0中Selector的管理是通過SelectorRunner類,這個類封裝了Selector對象以及核心的分發(fā)注冊邏輯,你可以將他理解成Mina中的NioProcessor,核心的代碼如下:
????????selectorHandler?=?transport.getSelectorHandler();
????????selectionKeyHandler?=?transport.getSelectionKeyHandler();
????????strategy?=?transport.getStrategy();
????????
????????try?{
????????????if?(isResume)?{
????????????????//?If?resume?SelectorRunner?-?finish?postponed?keys
????????????????isResume?=?false;
????????????????if?(keyReadyOps?!=?0)?{
????????????????????if?(!iterateKeyEvents())?return?false;
????????????????}
????????????????
????????????????if?(!iterateKeys())?return?false;
????????????}
????????????lastSelectedKeysCount?=?0;
????????????
????????????selectorHandler.preSelect(this);
????????????
????????????readyKeys?=?selectorHandler.select(this);
????????????if?(stateHolder.getState(false)?==?State.STOPPING)?return?false;
????????????
????????????lastSelectedKeysCount?=?readyKeys.size();
????????????
????????????if?(lastSelectedKeysCount?!=?0)?{
????????????????iterator?=?readyKeys.iterator();
????????????????if?(!iterateKeys())?return?false;
????????????}
????????????selectorHandler.postSelect(this);
????????}?catch?(ClosedSelectorException?e)?{
????????????notifyConnectionException(key,
????????????????????"Selector?was?unexpectedly?closed",?e,
????????????????????Severity.TRANSPORT,?Level.SEVERE,?Level.FINE);
????????}?catch?(Exception?e)?{
????????????notifyConnectionException(key,
????????????????????"doSelect?exception",?e,
????????????????????Severity.UNKNOWN,?Level.SEVERE,?Level.FINE);
????????}?catch?(Throwable?t)?{
????????????logger.log(Level.SEVERE,"doSelect?exception",?t);
????????????transport.notifyException(Severity.FATAL,?t);
????????}
????????return?true;
????}
??? 基本上是一個reactor實(shí)現(xiàn)的樣子,在AbstractNIOTransport類維護(hù)了一個SelectorRunner的數(shù)組,而Grizzly用于創(chuàng)建tcp server的類TCPNIOTransport正是繼承于AbstractNIOTransport類,在它的start方法中調(diào)用了startSelectorRunners來創(chuàng)建并啟動SelectorRunner數(shù)組:
?@Override
??public?void?start()?throws?IOException?{
??if?(selectorRunnersCount?<=?0)?{
????????????????selectorRunnersCount?=?DEFAULT_SELECTOR_RUNNERS_COUNT;
????????????}
??startSelectorRunners();
}
?protected?void?startSelectorRunners()?throws?IOException?{
????????selectorRunners?=?new?SelectorRunner[selectorRunnersCount];
????????
????????synchronized(selectorRunners)?{
????????????for?(int?i?=?0;?i?<?selectorRunnersCount;?i++)?{
????????????????SelectorRunner?runner?=
????????????????????????new?SelectorRunner(this,?SelectorFactory.instance().create());
????????????????runner.start();
????????????????selectorRunners[i]?=?runner;
????????????}
????????}
????}
? 可見Grizzly并沒有采用一個單獨(dú)的池對象來管理SelectorRunner,而是直接采用數(shù)組管理,默認(rèn)數(shù)組大小是2。SelectorRunner實(shí)現(xiàn)了Runnable接口,它的start方法調(diào)用了一個線程池來運(yùn)行自身。剛才我提到了說Grizzly的Accept是單獨(dú)一個Selector來管理的,那么是如何表現(xiàn)的呢?答案在RoundRobinConnectionDistributor類,這個類是用于派發(fā)注冊事件到相應(yīng)的SelectorRunner上,它的派發(fā)方式是這樣:
????????????SelectableChannel?channel,?int?interestOps,?Object?attachment,
????????????CompletionHandler?completionHandler)?
????????????throws?IOException?{
????????SelectorRunner?runner?=?getSelectorRunner(interestOps);
????????
????????return?transport.getSelectorHandler().registerChannelAsync(
????????????????runner,?channel,?interestOps,?attachment,?completionHandler);
????}
????
????private?SelectorRunner?getSelectorRunner(int?interestOps)?{
????????SelectorRunner[]?runners?=?getTransportSelectorRunners();
????????int?index;
????????if?(interestOps?==?SelectionKey.OP_ACCEPT?||?runners.length?==?1)?{
????????????index?=?0;
????????}?else?{
????????????index?=?(counter.incrementAndGet()?%?(runners.length?-?1))?+?1;
????????}
????????
????????return?runners[index];
????}
????getSelectorRunner這個方法道出了秘密,如果是OP_ACCEPT,那么都使用數(shù)組中的第一個SelectorRunner,如果不是,那么就通過取模運(yùn)算的結(jié)果+1從后面的SelectorRunner中取一個來注冊。
??? 分析完mina2.0和grizzly2.0對Selector的管理后我們可以得到幾個啟示:
1、在處理大量連接的情況下,多個Selector比單個Selector好
2、多個Selector的情況下,處理OP_READ和OP_WRITE的Selector要與處理OP_ACCEPT的Selector分離,也就是說處理接入應(yīng)該要一個單獨(dú)的Selector對象來處理,避免IO讀寫事件影響接入速度。
3、Selector的數(shù)目問題,mina默認(rèn)是cpu+2,而grizzly總共就2個,我更傾向于mina的策略,但是我認(rèn)為應(yīng)該對cpu個數(shù)做一個判斷,如果CPU個數(shù)超過8個,那么更多的Selector線程可能帶來比較大的線程切換的開銷,mina默認(rèn)的策略并非合適,幸好可以設(shè)置這個數(shù)值。
原文地址:http://click.aliyun.com/m/21432/ ?????????????
轉(zhuǎn)載于:https://www.cnblogs.com/iyulang/p/6878559.html
總結(jié)
以上是生活随笔為你收集整理的nio框架中的多个Selector结构的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 立讯精密捐赠 1000 万元支援涿州灾区
- 下一篇: 软银起诉其投资的社交媒体初创公司,指控其