【发布时间】:2016-09-29 17:21:23
【问题描述】:
使用 Spark 流式传输 (1.6) 我有一个文件流,用于读取批量大小为 2 秒的查找数据,但是文件仅每小时复制到目录中。
一旦有一个新文件,它的内容就会被流读取,这就是我想要缓存到内存中并保留在那里的内容
直到读取新文件。
我想将这个数据集加入到另一个流中,因此我想缓存。
这是Batch lookup data for Spark streaming的后续问题。
答案确实适用于updateStateByKey,但是我不知道如何处理 KV 对的情况
从查找文件中删除,因为updateStateByKey 中的值序列不断增长。
此外,任何提示如何使用 mapWithState 来做到这一点都会很棒。
这是我到目前为止尝试过的,但数据似乎没有被持久化:
val dictionaryStream = ssc.textFileStream("/my/dir")
dictionaryStream.foreachRDD{x =>
if (!x.partitions.isEmpty) {
x.unpersist(true)
x.persist()
}
}
【问题讨论】:
标签: scala apache-spark spark-streaming