【问题标题】:Combining sequences in Rx在 Rx 中组合序列
【发布时间】:2014-02-14 01:31:06
【问题描述】:

我正在尝试将 Rx 方法应用于使用任务和事件的现有应用程序。感谢之前的问题,我取得了重大进展,但现在有一个场景我不确定如何在 Rx 中解决,所以我想我会寻求建议。

情况如下:

步骤 1) 检索包含标识符的项目列表。这是一个将完成的有限列表(例如,Web 服务调用返回数据)。我目前将其作为 IObservable。

步骤 2) 对于步骤 1) 中的每个唯一标识符,从不同来源检索辅助信息。在单个调用(例如第二个 Web 服务)中获取多个标识符的辅助信息是可能且更有效的。

第 3 步)将信息组合成一个 IObservable

我不想在第 2 步中为我已经请求过的东西打电话。

我确信使用 Rx 一定有一个非常优雅的解决方案,但我对 Rx 还不够熟悉,无法确定它。欢迎任何想法。

问候 艾伦

【问题讨论】:

  • 为清楚起见,您能否提供一个示例项目类型,步骤 1 中的返回类型(不清楚是 IObservable<Item> 还是 IObservable<List<Item>>),调用的方法签名获取辅助信息,并举例说明结果的外观,以及在第 3 步中,最终结果应该是什么样的。我可以用这些信息提供一个解决方案,但由于没有明确的示例输入/输出,这个问题太含糊了。
  • 我不想过度影响任何潜在的回复,而是使用老年股票行情的类比。步骤 1) 可能是从文件/数据库中获取非唯一的股票代码,并查找有关每只股票的相关静态信息(例如公司名称等)。可能是一个列表 - 我不确定什么是最好的。步骤 2) 提供 1) 的结果,以获取实时价格更新流,一次性订阅多只股票,而无需重复订阅。我希望这会有所帮助。

标签: c#-4.0 system.reactive


【解决方案1】:

好的,所以 - 我在暗示你在 Paul 的回答中所做的评论 “对 Rx 来说是新手......”

如果最初的结果列表以List<T> 的形式一次性返回,那么您的问题就很简单了。所以我假设它没有 - 它作为可能包含重复信息的项目流返回。

场景

这是一个人为的场景:服务返回一个公司名称流,其中可能有重复的名称。使用辅助服务,您需要查找每家公司的股票代码,但只需一次。如果可能,您希望批量查找以提高效率。对于您想要使用来自第三方服务的股票代码订阅其价格的每家公司。

您最终希望将公司名称流转换为包含所有公司的组合价格代码的单个流。

我不确定接下来的内容对你的情况或一般情况有多实用,但它仍然是一个有趣的转移,希望有教育意义!

模拟服务

这里有一些模拟服务实现可供调用 - 您应该能够将其转储到控制台应用程序的 Program 类中以进行尝试。有很多设置可以模拟出来,但是底部使用所有这些的解决方案真的很短:

首先,这是一个用于捕获公司股票信息的StockInfo 类型,任何一个小型示例数据库:

public class StockInfo
{
    public static List<StockInfo> StockDatabase = new List<StockInfo> {
        new StockInfo { Symbol = "MSFT", Name = "Microsoft" },    
        new StockInfo { Symbol = "GOOG", Name = "Google" },
        new StockInfo { Symbol = "APPL", Name = "Apple" },
        new StockInfo { Symbol = "YHOO", Name = "Yahoo" },
        new StockInfo { Symbol = "DELL", Name = "Dell" },
    };

    public string Symbol { get; set; }        
    public string Name { get; set; }
}

这是一个服务方法,它接受一个名称列表并为他们返回一个StockInfo 列表:

public static Task<List<StockInfo>> GetStockInfo(IList<string> symbols)
{
    return Task.Run(() => 
    {
        var results = from s in symbols
                      join i in StockInfo.StockDatabase
                      on s equals i.Name
                      select i;

        return results.ToList();                  
    });
}  

这是一个持有给定 StockInfo 股票价格的类型:

public class StockPrice
{
    public StockInfo StockInfo { get; set; }
    public double Price { get; set; }

    public override string ToString()
    {
        return StockInfo.Symbol + ":" + Price;
    }
}

此服务调用以随机间隔返回给定StockInfo 的价格流。请注意,我们将订阅请求转储到控制台;如果我们的解决方案有效,您将不会多次看到同一家公司的订阅请求:

private static Random random = new Random();

public static IObservable<StockPrice> GetPrices(StockInfo stockInfo)
{    
    return Observable.Create<StockPrice>(o =>
    {      
        Console.WriteLine("Subscribed for " + stockInfo.Symbol);

        return Scheduler.Default.ScheduleAsync(
            (Func<IScheduler, CancellationToken, Task>)(
            async (ctrl, ct) =>
        {
            double price = random.Next(1, 10);
            while(true)
            {
                await ctrl.Sleep(TimeSpan.FromSeconds(random.NextDouble() * 10));
                price += Math.Round(random.NextDouble() - 0.5d, 2);
                var stockPrice = new StockPrice {
                    StockInfo = stockInfo,
                    Price = price
                };
                o.OnNext(stockPrice);
            };  
        }));                       
    });
}

这是逐步返回一些公司名称的初始服务调用,其中有一些重复(这都是做作的 - 我的意思是你为什么不在服务端进行重复数据删除!):

public static IObservable<string> GetStockNames()
{
    var exampleResults = new List<string> {
        "Microsoft",
        "Google",
        "Apple",
        "Microsoft",
        "Google",
        "Yahoo",
        "Microsoft",
        "Apple",
        "Apple",
        "Dell"
    };

    return exampleResults.ToObservable()
        .Zip(Observable.Interval(TimeSpan.FromSeconds(1)), (x, _) => x);
}

解决方案

完成所有设置后,我们可以着手解决(将其添加到您的控制台应用程序的 Main 方法中):

首先,我们决定分批 - 我们将分批最多 3 家公司,但不要等待超过 5 秒让他们到达。

const int batchSize = 3;
var maxWait = TimeSpan.FromSeconds(5);

现在我们得到了公司名称流:

var names = GetStockNames();  

我们使用 Distinct() 来提取每个名称的第一个实例。然后我们使用Buffer对名字进行批处理,最多取batchSize名字或者最多等待maxWait时间跨度;以先到者为准(第 2 行)。然后,我们将每个缓冲区(一个列表)投影到我们的股票信息服务的查找中,并将结果列表转换为单个 StockInfo 对象的流。 (第 3-5 行),将整个内容扁平化为 IObservable&lt;StockInfo&gt;。这将是一个独特的StockInfo 项目流:

var uniqueCompanies = names.Distinct()
                    .Buffer(maxWait, batchSize)      
                    .SelectMany(nameBatch => GetStockInfo(nameBatch)
                                            .ToObservable()
                                            .SelectMany(info => info));

最后,我们可以将每个 StockInfo 投影到价格流中,然后将批次展平以获得单个组合价格流:

var getPrices = uniqueCompanies.SelectMany(info => GetPrices(info));

getPrices.Subscribe(Console.WriteLine);

输出将是随机间隔的随机数据,例如:

Subscribed for MSFT
Subscribed for GOOG
Subscribed for APPL
GOOG:6.37
MSFT:7.05
GOOG:7.12
APPL:5.34
Subscribed for YHOO
Subscribed for DELL
MSFT:7.35
YHOO:8.2

希望对你有用!

【讨论】:

  • James - 我需要消化这一点并将其应用到我的应用程序中。不删除重复项的原因是,与 StockNames 请求等效的其他属性不是唯一的,尽管等效价格请求仅使用重复键。我试图将我的解决方案简化为一个更容易理解并且可能对其他学习 Rx 的人有用的示例。我在我所做的事情中看到了 Rx 的巨大潜力。
  • 好的,在这种情况下,请注意Distinct 的重载,它接受IEqualityComparer,因此您可以提供有关构成重复的自定义概念。
【解决方案2】:

多田:

GetAListOfThings()
    .SelectMany(list => GetSecondaryInformation(list)
        .Select(secInfo => new { list, secInfo }));

【讨论】:

  • “我不想在第 2 步中为我已经请求过的东西打电话”要求使这成为一个棘手的问题。保罗,你有解决办法吗?
  • 我同意 - 但我希望在回答之前更清楚(参见 OP cmets)。通过分组提取唯一项目很容易,但我不确定我们是在处理列表还是项目流。
  • 保罗的创意点在*上。从他的命名中,我可以推断 GetAListOfThings() 返回一个 IObservable> 序列,该序列具有单个值,即事物列表。然后将此单个列表传递给GetSecondaryInformation 方法。不进行重复调用的责任封装在GetSecondaryInformation 方法中。 *我认为它需要转换为list 变量的查询理解语法才能在Select 中使用
  • 不需要在查询语法中使用外部参数
  • 作为 Rx 的新手,我试图了解是否需要等待整个步骤 1) 完成(即获得完整列表),然后才能向步骤 2) 提出请求更多的是连续流,或者如果对 Rx 有更好的了解,是否有更好的方法(可能是批处理)来做到这一点。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-07-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多