冷热复位_冷热rx-java可观察
冷熱復(fù)位
我自己對(duì)“熱和冷可觀測(cè)”的理解還很不穩(wěn)定,但這是我到目前為止所了解的!
冷觀測(cè)
考慮一個(gè)返回rx-java Observable的API:
import obs.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; import rx.schedulers.Schedulers;public class Service1 {private static final Logger logger = LoggerFactory.getLogger(Service1.class);public Observable<String> operation() {return Observable.<String>create(s -> {logger.info("Start: Executing slow task in Service 1");Util.delay(1000);s.onNext("data 1");logger.info("End: Executing slow task in Service 1");s.onCompleted();}).subscribeOn(Schedulers.computation());} }現(xiàn)在,首先要注意的是,典型的Observable在訂閱之前不會(huì)做任何事情:
所以基本上,如果我要這樣做:
Observable<String> op1 = service1.operation();除非通過(guò)以下方式在Observable上進(jìn)行訂閱,否則不會(huì)打印或返回任何內(nèi)容:
Observable<String> op1 = service1.operation();CountDownLatch latch = new CountDownLatch(1);op1.subscribe(s -> logger.info("From Subscriber 1: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());latch.await();因此,現(xiàn)在,如果此Observable上有多個(gè)訂閱,會(huì)發(fā)生什么情況:
Observable<String> op1 = service1.operation();CountDownLatch latch = new CountDownLatch(3);op1.subscribe(s -> logger.info("From Subscriber 1: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());op1.subscribe(s -> logger.info("From Subscriber 2: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());op1.subscribe(s -> logger.info("From Subscriber 3: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());latch.await();有了冷的可觀察到的代碼,代碼將再次被調(diào)用并再次發(fā)出項(xiàng)目,這在我的機(jī)器上得到了:
06:04:07.206 [RxComputationThreadPool-2] INFO o.b.Service1 - Start: Executing slow task in Service 1 06:04:07.208 [RxComputationThreadPool-3] INFO o.b.Service1 - Start: Executing slow task in Service 1 06:04:08.211 [RxComputationThreadPool-2] INFO o.b.BasicObservablesTest - From Subscriber 2: data 1 06:04:08.211 [RxComputationThreadPool-1] INFO o.b.BasicObservablesTest - From Subscriber 1: data 1 06:04:08.211 [RxComputationThreadPool-3] INFO o.b.BasicObservablesTest - From Subscriber 3: data 1 06:04:08.213 [RxComputationThreadPool-2] INFO o.b.Service1 - End: Executing slow task in Service 1 06:04:08.214 [RxComputationThreadPool-1] INFO o.b.Service1 - End: Executing slow task in Service 1 06:04:08.214 [RxComputationThreadPool-3] INFO o.b.Service1 - End: Executing slow task in Service 1熱可觀察–使用ConnectableObservable
另一方面,Hot Observable確實(shí)不需要訂閱即可開(kāi)始發(fā)射項(xiàng)目。 一種實(shí)現(xiàn)Hot Observable的方法是使用ConnectableObservable ,它是一個(gè)Observable,它在調(diào)用connect方法之前不會(huì)發(fā)出項(xiàng)目,但是一旦開(kāi)始發(fā)出項(xiàng)目,它的任何訂閱者只能從訂閱點(diǎn)獲取項(xiàng)目。 因此,再次回顧前面的示例,但使用ConnectableObservable代替:
Observable<String> op1 = service1.operation();ConnectableObservable<String> connectableObservable = op1.publish();CountDownLatch latch = new CountDownLatch(3);connectableObservable.subscribe(s -> logger.info("From Subscriber 1: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());connectableObservable.subscribe(s -> logger.info("From Subscriber 2: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());connectableObservable.subscribe(s -> logger.info("From Subscriber 3: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());connectableObservable.connect();latch.await();并打印以下內(nèi)容:
06:07:23.852 [RxComputationThreadPool-3] INFO o.b.Service1 - Start: Executing slow task in Service 1 06:07:24.860 [RxComputationThreadPool-3] INFO o.b.ConnectableObservablesTest - From Subscriber 1: data 1 06:07:24.862 [RxComputationThreadPool-3] INFO o.b.ConnectableObservablesTest - From Subscriber 2: data 1 06:07:24.862 [RxComputationThreadPool-3] INFO o.b.ConnectableObservablesTest - From Subscriber 3: data 1 06:07:24.862 [RxComputationThreadPool-3] INFO o.b.Service1 - End: Executing slow task in Service 1熱點(diǎn)可觀察–使用主題
將冷的Observable轉(zhuǎn)換為高溫的另一種方法是使用Subject 。 主題既表現(xiàn)為可觀察者,又表現(xiàn)為觀察者,有不同類型的主題具有不同的行為。 在這里,我使用一個(gè)名為PublishSubject的Subject,它具有Pub / Sub行為–這些項(xiàng)目被發(fā)送給所有在其上監(jiān)聽(tīng)的訂閱者。 因此,隨著PublishSubject的引入,代碼如下所示:
Observable<String> op1 = service1.operation();PublishSubject<String> publishSubject = PublishSubject.create();op1.subscribe(publishSubject);CountDownLatch latch = new CountDownLatch(3);publishSubject.subscribe(s -> logger.info("From Subscriber 1: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());publishSubject.subscribe(s -> logger.info("From Subscriber 2: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());publishSubject.subscribe(s -> logger.info("From Subscriber 3: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());latch.await();了解如何將PublishSubject作為Observable的訂閱者引入,而其他訂閱者則如何訂閱PublishSubject。 輸出將類似于ConnectableObservable的輸出。
從本質(zhì)上來(lái)說(shuō),這就是我對(duì)“熱可觀察”的理解程度。 因此,總而言之,Cold和Hot Observable之間的區(qū)別在于訂戶何時(shí)獲得發(fā)射的項(xiàng)目以及何時(shí)發(fā)射項(xiàng)目–使用Cold Observable,它們?cè)谟嗛啿⑼ǔ+@得所有發(fā)射的項(xiàng)目時(shí)發(fā)射,一個(gè)Hot Observable,項(xiàng)目將在沒(méi)有訂閱服務(wù)器的情況下發(fā)出,而訂閱者通常會(huì)在訂閱點(diǎn)之后獲得項(xiàng)目。
參考
翻譯自: https://www.javacodegeeks.com/2015/03/hot-and-cold-rx-java-observable.html
冷熱復(fù)位
總結(jié)
以上是生活随笔為你收集整理的冷热复位_冷热rx-java可观察的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: jaxb报错_JAXB做错了; 尝试Xe
- 下一篇: 章丘房管局备案查询网站(章丘房备案)