正如 Guillermo Cacheda 所说,您所需要的只是 PUB/SUB 快速入门。
但是,首先您需要一个部署在 GCP(谷歌云平台)上的项目。
假设您有一个项目并且您知道 project_ID。
您需要使用 pip 安装 google-cloud-pubsub。
/确保您使用的是 python 开发设置指南中所述的 virtualenv/
pip install --upgrade google-cloud-pubsub
/创建一个可以发布或订阅的主题。/
gcloud pubsub topics create my-topic
/发布消息/
from google.cloud import pubsub_v1
/TODO project_id = "您的 Google Cloud 项目 ID"
TODO topic_name = "您的 Pub/Sub 主题名称"/
publisher = pubsub_v1.PublisherClient()
/topic_path 方法创建一个完全限定的标识符
形式为projects/{project_id}/topics/{topic_name}/
topic_path = publisher.topic_path(project_id, topic_name)
for n in range(1, 10):
data = u"Message number {}".format(n)
# Data must be a bytestring
data = data.encode("utf-8")
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data=data)
print(future.result())
print("Published messages.")
/接收消息/
from google.cloud import pubsub_v1
/TODO project_id = "您的 Google Cloud 项目 ID" , TODO subscription_name = "您的 Pub/Sub 订阅名称", TODO timeout = 5.0 # "订阅者应在几秒内收听消息的时间"/
subscriber = pubsub_v1.SubscriberClient()
/subscription_path 方法创建一个完全限定标识符
形式为projects/{project_id}/subscriptions/{subscription_name}/
subscription_path = subscriber.subscription_path(
project_id, subscription_name
)
def callback(message):
print("Received message: {}".format(message))
message.ack()
streaming_pull_future = subscriber.subscribe(
subscription_path, callback=callback
)
print("Listening for messages on {}..\n".format(subscription_path))
/result() 如果未设置timeout,将来会无限期阻塞,
除非先遇到异常。/
尝试:
streaming_pull_future.result(超时=超时)
除了:
streaming_pull_future.cancel()
/最后为了避免对您的 GCP 帐户产生资源费用,请使用以下命令删除主题和订阅/
gcloud pubsub subscriptions delete my-sub
gcloud pubsub topics delete my-topic
/请注意,以上代码来自 GCP 文档 Quickstart-client-libraries for python。/