【Posted】:2020-07-05 13:22:14
【Question】:
I'm using the following very simple code, which reads a csv or parquet file from an S3 bucket and copies it to another S3 bucket.
import os

import boto3
import pandas as pd
import pyarrow.parquet as pq
import s3fs

s3_client = boto3.client("s3")


def read_s3_file_as_raw(bucket_name, path_to_file):
    '''
    read single file on s3 and return its body decoded as utf-8 text
    '''
    obj = s3_client.get_object(Bucket=bucket_name, Key=path_to_file)
    # errors="ignore" silently drops any bytes that are not valid utf-8
    response_body = obj['Body'].read().decode(encoding="utf-8", errors="ignore")
    file_name = os.path.basename(path_to_file)
    print(file_name + ' read from S3 successfully.')
    return response_body


def read_s3_file_as_dataframe(bucket_name, path_to_file):
    '''
    read single csv or parquet file on s3 as dataframe
    '''
    file_name = os.path.basename(path_to_file)
    if path_to_file.endswith('.csv'):
        obj = s3_client.get_object(Bucket=bucket_name, Key=path_to_file)
        df = pd.read_csv(obj['Body'])
    elif path_to_file.endswith('.parquet'):
        fs = s3fs.S3FileSystem()
        p_dataset = pq.ParquetDataset(f"s3://{bucket_name}/{path_to_file}", filesystem=fs)
        # to_pandas() is the call that fails on the transient cluster
        df = p_dataset.read().to_pandas()
    else:
        raise ValueError(f"unsupported file type: {path_to_file}")
    print(file_name + ' read from S3 successfully.')
    return df
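If the goal is only to copy a file unchanged, a server-side S3 copy avoids downloading it and converting it to pandas at all, which would also sidestep the failing `to_pandas()` call. (Copying via `read_s3_file_as_raw` is not an option for parquet: decoding binary data as UTF-8 with `errors="ignore"` drops bytes.) A minimal sketch, assuming `s3_client` is a boto3 S3 client; `copy_s3_object` and its parameters are hypothetical names, not part of the question's code.

```python
def copy_s3_object(s3_client, src_bucket, key, dst_bucket, dst_key=None):
    """Server-side copy of one S3 object; the bytes never pass through pandas.

    `s3_client` is assumed to be a boto3 S3 client, e.g. boto3.client("s3").
    Returns the key the object was written to in the destination bucket.
    """
    dst_key = dst_key or key  # default: keep the same key in the target bucket
    s3_client.copy_object(
        Bucket=dst_bucket,
        Key=dst_key,
        CopySource={"Bucket": src_bucket, "Key": key},
    )
    return dst_key
```

`copy_object` handles objects up to 5 GB in a single request; for larger files, boto3's managed `s3_client.copy(CopySource, Bucket, Key)` performs a multipart copy instead.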
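When I run the code on a persistent cluster, on EC2, or even on my local machine, it works fine (for both csv and parquet). But when I try to run it via a transient EMR cluster, I get the following error (only for parquet; csv files work without any problem):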
  File "pyarrow/array.pxi", line 559, in pyarrow.lib._PandasConvertible.to_pandas
  File "pyarrow/table.pxi", line 1367, in pyarrow.lib.Table._to_pandas
  File "/usr/local/lib64/python3.6/site-packages/pyarrow/pandas_compat.py", line 769, in table_to_blockmanager
    return BlockManager(blocks, axes)
  File "/usr/local/lib64/python3.6/site-packages/pandas/core/internals/managers.py", line 141, in __init__
    self._consolidate_check()
  File "/usr/local/lib64/python3.6/site-packages/pandas/core/internals/managers.py", line 656, in _consolidate_check
    ftypes = [blk.ftype for blk in self.blocks]
  File "/usr/local/lib64/python3.6/site-packages/pandas/core/internals/managers.py", line 656, in <listcomp>
    ftypes = [blk.ftype for blk in self.blocks]
  File "/usr/local/lib64/python3.6/site-packages/pandas/core/internals/blocks.py", line 349, in ftype
    return f"{dtype}:{self._ftype}"
  File "/usr/local/lib64/python3.6/site-packages/numpy/core/_dtype.py", line 54, in __str__
    return dtype.name
  File "/usr/local/lib64/python3.6/site-packages/numpy/core/_dtype.py", line 347, in _name_get
    if _name_includes_bit_suffix(dtype):
  File "/usr/local/lib64/python3.6/site-packages/numpy/core/_dtype.py", line 326, in _name_includes_bit_suffix
    elif np.issubdtype(dtype, np.flexible) and _isunsized(dtype):
  File "/usr/local/lib64/python3.6/site-packages/numpy/core/numerictypes.py", line 726, in issubdtype
    arg1 = dtype(arg1).type
TypeError: data type not understood
Command exiting with ret '1'
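The traceback ends inside numpy's dtype handling, called from pandas while pyarrow builds the DataFrame. A plausible cause (not confirmed in the question) is that the transient cluster's bootstrap installs a combination of numpy, pandas and pyarrow under `/usr/local/lib64/python3.6` that does not work together. A small diagnostic like the sketch below, run both as an EMR step and on a machine where the parquet read succeeds, makes the versions easy to compare; `report_versions` is a hypothetical helper name.

```python
import importlib
import sys


def report_versions(packages=("numpy", "pandas", "pyarrow", "s3fs")):
    """Return {package: version string}, or a 'not importable' note on failure."""
    versions = {}
    for name in packages:
        try:
            module = importlib.import_module(name)
        except Exception as exc:  # a broken install may fail with more than ImportError
            versions[name] = f"not importable: {exc!r}"
            continue
        versions[name] = getattr(module, "__version__", "unknown")
    return versions


if __name__ == "__main__":
    # Show which interpreter the step actually runs, then the library versions.
    print("python:", sys.executable, sys.version.split()[0])
    for name, version in report_versions().items():
        print(f"{name}: {version}")
```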
I'm using the following command to run it:
aws emr create-cluster --applications Name=Hadoop Name=Spark \
--bootstrap-actions '[{"Path":"s3://propic-nonprod-datalake-force-transient/bootstrap3.sh","Name":"cluster_setup"}]' \
--service-role EMR_DefaultRole \
--release-label emr-5.20.0 \
--log-uri 's3n://propic-nonprod-datalake-logs/logs/emrtransientcluster/development/' \
--name 'emrtransientcluster-dataload-development' \
--instance-type m1.large --instance-count 1 \
--auto-terminate \
--steps Type=CUSTOM_JAR,Name=CustomJAR,ActionOnFailure=CONTINUE,Jar=s3://ap-southeast-2.elasticmapreduce/libs/script-runner/script-runner.jar,Args=["s3://propic-nonprod-datalake-force-transient/s3_file_transfer5.py"]
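For reference, the same cluster definition can be expressed as keyword arguments for boto3's EMR `run_job_flow`, which can be easier to version alongside the script. This is a sketch under assumptions: `build_run_job_flow_kwargs` is a hypothetical helper, and `JobFlowRole="EMR_EC2_DefaultRole"` is the standard default EC2 instance profile name, which the CLI command above does not state explicitly.

```python
def build_run_job_flow_kwargs(
    script_uri="s3://propic-nonprod-datalake-force-transient/s3_file_transfer5.py",
    bootstrap_uri="s3://propic-nonprod-datalake-force-transient/bootstrap3.sh",
    log_uri="s3n://propic-nonprod-datalake-logs/logs/emrtransientcluster/development/",
):
    """Keyword arguments for boto3's EMR run_job_flow, mirroring the CLI command."""
    return {
        "Name": "emrtransientcluster-dataload-development",
        "ReleaseLabel": "emr-5.20.0",
        "LogUri": log_uri,
        "Applications": [{"Name": "Hadoop"}, {"Name": "Spark"}],
        "ServiceRole": "EMR_DefaultRole",
        # Assumption: default EC2 instance profile; adjust to your account.
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "Instances": {
            "MasterInstanceType": "m1.large",
            "SlaveInstanceType": "m1.large",
            "InstanceCount": 1,
            # Equivalent of --auto-terminate: shut down once the steps finish.
            "KeepJobFlowAliveWhenNoSteps": False,
        },
        "BootstrapActions": [
            {"Name": "cluster_setup", "ScriptBootstrapAction": {"Path": bootstrap_uri}}
        ],
        "Steps": [
            {
                "Name": "CustomJAR",
                "ActionOnFailure": "CONTINUE",
                "HadoopJarStep": {
                    "Jar": "s3://ap-southeast-2.elasticmapreduce/libs/script-runner/script-runner.jar",
                    "Args": [script_uri],
                },
            }
        ],
    }
```

It would be passed as `boto3.client("emr", region_name="ap-southeast-2").run_job_flow(**build_run_job_flow_kwargs())`; the region is inferred from the script-runner JAR path and is an assumption.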
【Discussion】:
Tags: python amazon-emr parquet