【问题标题】:Creating a Dict from CSV dataflow python从 CSV 数据流 python 创建一个字典
【发布时间】:2018-12-05 18:39:37
【问题描述】:

我正在尝试从 python 中的 csv 数据创建一个 dict,我不想使用传统的 split(','),然后使用将行重命名为我想要的标题,因为我将收到不同的 csv具有不同信息量的文件,并且我将无法使用该方法始终如一地定位我想要的行。

标题名称将保持一致,只是与另一个文件相比,一个文件中的标题可能更多

相反,我一直在尝试从 CSV 文件中制定一个列表,然后将第一行压缩到其余行中以创建字典,然后我可以提取我想要的确切内容。

我可以使用 csv.reader 或:

创建一个列表列表
class Split(beam.DoFn):
    def process(self, element):
        rows = element.splitlines()
        data = []
        for row in rows:
            data.append([row])
        return data

这会返回:

[u'FIRST_NAME,last_name,birthdate,voter_id,phone_number']
[u'hector,ABAD,6/15/1970,11*******,7*********']
[u'm,ABAL,6/16/1949,12********,']
[u'jorge,ABDALA,6/15/1962,21********,3********']
[u'karen,ABELLA,6/18/1988,33********,']

虽然当我尝试通过以下方式访问第一行时:

rows = element.splitlines()
data = []
for row in rows:
    # f = pattern.findall(row)
    data.append([row])
return data[0]

返回:

FIRST_NAME,last_name,birthdate,voter_id,phone_number
hector,ABAD,6/15/1970,11*******,7*********
m,ABAL,6/16/1949,109055849,
jorge,ABDALA,6/15/1962,21********,3********
karen,ABELLA,6/18/1988,33********,

我也尝试过 beam_utils csv 阅读器,尽管这表明在我修复 fileio 错误后没有名为“sources”的模块。

如果有人知道更好的方法或者可以指出我做错了什么,那就太好了,这也是我的管道:

with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'Read' >> ReadFromText(known_args.input)
     | 'Split Values' >> beam.ParDo(Split())
     | 'WriteToText' >> beam.io.WriteToText(known_args.output)) 

我现在只从我的 google-cloud 存储桶中读取,但将来它将来自 pubsub。

我希望内容看起来像:

{"FIRST_NAME": "hector", "last_name": "ABAD", "birthdate": "6/15/1970", "voter_id": 11*******, "phone_number": 7*********}
etc.
etc.
etc.

【问题讨论】:

    标签: python list csv dictionary google-cloud-dataflow


    【解决方案1】:

    python beam SDK 似乎不太支持处理 csv 文件的标头元素(除了丢弃它)。幸运的是,有人创建了这个 repo 来处理这个用例:https://github.com/pabloem/beam_utils

    它包含一个扩展 FileBasedSource(Beam 用于创建自定义文件源的抽象类)的 CSVFileSource 类,以从具有可变标题的文件创建您的 dict。

    安装:

    pip install beam_utils
    from beam_utils.sources import CsvFileSource
    

    可以这样使用:

     p | 'ReadCsvFile' >> beam.io.Read(CsvFileSource(known_args.input))
    

    应该产生您正在寻找的输出。

    编辑:要使数据流工作人员可以使用包,请创建一个 tar 并使用 --extra_package 标志提供给作业,如 https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/#local-or-nonpypi

    【讨论】:

    • 感谢 Ryan,我现在正在运行它,虽然我之前尝试过使用它,但第一个错误是找不到 fileio,所以我将 fireio 切换到文件系统,尽管它会产生错误ImportError: No module named beam_utils.sources,因此在我使用 requirements.txt 文件初始化应用程序并返回第一个错误之后。
    • 试试这个:"pip install git+github.com/pabloem/beam_utils --upgrade" master 分支包含针对该问题的修复程序,它不在 pip 包中。现在为我工作。
    • 我仍然收到No module named beam_utils.sources,尽管我在 phonenumbers 模块上遇到了同样的问题,并且通过 requirements.txt 文件解决了这个问题,所以我将 github 模块作为我的目标requirements.txt 文件,希望它能正常工作。另外,在进行 pip 安装时,我使用了pip install git+https://github.com/pabloem/beam_utils --upgrade,但这应该没什么区别吧?正如你所说的那样,它给了我parse error at "'+github.'"
    • 看看beam.apache.org/documentation/sdks/python-pipeline-dependencies/… 虽然 beam-utils 在 PyPI 中,但它是旧版本。因此需要按照“本地或非 PyPI 依赖项”下的步骤为数据流工作人员提供正确的版本。
    【解决方案2】:

    查看python库模块csv.DictReader:https://docs.python.org/2/library/csv.html#csv.DictReader

    从文档中复制示例以供快速参考

    >>> import csv
    >>> with open('names.csv') as csvfile:
    ...     reader = csv.DictReader(csvfile)
    ...     for row in reader:
    ...         print(row['first_name'], row['last_name'])
    

    【讨论】:

    • 谢谢@Priyeshj,我已经检查过了,问题是我正在从函数中的一个元素中读取,因此我尝试了:with element as csvfile: reader = csv.DictReader(csvfile) etc 但它只是返回了AttributeError: __exit__ [while running 'Split Values']跨度>
    猜你喜欢
    • 2016-11-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-04-07
    • 2021-11-11
    • 2017-05-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多