【发布时间】:2017-10-23 19:12:36
【问题描述】:
我在 Python 客户端 API 中找不到 returnImmediately 标志。 有什么具体原因吗? 是否有另一种方法可以从 Python 中的订阅同步拉取排队消息?
【问题讨论】:
标签: python google-cloud-platform google-cloud-pubsub
我在 Python 客户端 API 中找不到 returnImmediately 标志。 有什么具体原因吗? 是否有另一种方法可以从 Python 中的订阅同步拉取排队消息?
【问题讨论】:
标签: python google-cloud-platform google-cloud-pubsub
Google 不提供类似的功能。但是您可以通过实现自己的队列轻松解决它
from Queue import Queue
from google.cloud import pubsub
subscriber = pubsub.SubscriberClient()
topic = "projects/newproject-xxxxx/topics/tarunlalwani"
subscription_name = 'projects/newproject-xxxxx/subscriptions/sub1'
class SynchronousSubscription(object):
def callback(self, message):
print(message.data)
message.ack()
self.pending_messages.put(message)
def __init__(self, subscription):
self.subscription_future = subscriber.subscribe(subscription_name, self.callback)
self.pending_messages = Queue()
def consume_sync(self):
return self.pending_messages.get()
sub = SynchronousSubscription(subscription_name)
data = sub.consume_sync()
当我测试时它确实对我很有用
【讨论】:
扩展上一个答案:
目前存在具有所需功能的函数,这是来自subscriber_client.py 的文档:
def pull(self,
subscription,
max_messages,
return_immediately=None,
options=None):
...
Args:
...
return_immediately (bool): If this field set to true, the system
will respond immediately even if
it there are no messages available to return in the ``Pull`` response.
Otherwise, the system may wait (for a bounded amount of time) until at
least one message is available, rather than returning no messages. The
client may cancel the request if it does not wish to wait any longer for
the response.
但是执行,首先阅读this comment,返回两个异常(我提出的一个是两个的聚合):
RetryError(未分类的重试方法发生异常 作为瞬态,由终止的 RPC 的 <_rendezvous>)
如果您想了解更多详情,请联系related issue。
【讨论】:
Cloud Pub/Sub 客户端库不直接公开 pull 方法,而是提供了一个为高效接收消息而设计的异步 API。如果您有特定的原因要调用同步拉取方法(包括使用 returnImmediately 属性),那么您需要生成基于 gRPC 的库。您将需要获取service definition,然后获取generate the client。或者,您可以使用 REST API version of pull 发出 HTTP 请求。
【讨论】:
Google Cloud 之前的官方gcloud python 库(最新版本是 0.18.3,在 pip 中可用)确实对惯用 python 中的拉取函数有稳定的支持。尽管 Cloud Pub/Sub API 是 GA,因此这个已弃用的库应该是稳定的,但请注意,该库不会获得任何更新。在过去的两年中,我广泛使用它,没有发生任何事故。
from gcloud import pubsub
# Connect to pubsub
client = pubsub.Client(project='myproject')
topic = client.topic('mytopic')
sub = topic.subscription('mysub')
if not topic.exists():
topic.create()
if not sub.exists():
sub.create()
# In your code, use a try-except for this pull and handle failures appropriately
recv = sub.pull(return_immediately=False, max_messages=1)
ack_id, msg = recv[0]
msg_attribute = msg.attributes['myattribute']
msg_data = msg.data
sub.acknowledge([ack_id, ])
【讨论】: