【发布时间】:2019-06-04 16:33:29
【问题描述】:
我已经按年份执行了一个简单的分组操作,并进行了一些聚合,如下所示。我尝试将结果附加到 hdfs 路径,如下所示。我收到错误消息,
org.apache.spark.sql.AnalysisException: Append output mode not supported
when there are streaming aggregations on streaming DataFrames/DataSets
without watermark;;
Aggregate [year#88], [year#88, sum(rating#89) AS rating#173,
sum(cast(duration#90 as bigint)) AS duration#175L]
+- EventTimeWatermark event_time#96: timestamp, interval 10 seconds
下面是我的代码。有人可以帮忙吗
val spark =SparkSession.builder().appName("mddd").
enableHiveSupport().config("hive.exec.dynamic.partition", "true").
config("hive.exec.dynamic.partition.mode", "nonstrict").
config("spark.sql.streaming.checkpointLocation", "/user/sa/sparkCheckpoint").
config("spark.debug.maxToStringFields",100).
getOrCreate()
val mySchema = StructType(Array(
StructField("id", IntegerType),
StructField("name", StringType),
StructField("year", IntegerType),
StructField("rating", DoubleType),
StructField("duration", IntegerType)
))
val xmlData = spark.readStream.option("sep", ",").schema(mySchema).csv("file:///home/sa/kafdata/")
import java.util.Calendar
val df_agg_without_time= xmlData.withColumn("event_time", to_utc_timestamp(current_timestamp, Calendar.getInstance().getTimeZone().getID()))
val df_agg_with_time = df_agg_without_time.withWatermark("event_time", "10 seconds").groupBy($"year").agg(sum($"rating").as("rating"),sum($"duration").as("duration"))
val cop = df_agg_with_time.withColumn("column_name_with", to_json(struct($"window")))
df_agg_with_time.writeStream.outputMode("append").partitionBy("year").format("csv").
option("path", "hdfs://dio/apps/hive/warehouse/gt.db/sample_mov/").start()
我的输入是 csv 格式
id,name,year,rating,duration
1,The Nightmare Before Christmas,1993,3.9,4568
2,The Mummy,1993,3.5,4388
3,Orphans of the Storm,1921,3.2,9062
4,The Object of Beauty,1921,2.8,6150
5,Night Tide,1963,2.8,5126
6,One Magic Christmas,1963,3.8,5333
7,Muriel's Wedding,1963,3.5,6323
8,Mother's Boys,1963,3.4,5733
我的预期输出应该在 hdfs 中,并在一年中进行分区
year,rating,duration
1993,7.4,8956
1921,6.0,15212
1963,10.7,17389
我真的不确定我的方法有什么问题。请帮忙
【问题讨论】:
-
这是一种不寻常的方法来做到这一点。另外,如果第二次调用还有一些已经写入的另一年的数据怎么办?
-
关于“第二次执行”..我想调用 1 个 spark sql 作业来最终聚合数据...有没有其他选择?
-
好的,刚刚看完越狱并更新了答案。不确定您的最终用例。
-
那么..你怎么看..我应该使用火花结构化流还是火花流....我将每 30 秒获取一次数据...而且我需要应用聚合并写入配置单元表......例如......在 1 分钟内,我将聚合结果 3 次附加到配置单元......当它处理时......
-
Spark Streaming 已被弃用,但除此之外还有更多需要考虑。
标签: apache-spark spark-structured-streaming