我设法通过几个步骤做到了这一点。
- 将消息保留在具有 Id 的存储库中
- 在发送/连接事件中,获取所有未处理的消息并发送它们
- 在客户端,向服务器发送确认并从存储库中删除消息
根据设计,signalR 不能保证客户端会收到消息并且只收到一次,因为即使你实现了这样的协议,你可能会丢失 ack 消息,但仍然会收到两次消息。
第一种方法:服务器为所有未处理的消息发送事件触发器 addNewMessages。可以做到这一点,但如果发送事件永远不会被调用...
//just a sample repo : should be persisted, thread safe ...
public static class MessageRepository
{
public static List<Message> UnprocessedMessages = new List<Message>();
public static void AddMessage(Message msg)
{
UnprocessedMessages.Add(msg);
}
public static List<Message> GetMessagesByIssuer(string issuer)
{
return UnprocessedMessages.Where(m => m.Issuer.Equals(issuer)).ToList();
}
public static void Remove(Guid id)
{
if (UnprocessedMessages.Any(m => m.messageId.Equals(id)))
{
var message = UnprocessedMessages.FirstOrDefault(m => m.messageId.Equals(id));
UnprocessedMessages.Remove(message);
}
}
}
[Authorize]
public class ChatHub : Hub
{
public void Send(string who, string data)
{
string name = Context.User.Identity.Name;
List<string> groups = new List<string>();
groups.Add(name);
groups.Add(who);
Message message = new Message()
{
messageId = Guid.NewGuid(),
data = data,
Issuer = name,
Receiver = who,
CreationDate = DateTime.UtcNow
};
MessageRepository.AddMessage(message);
var unProcessedMessages = MessageRepository.GetMessagesByIssuer(name).OrderBy(m => m.CreationDate).ToList();
unProcessedMessages.ForEach(m =>
{
Clients.Groups(groups).addNewMessageToPage(name, JsonConvert.SerializeObject(m));
});
}
public void AcknowledgeServer(Guid messageId)
{
// Process the message acknowledge
var msgGuid = messageId;
MessageRepository.Remove(msgGuid);
}
public override Task OnConnected()
{
string name = Context.User.Identity.Name;
Groups.Add(Context.ConnectionId, name);
return base.OnConnected();
}
}
public class Message
{
public Guid messageId { get; set; }
public String data { get; set; }
public String Issuer { get; set; }
public String Receiver { get; set; }
public DateTime CreationDate { get; set; }
}
这是第二种方法(我认为更好的方法)。你有一个带有 Timer 的 C# 客户端单例,它定期检查你的存储库,然后发送未处理的消息。您还应该删除过期消息。
public class PresenceMonitor
{
private Timer _timer;
// How often we plan to check if the connections in our store are valid
private readonly TimeSpan _presenceCheckInterval = TimeSpan.FromSeconds(10);
public PresenceMonitor()
{
}
public void StartMonitoring()
{
if (_timer == null)
{
_timer = new Timer(_ =>
{
try
{
Check();
}
catch (Exception ex)
{
// Don't throw on background threads, it'll kill the entire process
Trace.TraceError(ex.Message);
}
},
null,
TimeSpan.Zero,
_presenceCheckInterval);
}
}
private void Check()
{
var context = GlobalHost.ConnectionManager.GetHubContext<ChatHub>();
var messages = MessageRepository.GetAllMessages();
messages.ForEach(m =>
{
if (context != null)
{
List<string> groups = new List<string>();
groups.Add(m.Issuer);
groups.Add(m.Receiver);
context.Clients.Groups(groups).addNewMessageToPage(m.Issuer, JsonConvert.SerializeObject(m));
}
});
}
}
在你的startuc 类中,初始化Monitor。