【发布时间】:2021-04-06 07:43:53
【问题描述】:
我有一个写在 pyspark 上的脚本。我尝试做的是使用 pyspark 从 AWS 的 S3 存储桶中读取 *.csv 文件。
我创建一个包含所有数据的 DataFrame,选择我需要的所有列并将它们转换为我的 Redshift 表期望的类型:
mapping = [('id', StringType), ('session', StringType), ('ip', StringType)]
df = spark.read.\
format("csv").\
option("header", True).\
load(f"...")
rows_to_map = [field[0] for field in columns_mapping]
# We need to select only specific columns
mapped_df = df.select(*rows_to_map)
# Now need to cast types
for mapping in columns_mapping:
mapped_df = mapped_df.withColumn(mapping[0], mapped_df[mapping[0]].cast(mapping[1]()))
mapped_df.printSchema()
mapped_df.write.format("com.databricks.spark.redshift").\
option("url", "...").\
option("dbtable", "...").\
option("tempdir", "...").\
option("user", "...").\
option("password", "...").\
option("aws_iam_role", "...").\
mode("append").\
save()
我在向 redshift 插入数据时收到错误消息:请检查“stl_load_errors”系统表了解详细信息。
我看到它尝试随机(几乎)从 csv 读取列。
我的数据框的SCHEMA:
|-- id: string (nullable = true)
|-- session: string (nullable = true)
|-- ip: string (nullable = true)
...
从第一行可以看出,它就像 id -> session -> ip ... 但是我的 Redshift 表显示了具有相同字段但顺序不同的架构。前 3 行:
|-- id: string (nullable = true)
|-- created_at: long (nullable = true)
|-- session: string (nullable = true)
因此,在第二列中,他哭着说我正在尝试将 STRING 写入 LONG 列。他从文件会话中读取而不是 created_at。
问题:我的 DataFrame(tmp_file) 中的列顺序是否重要? 有什么解决办法吗?处理每个文件会花费太多时间。
感谢您的帮助。
【问题讨论】:
标签: amazon-web-services apache-spark pyspark amazon-redshift etl