【问题标题】:Is there a way to achieve this rx flow in Kotlin with coroutines/Flow/Channels?有没有办法在 Kotlin 中使用 coroutines/Flow/Channels 实现这个 rx 流?
【发布时间】:2020-01-08 13:48:48
【问题描述】:

我是第一次尝试 Kotlin Coroutines 和 Flow,我正在尝试使用 MVI-ish 方法重现我在带有 RxJava 的 Android 上使用的某个流程,但我很难正确处理,而且我基本上被卡住了此时。

RxJava 应用程序看起来基本上是这样的:

MainActivityView.kt

object MainActivityView {

    sealed class Event {
        object OnViewInitialised : Event()
    }

    data class State(
        val renderEvent: RenderEvent = RenderEvent.None
    )

    sealed class RenderEvent {
        object None : RenderEvent()
        class DisplayText(val text: String) : RenderEvent()
    }
}

MainActivity.kt

MainActivity 有一个PublishSubject 的实例,其类型为Event。即MainActivityView.Event.OnViewInitialisedMainActivityView.Event.OnError 等。初始事件通过主题的.onNext(Event) 调用在onCreate() 中发送。

@MainActivityScope
class MainActivity : AppCompatActivity(R.layout.activity_main) {

    @Inject
    lateinit var subscriptions: CompositeDisposable

    @Inject
    lateinit var viewModel: MainActivityViewModel

    @Inject
    lateinit var onViewInitialisedSubject: PublishSubject<MainActivityView.Event.OnViewInitialised>

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setupEvents()
    }

    override fun onDestroy() {
        super.onDestroy()
        subscriptions.clear()
    }

    private fun setupEvents() {
        if (subscriptions.size() == 0) {
            Observable.mergeArray(
                onViewInitialisedSubject
                    .toFlowable(BackpressureStrategy.BUFFER)
                    .toObservable()
            ).observeOn(
                Schedulers.io()
            ).compose(
                viewModel()
            ).observeOn(
                AndroidSchedulers.mainThread()
            ).subscribe(
                ::render
            ).addTo(
                subscriptions
            )

            onViewInitialisedSubject
                .onNext(
                    MainActivityView
                        .Event
                        .OnViewInitialised
                )
        }
    }

    private fun render(state: MainActivityView.State) {
        when (state.renderEvent) {
            MainActivityView.RenderEvent.None -> Unit
            is MainActivityView.RenderEvent.DisplayText -> {
                mainActivityTextField.text = state.renderEvent.text
            }
        }
    }

}

MainActivityViewModel.kt

然后这些EventMainActivityViewModel 类拾取,该类由.compose(viewModel()) 调用,然后通过ObservableTransformer&lt;Event, State&gt; 将接收到的Event 转换为一种新的State。视图模型返回一个带有renderEvent 的新状态,然后可以通过render(state: MainActivityView.State) 函数再次在MainActivity 中对其进行操作。

@MainActivityScope
class MainActivityViewModel @Inject constructor(
    private var state: MainActivityView.State
) {

    operator fun invoke(): ObservableTransformer<MainActivityView.Event, MainActivityView.State> = onEvent

    private val onEvent = ObservableTransformer<MainActivityView.Event,
        MainActivityView.State> { upstream: Observable<MainActivityView.Event> ->
        upstream.publish { shared: Observable<MainActivityView.Event> ->
            Observable.mergeArray(
                shared.ofType(MainActivityView.Event.OnViewInitialised::class.java)
            ).compose(
                eventToViewState
            )
        }
    }

    private val eventToViewState = ObservableTransformer<MainActivityView.Event, MainActivityView.State> { upstream ->
        upstream.flatMap { event ->
            when (event) {
                MainActivityView.Event.OnViewInitialised -> onViewInitialisedEvent()
            }
        }
    }

    private fun onViewInitialisedEvent(): Observable<MainActivityView.State> {
        val renderEvent = MainActivityView.RenderEvent.DisplayText(text = "hello world")
        state = state.copy(renderEvent = renderEvent)
        return state.asObservable()
    }

}

我可以使用协程/流/通道实现相同的流程吗?甚至可能有点简化?

编辑:

我已经找到了适合我的解决方案,到目前为止我还没有发现任何问题。然而,这个解决方案使用ConflatedBroadcastChannel&lt;T&gt;,最终将被弃用,它可能会用(在撰写本文时)尚未发布的SharedFlow api 替换它(更多关于here

它的工作方式是 Activity 和 viewmodel 共享 一个ConflatedBroadcastChannel&lt;MainActivity.Event&gt;,用于从Activity(或适配器)发送或提供事件。视图模型将事件减少到一个新的状态,然后发出。 Activity 正在收集由viewModel.invoke() 返回的Flow&lt;State&gt;,并最终呈现发出的State

MainActivityView.kt

object MainActivityView {

    sealed class Event {
        object OnViewInitialised : Event()
        data class OnButtonClicked(val idOfItemClicked: Int) : Event()
    }

    data class State(
        val renderEvent: RenderEvent = RenderEvent.Idle
    )

    sealed class RenderEvent {
        object Idle : RenderEvent()
        data class DisplayText(val text: String) : RenderEvent()
    }
}

MainActivity.kt

class MainActivity : AppCompatActivity(R.layout.activity_main) {

    @Inject
    lateinit var viewModel: MainActivityViewModel

    @Inject
    lateinit eventChannel: ConflatedBroadcastChannel<MainActivityView.Event>

    private var isInitialised: Boolean = false

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        init()
    }
    
    private fun init() {
        if (!isInitialised) {
            
            lifecycleScope.launch {
                viewModel()
                    .flowOn(
                        Dispatchers.IO
                    ).collect(::render)
            }

            eventChannel
                .offer(
                    MainActivityView.Event.OnViewInitialised
                )
            isInitialised = true
        }
    }

    private suspend fun render(state: MainActivityView.State): Unit =
        when (state.renderEvent) {
            MainActivityView.RenderEvent.Idle -> Unit
            is MainActivityView.RenderEvent.DisplayText -> 
                renderDisplayText(text = state.renderEvent.text)
            
        }

    private val renderDisplayText(text: String) {
        // render text
    }

}

MainActivityViewModel.kt

class MainActivityViewModel constructor(
    private var state: MainActivityView.State = MainActivityView.State(),
    private val eventChannel: ConflatedBroadcastChannel<MainActivityView.Event>,
 ) {

    suspend fun invoke(): Flow<MainActivityView.State> =
        eventChannel
            .asFlow()
            .flatMapLatest { event: MainActivityView.Event ->
                reduce(event)
            }

    private fun reduce(event: MainActivityView.Event): Flow<MainActivityView.State> =
        when (event) {
            MainActivityView.Event.OnViewInitialised -> onViewInitialisedEvent()
            MainActivityView.Event.OnButtonClicked -> onButtonClickedEvent(event.idOfItemClicked)
        }

    private fun onViewInitialisedEvent(): Flow<MainActivityView.State> = flow 
        val renderEvent = MainActivityView.RenderEvent.DisplayText(text = "hello world")
        state = state.copy(renderEvent = renderEvent)
        emit(state)
    }

    private fun onButtonClickedEvent(idOfItemClicked: Int): Flow<MainActivityView.State> = flow 
        // do something to handle click
        println("item clicked: $idOfItemClicked")
        emit(state)
    }

}

类似问题:

【问题讨论】:

  • 是的,您可以通过BroadcastChannelasFlow() 来实现这一点。
  • 你能举个例子吗?我无法理解 BroadcastChannel 的工作原理。

标签: android kotlin kotlin-coroutines


【解决方案1】:

您的MainActivity 可能看起来像这样。

@MainActivityScope
class MainActivity : AppCompatActivity(R.layout.activity_main) {

    @Inject
    lateinit var subscriptions: CompositeDisposable

    @Inject
    lateinit var viewModel: MainActivityViewModel

    @Inject
    lateinit var onViewInitialisedChannel: BroadcastChannel<MainActivityView.Event.OnViewInitialised>

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setupEvents()
    }

    override fun onDestroy() {
        super.onDestroy()
        subscriptions.clear()
    }

    private fun setupEvents() {
        if (subscriptions.size() == 0) {
            onViewInitialisedChannel.asFlow()
                .buffer()
                .flowOn(Dispatchers.IO)
                .onEach(::render)
                .launchIn(GlobalScope)

            onViewInitialisedChannel
                .offer(
                    MainActivityView
                        .Event
                        .OnViewInitialised
                )
        }
    }

    private fun render(state: MainActivityView.State) {
        when (state.renderEvent) {
            MainActivityView.RenderEvent.None -> Unit
            is MainActivityView.RenderEvent.DisplayText -> {
                mainActivityTextField.text = state.renderEvent.text
            }
        }
    }

}

【讨论】:

  • 多米尼克,谢谢。但也许我不够清楚,或者我误解了,但我不明白这如何适合我遇到的问题。问题是将“事件”分派给“视图模型”,让视图模型将事件转换为新状态,然后由片段中的渲染函数返回并渲染。我没有按照您的示例建议立即渲染“事件”类,但是渲染的是vm返回的状态。在您的示例中,我没有看到对 vm 的任何调用,如果还有更多事件怎么办?例如 OnButtonClickEvent 等。
【解决方案2】:

我认为您正在寻找的是 composeObservableTransformer 的 Flow 版本,据我所知没有。您可以改用 let 运算符并执行 类似 的操作:

主活动:

yourFlow
  .let(viewModel::invoke)
  .onEach(::render)
  .launchIn(lifecycleScope) // or viewLifecycleOwner.lifecycleScope if you're in a fragment

视图模型:

operator fun invoke(viewEventFlow: Flow<Event>): Flow<State> = viewEventFlow.flatMapLatest { event ->
  when (event) {
    Event.OnViewInitialised -> flowOf(onViewInitialisedEvent())
  }
}

就分享流程而言,我会关注这些问题:

Dominic 的回答可能适用于替换发布主题,但我认为协程团队正在远离 BroadcastChannel 并打算在不久的将来弃用它。

【讨论】:

  • 抱歉没有及时回复。我已经用我目前使用的解决方案更新了我的帖子。它确实使用了 ConflatedBroadcastChannel API(如您所指出的),该 API 将在未来被弃用。希望可以用 SharedFlow 替换它。
【解决方案3】:

kotlinx-coroutines-core 提供了一个transform 函数。

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/transform.html

它与我们在 RxJava 中使用的不太一样,但应该可以用于实现相同的结果。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-03-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-05-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多