【发布时间】:2017-04-11 15:51:29
【问题描述】:
我是 Spark 的新手。我正在尝试将 Spark 2.1 版本用于 CEP。 检测最后 2 分钟内丢失的事件。我正在将接收到的输入转换为 JavaDSStream 的输入事件,然后在 inputEvents 上执行 reducebykeyandWindow 并执行 spark sql。
JavaPairDStream<String, Long> reduceWindowed = inputEvents.reduceByKeyAndWindow(new MaxTimeFuntion(),
Durations.seconds(124), new Duration(2000));
reduceWindowed.foreachRDD((rdd, time) -> {
SparkSession spark = TestSparkSessionSingleton.getInstance(rdd.context().getConf());
JavaRDD<EventData> rowRDD = rdd.map(new org.apache.spark.api.java.function.Function<Tuple2<String,Long>, EventData>() {
@Override
public EventData call(Tuple2<String, Long> javaRDD) {
{
EventData record = new EventData ();
record.setId(javaRDD._1);
record.setEventTime(javaRDD._2);
return record;
}
})
Dataset<Row> eventDataFrames = spark.createDataFrame(rowRDD, EventData.class);
eventDataFrames.createOrReplaceTempView("checkins");
Dataset<Row> resultRows=
spark.sql("select id, max(eventTime) as maxval, from events group by id having (unix_timestamp()*1000 - maxval >= 120000)");
我使用 RDD 函数执行相同的过滤:
JavaPairDStream<String, Long> filteredStream = reduceWindowed.filter(new Function<Tuple2<String,Long>, Boolean>() {
public Boolean call(Tuple2<String,Long> val)
{
return (System.currentTimeMillis() - val._2() >= 120000);
}
});
filteredStream.print();
这两种方法都为我提供了数据集和 RDD 相同的结果。
我是否正确使用了 Spark sql。
在本地模式下,相同输入速率下,Spark SQL 查询执行消耗的 CPU 比 RDD 函数高。任何人都可以帮助我理解为什么 Spark SQL 与 RDD 过滤器功能相比消耗相对较高的 CPU..
【问题讨论】:
标签: apache-spark spark-streaming