【发布时间】:2021-05-03 22:29:22
【问题描述】:
我正在尝试使用 AWS CDK 部署将由 S3 上传事件触发的 Lambda 函数。当我尝试做cdk ls 或cdk synth 时,我得到了错误:
Traceback (most recent call last):
File "app.py", line 14, in <module>
S3TosqsStack(app, "S3TosqsStack", env=core.Environment(account=os.getenv('CDK_DEFAULT_ACCOUNT'), region=os.getenv('CDK_DEFAULT_REGION')))
File "/home/ec2-user/s3tosqs/.venv/lib64/python3.7/site-packages/jsii/_runtime.py", line 83, in __call__
inst = super().__call__(*args, **kwargs)
File "/home/ec2-user/s3tosqs/s3tosqs/s3tosqs_stack.py", line 37, in __init__
bucket=s3.IBucket(bucket_name=lambda_deployment_bucket),
File "/home/ec2-user/s3tosqs/.venv/lib64/python3.7/site-packages/typing_extensions.py", line 1548, in _no_init
raise TypeError('Protocols cannot be instantiated')
TypeError: Protocols cannot be instantiated
Subprocess exited with error 1
看起来问题源于 lambda 函数的 bucket 定义,但我不明白错误是什么,因为我遵循了文档。我尝试使用bucket_arn 而不是bucket_name,但这也没有用。
这是主堆栈代码:
s3tosqs_stack.py
from aws_cdk import (
aws_s3 as s3,
aws_s3_notifications as s3_notifications,
aws_sqs as sqs,
aws_lambda as _lambda
)
from aws_cdk import core
# User-specified Parameters
lambda_deployment_bucket = 'some-deployment-bucket'
trigger_bucket = 'some-trigger-bucket'
trigger_key = 'uploads'
queue_name = 'some-queue.fifo'
region = 'us-west-2'
class S3TosqsStack(core.Stack):
def __init__(self, scope: core.Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Defines an SQS queue resource
queue = sqs.Queue(
self, 'NotificationQueue',
queue_name=queue_name,
content_based_deduplication=True,
visibility_timeout=core.Duration.seconds(300)
)
# Defines an AWS Lambda resource
lambda_function = _lambda.Function(
self, 'S3toSQS',
runtime=_lambda.Runtime.PYTHON_3_8,
code=_lambda.Code.from_bucket(
bucket=s3.IBucket(bucket_name=lambda_deployment_bucket),
key='S3toSQS.zip'),
handler='handler.publish_SQS_message',
environment={'SOURCE_BUCKET': trigger_bucket,
'REGION': region,
'QUEUE_NAME': queue_name}
)
# Define S3 upload bucket for Lambda trigger
upload_bucket = s3.Bucket(
self, 'S3TriggerBucket',
bucket_name=trigger_bucket
)
upload_bucket.add_event_notification(
s3.EventType.OBJECT_CREATED,
s3_notifications.LambdaDestination(lambda_function),
s3.NotificationKeyFilter(
prefix=trigger_key)
)
【问题讨论】:
标签: amazon-web-services aws-lambda python-3.7 aws-cdk