【问题标题】:How to quickly send messages to azure queue storage using python?如何使用 python 快速将消息发送到 Azure 队列存储?
【发布时间】:2021-02-22 07:27:57
【问题描述】:

我正在尝试使用 python azure.storage.queue 库向 azure 发送大量消息(数千万),但是这样做需要很长时间。我使用的代码如下:

from azure.storage.queue import (
    QueueClient,
    BinaryBase64EncodePolicy,
    BinaryBase64DecodePolicy
)

messages = [example list of messages]
connectionString = "example connection string"
queueName = "example-queue-name"

queueClient = QueueClient.from_connection_string(connectionString, queueName)
for message in messages:
    queueClient.send_message(message)

目前,发送大约 70,000 条消息需要大约 3 个小时,考虑到需要发送的潜在消息数量,这显然太慢了。

我查看了文档以尝试找到批处理选项,但似乎不存在:https://docs.microsoft.com/en-us/python/api/azure-storage-queue/azure.storage.queue.queueclient?view=azure-python

我还想知道是否有人有任何使用 asynchio 库来加快此过程的经验并可以建议如何使用它?

【问题讨论】:

  • 怎么样?我的帖子有用吗?

标签: python azure python-asyncio azure-queues


【解决方案1】:

试试这个:

from azure.storage.queue import (
    QueueClient,
    BinaryBase64EncodePolicy,
    BinaryBase64DecodePolicy
)
from concurrent.futures import ProcessPoolExecutor
import time

messages = []

messagesP1 = messages[:len(messages)//2] 
messagesP2 = messages[len(messages)//2:] 

print(len(messagesP1))
print(len(messagesP2))

connectionString = "<conn str>"
queueName = "<queue name>"

queueClient = QueueClient.from_connection_string(connectionString, queueName)

def pushThread(messages):
   for message in messages:
       queueClient.send_message(message)



def callback_function(future):
    print('Callback with the following result', future.result())

tic = time.perf_counter()

def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        future = executor.submit(pushThread, messagesP1)
        future.add_done_callback(callback_function)
        future2 = executor.submit(pushThread, messagesP2)
        while True:
            if(future.running()):
                print("Task 1 running")
            if(future2.running()):
                print("Task 2 running")

            if(future.done() and future2.done()):
                print(future.result(), future2.result())
                break

if __name__ == '__main__':
    main()


toc = time.perf_counter()
    
print(f"spent {toc - tic:0.4f} seconds")

如您所见,我将消息数组拆分为 2 个部分,并使用 2 个任务同时将数据推送到队列中。根据我的测试,我有大约 800 条消息,我花了 94 秒来推送所有消息:

但是使用上面的方式,我花了48s:

【讨论】:

  • 嘿,Stanley,非常感谢,它看起来和我想要的完全一样。我一直很忙,但计划在今天或明天尝试实施,届时会向您提供反馈!
  • @petgeo,你好,现在怎么样了?我的帖子有用吗?
  • 嘿,斯坦利,我尝试实施此解决方案,但运行时失败,说未来没有结果。然后我尝试在没有回调功能的情况下运行它,它运行没有错误,但不幸的是实际上没有生成任何消息。我必须重置为旧版本,但如果有帮助,我可以重新运行并为您获取更多信息
  • @wwnde,肯定愿意尝试和帮助:)
  • @wwnde,我的朋友 Hury 正在研究这个。他很擅长,不用担心。
猜你喜欢
  • 2019-11-08
  • 2014-01-28
  • 1970-01-01
  • 2022-11-04
  • 1970-01-01
  • 2014-08-30
  • 1970-01-01
  • 2021-09-26
  • 2020-09-28
相关资源
最近更新 更多