盒子
盒子
文章目录
  1. 引言
  2. 原理分析
  3. 热流与冷流
    1. 热流的示例
    2. 冷流的示例
  4. MutableSharedFlow
  5. MutableStateFlow
  6. SharedFlow、StateFlow与LiveData的区别
  7. 简单示例
  8. 高级使用技巧
  9. 实践运用
    1. 全局主题模式管理
    2. 即时聊天应用
  10. 结语
  11. 推荐

SharedFlow vs StateFlow,一篇看懂选择和使用技巧

引言

在Android应用开发中,数据流是一个至关重要的概念。而在Jetpack库中,SharedFlowStateFlow 是两个处理数据流的利器,它们基于协程,提供了一种响应式的编程方式。本文将深入探讨这两个类的原理,以及在实际开发中的使用技巧。

原理分析

SharedFlowStateFlow 基于协程构建,它们利用协程的轻量级特性,在异步操作中更加高效。

SharedFlow 使用了一种基于事件溯源的机制,当有新的事件产生时,将事件添加到共享的事件序列中,然后通知所有订阅者。而 StateFlow 则维护了一个可变的状态,并在状态发生变化时通知所有观察者。

热流与冷流

热流和冷流是关于数据流的两个基本概念,它们描述了数据流何时开始以及如何传递事件的方式。

  1. 热流是一种主动的数据流。它在创建时就开始发射事件,无论是否有观察者订阅。即使没有观察者,热流也会持续产生事件。当观察者订阅时,它只是加入了已经运行的数据流,开始接收当前已经产生的事件。
  2. 冷流是一种被动的数据流。它在有观察者订阅时才开始发射事件。每个观察者都会获得相同的事件序列,而不会受到其他观察者的影响。

SharedFlowStateFlow都是热流。即没有观察者,数据会持续更新,与LiveData类似。
其中MutableSharedFlowMutableStateFlow是它们的可变类型。

热流的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
val hotFlow = MutableSharedFlow<Int>()

launch {
repeat(5) {
delay(1000)
hotFlow.emit(it)
}
}

// 观察者1
hotFlow.collect {
println("Observer 1: $it")
}

// 观察者2
delay(3000) // 观察者2延迟3秒后订阅
hotFlow.collect {
println("Observer 2: $it")
}

delay(5000) // 为了保持主线程运行
}

在这个例子中,hotFlow是一个热流,它在启动后每隔一秒产生一个事件。观察者1从一开始就订阅,而观察者2在3秒后订阅,观察者2不会接收到观察者1在订阅之前已经接收的事件。

冷流的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
val coldFlow = flow {
emit("Line 1")
delay(1000)
emit("Line 2")
delay(1000)
emit("Line 3")
}

// 观察者1
coldFlow.collect {
println("Observer 1: $it")
}

// 观察者2
delay(2000) // 观察者2延迟2秒后订阅
coldFlow.collect {
println("Observer 2: $it")
}

delay(5000) // 为了保持主线程运行
}

在这个例子中,coldFlow是一个冷流,它在有观察者订阅时才开始发射事件。观察者1从一开始就订阅,而观察者2在2秒后订阅,但它能够接收到从开始运行的事件序列。

MutableSharedFlow

MutableSharedFlow是一种可变的、用于创建共享流的类。下面是MutableSharedFlow的一些主要构造函数参数及其默认值:

1
2
3
4
5
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : MutableSharedFlow<T> { /*...*/ }
  1. replay 表示在订阅时从流中回放的元素数量。默认值为 0,表示不回放任何元素。如果设置为正整数 n,则在订阅时将向新订阅者回放最近的 n 个元素。

  2. extraBufferCapacity 表示额外的缓冲容量,用于存储订阅者尚未消耗的元素。默认值为 0,表示不使用额外的缓冲容量。设置为正整数 m 时,会在内部使用一个带有额外 m 容量的缓冲区。

  3. onBufferOverflow 表示在缓冲区溢出时的处理策略。默认值为 BufferOverflow.SUSPEND,表示当缓冲区溢出时暂停发射,等待订阅者消费。其他选项还包括 BufferOverflow.DROP_OLDESTBufferOverflow.DROP_LATEST,它们分别表示在缓冲区溢出时丢弃最老的元素或最新的元素。

使用示例:

1
val sharedFlow = MutableSharedFlow<Int>(replay = 10, extraBufferCapacity = 5, onBufferOverflow = BufferOverflow.DROP_OLDEST)

在这个示例中,创建了一个带有回放数量为10、额外缓冲容量为5、缓冲溢出处理策略为丢弃最老元素的MutableSharedFlow。这里的参数值是可根据具体需求进行调整的。

MutableStateFlow

MutableStateFlow 的构造函数有一个默认参数,即初始状态值。以下是 MutableStateFlow 构造函数:

1
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)

构造函数中的 value 参数表示 MutableStateFlow 的初始状态值。在创建 MutableStateFlow 时,需要提供这个初始状态值。

以下是一个示例,演示如何创建一个带有初始状态值的 MutableStateFlow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
val initialState = "Initial State"
val stateFlow = MutableStateFlow(initialState)

// 观察者
val job = launch {
stateFlow.collect { value ->
println("Received: $value")
}
}

// 修改状态
stateFlow.value = "New State"

// 等待观察者执行
job.join()
}

在这个例子中,initialStateMutableStateFlow 的初始状态值,通过构造函数传递给 MutableStateFlow。然后,通过修改 stateFlow.value,可以更新 MutableStateFlow 的状态值。

SharedFlow、StateFlow与LiveData的区别

StateFlow就是SharedFlow的一种特殊类型,特点有三:

  1. 它的replay容量为 1;即可缓存最近的一次粘性事件,如果想避免粘性事件问题,使用SharedFlow,replay默认值0。
  2. 初始化时必须给它设置一个初始值
  3. 每次发送数据都会与上次缓存的数据作比较,只有不一样才会发送。 它还可直接访问它自己的value参数获取当前结果值,在使用上与LiveData相似。

LiveData的不同点

  1. StateFlow必须在构建的时候传入初始值,LiveData不需要;
  2. StateFlow默认是防抖的,即相同值不更新,LiveData默认不防抖;
  3. StateFlow默认没有和生命周期绑定

简单示例

为了帮助大家更好地理解,以下是使用 SharedFlowStateFlow 的简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// SharedFlow 示例
val sharedFlow = MutableSharedFlow<String>()

// 订阅
sharedFlow.collect { value ->
println("Received: $value")
}

// 发送数据
sharedFlow.emit("Hello, SharedFlow!")

// StateFlow 示例
val stateFlow = MutableStateFlow("Initial State")

// 订阅
stateFlow.collect { value ->
println("Current State: $value")
}

// 更新状态
stateFlow.value = "New State"

高级使用技巧

  1. 错误处理

在订阅流时,考虑添加错误处理机制,以确保在流中出现错误时能够得到适当的处理,防止错误传播导致应用崩溃。

1
2
3
4
5
sharedFlow.catch { exception ->
// 处理错误
}.collect { value ->
// 处理正常数据
}
  1. 流的完成处理

使用onCompletion来处理流的完成事件,可以在流完成时执行一些清理工作。

1
2
3
4
5
6
7
8
9
sharedFlow.onCompletion { cause ->
if (cause != null) {
// 处理流异常完成的情况
} else {
// 处理正常完成的情况
}
}.collect { value ->
// 处理正常数据
}
  1. 共享的冷流

使用SharingStarted.WhileSubscribed来创建共享的冷流,确保只有至少一个订阅者时,共享流才会激活。这在事件通知的场景中非常有用。

1
val sharedFlow = flowOf(1, 2, 3).shareIn(viewModelScope, SharingStarted.WhileSubscribed())
  1. 背压策略

在使用bufferconflate等背压策略时,注意根据实际场景选择合适的策略,以平衡性能和内存的消耗。

1
2
3
4
5
sharedFlow
.buffer(Channel.CONFLATED) // 或者 buffer(size = n)
.collect { value ->
// 处理数据
}
  1. 过滤重复的状态

使用distinctUntilChanged来过滤掉重复的状态,确保只在状态发生变化时通知订阅者。

1
2
3
4
5
stateFlow
.distinctUntilChanged()
.collect { state ->
// 处理状态变化
}

实践运用

全局主题模式管理

假设我们需要在应用中管理全局的主题模式,我们可以使用 StateFlow。

1
2
3
4
5
6
7
8
9
10
object ThemeManager {
private val _themeStateFlow = MutableStateFlow(Theme.Light)
val themeStateFlow: StateFlow<Theme> get() = _themeStateFlow

fun setTheme(theme: Theme) {
viewModelScope.launch {
_themeStateFlow.value = theme
}
}
}

在上述示例中,ThemeManager 使用 MutableStateFlow 来创建一个管理主题模式的 StateFlow。当主题模式发生变化时,通过 setTheme 方法来更新 StateFlow,所有订阅者都会收到最新的主题模式。

在需要订阅主题模式的地方,可以这样使用:

1
2
3
4
5
6
7
8
9
10
class ThemedFragment : Fragment() {
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
super.onViewCreated(view, savedInstanceState)
viewModelScope.launch {
ThemeManager.themeStateFlow.collect { theme ->
// 根据主题模式更新 UI
}
}
}
}

即时聊天应用

当涉及到共享数据状态的场景时,SharedFlow 通常是一个合理的选择。
假设我们要实现一个即时聊天应用,多个页面或组件需要获取最近的聊天消息。

1
2
3
4
5
6
7
8
9
10
11
object ChatManager {
private val _chatMessagesFlow = MutableSharedFlow<ChatMessage>(replay = 5, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_LATEST)
val chatMessagesFlow: SharedFlow<ChatMessage> get() = _chatMessagesFlow

fun sendChatMessage(message: String, sender: String) {
val chatMessage = ChatMessage(message, sender, System.currentTimeMillis())
viewModelScope.launch {
_chatMessagesFlow.emit(chatMessage)
}
}
}

在这个示例中,ChatManager 使用 MutableSharedFlow 来创建一个实时通知聊天消息变化的 SharedFlow。当有新的聊天消息时,通过 sendChatMessage 方法更新 SharedFlow,所有订阅者都能获取到最近的数据序列。

在需要订阅聊天消息的地方,可以这样使用:

1
2
3
4
5
6
7
8
9
10
class ChatFragment : Fragment() {
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
super.onViewCreated(view, savedInstanceState)
viewModelScope.launch {
ChatManager.chatMessagesFlow.collect { chatMessage ->
// 处理收到的聊天消息,更新 UI
}
}
}
}

结语

通过本文的介绍,相信读者已经对SharedFlowStateFlow有了更深入的了解。在实际应用中,提高Android应用的开发效率。

推荐

android_startup: 提供一种在应用启动时能够更加简单、高效的方式来初始化组件,优化启动速度。不仅支持Jetpack App Startup的全部功能,还提供额外的同步与异步等待、线程控制与多进程支持等功能。

AwesomeGithub: 基于Github的客户端,纯练习项目,支持组件化开发,支持账户密码与认证登陆。使用Kotlin语言进行开发,项目架构是基于JetPack\&DataBinding的MVVM;项目中使用了Arouter、Retrofit、Coroutine、Glide、Dagger与Hilt等流行开源技术。

flutter_github: 基于Flutter的跨平台版本Github客户端,与AwesomeGithub相对应。

android-api-analysis: 结合详细的Demo来全面解析Android相关的知识点, 帮助读者能够更快的掌握与理解所阐述的要点。

daily_algorithm: 每日一算法,由浅入深,欢迎加入一起共勉。

支持一下
赞赏是一门艺术