好的,所以 - 我在暗示你在 Paul 的回答中所做的评论 “对 Rx 来说是新手......”。
如果最初的结果列表以List<T> 的形式一次性返回,那么您的问题就很简单了。所以我假设它没有 - 它作为可能包含重复信息的项目流返回。
场景
这是一个人为的场景:服务返回一个公司名称流,其中可能有重复的名称。使用辅助服务,您需要查找每家公司的股票代码,但只需一次。如果可能,您希望批量查找以提高效率。对于您想要使用来自第三方服务的股票代码订阅其价格的每家公司。
您最终希望将公司名称流转换为包含所有公司的组合价格代码的单个流。
我不确定接下来的内容对你的情况或一般情况有多实用,但它仍然是一个有趣的转移,希望有教育意义!
模拟服务
这里有一些模拟服务实现可供调用 - 您应该能够将其转储到控制台应用程序的 Program 类中以进行尝试。有很多设置可以模拟出来,但是底部使用所有这些的解决方案真的很短:
首先,这是一个用于捕获公司股票信息的StockInfo 类型,任何一个小型示例数据库:
public class StockInfo
{
public static List<StockInfo> StockDatabase = new List<StockInfo> {
new StockInfo { Symbol = "MSFT", Name = "Microsoft" },
new StockInfo { Symbol = "GOOG", Name = "Google" },
new StockInfo { Symbol = "APPL", Name = "Apple" },
new StockInfo { Symbol = "YHOO", Name = "Yahoo" },
new StockInfo { Symbol = "DELL", Name = "Dell" },
};
public string Symbol { get; set; }
public string Name { get; set; }
}
这是一个服务方法,它接受一个名称列表并为他们返回一个StockInfo 列表:
public static Task<List<StockInfo>> GetStockInfo(IList<string> symbols)
{
return Task.Run(() =>
{
var results = from s in symbols
join i in StockInfo.StockDatabase
on s equals i.Name
select i;
return results.ToList();
});
}
这是一个持有给定 StockInfo 股票价格的类型:
public class StockPrice
{
public StockInfo StockInfo { get; set; }
public double Price { get; set; }
public override string ToString()
{
return StockInfo.Symbol + ":" + Price;
}
}
此服务调用以随机间隔返回给定StockInfo 的价格流。请注意,我们将订阅请求转储到控制台;如果我们的解决方案有效,您将不会多次看到同一家公司的订阅请求:
private static Random random = new Random();
public static IObservable<StockPrice> GetPrices(StockInfo stockInfo)
{
return Observable.Create<StockPrice>(o =>
{
Console.WriteLine("Subscribed for " + stockInfo.Symbol);
return Scheduler.Default.ScheduleAsync(
(Func<IScheduler, CancellationToken, Task>)(
async (ctrl, ct) =>
{
double price = random.Next(1, 10);
while(true)
{
await ctrl.Sleep(TimeSpan.FromSeconds(random.NextDouble() * 10));
price += Math.Round(random.NextDouble() - 0.5d, 2);
var stockPrice = new StockPrice {
StockInfo = stockInfo,
Price = price
};
o.OnNext(stockPrice);
};
}));
});
}
这是逐步返回一些公司名称的初始服务调用,其中有一些重复(这都是做作的 - 我的意思是你为什么不在服务端进行重复数据删除!):
public static IObservable<string> GetStockNames()
{
var exampleResults = new List<string> {
"Microsoft",
"Google",
"Apple",
"Microsoft",
"Google",
"Yahoo",
"Microsoft",
"Apple",
"Apple",
"Dell"
};
return exampleResults.ToObservable()
.Zip(Observable.Interval(TimeSpan.FromSeconds(1)), (x, _) => x);
}
解决方案
完成所有设置后,我们可以着手解决(将其添加到您的控制台应用程序的 Main 方法中):
首先,我们决定分批 - 我们将分批最多 3 家公司,但不要等待超过 5 秒让他们到达。
const int batchSize = 3;
var maxWait = TimeSpan.FromSeconds(5);
现在我们得到了公司名称流:
var names = GetStockNames();
我们使用 Distinct() 来提取每个名称的第一个实例。然后我们使用Buffer对名字进行批处理,最多取batchSize名字或者最多等待maxWait时间跨度;以先到者为准(第 2 行)。然后,我们将每个缓冲区(一个列表)投影到我们的股票信息服务的查找中,并将结果列表转换为单个 StockInfo 对象的流。 (第 3-5 行),将整个内容扁平化为 IObservable<StockInfo>。这将是一个独特的StockInfo 项目流:
var uniqueCompanies = names.Distinct()
.Buffer(maxWait, batchSize)
.SelectMany(nameBatch => GetStockInfo(nameBatch)
.ToObservable()
.SelectMany(info => info));
最后,我们可以将每个 StockInfo 投影到价格流中,然后将批次展平以获得单个组合价格流:
var getPrices = uniqueCompanies.SelectMany(info => GetPrices(info));
getPrices.Subscribe(Console.WriteLine);
输出将是随机间隔的随机数据,例如:
Subscribed for MSFT
Subscribed for GOOG
Subscribed for APPL
GOOG:6.37
MSFT:7.05
GOOG:7.12
APPL:5.34
Subscribed for YHOO
Subscribed for DELL
MSFT:7.35
YHOO:8.2
希望对你有用!