IP地址检测
视频播放器
输入关键词搜索
登录
Kotlin协程Channel和Flow

Kotlin协程Channel和Flow

2024-09-28
编程开发

先看这里

flow和channel的区别,flow的消息会被消费N次,channel的每个消息只会被消费一次

SafeFlow不是线程共享的,MutableStateFlow和MutableSharedFlow是线程共享的

MutableStateFlow遇到相同的消息不会通知观察者,MutableSharedFlow不管消息是否相同都会通知观察者

SafeFlow

默认情况下,调用kotlin提供的Flow扩展函数(flow,asFlow,flowOf...),创建的就是SafeFlow实例。SafeFlow的每一个观察者都拥有自己独立的数据流

val flow = flow<Int> {
    repeat(5) {
        emit(it)
    }
}
// 或者:
// val flow = (1..5).asFlow()
runBlocking {
    flow.collect {
        println("张三 receive msg=$it")
    }
    flow.collect {
        println("李四 receive msg=$it")
    }
    flow.collect {
        println("王五 receive msg=$it")
    }
}

运行结果:

张三 receive msg=1
张三 receive msg=2
张三 receive msg=3
张三 receive msg=4
张三 receive msg=5
李四 receive msg=1
李四 receive msg=2
李四 receive msg=3
李四 receive msg=4
李四 receive msg=5
王五 receive msg=1
王五 receive msg=2
王五 receive msg=3
王五 receive msg=4
王五 receive msg=5

flow函数源码总结一下:给FlowCollector实例定义了一个扩展函数block(产生数据流)。对于观察者来说,每次调用collect方法都会生成一个SafeCollector(实现了FlowCollector接口)实例,调用block函数产生数据流,所以每个观察者拥有独立且完整的数据流。block函数中又调用了emit方法,emit再去调用flow.collect注册的数据处理函数

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {

    public final override suspend fun collect(collector: FlowCollector<T>) {
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }

    /**
     * Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
     *
     * A valid implementation of this method has the following constraints:
     * 1) It should not change the coroutine context (e.g. with `withContext(Dispatchers.IO)`) when emitting values.
     *    The emission should happen in the context of the [collect] call.
     *    Please refer to the top-level [Flow] documentation for more details.
     * 2) It should serialize calls to [emit][FlowCollector.emit] as [FlowCollector] implementations are not
     *    thread-safe by default.
     *    To automatically serialize emissions [channelFlow] builder can be used instead of [flow]
     *
     * @throws IllegalStateException if any of the invariants are violated.
     */
    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

MutableStateFlow

MutableStateFlow 是线程安全的,多个观察者共享同一个数据流。只有当值发生改变时,才会通知观察者

val mutableStateFlow = MutableStateFlow(1)
runBlocking {
    launch {
        mutableStateFlow.collect {
            println("observe value -> $it")
        }
    }

    launch {
        repeat(5) {
            delay(100)
            println("change value to 2")
            mutableStateFlow.value = 2
        }
    }
}

运行结果:

observe value -> 1 // 初始值
change value to 2
observe value -> 2
change value to 2 // 值未改变,观察者不会收到消息
change value to 2 // 值未改变,观察者不会收到消息
change value to 2 // 值未改变,观察者不会收到消息
change value to 2 // 值未改变,观察者不会收到消息

如果多次修改不同的值,观察者只会收到最新值,不会收到中间值,比如

val mutableStateFlow = MutableStateFlow(1)
runBlocking {
    launch {
        mutableStateFlow.collect {
            println("observe value -> $it")
        }
    }

    launch {
        repeat(5) {
            println("change value to $it")
            mutableStateFlow.value = it
        }
    }
}

打印结果:

observe value -> 1 // 初始值
change value to 0
change value to 1
change value to 2
change value to 3
change value to 4
observe value -> 4 // 观察者只会收到最新值

MutableSharedFlow

与MutableStateFlow的区别,无论值是否改变,都会通知观察者

val mutableSharedFlow = MutableSharedFlow<Int>()
runBlocking {
    launch {
        mutableSharedFlow.collect {
            println("observe value -> $it")
        }
    }
    launch {
        println("emit 2")
        mutableSharedFlow.emit(2)
        println("emit 2")
        mutableSharedFlow.emit(2)
        println("emit 3")
        mutableSharedFlow.emit(3)
        println("emit 3")
        mutableSharedFlow.emit(3)
    }
}

运行结果:

emit 2
observe value -> 2
emit 2
observe value -> 2
emit 3
observe value -> 3
emit 3
observe value -> 3

Channel

每条消息只被消费一次,支持背压

val channel = Channel<Int>()
runBlocking {
    launch {
        // 发送若干条消息
        repeat(10) {
            println("producer send $it")
            channel.send(it)
        }
        // 消息发送结束后调用cancel,否则consumer会一直处于监听状态
        channel.cancel()
    }
    repeat(2) {
        launch {
            while (!channel.isClosedForSend) {
                val msg = channel.receive()
                println("consumer$it receive $msg")
            }
        }
    }
}

运行结果:可以看到每一条消息只会被消费一次

producer send 0
consumer0 receive 0
producer send 1
producer send 2
producer send 3
consumer0 receive 1
consumer0 receive 3
consumer1 receive 2
producer send 4
producer send 5
producer send 6
consumer0 receive 4
consumer0 receive 6
consumer1 receive 5
producer send 7
producer send 8
producer send 9
consumer0 receive 7
consumer0 receive 9
consumer1 receive 8

构造函数参数

RENDEZVOUS(默认): 没有缓冲区(容量为 0),发送者必须与接收者“约会”——也就是说,只有当有接收者等待接收时,send 才能完成,否则发送者会挂起等待。

UNLIMITED: 拥有无限缓冲区(容量为 Int.MAX_VALUE),发送者永远不会因为缓冲满而挂起,但需要注意这可能导致内存无限增长。

CONFLATED: 缓冲区容量固定为 1,当发送新数据时,如果之前的数据还未被接收,新数据会覆盖旧数据;接收者只会获取到最新的那个值。

BUFFERED: 拥有固定大小的缓冲区(默认通常是 64,可以通过 VM 参数调整),当缓冲区满时,发送者会挂起直到有空间腾出。

ChannelFlow

ChannelFlow是Channel的包装(消息只被消费一次),让Channel可以像Flow一样通过collect方式来处理消息,简化了代码

val channel = Channel<Int>()
val channelFlow = channel.receiveAsFlow()
runBlocking {
    launch {
        channelFlow.collect {
            println("张三收到了消息:$it")
        }
    }
    launch {
        channelFlow.collect {
            println("李四收到了消息:$it")
        }
    }
    launch {
        repeat(5) {
            println("发送消息--->$it")
            channel.send(it)
        }
    }
}

运行结果:

发送消息--->0
发送消息--->1
发送消息--->2
张三收到了消息:0
张三收到了消息:2
李四收到了消息:1
发送消息--->3
发送消息--->4
张三收到了消息:3
李四收到了消息:4
THE END
0/500
暂无评论
📢网站公告
欢迎来到这里
⚙️实用工具
html转pdfmarkdown编辑器
本站推荐:腾讯云服务器仅需2.3折 (点击直达)
用户协议
隐私政策
Build Time: 2025-02-06 16:15:33