【发布时间】:2021-12-09 17:39:43
【问题描述】:
我正在气流 (Cloud Composer) 中设置 DAG,以触发一些需要 30 分钟以上才能完成的 Cloud Run 作业。
我没有使用 SimpleHTTPOperator,而是使用 pythonOperator 获取 OIDC 令牌,以允许我触发需要自动化的服务。
def make_authorized_get_request(service_url, **kwargs):
auth_req = google.auth.transport.requests.Request()
id_token = google.oauth2.id_token.fetch_id_token(auth_req, service_url)
headers = {"Authorization": f"Bearer {id_token}"}
req = requests.get(service_url, headers=headers, timeout=(5, 3600))
status = req.status_code
response_json = req.json()
return response_json,status
#t1 as request first report
big3_request = PythonOperator(
task_id='big3_request',
python_callable=make_authorized_get_request,
op_kwargs={"service_url":"cloud run url"}
)
我可以看到云运行作业已成功完成,但气流中的任务似乎没有收到响应(包含有关作业的一些数据和状态代码的 json)并且只是继续运行,除非设置了超时,在这种情况下它会出错(虽然响应应该是可用的)。
我已将此任务指向更短的服务进行测试,它可以很好地获得响应。
我怎样才能让任务接受响应?还是我需要使用其他运算符?
【问题讨论】: