首頁 > 軟體

SpringCloudRPC呼叫核心原理:RxJava響應式程式設計框架,聚合操作符

2021-05-31 21:30:44

聚合操作符

本節介紹RxJava的兩個聚合型操作符:count操作符和reduce操作符。

count操作符

count操作符用來對源Observable流的資料項進行計數,最後將總數彈射出來;如果源流彈射錯誤,就會將錯誤直接報出來;在源Observable流沒有終止前,count操作符是不會彈射統計資料的。使用count操作符對資料流序列進行計數,具體的執行流程如圖4-9所示。

圖4-9 使用count操作符對資料流序列進行計數

下面是一個使用count操作符的簡單例子,程式碼如下:

package com.crazymaker.demo.rxJava.basic;//省略import@Slf4jpublic class AggregateDemo{ /** *演示count計數操作符 */ @Test public void countDemo() { String[] items = {"one", "two", "three", "four"}; Integer count = Observable .from(items) .count() .toBlocking().single(); log.info("計數的結果為 {}",count); }}

運行以上程式碼,輸出的結果節選如下:

[main] INFO c.c.d.r.basic.AggregateDemo - 計數的結果為 4可以看出,count操作符將一個Observable源流轉換成一個彈射單個值的Observable輸出流,輸出流的唯一資料項的值為原始Observable流所彈射的資料項數量。

在上面的程式碼中,為了獲取count輸出流中的資料項,使用了toBlocking()和single()兩個操作符。其中,Observable.toBlocking()操作返回了一個BlockingObservable阻塞型例項,該類型不是一種新的資料流,僅僅是對源Observable的包裝,只是該類型會阻塞當前執行緒,一直等待直到內部的源Observable彈射了自己想要的資料。BlockingObservable.single()方法表示阻塞當前執行緒,直到從封裝的源Observable獲取到唯一的彈射資料元素項,如果Observable源流彈射出的資料元素不止一個,single()方法就會拋出異常。

reduce操作符

Reduce(歸約)操作符對一個Observable流序列的每一項應用一個歸約函數,最後將流的最終歸約計算結果彈射出去。除了第一項之外,reduce操作符會將上一個資料項應用歸約函數的結果作為下一個資料項在應用歸約函數時的輸入。所以,和scan操作符一樣,reduce操作符也有點類似遞迴操作。

假定歸約函數為一個簡單的累加函數,然後使用reduce操作符對1~5的資料流序列進行歸約,其具體的歸約流程如圖4-10所示。

圖4-10 reduce操作符對1~5的資料流序列的歸約流程

使用reduce操作符實現對1~5的資料流序列的歸約,參考如下的實現程式碼:

package com.crazymaker.demo.rxJava.basic;//省略import@Slf4jpublic class AggregateDemo{ /**演示操作符 *演示reduce操作符 */ @Test public void reduceDemo() { /** *定義一個accumulator歸約函數 */ Func2<Integer, Integer, Integer> accumulator = new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer input1, Integer input2) { log.info(" {} + {} = {} ", input1, input2, input1 + input2); return input1 + input2; } }; /** *使用reduce進行流歸約 */ Observable.range(1, 5) .reduce(accumulator) .subscribe(new Action1<Integer>() { @Override public void call(Integer sum) { log.info(" 歸約的結果: {} ", sum); } }); }}

運行以上程式碼,輸出的結果節選如下:

[main] INFO c.c.d.r.basic.AggregateDemo - 1 + 2 = 3[main] INFO c.c.d.r.basic.AggregateDemo - 3 + 3 = 6[main] INFO c.c.d.r.basic.AggregateDemo - 6 + 4 = 10[main] INFO c.c.d.r.basic.AggregateDemo - 10 + 5 = 15[main] INFO c.c.d.r.basic.AggregateDemo - 歸約的結果: 15

以上例項程式碼中,reduce操作符對原始Observable流所彈射的第一項資料1應用歸約函數,得到中間結果1;然後將第一個中間結果1連同原始流的第二項資料2一起填充給accumulator歸約函數,得到中間結果3。reduce持續對原始流進行迭代,一直到原始流的最後一個數據項5,reduce將5連同中間結果10一起填充給accumulator歸約函數,得到最終結果15。最後,reduce會將最終結果15作為輸出流的資料項彈射出去。reduce操作符與前面介紹的scan操作符很類似,只是scan會彈出每次計算的中間結果,而reduce只會彈出最後的結果。

本文給大家講解的內容是SpringCloudRPC遠端呼叫核心原理: RxJava響應式程式設計框架,聚合操作符

  1. 下篇文章給大家講解的是SpringCloudRPC遠端呼叫核心原理: RxJava響應式程式設計框架,其他操作符;
  2. 覺得文章不錯的朋友可以轉發此文關注小編;
  3. 感謝大家的支援!


IT145.com E-mail:sddin#qq.com