Disruptor并发框架--学习笔记
Disruptor并發框架簡介
Martin Fowler在自己網站上寫了一篇LMAX架構的文章,在文章中他介紹了LMAX是一種新型零售金融交易平臺,它能夠以很低的延遲產生大量交易。這個系統是建立在JVM平臺上,其核心是一個業務邏輯處理器,它能夠在一個線程里每秒處理6百萬訂單。業務邏輯處理器完全是運行在內存中,使用事件源驅動方式。業務邏輯處理器的核心是Disruptor。
Disruptor它是一個開源的并發框架,并獲得2011 Duke’s 程序框架創新獎,能夠在無鎖的情況下實現網絡的Queue并發操作。
Disruptor是一個高性能的異步處理框架,或者可以認為是最快的消息框架(輕量的JMS),也可以認為是一個觀察者模式的實現,或者事件監聽模式的實現。
目前我們使用disruptor已經更新到了3.x版本,比之前的2.x版本性能更加的優秀,提供更多的API使用方式。
下載disruptor-3.3.2.jar引入我們的項目既可以開始disruptor之旅。
在使用之前,首先說明disruptor主要功能加以說明,你可以理解為他是一種高效的”生產者-消費者”模型。也就性能遠遠高于傳統的BlockingQueue容器。
官方學習網站:http://ifeve.com/disruptor-getting-started/
(1)使用Disruptor
- 第一:建立一個Event類,用來承載數據,因為Disruptor是一個事件驅動的,所以再Disruptor中是以事件綁定數據進行傳遞的
- 第二:建立一個工廠Event類,用于創建Event類實例對象
- 第三:需要有一個監聽事件類,用于處理數據(Event類)
- 第四:我們需要進行測試代碼編寫。實例化Disruptor實例,配置一系列參數。然后我們對Disruptor實例綁定監聽事件類,接受并處理數據。
- 第五:在Disruptor中,真正存儲數據的核心叫做RingBuffer,我們通過Disruptor實例拿到它,然后把數據生產出來,把數據加入到RingBuffer的實例對象中即可。
(2)Disruptor術語
- RingBuffer:被看做Disruptor最主要的組件,然而從3.0開始RingBuffer僅僅負責存儲和更新再Disruptor中流通的數據。對一些特殊的使用場景能夠被用戶(使用其他數據結構)完全替代。
- Sequence:Disruptor使用Sequence來表示一個特殊組件處理的序號。和Disruptor一樣,每一個消費者(EventProcessor)都維持著一個Sequence。大部分的并發代碼依賴這些Sequence值得運轉,因此Sequence支持多種當前為AtomicLong類的特性。
- Sequencer:這是Disruptor真正的核心。實現了這個接口的兩種生產者(單生產者和多生產者)均實現了所有的并發算法,為了在生產者和消費者之間進行準確快速的數據傳遞。
- SequenceBarrier:由Sequencer生成,并且包含了已經發布的Sequence的引用,這些Sequence源于Sequencer和一些獨立的消費者的Sequence。它包含了決定是否有供消費者消費的Event的邏輯。用來權衡當消費者無法從RingBuffer里面獲取事件時的處理策略。(例如:當生產者太慢,消費者太快,會導致消費者獲取不到新的事件會根據該策略進行處理,默認會堵塞)
- WaitStrategy:決定一個消費者將如何等待生產者將Event置入Disruptor的策略。用來權衡當生產者無法將新的事件放進RingBuffer時的處理策略。(例如:當生產者太快,消費者太慢,會導致生成者獲取不到新的事件槽來插入新事件,則會根據該策略進行處理,默認會堵塞)
- Event:從生產者到消費者過程中所處理的數據單元。Disruptor中沒有代碼表示Event,因為它完全是由用戶定義的。
- EventProcessor:主要事件循環,處理Disruptor中的Event,并且擁有消費者的Sequence。它有一個實現類是BatchEventProcessor,包含了event loop有效的實現,并且將回調到一個EventHandler接口的實現對象。
- EventHandler:由用戶實現并且代表了Disruptor中的一個消費者的接口。
- Producer:由用戶實現,它調用RingBuffer來插入事件(Event),在Disruptor中沒有相應的實現代碼,由用戶實現。
- WorkProcessor:確保每個sequence只被一個processor消費,在同一個WorkPool中的處理多個WorkProcessor不會消費同樣的sequence。
- WorkerPool:一個WorkProcessor池,其中WorkProcessor將消費Sequence,所以任務可以在實現WorkHandler接口的worker之間移交
- LifecycleAware:當BatchEventProcessor啟動和停止時,于實現這個接口用于接收通知。
(3)Disruptor應用
Disruptor實際上是對RingBuffer的封裝,所以我們也可以直接使用RingBuffer類
API提供的生產者接口
EventTranslator<V>與EventTranslatorOneArg<V v, Object data>,前者不能動態傳參,后者可以動態傳遞一個參數data,V為需要創建的數據對象,data為實際數據,實現translateTo(V v, long sequeue, Object data)方法,其中v就是下一個可用事件槽里面的對象,data為傳進來的真實數據,調用ringBuffer.publishEvent(EventTranslatorOneArg translator, Object data);來發布數據到RingBuffer中。
API提供的消費者接口
WorkerPool :
WorkerPool<Order>(RingBuffer<V> ringBuffer, SequenceBarrier sequenceBarrier, ExceptionHandler<? super V> exceptionHandler, WorkHandler<? super V>... workHandlers)其中RingBuffer為數據緩沖區,sequenceBarrier是消費者與生產者之間的協調策略,API默認提供了一個實現類ProcessingSequenceBarrier,可以通過RingBuffer.newBarrier(Sequence... sequencesToTrack);來獲取,exceptionHandler為異常處理函數,當handler發生異常時回調該函數,workHandlers為實現了EventHandler接口的消息業務處理類,可以有多個。
WorkerPool啟動的方法是 WorkerPool.start(Executor executor)
BatchEventProcessor :
BatchEventProcessor<V>(RingBuffer extends DataProvider, SequenceBarrier sequenceBarrier, EventHandler<? super V> eventHandler) 其中RingBuffer為數據緩沖區,sequenceBarrier是消費者與生產者之間的協調策略,API默認提供了一個實現類ProcessingSequenceBarrier,可以通過RingBuffer.newBarrier(Sequence... sequencesToTrack);來獲取,eventHandler為實現了EventHandler接口的消息業務處理類。
BatchEventProcessor啟動的方法是 Executor.submit(BatchEventProcessor batchEventProcessor)
注意
SequenceBarrier是用來協調消費者和生成者之間效率的策略類,所以要想Barrier生效,必須要將消費者消費的Seuence傳遞給RingBuffer,然后由RingBuffer進行協調:ringBuffer.addGatingSequences(BatchEventProcessor.getSequence()); 多消費者時使用BatchEventProcessor.getWorkerSequences()(這兩個方法WorkerPool同樣適用)。這是在直接使用RingBuffer時需要進行的處理,如果通過Disruptor去進行調用,在調用handleEventsWith注冊消費者方法時會自動添加該處理。
Disruptor注冊消費者的方法是:
Disruptor.handleEventsWith(final EventHandler<? super T>... handlers)
Disruptor提供了一些復雜的并行運行方式。
1、生產者A生成的數據同時被B,C兩個消費者消費,兩者都消費完成之后再由消費者D對兩者同時消費。(注意ABC以及下面提到的消息處理類必須要實現EventHandler接口)
EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(A, B); //聲明在C1,C2完事之后執行JMS消息發送操作 也就是流程走到C3 handlerGroup.then(C);2、生產者A生成的數據同時被B1,C2兩個消費者消費,而B消耗完畢之后由B2處理,C1處理完成之后由C2處理,B2與C2兩者都消費完成之后再由消費者D對兩者同時消費。其中B1與B2,C1與C2是并行執行的。
disruptor.handleEventsWith(B1, C1); disruptor.after(B1).handleEventsWith(B2); disruptor.after(C1).handleEventsWith(C2); disruptor.after(B2, C2).handleEventsWith(h3);3、生產者A生成的數據依次被A,B,C三個消費者消費
disruptor.handleEventsWith(A).handleEventsWith(B).handleEventsWith(C);并行運行方式demo:
Main方法:
package com.zyh.study.disruptor.base;import com.lmax.disruptor.BusySpinWaitStrategy; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.EventHandlerGroup; import com.lmax.disruptor.dsl.ProducerType;import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;public class Main { public static void main(String[] args) throws InterruptedException { long beginTime=System.currentTimeMillis(); int bufferSize=1024; ExecutorService executor=Executors.newFixedThreadPool(8); Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() { @Override public Trade newInstance() { return new Trade(); } }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy()); //菱形操作//使用disruptor創建消費者組C1,C2EventHandlerGroup<Trade> handlerGroup =disruptor.handleEventsWith(new Handler1(), new Handler2());//聲明在C1,C2完事之后執行JMS消息發送操作 也就是流程走到C3handlerGroup.then(new Handler3());//順序操作/**disruptor.handleEventsWith(new Handler1()).handleEventsWith(new Handler2()).handleEventsWith(new Handler3());*///六邊形操作. /**Handler1 h1 = new Handler1();Handler2 h2 = new Handler2();Handler3 h3 = new Handler3();Handler4 h4 = new Handler4();Handler5 h5 = new Handler5();disruptor.handleEventsWith(h1, h2);disruptor.after(h1).handleEventsWith(h4);disruptor.after(h2).handleEventsWith(h5);disruptor.after(h4, h5).handleEventsWith(h3);*/disruptor.start();//啟動 CountDownLatch latch=new CountDownLatch(1); //生產者準備 executor.submit(new TradePublisher(latch, disruptor));latch.await();//等待生產者完事. disruptor.shutdown(); executor.shutdown(); System.out.println("總耗時:"+(System.currentTimeMillis()-beginTime)); } }TradePublisher:
public class TradePublisher implements Runnable { Disruptor<Trade> disruptor; private CountDownLatch latch; private static int LOOP=10;//模擬百萬次交易的發生 public TradePublisher(CountDownLatch latch,Disruptor<Trade> disruptor) { this.disruptor=disruptor; this.latch=latch; } @Override public void run() {TradeEventTranslator tradeTransloator = new TradeEventTranslator(); for(int i=0;i<LOOP;i++){ disruptor.publishEvent(tradeTransloator); } latch.countDown(); } }TradeEventTranslator:
class TradeEventTranslator implements EventTranslator<Trade>{ private Random random=new Random(); @Override public void translateTo(Trade event, long sequence) { this.generateTrade(event); } private Trade generateTrade(Trade trade){ trade.setPrice(random.nextDouble()*9999); return trade; } }Trade:
@Data public class Trade { private String id;//ID private String name;private double price;//金額 private AtomicInteger count = new AtomicInteger(0);}Handler1:
package com.zyh.study.disruptor.base;import com.lmax.disruptor.EventHandler;public class Handler1 implements EventHandler<Trade>{@Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.out.println("handler1: set name");event.setName("h1");Thread.sleep(1000); } }Handler2:
package com.zyh.study.disruptor.base;import com.lmax.disruptor.EventHandler;public class Handler2 implements EventHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { System.out.println("handler2: set price");event.setPrice(17.0);Thread.sleep(1000);} }Handler3:
package com.zyh.study.disruptor.base;import com.lmax.disruptor.EventHandler;public class Handler3 implements EventHandler<Trade> {@Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { System.out.println("handler3: name: " + event.getName() + " , price: " + event.getPrice() + "; instance: " + event.toString());} }Handler4:
package com.zyh.study.disruptor.base;import com.lmax.disruptor.EventHandler;public class Handler4 implements EventHandler<Trade> {@Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.out.println("handler4: get name : " + event.getName());event.setName(event.getName() + "h4");}}Handler5:
package com.zyh.study.disruptor.base;import com.lmax.disruptor.EventHandler;public class Handler5 implements EventHandler<Trade>{@Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.out.println("handler5: get price : " + event.getPrice());event.setPrice(event.getPrice() + 3.0);} }總結
以上是生活随笔為你收集整理的Disruptor并发框架--学习笔记的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: idea 关于高亮显示与选中字符串相同的
- 下一篇: sql 查出一张表中重复的所有记录数据