kotlin开发 Flow的学习
前言
Flow是配合Kotlin协程使用的异步编程工具。其实Flow的概念并不是独家的,更早之前Java端就有自带的stream与大名鼎鼎的RxJava,它们的思想都是响应式编程思想(或者也可以称呼链式编程),当时的响应式编程思想就是为了解决Java各个线程的异步处理结果进行同步。其更底层的思想核心是观察者模式或者生产消费者模式。所以回到Flow,它看似复杂有很多新概念,其实核心归根结底就是观察者模式思想与生产消费者模式思想。
Flow的使用结构
一个简单的例子
一个极简的例子快速了解一下,emit为发射,collect为接收
fun demo1() {
GlobalScope.launch {
flow<Int> {
for (i in 0..10) {
//emit为发射函数
emit(i)
}
}.collect {
//collect接收,它还是挂起函数必须在协程中调用
Log.e("zh", "结果 = ${it} ")
}
}
}
/* 输出结果:
结果 = 0
结果 = 1
结果 = 2
结果 = 3
结果 = 4
结果 = 5
结果 = 6
结果 = 7
结果 = 8
结果 = 9
结果 = 10
*/
构建方式
Flow有多种多样的构建方式,下面一一举例:
方式一 默认方式
以默认的方式创建flow,上面的例子已经举例了,这里再举例一下。
fun demo1() {
GlobalScope.launch {
flow<String> {
delay(500)
emit("苹果")
delay(500)
emit("西瓜")
delay(500)
emit("香蕉")
delay(500)
emit("哈密瓜")
}.collect{
Log.e("zh", "时间 = ${System.currentTimeMillis()} 结果 = ${it}", )
}
}
}
/*
输出结果:
时间 = 1690598408614 结果 = 苹果
时间 = 1690598409116 结果 = 西瓜
时间 = 1690598409617 结果 = 香蕉
时间 = 1690598410118 结果 = 哈密瓜
*/
方式二 从集合发射流
通过集合调用asFlow()函数
fun demo2() {
GlobalScope.launch {
//从集合发射流
(0..3).asFlow().collect {
Log.e("zh", "结果 = ${it}", )
}
listOf<String>("苹果","西瓜","香蕉","哈密瓜").asFlow().collect{
Log.e("zh", "结果 = ${it}", )
}
}
}
/*
输出结果:
结果 = 0
结果 = 1
结果 = 2
结果 = 3
结果 = 苹果
结果 = 西瓜
结果 = 香蕉
结果 = 哈密瓜
*/
方式三 自建集合发射流
通过调用flowOf()函数
fun demo2() {
GlobalScope.launch {
flowOf("苹果" to 1, "香蕉" to 2, "西瓜" to 3).collect { pair ->
Log.e("zh", "结果 = ${pair.first} ${pair.second}", )
}
}
}
/*
输出结果:
结果 = 苹果 1
结果 = 香蕉 2
结果 = 西瓜 3
*/
冷流
Flow是冷流的,冷流的概念是只有订阅者订阅后,发布者才会生产数据。 并且订阅者与发布者是一一对应关系,数据只会给目标订阅者发送,不会发送给其他。这类似于你去工厂下订单,工厂才会生产你需要的产品。
有冷流就会有热流的概念,热流其实是观察者模式(或者广播)的概念,发布端无论有没有订阅者,都会始终执行,并且有多个订阅者时,发布端跟订阅者是一对多的关系,热流可以与多个订阅者共享信息。
以下代码验证Flow的冷流特性:
fun demo3() {
val flow = flow<String> {
val simpleDateFormat = SimpleDateFormat("HH:mm:ss", Locale.getDefault())
val timeString = simpleDateFormat.format(System.currentTimeMillis())
emit(timeString)
}
GlobalScope.launch {
for (index in 0..2){
delay(1000)
flow.collect {
//collect调用它了,flow才会执行
Log.e("zh", "结果 = ${it} ")
}
}
}
}
/*
输出结果:
结果 = 13:53:53
结果 = 13:53:54
结果 = 13:53:55
*/
背压概念 与 collectLatest 停止当前工作以收集最新值
Backpressure(背压)
Backpressure(背压)是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况。在上面的例子中收集都使用了collect函数, collect会收集每一个值并且保证执行完处理代码。 但是如果collect正在处理耗时数据,而flow的发送端是不会停下来,发送端会一直发送,而collect则会堵塞发过来的数据,等待当前collect函数代码块里的耗时逻辑处理完,才会执行下一个发送过来的数据。这样会导致我们看到最后最新的数据时已经延迟了很久。
collectLatest 停止当前工作以收集最新值
上面的背压情况,在一些业务下并不希望耗时等待,只尽快看到最后最新的数据,这个时候就可以使用collectLatest来替代collect进行数据收集。collectLatest并不会保证每一个发送来的时间都处理完,当有新数据发送过来时,如果collectLatest的代码块没有执行完成,它会立刻停止处理老数据。去处理最新发送过来的数据。
代码例子
fun demo1() {
GlobalScope.launch {
flow<Int> {
for (i in 0..2){
//请注意这里延迟50毫秒是为了对比下面collectLatest的延迟
delay(50)
emit(i)
}
}.collectLatest{
//这里分别延迟了2次50毫秒,是为了表示这个代码块里的耗时,是始终大于上面发送端的。
delay(50)
Log.e("zh", "模拟正在耗时处理 = ${it}")
delay(50)
Log.e("zh", "最终处理完结果 = ${it}")
}
}
}
/*
输出结果:
模拟正在耗时处理 = 0
模拟正在耗时处理 = 1
模拟正在耗时处理 = 2
最终处理完结果 = 2
*/
过滤filter
filter属于中间操作符,将数据过滤
fun demo3() {
GlobalScope.launch {
flow<Int> {
for (i in 0..10){
emit(i)
}
}.filter {
//除余,过滤掉奇数
it % 2 == 0
}.collect{
Log.e("zh", "结果 = ${it}")
}
}
}
/*
输出结果:
结果 = 0
结果 = 2
结果 = 4
结果 = 6
结果 = 8
结果 = 10
*/
过滤filterNot
filterNot属于中间操作符, 它是反向过滤的,会将true的结果去除,将false结果继续发射
fun demo3() {
GlobalScope.launch {
flow<Int> {
for (i in 0..10){
emit(i)
}
}.filterNot {
//除余,过滤掉偶数
it % 2 == 0
}.collect{
Log.e("zh", "结果 = ${it}")
}
}
}
/*
输出结果:
结果 = 1
结果 = 3
结果 = 5
结果 = 7
结果 = 9
*/
转换组合map
map属于中间操作符,用于将数据转换或者组合
fun demo3() {
val fruits = listOf<String>("苹果", "香蕉", "芒果")
GlobalScope.launch {
flow<Int> {
for (i in 0..2) {
emit(i)
}
}.map { it: Int ->
Pair<String, Int>(fruits[it], it)
}.collect { it: Pair<String, Int> -> //这里其实可以不用写it: Pair<String, Int>,写出来就是想表达,上面的flow<Int>被转换成了Pair<String, Int>
Log.e("zh", "结果 = ${it}")
}
}
}
/*
输出结果:
结果 = (苹果, 0)
结果 = (香蕉, 1)
结果 = (芒果, 2)
*/
转换组合mapLatest
有时候我们的map方法处理转换需要耗时,但是发送端会一直发射新数据,导致map的耗时堆积积累,这会出现获取最后一个数据时已经耗时了很久。在一些情况下我们并不关切前面的数据,只希望获取最新的数据,这里就可以使用mapLatest。
fun demo3() {
val timeFormat = SimpleDateFormat("HH:mm:ss", Locale.getDefault())
val flow = flow<Int> {
for (i in 1..3){
emit(i)
}
}
GlobalScope.launch {
//这里实现map作为下面mapLatest的对比
flow.map {
//这里写30毫秒延迟,模拟处理数据需要耗时
delay(30)
timeFormat.format(System.currentTimeMillis()) to it
}.collect{
Log.e("zh", "map >> 结果 = ${it}")
}
flow.mapLatest {
//这里写30毫秒延迟,模拟处理数据需要耗时
delay(30)
timeFormat.format(System.currentTimeMillis()) to it
}.collect{
Log.e("zh", "mapLatest >> 最新数据结果 = ${it}")
}
}
}
/*
输出结果:
map >> 结果 = (15:25:03, 1)
map >> 结果 = (15:25:03, 2)
map >> 结果 = (15:25:03, 3)
mapLatest >> 最新数据结果 = (15:25:03, 3)
*/
切换线程
异常捕获
end