1
2
3
4
5
6
7
8
9
10
11
12
13
fun main() = runBlocking {
channelFlow<Int> {
withContext(Dispatchers.IO) {
send(1)// send而不是emit
}
withContext(Dispatchers.Default) {
send(2)
}
}.map { it * 2 }
.collect {
println(it)
}
}

使用channelFlow可以在不同的上下文中发射值。

flow{} 构建器则不行,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
fun main() = runBlocking {
flow<Int> {
withContext(CoroutineName("withContext")) {
emit(1)// 在另外的上下文发射值
}
}.map { it * 2 }
.collect {
println(it)
}
}

报错:Flow invariant is violated

channelFlow

解释

创建一个冷流,它允许element在不同的上下文中并发地产生.

The resulting flow completes as soon as the code in the block and all its children completes. Use awaitClose as the last statement to keep it running.

背压措施:A channel with the default buffer size is used. Use the buffer operator on the resulting flow to specify a user-defined value and to control what happens when data is produced faster than consumed, i.e. to control the back-pressure behavior.

awaitClose

显式地,不让channelFlowblock块结束。示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// example1 当the code in the block and all its children completes,channelFlowblock块退出
fun main() = runBlocking {
channelFlow<Int> {
// block
launch(Dispatchers.Default) {
send(99)
delay(3000)
send(99)
}
launch(Dispatchers.IO) {
repeat(5) {
send(it)
}
}
}.map { it * 2 }
.collect {
println(it)
}
}

// example2 保持channelFlow的block块作用域运行,除非被close或cancel
fun main() = runBlocking {
channelFlow<Int> {
launch(Dispatchers.Default) {
send(99)
delay(3000)
send(99)
close() // this@channelFlow.cancel()
}
launch(Dispatchers.IO) {
repeat(5) {
send(it)
}
}
awaitClose {
println("produce scope 结束")
}
}.map { it * 2 }
.collect {
println(it)
}

}

为什么flow内不能切换上下文

并不是不能切换上下文,只是emit不能在另外的上下文,下面这样的代码还是可以的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
fun main() = runBlocking {
flow<Int> {
val result = async(Dispatchers.IO) {
Thread.sleep(5000)// 一些CPU计算
3
}
emit(1)
delay(2000)
emit(result.await())
emit(2)

}.map { it * 2 }
.collect {
println(it)
}
}

那本文开头的代码为什么不行,据这篇文章所说,是因为如果那样做,collect的代码块可能在错误的上下文去运行,然后需要写一些样板代码去确保 collect 的代码能在正确的上下文去运行。没太看懂老实说。

最后建议使用 flowOn 操作符去进行上游的上下文的切换。

callbackFlow

callbackFlow 除了强制使用awaitClose之外,并没有和channelFlow什么不同。callbackFlow的设计是为了桥接一些旧的API,那些api设计成传callback.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
val callback = object : Callback { // Implementation of some callback interface
override fun onNextValue(value: T) {
// To avoid blocking you can configure channel capacity using
// either buffer(Channel.CONFLATED) or buffer(Channel.UNLIMITED) to avoid overfill
trySendBlocking(value)
.onFailure { throwable ->
// Downstream has been cancelled or failed, can log here
}
}
override fun onApiError(cause: Throwable) {
cancel(CancellationException("API Error", cause))
}
override fun onCompleted() = channel.close()
}
api.register(callback)
/*
* Suspends until either 'onCompleted'/'onApiError' from the callback is invoked
* or flow collector is cancelled (e.g. by 'take(1)' or because a collector's coroutine was cancelled).
* In both cases, callback will be properly unregistered.
*/
awaitClose { api.unregister(callback) }
}

强制使用 awaitClose 是为了让你使用callback api的时候减少bug的发生机会,因为你必须在 awaitClose 处做点什么,这样可以提醒自己。比如说:网络请求的取消,资源的关闭等等。看具体的情况来和 callback api 互动。

因为在 callback的回调之中不能使用挂起函数,要发送值就不能用 send 了,可以用 trySend / trySendBlocking, trySend是立即返回,建议使用trySendBlocking,可以为返回的 ChannelResult 设置回调