【发布时间】:2021-05-26 15:19:24
【问题描述】:
我正在尝试使用 gRPC 进行长期流式会话,因为我需要保证从服务器到客户端的消息顺序。
我有以下 .proto:
service Subscriber {
rpc Subscribe(SubscriptionRequest) returns (stream SubscriberEvent);
}
我当前的服务(托管在 ASP.NET / .NET 5.0 中)如下所示:
public class SubscriberService : Subscriber.SubscriberBase
{
private readonly ILogger<SubscriberService> _logger;
private readonly ConcurrentDictionary<string, IServerStreamWriter<SubscriberEvent>> _subscriptions = new();
private int _messageCount = 0;
private Timer _timer;
public SubscriberService(ILogger<SubscriberService> logger)
{
_logger = logger;
_timer = new Timer(o => TimerCallback(), null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
}
private void TimerCallback()
{
Broadcast($"Current time is {DateTime.UtcNow}");
}
public override Task Subscribe(SubscriptionRequest request, IServerStreamWriter<SubscriberEvent> responseStream, ServerCallContext context)
{
_subscriptions.TryAdd(request.ClientId, responseStream);
return responseStream.WriteAsync(new SubscriberEvent() {Id = 0, Message = "Subscribe successful"});
}
public void Broadcast(string message)
{
var count = ++_messageCount;
foreach (var sub in _subscriptions.Values)
{
sub.WriteAsync(new SubscriberEvent() { Id = count, Message = message });
}
_logger.LogInformation($"Broadcast message #{count}: {message}");
}
}
我的客户只收到最初的“订阅成功”消息,但不会收到计时器触发的消息。调用 WriteAsync 时不会出现任何异常。
我是在尝试将 gRPC 用于从未设计过的事情(SignalR/WebSocket 替代品),还是我只是遗漏了一些明显的东西?
【问题讨论】:
-
我没有在 .NET 中使用 gRPC 的经验。根据我基于在其他语言中使用 gRPC 的了解,当您从实现流方法的函数返回时,流将关闭。因此,当您从
Subscribe返回时,流将关闭。如果您尝试在Broadcast中打印从sub.WriteAsync返回的错误,您应该会看到一个错误提示stream is closed。 -
@EaswarSwaminathan:那肯定可以解释。 WriteAsync 除了可等待的任务外不返回任何其他内容,并且日志中也没有任何内容可以指示问题所在。如果流关闭,我希望 WriteAsync 抛出异常。