oynix

于无声处听惊雷,于无色处见繁花

Kotlin Flow

Kotlin的Flow和RxJava有着很多相似之处,提供了很多操作符来操作数据流。简单写一些使用笔记

1. 特点

  • 冷流
    创建后不发射数据,使用终端操作符后才触发发射数据。
  • 生产消费交替执行
    在数据流上,发射数据的称为生产者,处理数据的称为消费者,二者交替执行,生产-消费-生产-消费,在考虑背压情况下,生产次数可能会多于消费次数。

同RxJava,所以Flow也是有着大量的操作符,掌握了这些操作符,基本也就会用了。

1. 流构建器

  • flow方法
    1
    flow { for (i in 1..5) { emit(i) } }.collect { println(it) }
  • flowOf方法
    内部调用的是flow方法
    1
    flowOf(1, 2, 3).collect { print(it) }
  • asFlow方法
    内部调用的还是flow方法
    1
    arrayOf(1, 2, 3).asFlow().collect { print(it) }
    此外,还有几个其他的,提供的默认构建器都写在Builders.kt里。

2. 切换线程操作符

是用操作符flowOn来切换上游操作发生的线程

1
flowOf(1, 2, 3).flowOn(Dispatchers.IO).collect { print(it) }

而处理数据所在的线程取决于调用终端操作符的线程

1
2
3
4
5
6
7
8
9
10
11
12
// 上游发射数据操作发生在IO线程
val flow = flowOf(1, 2, 3).flowOn(Dispatchers.IO)

// 处理数据发生的主线程
mainScope.launch {
flow.collect { print(it) }
}

// 处理数据发生在子线程
ioScope.launch {
flow.collect { print(it) }
}

3. 终端操作符

上面提到了终端操作符,所谓终端操作符,就是在这条数据流上末端调用的操作符,即触发收集数据的操作,除了上面提到的collect,还有几个:

  • collect:收集所有
  • first/single:取首个/单个(只允许有一个,否则报错)
  • toList/toSet/toCollection:转化成集合
  • count:计数
  • reduce/fold:合并,同集合操作,fold有初始值reduce没有
  • launchIn/produceIn/broadcastIn:触发流启动

4. 过渡操作符

  • filter
    过滤操作,返回一个boolean值,true时将传入下游
    1
    flowOf(1, 2, 3).filter { it % 2 == 0 }.collect { print(it) }
  • map
    这里的转换操作是指,将上游流中的传下来的每个数据转换成其他形式
    1
    flowOf(1, 2, 3).map { "to String: $it" }.collect { print(it) }

5. 限长操作符

使用take来限制传到下游数据的数量

1
flowOf(1, 2, 3).take(2).collenct { print(it) }

6. 转换操作符

常用transform,可以实现map和filter,也可以实现更为复杂的转换,可以发射任意次数据

1
2
3
4
5
flowOf(1, 2, 3).transform {
if (it > 1) emit(it * it)
if (it > 2) emit(it * 10)
emit(it)
}.collect { print(it) }

7. 捕获异常

1
flowOf(1, 2, 3).catch { print(it) }.collect { print(it) }

8. 背压

背压是指上游生成数据的速度高于下游处理数据的速度,这个时候下游应该怎么办。一般有这么几种策略:把多出的扔掉、把多出的合并和把多出的缓存

  • conflate
    合并的意思是,只保留最新的,当消费者正在忙的时候,生产出来的数据直接放弃了。如,生产者每100ms生产一个数据,但是消费者处理一个数据需要500ms,那么,消费者只能处理到第1,5,10…个数据
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    flow {
    for (i in 1..5) {
    emit(i)
    delay(100)
    }
    }
    .conflate()
    .collect {
    delay(500)
    pritnt("finish with $it")
    }
  • collectLatest
    收集最新数据,对于每一个从上游传来的数据,消费者如果正在忙碌,那么就取消当前的任务,来处理最新来的数据。意思就是说,消费者对于每一个数据都会处理,但不一定能处理完。对于下面这个例子,前面的数据都只能打出begin,只有最后一条数据能打出begin和finish
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    flor {
    for (i in 1..5) {
    emit(i)
    delay(100)
    }
    }.collectLatest {
    print("begin with $it")
    delay(500)
    print("finish with $it")
    }
  • buffer
    缓冲,设置缓冲大小后,会将上游发来的数据存在缓冲池。缓冲池是在生产者和消费者之间创建的一个Channel,缓冲池大小就是Channel的大小,同时允许配置生产的数据超过缓冲池大小时的行为,挂起、扔掉最新和扔掉最旧
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    flor {
    for (i in 1..5) {
    emit(i)
    delay(100)
    }
    }
    .buffer(3)
    .collectLatest {
    print("begin with $it")
    delay(500)
    print("finish with $it")
    }

9. 合并流

  • zip
    合并两个流成一个流,最终流的长度取决于最短流的长度
    1
    2
    3
    4
    5
    val flow = flowOf(1, 2, 3).onEach { delay(10) }
    val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) }
    flow.zip(flow2) { i, s -> i.toString() + s }.collect {
    println(it) // Will print "1a 2b 3c"
    }
  • combine
    它和zip的区别是,最终流的长度是较长流的长度,在合并时,使用另一个流中最新的数据
    1
    2
    3
    4
    5
    val flow = flowOf(1, 2).onEach { delay(10) }
    val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
    flow.combine(flow2) { i, s -> i.toString() + s }.collect {
    println(it) // Will print "1a 2a 2b 2c"
    }

10. 其他操作符

  • onEach
    在收集每个数据前,可以插入一些操作
    1
    flowOf(1, 2, 3).onEach { print(it) }.collect { print(it) }
  • onStart
    在收集前加入操作
    1
    flowOf(1, 2, 3).onStart { emit(0) }.collect { print(it) }
  • onCompletion
    同上,参数是Throwable,如果处理过程中有异常,便通过这个参数传出
    1
    flowOf(1, 2, 3).onCompletion { print("exception:${it?.toString()}") }.collect { print(it) }
------------- (完) -------------
  • 本文作者: oynix
  • 本文链接: https://oynix.com/2022/05/7c7d27d1e028/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

欢迎关注我的其它发布渠道