【问题标题】:Processing multiline events from a text file in Dataflow在 Dataflow 中处理来自文本文件的多行事件
【发布时间】:2015-04-21 11:39:49
【问题描述】:

我正在尝试构建一个数据流管道来处理包含跨多行事件的文本文件。数据流 SDK TextIO 类假定每一行都是一个新事件。

我的计划是创建一个新的 TextReader 并将其注册到 DataPipelineRunner。这个新读者将知道如何将多行聚合成一行。

我很确定这种方法会奏效,但我想知道这是否是正确的方法,或者是否有更简单的解决方案?

我要解析的文本是:

==============> len:45 pktype:4 mtype:2
SYMBOL: USOCSTIA151632.00
OPEN_INT: 212
PR_OPEN_INTEREST: 212
TIME_STAMP: 04/10/2015 06:30:17:420  val:1428661817

结果应该是最后 4 行连接在一起并删除第一行。

最好的问候, 彼得

【问题讨论】:

  • 还有几个问题:1) 你有很多小文件,还是你的文件很大? (即您是否想在一个文件中并行处理) 2)说可以通过查找“============== 来检测文件中记录的开头是否正确>" ?
  • 有很多大(200G+)文件需要处理。以“===>”开头的行确实表示一条新记录,但我需要从输出中删除该行。
  • 谢谢!我相应地更新了我的答案。

标签: google-cloud-dataflow


【解决方案1】:

请注意,TextReader 是一个内部实现细节类,因此非常不鼓励对它进行子类化,并且很难正确地进行操作。

定义像您这样的基于文件的新格式的推荐方法是使用user-defined source API子类 FileBasedSource

在您的情况下,我建议您将类基于文档中的 LineIO 示例,并将其中定义的 LineReader 包装到您自己的类中,该类将使用 LineReader 作为读取各个行的助手,但是:

  • startReading() 中,它会一直跳到以“====>”开头的行
  • readNextRecord() 中,它将读取行直到下一个“====>”,并将它们捆绑到一个记录中。

请务必仔细阅读 FileBasedSource 和 FileBasedReader 的文档:并行化机制依赖于其中描述的一致性属性,您的格式必须满足这些属性,以确保记录不会在相邻处理分片之间的边界上重复或省略. XmlSource tests 是如何对这些属性进行单元测试的一个很好的例子。

请告诉我们它的进展情况并报告任何问题或疑问 - 我们非常希望收到有关此 API 的反馈。

【讨论】:

  • 是的,我很快发现扩展 TextReader 非常困难。我最初尝试对 Source 进行子类化,但遇到了一些问题。我会回头再看一遍。
  • “LineIO 示例”在哪里?
猜你喜欢
  • 2022-08-17
  • 2012-04-05
  • 2021-02-07
  • 2011-01-30
  • 2015-11-05
  • 1970-01-01
  • 1970-01-01
  • 2018-11-15
  • 2010-12-27
相关资源
最近更新 更多