【问题标题】:How to listen for Pub/Sub messages in an ASP.NET Core app continuously?如何在 ASP.NET Core 应用程序中持续监听 Pub/Sub 消息?
【发布时间】:2018-03-29 03:23:10
【问题描述】:

我想实现一个 ASP.NET Core API,它不响应 HTTP 请求,但在启动时开始侦听 Google Cloud Pub/Sub 消息,并且在整个生命周期中无限期地侦听。

使用官方 Pub/Sub SDK 实现此功能的首选方式是什么?

我可以想到两种方法:

方法1:只需使用SimpleSubscriber,并在Startup.Configure 开始收听消息:

public void Configure(IApplicationBuilder app)
{
    var simpleSubscriber = await SimpleSubscriber.CreateAsync(subscriptionName);
    var receivedMessages = new List<PubsubMessage>();

    simpleSubscriber.StartAsync((msg, cancellationToken) =>
    {
        // Process the message here.

        return Task.FromResult(SimpleSubscriber.Reply.Ack);
    });

    ...
}

方法 2:使用专门创建的库来定期运行作业,例如 Quartz、Hangfire 或 FluentScheduler,每次触发作业时,使用 SubscriberClient 拉取新消息。

哪种方法是首选方法?第一个看起来比较简单,但我不确定它是否真的可靠。

【问题讨论】:

  • @Flater 这是一个在 Kubernetes 中运行的 ASP.NET Core 应用程序。我希望该应用程序同时具有一些 REST 端点,继续收听一些 Pub/Sub 消息。 (我知道我可以将这两件事分成两个部分,但如果可能的话,为了方便起见,我想将其保留为一个。)
  • 我会从最简单的方法开始,然后如果需要,转移到图书馆。在您的示例中,我只会添加代码以将 simpleSubscriber 保留在静态字段中的某个位置,以保护对象免受 GC
  • 据我知道,第一种方法应该没问题 - 但我正在咨询一位了解更多的同事。

标签: c# asp.net-core google-cloud-platform google-cloud-pubsub


【解决方案1】:

第一种方法肯定是打算如何使用它。

但是,请参阅StartAsync 的文档:

开始接收消息。返回的Task 在以下任一情况下完成 StopAsync(CancellationToken) 被调用或 如果不可恢复 发生故障。此方法不能多次调用 SubscriberClient 实例。

因此,您确实需要处理因不可恢复错误而导致的意外 StartAsync 关机。最简单的做法是使用外部循环,尽管考虑到这些错误被认为是不可恢复的,因此可能需要更改调用的某些内容才能成功。

代码可能如下所示:

while (true)
{
    // Each SubscriberClientinstance must only be used once.
    var subscriberClient = await SubscriberClient.CreateAsync(subscriptionName);
    try
    {
        await subscriberClient.StartAsync((msg, cancellationToken) =>
        {
            // Process the message here.
            return Task.FromResult(SimpleSubscriber.Reply.Ack);
        });
    }
    catch (Exception e)
    {
        // Handle the unrecoverable error somehow...
    }
}

如果这没有按预期工作,请let us know

编辑SimpleSubscriber 在库中已重命名为 SubscriberClient,因此已对答案进行了相应编辑。

【讨论】:

  • 感谢您的信息!通过不可恢复的错误,您的意思是堆栈溢出或内存不足,因此它不会由于暂时的网络故障或类似的事情而停止,对吗?顺便提一句。监听是如何实现的,SimpleSubscriber 是在内部轮询,还是 Pub/Sub 支持 WebSocket 连接之类的东西?
  • @MarkVincze “不可恢复”表示不可恢复的 RPC 错误。 可恢复 RPC 错误列表在这里:github.com/GoogleCloudPlatform/google-cloud-dotnet/blob/master/… 所有其他错误都是不可恢复的。
  • 请注意,SimpleSubscriber 已重命名为 SubscriberClient。新的 GitHub 链接:github.com/googleapis/google-cloud-dotnet/blob/master/apis/…
猜你喜欢
  • 2021-09-16
  • 2023-04-03
  • 2021-11-08
  • 1970-01-01
  • 2021-12-17
  • 1970-01-01
  • 2019-06-30
  • 2022-06-16
  • 2020-07-19
相关资源
最近更新 更多