【问题标题】:Parallel Cache using Reactive extensions使用响应式扩展的并行缓存
【发布时间】:2012-09-21 06:47:49
【问题描述】:

我正在寻找构建并行缓存。要求是需要同时触发 n 个数据收集器。这些数据收集器中的每一个都将到达边界层(称为服务层)并检索数据。但是,由于这是在同一个请求 (WCF) 中,如果 2 个数据收集器需要在服务层调用相同的方法,我不希望第二个请求等待第一个请求完成。

这需要对构建数据收集器的开发人员透明地构建(使用 Unity Interception 插入此缓存方面)。

这是流程的样子。反应式扩展是否适合这种设计?我过去没有与 Rx 合作过,并且不想在开发 10 天后碰壁。否则,async、await 和 events 的组合也可能在这里很好地发挥作用。

编辑:我使用 Rx 实现了这个 - 在多线程上下文中运行良好。有趣的是尝试 add 而不是 tryGet。 (这是一个 Unity 拦截 CallHandler)

 /// <summary>
    /// Intercepts the calls and tries to retrieve from the cache
    /// </summary>
    class CacheCallHandler : ICallHandler
    {

        [Dependency]
        public ICache RequestCache { get; set; }

        public IMethodReturn Invoke(IMethodInvocation input, GetNextHandlerDelegate getNext)
        {
            IMethodReturn mesg = null;

            string cacheKey = CacheKeyGenerator.GetCacheKey(input);

            //create the task to retrieve the data
            var task = new Task<IMethodReturn>(() =>
            {
                return getNext()(input, getNext);
            });

            //make it observable
            var observableItem = task.ToObservable();

            //try to add it to the cache
            //we need to do this in the order of Add and then try to get, otherwise multiple thread might enter the same area
            if (RequestCache.TryAdd(cacheKey, observableItem))
            {
                //if the add succeeed, it means that we are responsible to starting this task
                task.Start();
            }
            else
            {
                if ( RequestCache.TryGetValue(cacheKey, out observableItem) )
                {
                    //do nothing, the observable item is already updated with the requried reference
                }
                else
                {
                    throw new CacheHandlerException("Could not add to cache AND could not retrieve from cache either. Something's wrong", input);
                }
            }

            //observe the return 
            if ( observableItem != null )
                mesg = observableItem.FirstOrDefault();

            if (mesg == null)
                throw new CacheHandlerException("Not return value found. this should not happen", input);

            return mesg;
        }


        /// <summary>
        /// Should always be the first to execute on the boundary
        /// </summary>
        public int Order
        {
            get { return 1; }
            set { ; }
        }
    }

【问题讨论】:

  • 我不确定我是否理解您的问题。您的意思是“数据收集器”从缓存中读取数据,如果其中两个尝试读取不在缓存中的相同键,它们不应该阻塞?

标签: .net caching parallel-processing system.reactive async-await


【解决方案1】:

是的,Rx 非常适合。

我建议您考虑实现以下字典来支持您的密钥缓存:

Dictionary<K, AsyncSubject<V>>

您的异步获取数据部分只需要订阅主题即可使用结果填充它。

【讨论】:

  • 我可以用并发字典来做这个吗?多线程会让我的梦想被线程锁定的恶魔所困扰吗? :)
  • 如果您知道要从多个线程访问字典,那么当然可以,但除非您知道自己需要它,否则不要“疯狂地使用多线程”。
【解决方案2】:

我倾向于使用async 解决方案,特别是使用AsyncLazy&lt;T&gt; (from my blog) 的解决方案:

public sealed class MyCache<TKey, TValue>
{
  private readonly ConcurrentDictionary<TKey, AsyncLazy<TValue>> dictionary =
      new ConcurrentDictionary<TKey, AsyncLazy<TValue>>();

  private readonly Func<TKey, Task<TValue>> LookupAsync;

  public MyCache(Func<TKey, Task<TValue>> lookupAsync)
  {
    LookupAsync = lookupAsync;
  }

  public AsyncLazy<TValue> Get(TKey key)
  {
    return dictionary.GetOrAdd(key,
        key => new AsyncLazy<TValue>(() => lookupAsync(key)));
  }
}

这是一个非常简单的“缓存”,因为它没有过期时间。可以这样使用:

MyCache<string, MyResource> cache = new MyCache<string, MyResource>(async key =>
{
  MyResource ret = await DataLayer.GetResourceAsync(key);
  ...
  return ret;
});
MyResource resource = await cache.Get("key");

在多线程情况下,GetOrAdd 可能会创建一个额外的AsyncLazy&lt;TValue&gt;,但它永远不会是awaited,因此每个lookupAsync 只会被调用一次TKey。另请注意,lookupAsync 始终是从线程池中调用的。

附:如果你去async,你可能会发现我的async WCF post很有帮助。

【讨论】:

  • 你为什么不考虑响应式扩展?他们似乎更适合这份工作?但是 - 我同意有多种方法可以解决这个问题。
  • Rx 非常适合事件流。 Async 非常适合异步操作。一般来说,我更喜欢异步解决方案,因为它们的学习曲线比 Rx 低。这个问题不是“事件流”问题,所以我倾向于异步。
  • 你说得对,Rx 有一些学习曲线;但我认为简单代码实现的权衡是值得的。我已经编辑了我的查询并包含了我的实现。
【解决方案3】:

https://github.com/reactiveui/ReactiveUI/blob/master/ReactiveUI/ObservableAsyncMRUCache.cs 基本上已经完成了您想要的工作,特别是对相同内容的两个请求被“去抖动”。来自 cmets:

/// ObservableAsyncMRUCache implements memoization for asynchronous or
/// expensive to compute methods. This memoization is an MRU-based cache
/// with a fixed limit for the number of items in the cache.     
///
/// This class guarantees that only one calculation for any given key is
/// in-flight at a time, subsequent requests will wait for the first one and
/// return its results (for example, an empty web image cache that receives
/// two concurrent requests for "Foo.jpg" will only issue one WebRequest -
/// this does not mean that a request for "Bar.jpg" will wait on "Foo.jpg").
///
/// Concurrency is also limited by the maxConcurrent parameter - when too
/// many in-flight operations are in progress, further operations will be
/// queued until a slot is available.

【讨论】:

  • 另一个流行的 Rx 社区项目是 Rxx 项目。我很确定那里有一个“Observable Dictionary”或 Cacheable Observable。附带说明一下,虽然 Stackoverflow.com 摇摆不定,但 MSDN Rx 论坛似乎得到了最好的答案 social.msdn.microsoft.com/Forums/en-US/rx/threads。但是你在 Enigmativity 和 Paul Betts 的帮助下得心应手
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-04-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多