【发布时间】:2018-08-03 16:35:44
【问题描述】:
我有一个IObservable<byte>,我想拆分成IObservable<byte[]>,大概是使用Buffer()、Window()、Scan()等的组合。
不过,我很难找到合适的 Rx 函数组合来处理我的具体情况。大多数关于该主题的 Q/A 都以答案结尾,提到您可以通过测试以查看项目(字节/字符)是否是分隔符,并以这种方式分解缓冲区。我的问题是它不仅仅是一个字节作为分隔符。就我而言,我正在读取 4 个字节的长度,然后我想从以下数据中缓冲该数量作为返回字节 []。
我尝试的一种方法是我可以创建一个IObservable<int>,它表示数据包的长度,其他用户可以使用它来将数据分解为缓冲的字节[]。可能是这样的:
IObservable<int> lengthsObservable = byteObservable
.Buffer(4)
.Select((b) => BitConverter.ToInt32(b.ToArray(), 0))
...
但这有一个问题是我不确定如何插入逻辑以在 int 转换后跳过数据。如何缓冲 4,转换为 int,跳过该数量,然后重复缓冲区 (4),然后继续?
我一直在尝试用 API 编写一些解决方案,但没有真正的运气。我的核心选项是使用非常自定义的累加器类创建一个非常繁琐的 Scan() 调用,但我觉得有更好的更洁净的方式。
TLDR Rx 老手们知道 Buffer() 是否有一个通用的组合模式,它的分隔符不只是一个单元?
编辑:
一个更具体的问题示例是将IObservable<byte> 与输出分开:
0B 00 00 00 48 65 6C 6C 6F 20 57 6F 72 6C 64 12 00 00 00 54 68 61 6E 6B 73 20 66 6F 72 20 68 65 6C 70 69 6E 67
并将其处理成具有两个数组输出的IObservable<byte[]>:
48 65 6C 6C 6F 20 57 6F 72 6C 64 // Hello world
和
54 68 61 6E 6B 73 20 66 6F 72 20 68 65 6C 70 69 6E 67 // Thanks for helping
最初的0B 00 00 00 是后面的字节块的长度。紧随其后的是另一个长度12 00 00 00,带有另一个字节块。
【问题讨论】:
-
我发现描述比样本数据和预期输出更难理解。您能否给我们一个示例字节流以及您在这些字节上寻找的结果?
-
最后添加了一个更具体的例子。感谢您的观看!
-
很好的解释。
-
我建议您“使用非常自定义的累加器类创建一个非常繁琐的 Scan() 调用”。
标签: reactive-programming system.reactive reactive