【发布时间】:2021-04-03 05:12:02
【问题描述】:
我正在构建一个基本的 ETL 管道,该管道到达一个主端点,该端点包含一个用于处理的 ID 列表(每次调用的可变数量)。我目前的想法是使用 RabbitMQ 作为队列系统,并从 RabbitMQ 消耗三个任务(提取、转换、加载)。我在网上看到的大多数教程都展示了任务在退出之前的简单顺序执行。我尝试构建一个 DAG,对我们收到的每个 ID 执行此顺序操作。但是当我不知道存在多少个 ID 时,我在试图弄清楚如何通过气流安排所有这些任务时遇到了问题。
这是相关 DAG 的一般树视图:
我已经开始使用 RabbitMQ 将这些 ID 推送到队列中,并让 celery 将可变数量的工作人员用于处理负载。我遇到的问题是我不知道如何打破“消费”循环。例如(我使用伪代码对 RabbitMQ 进行一些抽象):
def extract():
# callback function when messages are sent to this worker
def _extract(channel, url, rest):
resp = request.get(url)
channel.publish('transform_queue', resp)
# attach the callback to the queue
channel.basic_consume('extract_queue', callback=_extract)
channel.start_consuming() # runs a pseudo loop waiting for messages here
请注意,一些变量(例如_extract 下的通道是隐式的,但很可能会包装在自定义运算符中。
Load 和 Transform 函数的工作方式类似。我遇到的问题是当函数开始使用它时它不会停止,直到它关闭。我已经能够发送哨兵消息以允许该功能“退出”,但这将导致任务被标记为失败,并被发送重试。例如这里是哨兵关闭的代码。
def extract():
# callback function when messages are sent to this worker
def _extract(channel, message, rest):
if message == SHUTDOWN:
exit()
resp = requests.get(message.url)
channel.publish('transform_queue', resp)
# attach the callback to the queue
channel.basic_consume('extract_queue', callback=_extract)
channel.start_consuming() # runs a pseudo loop waiting for messages here
还有选择性地取消消费者的选项,但这只会增加更多复杂性,因为仍然存在轮询取消的问题,然后该任务最终会遇到上述相同的问题。
我的主要问题是:
有没有办法在这个设置中成功退出?
这是解决此问题的最佳方法吗?我想这是气流的常见用例,因此必须有一些最佳实践或常见设置。但是,我一直没能找到它。
【问题讨论】:
标签: python rabbitmq celery airflow etl