【问题标题】:Apache beam global counterApache Beam 全局计数器
【发布时间】:2018-08-03 14:46:49
【问题描述】:

我有一个包含百万条记录的文件,其中一些记录是不良记录(在 ParDo 中处理记录时会知道这些记录)。我想将坏记录连同它们出现在文件中的行号一起写入单独的 PCollection,并将好的记录写入单独的 PCollection。

有没有办法维护一个全局计数器,记录到目前为止跨工作进程读取的行数,以便我可以用它来写出行号?

【问题讨论】:

  • 您可以使用metrics 来跟踪处理的元素数量,但由于工作是并行化的,因此不一定与输入文件中的行匹配。是否可以预处理数据以包含行号?例如,SOURCEROWNUMBER 的 Dataprep。

标签: python google-cloud-dataflow apache-beam


【解决方案1】:

您可以使用 Apache Beam 指标来保存一个全局监控计数器,您可以从您的机器或跑步者的 UI 中查询该计数器。

如果您想保留所有不良记录的精确集合,以及有关它们的信息(例如行号),那么您需要添加一个可以让您执行此操作的转换。像这样的:

original_records = p | LoadRecords()

class SplitRecords(beam.DoFn):
  BAD_RECORD_TAG = 'BadRecord'

  def process(self, record):
    if self.is_bad(record):
      # Output the record onto the 'special' BadRecord input.
      yield beam.pvalue.TaggedOutput(self.BAD_RECORD_TAG, record)
    else:
      yield record   # Output the record onto the main input

record_collections = (original_records | 
                      beam.ParDo(SplitRecords()).with_outputs(
                          SplitRecords.BAD_RECORD_TAG,
                          main='GoodRecords'))

bad_records = record_collections[SplitRecords.BAD_RECORD_TAG]

good_records = record_collections['GoodRecords']

对于更详细的示例,我建议您查看 Apache Beam 食谱目录,其中包含 example with a multiple-output ParDo

【讨论】:

  • 嗨@Pablo,我正在使用侧面输出来生成好的和坏的记录pCollections。但是,使用 Metrics 时,我可以在处理元素时递增计数器。但是,有没有办法在 Pardo 中查询 Metrics 以获取当前计数?我相信只有在工作完成后才能查询指标。
  • 您无法从 ParDo 中查询指标(尽管您可以在管道运行时从程序中查询它们,例如 p.metrics().query(...))。要拥有这样一个可访问的全局计数器,您需要在 ParDo 中访问的 redis 实例或 memcached 之类的东西。我相信一旦 State API 得到支持,你也能做到这一点
猜你喜欢
  • 1970-01-01
  • 2018-08-28
  • 2018-11-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-10-20
  • 2014-11-14
相关资源
最近更新 更多