Buffer操作符

之前在旧博客用模拟餐厅上菜的例子讲过这个操作符:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
fun main() = runBlocking<Unit> {
flow<String> {
println("上菜——鸡肉")
emit("鸡肉")
delay(1000)
println("上菜——鱼肉")
emit("鱼肉")
delay(1000)
println("上菜——西瓜")
emit("西瓜")
}.onEach {
println("运送$it")
}.collect {
println("客人开始吃$it")
delay(5000)
println("客人吃完$it")
}
}

输出结果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
上菜——鸡肉
运送鸡肉
客人开始吃鸡肉
客人吃完鸡肉
上菜——鱼肉
运送鱼肉
客人开始吃鱼肉
客人吃完鱼肉
上菜——西瓜
运送西瓜
客人开始吃西瓜
客人吃完西瓜

Process finished with exit code 0

因为emit会挂起等collect执行完再resume,所以下一个菜要等客人吃完才上,那可不可以等客人一边吃就一边上菜呢?即要实现:collect不会令emit挂起,并保证emit的值按顺序到达,collect也对应的不取消(collectLatest就会取消),也按顺序对应执行。

用buffer可以解决

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
fun main() = runBlocking<Unit> {
flow<String> {
println("上菜——鸡肉")
emit("鸡肉")
delay(1000)
println("上菜——鱼肉")
emit("鱼肉")
delay(1000)
println("上菜——西瓜")
emit("西瓜")
}.onEach {
println("运送$it")
}.buffer().collect {
println("客人开始吃$it")
delay(5000)
println("客人吃完$it")
}
}

输出如下

1
2
3
4
5
6
7
8
9
10
11
12
上菜——鸡肉
运送鸡肉
客人开始吃鸡肉
上菜——鱼肉
运送鱼肉
上菜——西瓜
运送西瓜
客人吃完鸡肉
客人开始吃鱼肉
客人吃完鱼肉
客人开始吃西瓜
客人吃完西瓜

由于有缓冲,上游和下游可以 并发 地执行。

实现缓冲的另外一个方法

其实把 buffer() 改为 flowOn(other Dispatcher) (other Dispatcher指的是不同于 collect 所运行的上下文的调度器),也可以达到缓冲的效果。文档是这样说的:

This operator retains a sequential nature of flow if changing the context does not call for changing the dispatcher.

Otherwise, if changing dispatcher is required, it collects flow emissions in one coroutine that is run using a specified context and emits them from another coroutines with the original collector’s context using a channel with a default buffer size between two coroutines similarly to buffer operator, unless buffer operator is explicitly called before or after flowOn, which requests buffering behavior and specifies channel size.

要就是说,要是 flowOn()中指定的调度器和收集的调度器不同,这样 emitcollect 就会运行在两个不同的协程中,然后有一个带有缓冲的channel把它们连接起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// 可以缓冲
fun main() = runBlocking(Dispatchers.Default){
flow<String> {
println("上菜——鸡肉")
emit("鸡肉")
delay(1000)
println("上菜——鱼肉")
emit("鱼肉")
delay(1000)
println("上菜——西瓜")
emit("西瓜")
}.onEach {
println("运送$it")
}.flowOn(Dispatchers.IO).collect {
println("客人开始吃$it")
delay(5000)
println("客人吃完$it")
}
}

// 不可以缓冲
fun main() = runBlocking(Dispatchers.Default){
flow<String> {
println("上菜——鸡肉")
emit("鸡肉")
delay(1000)
println("上菜——鱼肉")
emit("鱼肉")
delay(1000)
println("上菜——西瓜")
emit("西瓜")
}.onEach {
println("运送$it")
}.flowOn(Dispatchers.Default).collect {
println("客人开始吃$it")
delay(5000)
println("客人吃完$it")
}
}

// 不可以缓冲
fun main() = runBlocking(Dispatchers.Default){
flow<String> {
println("上菜——鸡肉")
emit("鸡肉")
delay(1000)
println("上菜——鱼肉")
emit("鱼肉")
delay(1000)
println("上菜——西瓜")
emit("西瓜")
}.onEach {
println("运送$it")
}.flowOn(CoroutineName("没有改变调度器")).collect {
println("客人开始吃$it")
delay(5000)
println("客人吃完$it")
}
}

Buffer的参数

有两个参数,分别是 缓冲区大小溢出时的行为,都很简单,看注释即可。

大小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Requests a channel with an unlimited capacity buffer in the `Channel(...)` factory function.
*/
public const val UNLIMITED: Int = Int.MAX_VALUE 没有限制

/**
* Requests a rendezvous channel in the `Channel(...)` factory function &mdash; a channel that does not have a buffer.
*/
public const val RENDEZVOUS: Int = 0 没有缓冲,缓冲区为0

/**
* Requests a conflated channel in the `Channel(...)` factory function. This is a shortcut to creating
* a channel with [`onBufferOverflow = DROP_OLDEST`][BufferOverflow.DROP_OLDEST].
*/
public const val CONFLATED: Int = -1 缓冲区为1

/**
* Requests a buffered channel with the default buffer capacity in the `Channel(...)` factory function.
* The default capacity for a channel that [suspends][BufferOverflow.SUSPEND] on overflow
* is 64 and can be overridden by setting [DEFAULT_BUFFER_PROPERTY_NAME] on JVM.
* For non-suspending channels, a buffer of capacity 1 is used.
*/
public const val BUFFERED: Int = -2 默认64

也可以是自然数。

要注意缓冲区为0不等于没有buffer修饰符的情况。当溢出策略为 suspend 的情况下,前者判断collect已消耗元素的标志是collect块开始处理,后者是collect块结束。一个buffer为0的案例