[Question title]: Upload ZipOutputStream to S3 without temporarily saving the (large) zip file to disk, using AWS S3 Java
[Posted]: 2019-03-17 05:39:24
[Question description]:

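I need to download photos from S3 (they are not in the same directory), zip them, and upload the result to S3 again using the AWS S3 Java SDK. The zip file can be several GB in size. I am currently using AWS Lambda, whose temporary storage is limited to 500 MB. So instead of saving the ZIP file on disk, I want to stream the ZIP file (created on the fly from the photos downloaded from S3) directly to S3. I need to use the AWS S3 Java SDK.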

[Question discussion]:

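  • Since ordinary images are already compressed (except *.bmp), you can explicitly add these files without compression. I'm surprised you want to use one huge zip file. Your own read-only file system?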
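A minimal sketch of the commenter's suggestion using only `java.util.zip`. It keeps the default DEFLATED method but sets `ZipOutputStream.setLevel(Deflater.NO_COMPRESSION)`, so no CPU is spent recompressing JPEG/PNG data. Switching entries to `ZipEntry.STORED` instead would require setting the size and CRC-32 of each entry before `putNextEntry`, which means reading each photo twice or buffering it, and that works against streaming. The class name `NoCompressionZip` and the in-memory inputs are illustrative assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.zip.Deflater;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

public class NoCompressionZip {

    /** Writes each (name, bytes) pair as a ZIP entry without trying to compress it. */
    public static byte[] zip(Map<String, byte[]> files) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(out)) {
            // Entries stay DEFLATED (so size and CRC need not be known up front, unlike STORED),
            // but the deflater only emits uncompressed blocks.
            zos.setLevel(Deflater.NO_COMPRESSION);
            for (Map.Entry<String, byte[]> file : files.entrySet()) {
                zos.putNextEntry(new ZipEntry(file.getKey()));
                zos.write(file.getValue());
                zos.closeEntry();
            }
        }
        return out.toByteArray();
    }
}
```

In the streaming answers below, the same `setLevel` call would go right after the `ZipOutputStream` is created.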

Tags: java amazon-s3 aws-lambda aws-java-sdk zipoutputstream


[Solution 1]:

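The basic idea is to use streaming operations. That way you don't wait until the ZIP has been generated on the file system; the upload starts as soon as the ZIP algorithm produces any data. Obviously some data will be buffered in memory, but you still don't need to wait for the whole ZIP to be generated on disk. We will also use stream composition and PipedInputStream / PipedOutputStream in two threads: one downloads the objects and writes them into the ZIP stream, the other reads the zipped bytes from the pipe and uploads them.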
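The two-thread pipe pattern can be tried without AWS. This is a minimal sketch under the assumption that in-memory byte arrays stand in for the S3 objects; the class name `PipedZipDemo` is hypothetical. One thread writes ZIP entries into a `PipedOutputStream`; the other reads the `PipedInputStream`. Here the consumer parses the entry names back with `ZipInputStream`, whereas in the answer's code the consumer is the `putObject` call.

```java
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public class PipedZipDemo {

    /** Zips the given "files" on one thread while a second thread consumes the ZIP bytes. */
    public static List<String> zipThroughPipe(Map<String, byte[]> files) throws Exception {
        PipedOutputStream pipeOut = new PipedOutputStream();
        PipedInputStream pipeIn = new PipedInputStream(pipeOut, 64 * 1024); // 64 KiB pipe buffer
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<?> producer = pool.submit(() -> {
                // Closing the ZipOutputStream writes the central directory and closes the pipe,
                // which is what signals end-of-stream to the consumer.
                try (ZipOutputStream zos = new ZipOutputStream(pipeOut)) {
                    for (Map.Entry<String, byte[]> f : files.entrySet()) {
                        zos.putNextEntry(new ZipEntry(f.getKey()));
                        zos.write(f.getValue());
                        zos.closeEntry();
                    }
                }
                return null;
            });
            Future<List<String>> consumer = pool.submit(() -> {
                List<String> names = new ArrayList<>();
                try (ZipInputStream zis = new ZipInputStream(pipeIn)) {
                    ZipEntry entry;
                    while ((entry = zis.getNextEntry()) != null) {
                        names.add(entry.getName());
                    }
                    // Drain the rest (central directory) so the producer never writes into a closed pipe
                    pipeIn.transferTo(OutputStream.nullOutputStream());
                }
                return names;
            });
            producer.get(); // rethrows any producer failure as ExecutionException
            return consumer.get();
        } finally {
            pool.shutdownNow();
        }
    }
}
```

The consumer must read the pipe to the end before closing it: otherwise the producer's final writes fail with a "Pipe closed" `IOException`. An upload call that reads the whole stream satisfies this naturally.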

Here is the version for the AWS SDK for Java 1.x:

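import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.iterable.S3Objects;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.util.IOUtils;

import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.UUID;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

// The statements below go inside a method that declares `throws Exception`
// (the PipedInputStream constructor and Thread.join() throw checked exceptions).
final AmazonS3 client = AmazonS3ClientBuilder.defaultClient();

final PipedOutputStream pipedOutputStream = new PipedOutputStream();
final PipedInputStream pipedInputStream = new PipedInputStream(pipedOutputStream);

final Thread s3In = new Thread(() -> {
    try (final ZipOutputStream zipOutputStream = new ZipOutputStream(pipedOutputStream)) {
        S3Objects
                // It's just a convenient way to list all the objects. Replace with your own logic.
                .inBucket(client, "bucket")
                .forEach((S3ObjectSummary objectSummary) -> {
                    try {
                        if (objectSummary.getKey().endsWith(".png")) {
                            System.out.println("Processing " + objectSummary.getKey());

                            final ZipEntry entry = new ZipEntry(
                                    UUID.randomUUID().toString() + ".png" // I'm too lazy to extract file name from the
                                    // objectSummary
                            );

                            zipOutputStream.putNextEntry(entry);

                            // try-with-resources closes the S3Object so its HTTP connection is released
                            try (S3Object s3Object = client.getObject(
                                    objectSummary.getBucketName(),
                                    objectSummary.getKey()
                            )) {
                                IOUtils.copy(s3Object.getObjectContent(), zipOutputStream);
                            }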

                            zipOutputStream.closeEntry();
                        }
                    } catch (final Exception all) {
                        all.printStackTrace();
                    }
                });
    } catch (final Exception all) {
        all.printStackTrace();
    }
});
final Thread s3Out = new Thread(() -> {
    try {
        client.putObject(
                "another-bucket",
                "previews.zip",
                pipedInputStream,
                new ObjectMetadata()
        );

        pipedInputStream.close();
    } catch (final Exception all) {
        all.printStackTrace();
    }
});

s3In.start();
s3Out.start();

s3In.join();
s3Out.join();

Note, however, that it prints a warning:

WARNING: No content length specified for stream data.  Stream contents will be buffered in memory and could result in out of memory errors.

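This is because S3 needs to know the size of the data before the upload starts, and the size of the generated ZIP cannot be known in advance. You could try your luck with multipart uploads, but the code would be trickier. The idea is similar, though: one thread reads the data and writes the content into the ZIP stream, while another thread reads the zipped bytes and uploads them as the parts of a multipart upload (every part except the last must be at least 5 MiB). Once all parts have been uploaded, the multipart upload must be completed.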
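The chunking step of that multipart idea can be sketched without any AWS calls. This is a minimal sketch, not the answer's code: `PartSplitter` and its `uploadPart` callback are hypothetical stand-ins for the SDK's upload-part request, which in real code would sit between an initiate request and a complete request. It reads the consumer side of the pipe into fixed-size parts, so at most one part is held in memory at a time. It relies on `InputStream.readNBytes(int)` (Java 11+).

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.function.BiConsumer;

public class PartSplitter {

    /** S3 rejects multipart parts smaller than 5 MiB, except for the last part. */
    public static final int S3_MIN_PART_SIZE = 5 * 1024 * 1024;

    /**
     * Reads {@code in} to the end and hands it to {@code uploadPart} as (partNumber, bytes)
     * in chunks of exactly {@code partSize} bytes; only the final chunk may be shorter.
     * Returns the number of parts produced.
     */
    public static int split(InputStream in, int partSize, BiConsumer<Integer, byte[]> uploadPart)
            throws IOException {
        int partNumber = 0;
        while (true) {
            // readNBytes blocks until partSize bytes arrive or the stream ends,
            // so short reads from a pipe never produce undersized middle parts.
            byte[] chunk = in.readNBytes(partSize);
            if (chunk.length == 0) {
                break; // end of stream reached exactly on a part boundary
            }
            uploadPart.accept(++partNumber, chunk); // S3 part numbers start at 1
            if (chunk.length < partSize) {
                break; // a short chunk can only be the last one
            }
        }
        return partNumber;
    }
}
```

In the real flow `partSize` would be at least `S3_MIN_PART_SIZE`, and an empty stream (zero parts) would need special handling, because completing a multipart upload requires at least one part.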

Here is an example for the AWS SDK for Java 2.x:

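import org.apache.commons.io.IOUtils; // Apache Commons IO (assumed dependency)
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Object;

import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.UUID;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

final S3Client client = S3Client.create();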

final PipedOutputStream pipedOutputStream = new PipedOutputStream();
final PipedInputStream pipedInputStream = new PipedInputStream(pipedOutputStream);

final Thread s3In = new Thread(() -> {
    try (final ZipOutputStream zipOutputStream = new ZipOutputStream(pipedOutputStream)) {
        client.listObjectsV2Paginator(
                ListObjectsV2Request
                        .builder()
                        .bucket("bucket")
                        .build()
        )
                .contents()
                .forEach((S3Object object) -> {
                    try {
                        if (object.key().endsWith(".png")) {
                            System.out.println("Processing " + object.key());

                            final ZipEntry entry = new ZipEntry(
                                    UUID.randomUUID().toString() + ".png" // I'm too lazy to extract file name from the object
                            );

                            zipOutputStream.putNextEntry(entry);

                            client.getObject(
                                    GetObjectRequest
                                            .builder()
                                            .bucket("bucket")
                                            .key(object.key())
                                            .build(),
                                    ResponseTransformer.toOutputStream(zipOutputStream)
                            );

                            zipOutputStream.closeEntry();
                        }
                    } catch (final Exception all) {
                        all.printStackTrace();
                    }
                });
    } catch (final Exception all) {
        all.printStackTrace();
    }
});
final Thread s3Out = new Thread(() -> {
    try {
        client.putObject(
                PutObjectRequest
                        .builder()
                        .bucket("another-bucket")
                        .key("previews.zip")
                        .build(),
                RequestBody.fromBytes(
                        IOUtils.toByteArray(pipedInputStream)
                )
        );
    } catch (final Exception all) {
        all.printStackTrace();
    }
});

s3In.start();
s3Out.start();

s3In.join();
s3Out.join();

It suffers from the same problem: the whole ZIP has to be prepared in memory before it is uploaded.

If you're interested, I prepared a demo project where you can play with the code.

[Discussion]:

    [Solution 2]:

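    The problem is that the AWS Java SDK for S3 does not offer a way to stream writes into an OutputStream. The snippet below implements an "S3OutputStream" that extends OutputStream and automatically performs either a "putObject" or an "initiateMultipartUpload", depending on the size of the data. This lets you pass the S3OutputStream to the constructor of ZipOutputStream, e.g. new ZipOutputStream(new S3OutputStream(s3Client, "my_bucket", "path")).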

    import java.io.ByteArrayInputStream;
    import java.io.OutputStream;
    import java.util.ArrayList;
    import java.util.List;
    
    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
    import com.amazonaws.services.s3.model.CannedAccessControlList;
    import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
    import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
    import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
    import com.amazonaws.services.s3.model.ObjectMetadata;
    import com.amazonaws.services.s3.model.PartETag;
    import com.amazonaws.services.s3.model.PutObjectRequest;
    import com.amazonaws.services.s3.model.UploadPartRequest;
    import com.amazonaws.services.s3.model.UploadPartResult;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class S3OutputStream extends OutputStream {
    
        private static final Logger LOG = LoggerFactory.getLogger(S3OutputStream.class);
    
        /** Default chunk size is 10 MB; S3 requires every part except the last to be at least 5 MiB */
        protected static final int BUFFER_SIZE = 10000000;
    
        /** The bucket-name on Amazon S3 */
        private final String bucket;
    
        /** The path (key) name within the bucket */
        private final String path;
    
        /** The temporary buffer used for storing the chunks */
        private final byte[] buf;
    
        /** The position in the buffer */
        private int position;
    
        /** Amazon S3 client. TODO: support KMS */
        private final AmazonS3 s3Client;
    
        /** The unique id for this upload */
        private String uploadId;
    
        /** Collection of the etags for the parts that have been uploaded */
        private final List<PartETag> etags;
    
        /** indicates whether the stream is still open / valid */
        private boolean open;
    
        /**
         * Creates a new S3 OutputStream
         * @param s3Client the AmazonS3 client
         * @param bucket name of the bucket
         * @param path path within the bucket
         */
        public S3OutputStream(AmazonS3 s3Client, String bucket, String path) {
            this.s3Client = s3Client;
            this.bucket = bucket;
            this.path = path;
            this.buf = new byte[BUFFER_SIZE];
            this.position = 0;
            this.etags = new ArrayList<>();
            this.open = true;
        }
    
        /**
         * Write an array to the S3 output stream.
         *
         * @param b the byte-array to append
         */
        @Override
        public void write(byte[] b) {
            write(b,0,b.length);
        }
    
        /**
         * Writes an array to the S3 Output Stream
         *
         * @param byteArray the array to write
         * @param o the offset into the array
         * @param l the number of bytes to write
         */
        @Override
        public void write(final byte[] byteArray, final int o, final int l) {
            this.assertOpen();
            int ofs = o, len = l;
            int size;
            while (len > (size = this.buf.length - position)) {
                System.arraycopy(byteArray, ofs, this.buf, this.position, size);
                this.position += size;
                flushBufferAndRewind();
                ofs += size;
                len -= size;
            }
            System.arraycopy(byteArray, ofs, this.buf, this.position, len);
            this.position += len;
        }
    
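        /**
         * Does not upload anything: S3 parts must be at least 5 MiB (except the last one),
         * so data is only sent when the buffer is full or when the stream is closed.
         */
        @Override
        public synchronized void flush() {
            this.assertOpen();
            LOG.debug("Flush was called");
        }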
    
        protected void flushBufferAndRewind() {
            if (uploadId == null) {
                LOG.debug("Starting a multipart upload for {}/{}",this.bucket,this.path);
                final InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(this.bucket, this.path)
                        .withCannedACL(CannedAccessControlList.BucketOwnerFullControl);
                InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(request);
                this.uploadId = initResponse.getUploadId();
            }
            uploadPart();
            this.position = 0;
        }
    
        protected void uploadPart() {
            LOG.debug("Uploading part {}",this.etags.size());
            UploadPartResult uploadResult = this.s3Client.uploadPart(new UploadPartRequest()
                    .withBucketName(this.bucket)
                    .withKey(this.path)
                    .withUploadId(this.uploadId)
                    .withInputStream(new ByteArrayInputStream(buf,0,this.position))
                    .withPartNumber(this.etags.size() + 1)
                    .withPartSize(this.position));
            this.etags.add(uploadResult.getPartETag());
        }
    
        @Override
        public void close() {
            if (this.open) {
                this.open = false;
                if (this.uploadId != null) {
                    try {
                        if (this.position > 0) {
                            uploadPart();
                        }
                        LOG.debug("Completing multipart");
                        this.s3Client.completeMultipartUpload(new CompleteMultipartUploadRequest(bucket, path, uploadId, etags));
                    } catch (RuntimeException e) {
                        // Abort so the already uploaded parts do not linger (and keep being billed)
                        try {
                            this.s3Client.abortMultipartUpload(new AbortMultipartUploadRequest(this.bucket, this.path, this.uploadId));
                        } catch (RuntimeException abortFailure) {
                            e.addSuppressed(abortFailure);
                        }
                        throw e;
                    }
                }
                else {
                    LOG.debug("Uploading object at once to {}/{}",this.bucket,this.path);
                    final ObjectMetadata metadata = new ObjectMetadata();
                    metadata.setContentLength(this.position);
                    final PutObjectRequest request = new PutObjectRequest(this.bucket, this.path, new ByteArrayInputStream(this.buf, 0, this.position), metadata)
                            .withCannedAcl(CannedAccessControlList.BucketOwnerFullControl);
                    this.s3Client.putObject(request);
                }
            }
        }
    
        public void cancel() {
            this.open = false;
            if (this.uploadId != null) {
                LOG.debug("Aborting multipart upload");
                this.s3Client.abortMultipartUpload(new AbortMultipartUploadRequest(this.bucket, this.path, this.uploadId));
            }
        }
    
        @Override
        public void write(int b) {
            this.assertOpen();
            if (position >= this.buf.length) {
                flushBufferAndRewind();
            }
            this.buf[position++] = (byte)b;
        }
    
        private void assertOpen() {
            if (!this.open) {
                throw new IllegalStateException("Closed");
            }
        }
    }
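The buffer size also bounds how large the archive can get. S3 allows at most 10,000 parts per multipart upload, so with `BUFFER_SIZE = 10000000` the largest ZIP this class can produce is about 100 GB, comfortably above the "several GB" in the question. The arithmetic is sketched below; `MultipartLimits` is an illustrative helper, not part of the answer's code.

```java
public class MultipartLimits {

    /** S3 allows at most 10,000 parts per multipart upload. */
    public static final int MAX_PARTS = 10_000;

    /** Largest object reachable when every part except the last is partSize bytes. */
    public static long maxObjectSize(long partSize) {
        return partSize * MAX_PARTS;
    }

    /** Parts produced for an object larger than partSize (smaller ones use a single putObject). */
    public static long partsNeeded(long totalBytes, long partSize) {
        return (totalBytes + partSize - 1) / partSize; // ceiling division
    }

    public static void main(String[] args) {
        long bufferSize = 10_000_000L; // BUFFER_SIZE in S3OutputStream
        System.out.println(maxObjectSize(bufferSize));               // 100000000000 (about 100 GB)
        System.out.println(partsNeeded(3_000_000_000L, bufferSize)); // 300 parts for a 3 GB ZIP
    }
}
```

Raising `BUFFER_SIZE` lifts the limit at the cost of more heap per stream, which matters on a memory-constrained Lambda.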
    
    

    [Discussion]:

      [Solution 3]:

      I'm answering your question quite late. I did this yesterday for my latest project; take a look at the complete code below.

      Assume that uploading a file to S3 returns the ObjectKey of the uploaded file; here I created an equivalent class and named it FileKey.

      package com.myprojectName.model.key;
      
      import java.time.Instant;
      
      import javax.persistence.Entity;
      
      import lombok.Data;
      import lombok.NoArgsConstructor;
      
      @Data
      @Entity
      @NoArgsConstructor
      public class FileKey {
      
          private String fileObjectKey;
          
          private String fileName;
          
          private int fileSize;
          
          private String fileType;
          
      }
      

      The returned presigned URL is stored in FileDownloadDetailsDTO:

      import lombok.AllArgsConstructor;
      import lombok.Builder;
      import lombok.Getter;
      import lombok.NoArgsConstructor;
      
      import java.net.URL;
      
      @NoArgsConstructor
      @AllArgsConstructor
      @Getter
      @Builder
      public class FileDownloadDetailsDTO {
      
          private String name;
          private Long size;
          private String contentType;
          private URL preSignedDownloadUrl;
      
          public FileDownloadDetailsDTO(PreSignedUrlAndMetadata objectMetadata) {
              this.name = objectMetadata.getName();
              this.size = objectMetadata.getSize();
              this.contentType = objectMetadata.getContentType();
              this.preSignedDownloadUrl = objectMetadata.getUrl();
          }
      
      }
      

      PreSignedUrlAndMetadata holds the URL created for the object in the S3 bucket; if you're unsure, look at the code below:

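      import java.net.URL;
      
      import lombok.AllArgsConstructor;
      import lombok.Getter;
      
      // @Getter and @AllArgsConstructor provide the getUrl()/getSize()/... accessors and the
      // (url, name, contentType, size) constructor used by the code below
      @Getter
      @AllArgsConstructor
      public class PreSignedUrlAndMetadata {
      
          private final URL url;
      
          private final String name;
      
          private final String contentType;
      
          private final Long size;
      
      }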
      

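      The following method adds each file from the S3 bucket as an entry to a zip file and returns a presigned URL for that zip file. Note that, as written, it stages the ZIP in a local temporary file (File.createTempFile) before uploading it, so it is still subject to the Lambda temporary storage limit mentioned in the question.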

      public FileDownloadDetailsDTO getDownloadFilesInZipDetails(String zipFileName, List<FileKey> files) {
      
          PreSignedUrlAndMetadata preSignedUrlAndMetadata;
          File zipFile = null;
          try {
              zipFile = File.createTempFile(zipFileName, "file");
      
              try (FileOutputStream fos = new FileOutputStream(zipFile); ZipOutputStream zos = new ZipOutputStream(fos)) {
      
                  for (FileKey file : files) {
                      // Fall back to the object key when no file name is set (new ZipEntry(null) throws NullPointerException)
                      String name = ObjectUtils.isNotEmpty(file.getFileName())
                              ? file.getFileName()
                              : file.getFileObjectKey();
                      ZipEntry entry = new ZipEntry(name);
      
                      try (InputStream inputStream = getInputStreamForFileKey(file.getFileObjectKey())) {
                          zos.putNextEntry(entry);
                          IOUtils.copy(inputStream, zos);
                          zos.closeEntry();
                      }
                  }
              }
      
              try (FileInputStream fis = new FileInputStream(zipFile)) {
                  TempFileObjectKey fileObjectKey = uploadTemp(fis, zipFile.length());
                  // url, metadata, contentType and USER_METADATA_NAME come from project-specific code not shown here
                  preSignedUrlAndMetadata = new PreSignedUrlAndMetadata(url, metadata.getUserMetaDataOf(USER_METADATA_NAME), contentType, metadata.getContentLength());
              }
      
          } catch (Exception e) {
              throw new ApplicationException("Error while creating zip file for " + zipFileName, e, ApplicationErrorCode.INTERNAL_SERVER_ERROR);
          } finally {
              FileUtils.deleteQuietly(zipFile);
          }
      
          return FileDownloadDetailsDTO.builder().name(zipFileName + ".zip")
                  .size(preSignedUrlAndMetadata.getSize()).preSignedDownloadUrl(preSignedUrlAndMetadata.getUrl()).build();
      
      }
      
      public InputStream getInputStreamForFileKey(String key) {
          TempFileObjectKey tempFileObjectKey = new TempFileObjectKey(getActualPrefix(key));
          return storageService.getInputStream(tempFileObjectKey);
      }
      
      
      String getActualPrefix(String prefix) {
          return prefix.replace("_", "/");
      }
      
      public TempFileObjectKey uploadTemp(InputStream inputStream, Long length) {
          TempFileObjectKey tempFileObjectKey = s3StorageManager.buildTempFileFullKey();
          ObjectMetadata objectMetadata = new ObjectMetadata();
          if (length != null) {
              objectMetadata.setContentLength(length);
          }
          // transferManager: a com.amazonaws.services.s3.transfer.TransferManager instance field (upload is not static)
          Upload upload = transferManager.upload(getBucketName(tempFileObjectKey), tempFileObjectKey.getObjectKey(), inputStream, objectMetadata);
          try {
              upload.waitForCompletion();
          } catch (InterruptedException e) {
              Thread.currentThread().interrupt(); // restore the interrupt flag
              throw new ApplicationException(e.getMessage(), e, ApplicationErrorCode.INTERNAL_SERVER_ERROR);
          }
          return tempFileObjectKey;
      }
      

      I hope this helps. Feel free to ask if you still have any questions. Thanks,

      [Discussion]:
