【问题标题】:How to log incoming messages in apache beam pipeline如何在 Apache Beam 管道中记录传入消息
【发布时间】:2019-07-06 08:41:53
【问题描述】:

我正在编写一个简单的 apache 光束流管道,从 pubsub 主题获取输入并将其存储到 bigquery 中。几个小时以来,我以为我什至无法阅读消息,因为我只是试图将输入记录到控制台:

events = p | 'Read PubSub' >> ReadFromPubSub(subscription=SUBSCRIPTION)
logging.info(events)

当我将其写入文本时,它工作正常!但是,我对logger 的呼叫从未发生过。

人们如何开发/调试这些流式传输管道?

我尝试添加以下行: events | 'Log' >> logging.info(events)

使用print() 也不会在控制台中产生任何结果。

【问题讨论】:

    标签: google-cloud-dataflow apache-beam google-cloud-pubsub apache-beam-io


    【解决方案1】:

    这是因为eventsPCollection,所以您需要对其应用PTransform

    最简单的方法是将ParDo 应用于events

    events | 'Log results' >> beam.ParDo(LogResults())
    

    定义为:

    class LogResults(beam.DoFn):
      """Just log the results"""
      def process(self, element):
        logging.info("Pub/Sub event: %s", element)
        yield element
    

    请注意,如果您想在下游应用进一步的步骤,例如在记录元素后写入接收器,我也会生成元素。例如,请参阅问题 here

    【讨论】:

    • 虽然日志记录有效,但奇怪的是这会将类型从 str 转换为 int - TypeError: the JSON object must be str, bytes or bytearray, not int
    • 你能用更多的上下文来编辑问题,比如日志输出和完整代码吗? yield 而不是 return 也会发生同样的情况吗?
    • 我不会编辑问题,因为这是解决方案的问题。 yield 工作正常 - 知道为什么吗?我会说你应该编辑解决方案。
    • 当然,如果解决了问题,则无需为问题添加更多详细信息。我已经相应地编辑了答案。使用yield 会返回一个生成器
    • 这似乎发生在任何 DoFn 上,如果我使用返回数据类型更改...你知道原因吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-19
    • 1970-01-01
    • 2018-11-05
    • 1970-01-01
    • 2020-06-10
    相关资源
    最近更新 更多