【发布时间】:2018-01-24 22:02:30
【问题描述】:
我有一个相当简单的任务 - 事件即将到来,我想在同一窗口中通过键过滤那些值高于每组平均值的事件。 我认为这是代码的相关部分:
val avgfuel = events
.groupBy(window($"enqueuedTime", "30 seconds"), $"weatherCondition")
.agg(avg($"fuelEfficiencyPercentage") as "avg_fuel")
val joined = events.join(avgfuel, Seq("weatherCondition"))
.filter($"fuelEfficiencyPercentage" > $"avg_fuel")
val streamingQuery1 = joined.writeStream
.outputMode("append").
.trigger(Trigger.ProcessingTime("10 seconds")).
.option("checkpointLocation", checkpointLocation).
.format("json").option("path", containerOutputLocation).start()
events 是一个 DStream。 问题是我在输出位置得到空文件。 我正在使用 Databricks 3.5 - Spark 2.2.1 和 Scala 2.11
我做错了什么?
谢谢!
编辑:更完整的代码 -
val inputStream = spark.readStream
.format("eventhubs") // working with azure event hubs
.options(eventhubParameters)
.load()
val schema = (new StructType)
.add("id", StringType)
.add("latitude", StringType)
.add("longitude", StringType)
.add("tirePressure", FloatType)
.add("fuelEfficiencyPercentage", FloatType)
.add("weatherCondition", StringType)
val df1 = inputStream.select($"body".cast("string").as("value")
, from_unixtime($"enqueuedTime").cast(TimestampType).as("enqueuedTime")
).withWatermark("enqueuedTime", "1 minutes")
val df2 = df1.select(from_json(($"value"), schema).as("body")
, $"enqueuedTime")
val df3 = df2.select(
$"enqueuedTime"
, $"body.id".cast("integer")
, $"body.latitude".cast("float")
, $"body.longitude".cast("float")
, $"body.tirePressure"
, $"body.fuelEfficiencyPercentage"
, $"body.weatherCondition"
)
val avgfuel = df3
.groupBy(window($"enqueuedTime", "10 seconds"), $"weatherCondition" )
.agg(avg($"fuelEfficiencyPercentage") as "fuel_avg", stddev($"fuelEfficiencyPercentage") as "fuel_stddev")
.select($"weatherCondition", $"fuel_avg")
val broadcasted = sc.broadcast(avgfuel)
val joined = df3.join(broadcasted.value, Seq("weatherCondition"))
.filter($"fuelEfficiencyPercentage" > $"fuel_avg")
val streamingQuery1 = joined.writeStream.
outputMode("append").
trigger(Trigger.ProcessingTime("10 seconds")).
option("checkpointLocation", checkpointLocation).
format("json").option("path", outputLocation).start()
这执行没有错误,一段时间后开始写入结果。我可能是因为聚合结果的广播,但我不确定。
【问题讨论】:
-
它是否有效。 AFAIK 尚不支持两个流数据集之间的任何类型的连接。单独分析逻辑 - 如果窗口中只有一个,则输出预计为空。
-
如果你使用结构化流,事件不能是 DStream ;)
-
@user8371915 Stream-Stream 连接在 2.3 中
-
@T.Gawęda 但它不是 2.3。
-
@user8371915 是的,所以它可能会失败,也许它没有抛出异常,而是默默地产生零行
标签: apache-spark spark-streaming