【发布时间】:2018-10-12 18:36:06
【问题描述】:
我们有一个DStream,比如
val ssc = new StreamingContext(sc, Seconds(1))
val kS = KafkaUtils.createDirectStream[String, TMapRecord](
ssc,
PreferConsistent,
Subscribe[String, TMapRecord](topicsSetT, kafkaParamsInT)).
mapPartitions(part => {
part.map(_.value())
}).
mapPartitions(part1 => {
part1.map(c => {
TMsg(1,
c.field1,
c.field2, //And others
c.startTimeSeconds
)
})
})
所以每个 RDD 都有一堆 TMsg 对象,其中包含一些(技术)关键字段,我可以使用这些字段来去除 DStream。基本上,如果我们在一个或两个离散 RDD 中有两个 TMsg 对象,它们具有相同的 field1 和 field2,并且它们相差不到 1 秒(我们查看 startTimeSeconds),这是一个重复。
我查看了 mapWithState。 是的,我可以创建 K -> V DStream 之类的
val mappedStream = kS.map(m => (m.field1, m.field2) -> m.startTimeSeconds)
所以我可以使用该功能,但不明白如何使用它来过滤重复项。
窗口函数没用,我不能使用(结构化流).deduplicate 函数,因为解决方案是用 DStreams 编写的。
有什么解决办法吗?谢谢
附: Spark 版本是 2.2
【问题讨论】:
标签: scala apache-spark streaming bigdata spark-streaming