【问题标题】:Pyspark JDBC connection to PostgreSQL fails due to missing connectivity between driver and database由于驱动程序和数据库之间缺少连接,Pyspark JDBC 与 PostgreSQL 的连接失败
【发布时间】:2017-08-20 04:18:05
【问题描述】:

我们在 EMR 集群上使用 Pyspark 来访问和分析来自 PostgreSQL 数据库的数据(可通过 JDBC 连接器jdbc:postgresql://psqlhost:5432/psqldatabase 访问)。 EMR 集群和 PostgreSQL 实例之间的网络是开放的,因此当我们通过 SSH 访问 EMR 主服务器时,我们可以成功启动以下作业(在客户端和集群部署模式下):

url = 'jdbc:postgresql://psqlhost:5432/psqldatabase'
table = 'mytable'

properties = {
    'user': 'user',
    'password': 'password',
    'driver': 'org.postgresql.Driver'
}

products_df = spark.read.jdbc(url=url, table=table, properties=properties)

我们现在想从一个与 EMR 集群完全连接的单独客户端实例更改设置并在 EMR 集群上执行 Pyspark 命令。这已通过将必要的纱线配置文件复制到客户端机器并使用--master yarn 执行 Pyspark 来成功设置。

但是,我们在从客户端实例访问 PostgreSQL 数据库时仍然遇到问题。出于安全原因,客户端机器无法访问 PostgreSQL 实例,但 EMR 集群仍然可以访问该实例。上面的代码 sn-p 与--deploy-mode cluster 一起工作没有问题。但是,在--deploy-mode client 中工作时,最后一行会产生以下错误:

17/03/27 11:39:21 ERROR Driver: Connection error: 
org.postgresql.util.PSQLException: The connection attempt failed.
    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:275)
    at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
    at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:194)
    at org.postgresql.Driver.makeConnection(Driver.java:431)
    at org.postgresql.Driver.connect(Driver.java:247)
    at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply(JdbcUtils.scala:65)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply(JdbcUtils.scala:56)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:123)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:117)
    at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:237)
    at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:159)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:211)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketTimeoutException: connect timed out
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at org.postgresql.core.PGStream.<init>(PGStream.java:62)
    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:144)
    ... 22 more
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-2-f5f370f4d567> in <module>()
----> 1 products_df = spark.read.jdbc(url=url, table=table, properties=properties)

/opt/spark-2.0.0/python/pyspark/sql/readwriter.pyc in jdbc(self, url, table, column, lowerBound, upperBound, numPartitions, predicates, properties)
    420             jpredicates = utils.toJArray(gateway, gateway.jvm.java.lang.String, predicates)
    421             return self._df(self._jreader.jdbc(url, table, jpredicates, jprop))
--> 422         return self._df(self._jreader.jdbc(url, table, jprop))
    423 
    424 

/opt/spark-2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    931         answer = self.gateway_client.send_command(command)
    932         return_value = get_return_value(
--> 933             answer, self.gateway_client, self.target_id, self.name)
    934 
    935         for temp_arg in temp_args:

/opt/spark-2.0.0/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/opt/spark-2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    310                 raise Py4JJavaError(
    311                     "An error occurred while calling {0}{1}{2}.\n".
--> 312                     format(target_id, ".", name), value)
    313             else:
    314                 raise Py4JError(

Py4JJavaError: An error occurred while calling o47.jdbc.
: org.postgresql.util.PSQLException: The connection attempt failed.
    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:275)
    at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
    at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:194)
    at org.postgresql.Driver.makeConnection(Driver.java:431)
    at org.postgresql.Driver.connect(Driver.java:247)
    at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply(JdbcUtils.scala:65)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply(JdbcUtils.scala:56)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:123)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:117)
    at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:237)
    at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:159)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:211)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketTimeoutException: connect timed out
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at org.postgresql.core.PGStream.<init>(PGStream.java:62)
    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:144)
    ... 22 more

失败可能是由于应用程序驱动程序(在--deploy-mode client 中位于客户端计算机上)尝试连接到 PostgreSQL 实例,但是由于客户端无法访问 PostgreSQL 实例而失败。但是,我不明白为什么需要连接驱动程序-postgreSQL:当对数据帧执行实际操作时,实际操作将由集群上的机器执行(而不是由客户端机器上的驱动程序),结果将通过集群主机发送回驱动程序。因此,只要可以从集群访问 postgreSQL 实例,缺少的连接驱动程序-postgreSQL 就不会阻止执行数据帧操作。

问题:

  • 驱动程序必须能够连接到数据库才能执行操作是否正确?
  • 是否有任何充分的理由说明我所缺少的这种连接必须到位?
  • 如果不是这样,我们有什么办法可以绕过此连接检查?

【问题讨论】:

  • "实际操作将由集群上的机器执行" -- 验证 SQL 查询,构建列列表,决定多少任务/分区将被提供(以及在哪些执行者上)?你不觉得这些都是司机必须事先做的事情吗?
  • 如果你真的很好奇,那么在这里查看源代码 >> github.com/apache/spark/blob/master/sql/core/src/main/scala/org/… >> cmets 是明确的:“驱动代码和工作人员都必须能够访问数据库; 驱动程序需要获取模式,而工作人员需要获取数据。” >> 如果你认为你可以做得更好,那么将你的建议提交给 Apache 董事会......
  • 你是对的,源代码的那部分解释了它。 Ofc 我知道驱动程序执行一些任务(“实际操作”这句话的措辞很糟糕,我的错),但我不知道哪些需要数据库连接(例如查询验证不应该?)。执行器 afaik 的调度不是由驱动程序执行的,而是由资源管理器执行的 - 驱动程序设置分区数。不过我不明白你的语气:我是一个关于我不明白的事情的诚实问题,发布在问答网站上。我不知道你为什么在评论中发帖而不是在下面回答。
  • 嗯,这是我在现实生活中的惯用语气。忍受它:-/
  • 顺便说一句,“如果你能做得更好”这句话很严肃,我相信 Spark-JDBC 接口还有改进的空间。

标签: postgresql apache-spark jdbc pyspark


【解决方案1】:

正如Samson Scharfrichter 所指出的,驱动程序需要能够访问数据库才能获取架构。

很遗憾,我们的客户无法直接访问数据库。但是,由于 EMR 集群可以访问数据库并且客户端可以通过 SSH 访问集群,因此我们可以使用以下基于 SSH 隧道的解决方法:

from sshtunnel import SSHTunnelForwarder

url = 'jdbc:postgresql://localhost:5432,psqlhost:5432/psqldatabase'
table = 'mytable'

properties = {
    'user': 'user',
    'password': 'password',
    'driver': 'org.postgresql.Driver'
}

with SSHTunnelForwarder(
    ('emrhost', 22),
    remote_bind_address=('psqlhost', 5432),
    local_bind_address=('localhost', 5432)
    ):
    products_df = spark.read.jdbc(url=url, table=table, properties=properties)

在这一步之后,即使隧道关闭,也可以操作数据帧products_df。使用此解决方法,即使在客户端部署模式下,客户端计算机也可以查询数据库。

【讨论】:

    猜你喜欢
    • 2012-11-15
    • 2016-12-23
    • 1970-01-01
    • 1970-01-01
    • 2016-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-08-10
    • 2021-10-24
    相关资源
    最近更新 更多