1. 前言

我总结了通过 Python & Boto3 在 AWS 上访问 DynamoDB 时经常使用的方法(CRUD + COPY)。

Github

2. Boto3 方法

API

mine.py
# DynamoDB service resource shared by the resource-based examples.
# The endpoint is pinned to localhost:8100 (presumably a local DynamoDB
# instance -- confirm); region and credentials come from environment variables.
dynamodb = boto3.resource(
    "dynamodb",
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    region_name=os.getenv("REGION_NAME"),
    endpoint_url="http://localhost:8100",
)

数据定义

表的定义和项目（item）的定义分别写在 env.yaml 和 item.yaml 中。

env.yaml
# Table definition. The "DynamoDB" mapping is loaded as `params` and passed
# as keyword arguments to create_table().
DynamoDB:
  TableName: TestTable
  # Every attribute referenced by a key schema below (table or index) is declared here.
  AttributeDefinitions:
    - AttributeName: user_id
      AttributeType: S
    - AttributeName: count
      AttributeType: N
    - AttributeName: status
      AttributeType: N
  # Table primary key: user_id (HASH) + count (RANGE).
  KeySchema:
    - AttributeName: user_id
      KeyType: HASH
    - AttributeName: count
      KeyType: RANGE
  # Secondary index keyed on user_id (HASH) + status (RANGE); all attributes are projected.
  GlobalSecondaryIndexes:
    - IndexName: user_count
      KeySchema:
        - AttributeName: user_id
          KeyType: HASH
        - AttributeName: status
          KeyType: RANGE
      Projection:
        ProjectionType: ALL
  BillingMode: PAY_PER_REQUEST
item.yaml
# Item definitions. The "TestItem" list is loaded as `items` and written to
# the table by put_item(). Each entry carries the table key (user_id, count)
# plus the status attribute.
TestItem:
  - user_id: test_user_1
    count: 100
    status: 1

  - user_id: test_user_2
    count: 80
    status: 0

  - user_id: test_user_3
    count: 90
    status: 1

  # Same user_id as the previous entry; the differing count keeps the key unique.
  - user_id: test_user_3
    count: 40
    status: 0

加载

mine.py
# Table definition: the "DynamoDB" section of env.yaml.
with open("env.yaml", mode="r", encoding="utf-8") as env_file:
    env_config = yaml.safe_load(env_file)
params = env_config["DynamoDB"]

# Sample items: the "TestItem" section of item.yaml.
with open("item.yaml", mode="r", encoding="utf-8") as item_file:
    item_config = yaml.safe_load(item_file)
items = item_config["TestItem"]

C(创建)

表创建

mine.py
def create_table(
    dynamodb_resource: "boto3.resources.base.ServiceResource",
    params: dict,
):
    """Create a DynamoDB table and block until it exists.

    Args:
        dynamodb_resource: DynamoDB service resource (from ``boto3.resource``).
        params: Keyword arguments forwarded verbatim to ``create_table``;
            must contain ``"TableName"``.

    Returns:
        The table object returned by ``create_table``.

    Exceptions raised by boto3 propagate to the caller unchanged.
    """
    table = dynamodb_resource.create_table(**params)
    # Creation is asynchronous; wait so callers can use the table right away.
    table.wait_until_exists()
    print(f"Table ({params['TableName']}) created.")
    return table

# Create the table described by the loaded definition.
create_table(dynamodb_resource=dynamodb, params=params)

项目创建

mine.py
def put_item(
    dynamodb_resource: "boto3.resources.base.ServiceResource",
    table_name: str,
    items: "list[dict]",
) -> int:
    """Write every item in ``items`` to a table through a batch writer.

    Args:
        dynamodb_resource: DynamoDB service resource (from ``boto3.resource``).
        table_name: Name of the target table.
        items: Iterable of item dicts; may be empty.

    Returns:
        The number of items handed to the batch writer.

    Exceptions raised by boto3 propagate to the caller unchanged.
    """
    dynamodb_table = dynamodb_resource.Table(table_name)
    # Count explicitly so an empty ``items`` reports 0 instead of failing on
    # an unbound loop variable.
    written = 0
    with dynamodb_table.batch_writer() as batch:
        for item in items:
            batch.put_item(Item=item)
            written += 1
    print(f"Items put: {written}")
    return written

# Load the sample items into TestTable.
put_item(dynamodb_resource=dynamodb, table_name="TestTable", items=items)

R(读)

获取表格列表

mine.py
def list_tables(
    dynamodb_resource: "boto3.resources.base.ServiceResource",
) -> "list[str]":
    """Print every table visible through the resource and return their names.

    Args:
        dynamodb_resource: DynamoDB service resource (from ``boto3.resource``).

    Returns:
        The ``name`` of each table, in iteration order.
    """
    names = []
    for table in dynamodb_resource.tables.all():
        print(table)
        names.append(table.name)
    return names

# Show the tables reachable through the shared resource.
list_tables(dynamodb_resource=dynamodb)

获取表定义

mine.py
def get_table_schema(
    dynamodb_resource: "boto3.resources.base.ServiceResource",
    table_name: str,
):
    """Return a table's attribute definitions and key schema.

    Args:
        dynamodb_resource: DynamoDB service resource (from ``boto3.resource``).
        table_name: Name of the table to inspect.

    Returns:
        A ``(attribute_definitions, key_schema)`` tuple read from the table.

    Exceptions raised by boto3 propagate to the caller unchanged.
    """
    dynamodb_table = dynamodb_resource.Table(table_name)
    attrs = dynamodb_table.attribute_definitions
    schema = dynamodb_table.key_schema
    # Hand both values back; they were previously computed and then dropped.
    return attrs, schema

# Read the schema of TestTable.
get_table_schema(dynamodb_resource=dynamodb, table_name="TestTable")

扫描

mine.py
def scan_table(
    dynamodb_resource: "boto3.resources.base.ServiceResource",
    table_name: str,
) -> list:
    """Scan a whole table and return all of its items.

    Args:
        dynamodb_resource: DynamoDB service resource (from ``boto3.resource``).
        table_name: Name of the table to scan.

    Returns:
        A list with the items of every scan page, in page order.

    Exceptions raised by boto3 propagate to the caller unchanged.
    """
    table = dynamodb_resource.Table(table_name)

    collected = []
    scan_kwargs = {}
    while True:
        response = table.scan(**scan_kwargs)
        collected.extend(response.get("Items", []))
        # A page without LastEvaluatedKey is the final one.
        if "LastEvaluatedKey" not in response:
            break
        scan_kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]
    return collected

# Fetch every item of TestTable (the returned list is not kept here).
scan_table(dynamodb_resource=dynamodb, table_name="TestTable")

查询（Query）

mine.py
def query_table(
    dynamodb_resource: "boto3.resources.base.ServiceResource",
    table_name: str,
    options: dict,
) -> list:
    """Run a single Query call against a table, print and return its items.

    Args:
        dynamodb_resource: DynamoDB service resource (from ``boto3.resource``).
        table_name: Name of the table to query.
        options: Keyword arguments forwarded verbatim to ``Table.query``
            (e.g. ``KeyConditionExpression``, ``FilterExpression``, ``Select``).

    Returns:
        The ``"Items"`` of the response, or an empty list when the response
        has no ``"Items"`` entry. Only the first result page is read.
    """
    table = dynamodb_resource.Table(table_name)
    query_res = table.query(**options)
    # Not every response carries "Items" (e.g. a count-only query), so do not
    # index the key directly.
    data = query_res.get("Items", [])
    print(data)
    return data

# Key condition: user_id == "test_user_3" AND count == 40.
key_condition = Key("user_id").eq("test_user_3") & Key("count").eq(40)

# Optional entries that can be added to the options:
#   "Select": "COUNT"
#   "FilterExpression": Attr("status").eq(1)
query_options = {"KeyConditionExpression": key_condition}

query_table(dynamodb, "TestTable", query_options)

U(更新)

mine.py
def update_item(
    dynamodb_resource: "boto3.resources.base.ServiceResource",
    table_name: str,
    option: dict,
) -> dict:
    """Apply one UpdateItem call to a table.

    Args:
        dynamodb_resource: DynamoDB service resource (from ``boto3.resource``).
        table_name: Name of the table holding the item.
        option: Keyword arguments forwarded verbatim to ``Table.update_item``
            (``Key``, ``UpdateExpression``, ``ReturnValues``, ...).

    Returns:
        The response of ``Table.update_item``, so that values requested via
        ``ReturnValues`` reach the caller.

    Exceptions raised by boto3 propagate to the caller unchanged.
    """
    dynamodb_table = dynamodb_resource.Table(table_name)
    return dynamodb_table.update_item(**option)

# Set "status" to 404 on the item (test_user_2, 80).
# The key must match an item that exists in item.yaml (test_user_2 has
# count 80); UpdateItem on a missing key would create a new item instead.
# "#attr1" stands in for the attribute name "status" via ExpressionAttributeNames.
update_option = {
    "Key": {"user_id": "test_user_2", "count": 80},
    "UpdateExpression": "set #attr1 = :_status",
    "ExpressionAttributeNames": {"#attr1": "status"},
    "ExpressionAttributeValues": {":_status": 404},
    "ReturnValues": "UPDATED_NEW",
}

update_item(
    dynamodb_resource=dynamodb,
    table_name="TestTable",
    option=update_option,
)

D(删除)

删除表

mine.py
def delete_table(
    dynamodb_resource: "boto3.resources.base.ServiceResource",
    table_name: str,
) -> None:
    """Delete a table and block until it no longer exists.

    Args:
        dynamodb_resource: DynamoDB service resource (from ``boto3.resource``).
        table_name: Name of the table to delete.

    Exceptions raised by boto3 propagate to the caller unchanged.
    """
    table = dynamodb_resource.Table(table_name)
    table.delete()
    # Deletion is asynchronous; wait so the name can be reused afterwards.
    table.wait_until_not_exists()
    print(f"Table ({table_name}) deleted.")

# Drop TestTable.
delete_table(dynamodb_resource=dynamodb, table_name="TestTable")

删除所有项目

mine.py
def truncate_table(
    dynamodb_resource: "boto3.resources.base.ServiceResource",
    table_name: str,
) -> int:
    """Delete every item of a table while keeping the table itself.

    The table is scanned for key attributes only, and each returned key is
    deleted through a batch writer.

    Args:
        dynamodb_resource: DynamoDB service resource (from ``boto3.resource``).
        table_name: Name of the table to empty.

    Returns:
        The number of items submitted for deletion.

    Exceptions raised by boto3 propagate to the caller unchanged.
    """
    table = dynamodb_resource.Table(table_name)

    key_names = [key.get("AttributeName") for key in table.key_schema]

    # Project only the key attributes. Each name is referenced through a
    # "#name" placeholder rather than literally in the expression.
    scan_kwargs = {
        "ProjectionExpression": ", ".join("#" + name for name in key_names),
        "ExpressionAttributeNames": {"#" + name: name for name in key_names},
    }

    deleted = 0
    with table.batch_writer() as batch:
        while True:
            page = table.scan(**scan_kwargs)
            page_items = page.get("Items", [])
            for item_key in page_items:
                batch.delete_item(Key=item_key)
            deleted += len(page_items)
            # Stop only when there is no continuation key; an empty page that
            # still carries LastEvaluatedKey must not end the scan early.
            if "LastEvaluatedKey" not in page:
                break
            scan_kwargs["ExclusiveStartKey"] = page["LastEvaluatedKey"]
    print(f"Items deleted: {deleted}")
    return deleted

# Remove all items from TestTable.
truncate_table(dynamodb_resource=dynamodb, table_name="TestTable")

复制

将项目从一个表复制到另一个。
注意：这里使用的是 boto3.client，而不是 boto3.resource。

mine.py


def copy_table(
    src_dynamodb: "botocore.client.BaseClient",
    src_table_name: str,
    dst_dynamodb: "botocore.client.BaseClient",
    dst_table_name: str,
) -> int:
    """Copy every item from one table to another using low-level clients.

    The source is read with the ``scan`` paginator and each item is written
    to the destination with an individual ``put_item`` call.

    Args:
        src_dynamodb: DynamoDB client (from ``boto3.client``) for the source.
        src_table_name: Name of the table to read.
        dst_dynamodb: DynamoDB client (from ``boto3.client``) for the destination.
        dst_table_name: Name of the table to write; it must already exist.

    Returns:
        The total number of items written to the destination.
    """
    paginator = src_dynamodb.get_paginator("scan")
    pages = paginator.paginate(
        TableName=src_table_name,
        Select="ALL_ATTRIBUTES",
        ReturnConsumedCapacity="NONE",
        ConsistentRead=True,
    )

    transferred = 0
    for page in pages:
        for item in page.get("Items", []):
            dst_dynamodb.put_item(TableName=dst_table_name, Item=item)
            transferred += 1

    # Report once, after all pages: a single empty page does not mean the
    # source table is empty, and the count covers the whole copy.
    if transferred:
        print(f"Items transfered: {transferred}")
    else:
        print("Original Table is empty.")
    return transferred
mine.py
# Low-level clients for the copy example. Source and destination are built
# with identical settings, so both talk to the same localhost:8100 endpoint.
src_dynamodb_client = boto3.client(
    "dynamodb",
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    region_name=os.getenv("REGION_NAME"),
    endpoint_url="http://localhost:8100",
)

dst_dynamodb_client = boto3.client(
    "dynamodb",
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    region_name=os.getenv("REGION_NAME"),
    endpoint_url="http://localhost:8100",
)

# Copy all items of TestTable into TestTable2.
copy_table(
    src_dynamodb=src_dynamodb_client,
    src_table_name="TestTable",
    dst_dynamodb=dst_dynamodb_client,
    dst_table_name="TestTable2",
)

3. 结论

我总结了DynamoDB常用的方法。
今后也想补充其他 AWS 服务的常用方法。


原创声明:本文系作者授权爱码网发表,未经许可,不得转载;

原文地址:https://www.likecs.com/show-308624052.html

相关文章:

  • 2021-09-19
  • 2021-08-13
  • 2021-07-30
  • 2021-10-13
  • 2021-12-14
  • 2022-12-23
  • 2021-12-12
  • 2021-05-27
猜你喜欢
  • 2022-12-23
  • 2022-01-09
  • 2022-12-23
  • 2021-11-09
  • 2022-12-23
  • 2022-01-01
相关资源
相似解决方案