【发布时间】:2016-12-15 03:35:59
【问题描述】:
我有一个来自 RS-232 端口的输入流和一个使用 Rx 流到串行端口的命令队列。我已将代码简化如下:
void Init()
{
SerialPort srl;
... // open serial port
IObservable<string> obInput =
Observable.FromEventPattern<
SerialDataReceivedEventHandler,
SerialDataReceivedEventArgs>
(
handler => srl.DataReceived += handler,
handler => srl.DataReceived -= handler
).SelectMany(_ =>
{
List<string> ret;
... //extract messages
return ret;
}).Publish().Refcount();
obCommandOk =
obInput
.Where(msg => msg == "OK" || msg == "KO");
var sAction = new Subject<string>();
var sCommandOk = new Subject<Tuple<string,bool>>();
sAction
.Do(srl.WriteLine)
.Zip(obCommandOk, (cmd, result) =>
{
if (result == "OK")
sCommandOk.OnNext(Tuple.Create(cmd, true))
else
sCommandOk.OnNext(Tuple.Create(cmd, false))
});
}
async bool Command(string cmd)
{
sAction.OnNext(cmd);
return
await sCommandOk
.Where(t => t.Item1 == cmd)
.Select(t => t.Item2)
.FirstAsync();
}
有时会发生在 OnNext 之后结果已经被推送到 sCommandOk 的情况,所以我丢失了它。
你能建议我一个更好的方法来避免丢失响应吗?
【问题讨论】:
-
我们可以提供stackoverflow.com/help/mcve吗?当然我知道它是伪代码,但距离真正编译的 C# 大约有 5 个更改。
-
@LeeCampbell 很难从我的代码中推断出你的要求,但我已经写了一些可以在这里编译的东西gist.github.com/zpul/b7a59e559e523a8a3b1077b1ec8013ef(对 SO 来说太大了)
-
你真的不应该使用来自
SelectMany的外部可变缓冲区,这是一个非常糟糕的主意。您将无法保证对其内容一无所知。 -
@TheInnerLight 应该用什么代替它?
-
您需要在不使用外部可变性的情况下聚合流数据。我建议看看这个函数:msdn.microsoft.com/en-us/library/hh229432(v=vs.103).aspx
标签: c# functional-programming system.reactive