【发布时间】:2021-03-09 18:07:11
【问题描述】:
我已经在 Kubernetes 中部署了 pyspark 3.0.1。
我在 Jupyter notebook 中使用 Koalas(databricks.koalas)来执行一些转换,并且需要对 Azure Database for PostgreSQL 进行读写操作。
我可以使用以下代码通过 pandas 读取该表:
from sqlalchemy import create_engine
import psycopg2
import pandas
# SQLAlchemy connection URL. The dialect name is "postgresql"; the old
# "postgres" alias is deprecated and rejected by SQLAlchemy 1.4+.
uri = 'postgresql+psycopg2://<postgreuser>:<postgrepassword>@<server>:5432/<database>'
engine_azure = create_engine(uri, echo=False)
# Read the whole table into a pandas DataFrame. The module is imported as
# `pandas`, so the call must go through that name.
df = pandas.read_sql_query("select * from public.<table>", con=engine_azure)
我想使用以下代码从 Pyspark 读取此表:
import os
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import databricks.koalas as ks
from s3fs import S3FileSystem
import datetime
# Maven coordinates of the PostgreSQL JDBC driver. Passing them through
# --packages makes spark-submit download the jar and add it to both the
# driver and the executor classpaths.
POSTGRES_JDBC_PACKAGE = "org.postgresql:postgresql:42.1.1"

# PYSPARK_SUBMIT_ARGS is read once, when the JVM gateway is launched, so it
# must be set exactly once before the session is created and must end with a
# single "pyspark-shell" token.
os.environ['PYSPARK_SUBMIT_ARGS'] = f"--packages {POSTGRES_JDBC_PACKAGE} pyspark-shell"

# Create Spark config for our Kubernetes based cluster manager
sparkConf = SparkConf()
sparkConf.setMaster("k8s://https://kubernetes.default.svc.cluster.local:443")
sparkConf.setAppName("spark")
sparkConf.set("spark.kubernetes.container.image", "<image>")
sparkConf.set("spark.kubernetes.namespace", "spark")
sparkConf.set("spark.executor.instances", "3")
sparkConf.set("spark.executor.cores", "2")
sparkConf.set("spark.driver.memory", "2000m")
sparkConf.set("spark.executor.memory", "2000m")
sparkConf.set("spark.kubernetes.pyspark.pythonVersion", "3")
sparkConf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
sparkConf.set("spark.kubernetes.authenticate.serviceAccountName", "spark")
sparkConf.set("spark.driver.port", "29414")
sparkConf.set("spark.driver.host", "<deployment>.svc.cluster.local")
# NOTE: spark.driver.extraClassPath is intentionally not set here. It expects
# filesystem classpath entries (not spark-submit arguments), and in client
# mode it has no effect when set through SparkConf because the driver JVM is
# already running. The --packages argument above supplies the JDBC driver.

# Initialize our Spark cluster, this will actually
# generate the worker nodes.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext
# Read the result of a SQL statement over JDBC.
# A SELECT statement has to be passed via the "query" option: "dbtable" is
# placed verbatim into a FROM clause, so it only accepts a table name or a
# parenthesized, aliased subquery such as "(select ...) AS t".
df3 = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://<host>:5432/<database>")
    .option("driver", "org.postgresql.Driver")
    .option("query", "select * from public.<table>")
    .option("user", "<user>")
    .option("password", "<password>")
    .load()
)
但我收到此错误:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-5-a529178ed9a0> in <module>
1 url = 'jdbc:postgresql://psql-mcf-prod1.postgres.database.azure.com:5342/cpke-prod'
2 properties = {'user': 'adminmcfpsql@psql-mcf-prod1.postgres.database.azure.com', 'password': '4vb44B^V8w2D*q!eQZgl',"driver": "org.postgresql.Driver"}
----> 3 df3 = spark.read.jdbc(url=url, table='select * from public.userinput_write_offs where reversed_date is NULL', properties=properties)
/usr/local/spark/python/pyspark/sql/readwriter.py in jdbc(self, url, table, column, lowerBound, upperBound, numPartitions, predicates, properties)
629 jpredicates = utils.toJArray(gateway, gateway.jvm.java.lang.String, predicates)
630 return self._df(self._jreader.jdbc(url, table, jpredicates, jprop))
--> 631 return self._df(self._jreader.jdbc(url, table, jprop))
632
633
/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
126 def deco(*a, **kw):
127 try:
--> 128 return f(*a, **kw)
129 except py4j.protocol.Py4JJavaError as e:
130 converted = convert_exception(e.java_exception)
/usr/local/lib/python3.7/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o89.jdbc.
: org.postgresql.util.PSQLException: The connection attempt failed.
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:275)
at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:194)
at org.postgresql.Driver.makeConnection(Driver.java:450)
at org.postgresql.Driver.connect(Driver.java:252)
at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$createConnectionFactory$1(JdbcUtils.scala:64)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:56)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:221)
at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:312)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:607)
at org.postgresql.core.PGStream.<init>(PGStream.java:68)
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:144)
... 27 more
【问题讨论】:
标签: postgresql apache-spark kubernetes jdbc pyspark