首頁 > 軟體

Project Reactor原始碼解析publishOn使用範例

2022-08-15 22:02:31

功能分析

相關範例原始碼:github.com/chentianmin…

public final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch)

onNext()onComplete()onError()方法進行執行緒切換,publishOn()使得它下游的消費階段非同步執行。

  • scheduler:執行緒切換的排程器,Scheduler用來生成實際執行非同步任務的Worker
  • delayError:是否延時轉發Error。如果為true,當收到上游的Error時,會等佇列中的元素消費完畢後再向下游轉發Error。否則會立即轉發Error,可能導致佇列中的元素丟失。預設為true
  • prefetch:預取元素的數量,同時也是佇列的容量。預設值為Queues.SMALL_BUFFER_SIZE,該值通過設定進行修改。

程式碼範例

prefetch

/**
 * 每隔delayMillis生產一個元素
 */
protected Flux<Integer> delayPublishFlux(int delayMillis, int startInclusive, int endExclusive) {
    return Flux.create(fluxSink -> {
        IntStream.range(startInclusive, endExclusive)
                .forEach(i -> {
                    // 同步next
                    sleep(delayMillis);
                    logInt(i, "生產");
                    fluxSink.next(i);
                });
        fluxSink.complete();
    });
}
@Test
public void testPreFetch() {
    delayPublishFlux(1000, 1, 5)
            .doOnRequest(i -> logLong(i, "request"))
            .publishOn(Schedulers.boundedElastic(), 2)
            .subscribe(i -> logInt(i, "消費"));
    sleep(10000);
}

每次會都向上游請求2個元素。另外還能發現,從第二個request開始,執行緒發生了切換。

delayError

/**
 * 每隔delayMillis生產一個元素,最後傳送Error
 */
protected Flux<Integer> delayPublishFluxError(int delayMillis, int startInclusive, int endExclusive) {
    return Flux.create(fluxSink -> {
        IntStream.range(startInclusive, endExclusive)
                .forEach(i -> {
                    // 同步next
                    sleep(delayMillis);
                    logInt(i, "生產");
                    fluxSink.next(i);
                });
        fluxSink.error(new RuntimeException("釋出錯誤!"));
    });
}
@Test
public void testDelayError() {
    delayPublishFluxError(500, 1, 5)
            .publishOn(Schedulers.boundedElastic())
            // 只是為了消費慢一點
            .doOnNext(i -> sleep(1000))
            .subscribe(i -> logInt(i, "消費"));
    sleep(10000);
}

元素消費完才觸發Error

@Test
public void testNotDelayError() {
    delayPublishFluxError(500, 1, 5)
            .publishOn(Schedulers.boundedElastic(), false, 256)
            // 只是為了消費慢一點
            .doOnNext(i -> sleep(1000))
            .subscribe(i -> logInt(i, "消費"));
    sleep(10000);
}

元素還沒消費完就觸發Error

原始碼分析

首先看一下publishOn()操作符在裝配階段做了什麼,直接檢視Flux#publishOn()原始碼。

Flux#publishOn()

publishOn()裝配階段重點是建立了FluxPublishOn物件。

接下來,我們分析訂閱階段發生了什麼。一個Publisher在訂閱的時候呼叫的是其subscribe()方法,因此我們繼續看Flux#subscribe()原始碼。

Flux#subscribe()

Flux#subscribe()方法的實現中,如果上游PublisherOptimizableOperator型別,實際的Subscriber是通過呼叫該InternalFluxOperator#subscribeOrReturn()方法返回的。如果返回值為null,直接return

對於publishOn()操作符來說,裝配階段建立的FluxPublishOn就是OptimizableOperator型別。所以繼續檢視FluxPublishOn#subscribeOrReturn()原始碼。

FluxPublishOn#subscribeOrReturn()

可以看到,方法返回的是PublishOnSubscriber,它包裝了原始的Subscriber

在後續的訂閱階段一定會呼叫其onSubscribe()方法,在執行階段一定會呼叫其onNext()方法。我們先看FluxPublishOn#onSubscribe()原始碼。

FluxPublishOn#onSubscribe()

onSubscribe()實現中,分為同步佇列融合、非同步佇列融合以及非融合方式處理。

如果上游的SubscriptionQueueSubscription型別,則會進行佇列融合。具體採用同步還是非同步,取決於該QueueSubscription#requestFusion()實現。

  • 同步佇列融合:複用當前佇列,繼續呼叫下游onSubscribe()方法,但不會繼續呼叫上游request()方法。
  • 非同步佇列融合:複用當前佇列,然後繼續呼叫下游onSubscribe()以及上游request()方法,請求數量是prefetch
  • 非融合:建立一個新的佇列,然後繼續呼叫下游onSubscribe()以及上游request()方法,請求數量是prefetch

接下來,我們從原始碼角度分別介紹上述三種方式的處理邏輯,首先介紹非融合方式。

非融合

先看如下程式碼範例,該程式碼會以非融合方式執行。

@Test
public void testNoFuse() {
    delayPublishFlux(1000, 1, 5)
            .publishOn(Schedulers.boundedElastic())
            .subscribe(i -> logInt(i, "消費"));
    sleep(10000);
}

間隔1s生產消費元素!

在消費階段,一定會呼叫FluxPublishOn#onNext()方法。

FluxPublishOn#onNext()

我們重點關注非融合方式執行邏輯,其實只做了2件事:

  • 將下發的元素新增到佇列中,該佇列就是onSubscribe()階段建立的新佇列。
  • 呼叫trySchedule()方法進行排程。

繼續看FluxPublishOn#trySchedule()原始碼。

FluxPublishOn#trySchedule()

這裡其實就是交由woker非同步執行,後續會執行FluxPublishOn.run()方法。

FluxPublishOn#run()

在run()方法執行的時候,分為3段邏輯:

  • 如果是輸出融合,執行runBackfused()方法。
  • 如果是同步佇列融合,執行runSync()方法。
  • 否則,執行runAsync()方法。

對於當前例子,實際執行的是runAsync()方法,繼續檢視其原始碼。

FluxPublishOn#runAsync()

runAsync()做的事情比較簡單,就是排空佇列中的元素下發給下游。同時在這裡會繼續呼叫request()向上遊請求資料,這也是前面說的從第二個request()開始會進行執行緒切換的原因。

另外這裡還會呼叫checkTerminated(),檢查終止情況。

FluxPublishOn#checkTerminated()

如果delayError=true,必須當前佇列為空是才會轉發Error。如果delayError=false,則直接轉發Error。繼續檢視onComplete()方法。

FluxPublishOn#onComplete()

如果未結束,將done標記設定為true,然後再次呼叫trySchedule()進行排程。後續再被排程到的時候,如果佇列已經排空,才會呼叫下游onComplete(),觸發完成。

小結

簡單總結一下非融合執行過程:

onSubscribe()時建立一個佇列,在onNext()時將上游下發的元素新增到佇列中,然後非同步排空佇列中的元素,繼續下發給下游。

同步佇列融合

以下程式碼會以同步佇列融合方式執行。

@Test
public void testSyncFuse() {
    Flux.just(1, 2 ,3, 4, 5)
            .publishOn(Schedulers.boundedElastic())
            .subscribe(this::logInt);
    sleep(10000);
}

因為Flux.just()對應的SubscriptionSynchronousSubscription,其requestFusion()方法實現如下:

SynchronousSubscription#requestFusion()

此時返回的是SYNC,執行同步佇列融合。

前面提到過,同步佇列融合會複用當前佇列,繼續呼叫下游onSubscribe()方法,但不會繼續呼叫上游request()方法。

這意味著,此時FluxPublishOn#onNext()FluxPublishOn#onComplete()方法並不會呼叫。但是FluxPublishOn#request()依然會被下游呼叫到。

FluxPublishOn#request()

request()方法中還是會呼叫trySchedule(),後續會非同步呼叫runSync()方法(前面已經分析了)。

對於非融合方式,trySchedule()也會執行,只是這次排程的時候,佇列中還沒有資料被新增進去。

FluxPublishOn#runSync()

runSync()實現上runAsync()差不多,也是排空佇列的元素,繼續下發給下游。不同的點是少了request()呼叫,以及取消完成控制有差異。

小結

簡單總結一下同步佇列融合執行過程:

onSubsrribe()時直接複用上游QueueSubscription作為佇列,不會呼叫上游request()請求資料,在自身request()時非同步排空佇列中的元素,繼續下發給下游。

非同步佇列融合

以下程式碼會以非同步佇列融合方式執行。

@Test
public void testAsyncFuse() {
    Flux.just(1, 2, 3, 4, 5)
            .windowUntil(i -&gt; i % 3 == 0)
            .publishOn(Schedulers.boundedElastic())
            .flatMap(Function.identity())
            .subscribe(this::logInt);
    sleep(10000);
}

因為windowUntil()對應的SubscriptionWindowPredicateMain,其requestFusion()方法實現如下:

WindowPredicateMain#requestFusion()

此時返回ASYNC,執行非同步佇列融合。接下來再看一下FluxPublishOn#onNext()原始碼。

FluxPublishOn#onNext()

注意,此時onNext()方法引數是null,表明上游並沒有真正下發元素,可以將其看做是一個觸發Worker排程的訊號。後續還是會非同步執行runAsync()方法,這裡就不再分析了。

這其實也很容易理解:非同步佇列融合直接複用了上游的QueueSubscription作為佇列,真正的資料應該由這個佇列下發。

總結

簡單總結一下同步佇列融合執行過程:

onSubsrribe()時直接複用上游QueueSubscription作為佇列,在onNext()時接收上游訊號,非同步排空佇列中的元素,繼續下發給下游。

非融合、同步佇列融合、非同步佇列融合比較如下:

以上就是Project Reactor原始碼解析publishOn使用範例的詳細內容,更多關於Project Reactor publishOn的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com