【问题标题】:break up buffer into size rxjs将缓冲区分成大小 rxjs
【发布时间】:2018-08-15 16:51:05
【问题描述】:

我有一个可观察的从流中获取数据,每次大小为 512,接下来我必须在其他可观察到的地方将其分解为 200 个字符,并将 [12] 字符保留在其他缓冲区中以与下一个块连接,我通过使用解决它新的主题和 for 循环,我相信可能会有更好、更漂亮的解决方案。

收到 Observable ----------------------------------------

  • 下一个 [512] -------> [112] [200] [200] -------> [200] [200]
  • 第二个下一个 [512][112] --> [24][200][200] [88+112] --> [200] [200 ]
  • 第三下 [512][24] --> [136] [200] [76+124] .....
  • 第n次迭代[512][194] --> [106][200][200][106+94] --> [200][200][200]

  • n+1th [512][6].......

maxValueSize = 200
this._sreamRecord$.subscribe(
    {
        next: (val) => {
            const bufferToSend: Buffer = Buffer.concat([completationBuffer, val])
            for (let i = 0; i < bufferToSend.length; i += maxValueSize) {
                if (bufferToSend.length - i > maxValueSize) {
                    bufferStreamer.next(bufferToSend.slice(i, i + maxValueSize))
                } else {
                    completationBuffer = bufferToSend.slice(i, i + maxValueSize)
                }
            }
        },
        complete() {
            if (completationBuffer.length) {
                bufferStreamer.next(completationBuffer)
            }
            bufferStreamer.complete()
        }
    })

【问题讨论】:

  • 请分享您的代码以及您目前如何尝试这样做。
  • 其他 100 个字节会怎样? 12 + 200 + 200 = 412
  • 错过计算余数为112

标签: javascript rxjs observable rxjs6


【解决方案1】:

您可能需要考虑这些方面的解决方案

const splitInChunksWithRemainder = (remainder: Array<any>) => {
    return (streamRecord: Array<any>) => {
        const streamRecordWithRemainder = remainder.concat(streamRecord);
        let chunks = _.chunk(streamRecordWithRemainder, maxValueSize);
        const last = chunks[chunks.length - 1];
        let newRemainder = [];
        if (last.length != maxValueSize) {
            newRemainder = chunks[chunks.length - 1];
            chunks.length = chunks.length - 1;
        }
        return {chunks, newRemainder};
    };
}

let f = splitInChunksWithRemainder([]);

this._sreamRecord$.pipe(
    switchMap(s => {
        const res = f(s);
        f = splitInChunksWithRemainder(res.newRemainder);
        return from(res.chunks);
    })
)
.subscribe(console.log);

这个想法是在连接前一个 余数 之后,将每个 streamRecordlodash chunk 函数拆分,即从前一个 streamRecord 拆分后留下的数组作为尾部。

这是使用函数splitInChunksWithRemainder 完成的,它是一个更高级别的函数,即返回一个函数的函数,在这种情况下,在设置了来自上一个拆分的remainder 之后.

评论后更新

如果您还需要发出 last newRemainder,那么您可以考虑一个稍微复杂的解决方案,例如以下

const splitInChunksWithRemainder = (remainder: Array<any>) => {
    return (streamRecord: Array<any>) => {
        const streamRecordWithRemainder = remainder.concat(streamRecord);
        let chunks = _.chunk(streamRecordWithRemainder, maxValueSize);
        const last = chunks[chunks.length - 1];
        let newRemainder = [];
        if (last.length != maxValueSize) {
            newRemainder = chunks[chunks.length - 1];
            chunks.length = chunks.length - 1;
        }
        return {chunks, newRemainder};
    };
}

const pipeableChain = () => (source: Observable<any>) => {
    let f = splitInChunksWithRemainder([]);
    let lastRemainder: any[];
    return source.pipe(
        switchMap(s => {
            const res = f(s);
            lastRemainder = res.newRemainder;
            f = splitInChunksWithRemainder(lastRemainder);
            return from(res.chunks);
        }),
        concat(defer(() => of(lastRemainder)))
    )
}

_streamRecord$.pipe(
    pipeableChain()
)
.subscribe(console.log);

我们引入了pipeableChain 函数。在此函数中,我们保存执行splitInChunksWithRemainder 返回的余数。一旦源 Observable 完成,我们通过 concat 运算符添加最后一个通知。 如您所见,我们还必须使用 defer 运算符来确保仅在 Observer 订阅时创建 Observable,即在源 Observable 完成之后。如果没有 defer,传递给 concat 的 Observable 作为参数将在最初订阅源 Observable 时创建,即当 lastRemainder 仍未定义时。

【讨论】:

  • 完成时不会发出 newRemainder。
  • 获得lastnewRemainder的最快方法是将代码从return from(res.chunks);更改为return from(res);。通过这种方式,对于 Observable 的每个通知(因此也是最后一个),您将同时获得 chunksremainder。如果您确实需要一个 Observable,它在完成之前发出 last newRemainder 作为其最后一个值,那么您可以查看我更新的答案。
猜你喜欢
  • 2022-11-11
  • 2020-09-05
  • 2014-06-10
  • 1970-01-01
  • 1970-01-01
  • 2019-05-31
  • 1970-01-01
  • 2011-11-26
  • 2021-09-18
相关资源
最近更新 更多