首頁 > 軟體

Kotlin Channel處理多個資料組合的流

2022-11-27 14:01:35

結論先行

Kotlin協程中的Channel用於處理多個資料組合的流,隨用隨取,時刻準備著,就像自來水一樣,開啟開關就有水了。

Channel使用範例

fun main() = runBlocking {
    logX("開始")
    val channel = Channel<Int> {  }
    launch {
        (1..3).forEach{
            channel.send(it)
            logX("傳送資料: $it")
        }
        // 關閉channel, 節省資源
        channel.close()
    }
    launch {
        for (i in channel){
            logX("接收資料: $i")
        }
    }
    logX("結束")
}

範例程式碼 使用Channel建立了一組int型別的資料流,通過send傳送資料,並通過for迴圈取出channel中的資料,最後channel是一種協程資源,使用結束後應該及時呼叫close方法關閉,以免浪費不必要的資源。

Channel的原始碼

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> {}
        CONFLATED -> {}
        UNLIMITED -> {}
        else -> {}
    }

可以看到Channel的建構函式包含了三個引數,分別是capacity、onBufferOverflow、onUndeliveredElement.

首先看capacity,這個引數代表了管道的容量,預設引數是RENDEZVOUS,取值是0,還有其他一些值:

  • UNLIMITED: Int = Int.MAX_VALUE,沒有限量
  • CONFLATED: 容量為1,新的覆蓋舊的值
  • BUFFERED: 新增緩衝容量,預設值是64,可以通過修改VM引數:kotlinx.coroutines.channels.defaultBuffer,進行修改

接下來看onBufferOverflow, 顧名思義就是管道容量滿了,怎麼辦?預設是掛起,也就是suspend,一共有三種分別是:

SUSPNED、DROP_OLDEST以及DROP_LATEST

public enum class BufferOverflow {
    /**
     * Suspend on buffer overflow.
     */
    SUSPEND,
    /**
     * Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
     */
    DROP_OLDEST,
    /**
     * Drop **the latest** value that is being added to the buffer right now on buffer overflow
     * (so that buffer contents stay the same), do not suspend.
     */
    DROP_LATEST
}
  • SUSPEND,當管道的容量滿了以後,如果傳送方還要繼續傳送,我們就會掛起當前的 send() 方法。由於它是一個掛起函數,所以我們可以以非阻塞的方式,將傳送方的執行流程掛起,等管道中有了空閒位置以後再恢復,有點像生產者-消費者模型
  • DROP_OLDEST,顧名思義,就是丟棄最舊的那條資料,然後傳送新的資料,有點像LRU演演算法。
  • DROP_LATEST,丟棄最新的那條資料。這裡要注意,這個動作的含義是丟棄當前正準備傳送的那條資料,而管道中的內容將維持不變。

最後一個引數是onUndeliveredElement,從名字看像是沒有投遞成功的回撥,也確實如此,當管道中某些資料沒有成功接收時,這個就會被呼叫。

綜合這個引數使用一下

fun main() = runBlocking {
    println("開始")
    val channel = Channel<Int>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
        println("onUndeliveredElement = $it")
    }
    launch {
        (1..3).forEach{
            channel.send(it)
            println("傳送資料: $it")
        }
        // 關閉channel, 節省資源
        channel.close()
    }
    launch {
        for (i in channel){
            println("接收資料: $i")
        }
    }
    println("結束")
}

輸出結果如下:
開始
結束
傳送資料: 1
傳送資料: 2
傳送資料: 3
接收資料: 2
接收資料: 3

安全的從Channel中取資料

先看一個例子

val channel: ReceiveChannel<Int> = produce {
        (1..100).forEach{
            send(it)
            println("傳送: $it")
        }
    }
while (!channel.isClosedForReceive){
    val i = channel.receive();
    println("接收: $i")
}

輸出報錯資訊:
Exception in thread "main" kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed

可以看到使用isClosedForReceive判斷是否關閉再使用receive方法接收資料,依然會報錯,所以不推薦使用這種方式。

推薦使用上面for迴圈的方式取資料,還有kotlin推薦的consumeEach方式,看一下範例程式碼

val channel: ReceiveChannel<Int> = produce {
        (1..100).forEach{
            send(it)
            println("傳送: $it")
        }
    }
channel.consumeEach {
    println("接收:$it")
}

所以,當我們想要獲取Channel當中的資料時,我們儘量使用 for 迴圈,或者是channel.consumeEach {},不要直接呼叫channel.receive()。

熱的資料流從何而來

先看一下程式碼

    println("開始")
    val channel = Channel<Int>(capacity = 3, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
        println("onUndeliveredElement = $it")
    }
    launch {
        (1..3).forEach{
            channel.send(it)
            println("傳送資料: $it")
        }
    }
    println("結束")
}

輸出:
開始
結束
傳送資料: 1
傳送資料: 2
傳送資料: 3

可以看到上述程式碼中並沒有 取channel中的資料,但是傳送的程式碼正常執行了,這種“不管有沒有接收方,傳送方都會工作”的模式,就是我們將其認定為“熱”的原因。

舉個例子,就像去海底撈吃火鍋一樣,你不需要主動要求服務員加水,服務員看到你的杯子中水少了,會自動給你新增,你只管拿起水杯喝水就行了。

總的來說,不管接收方是否存在,Channel 的傳送方一定會工作。

Channel能力的來源

通過原始碼可以看到Channel只是一個介面,它的能力來源於SendChannel和ReceiveChannel,一個傳送管道,一個接收管道,相當於做了一個組合。

這也是一種良好的設計思想,“對讀取開放,對寫入封閉”的開閉原則。

到此這篇關於Kotlin Channel處理多個資料組合的流的文章就介紹到這了,更多相關Kotlin Channel內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com