继续来,同我一起撸Kotlin Channel 深水区


1. Channel的引入及简单使用

初级版协程间通信

先来看一个简单的通信Demo:

fun testChannel() {
          
   
        //协程1
        var deferred= GlobalScope.async {
          
   
            //假装在加工数据
            Thread.sleep(2000)
            "Hello fishforest"
        }
        //协程2
        GlobalScope.launch {
          
   
            var result = deferred.await()
            println("get result from coroutine1: $result")
        }
    }

如上,协程2拿到了协程1的值,这就是一次简单的协程间通信过程。 现在需求变了,协程1一直在生产数据,协程2也需要不断地从中取数据,此时靠async/await 配合无能为力了。当然,我们很容易想到的方案是:

共享一个变量,这个变量可以是个队列。

于是Demo改造如下:

fun testChannel2() {
          
   
        //阻塞队列
        var queue = ArrayBlockingQueue<String>(5)
        //协程1
        GlobalScope.launch {
          
   
            var count = 0
            while (true) {
          
   
                //假装在加工数据
                Thread.sleep(1000)
                queue.put("fish ${
            
     count++}")
            }
        }

        //协程2
        GlobalScope.launch {
          
   
            while (true) {
          
   
                Thread.sleep(1000)
                println("get result from coroutine1:${
            
     queue.take()}")
            }
        }
    }

通过阻塞队列,当协程2取数据时,如果队列是空,那么等待协程1往队列里放数据;当协程1放数据时,如果队列满了,那么等待协程2从队列里取出数据。如此,就是简单的协程通信。 看似美好,实际上此处有个很大的漏洞:

队列满/队列空 时,此时等待动作阻塞的是线程,而我们知道协程的挂起并不阻塞线程,因此此种方式并没有利用到协程的优势。

我们期望协程发现队列满/空时将自己挂起等待,此时就引入了Channel。

升级版协程间通信-Channel

同样的需求,我们用Channel 实现:

fun testChannel3() {
          
   
        //定义Channel
        var channel = Channel<String>()
        //协程1
        GlobalScope.launch {
          
   
            var count = 0
            while (true) {
          
   
                //假装在加工数据
                Thread.sleep(1000)
                var sendStr = "fish ${
            
     count++}"
                println("send $sendStr")
                channel.send("$sendStr")
            }
        }

        //协程2
        GlobalScope.launch {
          
   
            while (true) {
          
   
                Thread.sleep(1000)
                println("receive:${
            
     channel.receive()}")
            }
        }
    }

与之前的实现方案相比,仅仅只是将队列换成了Channel,可以看出,Channel 和队列比较类似,而Channel的send/recevie 函数并没有阻塞线程,仅仅只是挂起了协程。 查看打印结果: 你可能发现了端倪:发送者和接收者是成对出现的,难道Channel的内部实现不是队列? 要想解开这个谜题,最好的方法是从源码入手深究其原理。

2. Channel的原理

Channel的构造

先从Channel 构造开始:

#Channel.kt
public fun <E> Channel(
    //Channel 容量/叫做Channel类型更合理一些
    capacity: Int = Channel.RENDEZVOUS,
    //缓冲区满后,发送端的处理方式
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    //信息没有传递出去时的回调
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
    when (capacity) {
          
   
        //默认是约会模式
        Channel.RENDEZVOUS -> {
          
   
            //默认挂起
            if (onBufferOverflow == BufferOverflow.SUSPEND)
                RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channel
            else
                ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
        }
        //...
    }

此处的Channel() 并不是构造函数,而是顶层函数,Kotlin里有很多伪装为构造函数的顶层函数。该顶层函数默认构造并返回RendezvousChannel类型的Channel。 RendezvousChannel 类本身很简单,就重写了一些属性,它继承自AbstractChannel。

重点在AbstractChannel/AbstractSendChannel及其子类里。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/292342.html

(0)
上一篇 2022年10月30日 18:10
下一篇 2022年10月30日 22:43

相关推荐

发表回复

登录后才能评论