1. Channel的引入及简单使用
初级版协程间通信
先来看一个简单的通信Demo:
fun testChannel() {
//协程1
var deferred= GlobalScope.async {
//假装在加工数据
Thread.sleep(2000)
"Hello fishforest"
}
//协程2
GlobalScope.launch {
var result = deferred.await()
println("get result from coroutine1: $result")
}
}
如上,协程2拿到了协程1的值,这就是一次简单的协程间通信过程。 现在需求变了,协程1一直在生产数据,协程2也需要不断地从中取数据,此时靠async/await 配合无能为力了。当然,我们很容易想到的方案是:
共享一个变量,这个变量可以是个队列。
于是Demo改造如下:
fun testChannel2() {
//阻塞队列
var queue = ArrayBlockingQueue<String>(5)
//协程1
GlobalScope.launch {
var count = 0
while (true) {
//假装在加工数据
Thread.sleep(1000)
queue.put("fish ${
count++}")
}
}
//协程2
GlobalScope.launch {
while (true) {
Thread.sleep(1000)
println("get result from coroutine1:${
queue.take()}")
}
}
}
通过阻塞队列,当协程2取数据时,如果队列是空,那么等待协程1往队列里放数据;当协程1放数据时,如果队列满了,那么等待协程2从队列里取出数据。如此,就是简单的协程通信。 看似美好,实际上此处有个很大的漏洞:
队列满/队列空 时,此时等待动作阻塞的是线程,而我们知道协程的挂起并不阻塞线程,因此此种方式并没有利用到协程的优势。
我们期望协程发现队列满/空时将自己挂起等待,此时就引入了Channel。
升级版协程间通信-Channel
同样的需求,我们用Channel 实现:
fun testChannel3() {
//定义Channel
var channel = Channel<String>()
//协程1
GlobalScope.launch {
var count = 0
while (true) {
//假装在加工数据
Thread.sleep(1000)
var sendStr = "fish ${
count++}"
println("send $sendStr")
channel.send("$sendStr")
}
}
//协程2
GlobalScope.launch {
while (true) {
Thread.sleep(1000)
println("receive:${
channel.receive()}")
}
}
}
与之前的实现方案相比,仅仅只是将队列换成了Channel,可以看出,Channel 和队列比较类似,而Channel的send/recevie 函数并没有阻塞线程,仅仅只是挂起了协程。 查看打印结果: 你可能发现了端倪:发送者和接收者是成对出现的,难道Channel的内部实现不是队列? 要想解开这个谜题,最好的方法是从源码入手深究其原理。
2. Channel的原理
Channel的构造
先从Channel 构造开始:
#Channel.kt
public fun <E> Channel(
//Channel 容量/叫做Channel类型更合理一些
capacity: Int = Channel.RENDEZVOUS,
//缓冲区满后,发送端的处理方式
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
//信息没有传递出去时的回调
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
when (capacity) {
//默认是约会模式
Channel.RENDEZVOUS -> {
//默认挂起
if (onBufferOverflow == BufferOverflow.SUSPEND)
RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channel
else
ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
}
//...
}
此处的Channel() 并不是构造函数,而是顶层函数,Kotlin里有很多伪装为构造函数的顶层函数。该顶层函数默认构造并返回RendezvousChannel类型的Channel。 RendezvousChannel 类本身很简单,就重写了一些属性,它继承自AbstractChannel。
重点在AbstractChannel/AbstractSendChannel及其子类里。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/292342.html