以细粒度限制线程

限制线程 是解决共享可变状态问题的一种方案:对特定共享状态的所有访问权都限制在单个线程中。它通常应用于 UI 程序中:所有 UI 状态都局限于单个事件分发线程或应用主线程中。这在协程中很容易实现,通过使用一个单线程上下文:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}

//sampleStart
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// 将每次自增限制在单线程上下文中
withContext(counterContext) {
counter++
}
}
}
println("Counter = $counter")
}
//sampleEnd

这段代码运行非常缓慢,因为它进行了 细粒度 的线程限制。每个增量操作都得使用 withContext(counterContext) 块从多线程 Dispatchers.Default 上下文切换到单线程上下文。

以粗粒度限制线程

在实践中,线程限制是在大段代码中执行的,例如:状态更新类业务逻辑中大部分都是限于单线程中。下面的示例演示了这种情况, 在单线程上下文中运行每个协程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}

//sampleStart
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
// 将一切都限制在单线程上下文中
withContext(counterContext) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
//sampleEnd

这段代码运行更快而且打印出了正确的结果。

互斥

该问题的互斥解决方案:使用永远不会同时执行关键代码块 来保护共享状态的所有修改。在阻塞的世界中,你通常会为此目的使用 synchronized 或者 ReentrantLock。 在协程中的替代品叫做 Mutex 。它具有 lockunlock 方法, 可以隔离关键的部分。关键的区别在于 Mutex.lock() 是一个挂起函数,它不会阻塞线程。(但会挂起协程)

还有 withLock 扩展函数,可以方便的替代常用的 mutex.lock(); try { …… } finally { mutex.unlock() } 模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}

//sampleStart
val mutex = Mutex()
var counter = 0

fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// 用锁保护每次自增
mutex.withLock {
counter++
}
}
}
println("Counter = $counter")
}
//sampleEnd

此示例中锁是细粒度的,因此会付出一些代价。但是对于某些必须定期修改共享状态的场景,它是一个不错的选择,但是没有自然线程可以限制此状态。