【问题标题】:Use SparkSession.sql() with JDBC将 SparkSession.sql() 与 JDBC 一起使用
【发布时间】:2018-10-01 07:40:24
【问题描述】:

问题:

我想通过 JDBC 连接使用 spark 发出自定义请求。

这个查询的目标是优化工人的内存分配,因为我不能使用:

ss.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

目前:

我目前正在尝试运行:

ss = SparkSession
  .builder()
  .appName(appName)
  .master("local")
  .config(conf)
  .getOrCreate()

ss.sql("some custom query")

配置:

url=jdbc:mysql://127.0.0.1/database_name
driver=com.mysql.jdbc.Driver
user=user_name
password=xxxxxxxxxx

错误:

[info] Exception encountered when attempting to run a suite with class name: db.TestUserProvider *** ABORTED ***
[info]   org.apache.spark.sql.AnalysisException: Table or view not found: users; line 1 pos 14
[info]   at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
[info]   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupTableFromCatalog(Analyzer.scala:459)
[info]   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:478)
[info]   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:463)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
[info]   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)

假设:

我猜有配置错误,但我找不到哪里。

【问题讨论】:

    标签: mysql scala apache-spark jdbc


    【解决方案1】:

    Spark 可以使用 JDBC 数据源在关系数据库中读取写入数据(就像您在第一个代码示例中所做的那样)。

    此外(并且完全独立),spark 允许使用 SQL 来查询 视图,这些视图是根据已经从某个来源加载到 DataFrame 的数据创建的。例如:

    val df = Seq(1,2,3).toDF("a") // could be any DF, loaded from file/JDBC/memory...
    df.createOrReplaceTempView("my_spark_table")
    spark.sql("select a from my_spark_table").show()
    

    只能使用SparkSession.sql 查询以这种方式创建的“表”(从 Spark 2.0.0 开始称为视图)。

    如果您的数据存储在关系数据库中,Spark 必须先从那里读取数据,然后才能在加载的副本上执行任何分布式计算。底线 - 我们可以使用 read 从表中加载数据,创建一个临时视图,然后查询它:

    ss.read
      .format("jdbc")
      .option("url", "jdbc:mysql://127.0.0.1/database_name")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .load()
      .createOrReplaceTempView("my_spark_table")
    
    // and then you can query the view:
    val df = ss.sql("select * from my_spark_table where ... ")
    

    【讨论】:

    • 你的回答很清楚。当您加载() schema.tablename。表是否完全加载到工作人员内存中?
    • 不是立即 - 像大多数其他 Spark 操作一样,loadlazy - 它实际上不会读取任何数据,直到它需要;因此,例如,如果结果视图的唯一用途将包含一些过滤器(例如select .. where col1 = X),则该过滤器将被“下推”到 MySQL,以便仅加载与过滤器匹配的记录。在这里阅读更多:docs.databricks.com/spark/latest/data-sources/…
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-03-25
    • 1970-01-01
    • 1970-01-01
    • 2019-11-26
    • 2017-12-06
    • 1970-01-01
    • 2011-01-05
    相关资源
    最近更新 更多