【问题标题】:How to give table name in spark-jdbc application for reading data on an RDBMS database?如何在 spark-jdbc 应用程序中给出表名以读取 RDBMS 数据库上的数据?
【发布时间】:2018-12-18 14:07:05
【问题描述】:

我正在尝试使用 spark 读取 greenplum 数据库中的表,如下所示:

val execQuery = s"select ${allColumns}, 0 as ${flagCol} from schema.table where period_year=2017 and period_num=12"
val yearDF = spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2016")
                                .option("user", devUserName)
                                .option("password", devPassword)
                                .option("partitionColumn","header_id")
                                .option("lowerBound", 16550)
                                .option("upperBound", 1152921481695656862L)
                                .option("numPartitions",450).load()

当我使用 spark-submit 运行代码时,我得到一个异常:

Exception in thread "main" org.postgresql.util.PSQLException: ERROR: relation "public.(select je_header_id,source_system_name,je_line_num,last_update" does not exist
  Position: 15
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2310)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2023)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:217)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:421)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:318)
    at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:281)
    at com.zaxxer.hikari.pool.ProxyStatement.executeQuery(ProxyStatement.java:111)
    at com.zaxxer.hikari.pool.HikariProxyStatement.executeQuery(HikariProxyStatement.java)
    at io.pivotal.greenplum.spark.jdbc.Jdbc$.resolveTable(Jdbc.scala:301)
    at io.pivotal.greenplum.spark.GreenplumRelationProvider.createRelation(GreenplumRelationProvider.scala:29)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:309)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
    at com.partition.source.YearPartition$.prepareFinalDF$1(YearPartition.scala:141)
    at com.partition.source.YearPartition$.main(YearPartition.scala:164)
    at com.partition.source.YearPartition.main(YearPartition.scala)

execQuery 中,我可以看到模式名和表名正确形成。当我提交代码时,它显示public.(select je_header_id,source_system_name,) relation not found。我不明白为什么将public 作为架构名称和查询(select je_header_id,source_system_name,je_line_num,last_update" 作为表名。

谁能告诉我我在这里犯了什么错误以及如何解决它?

【问题讨论】:

标签: apache-spark greenplum


【解决方案1】:

如果您使用的是 spark jdbc ,您可以包装查询并将其传递给 dbtable 参数。如果关键就像任何 jdbc 一样工作,这应该可以工作。

val query = """
  (select a.id,b,id,a.name from a left outer join b on a.id=b.id
    limit 100) foo
"""

val df = sqlContext.format("jdbc").
  option("url", "jdbc:mysql://localhost:3306/local_content").
  option("driver", "com.mysql.jdbc.Driver").
  option("useUnicode", "true").
  option("continueBatchOnError","true").
  option("useSSL", "false").
  option("user", "root").
  option("password", "").
  option("dbtable",query).
  load()

【讨论】:

  • 如果您看到我编写的代码,我几乎按照您提到的方式给出了它。 val execQuery = s"select ${allColumns}, 0 as ${flagCol} from schema.table where period_year=2017 and period_num=12" ----------------- option(" dbtable", s"(${execQuery}) as year2016")
  • 尝试用硬编码查询替换,看看它是否有效,然后我们可以处理参数
  • 按照你的说法给它,但仍然看到相同的异常:线程“main”中的异常 org.postgresql.util.PSQLException: ERROR: syntax error at or near "select"
猜你喜欢
  • 1970-01-01
  • 2021-02-04
  • 1970-01-01
  • 2015-08-15
  • 2021-04-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多