<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
相關範例原始碼:github.com/chentianmin…
public final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch)
在onNext()
、onComplete()
、onError()
方法進行執行緒切換,publishOn()
使得它下游的消費階段非同步執行。
Scheduler
用來生成實際執行非同步任務的Worker
。Error
。如果為true
,當收到上游的Error
時,會等佇列中的元素消費完畢後再向下游轉發Error
。否則會立即轉發Error
,可能導致佇列中的元素丟失。預設為true
。Queues.SMALL_BUFFER_SIZE
,該值通過設定進行修改。/** * 每隔delayMillis生產一個元素 */ protected Flux<Integer> delayPublishFlux(int delayMillis, int startInclusive, int endExclusive) { return Flux.create(fluxSink -> { IntStream.range(startInclusive, endExclusive) .forEach(i -> { // 同步next sleep(delayMillis); logInt(i, "生產"); fluxSink.next(i); }); fluxSink.complete(); }); } @Test public void testPreFetch() { delayPublishFlux(1000, 1, 5) .doOnRequest(i -> logLong(i, "request")) .publishOn(Schedulers.boundedElastic(), 2) .subscribe(i -> logInt(i, "消費")); sleep(10000); }
每次會都向上游請求2個元素。另外還能發現,從第二個request開始,執行緒發生了切換。
/** * 每隔delayMillis生產一個元素,最後傳送Error */ protected Flux<Integer> delayPublishFluxError(int delayMillis, int startInclusive, int endExclusive) { return Flux.create(fluxSink -> { IntStream.range(startInclusive, endExclusive) .forEach(i -> { // 同步next sleep(delayMillis); logInt(i, "生產"); fluxSink.next(i); }); fluxSink.error(new RuntimeException("釋出錯誤!")); }); } @Test public void testDelayError() { delayPublishFluxError(500, 1, 5) .publishOn(Schedulers.boundedElastic()) // 只是為了消費慢一點 .doOnNext(i -> sleep(1000)) .subscribe(i -> logInt(i, "消費")); sleep(10000); }
元素消費完才觸發Error
!
@Test public void testNotDelayError() { delayPublishFluxError(500, 1, 5) .publishOn(Schedulers.boundedElastic(), false, 256) // 只是為了消費慢一點 .doOnNext(i -> sleep(1000)) .subscribe(i -> logInt(i, "消費")); sleep(10000); }
元素還沒消費完就觸發Error
!
首先看一下publishOn()
操作符在裝配階段做了什麼,直接檢視Flux#publishOn()
原始碼。
publishOn()
裝配階段重點是建立了FluxPublishOn
物件。
接下來,我們分析訂閱階段發生了什麼。一個Publisher
在訂閱的時候呼叫的是其subscribe()
方法,因此我們繼續看Flux#subscribe()
原始碼。
在Flux#subscribe()
方法的實現中,如果上游Publisher
是OptimizableOperator
型別,實際的Subscriber
是通過呼叫該InternalFluxOperator#subscribeOrReturn()
方法返回的。如果返回值為null
,直接return
。
對於publishOn()
操作符來說,裝配階段建立的FluxPublishOn
就是OptimizableOperator
型別。所以繼續檢視FluxPublishOn#subscribeOrReturn()
原始碼。
可以看到,方法返回的是PublishOnSubscriber
,它包裝了原始的Subscriber
。
在後續的訂閱階段一定會呼叫其onSubscribe()
方法,在執行階段一定會呼叫其onNext()
方法。我們先看FluxPublishOn#onSubscribe()
原始碼。
在onSubscribe()
實現中,分為同步佇列融合、非同步佇列融合以及非融合方式處理。
如果上游的Subscription
是QueueSubscription
型別,則會進行佇列融合。具體採用同步還是非同步,取決於該QueueSubscription#requestFusion()
實現。
onSubscribe()
方法,但不會繼續呼叫上游request()
方法。onSubscribe()
以及上游request()
方法,請求數量是prefetch
。onSubscribe()
以及上游request()
方法,請求數量是prefetch
。接下來,我們從原始碼角度分別介紹上述三種方式的處理邏輯,首先介紹非融合方式。
先看如下程式碼範例,該程式碼會以非融合方式執行。
@Test public void testNoFuse() { delayPublishFlux(1000, 1, 5) .publishOn(Schedulers.boundedElastic()) .subscribe(i -> logInt(i, "消費")); sleep(10000); }
間隔1s生產消費元素!
在消費階段,一定會呼叫FluxPublishOn#onNext()
方法。
我們重點關注非融合方式執行邏輯,其實只做了2件事:
onSubscribe()
階段建立的新佇列。trySchedule()
方法進行排程。繼續看FluxPublishOn#trySchedule()
原始碼。
這裡其實就是交由woker
非同步執行,後續會執行FluxPublishOn.run()
方法。
在run()方法執行的時候,分為3段邏輯:
runBackfused()
方法。runSync()
方法。runAsync()
方法。對於當前例子,實際執行的是runAsync()
方法,繼續檢視其原始碼。
runAsync()
做的事情比較簡單,就是排空佇列中的元素下發給下游。同時在這裡會繼續呼叫request()
向上遊請求資料,這也是前面說的從第二個request()
開始會進行執行緒切換的原因。
另外這裡還會呼叫checkTerminated()
,檢查終止情況。
如果delayError=true
,必須當前佇列為空是才會轉發Error
。如果delayError=false
,則直接轉發Error
。繼續檢視onComplete()
方法。
如果未結束,將done
標記設定為true
,然後再次呼叫trySchedule()
進行排程。後續再被排程到的時候,如果佇列已經排空,才會呼叫下游onComplete()
,觸發完成。
簡單總結一下非融合執行過程:
在onSubscribe()
時建立一個佇列,在onNext()
時將上游下發的元素新增到佇列中,然後非同步排空佇列中的元素,繼續下發給下游。
以下程式碼會以同步佇列融合方式執行。
@Test public void testSyncFuse() { Flux.just(1, 2 ,3, 4, 5) .publishOn(Schedulers.boundedElastic()) .subscribe(this::logInt); sleep(10000); }
因為Flux.just()
對應的Subscription
是SynchronousSubscription
,其requestFusion()
方法實現如下:
此時返回的是SYNC
,執行同步佇列融合。
前面提到過,同步佇列融合會複用當前佇列,繼續呼叫下游onSubscribe()
方法,但不會繼續呼叫上游request()
方法。
這意味著,此時FluxPublishOn#onNext()
和FluxPublishOn#onComplete()
方法並不會呼叫。但是FluxPublishOn#request()
依然會被下游呼叫到。
在request()
方法中還是會呼叫trySchedule()
,後續會非同步呼叫runSync()
方法(前面已經分析了)。
對於非融合方式,trySchedule()
也會執行,只是這次排程的時候,佇列中還沒有資料被新增進去。
runSync()
實現上runAsync()
差不多,也是排空佇列的元素,繼續下發給下游。不同的點是少了request()
呼叫,以及取消完成控制有差異。
簡單總結一下同步佇列融合執行過程:
在onSubsrribe()
時直接複用上游QueueSubscription
作為佇列,不會呼叫上游request()
請求資料,在自身request()
時非同步排空佇列中的元素,繼續下發給下游。
以下程式碼會以非同步佇列融合方式執行。
@Test public void testAsyncFuse() { Flux.just(1, 2, 3, 4, 5) .windowUntil(i -> i % 3 == 0) .publishOn(Schedulers.boundedElastic()) .flatMap(Function.identity()) .subscribe(this::logInt); sleep(10000); }
因為windowUntil()
對應的Subscription
是WindowPredicateMain
,其requestFusion()
方法實現如下:
此時返回ASYNC
,執行非同步佇列融合。接下來再看一下FluxPublishOn#onNext()
原始碼。
注意,此時onNext()
方法引數是null
,表明上游並沒有真正下發元素,可以將其看做是一個觸發Worker
排程的訊號。後續還是會非同步執行runAsync()
方法,這裡就不再分析了。
這其實也很容易理解:非同步佇列融合直接複用了上游的QueueSubscription
作為佇列,真正的資料應該由這個佇列下發。
簡單總結一下同步佇列融合執行過程:
在onSubsrribe()
時直接複用上游QueueSubscription
作為佇列,在onNext()
時接收上游訊號,非同步排空佇列中的元素,繼續下發給下游。
非融合、同步佇列融合、非同步佇列融合比較如下:
以上就是Project Reactor原始碼解析publishOn使用範例的詳細內容,更多關於Project Reactor publishOn的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45