【发布时间】:2017-04-19 15:03:06
【问题描述】:
我有一个从 S3 目录读取 ELB 日志的 spark 流式传输作业,
s3://elb-data/2017-04-17/,
解析它们并将它们转换为 ORC,然后将它们存储在一个新目录中:s3://parsed-data/2017-04-17/。这是我的代码:
val streamContext = new StreamingContext(sc, Seconds(30))
val rawLogFormat = new SimpleDateFormat("yyyy/MM/dd/")
val rawLogDate = rawLogFormat.format(new java.util.Date())
val filepath = args(0) + rawLogDate
val parsedLog = streamContext.textFileStream(filepath)
val jsonRows = parsedLog.mapPartitions(lines => {
val txfm = new LogLine2Json
lines.map(line =>
try{
txfm.parseLine(line)
}
catch {
case e: Exception => {println(line); "";}
}
)
})
在新的一天开始时,AWS 会自动将日志放入一个新目录中,我希望我的流作业可以参考。 (s3://elb-logs/2017-04-18/) 但是,我注意到一旦我的工作到达 04/17 数据的末尾,它就不再看到新文件了。有没有办法在代码执行时更新这个变量?或者这是否需要我通过spark-submit 提交一份新工作?感谢您的帮助
【问题讨论】:
标签: scala apache-spark spark-streaming