【问题标题】:How to write a splittable DoFn in python - convert json to ndjson in apache beam如何在 python 中编写可拆分的 DoFn - 在 apache Beam 中将 json 转换为 ndjson
【发布时间】:2020-02-01 12:18:58
【问题描述】:

我在 GCS 中有一个 json 格式的大型数据集,我需要将其加载到 BigQuery 中。 问题是 json 数据不是存储在 NdJson 中,而是存储在几个大的 json 文件中,其中 JSON 中的每个键实际上应该是 json 本身的一个字段。

例如 - 下面的 Json:

{
  "johnny": {
    "type": "student"
  }, 
  "jeff": {
    "type": "teacher"
  }
}

应该转换成

[ 
  {
    "name": "johnny",
    "type": "student"
  }, 
  {
    "name": "jeff",
    "type": "teacher"
  }
]

我正在尝试通过 Google Data Flow 和 Apache Beam 来解决它,但性能很糟糕,因为 ech “Worker” 必须做很多工作:

class JsonToNdJsonDoFn(beam.DoFn):
    def __init__(self, pk_field_name):
        self.__pk_field_name = pk_field_name

    def process(self, line):
        for key, record in json.loads(line).items():
            record[self.__pk_field_name] = key
            yield record

我知道这可以通过将其实现为 SplittableDoFn 以某种方式解决 - 但 Python 中的实现示例并不十分清楚。我应该如何将此 DoFn 构建为可拆分的,以及如何将其用作管道的一部分?

【问题讨论】:

    标签: json google-cloud-dataflow apache-beam ndjson


    【解决方案1】:

    您需要一种方法来指定要处理的 json 文件的部分范围。例如,它可以是一个字节范围。

    Avro example in the blog post 不错。比如:

    class MyJsonReader(DoFn):
      def process(filename, tracker=DoFn.RestrictionTrackerParam)
        with fileio.ChannelFactory.open(filename) as file:
          start, stop = tracker.current_restriction()
          # Seek to the first block starting at or after the start offset.
          file.seek(start)
          next_record_start = find_next_record(file, start)
          while start:
            # Claim the position of the current record
            if not tracker.try_claim(next_record_start):
              # Out of range of the current restriction - we're done.
              return
            # start will point to the end of the record that was read
            record, start = read_record(file, next_record_start)
            yield record
    
      def get_initial_restriction(self, filename):
        return (0, fileio.ChannelFactory.size_in_bytes(filename))
    

    但是,json 没有明确的记录边界,因此如果您的工作必须从 548 字节开始,则没有明确的方法可以告诉您要转移多少。如果文件确实是您所拥有的,那么您可以跳过字节,直到看到模式"<string>": {。然后读取从{开始的json对象。

    【讨论】:

    • 您的示例仅将 AvroReader 更改为 MyJsonReader,这并没有真正的帮助
    猜你喜欢
    • 2020-11-23
    • 2020-06-14
    • 1970-01-01
    • 2021-03-07
    • 2022-12-31
    • 1970-01-01
    • 2019-10-05
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多