【问题标题】:gRPC Keep stream alivegRPC 保持流活动
【发布时间】:2021-05-26 15:19:24
【问题描述】:

我正在尝试使用 gRPC 进行长期流式会话,因为我需要保证从服务器到客户端的消息顺序。

我有以下 .proto:

service Subscriber {
    rpc Subscribe(SubscriptionRequest) returns (stream SubscriberEvent);
}

我当前的服务(托管在 ASP.NET / .NET 5.0 中)如下所示:

public class SubscriberService : Subscriber.SubscriberBase
{
    private readonly ILogger<SubscriberService> _logger;
    private readonly ConcurrentDictionary<string, IServerStreamWriter<SubscriberEvent>> _subscriptions = new();
    private int _messageCount = 0;
    private Timer _timer;

    public SubscriberService(ILogger<SubscriberService> logger)
    {
        _logger = logger;
        _timer = new Timer(o => TimerCallback(), null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
    }

    private void TimerCallback()
    {
        Broadcast($"Current time is {DateTime.UtcNow}");
    }

    public override Task Subscribe(SubscriptionRequest request, IServerStreamWriter<SubscriberEvent> responseStream, ServerCallContext context)
    {
        _subscriptions.TryAdd(request.ClientId, responseStream);
        return responseStream.WriteAsync(new SubscriberEvent() {Id = 0, Message = "Subscribe successful"});
    }

    public void Broadcast(string message)
    {
        var count = ++_messageCount;
        foreach (var sub in _subscriptions.Values)
        {
            sub.WriteAsync(new SubscriberEvent() { Id = count, Message = message });
        }
        _logger.LogInformation($"Broadcast message #{count}: {message}");
    }
}

我的客户只收到最初的“订阅成功”消息,但不会收到计时器触发的消息。调用 WriteAsync 时不会出现任何异常。

我是在尝试将 gRPC 用于从未设计过的事情(SignalR/WebSocket 替代品),还是我只是遗漏了一些明显的东西?

【问题讨论】:

  • 我没有在 .NET 中使用 gRPC 的经验。根据我基于在其他语言中使用 gRPC 的了解,当您从实现流方法的函数返回时,流将关闭。因此,当您从Subscribe 返回时,流将关闭。如果您尝试在Broadcast 中打印从sub.WriteAsync 返回的错误,您应该会看到一个错误提示stream is closed
  • @EaswarSwaminathan:那肯定可以解释。 WriteAsync 除了可等待的任务外不返回任何其他内容,并且日志中也没有任何内容可以指示问题所在。如果流关闭,我希望 WriteAsync 抛出异常。

标签: c# .net grpc


【解决方案1】:

对于长时间运行的 gRPC 流,您必须等待客户端说连接已关闭。像这样的:

while (!context.CancellationToken.IsCancellationRequested)
{
    // event-based action
    responseStream.WriteAsync(new SubscriberEvent() {Id = 0, Message = "Subscribe successful"});  
}

【讨论】:

  • 这确实是答案。它把我吓了一跳:)
【解决方案2】:

我认为上一个答案是忙等待。所以我想向你展示它的异步版本。

public override Task Subscribe(RequestMessage, IServerStreamWriter<ReplyMessage> responseStream, ServerCallContext context)
 {
    // your event-based code here
    
    var tcs = new TaskCompletionSource();
    context.CancellationToken.Register(() => tcs.TrySetCanceled(), false);
    return tcs.Task;
 }

顺便说一句,我想我一直在做一个和你一样的项目。我使用了来自Rx.Net 的 Observables、Subjects 和 ReplaySubjects。这些对于基于事件的代码非常有帮助。

【讨论】:

  • 感谢您回答这个问题。我做了一个while循环,但使用await Task.Delay 而不是接受的答案。最终将流传递给服务外部的订阅管理器变得更加容易。
猜你喜欢
  • 1970-01-01
  • 2020-10-07
  • 2016-11-17
  • 1970-01-01
  • 2020-02-12
  • 2012-03-09
  • 1970-01-01
  • 2012-05-15
相关资源
最近更新 更多