【问题标题】:execute sql code which is embedded in a file in DAG执行嵌入在 DAG 文件中的 sql 代码
【发布时间】:2019-11-18 08:05:10
【问题描述】:

Fetch results from BigQueryOperator in airflow

我遵循了上面链接中的建议,解决方案有效,而且很好,如果我的 sql 是单行,它就有效。但如果 SQL 代码很大并将其放在一个文件中并在函数中引用该文件,则会失败。

def MyChequer(**kwargs):
big_query_count = bigquery_operator.BigQueryOperator(
    task_id='my_bq_query',
    sql='/dags/sqls/invalidTable.sql'
)

然后我收到错误:BigQuery 作业失败。最终错误是:{'reason': 'invalidQuery', 'location': 'query', 'message': '语法错误: Unexpected identifier "dags" at [1:1]'}

通常我以以下方式使用,以下工作

BigQueryOperator(
        task_id='invalidXXX',
        use_legacy_sql=False,
        sql='/dags/sqls/invalid_v1.sql',
        destination_dataset_table=targetTable,
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_TRUNCATE',
        dag=dag
    )
   dag = DAG('invalidXXX', 
    default_args=default_args, 
    description='', 
    schedule_interval="0 5 * * *",
    catchup=False,
    template_searchpath=['/home/airflow/stgAirflow/']
   )

【问题讨论】:

    标签: python airflow directed-acyclic-graphs google-bigquery


    【解决方案1】:

    好的,我解决了这个问题。这意味着,当执行 dag 时,将使用并执行文件中的 sql 代码。不确定它是否是优化的解决方案。欢迎大家多提建议。

    //define 
    class SQLTemplatedPythonOperator(PythonOperator):
        template_ext = ('.sql',)
    
    //modify function
    def loadCSV(**kwargs):
        print("inside loadCSV")
        query = kwargs['templates_dict']['query']
        big_query_count = bigquery_operator.BigQueryOperator(
            task_id='my_bq_query',
            sql=query,
    
    //dag - task
    SQLTemplatedPythonOperator(
        task_id='invalidBBDToCSV',
        templates_dict={'query': 'invalidBBD.sql'},
        provide_context=True,
        python_callable=loadCSV,
        dag=dag,
    //dag
    dag = DAG('invalidBBDLoad', 
        default_args=default_args, 
        description='DAG data', 
        schedule_interval="0 11 * * *",
        catchup=False,
        template_searchpath=['/home/stgairflow/dags/sqls'], 
        user_defined_macros={'myProjectId': myProjectId,}
    )
    

    【讨论】:

      【解决方案2】:

      看起来错误来自尝试将这个字符串 '/dags/sqls/invalid_v1.sql' 作为 sql 执行...这是无效的。

      如果您想将 sql 保存在单独的文件中,您可以在那里读取文件内容吗?似乎 sql arg 需要一个实际的 sql 语句。

      【讨论】:

      • 是的,我同意,它将字符串解释为 SQL 并失败。这很明显,但通常,在 DAG 中,我给出了一个“template_searchpath”,但在这种情况下它不是 DAG,它是一个函数(def MyChequer)并且在函数运算符内部。我不知道如何让函数/运算符解释 sql 搜索路径......所以如果有任何人遇到过以及使用什么逻辑来修复,我正在寻找一些帮助。
      • 是的,有点遗憾,文档说他们支持解析模板文件:``` :param sql:要执行的 sql 代码(模板化) :type sql:可以接收一个 str 表示sql 语句、str 列表(sql 语句)或对模板文件的引用。模板引用被以 '.sql' 结尾的 str 识别。``` 但是阅读代码here,模板位置将在这里运行:if isinstance(self.sql, str):...没有模板处理。
      猜你喜欢
      • 2014-11-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-03-11
      • 1970-01-01
      • 2019-10-19
      • 1970-01-01
      • 2012-05-30
      相关资源
      最近更新 更多