【问题标题】:Guaranteed Message delivery保证消息传递
【发布时间】:2015-04-24 09:01:54
【问题描述】:

我正在研究应用程序(托管(Web)和安装的 Windows 客户端)之间实时消息的架构设计。由于 Windows 客户端安装在客户场所,我们将无法控制防火墙,即打开任何端口。

所以我想到了使用 SignalR 通过 websocket 或回退技术使用 http 发送实例通知。我们的 windows 客户端目前使用 .Net 4.0 Framework。

我做了一些关于通过信号器保证消息传递的研究,人们建议使用 GUID 确认消息,但不确定如何实现这个想法。此外,当客户端未连接时,我需要在 RabbitMQ 上对消息进行排队,并且 onConnected 只需发送队列中的所有消息。

namespace SignalRHub.Hubs
{
    [Authorize]
    public class ChatHub : Hub
    {
        public void Send(string who, string data)
        {
            string name = Context.User.Identity.Name;

            List<string> groups = new List<string>();
            groups.Add(name);
            groups.Add(who);

            Message message = new Message()
            {
                messageId = Guid.NewGuid(),
                data = data
            };

            Clients.Groups(groups).addNewMessageToPage(name, JsonConvert.SerializeObject(message));
        }

        public void AcknowledgeServer(Guid messageId)
        {
            // Process the message acknowledge
            var msgGuid = messageId;
        }

        public override Task OnConnected()
        {
            string name = Context.User.Identity.Name;
            Groups.Add(Context.ConnectionId, name);

            return base.OnConnected();
        }
    }

    public class Message
    {
        public Guid messageId { get; set; }
        public String data { get; set; }
    }
}

请指教?

【问题讨论】:

    标签: c# signalr rabbitmq


    【解决方案1】:

    我设法通过几个步骤做到了这一点。

    1. 将消息保留在具有 Id 的存储库中
    2. 在发送/连接事件中,获取所有未处理的消息并发送它们
    3. 在客户端,向服务器发送确认并从存储库中删除消息

    根据设计,signalR 不能保证客户端会收到消息并且只收到一次,因为即使你实现了这样的协议,你可能会丢失 ack 消息,但仍然会收到两次消息。

    第一种方法:服务器为所有未处理的消息发送事件触发器 addNewMessages。可以做到这一点,但如果发送事件永远不会被调用...

    //just a sample repo : should be persisted, thread safe ...
    public static class MessageRepository
    {
        public static List<Message> UnprocessedMessages = new List<Message>();
    
        public static void AddMessage(Message msg)
        {
            UnprocessedMessages.Add(msg);
        }
    
        public static List<Message> GetMessagesByIssuer(string issuer)
        {
            return UnprocessedMessages.Where(m => m.Issuer.Equals(issuer)).ToList();
        }
    
        public static void Remove(Guid id)
        {
            if (UnprocessedMessages.Any(m => m.messageId.Equals(id)))
            {
                var message = UnprocessedMessages.FirstOrDefault(m => m.messageId.Equals(id));
                UnprocessedMessages.Remove(message);
            }
        }
    }
    
    
    [Authorize]
    public class ChatHub : Hub
    {
        public void Send(string who, string data)
        {
            string name = Context.User.Identity.Name;
    
            List<string> groups = new List<string>();
            groups.Add(name);
            groups.Add(who);
    
            Message message = new Message()
            {
                messageId = Guid.NewGuid(),
                data = data,
                Issuer = name,
                Receiver = who,
                CreationDate = DateTime.UtcNow
            };
    
            MessageRepository.AddMessage(message);
            var unProcessedMessages = MessageRepository.GetMessagesByIssuer(name).OrderBy(m => m.CreationDate).ToList();
    
            unProcessedMessages.ForEach(m =>
            {
                Clients.Groups(groups).addNewMessageToPage(name, JsonConvert.SerializeObject(m));
            });
    
    
        }
    
        public void AcknowledgeServer(Guid messageId)
        {
            // Process the message acknowledge
            var msgGuid = messageId;
            MessageRepository.Remove(msgGuid);
        }
    
        public override Task OnConnected()
        {
            string name = Context.User.Identity.Name;
            Groups.Add(Context.ConnectionId, name);
    
            return base.OnConnected();
        }
    }
    
    public class Message
    {
        public Guid messageId { get; set; }
        public String data { get; set; }
        public String Issuer { get; set; }
        public String Receiver { get; set; }
        public DateTime CreationDate { get; set; }
    }
    

    这是第二种方法(我认为更好的方法)。你有一个带有 Timer 的 C# 客户端单例,它定期检查你的存储库,然后发送未处理的消息。您还应该删除过期消息。

    public class PresenceMonitor
    {
        private Timer _timer;
    
        // How often we plan to check if the connections in our store are valid
        private readonly TimeSpan _presenceCheckInterval = TimeSpan.FromSeconds(10);
    
    
        public PresenceMonitor()
        {
    
        }
    
        public void StartMonitoring()
        {
            if (_timer == null)
            {
                _timer = new Timer(_ =>
                {
                    try
                    {
                        Check();
                    }
                    catch (Exception ex)
                    {
                        // Don't throw on background threads, it'll kill the entire process
                        Trace.TraceError(ex.Message);
                    }
                },
                null,
                TimeSpan.Zero,
                _presenceCheckInterval);
            }
        }
    
        private void Check()
        {
            var context = GlobalHost.ConnectionManager.GetHubContext<ChatHub>();
            var messages = MessageRepository.GetAllMessages();
            messages.ForEach(m =>
            {
                if (context != null)
                {
                    List<string> groups = new List<string>();
                    groups.Add(m.Issuer);
                    groups.Add(m.Receiver);
    
                    context.Clients.Groups(groups).addNewMessageToPage(m.Issuer, JsonConvert.SerializeObject(m));
                }
            });
    
    
        }
    }
    

    在你的startuc 类中,初始化Monitor

    【讨论】:

    • 当客户端 A 向客户端 B 发送消息时,客户端 A 将调用 send 方法,所以我必须将消息添加到 repo 并获取所有消息并从 repo 发送给客户端。如果您有几分钟的时间,请更新我的代码,以便我更好地理解它,我不担心由于确认丢失而导致客户收到重复消息,重要的是他们不会丢失任何消息。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-09-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-06-28
    • 1970-01-01
    相关资源
    最近更新 更多