首頁 > 軟體

Kotlin下Rxjava的基礎用法及流式呼叫範例詳解

2022-12-01 14:04:03

前言

萬事開頭難,寫文章也是,現在越來越不知道開頭怎麼寫了,所以在前言中,簡單介紹下RxJava吧,第一次聽說還是以前做Android開發的時候,那時候好多庫中都使用了Rxjava,而在網路請求中,也有很多都是使用Rxjava去寫,但自己卻沒怎麼在專案中寫過,而在搜尋資料中發現,微信中搜rxjava時,最多介紹他的還是Android開發者,所以今天來記錄下。

而所謂的響應式程式設計,就是一種用於應用程式非同步程式設計的技術,他是一個通用的思想,類似與AOP,不只是在java中才有。他專注於對資料的變化做出反應,例如,有一個資料來源(這裡被稱為生產者),一個資料目標(這裡被成為消費者),然後在將消費者連線到訂閱者之後,響應式程式設計框架負責將生產者生產的資料推播給消費者,一個可觀察物件可以有任意數量的訂閱者。

而對於一些思想上的框架,類似於Spring,原始碼上大體還是比較難的,畢竟就算是人,在思想上跨越也是有難度的,但對於RxJava來說,原始碼也不是很多,所以在以後會嘗試介紹他的原始碼實現,而使用Rxjava的好處不是在於實現了什麼具體的技術功能,比如使用CGLIB可以實現動態代理的技術,使用JDBC可以進行資料查詢,而沒有rxjava,我們的程式碼還可以藉助Java8的Stream、CompletableFuture來實現。

而rxjava的好處在於讓程式碼更簡潔、優雅,通過他的鏈式呼叫,消除巢狀等。

在下面的例子中,我們會使用Kotlin來做示範。

基礎用法

在這裡,Observable 字面意思是可觀察者,他表示資料來源,通常,一旦訂閱者開始收聽,他們就會開始提供資料,而just表示僅僅,僅僅生產的資料是一個"T",即泛型型別,在這裡是String。

而subscribe表示訂閱,當訂閱後,他會收到Observable生產的資料,來消費。

fun main() {
     Observable.just("hello rxjava").subscribe {
         println(it)
     }
}
輸出:
hello rxjava

fromXXX

而上面說到,just表示僅僅,在rxjava中,不僅僅是具體的資料,還可以是Callable、Array、Future物件等,詳細可以看fromXXX等方法,最終的結果由rxjava呼叫後如Callable的結果後,傳遞給訂閱者。

fun main() {
    Observable.fromCallable {
        println("callable")
        "hello rxjava"
    }.subscribe {
        println(it)
    }
}

create

這個方法給我了我們手動執行的能力,即傳遞資料到訂閱者是我們手動執行的。

fun main() {
    Observable.create<String> {
        it.onNext("hello")
        it.onError(IllegalArgumentException("錯誤"))
        it.onComplete()
    }.subscribe ({
        println(it)
    },{
        println(it.message)
    },{
        println("完成")
    })
}

interval & timer

還可以通過interval實現固定間隔定時。

fun main() {
    val observable = Observable.interval(1, TimeUnit.SECONDS)
    observable.subscribe {
        println(it)
    }
    observable.subscribe {
        println(it)
        Thread.sleep(2000)
    }
    Thread.sleep(100000);
}

而timer方法則是延遲N時間後,傳送資料到訂閱者.

fun main() {
    val observable = Observable.timer(2, TimeUnit.SECONDS)
    observable.subscribe {
        println(it)
    }
    observable.subscribe {
        println(it)
        Thread.sleep(2000)
    }
    Thread.sleep(100000);
}

指定執行緒

而使用上面方法有一個好處,即生產者可以在子執行緒中完成,而實際消費的時候在主執行緒,這在Android可謂是一種福利,如下。

fun main() {
    val threadPool = Executors.newCachedThreadPool()
    val anyFuture = threadPool.submit(Callable {
        Thread.sleep(2000)
        "hello"
    })
    Observable.fromFuture(anyFuture).subscribe {
        println(it)
    }
}

而如果擔心等待時間問題,可是使用第二個過載方法,指定一個超時時間,而subscribe還有兩個主要引數我們沒說,一個是error發生錯誤時回撥,一個是complete完成時回撥,但在發生錯誤後,complete是不會回撥的。

fun main() {
    val threadPool = Executors.newCachedThreadPool()
    val anyFuture = threadPool.submit(Callable {
        Thread.sleep(2000)
        "hello"
    })
    Observable.fromFuture(anyFuture,1,TimeUnit.SECONDS).subscribe({
        println(it)
    },{
        println("錯誤")
    },{
        println("完成")
    })
}

observeOn & subscribeOn

但你以為這就結束了嗎,不,rxjava提供了豐富的執行緒切換,observeOn & subscribeOn這兩個方法就是用來指定在哪裡執行,Schedulers.newThread()表示在新執行緒,但rxjava實現的執行緒中,是守護執行緒,也就是當主執行緒退出後,他們也會自動退出,而在下面的例子中,如果在最後不加sleep,會導致主執行緒退出後,rxjava的所有執行緒在可能沒執行完成後也將退出。

fun main() {
    Observable.create<String> {
        println(Thread.currentThread().isDaemon)
        it.onNext("hello")
    }
        .observeOn(Schedulers.newThread())
        .subscribeOn(Schedulers.newThread())
        .subscribe {
            println(Thread.currentThread().name)
            println(it)
        }
    Thread.sleep(10000)
}

而如果想自定義執行緒,也是支援的。

fun createSchedulers(): Scheduler {
    return Schedulers.from {
        thread { it.run() }
    }
}
fun main() {
    Observable.create<String> {
        it.onNext("hello")
    }
        .observeOn(createSchedulers())
        .subscribeOn(Schedulers.newThread())
        .subscribe {
            println(Thread.currentThread().name)
            println(it)
        }
}

Flowable

Flowable可以看成Observable新的實現,他支援背壓,而他的API和Observable相似,在最後會介紹背壓。

流式呼叫

我們已經熟悉了Java Stream的好處,所以在這裡簡單看下rxjava的實現,用法都一樣,如下,建立集合"a","b","c","d"

  • map將所有item前新增字元"1"。
  • filter將b結尾的資料過濾掉。
  • skip忽略前n個資料。
fun main() {
    Flowable.fromIterable(mutableListOf("a","b","c","d"))
        .map { "1${it}" }
        .filter { !it.endsWith("b") }
        .skip(1)
        .subscribe {
            println(it)
        }
}

所以最後收到的訊息將是 1c、1d

當然他提供的這類API非常之多,就不介紹了。

背壓

背壓指的是遇到被觀察者傳送的訊息太快,至於它的訂閱者不能及時處理資料,而我們可以提供一種告訴被觀察者遇到這種情況的策略。

這種場景有個前提條件,被觀察者和訂閱者在不同執行緒。

背壓策略被定義在BackpressureStrategy,有五種。

MISSING

通過create方法建立的Flowable沒有指定背壓策略,不會對通過OnNext傳送的資料做快取或丟棄,需要下游通過背壓操作符制定策略。

ERROR

如果快取池資料超限,則丟擲異常。

BUFFER

可以無限制新增資料。

DROP

如果快取池滿了,則丟棄。

LATEST

僅保留最新的onNext值,如果下游無法跟上,則覆蓋之前的值。

如下,我們使用BUFFER策略,預設的快取池大小是128,可以通過System.setProperty("rx3.buffer-size","5")指定,而這個策略會導致只有快取池不滿的情況下,才會生產資料並行送給訂閱者。

fun main() {
    System.setProperty("rx3.buffer-size","5")
    Observable.interval(1,TimeUnit.MILLISECONDS)
        .toFlowable(BackpressureStrategy.BUFFER)
        .map { User(1) }
        .observeOn(Schedulers.newThread())
        .subscribe {
            Thread.sleep(1000)
            println("hander $it")
        }
    Thread.sleep(100000)
}

而如果我們改成DROP,那麼最終只有5條資料被消費,其他全部丟棄。

fun main() {
    System.setProperty("rx3.buffer-size","5")
    Observable.range(1,999)
        .toFlowable(BackpressureStrategy.DROP)
        .map { User(1) }
        .observeOn(Schedulers.newThread())
        .subscribe {
            Thread.sleep(1000)
            println("hander $it")
        }
    Thread.sleep(100000)
}

其他就不做demo了。

以上就是Kotlin下Rxjava的基礎用法及流式呼叫範例詳解的詳細內容,更多關於Kotlin Rxjava的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com