【问题标题】:Rx grouped throttlingRx 分组节流
【发布时间】:2017-03-09 11:18:56
【问题描述】:

我有一个IObservable<T>,其中 T 看起来像

public class Notification
{
    public int Id { get; set; }
    public int Version { get; set; }
}

通知以不同的时间间隔和不同的通知生成,其中版本号随着每个通知 ID 的更新而递增。

在特定时间段内限制可观察对象然后接收具有最新版本字段的不同通知的正确方法是什么?

到目前为止,我想出了这个用于限制和分组的方法,但无法弄清楚如何实际返回 IObservable<Notification>

public static IObservable<int> ThrottledById(this IObservable<Notification> observable)
{
    return observable
        .GroupByUntil(n => n.Id, x => Observable.Timer(TimeSpan.FromSeconds(1)))
        .Select(group => group.Key);
}

编辑: 样本输入/输出(油门延迟:3):

1. { id: 1, v: 1 }
2. { id: 1, v: 2 }  { id: 2, v: 1 }
3. { id: 1, v: 3 }
-----------------------------------> notify { id:1, v: 3 }, notify { id:2, v: 1 }
4. 
5. { id: 2, v: 2 }
6.
-----------------------------------> notify { id:2, v: 2 }
7. { id: 1, v: 4 }
8. { id: 1, v: 5 }  { id: 2, v: 3 }
9. { id: 1, v: 6 }
-----------------------------------> notify { id:1, v: 6 }, notify { id: 2, v: 3 }
...
...

【问题讨论】:

  • 您可以添加一些具有所需输出的示例输入吗?

标签: c# .net system.reactive reactive-programming


【解决方案1】:

这种方法完全符合您的期望:

public static IObservable<Notification> ThrottledById(this IObservable<Notification> observable)
{
    return observable.Buffer(TimeSpan.FromSeconds(3))
        .SelectMany(x =>
            x.GroupBy(y => y.Id)
            .Select(y => y.Last()));
}

如果您想在第一个通知后 n 秒内收集所有具有相同 id 的通知并公开最后一个通知,那么基于 GroupByUntil 的方法就是您所需要的。

public static IObservable<Notification> ThrottledById(this IObservable<Notification> observable)
{
    return observable.GroupByUntil(x => x.Id, x => Observable.Timer(TimeSpan.FromSeconds(3)))
        .SelectMany(x => x.LastAsync());
}

您的示例输入/输出如下所示:

1. { id: 1, v: 1 }
2. { id: 1, v: 2 }  { id: 2, v: 1 }
3. { id: 1, v: 3 }
-----------------------------------> notify { id:1, v: 3 }
4. 
-----------------------------------> notify { id:2, v: 1 }
5. { id: 2, v: 2 }
6.
7. { id: 1, v: 4 }
-----------------------------------> notify { id:2, v: 2 }
8. { id: 1, v: 5 }  { id: 2, v: 3 }
9. { id: 1, v: 6 }
-----------------------------------> notify { id:1, v: 6 }
10.
-----------------------------------> notify { id: 2, v: 3 }
...
...

【讨论】:

    【解决方案2】:

    这似乎产生了你想要的输出

     IObservable<Notification> GroupByIdThrottle(IObservable<Notification> producer, IScheduler scheduler)
        {
            return Observable.Create<Notification>(observer =>
            {
                return producer
                    .GroupByUntil(
                        item => item.Id,
                        item => Observable.Timer(TimeSpan.FromSeconds(3), scheduler))
                    .SelectMany(result =>
                    {
                        return result.Aggregate<Notification, Notification>(null, (dict, item) =>
                        {
                            return item;
                        });
                    })
                    .Subscribe(observer);
    
            });
        }
    

    这个想法是聚合使得它只有每个组的最后一个值使它活着,并且一旦计时器达到 3 秒,分组的流就完成了。

    我把这个吃掉了

    https://social.msdn.microsoft.com/Forums/en-US/7ebf68e8-99af-44e2-b80d-0292cb5546bc/group-by-with-buffer-reactive-extensions?forum=rx

    为了想法

    【讨论】:

    • 为我工作。聚合的好提示。对 SelectMany(g => g.ToArray()) 略有不同,然后选择最后一个,但到目前为止它通过了我的单元测试。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多