【问题标题】:Apache Beam TextIO.Read with line numberApache Beam TextIO.Read 行号
【发布时间】:2018-06-06 19:08:47
【问题描述】:

是否可以通过从 TextIO.Read 读入 PCollection 的行来访问行号?对于这里的上下文,我正在处理一个 CSV 文件,并且需要访问给定行的行号。

如果无法通过 TextIO.Read 实现,似乎应该可以使用某种自定义读取或转换,但我无法确定从哪里开始。

【问题讨论】:

  • 您可能需要从 textIO 继承并添加自定义逻辑以输出元组 。或者作为替代方案,将此 LINE_NUMBER 作为额外列添加到 CSV 文件的每一行(我的意思是修改您的输入文件内容);然后你可以使用原始的 textIO 和后续的 DoFn 来提取行号。
  • 关于如何从 textIO 继承的任何指针?没有真正看到在那里我可以获取行号。
  • 所以我猜你不能通过添加一个额外的列来修改你的输入数据?那么,我想您可能必须从 filebasedsource.FileBasedSource 开始(python:github.com/apache/beam/blob/master/sdks/python/apache_beam/io/…)。实现您自己的 read_records() 方法,该方法返回一个迭代器,该迭代器提供 2 元组(行、行号)。您可以为您引入的新 source 类设置 splittable = False,这样逻辑会更容易(至少开始时)。

标签: google-cloud-dataflow apache-beam


【解决方案1】:

您可以使用FileIO 手动读取文件,您可以在读取ReadableFile 时确定行号。

一个简单的解决方案如下:

p
    .apply(FileIO.match().filepattern("/file.csv"))
    .apply(FileIO.readMatches())
    .apply(FlatMapElements
            .into(strings())
            .via((FileIO.ReadableFile f) -> {
                List<String> result = new ArrayList<>();
                try (BufferedReader br = new BufferedReader(Channels.newReader(f.open(), "UTF-8"))) {
                    int lineNr = 1;
                    String line = br.readLine();
                    while (line != null) {
                        result.add(lineNr + "," + line);
                        line = br.readLine();
                        lineNr++;
                    }
                } catch (IOException e) {
                    throw new RuntimeException("Error while reading", e);
                }
                return result;
            }));

上面的解决方案只是将行号添加到每个输入行。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-05-16
    • 1970-01-01
    • 1970-01-01
    • 2018-12-07
    • 2021-02-25
    相关资源
    最近更新 更多