日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

Kotlin協程開發之Flow的融合與Channel容量及溢出策略介紹_Android

作者:LeeDuo. ? 更新時間: 2022-10-31 編程語言

一.協程間的通信

當需要進行協程間的通信時,可以調用Channel方法,創建一個Channel接口指向的對象,通過調用該對象的send方法和receive方法實現消息的發送與接收。協程對Channel接口的實現,本質上與阻塞隊列類似,這里不再贅述。

1.通道容量

事實上,send方法與receive方法并沒有定義在Channel接口中,而是分別定義在SendChannel接口和ReceiveChannel接口中。Channel接口中只是定義了一些與Channel容量策略相關的枚舉常量,代碼如下:

// 繼承SendChannel接口和ReceiveChannel接口
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
    // 枚舉常量
    public companion object Factory {
        // Channel的容量為無限
        public const val UNLIMITED: Int = Int.MAX_VALUE
        // Channel的容量為0,沒有緩存
        public const val RENDEZVOUS: Int = 0
        // Channel的容量為1,溢出策略為DROP_OLDEST,
        // 后一個的數據會覆蓋前一個數據
        public const val CONFLATED: Int = -1
        // Channel的容量為默認值CHANNEL_DEFAULT_CAPACITY,
        // 默認溢出策略為SUSPEND,send方法會發生掛起
        // 當容量策略為BUFFERED,而溢出策略不為SUSPEND時,Channel的容量為1
        public const val BUFFERED: Int = -2
        // 協程內部使用的一個默認枚舉值,不對外暴露
        internal const val OPTIONAL_CHANNEL = -3
        // 用于手動配置容量策略為BUFFERED時的默認值
        public const val DEFAULT_BUFFER_PROPERTY_NAME: String = "kotlinx.coroutines.channels.defaultBuffer"
        // 容量策略為BUFFERED時的默認值
        // 默認64,最小1,最大為Int.MAX_VALUE-1
        internal val CHANNEL_DEFAULT_CAPACITY = systemProp(DEFAULT_BUFFER_PROPERTY_NAME,
            64, 1, UNLIMITED - 1
        )
    }
}

從上面的代碼可以看出Channel接口繼承自SendChannel接口和ReceiveChannel接口。因此,一個Channel接口指向的對象,既可以用于發送消息,也可以用于接收消息。

2.溢出策略

Channel除了容量策略外,還有溢出策略,用于決定當Channel的容量已滿時,而下一個消息到來時的行為。溢出策略定義在枚舉類BufferOverflow中,代碼如下:

public enum class BufferOverflow {
    // 當容量已滿時,掛起調用send方法的協程
    SUSPEND,
    // 當容量已滿時,刪除舊數據,將新的數據添加進去,不掛起調用send方法的協程
    DROP_OLDEST,
    // 當容量已滿時,忽略當前要添加的數據,不掛起調用send方法的協程
    DROP_LATEST
}

二.FusibleFlow接口

FusibleFlow接口繼承自Flow接口。一個類實現了該接口,表示該類創建的流可以與其上游或下游相鄰的流進行融合,當流發生融合時,就會調用接口中定義的fuse方法,代碼如下:

@InternalCoroutinesApi
public interface FusibleFlow<T> : Flow<T> {
    // 用于流的融合
    public fun fuse(
        context: CoroutineContext = EmptyCoroutineContext,
        capacity: Int = Channel.OPTIONAL_CHANNEL,
        onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
    ): Flow<T>
}

FusibleFlow接口的fuse方法,默認容量為OPTIONAL_CHANNEL,默認溢出策略為SUSPEND。

流的融合

在Flow中,當channelFlow方法、flowOn方法、buffer方法、produceIn方法、broadcastIn方法相鄰調用時,就會觸發流的融合。

具體融合的過程,其實是將下游流的容量、溢出策略、上下文傳遞給上游的流處理,上游的流根據自身的容量、溢出策略、上下文以及下游的流的容量、溢出策略、上下文重新計算,得到新的容量、溢出策略、上下文,并返回一個融合后的流。

三.ChannelFlow類

ChannelFlow類是一個抽象類,實現了FusibleFlow接口。下面分析一下fuse方法對于上下游流融合的策略,代碼如下:

@InternalCoroutinesApi
public abstract class ChannelFlow<T>(
    // 上游流的上下文
    @JvmField public val context: CoroutineContext,
    // 上下游之間流的緩存容量
    @JvmField public val capacity: Int,
    // 溢出策略
    @JvmField public val onBufferOverflow: BufferOverflow
) : FusibleFlow<T> {
    ...
    public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {
        // CONFLATED是一個復合的類型,需要拆解成capacity = 0, onBufferOverflow = DROP_OLDEST
        assert { capacity != Channel.CONFLATED }
        // 計算融合后流的上下文
        val newContext = context + this.context
        // 用于保存融合后流的容量
        val newCapacity: Int
        // 用于保存融合后流的溢出策略
        val newOverflow: BufferOverflow
        // SUSPEND為默認溢出策略,如果溢出策略不是默認的策略
        if (onBufferOverflow != BufferOverflow.SUSPEND) {
            // 直接保存
            newCapacity = capacity
            newOverflow = onBufferOverflow
        } else { // 如果是默認策略
            // 計算并保存新的容量
            newCapacity = when {
                // 如果之前的容量為默認枚舉值,則使用新的
                this.capacity == Channel.OPTIONAL_CHANNEL -> capacity
                // 如果新的容量為默認枚舉值,則使用原來的
                capacity == Channel.OPTIONAL_CHANNEL -> this.capacity
                // 如果原來的容量為默認值CHANNEL_DEFAULT_CAPACITY,則使用新的
                this.capacity == Channel.BUFFERED -> capacity
                // 如果新的容量為默認值CHANNEL_DEFAULT_CAPACITY,則使用原來的
                capacity == Channel.BUFFERED -> this.capacity
                // 如果不為默認值或默認枚舉值
                else -> {
                    // 檢查容量都是大于等于0的
                    assert { this.capacity >= 0 }
                    assert { capacity >= 0 }
                    // 將原來的容量和新的容量進行相加
                    val sum = this.capacity + capacity
                    // 如果相加后大與等于0,則容量為相加后的結果,否則為無限
                    if (sum >= 0) sum else Channel.UNLIMITED
                }
            }
            // 保存溢出策略
            newOverflow = this.onBufferOverflow
        }
        // 如果融合的兩個流的上下文相同,容量相同,溢出策略也相同
        if (newContext == this.context && newCapacity == this.capacity && newOverflow == this.onBufferOverflow)
            // 則直接返回
            return this
        // 有變化則根據新計算出得參數,創建融合后的流
        return create(newContext, newCapacity, newOverflow)
    }
    // 由子類進行重寫
    protected abstract fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T>
    ...
}

流融合的原則

根據上面對fuse方法的分析,可以總結出fuse方法在計算容量和溢出策略時的四個原則:

1)下游優先于上游

2)溢出策略優先于容量

3)非默認值優先于默認值

4)上下游容量都不為默認值,則相加取和

原文鏈接:https://blog.csdn.net/LeeDuoZuiShuai/article/details/126617494

欄目分類
最近更新