【问题标题】:how to use nextval() in a postgres jdbc driver for pyspark?如何在 pyspark 的 postgres jdbc 驱动程序中使用 nextval()?
【发布时间】:2018-06-29 23:41:17
【问题描述】:

我在 Postgres 中有一个名为“mytable”的表,它有两列,id (bigint) 和 value (varchar(255))。

id 从使用nextval('my_sequence') 的序列中获取其值。

PySpark 应用程序需要一个数据帧并使用 Postgres JDBC jar (postgresql-42.1.4.jar) 将数据帧插入“mytable”。我正在使用以下方法创建 id 列:

df.withColumn('id', lit("nextval('my_sequence')"))

Postgres 将该列解释为“可变字符”。

我可以看到在读取数据时可以调用 Postgres 方法 (How to remotely execute a Postgres SQL function on Postgres using PySpark JDBC connector?),但我不确定如何调用像 nextval() 这样的 Postgres 函数来将数据写入 Postgres。

这是我目前将数据从 Pyspark 写入 Postgres 的方式:

df.write.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", 'mytable') \
    .mode('append') \
    .save()

当一列需要使用nextval() 的序列号时,如何使用 PySpark 写入 Postgres 表?

【问题讨论】:

    标签: sql postgresql apache-spark jdbc pyspark


    【解决方案1】:

    TL;DR 除非您创建自己的 JdbcDialect 并覆盖插入逻辑,否则您无法在插入时执行数据库代码。我认为你不想为这么小的功能做这件事。

    我个人会使用触发器:

    CREATE FUNCTION set_id() RETURNS trigger AS $set_id$
      BEGIN
        IF NEW.id IS NULL THEN
          NEW.id = nextval('my_sequence');
        END IF;
        RETURN NEW;
      END;
    $set_id$ LANGUAGE plpgsql;
    
    CREATE TRIGGER set_id BEFORE INSERT ON mytable
        FOR EACH ROW EXECUTE PROCEDURE set_id();
    

    并将剩下的工作留给数据库服务器。

    df.select(lit(null).cast("bigint").alias("id"), col("value")).write
        ...
    

    您也可以使用monotonically_increasing_id (Primary keys with Apache Spark) 并根据数据库中最大的 id 移动值,但它可能会很脆弱。

    【讨论】:

    • 现在我需要数据库分配的每一行的特定 ID。有没有简单的获取方法?
    • @SimeonKredatus 你能详细说明一下吗?
    • 您好,感谢您的及时反应。假设我定义一个模式如下:schema = StructType( [ StructField("id", IntegerType(), True), StructField("property1", IntegerType(), True)]),然后我做一些处理并存储它到数据库如下:frame.write.option('mergeSchema', 'true').jdbc(url=url, table=table, mode='append', properties=properties)。正如您所建议的,我确实在数据库中有一个触发器,这意味着 DB 分配了 ID。现在我需要为每一行获取该 ID,最好不进行选择。有没有不运行选择的简单方法?
    • 我的用例的另一个选项是为每一行计算一个唯一的哈希并在数据库之外对其进行管理——这意味着 spark 将始终填充数据库列,但我仍然希望让 postgres 负责ID 序列。一旦我的 spark 作业持久化数据集,我将需要它来发出具有指定 ID 的 kafka 事件。
    • @SimeonKredatus 所以你也想吃蛋糕。您在 Spark 本身上生成唯一 ID(参见 MongoDB ObjectID 的示例实现)。使用 PostgreSQL 特定的解决方案,您可以使用 LISTEN / NOTIFY 监听那里的事件,并直接传播到 Kafka。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-08-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-04-06
    相关资源
    最近更新 更多