【发布时间】:2018-11-06 11:53:22
【问题描述】:
我正在camera2上制作反应式包装器,我的目标是获取每一帧,然后传递给人脸识别。
所以,我在 setOnImageAvailableListener 上创建了一个包装方法
fun createOnImageAvailableFlowable(imageReader: ImageReader, handler: Handler): Flowable<ImageReader> {
return Flowable.create({ subscriber ->
imageReader.setOnImageAvailableListener({
if (!subscriber.isCancelled)
subscriber.onNext(it)
}, handler)
subscriber.setCancellable {
imageReader.setOnImageAvailableListener(null, null)
}
}, BackpressureStrategy.LATEST)
}
反应链如下所示:
createOnImageAvailableFlowable(imageReader!!, null)
.concatMap {
it.acquireLatestImage()?.use { image ->
val rotation = ReactiveCamera.getRotationCompensation(cameraId!!, this, applicationContext)
val visionImage = FirebaseVisionImage.fromMediaImage(image, rotation)
firebaseFaceDetector
.detectInImage(visionImage)
.toFlowable(BackpressureStrategy.LATEST)
.map { list ->Optional(list)}
} ?: Flowable.just(Optional(null))
}
...
此代码有效,但由于所有工作都在主线程中执行,因此会导致预览表面出现一些滞后。这需要在单独的线程中执行。我天真的解决方案是在 concatMap 之前添加 observeOn 运算符:
createOnImageAvailableFlowable(imageReader!!, null)
.observeOn(Schedulers.io()) // doesn't switch thread
.concatMap {
// still main thread
}
...
但这并不影响,所有工作仍在主线程中。如果我指定 concatMapEager 而不是 concatMap,则所有工作都在单独的线程中按预期工作,但帧会带来很大的延迟。
我做错了什么?在这种情况下,如何指示反应流在单独的线程中执行?在实时帧处理的情况下如何处理背压?
更新
我按照 Kiskae 的建议提供了自己的线程,但是现在,调度程序的线程中只发生了第一次发射,而其余的发射仍然在主线程中:
createOnImageAvailableFlowable(imageReader!!, null)
.subscribeOn(AndroidSchedulers.from(nonMainThread.looper))
.concatMap {
val t = Thread.currentThread()
val name = t.name
Log.d(TAG, "current thread {$name}")
...
}
输出:
D/MainActivity: current thread {Camera2}
D/MainActivity: current thread {main}
D/MainActivity: current thread {main}
D/MainActivity: current thread {main}
D/MainActivity: current thread {main}
【问题讨论】:
标签: android concurrency rx-java rx-java2 android-camera2