<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
萬事開頭難,寫文章也是,現在越來越不知道開頭怎麼寫了,所以在前言中,簡單介紹下RxJava吧,第一次聽說還是以前做Android開發的時候,那時候好多庫中都使用了Rxjava,而在網路請求中,也有很多都是使用Rxjava去寫,但自己卻沒怎麼在專案中寫過,而在搜尋資料中發現,微信中搜rxjava時,最多介紹他的還是Android開發者,所以今天來記錄下。
而所謂的響應式程式設計,就是一種用於應用程式非同步程式設計的技術,他是一個通用的思想,類似與AOP,不只是在java中才有。他專注於對資料的變化做出反應,例如,有一個資料來源(這裡被稱為生產者),一個資料目標(這裡被成為消費者),然後在將消費者連線到訂閱者之後,響應式程式設計框架負責將生產者生產的資料推播給消費者,一個可觀察物件可以有任意數量的訂閱者。
而對於一些思想上的框架,類似於Spring,原始碼上大體還是比較難的,畢竟就算是人,在思想上跨越也是有難度的,但對於RxJava來說,原始碼也不是很多,所以在以後會嘗試介紹他的原始碼實現,而使用Rxjava的好處不是在於實現了什麼具體的技術功能,比如使用CGLIB可以實現動態代理的技術,使用JDBC可以進行資料查詢,而沒有rxjava,我們的程式碼還可以藉助Java8的Stream、CompletableFuture來實現。
而rxjava的好處在於讓程式碼更簡潔、優雅,通過他的鏈式呼叫,消除巢狀等。
在下面的例子中,我們會使用Kotlin來做示範。
在這裡,Observable 字面意思是可觀察者,他表示資料來源,通常,一旦訂閱者開始收聽,他們就會開始提供資料,而just表示僅僅,僅僅生產的資料是一個"T",即泛型型別,在這裡是String。
而subscribe表示訂閱,當訂閱後,他會收到Observable生產的資料,來消費。
fun main() { Observable.just("hello rxjava").subscribe { println(it) } } 輸出: hello rxjava
而上面說到,just表示僅僅,在rxjava中,不僅僅是具體的資料,還可以是Callable、Array、Future物件等,詳細可以看fromXXX等方法,最終的結果由rxjava呼叫後如Callable的結果後,傳遞給訂閱者。
fun main() { Observable.fromCallable { println("callable") "hello rxjava" }.subscribe { println(it) } }
這個方法給我了我們手動執行的能力,即傳遞資料到訂閱者是我們手動執行的。
fun main() { Observable.create<String> { it.onNext("hello") it.onError(IllegalArgumentException("錯誤")) it.onComplete() }.subscribe ({ println(it) },{ println(it.message) },{ println("完成") }) }
還可以通過interval實現固定間隔定時。
fun main() { val observable = Observable.interval(1, TimeUnit.SECONDS) observable.subscribe { println(it) } observable.subscribe { println(it) Thread.sleep(2000) } Thread.sleep(100000); }
而timer方法則是延遲N時間後,傳送資料到訂閱者.
fun main() { val observable = Observable.timer(2, TimeUnit.SECONDS) observable.subscribe { println(it) } observable.subscribe { println(it) Thread.sleep(2000) } Thread.sleep(100000); }
而使用上面方法有一個好處,即生產者可以在子執行緒中完成,而實際消費的時候在主執行緒,這在Android可謂是一種福利,如下。
fun main() { val threadPool = Executors.newCachedThreadPool() val anyFuture = threadPool.submit(Callable { Thread.sleep(2000) "hello" }) Observable.fromFuture(anyFuture).subscribe { println(it) } }
而如果擔心等待時間問題,可是使用第二個過載方法,指定一個超時時間,而subscribe還有兩個主要引數我們沒說,一個是error發生錯誤時回撥,一個是complete完成時回撥,但在發生錯誤後,complete是不會回撥的。
fun main() { val threadPool = Executors.newCachedThreadPool() val anyFuture = threadPool.submit(Callable { Thread.sleep(2000) "hello" }) Observable.fromFuture(anyFuture,1,TimeUnit.SECONDS).subscribe({ println(it) },{ println("錯誤") },{ println("完成") }) }
但你以為這就結束了嗎,不,rxjava提供了豐富的執行緒切換,observeOn & subscribeOn這兩個方法就是用來指定在哪裡執行,Schedulers.newThread()
表示在新執行緒,但rxjava實現的執行緒中,是守護執行緒,也就是當主執行緒退出後,他們也會自動退出,而在下面的例子中,如果在最後不加sleep,會導致主執行緒退出後,rxjava的所有執行緒在可能沒執行完成後也將退出。
fun main() { Observable.create<String> { println(Thread.currentThread().isDaemon) it.onNext("hello") } .observeOn(Schedulers.newThread()) .subscribeOn(Schedulers.newThread()) .subscribe { println(Thread.currentThread().name) println(it) } Thread.sleep(10000) }
而如果想自定義執行緒,也是支援的。
fun createSchedulers(): Scheduler { return Schedulers.from { thread { it.run() } } } fun main() { Observable.create<String> { it.onNext("hello") } .observeOn(createSchedulers()) .subscribeOn(Schedulers.newThread()) .subscribe { println(Thread.currentThread().name) println(it) } }
Flowable可以看成Observable新的實現,他支援背壓,而他的API和Observable相似,在最後會介紹背壓。
我們已經熟悉了Java Stream的好處,所以在這裡簡單看下rxjava的實現,用法都一樣,如下,建立集合"a","b","c","d"
。
fun main() { Flowable.fromIterable(mutableListOf("a","b","c","d")) .map { "1${it}" } .filter { !it.endsWith("b") } .skip(1) .subscribe { println(it) } }
所以最後收到的訊息將是 1c、1d
。
當然他提供的這類API非常之多,就不介紹了。
背壓指的是遇到被觀察者傳送的訊息太快,至於它的訂閱者不能及時處理資料,而我們可以提供一種告訴被觀察者遇到這種情況的策略。
這種場景有個前提條件,被觀察者和訂閱者在不同執行緒。
背壓策略被定義在BackpressureStrategy,有五種。
MISSING
通過create方法建立的Flowable沒有指定背壓策略,不會對通過OnNext傳送的資料做快取或丟棄,需要下游通過背壓操作符制定策略。
ERROR
如果快取池資料超限,則丟擲異常。
BUFFER
可以無限制新增資料。
DROP
如果快取池滿了,則丟棄。
LATEST
僅保留最新的onNext值,如果下游無法跟上,則覆蓋之前的值。
如下,我們使用BUFFER策略,預設的快取池大小是128,可以通過System.setProperty("rx3.buffer-size","5")
指定,而這個策略會導致只有快取池不滿的情況下,才會生產資料並行送給訂閱者。
fun main() { System.setProperty("rx3.buffer-size","5") Observable.interval(1,TimeUnit.MILLISECONDS) .toFlowable(BackpressureStrategy.BUFFER) .map { User(1) } .observeOn(Schedulers.newThread()) .subscribe { Thread.sleep(1000) println("hander $it") } Thread.sleep(100000) }
而如果我們改成DROP,那麼最終只有5條資料被消費,其他全部丟棄。
fun main() { System.setProperty("rx3.buffer-size","5") Observable.range(1,999) .toFlowable(BackpressureStrategy.DROP) .map { User(1) } .observeOn(Schedulers.newThread()) .subscribe { Thread.sleep(1000) println("hander $it") } Thread.sleep(100000) }
其他就不做demo了。
以上就是Kotlin下Rxjava的基礎用法及流式呼叫範例詳解的詳細內容,更多關於Kotlin Rxjava的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45