【Question title】: Using Faker with PySpark DataFrame to Anonymise Data
【Posted】: 2020-11-25 20:34:41
【Question description】:

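I am trying to change a few columns in a Spark DataFrame. The columns include, for example:

  • First name
  • Last name
  • Email

I want to anonymise these and generate meaningful values, so I am using Faker.
But if I use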

df.withColumn('FirstName', lit(fake.first_name()))

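it adds the same name to every row.

Every first name ends up with the same value. Ideally, I would like a different Faker value for each row rather than a constant. How would I achieve this?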
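
The constant arises because `fake.first_name()` is an ordinary Python call that runs once on the driver, before Spark is involved; `lit` then wraps the single resulting string. Below is a minimal plain-Python sketch of the same effect, with no Spark or Faker involved. The `NAMES` list and the `pick_name` helper are hypothetical stand-ins for Faker.

```python
import random

# Hypothetical stand-in for Faker: picks a first name from a fixed list.
NAMES = ["Alice", "Bruno", "Chen", "Dana", "Elif", "Farid"]
rng = random.Random(42)


def pick_name():
    return rng.choice(NAMES)


rows = [{"id": i} for i in range(5)]

# Analogue of lit(fake.first_name()): the call happens once, up front,
# and the single result is copied into every row.
constant = pick_name()
same_for_all = [{**row, "FirstName": constant} for row in rows]

# Analogue of a per-row function: the call happens once for each row,
# so each row gets its own independently drawn value.
per_row = [{**row, "FirstName": pick_name()} for row in rows]

print({row["FirstName"] for row in same_for_all})
print([row["FirstName"] for row in per_row])
```

The first collection can only ever contain one distinct name, whatever the seed; the second draws a fresh value per row, which is the behaviour wanted here.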

Update 1:

I looked at Steven's suggestion, and this is my updated code:

import logging

from faker import Faker
from pyspark.sql import functions as F


MSG_FORMAT = '%(asctime)s %(levelname)s %(name)s: %(message)s'
DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
logging.basicConfig(format=MSG_FORMAT, datefmt=DATETIME_FORMAT)
logger = logging.getLogger("[SFDC-GLUE-LOG]")
fake = Faker()
source_df = (
    spark.read.format("jdbc")
    .option("url", connection_url)
    .option("query", query)
    .option("driver", driver_name)
    .option("user", user_name)
    .option("password", password)
    .option("StmtCallLimit", 0)
    .load()
)

fake_firstname = F.udf(fake.first_name)

masked_df = source_df.withColumn("FirstName", fake_firstname())

Now I get:

Traceback (most recent call last):
  File "script_2020-08-05-17-15-26.py", line 52, in <module>
    masked_df=source_df.withColumn("FirstName", fake_firstname())
  File "/mnt/yarn/usercache/root/appcache/application_1596647211940_0002/container_1596647211940_0002_01_000001/pyspark.zip/pyspark/sql/udf.py", line 189, in wrapper
    return self(*args)
  File "/mnt/yarn/usercache/root/appcache/application_1596647211940_0002/container_1596647211940_0002_01_000001/pyspark.zip/pyspark/sql/udf.py", line 167, in __call__
    judf = self._judf
  File "/mnt/yarn/usercache/root/appcache/application_1596647211940_0002/container_1596647211940_0002_01_000001/pyspark.zip/pyspark/sql/udf.py", line 151, in _judf
    self._judf_placeholder = self._create_judf()
  File "/mnt/yarn/usercache/root/appcache/application_1596647211940_0002/container_1596647211940_0002_01_000001/pyspark.zip/pyspark/sql/udf.py", line 160, in _create_judf
    wrapped_func = _wrap_function(sc, self.func, self.returnType)
  File "/mnt/yarn/usercache/root/appcache/application_1596647211940_0002/container_1596647211940_0002_01_000001/pyspark.zip/pyspark/sql/udf.py", line 35, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/mnt/yarn/usercache/root/appcache/application_1596647211940_0002/container_1596647211940_0002_01_000001/pyspark.zip/pyspark/rdd.py", line 2420, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/mnt/yarn/usercache/root/appcache/application_1596647211940_0002/container_1596647211940_0002_01_000001/pyspark.zip/pyspark/serializers.py", line 600, in dumps
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: can't pickle weakref objects
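
The last line of the traceback is the informative one. To ship a UDF to the executors, Spark pickles the function, and a bound method such as `fake.first_name` brings its whole `fake` object along. The error suggests that something reachable from that object holds a weak reference, which `pickle` refuses to serialise. A standard-library-only sketch of the same failure mode follows; the `state` dict is a hypothetical stand-in and does not reproduce Faker's actual internals.

```python
import pickle
import weakref

# A set is used as the referent because built-in sets support weak references.
owner = {"provider"}

# Hypothetical stand-in for an object whose state contains a weak reference.
state = {"locale": "en_US", "owner_ref": weakref.ref(owner)}

try:
    pickle.dumps(state)
    pickled_ok = True
    error_message = ""
except TypeError as exc:
    # pickle rejects weak references with a TypeError.
    pickled_ok = False
    error_message = str(exc)

print(pickled_ok, error_message)

# State made only of plain values serialises without trouble.
plain_state = {"locale": "en_US"}
round_tripped = pickle.loads(pickle.dumps(plain_state))
print(round_tripped)
```

The exact wording of the message varies between Python versions, but the exception type is the same as the one wrapped in the `PicklingError` above.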

【Question discussion】:

    Tags: apache-spark pyspark apache-spark-sql pyspark-dataframes faker


    【Solution 1】:

    You need to use a UDF for this:

    from pyspark.sql import functions as F
    
    fake_firstname = F.udf(fake.first_name)
    
    df.withColumn("FirstName", fake_firstname())
    
    

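    【Discussion】:

    • If I do this, I get PicklingError: Could not serialize object: TypeError: can't pickle weakref objects. Any ideas?
    • Is faker installed on all your nodes?
    • Yes, I am using AWS Glue and it is available there.
    • Do you have a lot of data to "fake"? Apparently, the way the lib works is not compatible with UDFs ...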
    【Solution 2】:

    I had the same problem. Here is the solution that worked for me.

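    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType
    from faker import Factory
    
    def fake_name():
        # Create the Faker generator inside the function, so the UDF does not
        # capture an existing Faker object that Spark would have to pickle.
        faker = Factory.create()
        return faker.name()
    
    # Zero-argument UDF returning a string; it is evaluated per row.
    fake_name_udf = udf(fake_name, StringType())
    df = df.withColumn('name', fake_name_udf())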
    

    【Discussion】:
