【发布时间】:2021-06-05 01:28:32
【问题描述】:
我正在 AWS Glue Studio 中构建一个 ETL 流程,我在其中获取存储桶 s3 中的数据以删除一些字段。在这个过程之后,我需要使用自定义转换来覆盖一些数据,然后将其保存在新的 s3 存储桶中。
当我使用自定义转换时,会显示一个块代码。所以我需要在那里创建我的代码。
我的问题是关于它,我如何使用它来构建我的转换,更具体地说是关于输入和输出数据,因为我不了解 AWS 上的这个过程。
我现在所做的是创建将处理这些数据的函数。我使用 dfc['my_field'] 获取我的字段并应用了转换,但我不明白我需要返回什么以及如何将其保存在 s3 存储桶中。下面你可以看到我的功能。
def MyTransform (glueContext, dfc) -> DynamicFrameCollection:
import random
import hashlib
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, lit
df = dfc.select(list(dfc.keys())[0]).toDF()
print('printing key0')
df.show(3)
print('printed key0')
print('imprimindo o tipo de df')
print(type(df))
def encrypt_phone(df):
'''
Criptografa dados de telefone
'''
if df.withColumn('phone_main', df('phone_main')) is not None:
hl = hashlib.sha256()
hl.update(df.withColumn('phone_main', df('phone_main'))).encode().hexdigest()
hl.update(df.withColumn('phone_alternative', df('phone_alternative'))).encode().hexdigest()
return df
# Criar um UDF para encriptar os dados
from pyspark.sql.functions import udf
encrypted_udf = udf(encrypt_phone)
print(encrypted_udf)
# Renomear coluna phone_main e phone_alternative
df.withColumnRenamed('phone_main', 'old_phone_main') \
.withColumnRenamed('phone_alternative', 'old_phone_alternative')
# Criar novas colunas para dados transformados
new_df = encrypt_phone(df)
encrypted = df.withColumn('phone_main', new_df('phone_main'))
encrypted = df.withColumn('phone_alternative',new_df('phone_alternative'))
print(encrypted)
# Remover campos sensíveis
encripted_df = df.drop('cpf_cnpj', 'name', 'nickname', 'old_phone_main', 'old_phone_alternative')
print(df_encrypted)
# Mascarar email
print(encripted_df.withColumn('email', df('email')))
if encripted_df.withColumn('email', df('email')) is not None:
masked_df = encripted_df.withColumn('email', lit('****@*****'))
# Converter spark dataframe para Glue Dynamic Frame e retornar como um dataframe collection
df_overshadow = DynamicFrame.fromDF(masked_df, glueContext, "df_overshadow")
return (DynamicFrameCollection({"CustomTransform0": df_overshadow}, glueContext))
我在胶水工作室的过程被构造成这样的图像:
但是,当我尝试保存时显示错误:
Parent node overshadow outputs a collection, but node Process-Client_PFS does not accept a collection.
我需要应用转换并保存数据。我该怎么做?
【问题讨论】:
标签: amazon-web-services aws-glue