【问题标题】:How to pipe rx operators to combine fragment data?如何通过管道 rx 运算符来组合片段数据?
【发布时间】:2019-12-20 16:28:25
【问题描述】:

我想用Rx来处理串口数据,包结构是这样的。

+-----------+--------+---------+
| Signature | Length | Payload |
+-----------+--------+---------+
| 2 byte    | 1 byte | ...     |
+-----------+--------+---------+

但是接收到的数据会有很多片段。比如(签名是 0xFC 0xFA)
数据1:0xFC 0xFA 0x02 0x01 0x01 0xFC 0xFA 0x03 0x01 //包含一个包和一个分片包
Data 2: 0x02 0x03 0xFC 0xFA 0x02 0x01 0x03 // 包含前一个包的续分片和一个新包

如何通过管道将运算符输出为
数据包1:0xFC 0xFA 0x02 0x01 0x01
数据包2:0xFC 0xFA 0x03 0x01 0x02 0x03
...

【问题讨论】:

  • 我试过你们两个的解决方案。一切都很好!

标签: rxjs rx-java reactive-programming


【解决方案1】:

您正在按定义的模式拆分字节流。我不确定你如何接收你的字节以及你将如何建模你的 observable,Observable<byte>Observable<byte[]>!?

无论如何,这里我猜到了字符串的翻译,但想法仍然相同。我选择了x,后跟y 作为模式(在您的情况下为0xFC 0xFA)。

您会在代码中找到我的 cmets:

final ImmutableList<String> PATTERN = ImmutableList.of("x", "y");

Observable<String> source = Observable
        .fromArray("x", "y", "1", "2", "3", "x", "y", "4", "5", "x", "y", "1", "x", "y", "x", "4", "6", "x")
        .share();//publishing to hot observable (we are splitting this source by some of its elements)

//find the next pattern
Observable<List<String>> nextBoundary = source
        .buffer(2, 1)
        .filter(pairs -> CollectionUtils.isEqualCollection(PATTERN, pairs));

//https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer2.png
//start a buffer for each x found
//buffers (packets) may overlap
source.buffer(source.filter(e -> e.equals("x")),
        x -> source
                .take(1)//next emission after the x
                .switchMap(y -> y.equals("y") ?
                        nextBoundary // if 'y' then find the next patter
                        : Observable.empty() //otherwise stop buffering
                )
)
        .filter(packet -> packet.size() > 2)//do not take the wrong buffers like ["x", "4"] (x not followed by y) but it is not lost
        .map(packet -> {
            //each packet is like the following :
            //[x, y, 1, 2, 3, x, y]
            //[x, y, 4, 5, x, y]
            //[x, y, 1, x, y]
            //[x, y, x, 4, 6, x]
            //because of the closing boundary, the event comes too late
            //then we have to handle the packet (it overlaps on the next one)
            List<String> ending = packet.subList(packet.size() - 2, packet.size());
            return CollectionUtils.isEqualCollection(PATTERN, ending) ? packet.subList(0, packet.size() - 2) : packet;
        })
        .blockingSubscribe(e -> System.out.println(e));

结果:

[x, y, 1, 2, 3]
[x, y, 4, 5]
[x, y, 1]
[x, y, x, 4, 6, x]

【讨论】:

    【解决方案2】:

    你需要一个有状态的观察者。它将具有以下状态:

    1. 监听数据包的开始
    2. 收到第一个字节监听第二个字节
    3. 收到第二个字节监听的长度
    4. 收到正文的标头侦听

    在 RxJava 中,您将创建一个类 Packetizer,它有两个感兴趣的方法:

    public void nextByte(Char next);
    public Observable<Packet> packetSource();
    

    在内部,它会维护状态,包括正文剩余部分的长度等。它还会有一个PublishSubject&lt;Packet&gt;,它会在构造每个数据包时发出它。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-04-05
      • 2019-07-15
      • 2017-06-23
      • 2019-06-06
      • 1970-01-01
      • 2010-11-30
      • 1970-01-01
      • 2018-04-28
      相关资源
      最近更新 更多