【问题标题】:Request/Response using streams使用流的请求/响应
【发布时间】:2016-12-15 03:35:59
【问题描述】:

我有一个来自 RS-232 端口的输入流和一个使用 Rx 流到串行端口的命令队列。我已将代码简化如下:

void Init()
{
    SerialPort srl;

    ... // open serial port

    IObservable<string> obInput =
        Observable.FromEventPattern<
        SerialDataReceivedEventHandler,
        SerialDataReceivedEventArgs>
        (
            handler => srl.DataReceived += handler,
            handler => srl.DataReceived -= handler
        ).SelectMany(_ =>
        {
        List<string> ret;
        ... //extract messages
        return ret;
        }).Publish().Refcount();

    obCommandOk =
        obInput
        .Where(msg => msg == "OK" || msg == "KO");

    var sAction = new Subject<string>();
    var sCommandOk = new Subject<Tuple<string,bool>>();

    sAction
        .Do(srl.WriteLine)
        .Zip(obCommandOk, (cmd, result) =>
        {
        if (result == "OK")
            sCommandOk.OnNext(Tuple.Create(cmd, true))
        else
            sCommandOk.OnNext(Tuple.Create(cmd, false))
        });
}

async bool Command(string cmd)
{
    sAction.OnNext(cmd);

    return
        await sCommandOk
        .Where(t => t.Item1 == cmd)
        .Select(t => t.Item2)
        .FirstAsync();
}

有时会发生在 OnNext 之后结果已经被推送到 sCommandOk 的情况,所以我丢失了它。

你能建议我一个更好的方法来避免丢失响应吗?

【问题讨论】:

  • 我们可以提供stackoverflow.com/help/mcve吗?当然我知道它是伪代码,但距离真正编译的 C# 大约有 5 个更改。
  • @LeeCampbell 很难从我的代码中推断出你的要求,但我已经写了一些可以在这里编译的东西gist.github.com/zpul/b7a59e559e523a8a3b1077b1ec8013ef(对 SO 来说太大了)
  • 你真的不应该使用来自SelectMany 的外部可变缓冲区,这是一个非常糟糕的主意。您将无法保证对其内容一无所知。
  • @TheInnerLight 应该用什么代替它?
  • 您需要在不使用外部可变性的情况下聚合流数据。我建议看看这个函数:msdn.microsoft.com/en-us/library/hh229432(v=vs.103).aspx

标签: c# functional-programming system.reactive


【解决方案1】:

您的 Command 方法中有竞争条件。 您推送操作,然后订阅结果。 如果结果很快,那么你就会失去它。

这里的一个小改动可以缓解这种情况:

async Task<bool> Command(string cmd)
{
    var result = sCommandOk
        .Where(t => t.Item1 == cmd)
        .Select(t => t.Item2)
        .FirstAsync();

    sAction.OnNext(cmd);

    return await result;
}

我认为您可以通过将sCommandOk 主题全部删除来进一步优化

async Task<bool> Command(string cmd)
{
    var result = obInput
        .Where(t => t.Item1 == cmd)
        .Select(t => t.Item2)
        .FirstAsync();

    sAction.OnNext(cmd);

    return await result;
}

【讨论】:

  • 不幸的是,我已经尝试过这种方法,但问题仍然存在。这次我发现的唯一解决方案是与命令一起传递一个结果变量,如果在设置 OnNext 之后,我已经有了结果,否则我等待。这是一个非常糟糕的,所以我不喜欢它。
猜你喜欢
  • 2018-10-11
  • 1970-01-01
  • 1970-01-01
  • 2015-03-24
  • 1970-01-01
  • 2020-11-21
  • 1970-01-01
  • 2014-07-07
  • 1970-01-01
相关资源
最近更新 更多