Kotlin Coroutines, Part 3: Flow

Kotlin coroutine series navigation:
Kotlin Coroutines, Part 1: Coroutines
Kotlin Coroutines, Part 2: Channels
Kotlin Coroutines, Part 3: Flow
Kotlin Coroutines, Part 4: Applying Flow and Channel
Kotlin Coroutines, Part 5: Using Kotlin Coroutines on Android

  • 1. Basic usage of Flow
  • 1.1 Sequence vs. Flow
  • 1.2 Simple usage of Flow
  • 1.3 Common ways to create a regular Flow
  • 1.4 Flow is cold (lazy)
  • 1.5 Cancelling a Flow
  • 2. Flow operators
  • 2.1 Terminal flow operators
  • 2.1.1 collect
  • 2.1.2 reduce
  • 2.1.3 fold
  • 2.1.4 launchIn
  • 2.2 Flows are sequential
  • 2.3 onStart: when the flow starts
  • 2.4 onCompletion: when the flow completes
  • 2.4.1 Using try … finally
  • 2.4.2 Using the onCompletion operator
  • 2.5 Backpressure
  • 2.5.1 buffer
  • 2.5.2 conflate
  • 2.6 Flow exception handling
  • 2.6.1 Catching upstream exceptions with catch
  • 2.6.2 Retrying with retry and retryWhen
  • 2.7 Switching threads with Flow
  • 2.7.1 The collecting thread
  • 2.7.2 Switching threads with flowOn
  • 2.8 Intermediate transformation operators
  • 2.8.1 map
  • 2.8.2 transform
  • 2.8.3 onEach
  • 2.8.4 filter
  • 2.8.5 drop / dropWhile
  • 2.8.6 take
  • 2.8.7 zip
  • 2.8.8 combine
  • 2.8.9 Flattening with flattenConcat and flattenMerge
  • 2.8.10 flatMapMerge and flatMapConcat
  • 2.8.11 flatMapLatest
  • 3. StateFlow and SharedFlow
  • 3.1 StateFlow
  • 3.1.1 Basic usage of StateFlow
  • 3.1.2 Why use StateFlow
  • 3.1.3 Preventing task leaks
  • 3.1.4 StateFlow only emits the latest value to subscribers
  • 3.2 SharedFlow
  • 3.2.1 Basic usage of SharedFlow
  • 3.2.2 Other members of MutableSharedFlow
  • 3.3 When to use StateFlow vs. SharedFlow
  • 3.4 Converting a cold flow into a hot flow

1. Basic usage of Flow

A suspending function in a Kotlin coroutine can run work without blocking and return a result, but only a single value. If you want multiple computed values returned over time, you can use Flow.

1.1 Sequence vs. Flow

Before introducing Flow, let's look at the sequence builder:

val intSequence = sequence<Int> {
        Thread.sleep(1000) // simulate time-consuming task 1
        yield(1)
        Thread.sleep(1000) // simulate time-consuming task 2
        yield(2)
        Thread.sleep(1000) // simulate time-consuming task 3
        yield(3)
    }

intSequence.forEach {
        println(it)
    }

As shown above, to pull values out of the sequence builder we iterate over it, and as expected it returns the three results one after another.

Sequence is synchronous and blocking, and it cannot call other suspending functions. More often, though, we want to run several tasks asynchronously and have the results delivered one by one; Flow is the best fit for that scenario.

The Flow interface is shown below; it has only a single collect method.

public interface Flow<out T> {

    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}

Flow can run multiple tasks without blocking and emit multiple results, and other suspending functions can be called inside a Flow. To take the values out of a Flow you call its collect method, so the basic usage looks like:

Flow.collect()  // pseudocode

Since collect is a suspending function, it must be called from inside a coroutine.

1.2 Simple usage of Flow

The following achieves an effect similar to the Sequence example above:

private fun createFlow(): Flow<Int> = flow {
    delay(1000)
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
}

fun main() = runBlocking {
    createFlow().collect {
        println(it)
    }
}

The code above uses the flow { ... } builder to create a Flow. Note the following:

  • Suspending functions can be called inside the flow { ... } block;
  • createFlow does not need to be marked suspend (why can it call suspending functions without being one itself? Because the block is not executed when the Flow is built; it only runs later, inside the suspending collect call);
  • emit() is used to emit values;
  • collect() is used to collect the results.

1.3 Common ways to create a regular Flow

flow{…}

flow {
    delay(1000)
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
}

flowOf()

flowOf(1,2,3).onEach {
    delay(1000)
}

The flowOf() builder defines a flow that emits a fixed set of values; when building a Flow with flowOf there is no need to call emit() explicitly.

asFlow()

listOf(1, 2, 3).asFlow().onEach {
    delay(1000)
}

The asFlow() extension function converts various collections and sequences into flows; here, too, there is no need to call emit() explicitly.

1.4 Flow is cold (lazy)

Like sequences, flows are lazy: the code inside flow { ... } does not run until a terminal operator (collect is one of them) is called on it. This is what we call a cold flow.

private fun createFlow(): Flow<Int> = flow {
    println("flow started")
    delay(1000)
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
}

fun main() = runBlocking {
    val flow = createFlow()
    println("calling collect...")
    flow.collect {
        println(it)
    }
    println("calling collect again...")
    flow.collect {
        println(it)
    }
}

The output is:

calling collect...
flow started
1
2
3
calling collect again...
flow started
1
2
3

This is why a function that returns a Flow, such as createFlow, does not need to be marked suspend: even though its body calls suspending functions, calling createFlow returns immediately without waiting for anything. The flow is started afresh each time its results are collected.

Is there such a thing as a hot flow, then? The ChannelFlow discussed later is one: as soon as the upstream produces data, it is emitted to the downstream collectors.

1.5 Cancelling a Flow

Flows use the same cooperative cancellation as coroutines. Collection of a flow can be cancelled while the flow is suspended in a cancellable suspending function (such as delay); to cancel a flow you simply cancel the coroutine it runs in.
The following example shows how the flow is cancelled on timeout and stops executing its code when run inside a withTimeoutOrNull block:

fun simple(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // time out after 250 ms
        simple().collect { value -> println(value) } 
    }
    println("Done")
}

Note that the flow in the simple function emits only two numbers, producing the following output:

Emitting 1
1
Emitting 2
2
Done

2. Flow operators

2.1 Terminal flow operators

Terminal operators are suspending functions on a flow that start its collection. collect is the most basic one, but there are other, often more convenient, terminal operators:

  • Conversion to various collections: toList / toSet / toCollection
  • Operators that take the first value, the last value, or ensure the flow emits a single value
  • Reducing the flow to a single value with reduce or fold
  • count
  • launchIn / produceIn / broadcastIn

Let's look at a few commonly used terminal operators.

2.1.1 collect

Collects the values emitted by the upstream flow.
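
As a quick illustration of a few of the other terminal operators listed above, here is a small sketch (not from the original article) using the same setup as the other snippets:

fun main() = runBlocking {
    val flow = (1..5).asFlow()
    println(flow.first())   // 1
    println(flow.toList())  // [1, 2, 3, 4, 5]
    println(flow.count())   // 5
}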

2.1.2 reduce

reduce works like the reduce function on Kotlin collections, accumulating the emitted values into a single result. As mentioned above, reduce is a terminal operator.

fun main() = runBlocking {
    val sum = (1..5).asFlow().reduce { a, b ->
        a + b
    }
    println(sum)
}

Output:

15

2.1.3 fold

fold is similar to fold on Kotlin collections: it takes an initial value. fold is also a terminal operator.

fun main() = runBlocking {
    val sum = (1..5).asFlow().fold(100) { a, b ->
        a + b
    }
    println(sum)
}

Output:

115

2.1.4 launchIn

launchIn launches collection of the flow in the given CoroutineScope; it takes a single parameter, the CoroutineScope.
Source:

public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
    collect() // tail-call
}

Example:

private val mDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
fun main() {
    val scope = CoroutineScope(mDispatcher)
    (1..5).asFlow().onEach { println(it) }
        .onCompletion { mDispatcher.close() }
        .launchIn(scope)
}

Output:

1
2
3
4
5

Let's look at another example:

fun main() = runBlocking{
    val cosTime = measureTimeMillis {
        (1..5).asFlow()
            .onEach { delay(100) }
            .flowOn(Dispatchers.IO)
            .collect { println(it) }

        flowOf("one", "two", "three", "four", "five")
            .onEach { delay(200) }
            .flowOn(Dispatchers.IO)
            .collect { println(it) }
    }
    println("cosTime: $cosTime")
}

We would like the two flows to run in parallel. Here is the output:

1
2
3
4
5
one
two
three
four
five
cosTime: 1645

The flows are not executed in parallel, which is easy to understand: the second collect is not reached until the first has finished.

The correct approach is to launch a separate coroutine for each flow:

fun main() = runBlocking<Unit>{
    launch {
        (1..5).asFlow()
            .onEach { delay(100) }
            .flowOn(Dispatchers.IO)
            .collect { println(it) }
    }
    launch {
        flowOf("one", "two", "three", "four", "five")
            .onEach { delay(200) }
            .flowOn(Dispatchers.IO)
            .collect { println(it) }
    }
}

Or, more elegantly, use launchIn:

fun main() = runBlocking<Unit>{

    (1..5).asFlow()
        .onEach { delay(100) }
        .flowOn(Dispatchers.IO)
        .onEach { println(it) }
        .launchIn(this)

    flowOf("one", "two", "three", "four", "five")
        .onEach { delay(200) }
        .flowOn(Dispatchers.IO)
        .onEach { println(it) }
        .launchIn(this)
}

Output:

1
one
2
3
4
two
5
three
four
five

2.2 Flows are sequential

Like sequences, each individual collection of a flow runs sequentially unless operators that work on multiple flows are used. By default no new coroutines are started: each intermediate operator processes every emitted value in turn and then hands it on to the terminal operator.

fun main() = runBlocking {
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0
        }
        .map {
            println("Map $it")
            "string $it"
        }.collect {
            println("Collect $it")
        }
}

Output:

Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

2.3 onStart: when the flow starts

onStart is invoked when the flow starts executing; for long-running work it can be used, for example, to show a loading indicator.

fun main() = runBlocking {
    (1..5).asFlow()
        .onEach { delay(200) }
        .onStart { println("onStart") }
        .collect { println(it) }
}

Output:

onStart
1
2
3
4
5

2.4 onCompletion: when the flow completes

If you need to run an action when the flow completes (normally or with an exception), there are two ways to do it:

2.4.1 Using try … finally

fun main() = runBlocking {
    try {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.collect { println(it) }
    } finally {
        println("Done")
    }
}

2.4.2 Using the onCompletion operator

fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.onCompletion { println("Done") }
        .collect { println(it) }
}

Output:

1
2
3
4
5
Done

2.5 Backpressure

Backpressure is one of the features of reactive programming. Flowable in RxJava 2 supports the following backpressure strategies:

  • MISSING: the Flowable is created without a backpressure strategy; values emitted via onNext are neither buffered nor dropped.
  • ERROR: if the data placed into the Flowable's asynchronous buffer exceeds its capacity, a MissingBackpressureException is thrown.
  • BUFFER: the Flowable's asynchronous buffer, like Observable's, has no fixed size; data can be added without limit and no MissingBackpressureException is thrown, but this can lead to OOM.
  • DROP: if the Flowable's asynchronous buffer is full, the value about to be put into it is dropped.
  • LATEST: like DROP, incoming values are dropped when the buffer is full, except that LATEST always forces the most recent value into the buffer regardless of its state.

In a flow block we receive each result as soon as it is produced, but if processing a result is itself slow, values may be emitted faster than they can be handled.
Flow implements backpressure through suspending functions.

2.5.1 buffer

buffer corresponds to RxJava's BUFFER strategy: it sets up a buffer. The buffer has a size, and there are different strategies for handling overflow:

  • Set a buffer; on overflow, suspend the current coroutine until data in the buffer has been consumed.
  • Set a buffer; on overflow, drop the newest value.
  • Set a buffer; on overflow, drop the oldest value.

The buffer size can also be set to 0, meaning no buffer at all.

Let's first look at an example with no buffer. Suppose producing and emitting each value takes 100 ms and processing each value takes 700 ms:

fun main() = runBlocking {
    val cosTime = measureTimeMillis {
        (1..5).asFlow().onEach {
            delay(100)
            println("produce data: $it")
        }.collect {
            delay(700)
            println("collect: $it")
        }
    }
    println("cosTime: $cosTime")
}

The output is:

produce data: 1
collect: 1
produce data: 2
collect: 2
produce data: 3
collect: 3
produce data: 4
collect: 4
produce data: 5
collect: 5
cosTime: 4069

Since the flow is lazy and sequential, processing all the data in the flow takes roughly 4000 ms.

Now let's set a buffer with buffer(). It takes two parameters: the first is size, the capacity of the buffer; the second is a BufferOverflow value describing what to do when the buffer overflows. It is one of the enum values below and defaults to BufferOverflow.SUSPEND.

The overflow strategies are defined as follows:

public enum class BufferOverflow {
    /**
     * Suspend on buffer overflow.
     */
    SUSPEND,

    /**
     * Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
     */
    DROP_OLDEST,

    /**
     * Drop **the latest** value that is being added to the buffer right now on buffer overflow
     * (so that buffer contents stay the same), do not suspend.
     */
    DROP_LATEST
}

Buffer with the SUSPEND strategy

Let's modify the code and set a buffer of capacity 2:

fun main() = runBlocking {
    val cosTime = measureTimeMillis {
        (1..5).asFlow().onEach {
            delay(100)
            println("produce data: $it")
        }.buffer(2, BufferOverflow.SUSPEND)
            .collect {
                delay(700)
                println("collect: $it")
            }
    }
    println("cosTime: $cosTime")
}

The output is:

produce data: 1
produce data: 2
produce data: 3
produce data: 4
collect: 1
produce data: 5
collect: 2
collect: 3
collect: 4
collect: 5
cosTime: 3713

The total time is now about 3713 ms. The buffer operator lets the emitting and collecting code run concurrently, which improves efficiency.
A brief walk-through of the execution:
Note that the buffer capacity here is effectively counted from 0.
First, value 1 is produced and handed straight to the collector; values 2, 3 and 4 are stored in the buffer. When value 5 is about to be emitted the buffer is full, so the producer suspends; only after value 1 has been collected is value 5 emitted.

Buffer with DROP_LATEST: dropping the newest data
If we change the overflow strategy in the code above to BufferOverflow.DROP_LATEST, the code becomes:
fun main() = runBlocking {
    val cosTime = measureTimeMillis {
        (1..5).asFlow().onEach {
            delay(100)
            println("produce data: $it")
        }.buffer(2, BufferOverflow.DROP_LATEST)
            .collect {
                delay(700)
                println("collect: $it")
            }
    }
    println("cosTime: $cosTime")
}

The output is:

produce data: 1
produce data: 2
produce data: 3
produce data: 4
produce data: 5
collect: 1
collect: 2
collect: 3
cosTime: 2272

Values 4 and 5 are simply dropped because the buffer is full, and are never collected.

Buffer with DROP_OLDEST: dropping the oldest data
If we change the overflow strategy to BufferOverflow.DROP_OLDEST, the code becomes:
fun main() = runBlocking {
    val cosTime = measureTimeMillis {
        (1..5).asFlow().onEach {
            delay(100)
            println("produce data: $it")
        }.buffer(2, BufferOverflow.DROP_OLDEST)
            .collect {
                delay(700)
                println("collect: $it")
            }
    }
    println("cosTime: $cosTime")
}

The output is:

produce data: 1
produce data: 2
produce data: 3
produce data: 4
produce data: 5
collect: 1
collect: 4
collect: 5
cosTime: 2289

When value 4 enters the buffer it evicts value 2, and when value 5 enters the buffer it evicts value 3.

2.5.2 conflate

When a flow represents partial results or state updates of an operation, it may not be necessary to process every value, only the most recent one. The conflate operator can be used to skip intermediate values:

fun main() = runBlocking {
    val cosTime = measureTimeMillis {
        (1..5).asFlow().onEach {
            delay(100)
            println("produce data: $it")
        }.conflate()
            .collect {
                delay(700)
                println("collect: $it")
            }
    }
    println("cosTime: $cosTime")
}

Output:

produce data: 1
produce data: 2
produce data: 3
produce data: 4
produce data: 5
collect: 1
collect: 5
cosTime: 1596

The conflate operator uses no buffer (a capacity of 0) and drops old values, i.e. it applies the DROP_OLDEST strategy; it is effectively equivalent to buffer(0, BufferOverflow.DROP_OLDEST).

2.6 Flow exception handling

2.6.1 Catching upstream exceptions with catch

The onCompletion operator mentioned earlier observes when the flow finishes collecting, and it runs even when an exception is encountered.
fun main() = runBlocking {
    (1..5).asFlow().onEach {
        if (it == 4) {
            throw Exception("test exception")
        }
        delay(100)
        println("produce data: $it")
    }.onCompletion {
        println("onCompletion")
    }.collect {
        println("collect: $it")
    }
}

Output:

produce data: 1
collect: 1
produce data: 2
collect: 2
produce data: 3
collect: 3
onCompletion
Exception in thread "main" java.lang.Exception: test exception
...

In fact, you can detect the exception inside onCompletion, because its action takes a parameter:

onCompletion(action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit)

If the upstream of the flow threw an exception, cause is non-null; otherwise it is null. We can therefore check for an exception inside onCompletion:
fun main() = runBlocking {
    (1..5).asFlow().onEach {
        if (it == 4) {
            throw Exception("test exception")
        }
        delay(100)
        println("produce data: $it")
    }.onCompletion { cause ->
        if (cause != null) {
            println("flow completed exception")
        } else {
            println("onCompletion")
        }
    }.collect {
        println("collect: $it")
    }
}

However, onCompletion can only tell whether an exception occurred; it cannot catch it.

To actually catch the exception, use the catch operator:
fun main() = runBlocking {
    (1..5).asFlow().onEach {
        if (it == 4) {
            throw Exception("test exception")
        }
        delay(100)
        println("produce data: $it")
    }.onCompletion { cause ->
        if (cause != null) {
            println("flow completed exception")
        } else {
            println("onCompletion")
        }
    }.catch { ex ->
        println("catch exception: ${ex.message}")
    }.collect {
        println("collect: $it")
    }
}

Output:

produce data: 1
collect: 1
produce data: 2
collect: 2
produce data: 3
collect: 3
flow completed exception
catch exception: test exception

But if we swap onCompletion and catch, then once catch has caught the exception it no longer affects the operators downstream of it:
fun main() = runBlocking {
    (1..5).asFlow().onEach {
        if (it == 4) {
            throw Exception("test exception")
        }
        delay(100)
        println("produce data: $it")
    }.catch { ex ->
        println("catch exception: ${ex.message}")
    }.onCompletion { cause ->
        if (cause != null) {
            println("flow completed exception")
        } else {
            println("onCompletion")
        }
    }.collect {
        println("collect: $it")
    }
}

Output:

produce data: 1
collect: 1
produce data: 2
collect: 2
produce data: 3
collect: 3
catch exception: test exception
onCompletion

  • The catch operator implements exception transparency: it is an intermediate operator, and it only catches upstream exceptions, never exceptions thrown downstream of it.
  • Inside catch you can rethrow with throw, turn the exception into an emitted value with emit(), log it, or run any other handling logic.
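
For example, a small sketch (not from the original article) that turns the failure into a fallback value of -1 inside catch:

fun main() = runBlocking {
    (1..5).asFlow().onEach {
        if (it == 4) throw Exception("test exception")
    }.catch { ex ->
        println("caught: ${ex.message}")
        emit(-1) // catch's receiver is a FlowCollector, so we can emit a fallback value
    }.collect {
        println("collect: $it")
    }
}

This collects 1, 2 and 3, prints the caught message, and finally collects -1.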

2.6.2 Retrying with retry and retryWhen

public fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T> {
    require(retries > 0) { "Expected positive amount of retries, but had $retries" }
    return retryWhen { cause, attempt -> attempt < retries && predicate(cause) }
}

If the upstream throws an exception and the retry operator is applied, retry re-collects the flow at most the number of times given by retries:

fun main() = runBlocking {
    (1..5).asFlow().onEach {
        if (it == 4) {
            throw Exception("test exception")
        }
        delay(100)
        println("produce data: $it")
    }.retry(2) {
        it.message == "test exception"
    }.catch { ex ->
        println("catch exception: ${ex.message}")
    }.collect {
        println("collect: $it")
    }
}

Output:

produce data: 1
collect: 1
produce data: 2
collect: 2
produce data: 3
collect: 3
produce data: 1
collect: 1
produce data: 2
collect: 2
produce data: 3
collect: 3
produce data: 1
collect: 1
produce data: 2
collect: 2
produce data: 3
collect: 3
catch exception: test exception

Note that a retry only happens when an exception occurs and the retry predicate returns true.

retry ultimately delegates to the retryWhen operator. The following code behaves the same as the code above:

fun main() = runBlocking {
    (1..5).asFlow().onEach {
        if (it == 4) {
            throw Exception("test exception")
        }
        delay(100)
        println("produce data: $it")
    }.retryWhen { cause, attempt ->
        cause.message == "test exception" && attempt < 2
    }.catch { ex ->
        println("catch exception: ${ex.message}")
    }.collect {
        println("collect: $it")
    }
}

Food for thought: what happens if you swap catch and retry/retryWhen in the code above? (Since catch only sees upstream exceptions, placing it before retry means the exception is handled there and never reaches retry, so no retries occur.)

2.7 Switching threads with Flow

2.7.1 The collecting thread

Flow switches threads based on the CoroutineContext. Because collect is a suspending function it must run inside a CoroutineScope, so the thread on which values are received is determined by that scope's CoroutineContext. For example, if collect is called on the main thread, the values are received on Dispatchers.Main.
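
A minimal sketch (not from the article, printing the thread name directly rather than using its helper) to show that the collector runs on the dispatcher of the coroutine that calls collect:

fun main() = runBlocking {
    launch(Dispatchers.Default) {
        (1..3).asFlow().collect {
            // the collector block runs on a Dispatchers.Default worker thread
            println("collect $it on ${Thread.currentThread().name}")
        }
    }
}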

2.7.2 Switching threads with flowOn

RxJava uses subscribeOn and observeOn to choose the threads for emitting values and for the observer, and when subscribeOn is called several times upstream only the last call takes effect.
Flow switches threads with the flowOn operator instead; it can be called multiple times, and each call affects the code upstream of it. For example:
private val mDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()

fun main() = runBlocking {
    (1..5).asFlow().onEach {
        printWithThreadInfo("produce data: $it")
    }.flowOn(Dispatchers.IO)
        .map {
            printWithThreadInfo("$it to String")
            "String: $it"
        }.flowOn(mDispatcher)
        .onCompletion {
            mDispatcher.close()
        }
        .collect {
            printWithThreadInfo("collect: $it")
        }
}

The output is:

thread id: 13, thread name: DefaultDispatcher-worker-1 ---> produce data: 1
thread id: 13, thread name: DefaultDispatcher-worker-1 ---> produce data: 2
thread id: 13, thread name: DefaultDispatcher-worker-1 ---> produce data: 3
thread id: 13, thread name: DefaultDispatcher-worker-1 ---> produce data: 4
thread id: 13, thread name: DefaultDispatcher-worker-1 ---> produce data: 5
thread id: 12, thread name: pool-1-thread-1 ---> 1 to String
thread id: 12, thread name: pool-1-thread-1 ---> 2 to String
thread id: 1, thread name: main ---> collect: String: 1
thread id: 12, thread name: pool-1-thread-1 ---> 3 to String
thread id: 1, thread name: main ---> collect: String: 2
thread id: 12, thread name: pool-1-thread-1 ---> 4 to String
thread id: 1, thread name: main ---> collect: String: 3
thread id: 12, thread name: pool-1-thread-1 ---> 5 to String
thread id: 1, thread name: main ---> collect: String: 4
thread id: 1, thread name: main ---> collect: String: 5

As you can see, the values are emitted on a Dispatchers.IO thread, the map step runs on our custom single-thread dispatcher, and collect runs on the main thread.
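
The printWithThreadInfo helper used in these examples is not shown in the article; a minimal sketch that matches the output format might be:

fun printWithThreadInfo(msg: String) {
    val t = Thread.currentThread()
    println("thread id: ${t.id}, thread name: ${t.name} ---> $msg")
}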

2.8 Intermediate transformation operators

2.8.1 map

We have already used the map operator in earlier examples. map is not limited to Flow: applied to a List it transforms each element into a new one, collects them into a new List and returns it.
Applied to a Flow, map transforms each value in the flow and emits the result downstream.

fun main() = runBlocking {
    (1..5).asFlow().map { "string: $it" }
        .collect {
            println(it)
        }
}

Output:

string: 1
string: 2
string: 3
string: 4
string: 5

2.8.2 transform

With the transform operator you can call emit as many times as you like; this is the biggest difference between transform and map:

fun main() = runBlocking {
    (1..5).asFlow().transform {
        emit(it * 2)
        delay(100)
        emit("String: $it")
    }.collect {
            println(it)
        }
}

Output:

2
String: 1
4
String: 2
6
String: 3
8
String: 4
10
String: 5

2.8.3 onEach

Runs an action on each value as it passes through.

fun main() = runBlocking {
    (1..5).asFlow()
        .onEach { println("onEach: $it") }
        .collect { println(it) }
}

Output:

onEach: 1
1
onEach: 2
2
onEach: 3
3
onEach: 4
4
onEach: 5
5

2.8.4 filter

Filters the values with a predicate.

fun main() = runBlocking {
    (1..5).asFlow()
        .filter { it % 2 == 0 }
        .collect { println(it) }
}

Output:

2
4

2.8.5 drop / dropWhile

drop skips the first n elements.
dropWhile skips elements while the predicate holds and then emits everything that follows.
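
The article gives no example here; a small sketch (the expected output is shown in the comments):

fun main() = runBlocking {
    (1..5).asFlow().drop(2)
        .collect { println(it) }              // prints 3, 4, 5
    (1..5).asFlow().dropWhile { it < 3 }
        .collect { println(it) }              // prints 3, 4, 5
}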

2.8.6 take

The take operator emits only the first n values:

fun main() = runBlocking {
    (1..5).asFlow().take(2).collect {
            println(it)
        }
}

Output:

1
2

2.8.7 zip

zip combines two flows pairwise:

fun main() = runBlocking {
    val flowA = (1..6).asFlow()
    val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200)
    flowA.zip(flowB) { a, b -> "$a and $b" }
        .collect {
            println(it)
        }
}

Output:

1 and one
2 and two
3 and three
4 and four
5 and five

zip pairs each item of flowA with the corresponding item of flowB. Even though every item of flowB is delayed with delay(), zip waits for that delay to finish before combining the pair.

If flowA and flowB emit different numbers of items, the resulting flow emits as many items as the shorter of the two:

fun main() = runBlocking {
    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three","four","five", "six", "seven").onEach { delay(200) }
    flowA.zip(flowB) { a, b -> "$a and $b" }
        .collect {
            println(it)
        }
}

Output:

1 and one
2 and two
3 and three
4 and four
5 and five

2.8.8 combine

combine also merges two flows, but differently from zip.

With combine, every time flowA emits a new item it is combined with the latest item emitted so far by flowB (and vice versa).

fun main() = runBlocking {
    val flowA = (1..5).asFlow().onEach { delay(100) }
    val flowB = flowOf("one", "two", "three","four","five", "six", "seven").onEach { delay(200) }
    flowA.combine(flowB) { a, b -> "$a and $b" }
        .collect {
            println(it)
        }
}

Output:

1 and one
2 and one
3 and one
3 and two
4 and two
5 and two
5 and three
5 and four
5 and five
5 and six
5 and seven

2.8.9 Flattening with flattenConcat and flattenMerge

flattenConcat
flattenConcat flattens the given flow of flows into a single flow sequentially, without interleaving the nested flows.
Source:

@FlowPreview
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
    collect { value -> emitAll(value) }
}

Example:

fun main() = runBlocking {
    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three","four","five").onEach { delay(1000) }

    flowOf(flowA,flowB)
        .flattenConcat()
        .collect{ println(it) }
}

Output:

1
2
3
4
5
// delay 1000ms
one
// delay 1000ms
two
// delay 1000ms
three
// delay 1000ms
four
// delay 1000ms
five

flattenMerge
flattenMerge takes one parameter, a concurrency limit, which defaults to 16.
Source:

@FlowPreview
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
    require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
    return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
}

As you can see, the parameter must be greater than 0, and with a value of 1 it behaves exactly like flattenConcat.

fun main() = runBlocking {
    val flowA = (1..5).asFlow().onEach { delay(100) }
    val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200) }

    flowOf(flowA,flowB)
        .flattenMerge(2)
        .collect{ println(it) }
}

Output:

1
one
2
3
two
4
5
three
four
five

2.8.10 flatMapMerge and flatMapConcat

flatMapMerge is implemented with the map and flattenMerge operators:

@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenMerge(concurrency)

Example:

fun main() = runBlocking {
    (1..5).asFlow()
        .flatMapMerge {
            flow {
                emit(it)
                delay(1000)
                emit("string: $it")
            }
        }.collect { println(it) }
}

Output:

1
2
3
4
5
// delay 1000ms
string: 1
string: 2
string: 3
string: 4
string: 5

flatMapConcat is implemented with the map and flattenConcat operators:

@FlowPreview
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
    map(transform).flattenConcat()

Example:

fun main() = runBlocking {
    (1..5).asFlow()
        .flatMapConcat {
            flow {
                emit(it)
                delay(1000)
                emit("string: $it")
            }
        }.collect { println(it) }
}

Output:

1
// delay 1000ms
string: 1
2
// delay 1000ms
string: 2
3
// delay 1000ms
string: 3
4
// delay 1000ms
string: 4
5
// delay 1000ms
string: 5

Both flatMapMerge and flatMapConcat transform one flow into another.
The difference: flatMapMerge does not wait for the inner flow to complete, whereas with flatMapConcat the collector waits for the inner flow to finish before collecting the next value.

2.8.11 flatMapLatest

When the upstream emits a new value, the previous inner flow is cancelled:

fun main() = runBlocking {
    (1..5).asFlow().onEach { delay(100) }
        .flatMapLatest {
            flow {
                println("begin flatMapLatest $it")
                delay(200)
                emit("string: $it")
                println("end flatMapLatest $it")
            }
        }.collect {
            println(it)
        }
}

Output:

begin flatMapLatest 1
begin flatMapLatest 2
begin flatMapLatest 3
begin flatMapLatest 4
begin flatMapLatest 5
end flatMapLatest 5
string: 5

3. StateFlow and SharedFlow

StateFlow and SharedFlow are newer APIs that replace BroadcastChannel. They let an upstream emit values that can be collected by multiple subscribers at the same time.

3.1 StateFlow

The official documentation describes StateFlow as a state-holder observable flow that emits the current state and subsequent state updates to its collectors. The current value can also be read through its value property; to update the state and push it into the flow, assign a new value to the value property of the MutableStateFlow class.

On Android, StateFlow is a great fit for classes that need to keep mutable state observable.

StateFlow comes in two flavours, StateFlow and MutableStateFlow:

public interface StateFlow<out T> : SharedFlow<T> {
   public val value: T
}

public interface MutableStateFlow<out T>: StateFlow<T>, MutableSharedFlow<T> {
   public override var value: T
   public fun compareAndSet(expect: T, update: T): Boolean
}

The state is represented by its value; any update to the value is propagated to all collectors of the flow.

3.1.1 Basic usage of StateFlow

Example:

class Test {
    private val _state = MutableStateFlow<String>("unKnown")
    val state: StateFlow<String> get() = _state

    fun getApi(scope: CoroutineScope) {
        scope.launch {
            val res = getApi()
            _state.value = res
        }
    }

    private suspend fun getApi() = withContext(Dispatchers.IO) {
        delay(2000) // simulate a slow request
        "hello, stateFlow"
    }
}

fun main() = runBlocking<Unit> {
    val test: Test = Test()

    test.getApi(this) // start fetching the result

    launch(Dispatchers.IO) {
        test.state.collect {
            printWithThreadInfo(it)
        }
    }
    launch(Dispatchers.IO) {
        test.state.collect {
            printWithThreadInfo(it)
        }
    }
}

The output is shown below; note that the program never terminates.

thread id: 14, thread name: DefaultDispatcher-worker-3 ---> unKnown
thread id: 12, thread name: DefaultDispatcher-worker-1 ---> unKnown
// after a two-second wait
thread id: 14, thread name: DefaultDispatcher-worker-3 ---> hello, stateFlow
thread id: 12, thread name: DefaultDispatcher-worker-1 ---> hello, stateFlow

StateFlow is used much like LiveData.
MutableStateFlow is the mutable type, i.e. its value can be changed, while StateFlow is read-only, just like MutableLiveData and LiveData. For better encapsulation you generally expose only the read-only type.

The output demonstrates that:

  • The values emitted by a StateFlow can be collected simultaneously by multiple collectors in different coroutines.
  • StateFlow is a hot flow: whenever its value changes, the new value is emitted.

The program does not terminate because calling collect on a StateFlow suspends the current coroutine and never completes.

Differences between StateFlow and LiveData:

  • StateFlow requires an initial value; LiveData does not.
  • LiveData is bound to the view lifecycle: when the view enters the STOPPED state, LiveData.observe() automatically unregisters the observer, whereas collecting from a StateFlow (or any other flow) does not stop automatically.

3.1.2 Why use StateFlow

LiveData has the following characteristics:

  • Its value can only be updated on the main thread; even postValue() from a background thread ultimately posts the value to the main thread, where setValue() is called.
  • LiveData does not de-duplicate values.
  • LiveData transformations run on the main thread.
  • LiveData requires careful handling of the "sticky event" problem.

StateFlow handles all of these scenarios easily.
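
A minimal sketch (not from the original article) showing that a MutableStateFlow can be updated from a background thread and that equal values are conflated:

fun main() = runBlocking {
    val state = MutableStateFlow(0)
    val collector = launch { state.collect { println("collected $it") } }
    delay(100)                      // let the collector see the initial value
    launch(Dispatchers.Default) {   // updating value from a background thread is fine
        state.value = 1
        state.value = 1             // setting an equal value does nothing (conflated)
        delay(100)
        state.value = 2
    }
    delay(500)
    collector.cancel()
}

This typically prints "collected 0", "collected 1" and "collected 2"; the repeated assignment of 1 is not re-emitted.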

3.1.3 Preventing task leaks

There are two ways to solve this:

  • Don't collect the StateFlow directly; convert it with asLiveData() and observe it as LiveData (at which point you may well ask why not just use LiveData in the first place).
  • Cancel the collecting coroutine manually; on Android you can collect the flow inside a Lifecycle.repeatOnLifecycle block.

The corresponding code:

lifecycleScope.launch {
    repeatOnLifecycle(Lifecycle.State.STARTED) {
        test.state.collect {
            printWithThreadInfo(it)
        }
    }
}
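
For the first option, a short sketch (assuming the androidx.lifecycle:lifecycle-livedata-ktx dependency and that this code runs inside a Fragment):

// Flow<T>.asLiveData() wraps the flow collection in a lifecycle-aware LiveData
val stateLiveData: LiveData<String> = test.state.asLiveData()
stateLiveData.observe(viewLifecycleOwner) { value ->
    printWithThreadInfo(value)
}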

3.1.4 StateFlow only emits the latest value to subscribers

Let's modify the code above:

class Test {
    private val _state = MutableStateFlow<String>("unKnown")
    val state: StateFlow<String> get() = _state

    fun getApi1(scope: CoroutineScope) {
        scope.launch {
            delay(2000)
            _state.value = "hello,coroutine"
        }
    }

    fun getApi2(scope: CoroutineScope) {
        scope.launch {
            delay(2000)
            _state.value = "hello, kotlin"
        }
    }
}

fun main() = runBlocking<Unit> {
    val test: Test = Test()

    test.getApi1(this) // start the first request
    delay(1000)
    test.getApi2(this) // start the second request

    val job1 = launch(Dispatchers.IO) {
        delay(8000)
        test.state.collect {
            printWithThreadInfo(it)
        }
    }
    val job2 = launch(Dispatchers.IO) {
        delay(8000)
        test.state.collect {
            printWithThreadInfo(it)
        }
    }

    // cancel manually to avoid leaking the jobs
    delay(10000)
    job1.cancel()
    job2.cancel()
}

The scenario: we call getApi1() first and getApi2() one second later, so counting the initial value the StateFlow has been assigned three times in total. We make sure all three assignments have completed before we start collecting from it.
The output is:

thread id: 13, thread name: DefaultDispatcher-worker-2 ---> hello, kotlin
thread id: 12, thread name: DefaultDispatcher-worker-1 ---> hello, kotlin

The result shows that StateFlow only delivers the latest value to its subscribers. LiveData, by contrast, has an internal notion of version: for a newly registered observer it compares versions and replays the historical value, the so-called "stickiness". I don't consider stickiness a design flaw of LiveData; it is a feature, and plenty of scenarios genuinely need it. StateFlow does not have it.

But surely we don't want to fall back to LiveData whenever we need that behaviour? The SharedFlow described next is designed for exactly this scenario.

3.2 SharedFlow

If you only need to manage a series of state updates (an event stream) rather than the current state, you can use SharedFlow, a shared flow. This API is convenient whenever you care about the whole sequence of emitted values. Compared with LiveData's version handling, SharedFlow is more flexible and more powerful.

SharedFlow also comes in two flavours, SharedFlow and MutableSharedFlow:

public interface SharedFlow<out T> : Flow<T> {
   public val replayCache: List<T>
}

interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
   suspend fun emit(value: T)
   fun tryEmit(value: T): Boolean
   val subscriptionCount: StateFlow<Int>
   fun resetReplayCache()
}

A SharedFlow is a flow that exposes a replayCache, which can be used as an atomic snapshot. Every new subscriber first receives the values from the replay cache and only then receives newly emitted values.

MutableSharedFlow can emit values from both suspending and non-suspending contexts. As the name suggests, its replayCache can be reset, and it also exposes the number of subscribers as a flow.

Implementing a custom MutableSharedFlow can be cumbersome, so the library provides a convenient factory function:

public fun <T> MutableSharedFlow(
   replay: Int,   // how many previously emitted values are replayed to a new subscriber
   extraBufferCapacity: Int = 0,  // how many additional values are buffered beyond replay
   onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND  // overflow strategy: suspend, drop oldest or drop latest
): MutableSharedFlow<T>

The parameters of MutableSharedFlow are explained in the comments above.

3.2.1 Basic usage of SharedFlow

class SharedFlowTest {
    private val _state = MutableSharedFlow<Int>(replay = 3, extraBufferCapacity = 2)
    val state: SharedFlow<Int> get() = _state

    fun getApi(scope: CoroutineScope) {
        scope.launch {
            for (i in 0..20) {
                delay(200)
                _state.emit(i)
                println("send data: $i")
            }
        }
    }
}

fun main() = runBlocking<Unit> {
    val test: SharedFlowTest = SharedFlowTest()

    test.getApi(this) // start emitting data

    val job = launch(Dispatchers.IO) {
        delay(3000)
        test.state.collect {
            println("---collect1: $it")
        }
    }
    delay(5000)
    job.cancel()  // cancel the job to avoid a leak
}

The output is:

send data: 0
send data: 1
send data: 2
send data: 3
send data: 4
send data: 5
send data: 6
send data: 7
send data: 8
send data: 9
send data: 10
send data: 11
send data: 12
send data: 13
---collect1: 11
---collect1: 12
---collect1: 13
send data: 14
---collect1: 14
send data: 15
---collect1: 15
send data: 16
---collect1: 16
send data: 17
---collect1: 17
send data: 18
---collect1: 18
send data: 19
---collect1: 19
send data: 20
---collect1: 20

Let's analyse the result:
The SharedFlow emits a value every 200 ms, 21 values in total, which takes about 4 s.
replay is set to 3 and extraBufferCapacity to 2, so the SharedFlow buffers 5 values; the overflow strategy is the default, SUSPEND.
The subscriber starts collecting after 3 s, by which point 14 values (0-13) have been emitted and the cache holds values 9-13. Only the 3 replay values are delivered to the new subscriber, so collection starts at 11.

3.2.2 MutableSharedFlow 的其它接口

MutableSharedFlow 还具有 属性,其中包含处于活跃状态的收集器的数量,以便相应地优化业务逻辑。
MutableSharedFlow 还包含一个 函数,在不想重放已向数据流发送的最新信息的情况下使用。

subscriptionCount
resetReplayCache
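
A small sketch (not from the article) of both members:

fun main() = runBlocking {
    val events = MutableSharedFlow<Int>(replay = 2)
    events.emit(1)
    events.emit(2)
    println(events.subscriptionCount.value) // 0 - no active collectors yet
    println(events.replayCache)             // [1, 2]
    events.resetReplayCache()
    println(events.replayCache)             // [] - new subscribers get nothing replayed
}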

3.3 When to use StateFlow vs. SharedFlow

The name StateFlow says it all: it only delivers the latest value to subscribers, which makes it suitable for observing state.
SharedFlow can be configured to replay previously emitted values to new subscribers, which makes it a better fit for handling events.
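
A hypothetical ViewModel-style class (not from the article) illustrating the split, assuming a CoroutineScope named scope is supplied:

class CounterModel(private val scope: CoroutineScope) {
    // state: only the latest value matters
    private val _count = MutableStateFlow(0)
    val count: StateFlow<Int> = _count

    // events: every emission matters, even one-off ones
    private val _messages = MutableSharedFlow<String>()
    val messages: SharedFlow<String> = _messages

    fun increment() {
        _count.value++
        if (_count.value % 10 == 0) {
            scope.launch { _messages.emit("Reached ${_count.value}!") }
        }
    }
}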

3.4 Converting a cold flow into a hot flow

The shareIn operator converts a regular (cold) Flow into a SharedFlow; it is covered in more detail in the next article.
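
A minimal sketch of shareIn (there is also a stateIn counterpart that produces a StateFlow), assuming a scope to host the sharing coroutine:

fun coldFlow(): Flow<Int> = flow {
    repeat(3) {
        delay(100)
        emit(it)
    }
}

fun main() = runBlocking {
    // share one upstream collection among all collectors; replay the last value to late subscribers
    val shared: SharedFlow<Int> = coldFlow()
        .shareIn(scope = this, started = SharingStarted.Eagerly, replay = 1)

    delay(250)
    val job = launch { shared.collect { println("late subscriber got $it") } }
    delay(300)
    job.cancel()
}

This typically prints "late subscriber got 1" (the replayed value) followed by "late subscriber got 2".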
————————

Kotlin collaboration series article navigation:
Kotlin process I – coroutine
Kotlin collaboration process 2 – channel
Kotlin collaboration process III – data flow
Kotlin collaboration process IV — Application of flow and channel
Kotlin collaboration 5 – using kotlin collaboration in Android

  • 1、 The basic use of flow is 1.1 sequence and flow1 2 simple use of flow 1.3 common ways to create a conventional flow: 1.4 flow is the cancellation of cold flow (inert) 1.5 flow
  • 1.1 Sequence 与 Flow
  • 1.2 simple use of flow
  • 1.3 common methods for creating general flow:
  • 1.4 flow is cold flow (inert)
  • 1.5 cancellation of flow
  • 二、 Flow 的操作符2.1 Terminal flow operators 末端流操作符2.1.1 collect2.1.2 reduce2.1.3 fold2.1.4 launchIn2.2 流是连续的2.3 onStart 流启动时2.4 onCompletion 流完成时2.4.1 使用 try … finally 实现2.4.2 通过 onCompletion 函数实现2.5 Backpressure 背压2.5.1 buffer 缓冲2.5.2 conflate 合并2.6 Flow 异常处理2.6.1 catch 操作符捕获上游异常2.6.2 retry、retryWhen 操作符重试2.7 Flow 线程切换2.7.1 响应线程2.7.2 flowOn 切换线程2.8 Flow 中间转换操作符2.8.1 map2.8.2 transform2.8.3 onEach2.8.4 filter2.8.5 drop / dropWhile2.8.6 take2.8.7 zip2.8.8 combine2.8.9 flattenContact 和 flattenMerge 扁平化处理2.8.10 flatMapMerge 和 flatMapContact2.8.11 flatMapLatest
  • 2.1 Terminal flow operators 末端流操作符2.1.1 collect2.1.2 reduce2.1.3 fold2.1.4 launchIn
  • 2.1.1 collect
  • 2.1.2 reduce
  • 2.1.3 fold
  • 2.1.4 launchIn
  • 2.2 the flow is continuous
  • 2.3 onStart 流启动时
  • 2.4 onCompletion 流完成时2.4.1 使用 try … finally 实现2.4.2 通过 onCompletion 函数实现
  • 2.4.1 使用 try … finally 实现
  • 2.4.2 通过 onCompletion 函数实现
  • 2.5 Backpressure 背压2.5.1 buffer 缓冲2.5.2 conflate 合并
  • 2.5.1 buffer 缓冲
  • 2.5.2 conflate 合并
  • 2.6 flow exception handling 2.6.1 catch operator catches upstream exception 2.6.2 retry and retrywhen operators retry
  • 2.6.1 catch operator catches upstream exception
  • 2.6.2 retry、retryWhen 操作符重试
  • 2.7 flow thread switching 2.7.1 response thread 2.7.2 Flowon switching thread
  • 2.7.1 response thread
  • 2.7.2 Flowon switching threads
  • 2.8 Flow 中间转换操作符2.8.1 map2.8.2 transform2.8.3 onEach2.8.4 filter2.8.5 drop / dropWhile2.8.6 take2.8.7 zip2.8.8 combine2.8.9 flattenContact 和 flattenMerge 扁平化处理2.8.10 flatMapMerge 和 flatMapContact2.8.11 flatMapLatest
  • 2.8.1 map
  • 2.8.2 transform
  • 2.8.3 onEach
  • 2.8.4 filter
  • 2.8.5 drop / dropWhile
  • 2.8.6 take
  • 2.8.7 zip
  • 2.8.8 combine
  • 2.8.9 flattenContact 和 flattenMerge 扁平化处理
  • 2.8.10 flatMapMerge 和 flatMapContact
  • 2.8.11 flatMapLatest
  • 三、 StateFlow 和 SharedFlow3.1 StateFlow3.1.1 StateFlow 基本使用3.1.2 为什么使用 StateFlow3.1.3 防止任务泄漏3.1.4 SateFlow 只会发射最新的数据给订阅者。3.2 SharedFlow3.2.1 SharedFlow 基本使用3.2.2 MutableSharedFlow 的其它接口3.3 StateFlow 和 SharedFlow 的使用场景3.4 将冷流转换为热流
  • 3.1 StateFlow3. 1.1 stateflow basic use 3.1.2 why use stateflow 3 1.3 prevent task leakage 3.1.4 sateflow will only send the latest data to subscribers.
  • 3.1.1 StateFlow 基本使用
  • 3.1.2 为什么使用 StateFlow
  • 3.1.3 prevent task leakage
  • 3.1.4 sateflow will only transmit the latest data to subscribers.
  • 3.2 SharedFlow3.2.1 SharedFlow 基本使用3.2.2 MutableSharedFlow 的其它接口
  • 3.2.1 SharedFlow 基本使用
  • 3.2.2 MutableSharedFlow 的其它接口
  • 3.3 StateFlow 和 SharedFlow 的使用场景
  • 3.4 converting cold flow to heat flow

1、 Basic use of flow

Using the hang function in the kotlin coroutine can implement non blocking execution of tasks and return the results, but only a single calculation result can be returned. However, if you want multiple calculation results to be returned, you can use flow.

1.1 Sequence 与 Flow

Before introducing flow, take a look at the sequence generator:

val intSequence = sequence<Int> {
        Thread.sleep(1000) // 模拟耗时任务1
        yield(1)
        Thread.sleep(1000) // 模拟耗时任务2
        yield(2)
        Thread.sleep(1000) // 模拟耗时任务3
        yield(3)
    }

intSequence.forEach {
        println(it)
    }

As mentioned above, to extract the value from the sequence generator, we need to iterate the sequence generator and return three results in turn according to our expectations.

Sequence is a synchronous call and is blocked. Other pending functions cannot be called. Obviously, we often want to execute multiple tasks asynchronously and return the results in turn. Flow is the optimal solution of the scenario.

The flow source code is as follows. There is only one collect method.

public interface Flow<out T> {

    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}

Flow can perform multiple tasks without blocking and return multiple results. Other pending functions can be called in flow. To get the value from flow, you need to call the method. The use form of flow is:

collect
Flow.collect()  // 伪代码

Since collect is a hang function, the collect method must be invoked in the association.

1.2 simple use of flow

Achieve similar effects to the above sequence:

private fun createFlow(): Flow<Int> = flow {
    delay(1000)
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
}

fun main() = runBlocking {
    createFlow().collect {
        println(it)
    }
}

The above code is used to build a flow type, which has the following characteristics:

 flow{ ... }
  • flow{ … } Suspend function can be called internally;
  • Createflow does not need to be marked with suspend; (why is it not marked as a pending function to call a pending function?)
  • Use the emit () method to transmit data;
  • Use the collect () method to collect the results.

1.3 common methods for creating general flow:

flow{…}

flow {
    delay(1000)
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
}

flowOf()

flowOf(1,2,3).onEach {
    delay(1000)
}

The builder defines a flow that emits a fixed set of values. Using the build flow does not need to display the call emission data

flowOf()
flowOf
emit()

asFlow()

listOf(1, 2, 3).asFlow().onEach {
    delay(1000)
}

Using the extension function, you can convert various sets and sequences into streams without displaying the call emission data

asFlow()
emit()

1.4 flow is cold flow (inert)

Like sequences, flows are also inert, that is, before calling the end flow operator (collect is one of them), flow {…} The code in does not execute. We call it cold flow.

private fun createFlow(): Flow<Int> = flow {
    println("flow started")
    delay(1000)
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
}

fun main() = runBlocking {
    val flow = createFlow()
    println("calling collect...")
    flow.collect {
        println(it)
    }
    println("calling collect again...")
    flow.collect {
        println(it)
    }
}

The results are as follows:

calling collect...
flow started
1
2
3
calling collect again...
flow started
1
2
3

This is the reason why a function that returns a flow is not marked. Even if it internally calls the suspend function, calling createflow will return immediately without any waiting. Each time the results are collected, the flow will be started.

createFlow
suspend

Is there any heat flow? The channelflow mentioned later is heat flow. The upstream data will be sent to the downstream collector immediately.

1.5 cancellation of flow

The flow adopts the same assistance cancellation as the collaborative process. The collection of streams can be cancelled when the stream is suspended in a cancelable suspend function (such as delay). To cancel flow, you only need to cancel its collaboration.
The following example shows how the flow cancels and stops executing its code when the code in the withtimeoutornull block is running:

fun simple(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // 在 250 毫秒后超时
        simple().collect { value -> println(value) } 
    }
    println("Done")
}

Note that in the simple function, the stream emits only two numbers, resulting in the following output:

Emitting 1
1
Emitting 2
2
Done

2、 Flow operator

2.1 Terminal flow operators 末端流操作符

The end operator is a pending function on a stream that starts stream collection. Collect is the most basic end operator, but there are other end operators that are more convenient to use:

  • 转化为各种集合,toList/toSet/toCollection
  • Gets the first value, the last value, and the operator that ensures that the stream emits a single value
  • Reduce and fold the flow to a single value
  • count
  • launchIn/produceIn/broadcastIn

Let’s look at some common end flow operators

2.1.1 collect

Collect data sent upstream

2.1.2 reduce

Reduce is similar to the reduce function in the kotlin set. It can calculate the set. As mentioned earlier, reduce is an end stream operator.

fun main() = runBlocking {
    val sum = (1..5).asFlow().reduce { a, b ->
        a + b
    }
    println(sum)
}

Output results:

15

2.1.3 fold

Fold is also similar to fold in the kotlin set. You need to set an initial value, and fold is also an end flow operator.

fun main() = runBlocking {
    val sum = (1..5).asFlow().fold(100) { a, b ->
        a + b
    }
    println(sum)
}

Output results:

115

2.1.4 launchIn

Launchin is used to start flow within the specified coroutinescope. A parameter needs to be passed in: coroutinescope
Source code:

public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
    collect() // tail-call
}

Example:

private val mDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
fun main() {
    val scope = CoroutineScope(mDispatcher)
    (1..5).asFlow().onEach { println(it) }
        .onCompletion { mDispatcher.close() }
        .launchIn(scope)
}

Output results:

1
2
3
4
5

Take another example:

fun main() = runBlocking{
    val cosTime = measureTimeMillis {
        (1..5).asFlow()
            .onEach { delay(100) }
            .flowOn(Dispatchers.IO)
            .collect { println(it) }

        flowOf("one", "two", "three", "four", "five")
            .onEach { delay(200) }
            .flowOn(Dispatchers.IO)
            .collect { println(it) }
    }
    println("cosTime: $cosTime")
}

We want to execute two flows in parallel. Let’s see the output results:

1
2
3
4
5
one
two
three
four
five
cosTime: 1645

The result is not executed in parallel, which is easy to understand, because the first collection will not go to the second until it is executed.

The correct way to write it should be to set up a separate process for each flow:

fun main() = runBlocking<Unit>{
    launch {
        (1..5).asFlow()
            .onEach { delay(100) }
            .flowOn(Dispatchers.IO)
            .collect { println(it) }
    }
    launch {
        flowOf("one", "two", "three", "four", "five")
            .onEach { delay(200) }
            .flowOn(Dispatchers.IO)
            .collect { println(it) }
    }
}

Or use launchin, which is more elegant:

fun main() = runBlocking<Unit>{

    (1..5).asFlow()
        .onEach { delay(100) }
        .flowOn(Dispatchers.IO)
        .onEach { println(it) }
        .launchIn(this)

    flowOf("one", "two", "three", "four", "five")
        .onEach { delay(200) }
        .flowOn(Dispatchers.IO)
        .onEach { println(it) }
        .launchIn(this)
}

Output results:

1
one
2
3
4
two
5
three
four
five

2.2 the flow is continuous

Similar to sequence, each individual collection of flow is performed sequentially, unless the operator performing special operations uses multiple streams. By default, new processes are not started. Each transition operator from upstream to downstream processes each emitted value and then passes it to the end operator.

fun main() = runBlocking {
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0
        }
        .map {
            println("Map $it")
            "string $it"
        }.collect {
            println("Collect $it")
        }
}

Output:

Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

2.3 onStart 流启动时

Flow starts the callback at the beginning of execution, which can be used for loading during time-consuming operations.

fun main() = runBlocking {
    (1..5).asFlow()
        .onEach { delay(200) }
        .onStart { println("onStart") }
        .collect { println(it) }
}

Output results:

onStart
1
2
3
4
5

2.4 onCompletion 流完成时

When flow is completed (normal or abnormal), if an operation needs to be performed, it can be completed in two ways:

2.4.1 使用 try … finally 实现

fun main() = runBlocking {
    try {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.collect { println(it) }
    } finally {
        println("Done")
    }
}

2.4.2 通过 onCompletion 函数实现

fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.onCompletion { println("Done") }
        .collect { println(it) }
}

Output:

1
2
3
4
5
Done

2.5 Backpressure 背压

Backpressure is one of the functions of responsive programming. Flowable in rxjava2 supports the following backpressure strategies:

  • Missing: the created flowable does not specify a back pressure policy and will not cache or discard the data transmitted through onnext.
  • ERROR:如果放入 Flowable 的异步缓存池中的数据超限了,则会抛出 MissingBackpressureException 异常。
  • BUFFER:Flowable 的异步缓存池同 Observable 的一样,没有固定大小,可以无限制添加数据,不会抛出 MissingBackpressureException 异常,但会导致 OOM。
  • Drop: if the asynchronous cache pool of flowable is full, the data to be put into the cache pool will be lost.
  • Latest: if the cache pool is full, the data to be put into the cache pool will be lost. This is the same as the drop policy, except that the latest policy will force the last piece of data into the cache pool regardless of the state of the cache pool.

In the flow code block, we can receive every processing result, but if the processing result is also a time-consuming operation. It is possible that too much data is sent and the processing is not timely.
The backpressure of flow is implemented through the suspend function.

2.5.1 buffer 缓冲

Buffer corresponds to the buffer policy of rxjava. The buffer operation refers to setting the buffer. Of course, the buffer has a size. If it overflows, there will be different processing strategies.

  • Set the buffer. If it overflows, suspend the current collaboration until the data in the buffer is consumed.
  • Set the buffer. If it overflows, discard the latest data.
  • Set the buffer. If it overflows, discard the oldest data.

The size of the buffer can be set to 0, that is, no buffer is required.

Let’s look at an example where no buffer is set. Suppose that it takes 100ms for each data generated and then transmitted, and 700ms for each data processed. The code is as follows:

fun main() = runBlocking {
    val cosTime = measureTimeMillis {
        (1..5).asFlow().onEach {
            delay(100)
            println("produce data: $it")
        }.collect {
            delay(700)
            println("collect: $it")
        }
    }
    println("cosTime: $cosTime")
}

The results are as follows:

produce data: 1
collect: 1
produce data: 2
collect: 2
produce data: 3
collect: 3
produce data: 4
collect: 4
produce data: 5
collect: 5
cosTime: 4069

Since the stream is inert and continuous, it takes about 4000 MS to complete the data processing in the whole stream

Next, we use buffer () to set a buffer. buffer(),
Two parameters are received. The first parameter is size, which indicates the size of the buffer. The second parameter is bufferoverflow, which represents the processing policy after buffer overflow. Its value is the following enumeration type. The default is

BufferOverflow.SUSPEND

The source code of the processing strategy is as follows:

public enum class BufferOverflow {
    /**
     * Suspend on buffer overflow.
     */
    SUSPEND,

    /**
     * Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
     */
    DROP_OLDEST,

    /**
     * Drop **the latest** value that is being added to the buffer right now on buffer overflow
     * (so that buffer contents stay the same), do not suspend.
     */
    DROP_LATEST
}

< strong > set the buffer and adopt the suspended policy < / strong >

After modifying the code, we set the buffer size to 1:

fun main() = runBlocking {
    val cosTime = measureTimeMillis {
        (1..5).asFlow().onEach {
            delay(100)
            println("produce data: $it")
        }.buffer(2, BufferOverflow.SUSPEND)
            .collect {
                delay(700)
                println("collect: $it")
            }
    }
    println("cosTime: $cosTime")
}

The results are as follows:

produce data: 1
produce data: 2
produce data: 3
produce data: 4
collect: 1
produce data: 5
collect: 2
collect: 3
collect: 4
collect: 5
cosTime: 3713

It can be seen that the overall time is about 3713ms. The buffer operator can make the transmitted and collected code run concurrently, so as to improve efficiency.
The following is a brief analysis of the execution process:
Note that the buffer capacity is calculated from 0.
First, we collect the first data, generate the first data, and then 2, 3 and 4 are stored in the buffer. When the fifth data is transmitted, the buffer is full and will hang. Wait until the first data collection is completed before transmitting the fifth data.

< strong > set the buffer and discard the latest data < / strong >
If the above code processing cache overflow strategy is, the code is as follows:

BufferOverflow.DROP_LATEST
fun main() = runBlocking {
    val cosTime = measureTimeMillis {
        (1..5).asFlow().onEach {
            delay(100)
            println("produce data: $it")
        }.buffer(2, BufferOverflow.DROP_LATEST)
            .collect {
                delay(700)
                println("collect: $it")
            }
    }
    println("cosTime: $cosTime")
}

The output is as follows:

produce data: 1
produce data: 2
produce data: 3
produce data: 4
produce data: 5
collect: 1
collect: 2
collect: 3
cosTime: 2272

You can see that the fourth and fifth data are directly discarded because the buffer is full and will not be collected.

< strong > set buffer and discard old data < / strong >
If the above code processing cache overflow strategy is, the code is as follows:

BufferOverflow.DROP_OLDEST
fun main() = runBlocking {
    val cosTime = measureTimeMillis {
        (1..5).asFlow().onEach {
            delay(100)
            println("produce data: $it")
        }.buffer(2, BufferOverflow.DROP_OLDEST)
            .collect {
                delay(700)
                println("collect: $it")
            }
    }
    println("cosTime: $cosTime")
}

The output results are as follows:

produce data: 1
produce data: 2
produce data: 3
produce data: 4
produce data: 5
collect: 1
collect: 4
collect: 5
cosTime: 2289

It can be seen that when the fourth data enters the buffer, the second data will be discarded, and when the fifth data enters the buffer, the third data will be discarded.

2.5.2 conflate 合并

When the flow represents partial operation results or operation status updates, it may not be necessary to process each value, but only the latest one. The conflate operator can be used to skip intermediate values:

fun main() = runBlocking {
    val cosTime = measureTimeMillis {
        (1..5).asFlow().onEach {
            delay(100)
            println("produce data: $it")
        }.conflate()
            .collect {
                delay(700)
                println("collect: $it")
            }
    }
    println("cosTime: $cosTime")
}

Output results:

produce data: 1
produce data: 2
produce data: 3
produce data: 4
produce data: 5
collect: 1
collect: 5
cosTime: 1596

The operator is to set no buffer, that is, the buffer size is 0, discard the old data, that is, take drop_ The oldest policy is actually equivalent to buffer (0, bufferoverflow. Drop_oldest).

conflate

2.6 flow exception handling

2.6.1 catch operator catches upstream exception

The above-mentioned is used to check whether the flow collection is completed, even if an exception is encountered.

onCompletion
fun main() = runBlocking {
    (1..5).asFlow().onEach {
        if (it == 4) {
            throw Exception("test exception")
        }
        delay(100)
        println("produce data: $it")
    }.onCompletion {
        println("onCompletion")
    }.collect {
        println("collect: $it")
    }
}

Output:

produce data: 1
collect: 1
produce data: 2
collect: 2
produce data: 3
collect: 3
onCompletion
Exception in thread "main" java.lang.Exception: test exception
...

In fact, we can judge whether there is an exception in. There is a parameter. If there is an exception in the upstream of the flow, this parameter is not null. If there is no exception in the upstream, it is null. Therefore, we can judge the exception in oncompletion:

onCompletion
onCompletion(action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit)
fun main() = runBlocking {
    (1..5).asFlow().onEach {
        if (it == 4) {
            throw Exception("test exception")
        }
        delay(100)
        println("produce data: $it")
    }.onCompletion { cause ->
        if (cause != null) {
            println("flow completed exception")
        } else {
            println("onCompletion")
        }
    }.collect {
        println("collect: $it")
    }
}

However, oncompletion intelligently judges whether an exception has occurred and cannot catch an exception.

Operators can be used to catch exceptions.

catch
fun main() = runBlocking {
    (1..5).asFlow().onEach {
        if (it == 4) {
            throw Exception("test exception")
        }
        delay(100)
        println("produce data: $it")
    }.onCompletion { cause ->
        if (cause != null) {
            println("flow completed exception")
        } else {
            println("onCompletion")
        }
    }.catch { ex ->
        println("catch exception: ${ex.message}")
    }.collect {
        println("collect: $it")
    }
}

Output results:

produce data: 1
collect: 1
produce data: 2
collect: 2
produce data: 3
collect: 3
flow completed exception
catch exception: test exception

However, if the and are exchanged, the catch operation will not affect the downstream after catching the exception:
code:

onCompletion
catch
fun main() = runBlocking {
    (1..5).asFlow().onEach {
        if (it == 4) {
            throw Exception("test exception")
        }
        delay(100)
        println("produce data: $it")
    }.catch { ex ->
        println("catch exception: ${ex.message}")
    }.onCompletion { cause ->
        if (cause != null) {
            println("flow completed exception")
        } else {
            println("onCompletion")
        }
    }.collect {
        println("collect: $it")
    }
}

Output results:

produce data: 1
collect: 1
produce data: 2
collect: 2
produce data: 3
collect: 3
catch exception: test exception
onCompletion

The catch operator is used to handle exceptions transparently. Catch is only an intermediate operator and cannot catch downstream exceptions,.
In the catch operator, throw can be used to throw the exception again, emit () can be used to convert to the emission value, and can be used for printing or other business logic processing

  • The catch operator is used to handle exceptions transparently. Catch is only an intermediate operator and cannot catch downstream exceptions,.
  • In the catch operator, throw can be used to throw the exception again, emit () can be used to convert to the emission value, and can be used for printing or other business logic processing

2.6.2 retry、retryWhen 操作符重试

public fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T> {
    require(retries > 0) { "Expected positive amount of retries, but had $retries" }
    return retryWhen { cause, attempt -> attempt < retries && predicate(cause) }
}

If the upstream throws an exception and the retry operator is applied, the flow will be retried at most the specified number of times.

fun main() = runBlocking {
    (1..5).asFlow().onEach {
        if (it == 4) {
            throw Exception("test exception")
        }
        delay(100)
        println("produce data: $it")
    }.retry(2) {
        it.message == "test exception"
    }.catch { ex ->
        println("catch exception: ${ex.message}")
    }.collect {
        println("collect: $it")
    }
}

Output results:

produce data: 1
collect: 1
produce data: 2
collect: 2
produce data: 3
collect: 3
produce data: 1
collect: 1
produce data: 2
collect: 2
produce data: 3
collect: 3
produce data: 1
collect: 1
produce data: 2
collect: 2
produce data: 3
collect: 3
catch exception: test exception

Note that a retry is performed only when an exception occurs and the predicate passed to retry returns true.

retry ultimately delegates to the retryWhen operator. The following code is equivalent to the example above.

fun main() = runBlocking {
    (1..5).asFlow().onEach {
        if (it == 4) {
            throw Exception("test exception")
        }
        delay(100)
        println("produce data: $it")
    }.retryWhen { cause, attempt ->
        cause.message == "test exception" && attempt < 2
    }.catch { ex ->
        println("catch exception: ${ex.message}")
    }.collect {
        println("collect: $it")
    }
}

Think about it: what happens if catch and retry / retryWhen swap positions in the code above?

2.7 Flow thread switching

2.7.1 The response (collection) thread

Flow switches threads based on the CoroutineContext. Because collect is a suspending function, it must be called inside a CoroutineScope, so the response thread is determined by that coroutine's CoroutineContext. For example, if collect is always called on the main thread, the response thread is Dispatchers.Main.
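
For instance (a minimal sketch, not from the original article; printWithThreadInfo is the same helper used in the examples below), collecting from a coroutine launched on Dispatchers.Default means the values are received on a Default-dispatcher thread:

fun main() = runBlocking {
    val flow = (1..3).asFlow()

    launch(Dispatchers.Default) {
        // collect runs in this coroutine, so the values are received
        // on a thread of Dispatchers.Default
        flow.collect { printWithThreadInfo("collect: $it") }
    }
}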

2.7.2 Switching threads with flowOn

RxJava determines the emitting thread and the observing thread through subscribeOn and observeOn; when subscribeOn is called more than once, only one of the calls (the one closest to the source) actually takes effect for the upstream.
Flow switches threads with the flowOn operator. flowOn affects the upstream part of the chain, and it can be called multiple times, each call applying to the operators above it. For instance:

private val mDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()

fun main() = runBlocking {
    (1..5).asFlow().onEach {
        printWithThreadInfo("produce data: $it")
    }.flowOn(Dispatchers.IO)
        .map {
            printWithThreadInfo("$it to String")
            "String: $it"
        }.flowOn(mDispatcher)
        .onCompletion {
            mDispatcher.close()
        }
        .collect {
            printWithThreadInfo("collect: $it")
        }
}

The output results are as follows:

thread id: 13, thread name: DefaultDispatcher-worker-1 ---> produce data: 1
thread id: 13, thread name: DefaultDispatcher-worker-1 ---> produce data: 2
thread id: 13, thread name: DefaultDispatcher-worker-1 ---> produce data: 3
thread id: 13, thread name: DefaultDispatcher-worker-1 ---> produce data: 4
thread id: 13, thread name: DefaultDispatcher-worker-1 ---> produce data: 5
thread id: 12, thread name: pool-1-thread-1 ---> 1 to String
thread id: 12, thread name: pool-1-thread-1 ---> 2 to String
thread id: 1, thread name: main ---> collect: String: 1
thread id: 12, thread name: pool-1-thread-1 ---> 3 to String
thread id: 1, thread name: main ---> collect: String: 2
thread id: 12, thread name: pool-1-thread-1 ---> 4 to String
thread id: 1, thread name: main ---> collect: String: 3
thread id: 12, thread name: pool-1-thread-1 ---> 5 to String
thread id: 1, thread name: main ---> collect: String: 4
thread id: 1, thread name: main ---> collect: String: 5

As you can see, the data is produced on a Dispatchers.IO thread, the map operation runs on our custom single-thread dispatcher (pool-1-thread-1), and collect runs on the main thread.

2.8 Flow intermediate transformation operators

2.8.1 map

The map operator has already appeared in earlier examples. map works not only on Flow but also on List: it converts each element of the list into a new element, adds it to a new list, and finally returns that new list.
On a Flow, map transforms each element of the flow and then emits the transformed value.

fun main() = runBlocking {
    (1..5).asFlow().map { "string: $it" }
        .collect {
            println(it)
        }
}

Output:

string: 1
string: 2
string: 3
string: 4
string: 5
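
For comparison, a one-line sketch (not part of the original example) of map on a List, which eagerly builds and returns a new list:

fun main() {
    val strings = listOf(1, 2, 3).map { "string: $it" }  // eagerly returns a new List
    println(strings)  // [string: 1, string: 2, string: 3]
}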

2.8.2 transform

When using the transform operator, you can call emit any number of times for each upstream value, which is the biggest difference between transform and map:

fun main() = runBlocking {
    (1..5).asFlow().transform {
        emit(it * 2)
        delay(100)
        emit("String: $it")
    }.collect {
            println(it)
        }
}

Output results:

2
String: 1
4
String: 2
6
String: 3
8
String: 4
10
String: 5

2.8.3 onEach

onEach performs the given action on every element as it flows through (a traversal), without changing the values.

fun main() = runBlocking {
    (1..5).asFlow()
        .onEach { println("onEach: $it") }
        .collect { println(it) }
}

Output:

onEach: 1
1
onEach: 2
2
onEach: 3
3
onEach: 4
4
onEach: 5
5

2.8.4 filter

filter keeps only the elements that match the given predicate.

fun main() = runBlocking {
    (1..5).asFlow()
        .filter { it % 2 == 0 }
        .collect { println(it) }
}

Output results:

2
4

2.8.5 drop / dropWhile

drop(n) skips the first n elements.
dropWhile drops elements while the given predicate holds and then emits everything from the first element that fails it, as the sketch below shows.
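
A minimal sketch (not from the original article) showing the difference:

fun main() = runBlocking {
    (1..5).asFlow()
        .drop(2)                 // skip the first two elements
        .collect { println("drop: $it") }        // drop: 3, drop: 4, drop: 5

    (1..5).asFlow()
        .dropWhile { it < 3 }    // drop elements while the predicate holds
        .collect { println("dropWhile: $it") }   // dropWhile: 3, dropWhile: 4, dropWhile: 5
}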

2.8.6 take

The take(n) operator passes through only the first n emitted values.

fun main() = runBlocking {
    (1..5).asFlow().take(2).collect {
            println(it)
        }
}

Output:

1
2

2.8.7 zip

zip is an operator that merges two flows pairwise:

fun main() = runBlocking {
    val flowA = (1..6).asFlow()
    val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200) }
    flowA.zip(flowB) { a, b -> "$a and $b" }
        .collect {
            println(it)
        }
}

Output results:

1 and one
2 and two
3 and three
4 and four
5 and five

The zip operator pairs each item from flowA with the corresponding item from flowB. Even though every item of flowB is delayed with delay(), zip waits for the delay to finish before producing the pair.

If flowA and flowB emit a different number of items, the merged flow contains as many items as the shorter of the two:

fun main() = runBlocking {
    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three","four","five", "six", "seven").onEach { delay(200) }
    flowA.zip(flowB) { a, b -> "$a and $b" }
        .collect {
            println(it)
        }
}

Output results:

1 and one
2 and two
3 and three
4 and four
5 and five

2.8.8 combine

combine also merges two flows, but it behaves differently from zip.

With combine, whenever either flow emits a new item, it is combined with the latest item from the other flow (once both flows have emitted at least once).

fun main() = runBlocking {
    val flowA = (1..5).asFlow().onEach { delay(100) }
    val flowB = flowOf("one", "two", "three","four","five", "six", "seven").onEach { delay(200) }
    flowA.combine(flowB) { a, b -> "$a and $b" }
        .collect {
            println(it)
        }
}

Output results:

1 and one
2 and one
3 and one
3 and two
4 and two
5 and two
5 and three
5 and four
5 and five
5 and six
5 and seven

2.8.9 flattenConcat and flattenMerge (flattening)

flattenConcat
flattenConcat flattens a Flow of flows into a single flow sequentially, without interleaving the nested flows.
Source code:

@FlowPreview
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
    collect { value -> emitAll(value) }
}

example:

fun main() = runBlocking {
    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three","four","five").onEach { delay(1000) }

    flowOf(flowA,flowB)
        .flattenConcat()
        .collect{ println(it) }
}

Output:

1
2
3
4
5
// delay 1000ms
one
// delay 1000ms
two
// delay 1000ms
three
// delay 1000ms
four
// delay 1000ms
five

flattenMerge
flattenMerge takes a concurrency parameter that limits how many inner flows are collected concurrently; the default is 16 (DEFAULT_CONCURRENCY).
Source code:

@FlowPreview
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
    require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
    return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
}

As the source shows, the parameter must be greater than 0, and when it is 1 the operator behaves the same as flattenConcat.

fun main() = runBlocking {
    val flowA = (1..5).asFlow().onEach { delay(100) }
    val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200) }

    flowOf(flowA,flowB)
        .flattenMerge(2)
        .collect{ println(it) }
}

Output results:

1
one
2
3
two
4
5
three
four
five

2.8.10 flatMapMerge and flatMapConcat

flatMapMerge is implemented with the map and flattenMerge operators:

@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenMerge(concurrency)

example:

fun main() = runBlocking {
    (1..5).asFlow()
        .flatMapMerge {
            flow {
                emit(it)
                delay(1000)
                emit("string: $it")
            }
        }.collect { println(it) }
}

Output results:

1
2
3
4
5
// delay 1000ms
string: 1
string: 2
string: 3
string: 4
string: 5

flatMapConcat is implemented with the map and flattenConcat operators:

@FlowPreview
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
    map(transform).flattenConcat()

example:

fun main() = runBlocking {
    (1..5).asFlow()
        .flatMapConcat {
            flow {
                emit(it)
                delay(1000)
                emit("string: $it")
            }
        }.collect { println(it) }
}

Output results:

1
// delay 1000ms
string: 1
2
// delay 1000ms
string: 2
3
// delay 1000ms
string: 3
4
// delay 1000ms
string: 4
5
// delay 1000ms
string: 5

Both flatMapMerge and flatMapConcat transform each element into a new flow.
The difference is that flatMapMerge does not wait for the inner flow to complete, whereas with flatMapConcat the collector waits for each inner flow to finish before collecting the next value.

2.8.11 flatMapLatest

With flatMapLatest, when the upstream emits a new value, the inner flow that was produced for the previous value is cancelled.

fun main() = runBlocking {
    (1..5).asFlow().onEach { delay(100) }
        .flatMapLatest {
            flow {
                println("begin flatMapLatest $it")
                delay(200)
                emit("string: $it")
                println("end flatMapLatest $it")
            }
        }.collect {
            println(it)
        }
}

Output results:

begin flatMapLatest 1
begin flatMapLatest 2
begin flatMapLatest 3
begin flatMapLatest 4
begin flatMapLatest 5
end flatMapLatest 5
string: 5

三、 StateFlow and SharedFlow

StateFlow and SharedFlow are newer APIs designed to replace BroadcastChannel. They allow an upstream to emit data that can be collected by multiple subscribers at the same time.

3.1 StateFlow

The official documentation describes StateFlow as a state-holder observable flow that emits the current state and subsequent state updates to its collectors. The current state can also be read through its value property. To update the state and push it to the flow, assign a new value to the value property of MutableStateFlow.

In Android, StateFlow is a great fit for classes that need to expose mutable state as an observable stream.

StateFlow comes in two types: StateFlow and MutableStateFlow:

public interface StateFlow<out T> : SharedFlow<T> {
   public val value: T
}

public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
   public override var value: T
   public fun compareAndSet(expect: T, update: T): Boolean
}

The state is represented by value; any update to value is delivered to all collectors of the flow.
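
A minimal sketch (not from the original article) of reading and updating the state through value and compareAndSet:

fun main() {
    val state = MutableStateFlow(0)

    println(state.value)   // read the current state: 0
    state.value = 1        // plain assignment always succeeds

    // compareAndSet updates the value only if the current value equals the expected one
    val updated = state.compareAndSet(expect = 1, update = 2)
    println("updated=$updated, value=${state.value}")  // updated=true, value=2
}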

3.1.1 Basic usage of StateFlow

Usage example:

class Test {
    private val _state = MutableStateFlow<String>("unKnown")
    val state: StateFlow<String> get() = _state

    fun getApi(scope: CoroutineScope) {
        scope.launch {
            val res = getApi()
            _state.value = res
        }
    }

    private suspend fun getApi() = withContext(Dispatchers.IO) {
        delay(2000) // 模拟耗时请求
        "hello, stateFlow"
    }
}

fun main() = runBlocking<Unit> {
    val test: Test = Test()

    test.getApi(this) // 开始获取结果

    launch(Dispatchers.IO) {
        test.state.collect {
            printWithThreadInfo(it)
        }
    }
    launch(Dispatchers.IO) {
        test.state.collect {
            printWithThreadInfo(it)
        }
    }
}

The output is as follows, and the program never terminates.

thread id: 14, thread name: DefaultDispatcher-worker-3 ---> unKnown
thread id: 12, thread name: DefaultDispatcher-worker-1 ---> unKnown
// 等待两秒
thread id: 14, thread name: DefaultDispatcher-worker-3 ---> hello, stateFlow
thread id: 12, thread name: DefaultDispatcher-worker-1 ---> hello, stateFlow

StateFlow is used in much the same way as LiveData.
MutableStateFlow is the mutable type, i.e. its value can be changed, while StateFlow is read-only. This mirrors the MutableLiveData / LiveData pair: for encapsulation, a class generally exposes only the immutable, read-only type.

The output demonstrates that:

  • The data emitted by a StateFlow can be collected by multiple subscribers (here, in different coroutines) at the same time.
  • StateFlow is a hot flow: whenever the data changes, it emits the new value to its collectors.

The program does not terminate because collect on a StateFlow suspends the calling coroutine and never completes.

StateFlow differs from LiveData in that:

  • StateFlow requires an initial value; LiveData does not.
  • LiveData is bound to the Activity lifecycle: when the view enters the STOPPED state, LiveData.observe() automatically unregisters the observer, whereas collecting from StateFlow (or any other flow) does not stop automatically.

3.1.2 Why use StateFlow

We know that LiveData has the following characteristics:

  • Data can only be updated on the main thread; even when a background thread calls postValue(), the value is eventually posted to the main thread, where setValue() is called.
  • LiveData does not debounce values.
  • LiveData transformations run on the main thread.
  • LiveData requires careful handling of "sticky events".

For these reasons, the scenarios above are easy to handle with StateFlow.

3.1.3 Preventing task leaks

There are two solutions:

  • Do not collect the StateFlow directly; convert it to LiveData with asLiveData(). (But then why not simply use LiveData in the first place?)
  • Cancel the subscriber's coroutine manually. On Android, collect the flow inside a Lifecycle.repeatOnLifecycle block.

The corresponding code is as follows:

lifecycleScope.launch {
    repeatOnLifecycle(Lifecycle.State.STARTED) {
        test.state.collect {
            printWithThreadInfo(it)
        }
    }
}

3.1.4 StateFlow only delivers the latest value to subscribers

We modify the above code:

class Test {
    private val _state = MutableStateFlow<String>("unKnown")
    val state: StateFlow<String> get() = _state

    fun getApi1(scope: CoroutineScope) {
        scope.launch {
            delay(2000)
            _state.value = "hello,coroutine"
        }
    }

    fun getApi2(scope: CoroutineScope) {
        scope.launch {
            delay(2000)
            _state.value = "hello, kotlin"
        }
    }
}

fun main() = runBlocking<Unit> {
    val test: Test = Test()

    test.getApi1(this) // 开始获取结果
    delay(1000)
    test.getApi2(this) // 开始获取结果

    val job1 = launch(Dispatchers.IO) {
        delay(8000)
        test.state.collect {
            printWithThreadInfo(it)
        }
    }
    val job2 = launch(Dispatchers.IO) {
        delay(8000)
        test.state.collect {
            printWithThreadInfo(it)
        }
    }

    // 避免任务泄漏,手动取消
    delay(10000)
    job1.cancel()
    job2.cancel()
}

In this scenario getApi1() is requested first and then getApi2(), so, counting the initial value, the StateFlow's value is assigned three times. The collectors are delayed long enough that collection starts only after all three assignments have completed.
The output results are as follows:

thread id: 13, thread name: DefaultDispatcher-worker-2 ---> hello, kotlin
thread id: 12, thread name: DefaultDispatcher-worker-1 ---> hello, kotlin

The result shows that StateFlow only delivers the latest value to subscribers. LiveData, by contrast, has a notion of version: when an observer registers, LiveData compares versions and replays the historical value to it, the so-called "stickiness". I don't consider stickiness a design flaw of LiveData; in many scenarios it is exactly the feature you need. StateFlow does not have this behavior.

So if I need that behavior, do I have to go back to LiveData? No, the SharedFlow described below covers this scenario.

3.2 SharedFlow

If you need to deliver a series of updates (an event stream) rather than manage a single current state, you can use a shared flow. This API is handy when you care about emitting a sequence of values. Compared with LiveData's version mechanism, SharedFlow is more flexible and more powerful.

SharedFlow also comes in two types: SharedFlow and MutableSharedFlow:

public interface SharedFlow<out T> : Flow<T> {
   public val replayCache: List<T>
}

interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
   suspend fun emit(value: T)
   fun tryEmit(value: T): Boolean
   val subscriptionCount: StateFlow<Int>
   fun resetReplayCache()
}

SharedFlow exposes its replay cache through the replayCache property, a snapshot list of the most recently emitted values. Every new subscriber first receives the values from the replay cache and then the newly emitted values.

MutableSharedFlow can emit values from both suspending and non-suspending contexts (emit and tryEmit). As the name suggests, the replay cache of a MutableSharedFlow can be reset, and the number of subscribers is exposed as a flow as well.
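
A minimal sketch (not from the original article) of tryEmit, which attempts to emit without suspending and reports whether it succeeded:

fun main() {
    // with extra buffer capacity, emitting this value never needs to suspend,
    // so tryEmit can be called from ordinary (non-suspending) code
    val events = MutableSharedFlow<Int>(extraBufferCapacity = 1)

    val accepted = events.tryEmit(1)  // true if the value was emitted without suspending
    println("accepted: $accepted")    // accepted: true
}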

Implementing a custom MutableSharedFlow from scratch would be cumbersome, so the library provides a convenient factory function:

public fun <T> MutableSharedFlow(
   replay: Int = 0,   // how many previously emitted values are replayed to a new subscriber when it starts collecting
   extraBufferCapacity: Int = 0,  // how many additional values MutableSharedFlow buffers on top of replay
   onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND  // overflow strategy: suspend, drop the latest, or drop the oldest
): MutableSharedFlow<T>

The parameters of MutableSharedFlow are explained in the comments above.

3.2.1 Basic usage of SharedFlow

class SharedFlowTest {
    private val _state = MutableSharedFlow<Int>(replay = 3, extraBufferCapacity = 2)
    val state: SharedFlow<Int> get() = _state

    fun getApi(scope: CoroutineScope) {
        scope.launch {
            for (i in 0..20) {
                delay(200)
                _state.emit(i)
                println("send data: $i")
            }
        }
    }
}

fun main() = runBlocking<Unit> {
    val test: SharedFlowTest = SharedFlowTest()

    test.getApi(this) // 开始获取结果

    val job = launch(Dispatchers.IO) {
        delay(3000)
        test.state.collect {
            println("---collect1: $it")
        }
    }
    delay(5000)
    job.cancel()  // 取消任务, 避免泄漏
}

The output results are as follows:

send data: 0
send data: 1
send data: 2
send data: 3
send data: 4
send data: 5
send data: 6
send data: 7
send data: 8
send data: 9
send data: 10
send data: 11
send data: 12
send data: 13
---collect1: 11
---collect1: 12
---collect1: 13
send data: 14
---collect1: 14
send data: 15
---collect1: 15
send data: 16
---collect1: 16
send data: 17
---collect1: 17
send data: 18
---collect1: 18
send data: 19
---collect1: 19
send data: 20
---collect1: 20

Analysis of the result:
The SharedFlow emits a value every 200 ms, 21 values in total, which takes roughly 4 s.
replay is set to 3 and extraBufferCapacity to 2, so the SharedFlow's buffer holds 5 values; the default overflow strategy is to suspend.
The subscriber starts collecting after 3 s. By then about 14 values (0–13) have been emitted and the buffered values are 9–13. However, only replay = 3 old values are replayed to the new subscriber, so the first value it collects is 11.

3.2.2 Other members of MutableSharedFlow

MutableSharedFlow also exposes subscriptionCount, a property containing the number of active collectors, so you can adjust your business logic accordingly.
It also provides resetReplayCache, which clears the replay cache so the most recently sent values are no longer replayed to new subscribers.
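
A minimal sketch (not from the original article) touching both members:

fun main() = runBlocking {
    val flow = MutableSharedFlow<Int>(replay = 2)
    flow.emit(1)
    flow.emit(2)

    println(flow.subscriptionCount.value)  // 0 -- no active collectors yet
    println(flow.replayCache)              // [1, 2]

    flow.resetReplayCache()                // new subscribers will no longer receive 1 and 2
    println(flow.replayCache)              // []
}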

3.3 Use cases for StateFlow and SharedFlow

StateFlow's name already hints at its use case: it delivers only the latest value to subscribers, which makes it suitable for observing state.
SharedFlow can be configured to replay previously emitted data to subscribers, which makes it suitable for listening to events. A sketch of this split follows.
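
A minimal sketch of that split (a hypothetical class, not from the original article): state is exposed as a StateFlow, one-off events as a SharedFlow:

class CounterModel(private val scope: CoroutineScope) {
    // state: always has a current value; a new collector receives it immediately
    private val _count = MutableStateFlow(0)
    val count: StateFlow<Int> get() = _count

    // events: no initial value; delivered to collectors as they happen
    private val _messages = MutableSharedFlow<String>()
    val messages: SharedFlow<String> get() = _messages

    fun increment() {
        _count.value++
        if (_count.value % 10 == 0) {
            scope.launch { _messages.emit("reached ${_count.value}") }
        }
    }
}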

3.4 Converting a cold flow into a hot flow

The shareIn operator converts an ordinary (cold) Flow into a SharedFlow, that is, into a hot flow. Its applications are covered in more detail in the next part of this series; a small sketch follows.

shareIn
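
A minimal sketch (not from the original article), assuming a scope is available to host the sharing coroutine:

fun main() = runBlocking {
    val cold = flow {
        println("flow started")  // for a cold flow this would run once per collector
        emit(1)
        emit(2)
    }

    // shareIn collects the cold upstream in the given scope and multicasts
    // its values to all subscribers as a hot SharedFlow
    val hot: SharedFlow<Int> = cold.shareIn(
        scope = this,
        started = SharingStarted.Eagerly,
        replay = 2
    )

    delay(100)  // give the eagerly started upstream a chance to run
    hot.take(2).collect { println("collect: $it") }  // collect: 1, collect: 2

    coroutineContext.cancelChildren()  // cancel the sharing coroutine (if still active)
}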