【发布时间】:2017-05-29 03:17:43
【问题描述】:
我正在使用 Apache Beam 2.x 和 Python 创建一个谷歌数据流管道
基本上我有一个文本文件,每个新行都包含一个英文句子。
我正在尝试为每个新行/句子调用 Google NLP (Sentiments) API。
所以我有一个调用 NLP API 的函数:
class CalculateSentiments(beam.DoFn):
def process(self, element):
language_client = language.Client()
pre_text = re.sub('<[^>]*>', '', element)
text = re.sub(r'[^\w]', ' ', pre_text)
document = language_client.document_from_text(text)
sentiment = document.analyze_sentiment().sentiment
return sentiment.score
我正在使用 ParDo 为每个句子调用此函数。我假设,以下 ParDo 将自动为文本文件中的每一行调用 NLP 情感 api(基本上我不必遍历文本文件中的每一行!?)
output = lines | beam.ParDo(CalculateSentiments())
output | WriteToText(known_args.output)
但是我在执行数据流后得到这个错误:
TypeCheckError:FlatMap 和 ParDo 必须返回一个可迭代对象。而是返回。 [运行时 'ParDo(CalculateSentiments)'] Traceback(最近一次通话最后一次):
文件 “/Users/gsattanthan/.local/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py”, 第 297 行,在 调用 evaluator.process_element(值)文件“/Users/gsattanthan/.local/lib/python2.7/site-packages/apache_beam/runners/direct/transform_evaluator.py”, 第 366 行,在 process_element 中 self.runner.process(元素)文件“/Users/gsattanthan/.local/lib/python2.7/site-packages/apache_beam/runners/common.py”, 第 267 行,处理中 self.reraise_augmented(exn) 文件“/Users/gsattanthan/.local/lib/python2.7/site-packages/apache_beam/runners/common.py”, 第 263 行,处理中 self._dofn_simple_invoker(元素)文件“/Users/gsattanthan/.local/lib/python2.7/site-packages/apache_beam/runners/common.py”, 第 198 行,在 _dofn_simple_invoker self._process_outputs(element, self.dofn_process(element.value)) 文件 "/Users/gsattanthan/.local/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", 第 60 行,处理中 返回self.wrapper(self.dofn.process,args,kwargs)文件“/Users/gsattanthan/.local/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py”, 第 84 行,在包装器中 返回self._check_type(结果)文件“/Users/gsattanthan/.local/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py”, 第 98 行,在 _check_type 中 % 类型(输出))
我做错了什么?我使用 Pardo 的方式与 Apache beam doco 中所示的非常相似!
有什么想法吗?
【问题讨论】:
标签: python python-2.7 google-cloud-platform google-cloud-dataflow apache-beam