【发布时间】:2019-07-11 17:54:03
【问题描述】:
我有一个用 python 编写的 API,可以调用 AWS 服务,特别是 sqs、s3 和 dynamodb。我正在尝试为 API 编写单元测试,并且我想模拟对 AWS 的所有调用。我已经对 moto 进行了大量研究,作为模拟这些服务的一种方式,但是我尝试过的每个实现都不会模拟我的调用,而是向 AWS 发送真正的请求。调查这个问题,我发现people discussing 在使用 boto3>=1.8 时,boto 和 moto 之间存在一些不兼容性。有没有办法解决?我的终极问题是:在使用 boto3>=1.8 时,是否有一种简单的方法可以使用 moto 或其他库来模拟 boto3 对 sqs、s3 和 dynamodb 的调用?
这是我当前使用的 boto3 和 moto 版本:
boto3 == 1.9.314
moto == 1.3.11
以下是我最近尝试使用 moto 模拟对 sqs 的调用。我定义了一个 pytest 夹具,在其中创建了一个 mock_sqs 会话和一个(希望是假的)队列。我使用这个夹具对我的 get_queue_item 函数进行单元测试。
SQS 脚本
# ptr_api.aws.sqs
import boto3
REGION = 'us-east-1'
sqs_r = boto3.resource('sqs', REGION)
sqs_c = boto3.client('sqs', REGION)
def get_queue_item(queue_name):
queue = sqs_r.get_queue_by_name(QueueName=queue_name)
queue_url = queue.url
response = sqs_c.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=1,
VisibilityTimeout=10,
WaitTimeSeconds=3
)
try:
message = response['Messages'][0]
receipt_handle = message['ReceiptHandle']
delete_response = sqs_c.delete_message(QueueUrl=queue_url,
ReceiptHandle=receipt_handle)
return message['Body']
except Exception as e:
print("error in get_queue_item: ")
print(e)
return False
测试 SQS 脚本
# test_sqs.py
import pytest
from moto import mock_sqs
import boto3
from ptr_api.aws.sqs import get_queue_item
@pytest.fixture
def sqs_mocker(scope='session', autouse=True):
mock = mock_sqs()
mock.start()
sqs_r = boto3.resource('sqs', 'us-east-1')
sqs_c = boto3.client('sqs', 'us-east-1')
queue_name = 'test_queue_please_dont_actually_exist'
queue_url = sqs_c.create_queue(
QueueName=queue_name
)['QueueUrl']
yield (sqs_c, queue_url, queue_name)
mock.stop()
def test_get_queue_item(sqs_mocker):
sqs_c, queue_url, queue_name = sqs_mocker
message_body = 'why hello there' # Create dummy message
sqs_c.send_message( # Send message to fake queue
QueueUrl=queue_url,
MessageBody=message_body,
)
res = get_queue_item(queue_name) # Test get_queue_item function
assert res == message_body
然而,当我去检查控制台时,我看到队列实际上已经创建。我也尝试过改变我的进口顺序,但似乎没有任何效果。我尝试使用模拟装饰器,甚至短暂地玩过 moto 的独立服务器模式。我是在做错什么,还是真的只是我听说过的较新版本的 boto3 与 boto3/moto 不兼容?不幸的是,降级我的 boto3 版本不是一种选择。有没有另一种方法可以通过另一个库获得我想要的结果?我对 localstack 进行了一些研究,但我想确保这是我完全放弃 moto 之前的唯一选择。
【问题讨论】:
-
我刚刚运行了你的测试代码。它没有创建 SQS 队列。我的默认配置文件中也没有定义任何凭据。测试成功完成。我不得不注释掉几行:第 4 行和倒数第二行。相反,我在最后一个断言语句之前添加了以下行:
res = sqs_c.receive_message(QueueUrl=queue_url)['Messages'][0]['Body']。 -
@krishna_mee2004 因此尝试了您运行的内容并注释掉了您提到的相同代码行并将我的函数调用替换为
res = sqs_c.receive_message(QueueUrl=queue_url)['Messages'][0]['Body'],并且仍然在sqs中创建了一个队列。所以我猜这是一个凭证? -
这似乎也对我有用。我没有使用过
moto,所以在逐步完成测试时,它确实看起来像是在接触aws。但是,我将boto3.client调用更改为传入aws_access_key_id和aws_secret_access_key为非工作值,我的测试仍然通过。也许尝试这样做以确保您没有从您的环境中获取凭据。另外,你是怎么运行这个的? -
@wholevinski 答案贴在下面,感谢您的帮助!
标签: python amazon-web-services mocking boto3 moto