<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
我計劃在後續的一段時間內,寫一系列關於java 9的文章,雖然java 9 不像Java 8或者Java 11那樣的核心java版本,但是還是有很多的特性值得關注。期待您能關注我,我將把java 9 寫成一系列的文章,大概十篇左右。
java9第二篇-Java9改進try-with-resources語法
java9第三篇-支援多JDK版本下執行的Jar檔案打包方式
Java 9的 Reactive Streams是對非同步流式程式設計的一種實現。它基於非同步釋出和訂閱模型,具有非阻塞“背壓”資料處理的特點。
Non-blocking Back Pressure(非阻塞背壓):它是一種機制,讓釋出訂閱模型中的訂閱者避免接收大量資料(超出其處理能力),訂閱者可以非同步通知釋出者降低或提升資料生產釋出的速率。它是響應式程式設計實現效果的核心特點!
Java 9提供了一組定義響應式流程式設計的介面。所有這些介面都作為靜態內部介面定義在java.util.concurrent.Flow
類裡面。
下面是Java 響應式程式設計中的一些重要角色和概念,先簡單理解一下
釋出者(Publisher)是潛在的無限數量的有序資料元素的生產者。 它根據收到的需求(subscription)向當前訂閱者釋出一定數量的資料元素。
訂閱者(Subscriber)從釋出者那裡訂閱並接收資料元素。與釋出者建立訂閱關係後,釋出者向訂閱者傳送訂閱令牌(subscription),訂閱者可以根據自己的處理能力請求釋出者釋出資料元素的數量。
訂閱令牌(subscription)表示訂閱者與釋出者之間建立的訂閱關係。 當建立訂閱關係後,釋出者將其傳遞給訂閱者。 訂閱者使用訂閱令牌與釋出者進行互動,例如請求資料元素的數量或取消訂閱。
public static interface Subscriber<T> { public void onSubscribe(Subscription subscription); public void onNext(T item); public void onError(Throwable throwable); public void onComplete(); }
onSubscribe
:在釋出者接受訂閱者的訂閱動作之後,釋出任何的訂閱訊息之前被呼叫。新建立的Subscription
訂閱令牌物件通過此方法傳遞給訂閱者。
onNext
:下一個待處理的資料項的處理常式
onError
:在釋出者或訂閱遇到不可恢復的錯誤時呼叫
onComplete
:當沒有訂閱者呼叫(包括onNext()方法)發生時呼叫。
訂閱令牌物件通過Subscriber.onSubscribe()
方法傳遞
public static interface Subscription { public void request(long n); public void cancel();}
request(long n)
是無阻塞背壓概念背後的關鍵方法。訂閱者使用它來請求n個以上的消費專案。這樣,訂閱者控制了它當前能夠接收多少個資料。cancel()
由訂閱者主動來取消其訂閱,取消後將不會在接收到任何資料訊息。
@FunctionalInterface public static interface Publisher<T> { public void subscribe(Subscriber<? super T> subscriber); }
呼叫該方法,建立訂閱者Subscriber與釋出者Publisher之間的訊息訂閱關係。
處理者Processor 可以同時充當訂閱者和釋出者,起到轉換髮布者——訂閱者管道中的元素的作用。用於將釋出者T型別的資料元素,接收並轉換為型別R的資料並行布。
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { }
現在我們要去實現上面的四個介面來完成響應式程式設計
Subscription Interface
訂閱令牌介面通常不需要我們自己程式設計去實現,我們只需要在知道request()方法和cancle()方法含義即可。
Publisher Interface
釋出者介面,Java 9 已經預設為我們提供了實現SubmissionPublisher,該實現類除了實現Publisher介面的方法外,提供了一個方法叫做submit()來完成訊息資料的傳送。
Subscriber Interface
訂閱者介面,通常需要我們自己去實現。因為在資料訂閱接收之後,不同的業務有不同的處理邏輯。
Processor
實際上是 Publisher Interface和Subscriber Interface的集合體,有需要資料型別轉換及資料處理的需求才去實現這個介面
下面的例子實現的式字串的資料訊息訂閱處理
import java.util.concurrent.Flow; public class MySubscriber implements Flow.Subscriber<String> { private Flow.Subscription subscription; //訂閱令牌 @Override public void onSubscribe(Flow.Subscription subscription) { System.out.println("訂閱關係建立onSubscribe: " + subscription); this.subscription = subscription; subscription.request(2); } @Override public void onNext(String item) { System.out.println("item: " + item); // 一個訊息處理完成之後,可以繼續呼叫subscription.request(n);向釋出者要求資料傳送 //subscription.request(n); } @Override public void onError(Throwable throwable) { System.out.println("onError: " + throwable); } @Override public void onComplete() { System.out.println("onComplete"); } }
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; public class SubmissionPublisherExample { public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(1); SubmissionPublisher<String> sb = new SubmissionPublisher<>(executor, Flow.defaultBufferSize()); sb.subscribe(new MySubscriber()); //建立訂閱關係,可以有多個訂閱者 sb.submit("資料 1"); //傳送訊息1 sb.submit("資料 2"); //傳送訊息2 sb.submit("資料 3"); //傳送訊息3 executor.shutdown(); } }
控制檯列印輸出結果
訂閱關係建立
onSubscribe: java.util.concurrent.SubmissionPublisher$BufferedSubscription@27e81a39
item: 資料 1
item: 資料 2
請注意:即使釋出者submit了3條資料,MySubscriber也僅收到了2條資料進行了處理。是因為我們在MySubscriber#onSubscribe()
方法中使用了subscription.request(2);
。這就是“背壓”的響應式程式設計效果,我有能力處理多少資料,就會通知訊息釋出者給多少資料。
以上就是java9新特性Reactive Stream響應式程式設計 API的詳細內容,更多關於java9 Reactive Stream響應式API的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45