【问题标题】:Overshadow data in aws glue studio覆盖 aws 胶水工作室中的数据
【发布时间】:2021-06-05 01:28:32
【问题描述】:

我正在 AWS Glue Studio 中构建一个 ETL 流程,我在其中获取存储桶 s3 中的数据以删除一些字段。在这个过程之后,我需要使用自定义转换来覆盖一些数据,然后将其保存在新的 s3 存储桶中。

当我使用自定义转换时,会显示一个块代码。所以我需要在那里创建我的代码。

我的问题是关于它,我如何使用它来构建我的转换,更具体地说是关于输入和输出数据,因为我不了解 AWS 上的这个过程。

我现在所做的是创建将处理这些数据的函数。我使用 dfc['my_field'] 获取我的字段并应用了转换,但我不明白我需要返回什么以及如何将其保存在 s3 存储桶中。下面你可以看到我的功能。

def MyTransform (glueContext, dfc) -> DynamicFrameCollection:
import random
import hashlib
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, lit

df = dfc.select(list(dfc.keys())[0]).toDF()
print('printing key0')
df.show(3)
print('printed key0')
print('imprimindo o tipo de df')
print(type(df))

def encrypt_phone(df):
    '''
     Criptografa dados de telefone
    '''
    if df.withColumn('phone_main', df('phone_main')) is not None:
        hl = hashlib.sha256()
        hl.update(df.withColumn('phone_main', df('phone_main'))).encode().hexdigest()
        hl.update(df.withColumn('phone_alternative', df('phone_alternative'))).encode().hexdigest()
    return df

# Criar um UDF para encriptar os dados
from pyspark.sql.functions import udf
encrypted_udf = udf(encrypt_phone)
print(encrypted_udf)

# Renomear coluna phone_main e phone_alternative
df.withColumnRenamed('phone_main', 'old_phone_main') \
                    .withColumnRenamed('phone_alternative', 'old_phone_alternative')

# Criar novas colunas para dados transformados
new_df = encrypt_phone(df)
encrypted = df.withColumn('phone_main', new_df('phone_main'))
encrypted = df.withColumn('phone_alternative',new_df('phone_alternative'))
print(encrypted)

# Remover campos sensíveis
encripted_df = df.drop('cpf_cnpj', 'name', 'nickname', 'old_phone_main', 'old_phone_alternative')
print(df_encrypted)

# Mascarar email
print(encripted_df.withColumn('email', df('email')))
if encripted_df.withColumn('email', df('email')) is not None:
    masked_df = encripted_df.withColumn('email', lit('****@*****'))
    
# Converter spark dataframe para Glue Dynamic Frame e retornar como um dataframe collection
df_overshadow = DynamicFrame.fromDF(masked_df, glueContext, "df_overshadow")

return (DynamicFrameCollection({"CustomTransform0": df_overshadow}, glueContext))

我在胶水工作室的过程被构造成这样的图像:

但是,当我尝试保存时显示错误:

Parent node overshadow outputs a collection, but node Process-Client_PFS does not accept a collection.

我需要应用转换并保存数据。我该怎么做?

【问题讨论】:

    标签: amazon-web-services aws-glue


    【解决方案1】:

    dfcDynamicFrameCollection。您需要从集合中选择您的 DynamicFrame 才能使用。 From the documentation:

    您必须使用 SelectFromCollection 转换从自定义转换节点的结果中选择单个 DynamicFrame,然后才能将输出发送到目标位置。

    这应该会返回您第一个 DynamicFrame:

    dfc.select(list(dfc.keys())[0])
    

    【讨论】:

    • 我更改了我的代码,但错误发生了。我所做的改变是我在上面的问题中提出的。如果您能看到并帮助我,我将不胜感激!
    • 您仍然返回 DynamicFrameCollection 而不是 DynamicFrame。
    • 我在我的代码中做了一些更正。现在显示的错误是:Error TypeError: 'DataFrame' object is not callable
    • 你能在代码中添加更正吗?
    • 我做到了。这里的问题有新代码。
    【解决方案2】:

    我看到您将列表类型传递给可能导致问题的 toDF()。如果您想从集合中获取特定的动态框架,则可以使用名称访问它。先打印 dfc.keys,然后选择要转换的帧。

    例如df= dfc.select('<frame_name>').toDF(),然后在您的代码中进一步使用它。

    【讨论】:

      猜你喜欢
      • 2010-09-17
      • 2020-08-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多