【Posted】: 2021-11-12 01:59:34
【Question】:
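I have a simple AWS Glue PySpark job that connects to a MongoDB source through a Glue Data Catalog table, extracts data from a MongoDB collection, and writes JSON output to S3 using a Glue DynamicFrame. The MongoDB database here has a deeply nested NoSQL structure with arrays. Because it is a NoSQL database, the source schema is not fixed, and nested columns can vary from document to document. However, the job fails with the error below.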
Error: py4j.protocol.Py4JJavaError: An error occurred while calling o75.pyWriteDynamicFrame.: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 6, 10.3.29.22, executor 1): com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a IntegerType (value: BsonString{value=''})
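The job fails because of a data type mismatch. I have tried every solution I could find, such as resolveChoice(). Since the error concerns an attribute with the 'int' data type, I tried casting all attributes of type 'int' to 'string'.
I also tried DropNullFields, writing with a Spark DataFrame, ApplyMapping, reading without the catalog table (from_options directly against the Mongo collection), and running with and without repartition.
All of these attempts are left commented out in the code for reference.
Code snippet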
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
print("Started")
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "<catalog_db_name>", table_name = "<catalog_table_name>", additional_options = {"database": "<mongo_database_name>", "collection": "<mongo_db_collection>"}, transformation_ctx = "datasource0")
# Code to read data directly from mongo database
# datasource0 = glueContext.create_dynamic_frame_from_options(connection_type = "mongodb", connection_options = { "uri": "<connection_string>", "database": "<mongo_db_name>", "collection": "<mongo_collection>", "username": "<db_username>", "password": "<db_password>"})
# Code sample for resolveChoice (cast all 'int' fields to 'string')
# resolve_dyf = datasource0.resolveChoice(specs = [("nested.property", "cast:string"),("nested.further[].property", "cast:string")])
# Code sample to dropnullfields
# dyf_dropNullfields = DropNullFields.apply(frame = resolve_dyf, transformation_ctx = "dyf_dropNullfields")
data_sink0 = datasource0.repartition(1)
print("Repartition done")
# Code sample to sink using spark's write method
# data_sink0.toDF().write.format("json").option("header","true").save("s3://<s3_folder_path>")
datasink1 = glueContext.write_dynamic_frame.from_options(frame = data_sink0, connection_type = "s3", connection_options = {"path": "s3://<S3_folder_path>"}, format = "json", transformation_ctx = "datasink1")
print("Data Sink complete")
job.commit()
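The commented-out resolveChoice call above lists its specs by hand, using dotted paths and the `[]` suffix for arrays. As a rough sketch only, the helper below derives the same kind of spec list from one sample document represented as a Python dict. The name `int_field_specs` and the sample document are hypothetical and not part of the job. It only finds fields that hold an int in the sample it is given, so a field that is an int in some documents and an empty string in others is caught only if the sample shows it as an int.

```python
from typing import Any, List, Tuple


def int_field_specs(doc: Any, prefix: str = "") -> List[Tuple[str, str]]:
    """Collect ("path", "cast:string") specs for every int field in a sample document."""
    specs: List[Tuple[str, str]] = []
    if isinstance(doc, dict):
        for key, value in doc.items():
            path = f"{prefix}.{key}" if prefix else key
            specs.extend(int_field_specs(value, path))
    elif isinstance(doc, list):
        # Arrays use the "name[]" notation seen in the resolveChoice specs above.
        for item in doc:
            specs.extend(int_field_specs(item, f"{prefix}[]"))
    elif isinstance(doc, int) and not isinstance(doc, bool):
        # bool is a subclass of int in Python, so it is excluded explicitly.
        specs.append((prefix, "cast:string"))
    # Drop duplicates (e.g. the same path seen in several array items), keeping order.
    return list(dict.fromkeys(specs))


# Hypothetical sample document, shaped like the paths in the question.
sample = {
    "nested": {
        "property": 1,
        "name": "x",
        "further": [{"property": 2}, {"property": 3, "other": ""}],
    }
}
specs = int_field_specs(sample)
```

The resulting list has the shape expected by the `specs` argument in the commented-out call, for example `datasource0.resolveChoice(specs=specs)`. Whether that cast runs early enough to avoid the connector's own conversion error is not something this sketch can confirm.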
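Note
I am not sure why this happens, because the problem is intermittent. Sometimes the job works fine, and sometimes it fails, which is confusing.
Any help would be appreciated.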
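On the intermittency: one possible explanation, stated here as an assumption rather than a confirmed diagnosis, is that the schema is inferred from a random sample of documents. If the sample happens to contain only integers for a field, the field is typed as an integer and a later empty string (as in `BsonString{value=''}`) cannot be cast. If the sample happens to include the empty string, the field falls back to a string type and the read succeeds. The toy simulation below uses no Spark or MongoDB code. The functions `infer_type` and `run_job` are hypothetical stand-ins for that behavior, and the fallback-to-string rule is a simplification.

```python
import random


def infer_type(sampled_values):
    """Toy inference: 'int' only if every sampled value is an int, else 'string'."""
    return "int" if all(isinstance(v, int) for v in sampled_values) else "string"


def run_job(values, sample_size, rng):
    """Simulate one job run: infer the type from a random sample, then read everything."""
    inferred = infer_type(rng.sample(values, sample_size))
    # With an 'int' schema, any string value in the full collection breaks the read.
    if inferred == "int" and any(isinstance(v, str) for v in values):
        return "failed"
    return "ok"


# 99 documents hold an integer, one holds an empty string.
values = [1] * 99 + [""]

# Repeat the "job" with different random samples of 50 documents each.
outcomes = {run_job(values, 50, random.Random(seed)) for seed in range(20)}
```

In this model the same data produces both successful and failed runs, depending only on which documents the sample picked, which matches the "sometimes works, sometimes fails" pattern described in the note.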
【Discussion】:
- Please edit the question to limit it to a specific problem with enough detail to identify an adequate answer.
Tags: mongodb pyspark etl aws-glue aws-glue-data-catalog