【问题标题】:Using PANDAS with Apache Beam将 PANDAS 与 Apache Beam 一起使用
【发布时间】:2020-01-17 15:01:10
【问题描述】:

我是 Apache Beam 的新手,刚开始使用 Python SDK 进行开发。 关于 Apache Beam,我知道高级别的 Pipelines、Pcollections、Ptransforms、ParDo 和 DoFn。

在我当前的项目管道中,已经使用 PANDAS 实现了使用下面提到的语法来读取、转换和写入文件

我想了解这是否是 Apache Beam 的正确实现,因为我们仅使用 PANDAS 直接读取和写入文件,而不是逐个元素地处理文件。

步骤:

  1. 创建管道
  2. 创建输入文件路径的 pcollection
  3. 调用DoFn并传递文件路径
  4. 使用 PANDAS 在 DoFn 中执行所有操作(读取、转换和写入)。

示例高级代码:

import **required libraries

class ActionClass(beam.DoFn):

    def process(self, file_path):
        #reading file using PANDAS into dataframe 
        df = pandas.read_csv('file_path')
        # do some transformation using pandas
        #write dataframe to output file from inside DoFn only.
        return

def run():

    p = beam.Pipeline(options=options)

    input = p | beam.io.ReadFromText('input_file_path') --reading only file path

    output = input | 'PTransform' | beam.ParDo(ActionClass)

【问题讨论】:

    标签: python pandas apache-beam


    【解决方案1】:

    在我看来,如果您想要使用 pandas 处理大量小型 CSV 文件,那么这可能是 Apache Beam 的有效用例。

    谢谢

    【讨论】:

      【解决方案2】:

      我的看法是你没有使用光束的力量。

      因为您的解决方案没有采用光束真正有用的并行过程。

      我建议您使用 ReadFromText 阅读 CSV 并使用 Map 或 ParDo 对数据进行转换 在这种情况下,Beam 将读取 CSV,并可以通过您执行转换的不同工作人员分发数据。

      现在根据您的尝试,您可以直接在 Beam 上使用数据框 https://beam.apache.org/documentation/dsls/dataframes/overview/

        from apache_beam.dataframe.io import read_csv
      
      with beam.Pipeline() as p:
        df = p | read_csv("gs://apache-beam-samples/nyc_taxi/misc/sample.csv")
        agg = df[['passenger_count', 'DOLocationID']].groupby('DOLocationID').sum()
        agg.to_csv('output')
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2022-12-24
        • 1970-01-01
        • 2018-07-26
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2013-12-11
        相关资源
        最近更新 更多