我实现了一个用法与 boto3 S3 客户端类似的类,只不过它底层使用的是 boto3 DataSync 客户端。请注意,DataSync 是单独计费的。
我们遇到了同样的问题,不过我们还有一个额外要求:每天需要处理 10GB–1TB 的数据,并让两个存储桶中的 S3 文件保持完全一致——源端文件更新时要更新目标存储桶,源端文件删除时要删除目标端对应的文件,源端新建文件时要在目标端创建。
默认情况下,DataSync 选项 'TransferMode': 'CHANGED' 只传输发生变化的文件(比较内容包括文件名和大小)。代码中还设置了 "PreserveDeletedFiles": "REMOVE",但根据你的问题,我认为你会想要 'PreserveDeletedFiles': 'PRESERVE'。
成本:您只需为数据同步任务移动的文件付费。因此,如果两个存储桶中都存在一个文件并且没有更改,则没有成本。
性能:这里没有给出正式的性能测试数据,但几个月前我在几个存储桶上做过测试,20 分钟内复制了 720 GB,不过我不记得文件数量了。
用例:我们使用 DataSync 执行 s3 存储桶蓝/绿更新,我们不希望 s3 复制,因为它会在数据加载时干扰热 s3 存储桶。我们使用它的另一个地方是在主要应用程序更改期间迁移数据,当存储桶更改时,或者如果我们想要在存储桶内迁移数据。我发现数据同步客户端比 s3 客户端移动数据要快得多,对于大文件和大量文件,它的运行速度比 boto s3 快得多。
缺点:
您需要创建一个 IAM 角色,该角色要有访问存储桶和加密密钥的权限,并允许 DataSync 服务通过 sts:AssumeRole 代入该角色。如果您移动
"""AWS DataSync an aws service to move/copy large amounts of data."""
import logging
import os
import boto3
import tenacity
from botocore import waiter
from botocore.exceptions import WaiterError
logger = logging.getLogger(__name__)
class DataSyncWaiter:
    """Block until an AWS DataSync task execution reaches a terminal state.

    A custom botocore waiter is built on the fly. It polls the
    ``DescribeTaskExecution`` operation and inspects the ``Status`` field of
    the response: ``SUCCESS`` ends the wait successfully, ``ERROR`` ends it
    as a failure.
    """

    def __init__(self, client):
        """Store the client to poll with.

        Args:
            client: DataSync client handed to botocore when the waiter is
                created.
        """
        self._client = client
        # The botocore ``waiter`` module is kept on the instance so that
        # ``wait_for_finished`` only ever goes through ``self._waiter``.
        self._waiter = waiter

    def wait_for_finished(self, task_execution_arn, delay=1, max_attempts=1000000):
        """Wait for a DataSync task execution to finish.

        Args:
            task_execution_arn: ARN of the task execution to poll.
            delay: Seconds to sleep between two polls. Defaults to the
                original hard-coded value of 1.
            max_attempts: Maximum number of polls before the waiter gives up.
                Defaults to the original hard-coded value of 1000000.

        Raises:
            Whatever the botocore waiter raises when the execution reports
            ``ERROR`` or the attempts are exhausted (callers in this file
            retry on ``botocore.exceptions.WaiterError``).
        """
        model = self._waiter.WaiterModel({
            "version": 2,
            "waiters": {
                "JobFinished": {
                    "delay": delay,
                    "operation": "DescribeTaskExecution",
                    "description": "Wait until AWS Data Sync starts finished",
                    "maxAttempts": max_attempts,
                    "acceptors": [
                        {
                            "argument": "Status",
                            "expected": "SUCCESS",
                            "matcher": "path",
                            "state": "success",
                        },
                        {
                            "argument": "Status",
                            "expected": "ERROR",
                            "matcher": "path",
                            "state": "failure",
                        },
                    ],
                }
            },
        })
        job_waiter = self._waiter.create_waiter_with_client("JobFinished", model, self._client)
        job_waiter.wait(TaskExecutionArn=task_execution_arn)
class DataSyncClient:
    """Copy data between S3 buckets with AWS DataSync.

    Wraps a DataSync client: it looks up (or creates) the S3 locations for
    the source and destination, creates a one-off task, runs it until the
    waiter reports completion and deletes the task afterwards.
    """

    def __init__(self, client, role_arn, waiter: DataSyncWaiter = None) -> None:
        """Store the client, the bucket access role and the waiter.

        Args:
            client: DataSync client used for every API call.
            role_arn: ARN of the IAM role passed as ``BucketAccessRoleArn``
                when S3 locations are created or matched.
            waiter: Optional waiter; a ``DataSyncWaiter`` bound to ``client``
                is created when omitted.
        """
        self._client: boto3.client = client
        if waiter is None:
            waiter = DataSyncWaiter(client=client)
        self._waiter: DataSyncWaiter = waiter
        self._role_arn = role_arn

    @staticmethod
    def _s3_path(bucket_name: str, subdirectory: str = "") -> str:
        """Return ``bucket/sub/dir`` without leading or trailing slashes."""
        parts = (bucket_name, (subdirectory or "").strip("/"))
        return "/".join(part for part in parts if part)

    def _delete_task(self, task_arn):
        """Delete a DataSync task and return the API response."""
        return self._client.delete_task(TaskArn=task_arn)

    def _list_s3_locations(self):
        """Return every DataSync location whose URI starts with ``s3://``.

        Follows ``NextToken`` so that accounts with more than one page of
        locations are searched completely; otherwise an existing location on
        a later page would be missed and a duplicate created.
        """
        s3_locations = []
        request = {"MaxResults": 100}
        while True:
            page = self._client.list_locations(**request)
            s3_locations.extend(
                location for location in page.get("Locations", [])
                if location["LocationUri"].startswith("s3://")
            )
            next_token = page.get("NextToken")
            if not next_token:
                return s3_locations
            request["NextToken"] = next_token

    def _create_datasync_s3_location(self, bucket_name: str, subdirectory: str = ""):
        """Create a DataSync S3 location and return the API response."""
        return self._client.create_location_s3(
            Subdirectory=subdirectory,
            S3BucketArn=f"arn:aws:s3:::{bucket_name}",
            S3StorageClass="STANDARD",
            S3Config={"BucketAccessRoleArn": self._role_arn},
        )

    def _find_location_arn(self, bucket_name, subdirectory: str, locations_s3):
        """Return the LocationArn for a bucket/subdirectory, creating it if needed.

        Args:
            bucket_name: Name of the S3 bucket.
            subdirectory: Prefix inside the bucket; ``""`` means the whole
                bucket.
            locations_s3: Location entries (with ``LocationUri`` and
                ``LocationArn``) to search.

        Returns:
            The ARN of an existing location that points at exactly this
            bucket and prefix and uses this client's role, or the ARN of a
            newly created location.
        """
        wanted = self._s3_path(bucket_name, subdirectory)
        for location in locations_s3:
            uri = location["LocationUri"]
            if not uri.startswith("s3://"):
                continue
            # Compare the full bucket/prefix path. A substring test would let
            # bucket "data" match "my-data-bucket", and an empty subdirectory
            # would match a location for any prefix.
            if uri[len("s3://"):].strip("/") != wanted:
                continue
            # match the roles, these do not update frequently
            metadata = self._client.describe_location_s3(LocationArn=location["LocationArn"])
            if metadata["S3Config"]["BucketAccessRoleArn"] == self._role_arn:
                return location["LocationArn"]
        created = self._create_datasync_s3_location(bucket_name=bucket_name, subdirectory=subdirectory)
        return created["LocationArn"]

    def move_data(self, task_name: str, source_bucket_name: str, dest_bucket_name: str, subdirectory: str) -> bool:
        """Sync ``subdirectory`` from the source bucket to the destination bucket.

        Creates a task named ``{task_name}-sync``, runs it to completion and
        deletes it again. Files removed from the source are removed from the
        destination (``PreserveDeletedFiles`` is ``REMOVE``).

        Returns:
            True once the task execution has finished and the task is deleted.
        """
        current_locations = self._list_s3_locations()
        source_location_arn = self._find_location_arn(
            bucket_name=source_bucket_name,
            locations_s3=current_locations,
            subdirectory=subdirectory,
        )
        dest_location_arn = self._find_location_arn(
            bucket_name=dest_bucket_name,
            locations_s3=current_locations,
            subdirectory=subdirectory,
        )
        # Build the paths with "/" explicitly: os.path.join would drop the
        # bucket name whenever the subdirectory starts with a slash.
        logger.info(
            "Moving data from SRC:%s DEST:%s",
            self._s3_path(source_bucket_name, subdirectory),
            self._s3_path(dest_bucket_name, subdirectory),
        )
        task = self._client.create_task(
            SourceLocationArn=source_location_arn,
            DestinationLocationArn=dest_location_arn,
            Name=f"{task_name}-sync",
            Options={
                "VerifyMode": "POINT_IN_TIME_CONSISTENT",
                "OverwriteMode": "ALWAYS",
                "PreserveDeletedFiles": "REMOVE",
                # 'TransferMode': # 'CHANGED'|'ALL'
            },
        )
        try:
            self.start_task_waiting_for_complete(task_arn=task["TaskArn"])
        finally:
            # Remove the one-off task even when the execution failed, so
            # failed runs do not leave orphaned tasks behind.
            self._delete_task(task_arn=task["TaskArn"])
        return True

    @tenacity.retry(
        retry=tenacity.retry_if_exception_type(exception_types=WaiterError),
        wait=tenacity.wait_random_exponential(multiplier=0.5),
        stop=tenacity.stop_after_attempt(max_attempt_number=60),
        reraise=True,
        after=tenacity.after_log(logger, logging.INFO),
    )
    def start_task_waiting_for_complete(self, task_arn: str):
        """Start a task execution and wait until it has finished.

        The whole start-and-wait cycle is retried on ``WaiterError`` (up to
        60 attempts with random exponential back-off) because sometimes not
        all files get moved. It is not clear if this is because of eventual
        consistency in S3 or the AWS service just does not handle
        consistency well.
        """
        task_started = self._client.start_task_execution(TaskArn=task_arn)
        self._waiter.wait_for_finished(task_execution_arn=task_started["TaskExecutionArn"])
def data_sync_move_data(task_name: str, data_sync_role_arn: str, source_bucket: str, destination_bucket: str, subdirectory: str,
                        datasync_client: boto3.client = None):
    """Move data from the source bucket to the destination bucket.

    Args:
        task_name: Name passed on to ``DataSyncClient.move_data``.
        data_sync_role_arn: ARN of the role handed to ``DataSyncClient``.
        source_bucket: Name of the bucket to copy from.
        destination_bucket: Name of the bucket to copy to.
        subdirectory: Prefix to sync, passed through unchanged.
        datasync_client: Optional DataSync client; ``boto3.client("datasync")``
            is created when omitted.
    """
    logger.info(f"DataSync: Moving all the data from {source_bucket} -> {destination_bucket}")
    raw_client = boto3.client("datasync") if datasync_client is None else datasync_client
    mover = DataSyncClient(client=raw_client, role_arn=data_sync_role_arn)
    mover.move_data(
        task_name=task_name,
        source_bucket_name=source_bucket,
        dest_bucket_name=destination_bucket,
        subdirectory=subdirectory,
    )
然后是实现
# Role ARN to hand to DataSync, keyed by deployment environment.
DATA_SYNC_ROLE_ARN = {
    "sand": "arn:aws:iam::123456789:role/Bucket-and-DataSync-Access-sand",
    "dev": "arn:aws:iam::123456789:role/Bucket-and-DataSync-Access-dev",
    "stg": "arn:aws:iam::123456789:role/Bucket-and-DataSync-Access-stg",
    "prod": "arn:aws:iam::123456789:role/Bucket-and-DataSync-Access-prod",
}

# NOTE(review): `env` is not defined in this snippet; it has to be set
# beforehand to one of the keys of DATA_SYNC_ROLE_ARN.
data_sync_move_data(
    task_name="migrate_data",
    data_sync_role_arn=DATA_SYNC_ROLE_ARN[env],
    source_bucket="old-bucket-name",
    destination_bucket="new-bucket-name",  # closing quote was missing here
    subdirectory="",  # this is whole bucket
    datasync_client=boto3.client("datasync"),
)
IAM 角色示例:
# IAM role whose trust policy lets the DataSync service assume it.
# The role name is built from the ${Environment} substitution variable.
Role:
  Type: AWS::IAM::Role
  Properties:
    RoleName: !Sub "Bucket-and-DataSync-Access-${Environment}"
    AssumeRolePolicyDocument:
      Version: "2012-10-17"
      Statement:
        - Effect: "Allow"
          Principal:
            Service:
              - "datasync.amazonaws.com"
          Action:
            - "sts:AssumeRole"
    # ...<s3 bucket access and encryption>