【发布时间】:2018-06-29 23:41:17
【问题描述】:
我在 Postgres 中有一个名为“mytable”的表,它有两列,id (bigint) 和 value (varchar(255))。
id 从使用nextval('my_sequence') 的序列中获取其值。
PySpark 应用程序需要一个数据帧并使用 Postgres JDBC jar (postgresql-42.1.4.jar) 将数据帧插入“mytable”。我正在使用以下方法创建 id 列:
df.withColumn('id', lit("nextval('my_sequence')"))
Postgres 将该列解释为“可变字符”。
我可以看到在读取数据时可以调用 Postgres 方法 (How to remotely execute a Postgres SQL function on Postgres using PySpark JDBC connector?),但我不确定如何调用像 nextval() 这样的 Postgres 函数来将数据写入 Postgres。
这是我目前将数据从 Pyspark 写入 Postgres 的方式:
df.write.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable", 'mytable') \
.mode('append') \
.save()
当一列需要使用nextval() 的序列号时,如何使用 PySpark 写入 Postgres 表?
【问题讨论】:
标签: sql postgresql apache-spark jdbc pyspark