【问题标题】:AWS GLUE ERROR : An error occurred while calling o75.pyWriteDynamicFrame. Cannot cast STRING into a IntegerType (value: BsonString{value=''})AWS GLUE 错误:调用 o75.pyWriteDynamicFrame 时发生错误。无法将 STRING 转换为 IntegerType(值:BsonString{value=''})
【发布时间】:2021-11-12 01:59:34
【问题描述】:

我有一个简单的胶水 pyspark 作业,它通过胶水目录表连接到 Mongodb 源,并从 Mongodb 集合中提取数据,并使用胶水动态框架将 json 输出写入 s3。 这里的 Mongo 数据库是深度嵌套的 no sql 结构和数组。由于它是一个 no-sql 数据库,因此源模式不固定。嵌套列可能因文档而异。 但是,作业失败并出现以下错误。

错误py4j.protocol.Py4JJavaError: An error occurred while calling o75.pyWriteDynamicFrame.: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 6, 10.3.29.22, executor 1): com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a IntegerType (value: BsonString{value=''})

由于数据类型不匹配,作业失败,我尝试了所有可能的解决方案,例如使用resolveChoice()。由于错误是针对具有“int”数据类型的属性,因此我尝试将所有具有“int”类型的属性转换为“string”。 我还尝试了dropnullfieldswriting with spark dataframeapplymappingwithout using catalog table (from_options directly from mongo table)with and without repartition 的代码。 所有这些尝试都在代码中注释以供参考。

代码片段

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print("Started")
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "<catalog_db_name>", table_name = "<catalog_table_name>", additional_options = {"database": "<mongo_database_name>", "collection": "<mongo_db_collection>"}, transformation_ctx = "datasource0")

# Code to read data directly from mongo database
# datasource0 = glueContext.create_dynamic_frame_from_options(connection_type = "mongodb", connection_options = { "uri": "<connection_string>", "database": "<mongo_db_name>", "collection": "<mongo_collection>", "username": "<db_username>", "password": "<db_password>"})

# Code sample for resolveChoive (converted all the 'int' datatype to 'string'
# resolve_dyf = datasource0.resolveChoice(specs = [("nested.property", "cast:string"),("nested.further[].property", "cast:string")])

# Code sample to dropnullfields
# dyf_dropNullfields = DropNullFields.apply(frame = resolve_dyf, transformation_ctx = "dyf_dropNullfields")

data_sink0 = datasource0.repartition(1)
print("Repartition done")

# Code sample to sink using spark's write method
# data_sink0.write.format("json").option("header","true").save("s3://<s3_folder_path>")

datasink1 = glueContext.write_dynamic_frame.from_options(frame = data_sink0, connection_type = "s3", connection_options = {"path": "s3://<S3_folder_path>"}, format = "json", transformation_ctx = "datasink1")
print("Data Sink complete")
job.commit()

注意

我不确定为什么会这样,因为这个问题是间歇性的。有时它工作得很好,但有时它会失败。所以很混乱。

我们将不胜感激。

【问题讨论】:

  • 请编辑问题以将其限制为具有足够详细信息的特定问题,以确定适当的答案。

标签: mongodb pyspark etl aws-glue aws-glue-data-catalog


【解决方案1】:

我也遇到了同样的问题。简单的解决方案是将样本大小从 1000(MongoDB 的默认值)增加到 100000。添加示例配置供您参考。

`read_config = {
    "uri": documentdb_write_uri,
    "database": "your_db",
    "collection": "your_collection",
    "username": "user",
    "password": "password",
    "partitioner": "MongoSamplePartitioner",
    "sampleSize": "100000",
    "partitionerOptions.partitionSizeMB": "1000",
    "partitionerOptions.partitionKey": "_id"
}`

【讨论】:

    猜你喜欢
    • 2020-11-05
    • 2018-10-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-11-20
    • 1970-01-01
    • 2018-09-18
    • 1970-01-01
    相关资源
    最近更新 更多