【问题标题】:Airflow trigger DAG anytime after a google sheet is being updated更新谷歌表格后,气流随时触发 DAG
【发布时间】:2020-10-01 08:31:01
【问题描述】:

有什么方法可以安排在更新 Google 表格后立即触发 DAG?

不确定我是否从该文档中得到任何答案:https://airflow.readthedocs.io/en/latest/_api/airflow/providers/google/suite/hooks/sheets/index.html

【问题讨论】:

    标签: python google-cloud-platform airflow directed-acyclic-graphs


    【解决方案1】:

    @Alejandro 的方向是正确的,但只是扩展了他的答案。您可以使用 HttpSensor 运算符通过google drive api 对工作表文件进行获取请求

    HttpSensor(
        task_id='http_sensor_check',
        http_conn_id='http_default',
        endpoint='https://www.googleapis.com/drive/v3/files/fileId',
        request_params={},
        response_check=,
        poke_interval=5,
        dag=dag,
    )
    

    现在根据返回响应documentation,它应该返回modeifiedtime,你可以在response_check的响应中看到

    response_check=lambda response: response.json()['modifiedTime'] > last_time_stored
    

    您可以替换此 lambda 并从您的数据库或缓存等中获取值。

    立即触发:: 现在您可以使用 next 运算符与此传感器结合使用来有条件地触发。

    注意:这里 poke_Interval 取决于用例,您希望多久检查一次修改。

    【讨论】:

    • 我会试一试,然后回复你们!谢谢!很有帮助
    • 在这种情况下,我不需要在我的脚本中指定 schedule_interval 对吗?
    • 通过 schudule 间隔,你的意思是 poke_interval?如果您不想要默认值,那么您将需要。传感器在一定间隔后戳 api
    • 对不起一个愚蠢的问题并且超出了范围,为了阅读我需要访问 API 的响应。因此,我必须向该特定表格授予服务帐户才能阅读该表格,对吗?
    【解决方案2】:

    您可以将HTTPOperator 与 Google Drive API https://developers.google.com/drive/api/v3/reference/files/get 一起使用

    您也可以编写自己的实现,参见 WebHDFSHook 和 WebHDFSSensor 以供参考

    【讨论】:

    • 和我想的一样,我想的是传感器方法。
    • contrib 上还没有这样的传感器,也没有在书面钩子中的方法。在这种情况下,最好自己编写一个自定义传感器。更好的方法是使用您在下面说明的方法扩展 HTTPOperator。 WebHDFSHook 和 WebHDFSSensor 也都使用 WebHDFS REST API 来实现很好的参考
    • 但这仍然会戳成功回复,对吗?执行 HTTP GET 语句并在由于 404 Not Found 或 response_check 返回 False 导致失败时返回 False。 404(如 403)或连接被拒绝错误以外的 HTTP 错误代码将直接使传感器本身失败(不再戳)。
    • 是的,你是对的,但这些将是边缘情况,OP 想检查现有文件是否被修改。
    猜你喜欢
    • 1970-01-01
    • 2018-08-17
    • 2023-03-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-10-14
    • 1970-01-01
    • 2020-04-19
    相关资源
    最近更新 更多