【问题标题】:Managing state in a reactive pipeline在反应式管道中管理状态
【发布时间】:2016-02-09 07:01:00
【问题描述】:

我正在构建一个需要扩展 (SelectMany) 然后展平(在本例中为 ToArray)的反应式管道,同时保持对在管道开始时获得的状态的访问。

这是我正在尝试的伪代码:

return Observable
    .Start(() => this.GetSearchResults(query))
    .SelectMany(results => results.Hits)     // results.Hits is a list of IDs. But there is also has a bool property that I want to keep through to the end of my pipeline
    .SelectMany(hit => GetById(hit.Id))      // asynchronously load each result
    .ToArray()                               // now need to pull all the results together into a containing data structure, and also include the bool flag from above in it
    .Select(resolvedResults => new ...);     // need access to both resolvedResults and the bool mentioned in the first comment above

所以我试图找到一种方法来干净地从管道末尾的代码中访问在管道开头确定的某些状态。

我尝试的第一件事是使用匿名类型将bool 与每个结果捆绑在一起。这很快就失控了,从性能的角度来看是一种浪费。

我尝试的第二件事是使用如下主题:

var state = new AsyncSubject<bool>();
return Observable
    .Start(() => this.GetSearchResults(query))
    .Do(results =>
        {
            state.OnNext(results.Flag);
            state.OnCompleted();
        }
    .SelectMany(results => results.Hits)
    .SelectMany(hit => GetById(hit.Id))
    .ToArray()
    .Zip(
        state,
        (results, state) => new ResultContainer(state, results));

这似乎工作正常,但我觉得有点恶心。

所以我想知道是否有一种更简洁的方法来管理反应式管道中的状态。

作为参考,这里是实际代码(而不仅仅是伪代码):

public IObservable<ISearchResults<IContact>> Search(string query, int maximumResultCount = 100, float minimumScore = 0.1F)
{
    Ensure.ArgumentNotNull(query, nameof(query));

    var moreHitsAvailable = new AsyncSubject<bool>();

    return Observable
        .Start(
            () => this.searchIndexService.Search<IContact>(query, maximumResultCount, minimumScore),
            this.schedulerService.DataStoreScheduler)
        .Do(
            results =>
            {
                moreHitsAvailable.OnNext(results.MoreHitsAreAvailable);
                moreHitsAvailable.OnCompleted();
            })
        .SelectMany(
            results => results
                .Hits
                .Select(
                    hit => new
                    {
                        Id = hit.Id,
                        ParsedId = ContactId.Parse(hit.Id)
                    }))
        .SelectMany(
            result => this
                .GetById(result.ParsedId)
                .Select(
                    contact => new
                    {
                        Id = result.Id,
                        Contact = contact
                    }))
        .Do(
            result =>
            {
                if (result.Contact == null)
                {
                    this.logger.Warn("Failed to find contact with ID '{0}' provided by the search index. Index may be out of date.", result.Id);
                }
            })
        .Select(result => result.Contact)
        .Where(contact => contact != null)
        .ToArray()
        .Zip(
            moreHitsAvailable,
            (results, more) => new SearchResults<IContact>(more, results.ToImmutableList()))
        .PublishLast()
        .ConnectUntilCompleted();
}

【问题讨论】:

    标签: c# .net system.reactive monads


    【解决方案1】:

    您可以弹出查询理解语法并执行类似的操作

    var x = from result in Observable.Start(() => this.GetSearchResults())
        let hasMore = result.MoreHitsAreAvailable
        from hit in result.Hits
        from contact in GetById(hit.Id)
        select new { hasMore , contact};
    

    告诉你如何处理重复的hasMore 值。正如我们所知,它只是您可以分组的单个不同值(全部为 true 或全部为 false)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-04-01
      • 1970-01-01
      • 1970-01-01
      • 2021-12-26
      • 2019-12-07
      • 1970-01-01
      • 1970-01-01
      • 2018-06-12
      相关资源
      最近更新 更多