【Question title】: Prioritizing a message on Google Pub/Sub
【Posted】: 2018-09-24 20:05:12
【Question】:

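I have a Pub/Sub topic with several pull subscriptions. I'd like some mechanism that lets me publish a message with a "priority" label so that the message jumps as close to the front of the queue as possible.

I don't need any guaranteed ordering semantics, just a "best effort" prioritization mechanism.

Is something like this possible with Pub/Sub?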

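【Question discussion】:

  • GCP Pub/Sub implements a subset of the logical capabilities found in other Pub/Sub engines. For example, if we take JMS as an alternative Pub/Sub technology, we find the concept of "setting message priority" (javaee.github.io/tutorial/jms-concepts004.html). No such concept exists in GCP Pub/Sub, however.

Tags: google-cloud-pubsub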


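【Answer 1】:

No, no such mechanism exists in Google Cloud Pub/Sub. A feature like this only really becomes relevant when your subscribers cannot keep up with the publish rate and a backlog builds up. If subscribers are processing and acknowledging messages quickly, the notion of a "priority" message is unnecessary.

If a backlog is building and certain messages need to be processed at a higher priority, one approach is to create a "high priority" topic and subscription. Subscribers subscribe to this subscription as well as to the "normal" subscription, and give precedence to messages from the "high priority" subscription as they arrive.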
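A minimal, library-agnostic sketch of the consumer side of this approach, assuming each subscription is wrapped in a callable that returns a (possibly empty) batch of messages, for example a thin wrapper around a synchronous pull. The names `next_batch`, `fetch_high`, and `fetch_normal` are hypothetical. On every iteration the high-priority source is tried first, and the normal source is read only when it comes back empty:

```python
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")


def next_batch(sources: Sequence[Callable[[], List[T]]]) -> List[T]:
    """Return the first non-empty batch, trying sources in priority order.

    sources[0] is the highest priority (e.g. the "high priority"
    subscription); later sources are consulted only when every earlier
    source returned nothing.
    """
    for fetch in sources:
        batch = fetch()
        if batch:
            return batch
    return []


# Usage with in-memory stand-ins for the two subscriptions (hypothetical).
high = [["h1", "h2"], []]
normal = [["n1"], ["n2"]]


def fetch_high():
    return high.pop(0) if high else []


def fetch_normal():
    return normal.pop(0) if normal else []


processed = []
while True:
    batch = next_batch([fetch_high, fetch_normal])
    if not batch:
        break
    processed.extend(batch)
print(processed)  # ['h1', 'h2', 'n1', 'n2']
```

This is still best effort: once a normal batch has been fetched it is processed in full, so a priority message arriving mid-batch waits until the next iteration.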

【Discussion】:

【Answer 2】:

An example implementation of @Kamal's answer, to give more context for:

"...prioritize messages from the 'high priority' subscription as they arrive"

    import logging
    import threading
    
    from google.cloud import pubsub
    from google.cloud.pubsub_v1.types import FlowControl
    
    logging.basicConfig(format="%(asctime)s %(message)s", level=logging.INFO)
    
    # Placeholders: replace with your own project and subscription IDs.
    PROJECT_ID = "my-project"
    BATCH_SUBSCRIPTION_ID = "batch-sub"
    PRIORITY_SUBSCRIPTION_ID = "priority-sub"
    
    c = threading.Condition()
    n_priority_messages = 0  # priority messages currently being handled
    
    def handle_message(message):
        # Placeholder: put the real processing logic here.
        ...
    
    def priority_callback(message):
        global n_priority_messages
        logging.info(f"PRIORITY received: {message.message_id}")
        with c:
            n_priority_messages += 1
        try:
            handle_message(message)
            message.ack()
            logging.info(f"PRIORITY handled: {message.message_id}")
        finally:
            # Always decrement, even if handling fails, so that batch
            # callbacks are never blocked forever.
            with c:
                n_priority_messages -= 1
                if n_priority_messages == 0:
                    c.notify_all()
    
    def batch_callback(message):
        logging.info(f"BATCH received: {message.message_id}")
        modify_count = 0
        while True:
            with c:
                priority_queue_is_empty = n_priority_messages == 0
            if priority_queue_is_empty:
                handle_message(message)
                message.ack()
                logging.info(f"BATCH handled: {message.message_id}")
                return
            # Priority work is in flight: extend the ack deadline so this
            # message is not redelivered, then wait up to 10 s for the
            # priority messages to drain.
            message.modify_ack_deadline(15)
            modify_count += 1
            logging.info(
                f"BATCH modifyed deadline: {message.message_id} - count: {modify_count}"
            )
            with c:
                c.wait_for(lambda: n_priority_messages == 0, timeout=10)
    
    subscriber = pubsub.SubscriberClient()
    batch_subscription = subscriber.subscription_path(PROJECT_ID, BATCH_SUBSCRIPTION_ID)
    priority_subscription = subscriber.subscription_path(PROJECT_ID, PRIORITY_SUBSCRIPTION_ID)
    
    batch_future = subscriber.subscribe(
        subscription=batch_subscription,
        callback=batch_callback,
        # adjust according to latency/throughput requirements
        flow_control=FlowControl(max_messages=5),
    )
    
    pull_future = subscriber.subscribe(
        subscription=priority_subscription,
        callback=priority_callback,
        # adjust according to latency/throughput requirements
        flow_control=FlowControl(max_messages=2),
    )
    
    # Block the main thread; callbacks run on the client's worker threads.
    pull_future.result()
    

Sample output when both priority and batch messages are backlogged:

    ...
    2021-07-29 10:25:00,115 PRIORITY received: 2786647736421842
    2021-07-29 10:25:00,338 PRIORITY handled: 2786647736421841
    2021-07-29 10:25:00,392 PRIORITY received: 2786647736421843
    2021-07-29 10:25:02,899 BATCH modifyed deadline: 2786667941800415 - count: 2
    2021-07-29 10:25:03,016 BATCH modifyed deadline: 2786667941800416 - count: 2
    2021-07-29 10:25:03,016 BATCH modifyed deadline: 2786667941800417 - count: 2
    2021-07-29 10:25:03,109 BATCH modifyed deadline: 2786667941800418 - count: 2
    2021-07-29 10:25:03,109 BATCH modifyed deadline: 2786667941800419 - count: 2
    2021-07-29 10:25:03,654 PRIORITY handled: 2786647736421842
    2021-07-29 10:25:03,703 PRIORITY received: 2786647736421844
    2021-07-29 10:25:03,906 PRIORITY handled: 2786647736421843
    2021-07-29 10:25:03,948 PRIORITY received: 2786647736421845
    2021-07-29 10:25:07,212 PRIORITY handled: 2786647736421844
    2021-07-29 10:25:07,242 PRIORITY received: 2786647736421846
    2021-07-29 10:25:07,459 PRIORITY handled: 2786647736421845
    2021-07-29 10:25:07,503 PRIORITY received: 2786647736421847
    2021-07-29 10:25:10,764 PRIORITY handled: 2786647736421846
    2021-07-29 10:25:10,807 PRIORITY received: 2786647736421848
    2021-07-29 10:25:11,004 PRIORITY handled: 2786647736421847
    2021-07-29 10:25:11,061 PRIORITY received: 2786647736421849
    2021-07-29 10:25:12,900 BATCH modifyed deadline: 2786667941800415 - count: 3
    2021-07-29 10:25:13,016 BATCH modifyed deadline: 2786667941800416 - count: 3
    2021-07-29 10:25:13,017 BATCH modifyed deadline: 2786667941800417 - count: 3
    2021-07-29 10:25:13,110 BATCH modifyed deadline: 2786667941800418 - count: 3
    2021-07-29 10:25:13,110 BATCH modifyed deadline: 2786667941800419 - count: 3
    2021-07-29 10:25:14,392 PRIORITY handled: 2786647736421848
    2021-07-29 10:25:14,437 PRIORITY received: 2786647736421850
    2021-07-29 10:25:14,558 PRIORITY handled: 2786647736421849
    ...
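
The coordination logic in this answer (a counter of in-flight priority messages guarded by a `threading.Condition`) can be isolated and checked without any Pub/Sub connection. A minimal sketch; the class `PriorityGate` and its method names are hypothetical and are not part of the answer or of the client library:

```python
import threading


class PriorityGate:
    """Counts in-flight priority messages so batch work can wait for them."""

    def __init__(self):
        self._cond = threading.Condition()
        self._in_flight = 0

    def priority_started(self):
        with self._cond:
            self._in_flight += 1

    def priority_finished(self):
        with self._cond:
            self._in_flight -= 1
            if self._in_flight == 0:
                # Wake every batch callback blocked in wait_until_idle().
                self._cond.notify_all()

    def wait_until_idle(self, timeout):
        """Block up to `timeout` seconds; True if no priority work is in flight."""
        with self._cond:
            return self._cond.wait_for(lambda: self._in_flight == 0, timeout=timeout)


gate = PriorityGate()
gate.priority_started()
busy_result = gate.wait_until_idle(timeout=0.1)  # times out: still busy
print(busy_result)  # False

# Finish the priority message from another thread shortly afterwards.
threading.Timer(0.1, gate.priority_finished).start()
idle_result = gate.wait_until_idle(timeout=5)  # woken by notify_all
print(idle_result)  # True
```

In the subscriber above, `priority_callback` would bracket `handle_message` with `priority_started()` and `priority_finished()` (the latter in a `finally`), and `batch_callback` would call `wait_until_idle(10)` in place of the explicit acquire/check/wait sequence.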
    

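【Discussion】:

  • Well, I really wouldn't recommend doing something this complicated. With this approach you can starve your non-priority queue, especially as you start processing more messages. This approach may also not work as intended when the subscribers are distributed across multiple servers.
  • @AlexFlint I take your point about starving the non-priority queue. batch_callback(message) should release the message rather than extending its deadline indefinitely. The subscriber above is meant to run on each pod of a horizontally scaled deployment, so ideally the messages in the non-priority queue get processed once the deployment has scaled up enough.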
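Following the reply above, one way to stop the batch callback from holding a message forever is to cap the number of deadline extensions and then release it with `message.nack()` (the Python client's subscriber `Message` provides `ack()`, `nack()`, and `modify_ack_deadline()`), so Pub/Sub can redeliver it later, possibly to a less busy replica. A minimal sketch: `handle_batch_message`, `max_deferrals`, and the injected `wait_until_idle(timeout)` callable (e.g. a wrapper around the condition-variable wait from the answer) are hypothetical, the limits are illustrative, and `FakeMessage` is a test stand-in:

```python
class FakeMessage:
    """Stand-in for a Pub/Sub message that records which methods were called."""

    def __init__(self):
        self.calls = []

    def ack(self):
        self.calls.append("ack")

    def nack(self):
        self.calls.append("nack")

    def modify_ack_deadline(self, seconds):
        self.calls.append(("extend", seconds))


def handle_batch_message(message, wait_until_idle, handle,
                         max_deferrals=3, deadline_s=15, wait_s=10):
    """Handle `message` once no priority work is in flight.

    After `max_deferrals` deadline extensions the message is nacked so it
    can be redelivered instead of being held by this worker indefinitely.
    """
    deferrals = 0
    while not wait_until_idle(0):  # non-blocking check for priority work
        if deferrals == max_deferrals:
            message.nack()  # give the message back for redelivery
            return "released"
        message.modify_ack_deadline(deadline_s)
        deferrals += 1
        wait_until_idle(wait_s)  # block until idle or timeout
    handle(message)
    message.ack()
    return "handled"


def always_busy(timeout):
    return False


def always_idle(timeout):
    return True


m1 = FakeMessage()
released = handle_batch_message(m1, always_busy, lambda msg: None, max_deferrals=2)
print(released, m1.calls)  # released [('extend', 15), ('extend', 15), 'nack']

m2 = FakeMessage()
handled = handle_batch_message(m2, always_idle, lambda msg: None)
print(handled, m2.calls)  # handled ['ack']
```

This bounds how long any single message is held, but on its own it does not guarantee that batch messages make progress while priority traffic never stops.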