【发布时间】:2018-05-17 16:04:04
【问题描述】:
我对反应式编程的概念相当陌生。我正在使用Bonsai,它通过 c# 公开了一些但不是全部的 .Net rx 命令。
我正在尝试获得类似大理石图的行为:
input1: ---1--------2--------3--------4--------5--------6--------7
input2: -------abc----------------------------------def-----------
result: ------------a--------b--------c--------c---------d-------e
基本上,输入 2 会生成应存储在队列中的事件波。 输入 1 用作从该队列中发出单个项目的触发器。
当队列为空时,应该发出队列的最后一项。 我尝试了 zip 和 combineLatest 的各种组合,但无法获得所需的行为。
我还尝试了基于this post 的WithLatestFrom 实现,但回想起来这也不会产生预期的行为。
public IObservable<Tuple<TSource, TOther>> Process<TSource, TOther>(
IObservable<TSource> source,
IObservable<TOther> other)
{
// return source1.WithLatestFrom(source2, (xs, ys) => Tuple.Create(xs, ys));
return source.Publish(os => other.Select(a => os.Select(b => Tuple.Create(b, a))).Switch());
}
是否有任何运算符或运算符组合会产生这种行为?一旦我了解了使用哪些运算符,我就可以对 Bonsai 进行实施。
更新 1:2018/05/18
根据 Sentinel 的帖子,我在 Bonsai 命名空间内编写了一个新类 DiscriminatedUnion。我没有设法指定适当的类型。编译器声明“无法推断Merge 的类型参数”(在.Merge(input1.Select... 中)。
在哪里添加正确的类型规范?
using System.Reactive.Linq;
using System.ComponentModel;
using System.Collections.Immutable;
namespace Bonsai.Reactive
{
[Combinator]
// [XmlType(Namespace = Constants.XmlNamespace)]
[Description("Implementation of Discriminated Union")]
public class DiscriminatedUnion
{
public IObservable<int?> Process<TInput1, TInput2>(
IObservable<TInput1> input1,
IObservable<TInput2> input2)
{
var merged =
input2.Select(s2 => Tuple.Create(2, (TInput2)s2))
.Merge(input1.Select(s1 => Tuple.Create(1, (TInput1)s1)))
.Scan(Tuple.Create((int?)null, new Queue<int>(), 0), (state, val) =>
{
int? next = state.Item1;
if (val.Item1 == 1)
{
if (state.Item2.Count > 0)
{
next = state.Item2.Dequeue();
}
}
else
{
state.Item2.Enqueue(val.Item2);
}
return Tuple.Create(next, state.Item2, val.Item1);
})
.Where(x => (x.Item1 != null && x.Item3 == 1))
.Select(x => x.Item1);
return merged;
}
}
}
【问题讨论】:
-
修复我的答案。向下滚动查看第二个版本。
-
又添加了一个答案,但建议您尝试使用该答案或等待 Shlomo 的分析:-D
标签: c# system.reactive