【问题标题】:How do I "create"/"assign" a logging handler for Google Cloud Pubsub?如何为 Google Cloud Pubsub“创建”/“分配”日志处理程序?
【发布时间】:2019-06-14 10:21:53
【问题描述】:

previous thread 的开发发现,提出问题时的假设是题外话(子流程实际上并没有导致问题),所以我正在做一个更有针对性的帖子。

我的错误信息:

找不到记录器的处理程序 “google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager”

我的意图:

将 Google PubSub 消息属性作为 Python 变量传递,以便在以后的代码中重复使用。

我的代码:

import time
import logging

from google.cloud import pubsub_v1

project_id = "redacted"
subscription_name = "redacted"

def receive_messages_with_custom_attributes(project_id, subscription_name):
    """Receives messages from a pull subscription."""
    # [START pubsub_subscriber_sync_pull_custom_attributes]

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message.data))
        if message.attributes:
            #print('Attributes:')
            for key in message.attributes:
                value = message.attributes.get(key);
                #commented out to not print to terminal
                #which should not be necessary
                #print('{}: {}'.format(key, value))
        message.ack()

        print("this is before variables")
        dirpath = "~/subfolder1/"
        print(dirpath)
        namepath = message.data["name"]
        print(namepath)
        fullpath = dirpath + namepath
        print(fullpath)
        print("this is after variables")


    subscriber.subscribe(subscription_path, callback=callback)
    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)
    # [END pubsub_subscriber_sync_pull_custom_attributes]

receive_messages_with_custom_attributes(project_id, subscription_name)

我运行上述代码后的完整控制台输出:

Listening for messages on projects/[redacted]
Received message: {
  "kind": "storage#object",
  "id": "[redacted]/0.testing/1548033442364022",
  "selfLink": "https://www.googleapis.com/storage/v1/b/[redacted]/o/BSD%2F0.testing",
  "name": "BSD/0.testing",
  "bucket": "[redacted]",
  "generation": "1548033442364022",
  "metageneration": "1",
  "contentType": "application/octet-stream",
  "timeCreated": "2019-01-21T01:17:22.363Z",
  "updated": "2019-01-21T01:17:22.363Z",
  "storageClass": "MULTI_REGIONAL",
  "timeStorageClassUpdated": "2019-01-21T01:17:22.363Z",
  "size": "0",
  "md5Hash": "1B2M2Y8AsgTpgAmY7PhCfg==",
  "mediaLink": "https://www.googleapis.com/download/storage/v1/b/[redacted]/o/BSD%2F0.testing?generation=1548033442364022&alt=media",
  "crc32c": "AAAAAA==",
  "etag": "CPb0uvvZ/d8CEAE="
}

this is before variables
/home/[redacted]
No handlers could be found for logger "google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager"

如您所见,第一个字符串和 string-defined-as-variable 已打印,但代码在尝试从刚刚生成的字典中定义变量时中断,并且没有进一步执行 print()s。

Potentially related thread,该用户使用 cron 作业发布,并从 crontab envpaths 中找到了修复程序,但我的情况是接收但未使用任何 cron 作业,但可能暗示 python 后面/内部的另一层?

谁能帮我添加一个处理程序以使此代码按预期运行?

【问题讨论】:

  • 那么你在运行这个时是否设置了GOOGLE_APPLICATION_CREDENTIALS
  • 它在谷歌云引擎上运行,所以我只去了gcloud auth login。够了吗,还是我需要下载和export .json 密钥?

标签: python logging google-cloud-platform handler google-cloud-pubsub


【解决方案1】:

首先,如果我对您在输出中显示的内容理解正确,那么您在对 Cloud Storage 对象进行更改时使用 Pub/Sub 通知发送消息。这些信息可能会有所帮助。

现在,message.data["name"] 将不起作用,因为 message.data 是 BYTES object。因此,不能作为字典索引。

要将其视为 dict,您首先必须将其解码为 base64 (import base64)。之后,剩下的是一个看起来像 JSON 格式的字符串。然后您使用json.load() (不要忘记import json 将此字符串转换为字典。现在您可以索引消息了。

代码如下:

print("This is before variables")
dirpath = "/subfolder1/"
print(dirpath)

#Transform the bytes object into a string by decoding it
namepath = base64.b64decode(message.data).decode('utf-8')

#Transform the json formated string into a dict
namepath = json.loads(namepath)

print(namepath["name"])
fullpath = dirpath + namepath["name"]
print(fullpath)
print("this is after variables")

现在,如果您的意图是仅读取属性,则它们在顶部正确定义,例如:

    if message.attributes:
        print('Attributes:')
        for key in message.attributes:
            value = message.attributes.get(key)
            print('{}: {}'.format(key, value))

所以,你可以使用:

    print("this is before variables")
    dirpath = "~/subfolder1/"
    print(dirpath)
    namepath = message.attributes["objectId"]
    print(namepath)
    fullpath = dirpath + namepath
    print(fullpath)
    print("this is after variables")

请记住,对于这种特殊情况,"objectId" 是文件的名称,因为它是来自 Cloud Storage 的 Pub/Sub 通知使用的属性。如果您假装发送自定义消息,请将 "objectId" 更改为您想要的属性名称。

【讨论】:

  • 使用ascii 编解码器的前提是bytes 对象不包含非ASCII 值。在一般情况下,您必须知道(或正确猜测)编码,尽管盲目地假设像 cp-1252 这样的东西是一种快速而肮脏的解决方法。 (不过,完全盲目地这样做会产生太多的错误和问题,即不理解其中的含义。谷歌“mojibake”。)
  • 是的!你说的对。字节对象以 base64 [cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage] 编码。我更新了我的答案。
  • 感谢您的帮助!你肯定在这里做一些事情,但它还没有完全工作。我在每一行之间添加了namepath = base64.b64decode(message.data).decode('utf-8')namepath = json.loads(namepath),并在print("string") 处添加了base64.b64decode,代码仍然在base64.b64decode 处中断,给出了关于处理程序的相同错误。我对缩进的使用正确吗?还是它们太“未缩进”并且丢失了 message.data 变量?
  • 我尝试过更深的缩进,使其与iffor 部分保持一致。在这两种情况下,代码仍然在 base64.b64decode 处中断,但输出控制台变得疯狂,因为它没有 message.ack() 并继续循环。
  • 我通过为 pubsub appengine 网站调整代码示例取得了一些进展(它在 html 上显示主题中的所有消息)。就我而言,message.ack() 之后的第一步是执行json.loads,即payload = json.loads(message.data.decode('utf-8'))。这个prints 是控制台中字符串中的整个消息。handler 错误仍然存​​在,现在我必须弄清楚如何将其修剪为objectId...
【解决方案2】:

正如 Nahuel 和 Tripleee 所解释的,问题在于消息是 BYTES 而不是字符串。然而,他们的代码并不完全有效,仍然抛出错误,我不知道为什么。通过与 Google 的 pubsub appengine 网站示例代码进行交叉引用,并经过几个小时的反复试验,我发现以下代码可以正常工作。 可能不优雅和/或有不良做法,在这种情况下,请对其进行编辑并使其更加健壮。

#Continues from after message.ack(), above code remains unchanged
#except needing to <import json>

    #this makes a message.data a true python dict with strings.
    payload = json.loads(message.data.decode('utf-8')) 

    #this finds the value of the dict with key "name"
    namepath = payload["name"]

    #this is just a static string to pre-pend to the file path
    dirpath = "/home/[redacted]/"

    #combine them into a single functioning path
    fullpath = dirpath + namepath

    #currently type 'unicode', so convert them to type 'str'
    fullpath = fullpath.encode("utf-8")

最后,我们将拥有一个纯类型“str”的完整路径,供以后的函数/命令使用。

【讨论】:

    猜你喜欢
    • 2019-02-10
    • 1970-01-01
    • 2018-06-22
    • 2016-07-26
    • 1970-01-01
    • 1970-01-01
    • 2023-03-25
    • 2016-08-09
    • 2018-02-09
    相关资源
    最近更新 更多