Kotlin 协程二 —— 通道 Channel(Kotlin collaboration process 2 – channel)

Kotlin 协程系列文章导航:
Kotlin 协程一 —— 协程 Coroutine
Kotlin 协程二 —— 通道 Channel
Kotlin 协程三 —— 数据流 Flow
Kotlin 协程四 —— Flow 和 Channel 的应用
Kotlin 协程五 —— 在Android 中使用 Kotlin 协程

  • 一、 Channel 基本使用1.1 Channel 的概念1.2 Channel 的简单使用1.3 Channel 的迭代1.4 close 关闭 Channel1.5 Channel 是热流
  • 1.1 Channel 的概念
  • 1.2 Channel 的简单使用
  • 1.3 Channel 的迭代
  • 1.4 close 关闭 Channel
  • 1.5 Channel 是热流
  • 二、Channel 的类型2.1 SendChannel 和 ReceiveChannel2.2 创建不同类型的 Channel
  • 2.1 SendChannel 和 ReceiveChannel
  • 2.2 创建不同类型的 Channel
  • 三、协程间通过 Channel 实现通信3.1 多个协程访问同一个 Channel3.2 produce 和 actor3.3 扇入和扇出3.4 BroadcastChannel3.4.1 BroadcastChannel 基本使用3.4.2 使用拓展函数转换3.4.3 过时的 API
  • 3.1 多个协程访问同一个 Channel
  • 3.2 produce 和 actor
  • 3.3 扇入和扇出
  • 3.4 BroadcastChannel3.4.1 BroadcastChannel 基本使用3.4.2 使用拓展函数转换3.4.3 过时的 API
  • 3.4.1 BroadcastChannel 基本使用
  • 3.4.2 使用拓展函数转换
  • 3.4.3 过时的 API

一、 Channel 基本使用

1.1 Channel 的概念

Channel 翻译过来为通道或者管道,实际上就是个队列, 是一个面向多协程之间数据传输的 BlockQueue,用于协程间通信。Channel 允许我们在不同的协程间传递数据。形象点说就是不同的协程可以往同一个管道里面写入数据或者读取数据。它是一个和 非常相似的概念。区别在于:BlockingQueue 使用 和 往队列里面写入和读取数据,这两个方法是阻塞的。而 Channel 使用 和 两个方法往管道里面写入和读取数据。这两个方法是非阻塞的挂起函数,鉴于此,Channel 的 和 方法也只能在协程中使用。

BlockingQueue
put
take
send
receive
send
receive

1.2 Channel 的简单使用

val channel = Channel<Int>()
launch {
    // 这里可能是消耗大量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平方并发送
    for (x in 1..5) channel.send(x * x)
}
// 这里我们打印了 5 次被接收的整数:
repeat(5) { println(channel.receive()) }
println("Done!")

输出结果:

1
4
9
16
25
Done!

1.3 Channel 的迭代

如果要取出 Channel 中所有的数据,可以使用迭代。

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) {
            channel.send(x * x)
        }
    }

    val iterator = channel.iterator()
    while (iterator.hasNext()) {
        val next = iterator.next()
        println(next)
    }
    println("Done!")
}

可以简化成:

val channel = Channel<Int>()
    launch {
        // 这里可能是消耗大量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平方并发送
        for (x in 1..5) channel.send(x * x)
    }
    for (y in channel) {
        println(y)
    }
    println("Done!")

此时输出结果:

1
4
9
16
25

最后一行 没有打印出来,并且程序没有结束。此时,我们发现,这种方式,实际上是我们一直在等待读取 Channel 中的数据,只要有数据到了,就会被读取到。

Done!

1.4 close 关闭 Channel

我们可以使用 方法关闭 Channel,来表明没有更多的元素将会进入通道。

close()
val channel = Channel<Int>()
    launch {
        // 这里可能是消耗大量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平方并发送
        for (x in 1..5) channel.send(x * x)
        channle.close()  // 结束发送
    }
    for (y in channel) {
        println(y)
    }
    println("Done!")

从概念上来讲,调用 方法就像向通道发送了一个特殊的关闭指令,这个迭代停止,说明关闭指令已经被接收了。所以这里能够保证所有先前发送出去的原色都能在通道关闭前被接收到。
对于一个 Channel,如果我们调用了它的 close,它会立即停止接受新元素,也就是说这时候它的 isClosedForSend 会立即返回 true,而由于 Channel 缓冲区的存在,这时候可能还有一些元素没有被处理完,所以要等所有的元素都被读取之后 isClosedForReceive 才会返回 true。
输出结果:

close
1
4
9
16
25
Done!

1.5 Channel 是热流

Flow 是冷流,只有调用末端流操作的时候,上游才会发射数据,与 Flow 不同,Channel 是热流,不管有没有订阅者,上游都会发射数据。

二、Channel 的类型

2.1 SendChannel 和 ReceiveChannel

Channel 是一个接口,它继承了 和 两个接口

SendChannel
ReceiveChannel
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

SendChannel
提供了发射数据的功能,有如下重点接口:

SendChannel
  • send 是一个挂起函数,将指定的元素发送到此通道,在该通道的缓冲区已满或不存在时挂起调用者。如果通道已经关闭,调用发送时会抛出异常。
  • trySend 如果不违反其容量限制,则立即将指定元素添加到此通道,并返回成功结果。否则,返回失败或关闭的结果。
  • close 关闭通道。
  • isClosedForSend 判断通道是否已经关闭,如果关闭,调用 send 会引发异常。

ReceiveChannel
提供了接收数据的功能,有如下重点接口:

ReceiveChannel
  • receive 如果此通道不为空,则从中检索并删除元素;如果通道为空,则挂起调用者;如果通道为接收而关闭,则引发ClosedReceiveChannel异常。
  • tryReceive 如果此通道不为空,则从中检索并删除元素,返回成功结果;如果通道为空,则返回失败结果;如果通道关闭,则返回关闭结果。
  • receiveCatching 如果此通道不为空,则从中检索并删除元素,返回成功结果;如果通道为空,则返回失败结果;如果通道关闭,则返回关闭的原因。
  • isEmpty 判断通道是否为空
  • isClosedForReceive 判断通道是否已经关闭,如果关闭,调用 receive 会引发异常。
  • cancel(cause: CancellationException? = null) 以可选原因取消接收此频道的剩余元素。此函数用于关闭通道并从中删除所有缓冲发送的元素。
  • iterator() 返回通道的迭代器

2.2 创建不同类型的 Channel

Kotlin 协程库中定义了多个 Channel 类型,所有channel类型的receive方法都是同样的行为: 如果channel不为空, 接收一个元素, 否则挂起。
它们的主要区别在于:

  • 内部可以存储元素的数量
  • send 是否可以被挂起

Channel 的不同类型:

  • Rendezvous channel: 0尺寸buffer (默认类型).
  • Unlimited channel: 无限元素, send不被挂起.
  • Buffered channel: 指定大小, 满了之后send挂起.
  • Conflated channel: 新元素会覆盖旧元素, receiver只会得到最新元素, send永不挂起.

创建 Channel:

val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)

三、协程间通过 Channel 实现通信

3.1 多个协程访问同一个 Channel

在协程外部定义 Channel, 就可以多个协程可以访问同一个channel,达到协程间通信的目的。

fun main() = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x)
    }
    launch {
        delay(10)
        for (y in channel) {
            println(" 1 --> $y")
        }
    }
    launch {
        delay(20)
        for (y in channel) {
            println(" 2 --> $y")
        }
    }
    launch {
        delay(30)
        for (x in 90..100) channel.send(x)
        channel.close()
    }
}

3.2 produce 和 actor

在协程外部定义 Channel,多个协程同时访问 Channel, 就可以实现生产者消费者模式。
produce
使用 produce 可以更便捷地构造生产者

fun main() = runBlocking<Unit> {
    val receiveChannel: ReceiveChannel<Int> = GlobalScope.produce {
        var i = 0
        while(true){
            delay(1000)
            send(i)
            i++
        }
        delay(3000)
        receiveChannel.cancel()
    }
}

我们可以通过 produce 这个方法启动一个生产者协程,并返回一个 ReceiveChannel,其他协程就可以拿着这个 Channel 来接收数据了。

actor
actor 可以用来构建一个消费者协程

fun main() = runBlocking<Unit> {
    val sendChannel: SendChannel<Int> = actor<Int> {
        for (ele in channel)
            ele
        }
    }
    
    delay(2000)
    sendChannel.close()
}

注意:不要在循环中使用 receive ,思考为什么?

注意:不要在循环中使用 receive ,思考为什么?

produce 和 actor 与 launch 一样都被称作“协程启动器”。通过这两个协程的启动器启动的协程也自然的与返回的 Channel 绑定到了一起,因此 Channel 的关闭也会在协程结束时自动完成,以 produce 为例,它构造出了一个 ProducerCoroutine 的对象

3.3 扇入和扇出

多个协程可能会从同一个channel中接收值,这种情况称为Fan-out。
多个协程可能会向同一个channel发射值,这种情况称为Fan-in。

3.4 BroadcastChannel

3.4.1 BroadcastChannel 基本使用

3.1 中例子提到一对多的情形,从数据处理本身来讲,有多个接收端的时候,同一个元素只会被一个接收端读到。而 BroadcastChannel 则不然,多个接收端不存在互斥现象。

public interface BroadcastChannel<E> : SendChannel<E> {

    public fun openSubscription(): ReceiveChannel<E>

    public fun cancel(cause: CancellationException? = null)

    @Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility only")
    public fun cancel(cause: Throwable? = null): Boolean
}

public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
when (capacity) {
    0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel")
    UNLIMITED -> throw IllegalArgumentException("Unsupported UNLIMITED capacity for BroadcastChannel")
    CONFLATED -> ConflatedBroadcastChannel()
    BUFFERED -> ArrayBroadcastChannel(CHANNEL_DEFAULT_CAPACITY)
    else -> ArrayBroadcastChannel(capacity)
}

创建 BroadcastChannel
创建 BroadcastChannel 需要指定缓冲区大小

val broadcastChannel = broadcastChannel<Int>(5)

订阅 broadcastChannel
订阅 broadcastChannel,那么只需要调用

val receiveChannel = broadcastChannel.openSubscription()

这样我们就得到了一个 ,获取订阅的消息,只需要调用它的 receive。

ReceiveChannel

3.4.2 使用拓展函数转换

使用 Channel 的拓展函数,也可以将一个 Channel 转换成 BroadcastChannel, 需要指定缓冲区大小。

val channel = Channel<Int>()
val broadcast = channel.broadcast(3)

这样发射给原 channel 的数据会被读取后发射给转换后的 broadcastChannel。如果还有其他协程也在读这个原始的 Channel,那么会与 BroadcastChannel 产生互斥关系。

3.4.3 过时的 API

BroadcastChannel 源码中的说明:

Note: This API is obsolete since 1.5.0. It will be deprecated with warning in 1.6.0 and with error in 1.7.0. It is replaced with SharedFlow.

BroadcastChannel 对于广播式的任务来说有点太复杂了。使用通道进行状态管理时会出现一些逻辑上的不一致。例如,可以关闭或取消通道。但由于无法取消状态,因此在状态管理中无法正常使用!
所以官方决定启用 BroadcastChannel。BroadcastChannel 被标记为过时了,在 kotlin 1.6.0 版本中使用将显示警告,在 1.7.0 版本中将显示错误。请使用 SharedFlow 和 StateFlow 替代它。
关于 SharedFlow 和 StateFlow 将在下文中讲到。

————————

Kotlin collaboration series article navigation:
Kotlin process I – coroutine
Kotlin collaboration process 2 – channel
Kotlin collaboration process III – data flow
Kotlin collaboration process IV — Application of flow and channel
Kotlin collaboration 5 – using kotlin collaboration in Android

  • 一、 Channel 基本使用1.1 Channel 的概念1.2 Channel 的简单使用1.3 Channel 的迭代1.4 close 关闭 Channel1.5 Channel 是热流
  • 1.1 Channel 的概念
  • 1.2 Channel 的简单使用
  • 1.3 Channel 的迭代
  • 1.4 close 关闭 Channel
  • 1.5 Channel 是热流
  • 二、Channel 的类型2.1 SendChannel 和 ReceiveChannel2.2 创建不同类型的 Channel
  • 2.1 SendChannel 和 ReceiveChannel
  • 2.2 create different types of channels
  • 三、协程间通过 Channel 实现通信3.1 多个协程访问同一个 Channel3.2 produce 和 actor3.3 扇入和扇出3.4 BroadcastChannel3.4.1 BroadcastChannel 基本使用3.4.2 使用拓展函数转换3.4.3 过时的 API
  • 3.1 multiple processes access the same channel
  • 3.2 produce 和 actor
  • 3.3 fan in and fan out
  • 3.4 BroadcastChannel3.4.1 BroadcastChannel 基本使用3.4.2 使用拓展函数转换3.4.3 过时的 API
  • 3.4.1 BroadcastChannel 基本使用
  • 3.4.2 using extended function transformation
  • 3.4.3 outdated API

一、 Channel 基本使用

1.1 Channel 的概念

Channel is translated as a channel or pipeline, which is actually a queue. It is a blockqueue for data transmission between multiple processes, which is used for inter process communication. Channel allows us to transfer data between different processes. In other words, different processes can write data or read data into the same pipeline. It is a very similar concept to. The difference is that BlockingQueue uses and writes and reads data into the queue. These two methods are blocked. The channel uses and methods to write and read data into the pipeline. These two methods are non blocking suspend functions. In view of this, the and methods of channel can only be used in the coroutine.

BlockingQueue
put
take
send
receive
send
receive

1.2 Channel 的简单使用

val channel = Channel<Int>()
launch {
    // 这里可能是消耗大量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平方并发送
    for (x in 1..5) channel.send(x * x)
}
// 这里我们打印了 5 次被接收的整数:
repeat(5) { println(channel.receive()) }
println("Done!")

Output results:

1
4
9
16
25
Done!

1.3 Channel 的迭代

If you want to retrieve all the data in the channel, you can use iteration.

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) {
            channel.send(x * x)
        }
    }

    val iterator = channel.iterator()
    while (iterator.hasNext()) {
        val next = iterator.next()
        println(next)
    }
    println("Done!")
}

It can be simplified to:

val channel = Channel<Int>()
    launch {
        // 这里可能是消耗大量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平方并发送
        for (x in 1..5) channel.send(x * x)
    }
    for (y in channel) {
        println(y)
    }
    println("Done!")

Output result:

1
4
9
16
25

The last line did not print out, and the program did not end. At this time, we find that in this way, we are actually waiting to read the data in the channel. As long as there is data, it will be read.

Done!

1.4 close 关闭 Channel

We can use the method to close the channel to indicate that no more elements will enter the channel.

close()
val channel = Channel<Int>()
    launch {
        // 这里可能是消耗大量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平方并发送
        for (x in 1..5) channel.send(x * x)
        channle.close()  // 结束发送
    }
    for (y in channel) {
        println(y)
    }
    println("Done!")

Conceptually, calling a method is like sending a special close instruction to the channel. This iteration stops, indicating that the close instruction has been received. Therefore, it can be ensured that all previously transmitted primary colors can be received before the channel is closed.
For a channel, if we call its close, it will immediately stop accepting new elements, that is, its isclosedforsend will immediately return true. Due to the existence of channel buffer, some elements may not be processed at this time, so isclosedforreceive will return true only after all elements are read.
Output results:

close
1
4
9
16
25
Done!

1.5 Channel 是热流

Flow is a cold flow. The upstream will transmit data only when the end flow operation is called. Unlike flow, channel is a hot flow. The upstream will transmit data whether there are subscribers or not.

二、Channel 的类型

2.1 SendChannel 和 ReceiveChannel

Channel is an interface that inherits and two interfaces

SendChannel
ReceiveChannel
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

SendChannel
It provides the function of transmitting data, with the following key interfaces:

SendChannel
  • Send is a suspend function that sends the specified element to this channel and suspends the caller when the channel’s buffer is full or does not exist. If the channel is closed, an exception will be thrown when calling send.
  • Trysend adds the specified element to this channel immediately and returns a successful result if its capacity limit is not violated. Otherwise, the result of failure or shutdown is returned.
  • Close closes the channel.
  • Isclosedforsend determines whether the channel has been closed. If closed, calling send will throw an exception.

ReceiveChannel
It provides the function of receiving data, including the following key interfaces:

ReceiveChannel
  • Receive if this channel is not empty, retrieve and delete elements from it; If the channel is empty, suspend the caller; If the channel is closed for receiving, a closedreceivechannel exception is thrown.
  • Tryreceiveif this channel is not empty, the element is retrieved and deleted from it, and a successful result is returned; If the channel is empty, a failure result is returned; If the channel is closed, the closing result is returned.
  • Receivecatching if this channel is not empty, retrieve and delete elements from it and return a successful result; If the channel is empty, a failure result is returned; If the channel is closed, the reason for closing is returned.
  • Isempty determines whether the channel is empty
  • isClosedForReceive 判断通道是否已经关闭,如果关闭,调用 receive 会引发异常。
  • Cancel (cause: cancelationexception? = null) cancels receiving the remaining elements of this channel for an optional reason. This function closes the channel and removes all buffered sent elements from it.
  • Iterator() returns the iterator of the channel

2.2 create different types of channels

Multiple channel types are defined in the kotlin collaboration library. The receive methods of all channel types have the same behavior: if the channel is not empty, receive an element, or hang.
Their main differences are:

  • The number of elements that can be stored internally
  • Can send be suspended

Different types of channels:

  • Rendezvous channel: 0尺寸buffer (默认类型).
  • Unlimited channel: 无限元素, send不被挂起.
  • Buffered channel: 指定大小, 满了之后send挂起.
  • Conflated channel: 新元素会覆盖旧元素, receiver只会得到最新元素, send永不挂起.

To create a channel:

val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)

3、 Communication between processes is realized through channel

3.1 multiple processes access the same channel

By defining a channel outside the process, multiple processes can access the same channel to achieve the purpose of communication between processes.

fun main() = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x)
    }
    launch {
        delay(10)
        for (y in channel) {
            println(" 1 --> $y")
        }
    }
    launch {
        delay(20)
        for (y in channel) {
            println(" 2 --> $y")
        }
    }
    launch {
        delay(30)
        for (x in 90..100) channel.send(x)
        channel.close()
    }
}

3.2 produce 和 actor

The producer consumer model can be realized by defining a channel outside the collaboration process and multiple collaboration processes access the channel at the same time.
produce
Using produce makes it easier to construct producers

fun main() = runBlocking<Unit> {
    val receiveChannel: ReceiveChannel<Int> = GlobalScope.produce {
        var i = 0
        while(true){
            delay(1000)
            send(i)
            i++
        }
        delay(3000)
        receiveChannel.cancel()
    }
}

We can start a producer process through the method of produce and return a receivechannel, and other processes can receive data with this channel.

actor
Actor can be used to build a consumer collaboration

fun main() = runBlocking<Unit> {
    val sendChannel: SendChannel<Int> = actor<Int> {
        for (ele in channel)
            ele
        }
    }
    
    delay(2000)
    sendChannel.close()
}

Note: don’t use receive in a loop. Think about why?

Note: don’t use receive in a loop. Think about why?

Both produce and actor, like launch, are called “co process initiators”. The processes started by the initiators of these two processes are also naturally bound to the returned channel, so the closing of the channel will also be completed automatically at the end of the process. Take production as an example, it constructs a producercoroutine object

3.3 fan in and fan out

Multiple processes may receive values from the same channel, which is called fan out.
Multiple co processes may transmit values to the same channel, which is called fan in.

3.4 BroadcastChannel

3.4.1 BroadcastChannel 基本使用

The example in 3.1 refers to the one to many situation. In terms of data processing itself, when there are multiple receivers, the same element will only be read by one receiver. The broadcastchannel is not the case. Multiple receivers are not mutually exclusive.

public interface BroadcastChannel<E> : SendChannel<E> {

    public fun openSubscription(): ReceiveChannel<E>

    public fun cancel(cause: CancellationException? = null)

    @Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility only")
    public fun cancel(cause: Throwable? = null): Boolean
}

public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
when (capacity) {
    0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel")
    UNLIMITED -> throw IllegalArgumentException("Unsupported UNLIMITED capacity for BroadcastChannel")
    CONFLATED -> ConflatedBroadcastChannel()
    BUFFERED -> ArrayBroadcastChannel(CHANNEL_DEFAULT_CAPACITY)
    else -> ArrayBroadcastChannel(capacity)
}

< strong > create a broadcastchannel < / strong >
To create a broadcastchannel, you need to specify the buffer size

val broadcastChannel = broadcastChannel<Int>(5)

订阅 broadcastChannel
订阅 broadcastChannel,那么只需要调用

val receiveChannel = broadcastChannel.openSubscription()

In this way, we get a message to get the subscription. We only need to call its receive.

ReceiveChannel

3.4.2 using extended function transformation

You can also convert a channel into a broadcastchannel by using the channel expansion function. You need to specify the buffer size.

val channel = Channel<Int>()
val broadcast = channel.broadcast(3)

In this way, the data transmitted to the original channel will be read and transmitted to the converted broadcastchannel. If other processes are reading the original channel, they will be mutually exclusive with broadcastchannel.

3.4.3 outdated API

Description in the source code of broadcastchannel:

Note: This API is obsolete since 1.5.0. It will be deprecated with warning in 1.6.0 and with error in 1.7.0. It is replaced with SharedFlow.

Broadcast channel is a little too complicated for broadcast tasks. There are some logical inconsistencies when using channels for state management. For example, you can close or cancel channels. However, because the status cannot be cancelled, it cannot be used normally in status management!
Therefore, the official decided to enable broadcastchannel. Broadcastchannel is marked as out of date. It will display warnings in kotlin version 1.6.0 and errors in version 1.7.0. Please replace it with sharedflow and stateflow.
Sharedflow and stateflow are discussed below.