【问题标题】:Update Spark Streaming Variable During Execution在执行期间更新 Spark Streaming 变量
【发布时间】:2017-04-19 15:03:06
【问题描述】:

我有一个从 S3 目录读取 ELB 日志的 spark 流式传输作业, s3://elb-data/2017-04-17/, 解析它们并将它们转换为 ORC,然后将它们存储在一个新目录中:s3://parsed-data/2017-04-17/。这是我的代码:

val streamContext = new StreamingContext(sc, Seconds(30))
val rawLogFormat = new SimpleDateFormat("yyyy/MM/dd/")
val rawLogDate = rawLogFormat.format(new java.util.Date())
val filepath = args(0) + rawLogDate
val parsedLog = streamContext.textFileStream(filepath)

val jsonRows = parsedLog.mapPartitions(lines => {
  val txfm = new LogLine2Json
  lines.map(line =>
    try{
      txfm.parseLine(line)
    }
    catch {
      case e: Exception => {println(line); "";}
    }
  )
})

在新的一天开始时,AWS 会自动将日志放入一个新目录中,我希望我的流作业可以参考。 (s3://elb-logs/2017-04-18/) 但是,我注意到一旦我的工作到达 04/17 数据的末尾,它就不再看到新文件了。有没有办法在代码执行时更新这个变量?或者这是否需要我通过spark-submit 提交一份新工作?感谢您的帮助

【问题讨论】:

    标签: scala apache-spark spark-streaming


    【解决方案1】:

    我认为您的 filepath 对于您的流式作业保持不变,它会一直解析同一目录 s3://parsed-data/2017-04-17/。您需要将其指向您的根目录,即s3://parsed-data/,然后如果有新日期s3://parsed-data/2017-04-18/,它应该能够获取新数据。另请参阅此SO 帖子。

    【讨论】:

    • 我唯一的问题是我是否可以相应地更改输出的位置?我需要s3://elb-logs/2017-04-17/ 中的所有数据以s3://parsed-logs/2017-04-17/ 结尾,s3://elb-logs/2017-04-18/ 中的所有数据都以s3://parsed-logs/2017-04-18/ 结尾,等等。通配符解决方案可以做到吗?
    猜你喜欢
    • 2015-04-12
    • 2019-04-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-07-25
    • 2016-08-18
    • 2016-02-18
    • 1970-01-01
    相关资源
    最近更新 更多