【问题标题】:Sync two buckets through boto3通过boto3同步两个bucket
【发布时间】:2022-03-03 07:06:25
【问题描述】:

有什么方法可以使用 boto3 循环两个不同存储桶(源和目标)中的存储桶内容,如果它在源中找到与目标不匹配的任何键,则将其上传到目标存储桶。请注意我不想使用 aws s3 同步。我目前正在使用以下代码来完成这项工作:

import boto3

s3 = boto3.resource('s3')
src = s3.Bucket('sourcenabcap')
dst = s3.Bucket('destinationnabcap')
objs = list(dst.objects.all())
for k in src.objects.all():
 if (k.key !=objs[0].key):
  # copy the k.key to target

【问题讨论】:

    标签: python amazon-web-services amazon-s3 boto3


    【解决方案1】:

    如果您决定不使用 boto3。 boto3 还没有同步命令,可以直接使用

    # python 3
    
    import os
    
    sync_command = f"aws s3 sync s3://source-bucket/ s3://destination-bucket/"
    os.system(sync_command)
    

    【讨论】:

    【解决方案2】:

    我刚刚为此实现了一个简单的类(将本地文件夹同步到存储桶)。我在这里发布它希望它可以帮助任何有同样问题的人。

    您可以修改 S3Sync.sync 以考虑文件大小。

    from pathlib import Path
    from bisect import bisect_left
    
    import boto3
    
    
    class S3Sync:
        """
        Class that holds the operations needed for synchronize local dirs to a given bucket.
        """
    
        def __init__(self):
            self._s3 = boto3.client('s3')
    
        def sync(self, source: str, dest: str) -> [str]:
            """
            Sync source to dest, this means that all elements existing in
            source that not exists in dest will be copied to dest.
    
            No element will be deleted.
    
            :param source: Source folder.
            :param dest: Destination folder.
    
            :return: None
            """
    
            paths = self.list_source_objects(source_folder=source)
            objects = self.list_bucket_objects(dest)
    
            # Getting the keys and ordering to perform binary search
            # each time we want to check if any paths is already there.
            object_keys = [obj['Key'] for obj in objects]
            object_keys.sort()
            object_keys_length = len(object_keys)
            
            for path in paths:
                # Binary search.
                index = bisect_left(object_keys, path)
                if index == object_keys_length:
                    # If path not found in object_keys, it has to be sync-ed.
                    self._s3.upload_file(str(Path(source).joinpath(path)),  Bucket=dest, Key=path)
    
        def list_bucket_objects(self, bucket: str) -> [dict]:
            """
            List all objects for the given bucket.
    
            :param bucket: Bucket name.
            :return: A [dict] containing the elements in the bucket.
    
            Example of a single object.
    
            {
                'Key': 'example/example.txt',
                'LastModified': datetime.datetime(2019, 7, 4, 13, 50, 34, 893000, tzinfo=tzutc()),
                'ETag': '"b11564415be7f58435013b414a59ae5c"',
                'Size': 115280,
                'StorageClass': 'STANDARD',
                'Owner': {
                    'DisplayName': 'webfile',
                    'ID': '75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a'
                }
            }
    
            """
            try:
                contents = self._s3.list_objects(Bucket=bucket)['Contents']
            except KeyError:
                # No Contents Key, empty bucket.
                return []
            else:
                return contents
    
        @staticmethod
        def list_source_objects(source_folder: str) -> [str]:
            """
            :param source_folder:  Root folder for resources you want to list.
            :return: A [str] containing relative names of the files.
    
            Example:
    
                /tmp
                    - example
                        - file_1.txt
                        - some_folder
                            - file_2.txt
    
                >>> sync.list_source_objects("/tmp/example")
                ['file_1.txt', 'some_folder/file_2.txt']
    
            """
    
            path = Path(source_folder)
    
            paths = []
    
            for file_path in path.rglob("*"):
                if file_path.is_dir():
                    continue
                str_file_path = str(file_path)
                str_file_path = str_file_path.replace(f'{str(path)}/', "")
                paths.append(str_file_path)
    
            return paths
    
    
    if __name__ == '__main__':
        sync = S3Sync()
        sync.sync("/temp/some_folder", "some_bucket_name")
    

    还将if file_path.is_dir(): 替换为if not file_path.is_file(): 可以绕过无法解析的链接和其他类似的废话,感谢@keithpjolley 指出这一点。

    【讨论】:

    • if not file_path.is_file(): 替换if file_path.is_dir(): 让它绕过无法解析的链接和其他类似的废话。
    【解决方案3】:

    如果您只想按 Key 进行比较(忽略对象内的差异),您可以使用以下内容:

    s3 = boto3.resource('s3')
    source_bucket = s3.Bucket('source')
    destination_bucket = s3.Bucket('destination')
    destination_keys = [object.key for object in destination_bucket.objects.all()]
    for object in source_bucket.objects.all():
      if (object.key not in destination_keys):
        # copy object.key to destination
    

    【讨论】:

    • 是的,这看起来不错,但是由于目标中的对象位于一个文件夹中,比如 ABC,对象名称将与源不同,我必须使用 filter(Prefix ='ABC/' )。例如,源中的对象名称是 name1 但目标中的对象名称是 ABC/name ,您有什么想法可以使它们具有可比性吗?
    • 你可以在最后一个斜杠之前去掉字符串。
    【解决方案4】:
    1. 获取目标帐户 ID DEST_ACCOUNT_ID

    2. 创建源存储桶并添加此策略

        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "DelegateS3Access",
                    "Effect": "Allow",
                    "Principal": {
                        "AWS": "arn:aws:iam::DEST_ACCOUNT_ID:root"
                    },
                    "Action": [
                        "s3:ListBucket",
                        "s3:GetObject"
                    ],
                    "Resource": [
                        "arn:aws:s3:::s3-copy-test/*",
                        "arn:aws:s3:::s3-copy-test"
                    ]
                }
            ]
        }
    1. 创建要复制的文件

    2. 在目标账户上创建用户并使用该用户配置 AWS CLI

    3. 在目标帐户上创建目标存储桶

    4. 将此策略附加到目标账户上的 IAM 用户

       {
       "Version": "2012-10-17",
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                   "s3:ListBucket",
                   "s3:GetObject"
               ],
               "Resource": [
                   "arn:aws:s3:::s3-copy-test",
                   "arn:aws:s3:::s3-copy-test/*"
               ]
           },
           {
               "Effect": "Allow",
               "Action": [
                   "s3:ListBucket",
                   "s3:PutObject",
                   "s3:PutObjectAcl"
               ],
               "Resource": [
                   "arn:aws:s3:::s3-copy-test-dest",
                   "arn:aws:s3:::s3-copy-test-dest/*"
               ]
           }
       ]
      

      }

    5. 执行文件同步

    aws s3 sync s3://s3-copy-test s3://s3-copy-test-dest --source-region eu-west-1 --region eu-west-1
    

    【讨论】:

      【解决方案5】:

      除了使用 boto3 DataSync 客户端之外,我还实现了一个与 boto3 S3 客户端类似的类。 DataSync 确实有单独的费用。

      我们遇到了同样的问题,但我们的另一个要求是我们需要每天处理 10GB-1TB 并精确匹配两个存储桶 s3 文件,如果更新则我们需要更新目标存储桶,如果删除我们需要 s3 文件要删除,如果创建则创建。

      默认情况下,数据同步选项'TransferMode': 'CHANGED' 只更改文件,包括文件名和大小。 "PreserveDeletedFiles": "REMOVE" 也是代码,但根据你的问题,我认为你会想要'PreserveDeletedFiles': 'PRESERVE'

      成本:您只需为数据同步任务移动的文件付费。因此,如果两个存储桶中都存在一个文件并且没有更改,则没有成本。

      性能:关于性能,未显示测试,但我在几个月前测试了一些存储桶,并在 20 分钟内复制了 720 GB,但我不记得文件数。

      用例:我们使用 DataSync 执行 s3 存储桶蓝/绿更新,我们不希望 s3 复制,因为它会在数据加载时干扰热 s3 存储桶。我们使用它的另一个地方是在主要应用程序更改期间迁移数据,当存储桶更改时,或者如果我们想要在存储桶内迁移数据。我发现数据同步客户端比 s3 客户端移动数据要快得多,对于大文件和大量文件,它的运行速度比 boto s3 快得多。

      缺点: 您将需要创建一个 IAM 角色,该角色可以访问存储桶、加密和 sts 假设数据同步。如果您移动

      """AWS DataSync an aws service to move/copy large amounts of data."""
      import logging
      import os
      
      import boto3
      import tenacity
      from botocore import waiter
      from botocore.exceptions import WaiterError
      
      
      logger = logging.getLogger(__name__)
      
      
      class DataSyncWaiter(object):
          """A AWS Data sync waiter class."""
          def __init__(self, client):
              """Init."""
              self._client = client
              self._waiter = waiter
      
          def wait_for_finished(self, task_execution_arn):
              """Wait for data sync to finish."""
              model = self._waiter.WaiterModel({
                  "version": 2,
                  "waiters": {
                      "JobFinished": {
                          "delay":
                          1,
                          "operation":
                          "DescribeTaskExecution",
                          "description":
                          "Wait until AWS Data Sync starts finished",
                          "maxAttempts":
                          1000000,
                          "acceptors": [
                              {
                                  "argument": "Status",
                                  "expected": "SUCCESS",
                                  "matcher": "path",
                                  "state": "success",
                              },
                              {
                                  "argument": "Status",
                                  "expected": "ERROR",
                                  "matcher": "path",
                                  "state": "failure",
                              },
                          ],
                      }
                  },
              })
              self._waiter.create_waiter_with_client("JobFinished", model,
                                                     self._client).wait(TaskExecutionArn=task_execution_arn)
      
      
      class DataSyncClient:
          """A AWS DataSync client."""
          def __init__(self, client, role_arn, waiter: DataSyncWaiter = None) -> None:
              """Init."""
              self._client: boto3.client = client
              if waiter is None:
                  waiter = DataSyncWaiter(client=client)
              self._waiter: DataSyncWaiter = waiter
              self._role_arn = role_arn
      
          def _delete_task(self, task_arn):
              """Delete a AWS DataSync task."""
              response = self._client.delete_task(TaskArn=task_arn)
              return response
      
          def _list_s3_locations(self):
              """List AWS DataSync locations."""
              locations = self._client.list_locations(MaxResults=100)
              if "Locations" in locations:
                  return [x for x in locations["Locations"] if x["LocationUri"].startswith("s3://")]
              return []
      
          def _create_datasync_s3_location(self, bucket_name: str, subdirectory: str = ""):
              """Create AWS DataSync location."""
              return self._client.create_location_s3(
                  Subdirectory=subdirectory,
                  S3BucketArn=f"arn:aws:s3:::{bucket_name}",
                  S3StorageClass="STANDARD",
                  S3Config={"BucketAccessRoleArn": self._role_arn},
              )
      
          def _find_location_arn(self, bucket_name, subdirectory: str, locations_s3):
              """Find AWS DataSync LocationArn based on bucketname."""
              for x in locations_s3:
                  # match the s3 location
                  if bucket_name in x["LocationUri"] and subdirectory in x["LocationUri"]:
                      # match the roles, these do not update frequently
                      location_metadata = self._client.describe_location_s3(LocationArn=x["LocationArn"])
                      if location_metadata['S3Config']['BucketAccessRoleArn'] == self._role_arn:
                          return x["LocationArn"]
              return self._create_datasync_s3_location(bucket_name=bucket_name, subdirectory=subdirectory)["LocationArn"]
      
          def move_data(self, task_name: str, source_bucket_name: str, dest_bucket_name: str, subdirectory: str) -> bool:
              """Move data using AWS DataSync tasks."""
              current_locations = self._list_s3_locations()
              source_s3_location_response = self._find_location_arn(bucket_name=source_bucket_name,
                                                                    locations_s3=current_locations,
                                                                    subdirectory=subdirectory)
              dest_s3_location_response = self._find_location_arn(bucket_name=dest_bucket_name,
                                                                  locations_s3=current_locations,
                                                                  subdirectory=subdirectory)
              logger.info("Moving data from SRC:{source} DEST:{dest}".format(
                  source=os.path.join(source_bucket_name, subdirectory), dest=os.path.join(dest_bucket_name, subdirectory)))
              task = self._client.create_task(
                  SourceLocationArn=source_s3_location_response,
                  DestinationLocationArn=dest_s3_location_response,
                  Name=f"{task_name}-sync",
                  Options={
                      "VerifyMode": "POINT_IN_TIME_CONSISTENT",
                      "OverwriteMode": "ALWAYS",
                      "PreserveDeletedFiles": "REMOVE",
                      # 'TransferMode': # 'CHANGED'|'ALL'
                  },
              )
              self.start_task_waiting_for_complete(task_arn=task["TaskArn"])
              self._delete_task(task_arn=task["TaskArn"])
              return True
      
          @tenacity.retry(
              retry=tenacity.retry_if_exception_type(exception_types=(WaiterError)),
              wait=tenacity.wait_random_exponential(multiplier=0.5),
              stop=tenacity.stop_after_attempt(max_attempt_number=60),
              reraise=True,
              after=tenacity.after_log(logger, logging.INFO),
          )
          def start_task_waiting_for_complete(self, task_arn: str):
              """Start data move task, with retry because sometimes not all files get
              moved.
      
              It is not clear if this is because of eventual consistency in S3
              or the AWS service just does not handle constistency well.
              """
              task_started = self._client.start_task_execution(TaskArn=task_arn)
              self._waiter.wait_for_finished(task_execution_arn=task_started["TaskExecutionArn"])
      
      
      def data_sync_move_data(task_name: str, data_sync_role_arn: str, source_bucket: str, destination_bucket: str, subdirectory: str,
                               datasync_client: boto3.client = None):
          """Move data from source bucket to destition bucket."""
          logger.info(f"DataSync: Moving all the data from {source_bucket} -> {destination_bucket}")
          if datasync_client is None:
              datasync_client = boto3.client("datasync")
          datasync_client = DataSyncClient(client=datasync_client, role_arn=data_sync_role_arn)
          datasync_client.move_data(
              task_name=task_name,
              source_bucket_name=source_bucket,
              dest_bucket_name=destination_bucket,
              subdirectory=subdirectory,
          )
      

      然后是实现

      DATA_SYNC_ROLE_ARN = {
          "sand": "arn:aws:iam::123456789:role/Bucket-and-DataSync-Access-sand",
          "dev": "arn:aws:iam::123456789:role/Bucket-and-DataSync-Access-dev",
          "stg": "arn:aws:iam::123456789:role/Bucket-and-DataSync-Access-stg",
          "prod": "arn:aws:iam::123456789:role/Bucket-and-DataSync-Access-prod",
      }
      data_sync_move_data(task_name="migrate_data",
                              data_sync_role_arn=DATA_SYNC_ROLE_ARN[env],
                              source_bucket="old-bucket-name",
                              destination_bucket="new-bucket-name,
                              subdirectory="", # this is whole bucket
                              datasync_client=boto3.client('datasync'))
      

      IAM 角色示例:

        Role:
          Type: AWS::IAM::Role
          Properties:
            RoleName: !Sub "Bucket-and-DataSync-Access-${Environment}"
            AssumeRolePolicyDocument:
              Version: "2012-10-17"
              Statement:
                - Effect: "Allow"
                  Principal:
                    Service:
                      - "datasync.amazonaws.com"
                  Action:
                    - "sts:AssumeRole"
          ...<s3 bucket access and encryption>
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-10-15
        • 2022-01-16
        相关资源
        最近更新 更多