【问题标题】:How to make a queued message broker in pure C#如何在纯 C# 中创建队列消息代理
【发布时间】:2018-06-08 16:26:46
【问题描述】:

背景

我需要一个排队的消息代理以分布式(在连续帧上)的方式调度消息。在下面显示的示例中,它将处理不超过 10 个订阅者,然后在进一步处理之前等待下一帧。

(为了澄清那些不熟悉 Unity3D 的人,Process() 方法使用 Unity 的内置 StartCoroutine() 方法运行,并且 - 在这种情况下 - 将持续游戏的生命周期 - 等待或处理队列。)

所以我有这么一个比较简单的类:

public class MessageBus : IMessageBus
{
    private const int LIMIT = 10;
    private readonly WaitForSeconds Wait;

    private Queue<IMessage> Messages;
    private Dictionary<Type, List<Action<IMessage>>> Subscribers;

    public MessageBus()
    {
        Wait = new WaitForSeconds(2f);
        Messages = new Queue<IMessage>();
        Subscribers = new Dictionary<Type, List<Action<IMessage>>>();
    }

    public void Submit(IMessage message)
    {
        Messages.Enqueue(message);
    }

    public IEnumerator Process()
    {
        var processed = 0;

        while (true)
        {
            if (Messages.Count == 0)
            {
                yield return Wait;
            }
            else
            {
                while(Messages.Count > 0)
                {
                    var message = Messages.Dequeue();

                    foreach (var subscriber in Subscribers[message.GetType()])
                    {
                        if (processed >= LIMIT)
                        {
                            processed = 0;
                            yield return null;
                        }

                        processed++;
                        subscriber?.Invoke(message);
                    }
                }

                processed = 0;
            }
        }
    }

    public void Subscribe<T>(Action<IMessage> handler) where T : IMessage
    {
        if (!Subscribers.ContainsKey(typeof(T)))
        {
            Subscribers[typeof(T)] = new List<Action<IMessage>>();
        }

        Subscribers[typeof(T)].Add(handler);
    }

    public void Unsubscribe<T>(Action<IMessage> handler) where T : IMessage
    {
        if (!Subscribers.ContainsKey(typeof(T)))
        {
            return;
        }

        Subscribers[typeof(T)].Remove(handler);
    }
}

它的工作原理和行为都符合预期,但有一个问题。

问题

我想(从订阅者的角度)这样使用它:

public void Run()
{
    MessageBus.Subscribe<TestEvent>(OnTestEvent);
}

public void OnTestEvent(TestEvent message)
{
    message.SomeTestEventMethod();
}

但这显然失败了,因为Action&lt;IMessage&gt; 无法转换为Action&lt;TestEvent&gt;

我可以使用它的唯一方法是这样的:

public void Run()
{
    MessageBus.Subscribe<TestEvent>(OnTestEvent);
}

public void OnTestEvent(IMessage message)
{
    ((TestEvent)message).SomeTestEventMethod();
}

但这感觉不雅且非常浪费,因为每个订阅者都需要自己进行选角。

我尝试过的

我正在尝试这样的“投射”动作:

public void Subscribe<T>(Action<T> handler) where T : IMessage
{
    if (!Subscribers.ContainsKey(typeof(T)))
    {
        Subscribers[typeof(T)] = new List<Action<IMessage>>();
    }

    Subscribers[typeof(T)].Add((IMessage a) => handler((T)a));
}

这适用于 subscribe 部分,但显然不适用于 unsubscribe。我可以在某个新创建的 handler-wrapper-lambdas 中缓存以供退订时使用,但老实说,我认为这不是真正的解决方案。

问题

我怎样才能让它按我的意愿工作?如果可能的话,最好使用一些 C#“魔法”,但我知道它可能需要完全不同的方法。

另外因为这将在游戏中使用,并在其生命周期内运行,如果可能的话,我想要一个无垃圾的解决方案。

【问题讨论】:

  • 那么为什么不public void Subscribe&lt;T&gt;(Action&lt;T&gt; handler) where T : IMessage 呢?
  • ...并将List&lt;Action&lt;T&gt;&gt; 作为您的订阅者列表?
  • 但是,Subscribers[typeof(T)].Add(handler); 行给出了错误CS1503: Argument `#1' cannot convert `System.Action&lt;T&gt;' expression to type `System.Action&lt;Core.Messaging.Contract.IMessage&gt;'
  • 当更改为Subscribers[typeof(T)] = new List&lt;Action&lt;T&gt;&gt;(); 时也会出现错误CS0029: Cannot implicitly convert type `System.Collections.Generic.List&lt;System.Action&lt;T&gt;&gt;' to `System.Collections.Generic.List&lt;System.Action&lt;Core.Messaging.Contract.IMessage&gt;&gt;'
  • @spender 这需要更改private Dictionary&lt;Type, List&lt;Action&lt;IMessage&gt;&gt;&gt; Subscribers; 声明,但我不知道如何,因为在这种情况下类本身不是通用的。

标签: c# unity3d publish-subscribe


【解决方案1】:

所以问题是您试图将不同类型的列表作为值存储在订阅者字典中。

解决此问题的一种方法可能是存储List&lt;Delegate&gt;,然后使用Delegate.DynamicInvoke

下面是一些总结要点的测试代码:

Dictionary<Type, List<Delegate>> Subscribers = new Dictionary<Type, List<Delegate>>();

void Main()
{
    Subscribe<Evt>(ev => Console.WriteLine($"hello {ev.Message}"));
    IMessage m = new Evt("spender");
    foreach (var subscriber in Subscribers[m.GetType()])
    {
        subscriber?.DynamicInvoke(m);
    }
}

public void Subscribe<T>(Action<T> handler) where T : IMessage
{
    if (!Subscribers.ContainsKey(typeof(T)))
    {
        Subscribers[typeof(T)] = new List<Delegate>();
    }
    Subscribers[typeof(T)].Add(handler);
}

public interface IMessage{}

public class Evt : IMessage
{
    public Evt(string message)
    {
        this.Message = message;
    }
    public string Message { get; }
}

【讨论】:

  • 这确实有效 - 我唯一关心的是性能,DynamicInvoke 不是使用反射和各种不适合(至少对于游戏而言)的东西吗?
  • 好的,经过一些思考和小型研究,我认为只要不时有很少的消息,反射就不应该成为问题,据我所知,应该没有拳击就我而言。因此,与消息对象的池相结合,这应该是完全无垃圾的,这 - 我认为 - 在我的情况下更为重要。所以非常感谢@spender。
猜你喜欢
  • 1970-01-01
  • 2023-03-03
  • 1970-01-01
  • 2011-02-21
  • 2022-06-10
  • 2016-09-25
  • 1970-01-01
  • 2018-10-08
  • 2011-02-26
相关资源
最近更新 更多