[Question title]: Spark watermark and windowing in Append mode
[Posted]: 2018-11-27 08:05:42
[Question]:

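The Structured Streaming code below applies a watermark and groups the data into 24-hour windows that slide every 15 minutes. In Append mode the code produces only an empty Batch 0. In Update mode the results are displayed correctly. Append mode is required because the S3 sink works only in Append mode.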

String windowDuration = "24 hours";
String slideDuration = "15 minutes";
Dataset<Row> sliding24h = rowData
        .withWatermark(eventTimeCol, slideDuration)
        .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
                col(nameCol)).count();

sliding24h
        .writeStream()
        .format("console")
        .option("truncate", false)
        .option("numRows", 1000)
        .outputMode(OutputMode.Append())
        //.outputMode(OutputMode.Complete())
        .start()
        .awaitTermination();

Here is the complete test code:

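import java.util.ArrayList;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.streaming.MemoryStream;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQueryException;

import scala.collection.JavaConversions;

import static org.apache.spark.sql.functions.col;

// The class name is illustrative; the original post shows only the main method.
public class SlidingWindowAppendTest {
    public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

        // 200 records of the form "<epoch seconds>,qwer", spaced 5 minutes apart.
        ArrayList<String> rl = new ArrayList<>();
        for (int i = 0; i < 200; ++i) {
            long t = 1512164314L + i * 5 * 60;
            rl.add(t + ",qwer");
        }

        String nameCol = "name";
        String eventTimeCol = "eventTime";
        String eventTimestampCol = "eventTimestamp";

        // All records are added to the in-memory source before the query starts.
        MemoryStream<String> input = new MemoryStream<>(42, spark.sqlContext(), Encoders.STRING());
        input.addData(JavaConversions.asScalaBuffer(rl).toSeq());
        Dataset<Row> stream = input.toDF().selectExpr(
                "cast(split(value,'[,]')[0] as long) as " + eventTimestampCol,
                "cast(split(value,'[,]')[1] as String) as " + nameCol);

        System.out.println("isStreaming: " + stream.isStreaming());

        // Convert epoch seconds into a timestamp column usable as event time.
        Column eventTime = functions.to_timestamp(col(eventTimestampCol));
        Dataset<Row> rowData = stream.withColumn(eventTimeCol, eventTime);

        String windowDuration = "24 hours";
        String slideDuration = "15 minutes";
        // The watermark delay is set to the slide duration (15 minutes).
        Dataset<Row> sliding24h = rowData
                .withWatermark(eventTimeCol, slideDuration)
                .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
                        col(nameCol)).count();

        sliding24h
                .writeStream()
                .format("console")
                .option("truncate", false)
                .option("numRows", 1000)
                .outputMode(OutputMode.Append())
                //.outputMode(OutputMode.Complete())
                .start()
                .awaitTermination();
    }
}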
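
To make the Append-mode behaviour easier to reason about, here is a minimal plain-Java sketch (no Spark involved) that models the rule for the test data above. It assumes a simplified model: windows are aligned to multiples of the slide from the epoch, a window is considered final once its end is less than or equal to the watermark, the watermark is the maximum event time seen minus the delay, and the first batch is evaluated against an initial watermark of 0. The class and method names (AppendModeSketch, windowEnds, finalizedCount, testEvents) are hypothetical and exist only for this illustration.

```java
import java.util.TreeSet;

public class AppendModeSketch {
    static final long WINDOW_SEC = 24 * 60 * 60; // "24 hours"
    static final long SLIDE_SEC = 15 * 60;       // "15 minutes"
    static final long DELAY_SEC = 15 * 60;       // watermark delay used in the question

    /** The 200 event times (epoch seconds) generated by the test code, 5 minutes apart. */
    static long[] testEvents() {
        long[] events = new long[200];
        for (int i = 0; i < events.length; i++) {
            events[i] = 1512164314L + i * 5L * 60L;
        }
        return events;
    }

    /** End times of every sliding window [start, start + WINDOW_SEC) that contains an event. */
    static TreeSet<Long> windowEnds(long[] events) {
        TreeSet<Long> ends = new TreeSet<>();
        for (long e : events) {
            // Latest slide-aligned window start that is <= e.
            long lastStart = Math.floorDiv(e, SLIDE_SEC) * SLIDE_SEC;
            // Walk back over every start whose window still covers e.
            for (long s = lastStart; s > e - WINDOW_SEC; s -= SLIDE_SEC) {
                ends.add(s + WINDOW_SEC);
            }
        }
        return ends;
    }

    /** Watermark in this model: maximum event time minus the delay. */
    static long watermark(long[] events) {
        long max = Long.MIN_VALUE;
        for (long e : events) {
            max = Math.max(max, e);
        }
        return max - DELAY_SEC;
    }

    /** Number of windows whose end is <= the given watermark, i.e. final in this model. */
    static int finalizedCount(long[] events, long watermark) {
        return windowEnds(events).headSet(watermark, true).size();
    }

    public static void main(String[] args) {
        long[] events = testEvents();
        // Batch 0 sees the initial watermark (0), so no window is final yet.
        System.out.println("batch 0: " + finalizedCount(events, 0L) + " final windows");
        // Only a later batch can use the watermark computed from batch 0's data.
        System.out.println("later batch: " + finalizedCount(events, watermark(events))
                + " of " + windowEnds(events).size() + " windows final");
    }
}
```

Under these assumptions the sketch prints 0 final windows for batch 0 and 65 of 162 for a later batch. Since the test pushes all of its data in a single addData call, an empty Batch 0 in Append mode is consistent with this model: the rows can be emitted only by a batch that runs after the watermark has advanced.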

[Question discussion]:

    Tags: apache-spark spark-structured-streaming


    [Solution 1]:

    [Discussion]:

    • I recompiled the code with Spark 2.4.0. The test code produces the expected results.