【发布时间】:2020-04-09 06:50:40
【问题描述】:
我想找到一种有效的方法将包含大量对象的 S3 文件夹/前缀复制到同一存储桶上的另一个文件夹/前缀。这是我尝试过的。
测试数据:200 对象左右,100 MB 左右。
1)aws s3 cp --recursive。花了大约150 secs。
2)s3-dist-cp。花了大约59 secs。
3)spark & aws jdk, 2 threads。花了大约440 secs。
4)spark & aws jdk, 64 threads。花了大约50 secs。
线程确实有效,但是当它转到单个线程时,aws java sdk 方法似乎不如aws s3 cp 方法有效。 是否有单线程编程 API 的性能可以媲美aws s3 cp?或者是否有更好的复制数据?
理想情况下,我更愿意使用编程 API 以获得更大的灵活性。
以下是我使用的代码。
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
def listAllFiles(rootPath: String): Seq[String] = {
val fileSystem = FileSystem.get(URI.create(rootPath), new Configuration())
val it = fileSystem.listFiles(new Path(rootPath), true)
var files = List[String]()
while (it.hasNext) {
files = it.next().getPath.toString::files
}
files
}
def s3CopyFiles(spark: SparkSession, fromPath: String, toPath: String): Unit = {
val fromFiles = listAllFiles(fromPath)
val toFiles = fromFiles.map(_.replaceFirst(fromPath, toPath))
val fileMap = fromFiles.zip(toFiles)
s3CopyFiles(spark, fileMap)
}
def s3CopyFiles(spark: SparkSession, fileMap: Seq[(String, String)]): Unit = {
val sc = spark.sparkContext
val filePairRdd = sc.parallelize(fileMap.toList, sc.defaultParallelism)
filePairRdd.foreachPartition(it => {
val p = "s3://([^/]*)/(.*)".r
val s3 = AmazonS3ClientBuilder.defaultClient()
while (it.hasNext) {
val (p(fromBucket, fromKey), p(toBucket, toKey)) = it.next()
s3.copyObject(fromBucket, fromKey, toBucket, toKey)
}
})
}
【问题讨论】:
-
有一个适用于 aws s3 的 scala 扩展库 doc.akka.io/docs/alpakka/current/s3.html
-
@JohnRotenstein 鉴于“aws s3 cp”实际上是多线程的,性能数字看起来很合理。
-
@Vamsi Akka 实现仍在使用
aws java sdk。大概没有太大的提升空间吧?此外,构建分布式应用程序比使用 Spark 困难得多。
标签: scala amazon-web-services apache-spark amazon-s3 amazon-emr