以细粒度限制线程
限制线程 是解决共享可变状态问题的一种方案:对特定共享状态的所有访问权都限制在单个线程中。它通常应用于 UI 程序中:所有 UI 状态都局限于单个事件分发线程或应用主线程中。这在协程中很容易实现,通过使用一个单线程上下文:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import kotlinx.coroutines.* import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) { val n = 100 val k = 1000 val time = measureTimeMillis { coroutineScope { repeat(n) { launch { repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") }
val counterContext = newSingleThreadContext("CounterContext") var counter = 0
fun main() = runBlocking { withContext(Dispatchers.Default) { massiveRun { withContext(counterContext) { counter++ } } } println("Counter = $counter") }
|
这段代码运行非常缓慢,因为它进行了 细粒度 的线程限制。每个增量操作都得使用 withContext(counterContext)
块从多线程 Dispatchers.Default 上下文切换到单线程上下文。
以粗粒度限制线程
在实践中,线程限制是在大段代码中执行的,例如:状态更新类业务逻辑中大部分都是限于单线程中。下面的示例演示了这种情况, 在单线程上下文中运行每个协程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| import kotlinx.coroutines.* import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) { val n = 100 val k = 1000 val time = measureTimeMillis { coroutineScope { repeat(n) { launch { repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") }
val counterContext = newSingleThreadContext("CounterContext") var counter = 0
fun main() = runBlocking { withContext(counterContext) { massiveRun { counter++ } } println("Counter = $counter") }
|
这段代码运行更快而且打印出了正确的结果。
互斥
该问题的互斥解决方案:使用永远不会同时执行的 关键代码块 来保护共享状态的所有修改。在阻塞的世界中,你通常会为此目的使用 synchronized
或者 ReentrantLock
。 在协程中的替代品叫做 Mutex 。它具有 lock 和 unlock 方法, 可以隔离关键的部分。关键的区别在于 Mutex.lock()
是一个挂起函数,它不会阻塞线程。(但会挂起协程)
还有 withLock 扩展函数,可以方便的替代常用的 mutex.lock(); try { …… } finally { mutex.unlock() }
模式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| import kotlinx.coroutines.* import kotlinx.coroutines.sync.* import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) { val n = 100 val k = 1000 val time = measureTimeMillis { coroutineScope { repeat(n) { launch { repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") }
val mutex = Mutex() var counter = 0
fun main() = runBlocking { withContext(Dispatchers.Default) { massiveRun { mutex.withLock { counter++ } } } println("Counter = $counter") }
|
此示例中锁是细粒度的,因此会付出一些代价。但是对于某些必须定期修改共享状态的场景,它是一个不错的选择,但是没有自然线程可以限制此状态。