[Question title]: Upload Files directly to S3 chunk-by-chunk using Play Scala using Iteratees
[Posted]: 2014-12-23 13:32:44
[Question description]:

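I have been trying, without success, to upload files directly to S3 using Iteratees. I am still new to functional programming and find it hard to piece together working code.

I wrote an iteratee that processes the chunks of the uploaded file and sends them to S3. The upload fails with an error at the end.

Please help me fix this.

Below is the code I came up with.

Controller handler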

  def uploadFile = Action.async(BodyParser(rh => S3UploadHelper("bucket-name").s3Iteratee() ))  { implicit request =>
    Future {
      if(uploadLogger.isInfoEnabled) uploadLogger.info(s"Contents of Uploaded file :: \n " + request.body)
      Ok(views.html.index("File uploaded"))
    }
  }

Helper class

case class S3UploadHelper(bucket: String, key: String = UUID.generate()) {

  private val AWS_ACCESS_KEY = ""
  private val AWS_SECRET_KEY = ""
  private val yourAWSCredentials = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY)
  val amazonS3Client = new AmazonS3Client(yourAWSCredentials)


  private val initRequest = new InitiateMultipartUploadRequest(bucket, key)
  private val initResponse = amazonS3Client.initiateMultipartUpload(initRequest)
  val uploadId = initResponse.getUploadId

  val uploadLogger = Logger("upload")

  def s3Iteratee(etags: Seq[PartETag] = Seq.empty[PartETag]): Iteratee[Array[Byte], Either[Result, CompleteMultipartUploadResult]] = Cont {
    case in: El[Array[Byte]] =>
      // Retrieve the part that has not been processed in the previous chunk and copy it in front of the current chunk
      val uploadRequest = new UploadPartRequest()
        .withBucketName(bucket)
        .withKey(key)
        .withPartNumber(etags.length + 1)
        .withUploadId(uploadId)
        .withInputStream(new ByteArrayInputStream(in.e))
        .withPartSize(in.e.length)
      if(uploadLogger.isDebugEnabled) uploadLogger.debug(">> " + String.valueOf(in.e))
      val etag = Future { amazonS3Client.uploadPart(uploadRequest).getPartETag }
      etag.map(etags :+ _)
      Await.result(etag, 1.seconds)
      s3Iteratee(etags)
    case in @ Empty => s3Iteratee(etags)
    case in @ EOF =>
      import scala.collection.JavaConversions._
      val compRequest = new CompleteMultipartUploadRequest(bucket, key, uploadId, etags.toList)
      val result = amazonS3Client.completeMultipartUpload(compRequest)
      Done(Right(result), in)
    case in => s3Iteratee(etags)
  }

}

Although the iteratee seems to work and I can process the file chunk by chunk, the upload fails with a strange error. Here is the log:

[debug] upload - >> [B@1df9048d
[debug] upload - >> [B@152dcf59
[debug] upload - >> [B@7cfeb0d8
[debug] upload - >> [B@136844c5
[debug] upload - >> [B@16f41590
[debug] upload - >> [B@6dd85710
[debug] upload - >> [B@64294203
[debug] upload - >> [B@35366c2f
[debug] upload - >> [B@358a78c
[debug] upload - >> [B@2c171020
[debug] upload - >> [B@20076fb
[debug] upload - >> [B@4d13580
[debug] upload - >> [B@42738651
[debug] upload - >> [B@5671082f
[debug] upload - >> [B@57c70bb4
[debug] upload - >> [B@4154394f
[debug] upload - >> [B@4f93cf15
[debug] upload - >> [B@4bac523f
[debug] upload - >> [B@eaec52e
[debug] upload - >> [B@6ed00bf5
[debug] upload - >> [B@3f6a8a5d
[debug] upload - >> [B@16fe1a25
[debug] upload - >> [B@6e813a61
[debug] upload - >> [B@e01be7
[debug] upload - >> [B@6bb351c4
[debug] upload - >> [B@dfa51a5
[debug] upload - >> [B@6acf2049
[debug] upload - >> [B@6a7021d4
[debug] upload - >> [B@1b3c602f
[debug] upload - >> [B@44146d94
[debug] upload - >> [B@574ac037
[debug] upload - >> [B@3cdf258b
[debug] upload - >> [B@441a0727
[debug] upload - >> [B@2385aafd
[debug] upload - >> [B@224f9dc2
[debug] upload - >> [B@6779077d
[debug] upload - >> [B@734e178a
[debug] upload - >> [B@7d92895c
[debug] upload - >> [B@23edaaa1
[debug] upload - >> [B@c00134e
[debug] upload - >> [B@ff1a703
[error] play - Cannot invoke the action, eventually got an error: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 98h72s0EBA7653AD, AWS Error Code: MalformedXML, AWS Error Message: The XML you provided was not well-formed or did not validate against our published schema, S3 Extended Request ID: R7e44g8oRy5b4xd7MU++atibwrBSRFezeMxNCXE38gyzcwci5Zf
[error] application - 

! @6k2maob49 - Internal server error, for (POST) [/v1/file_upload] ->

play.api.Application$$anon$1: Execution exception[[AmazonS3Exception: The XML you provided was not well-formed or did not validate against our published schema]]
        at play.api.Application$class.handleError(Application.scala:296) ~[play_2.10-2.3.2.jar:2.3.2]
        at play.api.DefaultApplication.handleError(Application.scala:402) [play_2.10-2.3.2.jar:2.3.2]
        at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$3$$anonfun$applyOrElse$4.apply(PlayDefaultUpstreamHandler.scala:320) [play_2.10-2.3.2.jar:2.3.2]
        at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$3$$anonfun$applyOrElse$4.apply(PlayDefaultUpstreamHandler.scala:320) [play_2.10-2.3.2.jar:2.3.2]
        at scala.Option.map(Option.scala:145) [scala-library.jar:na]
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The XML you provided was not well-formed or did not validate against our published schema
        at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:556) ~[aws-java-sdk-1.3.11.jar:na]
        at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:289) ~[aws-java-sdk-1.3.11.jar:na]
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:170) ~[aws-java-sdk-1.3.11.jar:na]
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:2723) ~[aws-java-sdk-1.3.11.jar:na]
        at com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:1964) ~[aws-java-sdk-1.3.11.jar:na]

[Discussion]:

    Tags: scala amazon-s3 playframework-2.0


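    [Solution 1]:

    I have done this in the past. Amazon S3 requires every part of a multipart upload except the last one to be at least 5 MB, so the incoming chunks have to be regrouped into 5 MB blocks before they are sent. I return a tuple at the end; you can change that to suit your requirements.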
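
The regrouping idea can be looked at in isolation before reading the parser. The following is a minimal sketch in plain Scala, not the Play Iteratee API: the object `Regroup` and its `regroup` function are hypothetical names introduced here, and the tiny part size in `main` is only for illustration (a real upload would use 5 * 1024 * 1024).

```scala
object Regroup {
  /** Re-chunks a stream of byte arrays into parts of exactly `partSize`
    * bytes. Only the final part may be smaller, which matches the S3
    * rule that every part except the last must reach the minimum size.
    * Going through Iterator[Byte] boxes each byte, so this is for
    * illustration rather than for large production uploads. */
  def regroup(chunks: Iterator[Array[Byte]], partSize: Int): Iterator[Array[Byte]] = {
    require(partSize > 0, "partSize must be positive")
    chunks.flatMap(_.iterator).grouped(partSize).map(_.toArray)
  }

  def main(args: Array[String]): Unit = {
    // Three uneven incoming chunks (3, 4 and 2 bytes), regrouped into parts of 4 bytes.
    val incoming = Iterator(
      Array.fill[Byte](3)(1),
      Array.fill[Byte](4)(2),
      Array.fill[Byte](2)(3))
    regroup(incoming, 4).foreach(part => println("part of " + part.length + " bytes"))
  }
}
```

In the parser below, the same role is played by `Enumeratee.grouped` combined with `Traversable.takeUpTo`, which works on the iteratee stream directly instead of on an `Iterator`.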

    val client = new AmazonS3Client(new BasicAWSCredentials(access_key, secret_key))
    
    def my_parser = BodyParser { 
    
    val consume_5MB = Traversable.takeUpTo[Array[Byte]](1024 * 1024 * 5) &>> Iteratee.consume()
    val rechunkAdapter: Enumeratee[Array[Byte], Array[Byte]] = Enumeratee.grouped(consume_5MB)
    
    multipartFormData(Multipart.handleFilePart({
    
      case Multipart.FileInfo(partName, file_name, content_type) => {
    
        val object_id = java.util.UUID.randomUUID().toString().replaceAll("-", "")
        val object_id_key = if (content_type.getOrElse("").contains("video") || content_type.getOrElse("").contains("audio")) object_id else object_id + file_name.substring(file_name.lastIndexOf('.'))
        var position = 0
        val etags = new java.util.ArrayList[PartETag]()
    
        val initRequest = new InitiateMultipartUploadRequest(bucket, object_id_key)
        val initResponse = client.initiateMultipartUpload(initRequest)
        println("fileName = " + file_name)
        println("contentType = " + content_type)
    
        (rechunkAdapter &>> Iteratee.foldM[Array[Byte], Int](1) { (c, bytes) =>
          Future {
            println("got a chunk!  :" + c + " size in KB: " + (bytes.length / 1024));
            val is = new java.io.ByteArrayInputStream(bytes)
    
            val uploadRequest = new UploadPartRequest()
              .withBucketName(bucket).withKey(object_id_key)
              .withUploadId(initResponse.getUploadId())
              .withPartNumber(c)
              .withFileOffset(position)
              .withInputStream(is)
              .withPartSize(bytes.length)
    
            etags.add(client.uploadPart(uploadRequest).getPartETag)
            position = position + bytes.length
    
            c + 1
          }
        }).map { v =>
          try {
            val compRequest = new CompleteMultipartUploadRequest(
              bucket,
              object_id_key,
              initResponse.getUploadId(),
              etags)
            client.completeMultipartUpload(compRequest)
            println("Finished uploading " + file_name)   
            client.setObjectAcl(bucket, object_id_key, com.amazonaws.services.s3.model.CannedAccessControlList.PublicRead)
            (object_id_key, file_name, content_type.getOrElse("application/octet-stream")) 
          } catch {
            case e: Exception => {
              println("S3 upload Ex " + e.getMessage())
              val abortMPURequest = new AbortMultipartUploadRequest(bucket, object_id_key, initResponse.getUploadId())
              client.abortMultipartUpload(abortMPURequest);
             ("error", file_name, content_type.getOrElse("application/octet-stream"))
            }
          }
        }
      }
    }))
    

    }
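
The MalformedXML error in the question is consistent with completing the upload with an empty part list: in `s3Iteratee`, the result of `etag.map(etags :+ _)` is discarded and the recursion passes the unchanged `etags` along. The sketch below reproduces that pattern without AWS or Play. `Tag` and `uploadPart` are hypothetical stand-ins for the SDK's `PartETag` and `amazonS3Client.uploadPart(...).getPartETag`, so this shows the accumulation logic only, not the real API calls.

```scala
object EtagAccumulation {
  // Hypothetical stand-in for the SDK's PartETag, so the sketch runs without AWS.
  final case class Tag(partNumber: Int, etag: String)

  // Hypothetical stand-in for amazonS3Client.uploadPart(...).getPartETag.
  def uploadPart(partNumber: Int, bytes: Array[Byte]): Tag =
    Tag(partNumber, "etag-" + bytes.length)

  // Mirrors the question: appending to an immutable Seq returns a new Seq,
  // which is never used, so the recursion keeps passing the empty list.
  def collectDiscarding(parts: List[Array[Byte]], etags: Seq[Tag] = Seq.empty): Seq[Tag] =
    parts match {
      case Nil => etags
      case head :: tail =>
        val tag = uploadPart(etags.length + 1, head)
        val ignored = etags :+ tag // new Seq is built and then dropped
        collectDiscarding(tail, etags)
    }

  // Threads the grown list through each step, as Iteratee.foldM does with its state.
  def collectThreading(parts: List[Array[Byte]]): Seq[Tag] =
    parts.foldLeft(Vector.empty[Tag]) { (etags, bytes) =>
      etags :+ uploadPart(etags.length + 1, bytes)
    }

  def main(args: Array[String]): Unit = {
    val parts = List(new Array[Byte](5), new Array[Byte](5), new Array[Byte](2))
    println("discarding: " + collectDiscarding(parts).length + " tags")
    println("threading:  " + collectThreading(parts).length + " tags")
  }
}
```

Applied to the question's code, this would mean passing the extended list to the recursive call, for example `s3Iteratee(etags :+ Await.result(etag, ...))`. That change alone is probably not sufficient, because the parts would still need to be regrouped to the 5 MB minimum as the answer does.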
