oynix

于无声处听惊雷,于无色处见繁花

Kotlin的Channel

Channel,直译过来就是通道的意思,有从通道读取别人数据的通道,也有将数据写给别人的通道,当然,也有既可以读数据也可以写数据的通道。这是通俗的解释,专业一点来讲,通道就是生产者消费者的模型。

1. 使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
suspend fun main() {
// 创建一个Channel类型的实例
val channel = Channel<Int>()

// 生产者:负责向通道中写数据
val producer = GlobalScope.launch {
var i = 0
while (true) {
channel.send(i++)
delay(1000)
}
}

// 消费者:负责从通道中读数据
val consumer = GlobalScope.launch {
while (true) {
val element = channel.receive()
}
}

producer.join()
consumer.join()
}

通过Channel方法就可以方便的初始化出来一个通道实例,因为channel的send和receive方法,都是挂起函数,所以要将它们放到协程中调用。为什么都是挂起函数呢?既然是生产者消费者模型,那么肯定就存在速度不同步的情况,生产者生产出一个数据,但是消费者还在忙碌中,写入后无法被接收,那么它就挂起等待;消费者也是同理,它想从通道读取一个数据来处理,但是生产者还没生产出来,那么它也挂起等待,所以这两个函数都是挂起函数。

2. 通道的缓存

通道提供了缓存,来缓解生产者和消费者之间速度差问题。我们上面创建通道使用的Channel,它不是一个构造函数,而是一个伪造成构造方法的工厂方法,

1
2
3
4
5
6
7
8
9
10
11
public fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
when (capacity) {
RENDEZVOUS -> RendezvousChannel()
UNLIMITED -> LinkedListChannle()
CONFLATED -> ConflatedChannle()
else -> ArrayChannel(capacity)
}

它有3个参数,分别是通道缓存的容量、超出缓存容量时的行为策略,以及超出元素的处理操作。

  • capacity
    RENDEZVOUS,这个词直译过来是约会的意思,虽然现代人的约会千奇百怪,但是在设计者的想法里,约会就是一对一的形式,所以这种情况下,通道是没有缓存的,生产者写不进数据就等着,消费者读不到数据也等着;
    UNLIMITED这个就很好理解了,没有限制,所以它是一个LinkedList类型的通道,来者不拒,所有来不及消费的数据我都给你缓存下来;
    CONFLATED这个词和inflate正好相反,还记得我们在加载布局文件时,用的是LayoutInflater的inflate方法,它是扩充、扩张的意思,所以conflate就是合并、缩小的意思,实际的效果就是,这种类型的通道只缓存最新的、最后一个元素。
    ArrayChannel也很好理解,根据传入的capacity创建一个固定容量的数组型通道。
  • onBufferOverflow
    即便是给通道设置了缓存,那么还是有超出缓存容量的可能,这个参数就是用来设置超出容量时的行为,它有3个可选值:SUSPEND,顾名思义,就是挂起等待的意思;DROP_OLDEST,丢弃最老的,也就是最新的会把最老的元素挤掉;DROP_LATEST,刚好相反,丢弃最新的。
  • onUndeliveredElement
    一个可选参数,当数据元素发送后,但是没有传送到消费者,这个函数就会被调用

3. 迭代通道

前面的例子中,消费者从通道中读取数据时,是在while循环里操作的,实际上,通道本身是支持迭代的,所以消费者还可以这样写,

1
2
3
4
5
6
val consumer = GlobalScope.launch {
val iterator = channel.iterator()
while (iterator.hasNext()) {
val element = iterator.next()
}
}

当没有数据时,协程就会在hasNext方法挂起,直到有新的数据写入到通道内,这个写法自然还可以进一步简化成foreach,

1
2
3
4
5
val consumer = GlobalScope.launch {
for (element in channel)

}
}

4. 通道的关闭

Channel本身是个接口类型,它继承了读通道和写通道

1
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> { }

它的父接口中,只有写通道,也就是SendChannel有关闭方法,而ReceiveChannel没有关闭方法。这是什么意思呢?这就好比会上领导发言,领导讲话结束之后,都会说上一句我说完了,这个时候你也听完了,如果领导刚说到一半,你急着下班回家,然后你说一句我听完了,那么领导这个时候是说,还是不说呢。所以,结束这个事要由发送者来调用。

SendChannel有一个isClosedForSend的属性,当调用close后,这个属性会立刻返回true,表示不会再向通道中写入数据。同样,ReceiveChannle有个isClosedForReceive的属性,因为有缓冲的存在,所以只有当缓冲区内的数据都被处理完时,这个属性才会返回true。

------------- (完) -------------
  • 本文作者: oynix
  • 本文链接: https://oynix.com/2022/05/73d35e73cb4b/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

欢迎关注我的其它发布渠道