【问题标题】:How to stream events with GCP platform?如何使用 GCP 平台流式传输事件?
【发布时间】:2019-03-17 11:07:55
【问题描述】:

我正在考虑构建一个简单的解决方案,其中生产者服务将事件推送到消息队列,然后让流服务通过 gRPC 流 API 提供这些服务。

Cloud Pub/Sub 似乎非常适合这项工作,但扩展流媒体服务意味着该服务的每个副本都需要创建自己的订阅并在缩减之前将其删除,这似乎不必要地复杂,而不是平台的本意为。

另一方面,Kafka 似乎在这种情况下工作得很好,但我想避免必须管理底层平台本身,而是利用云基础架构。

我还应该提到,拥有流式 API 的原因是允许流向前端(可能无法访问底层基础架构)

有没有更好的方法来使用 GCP 平台做这样的事情,而无需部署和管理我自己的基础架构?

【问题讨论】:

  • 不确定我是否关注扩展说明,cloud pub sub 允许同一订阅上的多个订阅者上下扩展吞吐量,您到底是什么意思?
  • gRPC 流服务的想法是将 所有 事件流式传输到 pubsub 主题。如果同一服务的 2 个实例都订阅了同一个订阅,这意味着它们各自将收到大约 50% 的传入事件,这意味着在这 2 个实例中的任何一个上调用 API 的任何人都只会收到一半的事件。为了使流服务正常工作,每个实例都需要接收 100% 的事件,这意味着它们不能订阅相同的订阅。也许我错了?
  • 当您说“这似乎不必要地复杂,而不是平台的目的”时,您指的是哪个“平台”? Cloud Pub/Sub 中的订阅本质上相当于 Kafka 中的消费者组。您发现两者之间有什么主要区别吗?
  • 听起来您希望每个实例跟踪所有事件,如果您要添加实例以增加查询容量,您可能希望按照 CQRS 分离职责(即查询数据存储的查询节点并摄取将事件推送到其中的节点)
  • @KamalAboul-Hosn 我指的是为服务的每个实例动态生成一个新订阅,以获取本质上是短暂消息的内容,并确保在实例关闭时删除订阅,因此它不花钱。在 kafka 中,我可以只有一个主题并附加单独的订阅者,每个订阅者都可以独立跟踪自己在队列中的偏移量

标签: google-cloud-platform google-cloud-pubsub event-stream


【解决方案1】:

如果您本质上想要临时订阅,那么您可以在创建订阅时在 Subscription 对象上设置一些内容:

  1. expiration_policy 设置为更小的持续时间。当订阅者在该时间段内没有收到消息时,订阅将被删除。权衡是,如果您的订阅者由于持续时间超过此期间的暂时性问题而关闭,则订阅将被删除。默认情况下,到期时间为 31 天。您可以将此设置为低至 1 天。对于拉取订阅者,订阅者只需停止向 Cloud Pub/Sub 发出请求,即可在其到期时启动计时器。对于推送订阅,计时器在没有消息成功传递到端点时启动。因此,如果没有发布消息或端点对所有推送的消息都返回错误,则计时器有效。

  2. 减少message_retention_duration 的值。这是在订阅者未接收消息并确认消息的情况下保留消息的时间段。默认情况下,这是 7 天。您可以将其设置为低至 10 分钟。折衷方案是,如果您的订阅者断开连接或处理消息的时间超过此持续时间,则早于该时间的消息将被删除,订阅者将看不到它们。

完全关闭的订阅者可能只是自己调用 DeleteSubscription 以便订阅立即消失,但对于意外关闭的订阅者,设置这两个属性将最大限度地减少订阅继续存在的时间和消息数量(永远不会交付)将被保留。

请记住,Cloud Pub/Sub quotas 将每个主题和每个项目的订阅限制为 1 到 10,000 个。因此,如果创建了很多订阅并且处于活动状态或未清理(手动,或在expiration_policyttl 已通过后自动),则可能无法创建新订阅。

【讨论】:

  • 感谢您的提示,这正是我现在正在做的事情(除了设置过期,因为 Go API 还不支持)。由于 pubsub 的工作方式而不是解决此类问题的正确方法,这只是一种解决方法。我想没有其他方法可以使用 GCP 当前提供的服务来解决这个问题?
【解决方案2】:

我认为您最初的想法比临时订阅要好。我的意思是它有效,但感觉完全不自然。看你的要求是什么。例如,客户端是否只需要在连接时接收消息,还是都需要获取所有消息?

仅在连接时

您最初的想法是更好的 imo。我可能会做的是创建一个客户端可以连接的 gRPC 流服务。该实现本质上是一个观察者模式。消费者将收到一条消息,然后遍历订阅者以向所有订阅者执行“发送”。从那里开始,每当客户端连接到服务时,它只会向该观察者集合注册自己,并在断开连接时取消注册。水平扩展是被动的,因为客户端会粘在他们连接的任何实例上。

如果最终,每个人​​都会收到消息

这个概念与上面的类似,但客户端不会在断开连接时从观察者隐式取消注册。相反,它会显式地注册和取消注册(通过为此而设计的方法/命令)。修改“on disconnected”逻辑,告诉观察者列表客户端已经离线。那么消费者的广播逻辑略有不同。现在它遍历列表并说“如果在线,则发送,否则排队”,并将消息发送到临时队列(属于客户端)。然后,您的“连接时”逻辑将在通知消费者它重新联机之前将所有排队的消息发送到客户端。基本上是一个收件箱。在 RabbitMQ 等大多数产品中,设置临时的、自删除的队列非常容易。我认为你必须做一些管理是否可以删除队列。例如,除非客户端明确取消订阅或长时间处于非活动状态,否则永远不要删除队列。如果不这样做,整个收件箱的想法就会崩溃。

上面选择的答案与我在这里订阅的内容最相似,因为订阅是队列。如果我这样做了,那么我可能会将其实现为内部总线而不是观察者(因为它是不必要的) - 您按需为连接客户端创建一个消费者,该客户端实际上只是转发消息。消息消费者根据客户端是否连接进行订阅和取消订阅。正如 Kamal 所指出的,如果您的规模超过了 pubsub 允许的最大订阅数,您将遇到问题。如果你发现自己处于那个位置,那么你可以通过实现上面的模式来解除这个约束。这基本上是相同的模式,但您将责任转移到您的基础设施,其中唯一的限制是您自己的资源。

gRPC 使这种机制变得非常简单。或者,对于 Web,如果您使用 Microsoft 堆栈,那么 SignalR 也可以让这变得非常容易。客户端连接到集线器,您可以发布到所有连接的客户端。这里的消费者模式基本保持不变,但您不必手动实现观察者模式。

(注意:图中的箭头是依赖的方向,不是数据流的方向)

【讨论】:

    猜你喜欢
    • 2017-11-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-03-27
    • 2012-07-30
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多