【问题标题】:Rx Buffering With Variable Count具有可变计数的 Rx 缓冲
【发布时间】:2018-08-03 16:35:44
【问题描述】:

我有一个IObservable<byte>,我想拆分成IObservable<byte[]>,大概是使用Buffer()、Window()、Scan()等的组合。

不过,我很难找到合适的 Rx 函数组合来处理我的具体情况。大多数关于该主题的 Q/A 都以答案结尾,提到您可以通过测试以查看项目(字节/字符)是否是分隔符,并以这种方式分解缓冲区。我的问题是它不仅仅是一个字节作为分隔符。就我而言,我正在读取 4 个字节的长度,然后我想从以下数据中缓冲该数量作为返回字节 []。

我尝试的一种方法是我可以创建一个IObservable<int>,它表示数据包的长度,其他用户可以使用它来将数据分解为缓冲的字节[]。可能是这样的:

IObservable<int> lengthsObservable = byteObservable
    .Buffer(4)
    .Select((b) => BitConverter.ToInt32(b.ToArray(), 0))
    ...

但这有一个问题是我不确定如何插入逻辑以在 int 转换后跳过数据。如何缓冲 4,转换为 int,跳过该数量,然后重复缓冲区 (4),然后继续?

我一直在尝试用 API 编写一些解决方案,但没有真正的运气。我的核心选项是使用非常自定义的累加器类创建一个非常繁琐的 Scan() 调用,但我觉得有更好的更洁净的方式。

TLDR Rx 老手们知道 Buffer() 是否有一个通用的组合模式,它的分隔符不只是一个单元?

编辑: 一个更具体的问题示例是将IObservable&lt;byte&gt; 与输出分开:

0B 00 00 00 48 65 6C 6C 6F 20 57 6F 72 6C 64 12 00 00 00 54 68 61 6E 6B 73 20 66 6F 72 20 68 65 6C 70 69 6E 67

并将其处理成具有两个数组输出的IObservable&lt;byte[]&gt;

48 65 6C 6C 6F 20 57 6F 72 6C 64 // Hello world

54 68 61 6E 6B 73 20 66 6F 72 20 68 65 6C 70 69 6E 67 // Thanks for helping

最初的0B 00 00 00 是后面的字节块的长度。紧随其后的是另一个长度12 00 00 00,带有另一个字节块。

【问题讨论】:

  • 我发现描述比样本数据和预期输出更难理解。您能否给我们一个示例字节流以及您在这些字节上寻找的结果?
  • 最后添加了一个更具体的例子。感谢您的观看!
  • 很好的解释。
  • 我建议您“使用非常自定义的累加器类创建一个非常繁琐的 Scan() 调用”。

标签: reactive-programming system.reactive reactive


【解决方案1】:

这是一个“带有非常自定义的累加器类的繁琐.Scan() 调用”的示例。我不认为这有那么可怕。至少它抽象得很好:

class AccumulatorState
{
    public AccumulatorState()
        : this(4, 0, true, new byte[0])
    {}

    public AccumulatorState(int bytesToConsume, int bytesConsumed, bool isHeaderState, IEnumerable<byte> accumulatedBytes)
    {
        this.TotalBytesToConsume = bytesToConsume;
        this.BytesConsumed = bytesConsumed;
        this.IsHeaderState = isHeaderState;
        this.AccumulatedBytes = accumulatedBytes;
    }

    public int TotalBytesToConsume { get; }
    public int BytesConsumed { get; }
    public bool IsHeaderState { get; }
    public bool IsComplete => BytesConsumed == TotalBytesToConsume;
    public IEnumerable<byte> AccumulatedBytes { get; }

    public AccumulatorState ConsumeByteToNewState(byte b)
    {
        if(IsComplete)
        {
            if (IsHeaderState)
                return new AccumulatorState(ConvertHeaderToByteCount(AccumulatedBytes), 1, false, new[] { b });
            else
                return new AccumulatorState(4, 1, true, new[] { b });
        }

        return new AccumulatorState(TotalBytesToConsume, BytesConsumed + 1, IsHeaderState, AccumulatedBytes.Concat(new[] { b }));
    }

    private int ConvertHeaderToByteCount(IEnumerable<byte> bytes)
    {
        return bytes
            .Select(b => (int)b)
            .Reverse()
            .Aggregate(0, (total, i) => total * 16 + i);
    }
}

你可以这样称呼它:

var bytes = new byte[] { 0x0B, 0x00, 0x00, 0x00, 0x48, 0x65, 0x6C, 0x6C, 0x6F, 0x20, 0x57, 
                         0x6F, 0x72, 0x6C, 0x64, 0x12, 0x00, 0x00, 0x00, 0x54, 0x68, 0x61, 
                         0x6E, 0x6B, 0x73, 0x20, 0x66, 0x6F, 0x72, 0x20, 0x68, 0x65, 0x6C,
                         0x70, 0x69, 0x6E, 0x67 };
bytes.ToObservable()
    .Scan (new AccumulatorState(), (state, b) => state.ConsumeByteToNewState(b))
    .Where(s => s.IsComplete && !s.IsHeaderState)
    .Select(s => s.AccumulatedBytes.ToArray())
    .Dump(); //Linqpad. Switch to a .Subscribe(a => {}) if you're using something else.

这会输出一个带有两个输出的IObservable&lt;byte[]&gt;

48 65 6C 6C 6F 20 57 6F 72 6C 64 
54 68 61 6E 6B 73 20 66 6F 72 20 68 65 6C 70 69 6E 67 

【讨论】:

  • 感谢 sn-p。这绝对不是太糟糕,但我的直觉认为会有一种可组合的干净方式来做到这一点,我只是没有考虑或天真。好名字,顺便说一句
【解决方案2】:

我想到了另一种解决方案,但我更喜欢基于 .Scan 的解决方案:

public static class X
{
    public static IObservable<byte[]> RecurseHeaders(this IObservable<byte> source)
    {
        return source.Publish(_bytes => _bytes
                .Take(4)
                .ToArray()
                .Select(a => ConvertHeaderToByteCount(a))
                .SelectMany(count => count == 0
                    ? Observable.Empty<byte[]>()
                    : _bytes.Take(count).ToArray().Concat(_bytes.RecurseHeaders())
                )
            );
    }

    private static int ConvertHeaderToByteCount(IEnumerable<byte> bytes)
    {
        return bytes
            .Select(b => (int)b)
            .Reverse()
            .Aggregate(0, (total, i) => total * 16 + i);
    }

}

调用它很简单:

var bytes = new byte[] { 0x0B, 0x00, 0x00, 0x00, 0x48, 0x65, 0x6C, 0x6C, 0x6F, 0x20, 0x57, 
                         0x6F, 0x72, 0x6C, 0x64, 0x12, 0x00, 0x00, 0x00, 0x54, 0x68, 0x61, 
                         0x6E, 0x6B, 0x73, 0x20, 0x66, 0x6F, 0x72, 0x20, 0x68, 0x65, 0x6C,
                         0x70, 0x69, 0x6E, 0x67 };

var result = bytes.ToObservable()
    .RecurseHeaders();

这非常接近分解和重新实现的Scan。您可以按如下方式实现Scan

public static class RxReimplementations
{
    //This is a functional implementation of Observable.Scan
    public static IObservable<TState> FunctionalScan<TSource, TState>(this IObservable<TSource> source, TState initialState, Func<TState, TSource, TState> f)
    {
        return source.Publish(_source => _source
            .Take(1)
            .SelectMany(item => f(initialState, item)
                .Using(newState => Observable.Return(newState)
                    .Concat(_source.FunctionalScan(newState, f))
                )
            )
        );
    }

    //A functional way to re-use a function result.
    public static U Using<T, U>(this T t, Func<T, U> f)
    {
        return f(t);
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多