【问题标题】:How to use fully formed SQL with spark structured streaming如何将完全形成的 SQL 与 Spark 结构化流结合使用
【发布时间】:2019-04-14 14:41:59
【问题描述】:

Spark 结构化流式传输的文档说 - 从 spark 2.3 开始,Spark 上下文中的所有方法都可用于 static DataFrame/DataSet's 也可用于 structured流媒体 DataFrame/DataSet 也是如此。但是,我还没有遇到任何相同的 示例

对我来说,使用完全形成的 SQL 比 DSL 更灵活、更有表现力和生产力。此外,对于我的用例,这些 SQL 已经针对 static 版本进行了开发和良好测试。 必须进行一些返工 - 特别是使用joins 代替correlated subqueries。但是,保留整体完整的 sql 结构仍有很大价值。

我希望使用的格式类似于这个假设的连接:

 val tabaDf = spark.readStream(..)
 val tabbDf = spark.readStream(..)

 val joinSql = """select a.*, 
                  b.productName 
                  from taba
                  join tabb 
                  on a.productId = b.productId
                  where ..
                  group by ..
                  having ..
                  order by .."""
 val joinedStreamingDf = spark.sql(joinSql)

有几项不清楚如何做:

  • tabaDftabbDf 是否应该通过 spark.readStream 定义:这是我的假设

  • 如何声明 tabatabb 。尝试使用

    tabaDf.createOrReplaceTempView("taba")
    tabbDf.createOrReplaceTempView("tabb")
    

    结果

    WARN ObjectStore: 获取数据库 global_temp 失败,返回 NoSuchObjectException

我能找到的所有示例都使用DSL 和/或selectExpr() - 如下https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

 df.selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS value")

或使用select

sightingLoc
  .groupBy("zip_code", window("start_time", "1 hour"))
  .count()
  .select( 
    to_json(struct("zip_code", "window")).alias("key"),
    col("count").cast("string").alias("value")) 

这些真的是唯一的选择吗?所以文档中说static 数据帧/数据集支持的所有方法并不准确?否则:a任何有关如何纠正上述问题并直接使用 sql 流式传输的指针将不胜感激。

【问题讨论】:

    标签: scala apache-spark spark-structured-streaming


    【解决方案1】:

    需要使用createOrReplaceTempView 将流注册为临时视图。 AFAIK createOrReplaceView 不是 Spark API 的一部分(也许你有一些东西可以通过这种方法提供对类的隐式转换)。

    spark.readStream(..).createOrReplaceTempView("taba")
    spark.readStream(..).createOrReplaceTempView("tabb")
    

    现在可以使用纯 SQL 访问视图。例如,将输出打印到控制台:

    spark
      .sql(joinSql)
      .writeStream
      .format("console")
      .start()
      .awaitTermination()
    

    编辑:问题编辑后,我没有发现您的代码有任何问题。这是一个最小的工作示例。假设一个测试文件/tmp/foo/foo.csv

    "a",1
    "b",2
    
    import org.apache.spark.sql.types._
    val schema = StructType(Array(StructField("s", StringType), StructField("i", IntegerType)))
    spark.readStream
      .schema(schema)
      .csv("/tmp/foo")
      .createOrReplaceTempView("df1")
    spark.readStream
      .schema(schema)
      .csv("/tmp/foo")
      .createOrReplaceTempView("df2")
    
    spark.sql("SELECT * FROM df1 JOIN df2 USING (s)")
      .writeStream
      .format("console")
      .start()
      .awaitTermination()
    

    输出

    -------------------------------------------
    Batch: 0
    -------------------------------------------
    +---+---+---+
    |  s|  i|  i|
    +---+---+---+
    |  b|  2|  2|
    |  a|  1|  1|
    +---+---+---+
    

    【讨论】:

    • 嗨 Ollik - 也许你错过了这个问题:注册视图已经包含在 tabaDf.createOrReplaceView("taba")tabbDf.createOrReplaceView("tabb") 中。然后 spark.sql(joinSql) 。您的回答没有添加任何新代码。你有类似这种结构的东西在实际工作吗?如果有的话,sscce 会很棒
    • 特别 - 这里没有提到如何修复WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
    • @javadba 所以在你的 sn-p 中使用 createOrReplaceView 而不是 createOrReplaceTempView。注意 Temp 部分。试过 sn-p 工作
    • 那是一个错字。我的实际代码使用/使用createOrReplaceTempView。但是你是说你有这个工作吗?我有接下来 2.5 小时的承诺。与此同时,我会投票赞成,然后在那个时候再次运行代码。我真的很想看到这项工作并授予:届时将看到。
    • 在答案中添加了一个最小示例以证明其有效
    猜你喜欢
    • 2018-01-24
    • 2017-01-09
    • 2019-03-03
    • 1970-01-01
    • 2019-09-05
    • 2018-04-19
    • 2018-09-25
    • 2018-07-30
    相关资源
    最近更新 更多