【问题标题】:Spark Streaming on a S3 DirectoryS3 目录上的 Spark Streaming
【发布时间】:2015-09-08 18:10:53
【问题描述】:

因此,我有数千个事件通过 Amazon Kinesis 流式传输到 SQS,然后转储到 S3 目录中。大约每 10 分钟创建一个新的文本文件,以将 Kinesis 中的数据转储到 S3。我想设置 Spark Streaming,以便将转储到 S3 的新文件流式传输。现在我有

import org.apache.spark.streaming._
val currentFileStream = ssc.textFileStream("s3://bucket/directory/event_name=accepted/")
currentFileStream.print
ssc.start()

但是,Spark Streaming 不会提取转储到 S3 中的新文件。我认为这与文件写入要求有关:

The files must have the same data format.
The files must be created in the dataDirectory by atomically moving or renaming them into the data directory.
Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.

为什么 Spark 流式传输不获取新文件?是因为 AWS 在目录中创建文件而不是移动它们吗?如何确保 Spark 拾取转储到 S3 中的文件?

【问题讨论】:

  • 你能分享你设置s3相关配置的代码吗?

标签: scala amazon-web-services amazon-s3 apache-spark spark-streaming


【解决方案1】:

为了流式传输 S3 存储桶。您需要提供 S3 存储桶的路径。它将从该存储桶中的所有文件中流式传输所有数据。然后,每当在此存储桶中创建 w 个新文件时,它将被流式传输。如果您将数据附加到之前读取的现有文件中,则不会读取这些新更新。

这是一小段有效的代码

import org.apache.spark.streaming._

val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")      
val sc = new SparkContext(conf)
val hadoopConf=sc.hadoopConfiguration;
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId",myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey",mySecretKey)

//ones above this may be deprecated?
hadoopConf.set("fs.s3n.awsAccessKeyId",myAccessKey)
hadoopConf.set("fs.s3n.awsSecretAccessKey",mySecretKey)

val ssc = new org.apache.spark.streaming.StreamingContext(
  sc,Seconds(60))
val lines = ssc.textFileStream("s3n://path to bucket")
lines.print()

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

希望它会有所帮助。

【讨论】:

  • 嘿哈菲兹,这对我不起作用。我设置了这些配置,但它仍然没有拾取新文件。我可以使用 sc.textFile("") 获取旧文件,但没有流式文件。
  • 你有什么异常吗?您的应用程序的日志是什么?
  • 不,没有错误。它根本不会提取新文件。
  • 是否可以流式传输多个存储桶?
  • 谢谢你这对我有用,但我用 s3a 和 minio
猜你喜欢
  • 2015-04-27
  • 2023-04-02
  • 1970-01-01
  • 2016-03-23
  • 1970-01-01
  • 2014-06-05
  • 2017-12-23
  • 2023-02-01
  • 2021-04-13
相关资源
最近更新 更多