Kotlin的Flow和RxJava有着很多相似之处,提供了很多操作符来操作数据流。简单写一些使用笔记
1. 特点
- 冷流
创建后不发射数据,使用终端操作符后才触发发射数据。 - 生产消费交替执行
在数据流上,发射数据的称为生产者,处理数据的称为消费者,二者交替执行,生产-消费-生产-消费,在考虑背压情况下,生产次数可能会多于消费次数。
同RxJava,所以Flow也是有着大量的操作符,掌握了这些操作符,基本也就会用了。
1. 流构建器
- flow方法
1
flow { for (i in 1..5) { emit(i) } }.collect { println(it) }
- flowOf方法
内部调用的是flow方法1
flowOf(1, 2, 3).collect { print(it) }
- asFlow方法
内部调用的还是flow方法此外,还有几个其他的,提供的默认构建器都写在1
arrayOf(1, 2, 3).asFlow().collect { print(it) }
Builders.kt
里。
2. 切换线程操作符
是用操作符flowOn
来切换上游操作发生的线程
1 | flowOf(1, 2, 3).flowOn(Dispatchers.IO).collect { print(it) } |
而处理数据所在的线程取决于调用终端操作符的线程
1 | // 上游发射数据操作发生在IO线程 |
3. 终端操作符
上面提到了终端操作符,所谓终端操作符,就是在这条数据流上末端调用的操作符,即触发收集数据的操作,除了上面提到的collect,还有几个:
- collect:收集所有
- first/single:取首个/单个(只允许有一个,否则报错)
- toList/toSet/toCollection:转化成集合
- count:计数
- reduce/fold:合并,同集合操作,fold有初始值reduce没有
- launchIn/produceIn/broadcastIn:触发流启动
4. 过渡操作符
- filter
过滤操作,返回一个boolean值,true时将传入下游1
flowOf(1, 2, 3).filter { it % 2 == 0 }.collect { print(it) }
- map
这里的转换操作是指,将上游流中的传下来的每个数据转换成其他形式1
flowOf(1, 2, 3).map { "to String: $it" }.collect { print(it) }
5. 限长操作符
使用take来限制传到下游数据的数量
1 | flowOf(1, 2, 3).take(2).collenct { print(it) } |
6. 转换操作符
常用transform,可以实现map和filter,也可以实现更为复杂的转换,可以发射任意次数据
1 | flowOf(1, 2, 3).transform { |
7. 捕获异常
1 | flowOf(1, 2, 3).catch { print(it) }.collect { print(it) } |
8. 背压
背压是指上游生成数据的速度高于下游处理数据的速度,这个时候下游应该怎么办。一般有这么几种策略:把多出的扔掉、把多出的合并和把多出的缓存
- conflate
合并的意思是,只保留最新的,当消费者正在忙的时候,生产出来的数据直接放弃了。如,生产者每100ms生产一个数据,但是消费者处理一个数据需要500ms,那么,消费者只能处理到第1,5,10…个数据1
2
3
4
5
6
7
8
9
10
11flow {
for (i in 1..5) {
emit(i)
delay(100)
}
}
.conflate()
.collect {
delay(500)
pritnt("finish with $it")
} - collectLatest
收集最新数据,对于每一个从上游传来的数据,消费者如果正在忙碌,那么就取消当前的任务,来处理最新来的数据。意思就是说,消费者对于每一个数据都会处理,但不一定能处理完。对于下面这个例子,前面的数据都只能打出begin,只有最后一条数据能打出begin和finish1
2
3
4
5
6
7
8
9
10flor {
for (i in 1..5) {
emit(i)
delay(100)
}
}.collectLatest {
print("begin with $it")
delay(500)
print("finish with $it")
} - buffer
缓冲,设置缓冲大小后,会将上游发来的数据存在缓冲池。缓冲池是在生产者和消费者之间创建的一个Channel,缓冲池大小就是Channel的大小,同时允许配置生产的数据超过缓冲池大小时的行为,挂起、扔掉最新和扔掉最旧1
2
3
4
5
6
7
8
9
10
11
12flor {
for (i in 1..5) {
emit(i)
delay(100)
}
}
.buffer(3)
.collectLatest {
print("begin with $it")
delay(500)
print("finish with $it")
}
9. 合并流
- zip
合并两个流成一个流,最终流的长度取决于最短流的长度1
2
3
4
5val flow = flowOf(1, 2, 3).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) }
flow.zip(flow2) { i, s -> i.toString() + s }.collect {
println(it) // Will print "1a 2b 3c"
} - combine
它和zip的区别是,最终流的长度是较长流的长度,在合并时,使用另一个流中最新的数据1
2
3
4
5val flow = flowOf(1, 2).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
flow.combine(flow2) { i, s -> i.toString() + s }.collect {
println(it) // Will print "1a 2a 2b 2c"
}
10. 其他操作符
- onEach
在收集每个数据前,可以插入一些操作1
flowOf(1, 2, 3).onEach { print(it) }.collect { print(it) }
- onStart
在收集前加入操作1
flowOf(1, 2, 3).onStart { emit(0) }.collect { print(it) }
- onCompletion
同上,参数是Throwable,如果处理过程中有异常,便通过这个参数传出1
flowOf(1, 2, 3).onCompletion { print("exception:${it?.toString()}") }.collect { print(it) }