【问题标题】:spark structured streaming exception : Append output mode not supported without watermarkspark结构化流异常:不支持不带水印的附加输出模式
【发布时间】:2019-06-04 16:33:29
【问题描述】:

我已经按年份执行了一个简单的分组操作,并进行了一些聚合,如下所示。我尝试将结果附加到 hdfs 路径,如下所示。我收到错误消息,

   org.apache.spark.sql.AnalysisException: Append output mode not supported 
   when there are streaming aggregations on streaming DataFrames/DataSets 
   without watermark;;
   Aggregate [year#88], [year#88, sum(rating#89) AS rating#173, 
   sum(cast(duration#90 as bigint)) AS duration#175L]
   +- EventTimeWatermark event_time#96: timestamp, interval 10 seconds

下面是我的代码。有人可以帮忙吗

    val spark =SparkSession.builder().appName("mddd").
    enableHiveSupport().config("hive.exec.dynamic.partition", "true").
    config("hive.exec.dynamic.partition.mode", "nonstrict").
    config("spark.sql.streaming.checkpointLocation", "/user/sa/sparkCheckpoint").
    config("spark.debug.maxToStringFields",100).
    getOrCreate()

    val mySchema = StructType(Array(
     StructField("id", IntegerType),
     StructField("name", StringType),
     StructField("year", IntegerType),
     StructField("rating", DoubleType),
     StructField("duration", IntegerType)
    ))

    val xmlData = spark.readStream.option("sep", ",").schema(mySchema).csv("file:///home/sa/kafdata/") 
    import java.util.Calendar
    val df_agg_without_time= xmlData.withColumn("event_time", to_utc_timestamp(current_timestamp, Calendar.getInstance().getTimeZone().getID()))

    val df_agg_with_time = df_agg_without_time.withWatermark("event_time", "10 seconds").groupBy($"year").agg(sum($"rating").as("rating"),sum($"duration").as("duration"))
    val cop = df_agg_with_time.withColumn("column_name_with", to_json(struct($"window")))

    df_agg_with_time.writeStream.outputMode("append").partitionBy("year").format("csv").
    option("path", "hdfs://dio/apps/hive/warehouse/gt.db/sample_mov/").start()

我的输入是 csv 格式

    id,name,year,rating,duration
    1,The Nightmare Before Christmas,1993,3.9,4568
    2,The Mummy,1993,3.5,4388
    3,Orphans of the Storm,1921,3.2,9062
    4,The Object of Beauty,1921,2.8,6150
    5,Night Tide,1963,2.8,5126
    6,One Magic Christmas,1963,3.8,5333
    7,Muriel's Wedding,1963,3.5,6323
    8,Mother's Boys,1963,3.4,5733

我的预期输出应该在 hdfs 中,并在一年中进行分区

    year,rating,duration
    1993,7.4,8956
    1921,6.0,15212
    1963,10.7,17389

我真的不确定我的方法有什么问题。请帮忙

【问题讨论】:

  • 这是一种不寻常的方法来做到这一点。另外,如果第二次调用还有一些已经写入的另一年的数据怎么办?
  • 关于“第二次执行”..我想调用 1 个 spark sql 作业来最终聚合数据...有没有其他选择?
  • 好的,刚刚看完越狱并更新了答案。不确定您的最终用例。
  • 那么..你怎么看..我应该使用火花结构化流还是火花流....我将每 30 秒获取一次数据...而且我需要应用聚合并写入配置单元表......例如......在 1 分钟内,我将聚合结果 3 次附加到配置单元......当它处理时......
  • Spark Streaming 已被弃用,但除此之外还有更多需要考虑。

标签: apache-spark spark-structured-streaming


【解决方案1】:

这是一个多方面的问题:

  • 结构化流 API 有限制恕我直言。
  • 可以通过管道传输多个查询,并且在技术上它会运行,但不会产生任何输出,因此这样做没有任何价值 - 即使您可以指定它,它也无法执行此类其他功能。
  • 手册规定:必须在与时间戳相同的列上调用 withWatermark 聚合中使用的列。

    例如 df.withWatermark("time", "1 min").groupBy("time2").count() 在 Append 输出模式下无效,如 水印是在与聚合不同的列上定义的 柱子。 简单地说,对于 Append,您需要 WaterMark。 我觉得你这里有问题。

  • 使用路径时以下是否相关?

  .enableHiveSupport().config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  • 另外,您的最终用例未知。这里的问题是这是否是一个好方法,但我无法评估的洞察力太少,我们只是假设它是这样。
  • 我们假设同一部电影的评分将成为未来微批次的一部分。
  • 提要中缺少 event_time,但您自己创建。有点不切实际,但我们可以接受,尽管 TimeStamp 有点问题。
  • 我建议您查看此博客 http://blog.madhukaraphatak.com/introduction-to-spark-structured-streaming-part-12/,以获得对结构化流的出色评估。

所以,总的来说:

  • 在完成、追加和更新选项中,我认为您选择了正确的追加。可以使用更新,但我将其排除在范围之外。
  • 但没有将 event_time 放在窗口中。你应该做这个。我在最后放了一个例子,在 Spark Shell 中运行,我无法让案例类工作 - 这就是为什么花了这么长时间,但在编译的程序中它不是问题,或者 DataBricks。
  • 从功能上讲,您不能编写多个查询来执行您尝试过的聚合。在我的情况下,它只会产生一些错误。
  • 我建议您使用我使用的时间戳方法,它更容易,因为我无法测试您所有的东西。

那么:

  • 或者,将此模块的输出写入 KAFKA 主题并将该主题读入另一个模块,然后进行第二次聚合并写出,考虑到您可以在不同的微批次中获得多个电影评级。
  • 或者,将包含计数字段的数据写出,然后提供一个用于查询的视图层,考虑到有多次写入的事实。

这是一个使用套接字输入和 Spark Shell 的示例 - 您可以将其推断为您自己的数据和微批处理的输出(注意查看数据时存在延迟):

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode

val sparkSession = SparkSession.builder
  .master("local")
  .appName("example")
  .getOrCreate()
//create stream from socket

import sparkSession.implicits._
sparkSession.sparkContext.setLogLevel("ERROR")
val socketStreamDs = sparkSession.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .as[String]

val stockDs = socketStreamDs.map(value => (value.trim.split(","))).map(entries=>(new java.sql.Timestamp(entries(0).toLong),entries(1),entries(2).toDouble)).toDF("time","symbol","value")//.toDS() 

val windowedCount = stockDs
  .withWatermark("time", "20000 milliseconds")
  .groupBy( 
    window($"time", "10 seconds"),
           $"symbol" 
  )
  .agg(sum("value"), count($"symbol"))

val query =
  windowedCount.writeStream
    .format("console")
    .option("truncate", "false")
    .outputMode(OutputMode.Append())

query.start().awaitTermination()

结果:

Batch: 14
----------------------------------------------+------+----------+-------------+  
|window                                       |symbol|sum(value)|count(symbol)|
+---------------------------------------------+------+----------+-------------+
|[2016-04-27 04:34:20.0,2016-04-27 04:34:30.0]|"aap1"|4200.0    |6            |
|[2016-04-27 04:34:30.0,2016-04-27 04:34:40.0]|"app1"|800.0     |2            |
|[2016-04-27 04:34:20.0,2016-04-27 04:34:30.0]|"aap2"|2500.0    |1            |
|[2016-04-27 04:34:40.0,2016-04-27 04:34:50.0]|"app1"|2800.0    |4            |
+---------------------------------------------+------+----------+-------------+

这是一个相当大的话题,你需要从整体上看待它。

您可以看到输出在某些情况下可能会很方便,尽管平均输出可用于计算总体平均数。成功。

【讨论】:

  • 我需要根据年份进行分组并从中取出总和...如果我不使用水印会出错...有什么办法解决这个问题吗?
  • 你的意思是我们不能应用火花结构化流来聚合除水印以外的列吗?很伤心
  • 他的问题是多重聚合...我的只有 1 个聚合
  • 是的,但是您需要考虑水印中的 group by,然后再考虑另一个 aggr
猜你喜欢
  • 2018-07-22
  • 2018-01-16
  • 2021-05-21
  • 2019-01-07
  • 2019-04-06
  • 2019-12-07
  • 2023-03-26
  • 2018-08-11
  • 1970-01-01
相关资源
最近更新 更多