1. 前言
我总结了通过 Python & Boto3 在 AWS 上访问 DynamoDB 时经常使用的方法(CRUD + COPY)。
2. Boto3 方法
API
mine.py
# Settings for the DynamoDB resource: the endpoint is fixed to
# localhost:8100, while the region and credentials are read from
# environment variables (None when a variable is not set).
_resource_options = {
    "endpoint_url": "http://localhost:8100",
    "region_name": os.environ.get("REGION_NAME"),
    "aws_access_key_id": os.environ.get("AWS_ACCESS_KEY"),
    "aws_secret_access_key": os.environ.get("AWS_SECRET_ACCESS_KEY"),
}
dynamodb = boto3.resource("dynamodb", **_resource_options)
数据定义
表的定义和项目(item)的定义分别写在 env.yaml 和 item.yaml 中。
env.yaml
# Table definition. Everything under "DynamoDB" is passed as keyword
# arguments to create_table().
DynamoDB:
  TableName: TestTable
  AttributeDefinitions:
    - {AttributeName: user_id, AttributeType: S}
    - {AttributeName: count, AttributeType: N}
    - {AttributeName: status, AttributeType: N}
  # Primary key: user_id (HASH) + count (RANGE).
  KeySchema:
    - {AttributeName: user_id, KeyType: HASH}
    - {AttributeName: count, KeyType: RANGE}
  # Secondary index keyed on user_id (HASH) + status (RANGE).
  GlobalSecondaryIndexes:
    - IndexName: user_count
      KeySchema:
        - {AttributeName: user_id, KeyType: HASH}
        - {AttributeName: status, KeyType: RANGE}
      Projection:
        ProjectionType: ALL
  BillingMode: PAY_PER_REQUEST
item.yaml
# Item definitions. Each entry carries the table's key attributes
# (user_id, count) plus status.
TestItem:
  - user_id: test_user_1
    count: 100
    status: 1
  - user_id: test_user_2
    count: 80
    status: 0
  - user_id: test_user_3
    count: 90
    status: 1
  - user_id: test_user_3
    count: 40
    status: 0
加载
mine.py
# Load the table definition: the "DynamoDB" section of env.yaml.
with open("env.yaml", encoding="utf-8") as env_file:
    _env_config = yaml.safe_load(env_file)
params = _env_config["DynamoDB"]

# Load the sample items: the "TestItem" section of item.yaml.
with open("item.yaml", encoding="utf-8") as item_file:
    _item_config = yaml.safe_load(item_file)
items = _item_config["TestItem"]
C(创建)
表创建
mine.py
def create_table(dynamodb_resource: boto3.resource, params: dict):
    """Create a table from ``params`` and wait until it exists.

    Args:
        dynamodb_resource: Object exposing ``create_table``; this file
            passes the result of ``boto3.resource("dynamodb", ...)``.
        params: Keyword arguments forwarded unchanged to ``create_table``.
            Must contain ``"TableName"``, which is used in the log line.

    Raises:
        Exception: Whatever the underlying calls raise propagates
            unchanged.
    """
    # No try/except here: catching an exception only to re-raise it
    # changes nothing for the caller.
    table = dynamodb_resource.create_table(**params)
    table.wait_until_exists()
    print(f"Table ({params['TableName']}) created.")
# Create the table described by the loaded ``params``.
create_table(dynamodb_resource=dynamodb, params=params)
项目创建
mine.py
def put_item(dynamodb_resource: boto3.resource, table_name: str, items: list):
    """Write every entry of ``items`` to ``table_name`` through a batch writer.

    Args:
        dynamodb_resource: Object exposing ``Table(name)``; this file
            passes the result of ``boto3.resource("dynamodb", ...)``.
        table_name: Name of the destination table.
        items: Iterable of item dicts; each one is passed as ``Item=`` to
            ``batch.put_item``. May be empty.

    Raises:
        Exception: Whatever the underlying calls raise propagates
            unchanged.
    """
    dynamodb_table = dynamodb_resource.Table(table_name)

    # Start at zero so the summary line also works for an empty ``items``.
    # Reading the loop variable after a loop that never ran would raise
    # UnboundLocalError.
    written = 0
    with dynamodb_table.batch_writer() as batch:
        for written, item in enumerate(items, start=1):
            batch.put_item(Item=item)
    print(f"Items put: {written}")
# Write the loaded sample items into TestTable.
put_item(dynamodb_resource=dynamodb, table_name="TestTable", items=items)
R(读)
获取表格列表
mine.py
def list_tables(dynamodb_resource: boto3.resource):
    """Print each table yielded by ``dynamodb_resource.tables.all()``.

    Args:
        dynamodb_resource: Object exposing a ``tables`` collection; this
            file passes the result of ``boto3.resource("dynamodb", ...)``.
    """
    for table in dynamodb_resource.tables.all():
        print(table)
# Print the tables visible through the resource.
list_tables(dynamodb_resource=dynamodb)
获取表定义
mine.py
def get_table_schema(dynamodb_resource: boto3.resource, table_name: str):
    """Return the attribute definitions and key schema of ``table_name``.

    Args:
        dynamodb_resource: Object exposing ``Table(name)``; this file
            passes the result of ``boto3.resource("dynamodb", ...)``.
        table_name: Name of the table to inspect.

    Returns:
        tuple: ``(attribute_definitions, key_schema)`` as read from the
        table object's attributes of the same names.

    Raises:
        Exception: Whatever the underlying calls raise propagates
            unchanged.
    """
    dynamodb_table = dynamodb_resource.Table(table_name)
    attrs = dynamodb_table.attribute_definitions
    schema = dynamodb_table.key_schema
    # Hand both values back to the caller; without a return statement the
    # lookups above would have no observable result.
    return attrs, schema
# Look up the schema of TestTable.
get_table_schema(dynamodb_resource=dynamodb, table_name="TestTable")
扫描
mine.py
def scan_table(dynamodb_resource: boto3.resource, table_name: str):
    """Return all items of ``table_name``, following scan pagination.

    Args:
        dynamodb_resource: Object exposing ``Table(name)``; this file
            passes the result of ``boto3.resource("dynamodb", ...)``.
        table_name: Name of the table to scan.

    Returns:
        list: The ``Items`` of every scan response, concatenated in the
        order the pages were received.

    Raises:
        Exception: Whatever the underlying calls raise propagates
            unchanged.
    """
    table = dynamodb_resource.Table(table_name)

    data = []
    scan_kwargs = {}
    while True:
        response = table.scan(**scan_kwargs)
        data.extend(response["Items"])
        # A response carrying LastEvaluatedKey is not the last page; the
        # next scan resumes from that key.
        if "LastEvaluatedKey" not in response:
            return data
        scan_kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]
# Read every item of TestTable.
scan_table(dynamodb_resource=dynamodb, table_name="TestTable")
查询(Query)
mine.py
def query_table(
    dynamodb_resource: boto3.resource,
    table_name: str,
    options: dict,
):
    """Query ``table_name`` with ``options``, then print and return the items.

    Args:
        dynamodb_resource: Object exposing ``Table(name)``; this file
            passes the result of ``boto3.resource("dynamodb", ...)``.
        table_name: Name of the table to query.
        options: Keyword arguments forwarded to ``Table.query``. The dict
            itself is not modified.

    Returns:
        list: Items collected from the query. All result pages are
        followed unless ``options`` contains ``"Limit"``, in which case
        only the first page is returned. Empty when a response has no
        ``"Items"`` key (e.g. when only a count is requested).

    Raises:
        Exception: Whatever the underlying calls raise propagates
            unchanged.
    """
    table = dynamodb_resource.Table(table_name)

    # Work on a copy so that adding ExclusiveStartKey never leaks into the
    # caller's dict.
    query_kwargs = dict(options)
    # A caller-supplied Limit is a request for a bounded result, so further
    # pages are not fetched in that case.
    follow_pages = "Limit" not in options

    data = []
    while True:
        query_res = table.query(**query_kwargs)
        data.extend(query_res.get("Items", []))
        if not follow_pages or "LastEvaluatedKey" not in query_res:
            break
        query_kwargs["ExclusiveStartKey"] = query_res["LastEvaluatedKey"]

    print(data)
    return data
# Key condition: user_id equals "test_user_3" AND count equals 40.
_key_condition = Key("user_id").eq("test_user_3") & Key("count").eq(40)
query_options = {
    # Optional settings, left disabled in this example:
    # "Select": "COUNT",
    "KeyConditionExpression": _key_condition,
    # "FilterExpression": Attr("status").eq(1),
}
query_table(dynamodb, "TestTable", query_options)
U(更新)
mine.py
def update_item(dynamodb_resource: boto3.resource, table_name: str, option: dict):
    """Call ``Table.update_item`` on ``table_name`` and return its response.

    Args:
        dynamodb_resource: Object exposing ``Table(name)``; this file
            passes the result of ``boto3.resource("dynamodb", ...)``.
        table_name: Name of the table holding the item.
        option: Keyword arguments forwarded unchanged to
            ``Table.update_item`` (``Key``, ``UpdateExpression``, ...).

    Returns:
        The value returned by ``Table.update_item``. Returning it makes a
        ``ReturnValues`` setting in ``option`` useful to the caller.

    Raises:
        Exception: Whatever the underlying calls raise propagates
            unchanged.
    """
    dynamodb_table = dynamodb_resource.Table(table_name)
    return dynamodb_table.update_item(**option)
update_option = {
    # The key must identify an existing item: test_user_2 is defined with
    # count 80 in item.yaml. With a key that matches nothing, UpdateItem
    # adds a new item instead of changing the existing one.
    "Key": {"user_id": "test_user_2", "count": 80},
    # "#attr1" stands for the attribute name and ":_status" for the new
    # value; both are resolved through the two mappings below.
    "UpdateExpression": "set #attr1 = :_status",
    "ExpressionAttributeNames": {"#attr1": "status"},
    "ExpressionAttributeValues": {":_status": 404},
    "ReturnValues": "UPDATED_NEW",
}
update_item(
    dynamodb_resource=dynamodb,
    table_name="TestTable",
    option=update_option,
)
D(删除)
删除表
mine.py
def delete_table(dynamodb_resource: boto3.resource, table_name: str):
    """Delete ``table_name`` and wait until it no longer exists.

    Args:
        dynamodb_resource: Object exposing ``Table(name)``; this file
            passes the result of ``boto3.resource("dynamodb", ...)``.
        table_name: Name of the table to delete.

    Raises:
        Exception: Whatever the underlying calls raise propagates
            unchanged.
    """
    # No try/except here: catching an exception only to re-raise it
    # changes nothing for the caller.
    table = dynamodb_resource.Table(table_name)
    table.delete()
    table.wait_until_not_exists()
    print(f"Table ({table_name}) deleted.")
# Drop TestTable.
delete_table(dynamodb_resource=dynamodb, table_name="TestTable")
删除所有项目
mine.py
def truncate_table(dynamodb_resource: boto3.resource, table_name: str):
    """Delete every item of ``table_name`` while keeping the table itself.

    The table is scanned for its key attributes only, and each returned
    key is deleted through a batch writer.

    Args:
        dynamodb_resource: Object exposing ``Table(name)``; this file
            passes the result of ``boto3.resource("dynamodb", ...)``.
        table_name: Name of the table to empty.

    Raises:
        Exception: Whatever the underlying calls raise propagates
            unchanged.
    """
    table = dynamodb_resource.Table(table_name)

    # Project only the key attributes. Each name is referenced through a
    # "#name" placeholder that ExpressionAttributeNames maps back to the
    # real attribute name.
    key_names = [key["AttributeName"] for key in table.key_schema]
    scan_kwargs = {
        "ProjectionExpression": ", ".join(f"#{name}" for name in key_names),
        "ExpressionAttributeNames": {f"#{name}": name for name in key_names},
    }

    counter = 0
    with table.batch_writer() as batch:
        while True:
            page = table.scan(**scan_kwargs)
            for item_keys in page["Items"]:
                batch.delete_item(Key=item_keys)
            counter += page["Count"]
            # Stop only when no LastEvaluatedKey is returned. An empty
            # page does not by itself mean the scan is finished.
            if "LastEvaluatedKey" not in page:
                break
            scan_kwargs["ExclusiveStartKey"] = page["LastEvaluatedKey"]
    print(f"Items deleted: {counter}")
# Remove all items from TestTable.
truncate_table(dynamodb_resource=dynamodb, table_name="TestTable")
复制
将项目从一个表复制到另一个表。
注意:这里使用的是 boto3.client,而不是 boto3.resource。
mine.py
def copy_table(
    src_dynamodb: boto3.client,
    src_table_name: str,
    dst_dynamodb: boto3.client,
    dst_table_name: str,
):
    """Copy every item of ``src_table_name`` into ``dst_table_name``.

    The source table is read page by page through the client's ``scan``
    paginator, and each item is written with ``put_item`` on the
    destination client.

    Args:
        src_dynamodb: Client used for reading; this file passes the result
            of ``boto3.client("dynamodb", ...)``.
        src_table_name: Name of the table to read from.
        dst_dynamodb: Client used for writing.
        dst_table_name: Name of the table to write to.

    Raises:
        Exception: Whatever the underlying calls raise propagates
            unchanged.
    """
    scan_pages = src_dynamodb.get_paginator("scan").paginate(
        TableName=src_table_name,
        Select="ALL_ATTRIBUTES",
        ReturnConsumedCapacity="NONE",
        ConsistentRead=True,
    )

    # Count across all pages so the progress line shows a running total
    # rather than the size of the current page.
    transferred = 0
    for page in scan_pages:
        page_items = page["Items"]
        for item in page_items:
            dst_dynamodb.put_item(TableName=dst_table_name, Item=item)
        transferred += len(page_items)
        if page_items:
            print(f"Items transfered: {transferred}")

    # Report an empty source only when no page contained any item; one
    # empty page among non-empty ones does not mean the table is empty.
    if transferred == 0:
        print("Original Table is empty.")
mine.py
# Source and destination clients are built from the same settings: a
# fixed localhost:8100 endpoint, with region and credentials read from
# environment variables.
_client_options = {
    "endpoint_url": "http://localhost:8100",
    "region_name": os.environ.get("REGION_NAME"),
    "aws_access_key_id": os.environ.get("AWS_ACCESS_KEY"),
    "aws_secret_access_key": os.environ.get("AWS_SECRET_ACCESS_KEY"),
}
src_dynamodb_client = boto3.client("dynamodb", **_client_options)
dst_dynamodb_client = boto3.client("dynamodb", **_client_options)

copy_table(
    src_dynamodb=src_dynamodb_client,
    src_table_name="TestTable",
    dst_dynamodb=dst_dynamodb_client,
    dst_table_name="TestTable2",
)
3. 结论
我总结了DynamoDB常用的方法。
今后我也想为其他服务整理类似的总结。
原创声明:本文系作者授权爱码网发表,未经许可,不得转载;
原文地址:https://www.likecs.com/show-308624052.html