【Question title】: Load SparkSQL dataframe into Postgres database with automatically defined schema
【Posted】: 2021-09-08 10:28:57
【Question】:

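I am currently trying to load a Parquet file into a Postgres database. The Parquet file already has a schema defined, and I want that schema carried over to the Postgres table.

I have not defined any schema or table in Postgres. Instead, I want the load process to infer the schema at read time, create the table automatically, and then load the SparkSQL dataframe into that table.

Here is my code: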

import findspark
findspark.init()

from pyspark.sql import SparkSession

appName = "load_parquet"
master = "local"

spark = SparkSession.builder \
        .master(master) \
        .appName(appName) \
        .getOrCreate()

Read the Parquet data in as a Spark dataframe

customers_sdf = spark.read.parquet('/home/jovyan/filesystem/customers.parquet')

Check that the schema is correct

customers_sdf.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)

Write the SparkSQL dataframe to Postgres

customers_sdf.write \
    .jdbc(
        url="jdbc:postgresql:destdb", 
        table="public.customers", 
        properties={"user": "destdb1", "password": "destdb1"}
    )

My Postgres container's hostname is postgres-dest, and its port mapping is 5434:5432. See below:

  postgres-dest:
    image: postgres:latest
    environment:
      POSTGRES_USER: destdb1
      POSTGRES_PASSWORD: destdb1
      POSTGRES_DB: destdb
    logging:
      options:
        max-size: 10m
        max-file: "3"
    ports:
      - "5434:5432"
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "destdb1"]
      interval: 5s
      retries: 5
    restart: always

  pyspark-notebook:
    build: .
    image: jupyter/pyspark-notebook:latest
    environment:
      JUPYTER_ENABLE_LAB: 'yes'
    ports:
      - "8889:8889"
      - "4040-4080:4040-4080"
    volumes:
      - ./notebooks:/home/jovyan/work/notebooks
      - ./filesystem:/home/jovyan/filesystem

As shown earlier, I tried writing the dataframe to Postgres, but I got this error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/tmp/ipykernel_101/2914557055.py in <module>
----> 1 customers_sdf.write \
      2     .jdbc(url="jdbc:postgresql://postgres-dest/destdb", table="public.customers", properties={"user": "destdb1", "password": "destdb1"})

/usr/local/spark/python/pyspark/sql/readwriter.py in jdbc(self, url, table, mode, properties)
   1443         for k in properties:
   1444             jprop.setProperty(k, properties[k])
-> 1445         self.mode(mode)._jwrite.jdbc(url, table, jprop)
   1446 
   1447 

/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    109     def deco(*a, **kw):
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
    113             converted = convert_exception(e.java_exception)

/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o231.jdbc.
: java.sql.SQLException: No suitable driver
    at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:108)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:108)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:217)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:221)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:817)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:829)

Note: I am an absolute beginner with Spark, so please explain like I'm 5.

【Comments】:

    Tags: python postgresql apache-spark pyspark py4j


    【Solution 1】:

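    Change the url to jdbc:postgresql://postgres-dest:5432/destdb and make sure the PostgreSQL driver jar is present on the classpath. You can download the jar from the PostgreSQL JDBC driver download page.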
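
A minimal sketch of the write with the corrected URL, assuming the notebook container reaches Postgres through the Compose service name postgres-dest on the container-side port 5432 (the 5434 half of the mapping only applies from the host machine). The helper names build_jdbc_url and write_customers are hypothetical and exist only for illustration. Setting "driver" to org.postgresql.Driver is an optional addition that names the driver class explicitly; it still requires the jar to be on the classpath. As far as I know, Spark's JDBC writer creates the target table from the dataframe schema when the table does not exist yet, so no table needs to be defined in Postgres beforehand.

```python
def build_jdbc_url(host, port, database):
    """Return a PostgreSQL JDBC URL: jdbc:postgresql://host:port/database."""
    return f"jdbc:postgresql://{host}:{port}/{database}"


def write_customers(df, url, table="public.customers", mode="errorifexists"):
    """Write a Spark dataframe to Postgres over JDBC (hypothetical helper).

    mode="errorifexists" matches the default used in the question:
    the write fails if the table already exists.
    """
    df.write.jdbc(
        url=url,
        table=table,
        mode=mode,
        properties={
            "user": "destdb1",
            "password": "destdb1",
            # Assumption: name the driver class explicitly instead of
            # relying on driver auto-detection.
            "driver": "org.postgresql.Driver",
        },
    )


# Host and port as seen from inside the Docker network.
url = build_jdbc_url("postgres-dest", 5432, "destdb")
print(url)

# With the session and dataframe from the question:
# write_customers(customers_sdf, url)
```

If the table may already exist from an earlier attempt, passing mode="overwrite" or mode="append" to write_customers would avoid the "table already exists" failure.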

    【Discussion】:

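    • I have already downloaded the driver jar. How do I make sure the driver jar is present on the classpath? What is a classpath?
    • Put that jar into the $SPARK_HOME/jars folder.
    • Use the URL jdbc:postgresql://postgres-dest:5432/destdb instead.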
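
To make the "put the jar into $SPARK_HOME/jars" step concrete, here is a small sketch that copies a downloaded jar into that folder and lists what is there afterwards. The helper names install_jdbc_jar and find_postgres_jars, and the jar file name in the usage comment, are hypothetical; the traceback in the question suggests SPARK_HOME is /usr/local/spark in this image, but that is an inference, not something the thread confirms. Roughly speaking, the classpath is the list of places the Java process behind Spark searches for code, and the jars folder is one of those places.

```python
import os
import shutil
from pathlib import Path


def install_jdbc_jar(jar_path, spark_home=None):
    """Copy a JDBC driver jar into <SPARK_HOME>/jars and return the new path."""
    spark_home = spark_home or os.environ.get("SPARK_HOME")
    if not spark_home:
        raise RuntimeError("SPARK_HOME is not set; pass spark_home explicitly")
    jars_dir = Path(spark_home) / "jars"
    if not jars_dir.is_dir():
        raise FileNotFoundError(f"No jars directory at {jars_dir}")
    target = jars_dir / Path(jar_path).name
    shutil.copy2(jar_path, target)
    return target


def find_postgres_jars(spark_home):
    """List PostgreSQL driver jars currently present in <SPARK_HOME>/jars."""
    return sorted((Path(spark_home) / "jars").glob("postgresql-*.jar"))


# Example usage (the file name and location are assumptions):
# install_jdbc_jar("/home/jovyan/work/postgresql-42.2.23.jar")
# print(find_postgres_jars(os.environ["SPARK_HOME"]))
```

The jar is only picked up when the JVM starts, so the notebook kernel (and with it the SparkSession) has to be restarted after copying. An alternative that avoids touching the Spark installation is to point Spark's spark.jars setting at the jar when building the session.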