【发布时间】:2019-04-14 14:41:59
【问题描述】:
Spark 结构化流式传输的文档说 - 从 spark 2.3 开始,Spark 上下文中的所有方法都可用于 static DataFrame/DataSet's 也可用于 structured流媒体 DataFrame/DataSet 也是如此。但是,我还没有遇到任何相同的 示例。
对我来说,使用完全形成的 SQL 比 DSL 更灵活、更有表现力和生产力。此外,对于我的用例,这些 SQL 已经针对 static 版本进行了开发和良好测试。 必须进行一些返工 - 特别是使用joins 代替correlated subqueries。但是,保留整体完整的 sql 结构仍有很大价值。
我希望使用的格式类似于这个假设的连接:
val tabaDf = spark.readStream(..)
val tabbDf = spark.readStream(..)
val joinSql = """select a.*,
b.productName
from taba
join tabb
on a.productId = b.productId
where ..
group by ..
having ..
order by .."""
val joinedStreamingDf = spark.sql(joinSql)
有几项不清楚如何做:
tabaDf和tabbDf是否应该通过spark.readStream定义:这是我的假设-
如何声明
taba和tabb。尝试使用tabaDf.createOrReplaceTempView("taba") tabbDf.createOrReplaceTempView("tabb")结果
WARN ObjectStore: 获取数据库 global_temp 失败,返回 NoSuchObjectException
我能找到的所有示例都使用DSL 和/或selectExpr() - 如下https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
df.selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS value")
或使用select:
sightingLoc
.groupBy("zip_code", window("start_time", "1 hour"))
.count()
.select(
to_json(struct("zip_code", "window")).alias("key"),
col("count").cast("string").alias("value"))
这些真的是唯一的选择吗?所以文档中说static 数据帧/数据集支持的所有方法并不准确?否则:a任何有关如何纠正上述问题并直接使用 sql 流式传输的指针将不胜感激。
【问题讨论】:
标签: scala apache-spark spark-structured-streaming