【问题标题】:Spark Streaming aggregation and filter in the same windowSpark Streaming 聚合和过滤在同一个窗口中
【发布时间】:2018-01-24 22:02:30
【问题描述】:

我有一个相当简单的任务 - 事件即将到来,我想在同一窗口中通过键过滤那些值高于每组平均值的事件。 我认为这是代码的相关部分:

val avgfuel = events
    .groupBy(window($"enqueuedTime", "30 seconds"), $"weatherCondition")    
    .agg(avg($"fuelEfficiencyPercentage") as "avg_fuel")    

val joined = events.join(avgfuel, Seq("weatherCondition"))
    .filter($"fuelEfficiencyPercentage" > $"avg_fuel")

val streamingQuery1 = joined.writeStream
    .outputMode("append").
    .trigger(Trigger.ProcessingTime("10 seconds")).
    .option("checkpointLocation", checkpointLocation).
    .format("json").option("path", containerOutputLocation).start()

events 是一个 DStream。 问题是我在输出位置得到空文件。 我正在使用 Databricks 3.5 - Spark 2.2.1 和 Scala 2.11

我做错了什么?

谢谢!

编辑:更完整的代码 -

val inputStream = spark.readStream
  .format("eventhubs") // working with azure event hubs
  .options(eventhubParameters)
  .load()

val schema = (new StructType)    
      .add("id", StringType)
      .add("latitude", StringType)
      .add("longitude", StringType)
      .add("tirePressure", FloatType)
      .add("fuelEfficiencyPercentage", FloatType)
      .add("weatherCondition", StringType)

val df1 = inputStream.select($"body".cast("string").as("value")
                             , from_unixtime($"enqueuedTime").cast(TimestampType).as("enqueuedTime")
                             ).withWatermark("enqueuedTime", "1 minutes")

val df2 = df1.select(from_json(($"value"), schema).as("body")
                     , $"enqueuedTime")

val df3 = df2.select(
  $"enqueuedTime"
  , $"body.id".cast("integer")
  , $"body.latitude".cast("float")
  , $"body.longitude".cast("float")
  , $"body.tirePressure"
  , $"body.fuelEfficiencyPercentage"
  , $"body.weatherCondition"
)

val avgfuel = df3
  .groupBy(window($"enqueuedTime", "10 seconds"), $"weatherCondition" )    
  .agg(avg($"fuelEfficiencyPercentage") as "fuel_avg", stddev($"fuelEfficiencyPercentage") as "fuel_stddev")
  .select($"weatherCondition", $"fuel_avg")

val broadcasted = sc.broadcast(avgfuel)

val joined = df3.join(broadcasted.value, Seq("weatherCondition"))
                .filter($"fuelEfficiencyPercentage" > $"fuel_avg")

val streamingQuery1 = joined.writeStream.
      outputMode("append").
      trigger(Trigger.ProcessingTime("10 seconds")).
      option("checkpointLocation", checkpointLocation).
      format("json").option("path", outputLocation).start()

这执行没有错误,一段时间后开始写入结果。我可能是因为聚合结果的广播,但我不确定。

【问题讨论】:

  • 它是否有效。 AFAIK 尚不支持两个流数据集之间的任何类型的连接。单独分析逻辑 - 如果窗口中只有一个,则输出预计为空。
  • 如果你使用结构化流,事件不能是 DStream ;)
  • @user8371915 Stream-Stream 连接在 2.3 中
  • @T.Gawęda 但它不是 2.3。
  • @user8371915 是的,所以它可能会失败,也许它没有抛出异常,而是默默地产生零行

标签: apache-spark spark-streaming


【解决方案1】:

小调查;)

  1. 事件不能是 DStream,因为您可以选择对其使用数据集操作 - 它必须是数据集
  2. Spark 2.2 中不允许流-流连接。我尝试使用events 作为rate 源运行您的代码,我得到:

    org.apache.spark.sql.AnalysisException:不支持两个流数据帧/数据集之间的内连接;; 加入内部,(value#1L = eventValue#41L)

  3. 结果出乎意料 - 可能您使用了 read 而不是 readStream,并且您没有创建流数据集,而是静态的。将其更改为 readStream 即可使用 - 当然在升级到 2.3 后

  4. 代码 - 上面没有 cmets - 是正确的,应该在 Spark 2.3 上正确运行。请注意,您还必须将模式更改为complete 而不是append,因为您正在进行聚合

【讨论】:

  • @user8371915 Spark 正在抛出异常,所以对这个问题的唯一解释是使用了错误的方法读取数据:)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-04-22
  • 2016-10-07
  • 2017-04-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多