【问题标题】:RxJS - Control multiple parallel executionsRxJS - 控制多个并行执行
【发布时间】:2019-03-08 19:34:50
【问题描述】:

我有一个场景,我必须控制多个 PTZ 摄像头来分别拍摄多个角度的照片。因此,例如:
Camera A 将采用角度 A1A2A3
Camera B 将采用角度 B1B2B3B4

将相机移动到正确的角度,捕获图像并上传图像,这些都是返回承诺的异步函数。
moveCamera( angle )
captureImage()
uploadImage()

相机必须并行操作,但每个相机拍摄的角度必须按顺序完成。

不知何故,我觉得这可以通过 RxJS 轻松解决,但我正在努力将它们拼凑在一起。我能做到的最好的事情是类似于下面的解决方案,它以某种方式使相机彼此按顺序处理。请注意,我使用 redux-observable,下面的代码是 plain RxJS 中最好的代码。请原谅我的 RxJS。

const angles = {
  'Camera A': [ 'A1', 'A2', 'A3' ],
  'Camera B': [ 'B1', 'B2', 'B3', 'B4' ],
}
const cameras = of( [ 'Camera A', 'Camera B' ] );
const cameraRun = cameras.pipe(
  mergeMap( camera => {
    // in redux-observable, I could return an array here
    return of( angles[ camera ] );
  } )
);
cameraRun.pipe(
  concatMap( angle => {
    return moveCamera( angle )
      .then( () => captureImage() )
      .then( () => uploadImage() )
      .then( () => console.log( 'Image success' ) );
  } )
)

对于知道 redux-observable 的人,我有 3 个史诗:
RUN_CAMERA_SET_ROUTINE - 在 mergeMap 内运行所有摄像机
RUN_CAMERA_ROUTINE - 在 mergeMap 内运行每个摄像机的所有角度
CAPTURE_IMAGE - 在concatMap 中运行上面的异步函数

我最初的想法是CAPTURE_IMAGE 将由于mergeMap 产生流而被“分组”,但我错了。看来CAPTURE_IMAGE 仍然是一个流,所有摄像机的每个角度都在排队。

任何指针都会很有帮助。

【问题讨论】:

    标签: rxjs redux-observable


    【解决方案1】:

    您的问题归结为按顺序并行执行一些 Observables 并从 Promises 创建 Observables。

    1. 要并行执行多个 Observable,请使用:
    • forkJoin,如果您只想在所有相机动作完成后发出最终的 Observable
    • merge,如果您希望每次单个相机动作成功时都会发出最终的 Observable
    1. 使用concat依次执行多个Observables。

    2. 使用defer 从 Promise 创建 Observable 但不要立即执行 Promise。

    你必须这样做

    • 构造一个要按顺序执行的 Observable 数组。
      (单个相机每个角度的动作)
    • 构造一个要并行执行的 Observable 数组。
      (每个相机的相机动作)。

    这可能是纯 RxJS 中的代码

    import { concat, forkJoin, merge, defer } from 'rxjs';
    
    const cameras = ['Camera A', 'Camera B'];
    const cameraAngles = { 
      'Camera A': ['A1', 'A2', 'A3'], 
      'Camera B': ['B1', 'B2', 'B3', 'B4'] 
    }
    
    // Performs a camera action consisting of multiple parts. Returns a Promise.
    // camera: e.g. 'Camera A', angle: e.g. 'A1'
    const doCameraAction = (camera, angle) => moveCamera(angle)
      .then(() => captureImage())
      .then(() => uploadImage())
      .then(() => console.log('Image success'));
    
    // Creates an Observables that executes multiple camera actions in sequence.
    // camera: e.g. 'Camera A', angles: e.g. ['A1', 'A2', 'A3']
    const getCameraActionSequence$ = (camera, angles) => concat(
      // the array of Observables we want to execute in sequence
      ...angles.map(angle => defer(() => doCameraAction(camera, angle)))
    );
    
    // An Observable that will execute multiple camera action sequences in in parallel
    const multiCameraActions$ = forkJoin(
      // the array of Observables we want to execute in parallel
      cameras.map(camera => getCameraActionSequence$(camera, cameraAngles[camera]))
    );
    

    https://stackblitz.com/edit/rxjs-gj1dny?file=index.ts

    【讨论】:

    • 确实很好地提到了forkJoin 和简洁的代码。如果我能接受两个答案,我也会接受这个答案。我现在明白你可以动态地将defer 放入一个数组中以供以后连接。
    【解决方案2】:

    我会试一试。我在StackBlitz 中提出了一个解决方案来展示我的想法。在单击按钮之前单击控制台以开始新的运行。

    关于这个解决方案的几点:

    • 我只使用start$通过鼠标点击开始新的运行,这对解决方案并不重要。
    • 我模拟了具有各种超时的三个摄像头 promise 函数,只是为了展示事情是如何按顺序执行的,以及两个摄像头是如何并行运行的。
    • 我还将camera 的变量传递给每个摄像头功能,但这只是为了让console.log() 可以清楚地显示摄像头正在做什么。
    • 我没有对redux-observable 做任何事情,而是保留了它的原味rxjs
    • 我使用 concat() 将拍摄照片转换为可观察的序列,而不是像你拥有的那样使用一系列承诺 - 这不是必需的,只是一种不同的处理方式。
    • 我将摄像头作为单独的 Observables(cameraA$cameraB$)保留,但这也可以通过一组摄像头来完成。

    请随意分叉并将其更改为更接近您正在寻找的内容。

    这是 StackBlitz 中的内容:

    import { mergeMap, concatMap, tap } from 'rxjs/operators';
    import { fromEvent, from, concat, merge, defer } from 'rxjs';
    
    const moveCamera = (camera, angle) => new Promise(
      (resolve, reject) => { 
        setTimeout(() => {
          console.log(`moved: ${camera} angle: ${angle}`);
          resolve();
        }, 1000) }
    );
    
    const captureImage = (camera) => new Promise(
      (resolve, reject) => { 
        setTimeout(() => {
          console.log(`${camera} captured image.`);
          resolve();
        }, 100) }
    );
    
    const uploadImage = (camera) => new Promise(
      (resolve, reject) => { 
        setTimeout(() => {
          console.log(`${camera} uploaded image.`);
          resolve();
        }, 2000) }
    );
    
    
    const start$ = fromEvent(document.getElementById('start'), 'click');
    
    const takeAPhoto$ = (camera, angle) => concat(
      defer(() => moveCamera(camera, angle)),
      defer(() => captureImage(camera)),
      defer(() => uploadImage(camera))
    );
    
    const cameraA$ = from(['A1', 'A2', 'A3']).pipe(
      concatMap(angle => takeAPhoto$('Camera A', angle))
    );
    
    const cameraB$ = from(['B1', 'B2', 'B3', 'B4']).pipe(
      concatMap(angle => takeAPhoto$('Camera B', angle))
    );
    
    start$.pipe(
      tap(() => console.log('\n\nstart new run')),
      mergeMap(() => merge(cameraA$, cameraB$)),
    ).subscribe();
    

    我希望这会有所帮助。

    【讨论】:

    • 谢谢。与@fridoo 类似的答案。之所以选择答案,是因为代码的详细程度如何解释这个想法很清楚。
    • 嗨,你能看看这个吗,我不知道为什么这不起作用。我想动态添加相机。 stackblitz.com/edit/rxjs-so-q-55069868-wnjpol
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-03-25
    • 2019-02-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-06-26
    相关资源
    最近更新 更多