【发布时间】:2017-11-04 10:15:28
【问题描述】:
我正在开发一个应用程序,其中每 30 秒(也可以是 5 秒)一些文件将被删除到文件系统中。我必须阅读它并解析它并将一些记录推送到 REDIS。
在每个文件中,所有记录都是独立的,我没有进行任何需要updateStateByKey 的计算。
我的问题是,如果由于某些问题(例如:REDIS 连接问题、文件中的数据问题等)某些文件未完全处理,我想再次重新处理(例如 n 次)文件并跟踪文件已处理。
出于测试目的,我从本地文件夹中读取。另外我不确定如何得出一个文件已完全处理并将其标记为已完成的结论(即写入该文件已处理的文本文件或数据库)
val lines = ssc.textFileStream("E:\\SampleData\\GG")
val words = lines.map(x=>x.split("_"))
words.foreachRDD(
x=> {
x.foreach(
x => {
var jedis = jPool.getResource();
try{
i=i+1
jedis.set("x"+i+"__"+x(0)+"__"+x(1), x(2))
}finally{
jedis.close()
}
}
)
}
)
【问题讨论】:
标签: scala apache-spark spark-streaming