【Question Title】:Airflow - Great Expectations - Send evaluation parameters through to GreatExpectationsOperator
【Posted】:2021-10-20 19:58:08
【Question Description】:

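For anyone using Great Expectations with Airflow: does anyone know whether it is possible to send evaluation parameters through the Airflow GreatExpectationsOperator? I am currently trying this and getting the error:

airflow.exceptions.AirflowException: Invalid arguments were passed to GreatExpectationsOperator (task_id: my_task). Invalid arguments were: **kwargs: {'evaluation_parameters': {}}

Thanks,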

【Comments】:

  • GreatExpectationsOperator does not ship with Airflow. It is either a third-party operator or one you developed locally. If you need help, you need to provide its code.

Tags: airflow great-expectations


【Solution 1】:

Assuming you are using the GreatExpectationsOperator from the https://github.com/great-expectations/airflow-provider-great-expectations repo, it supports only the following parameters, which are listed at https://registry.astronomer.io/providers/great-expectations/modules/greatexpectationsoperator and reproduced here:

    :param run_name: Identifies the validation run (defaults to timestamp if not specified)
    :type run_name: Optional[str]
    :param data_context_root_dir: Path of the great_expectations directory
    :type data_context_root_dir: str
    :param data_context: A great_expectations DataContext object
    :type data_context: dict
    :param expectation_suite_name: The name of the Expectation Suite to use for validation
    :type expectation_suite_name: str
    :param batch_kwargs: The batch_kwargs to use for validation
    :type batch_kwargs: dict
    :param assets_to_validate: A list of dictionaries of batch_kwargs + Expectation Suites to use for validation
    :type assets_to_validate: iterable
    :param checkpoint_name: A Checkpoint name to use for validation
    :type checkpoint_name: str
    :param fail_task_on_validation_failure: Fail the Airflow task if the Great Expectation validation fails
    :type fail_task_on_validation_failure: bool
    :param validation_operator_name: name of a Great Expectations validation operator, defaults to action_list_operator
    :type validation_operator_name: Optional[str]
    :param **kwargs: kwargs
    :type **kwargs: Optional[dict]

See the following example_dag from the linked repository:

"""
A DAG that demonstrates implementation of the GreatExpectationsOperator.
Note: you will need to reference the necessary data assets and expectations suites in your project. You can find samples available in the provider source directory.
To view steps on running this DAG, check out the Provider Readme: https://github.com/great-expectations/airflow-provider-great-expectations#examples
"""

import logging
import os
import airflow
from airflow import DAG
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator
from great_expectations_provider.operators.great_expectations_bigquery import GreatExpectationsBigQueryOperator

default_args = {
    "owner": "Airflow",
    "start_date": airflow.utils.dates.days_ago(1)
}

dag = DAG(
    dag_id='example_great_expectations_dag',
    default_args=default_args
)

# This runs an expectation suite against a sample data asset. You may need to change these paths if you do not have your `data`
# directory living in a top-level `include` directory. Ensure the checkpoint yml files have the correct path to the data file.
base_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
data_file = os.path.join(base_path, 'include',
                         'data/yellow_tripdata_sample_2019-01.csv')
ge_root_dir = os.path.join(base_path, 'include', 'great_expectations')


ge_batch_kwargs_pass = GreatExpectationsOperator(
    task_id='ge_batch_kwargs_pass',
    expectation_suite_name='taxi.demo',
    batch_kwargs={
        'path': data_file,
        'datasource': 'data__dir'
    },
    data_context_root_dir=ge_root_dir,
    dag=dag,
)

# This runs an expectation suite against a data asset that passes the tests
ge_batch_kwargs_list_pass = GreatExpectationsOperator(
    task_id='ge_batch_kwargs_list_pass',
    assets_to_validate=[
        {
            'batch_kwargs': {
                'path': data_file,
                'datasource': 'data__dir'
            },
            'expectation_suite_name': 'taxi.demo'
        }
    ],
    data_context_root_dir=ge_root_dir,
    dag=dag,
)

# This runs a checkpoint that will pass. Make sure the checkpoint yml file has the correct path to the data file.
ge_checkpoint_pass = GreatExpectationsOperator(
    task_id='ge_checkpoint_pass',
    run_name='ge_airflow_run',
    checkpoint_name='taxi.pass.chk',
    data_context_root_dir=ge_root_dir,
    dag=dag
)

# This runs a checkpoint that will fail. Make sure the checkpoint yml file has the correct path to the data file.
ge_checkpoint_fail = GreatExpectationsOperator(
    task_id='ge_checkpoint_fail',
    run_name='ge_airflow_run',
    checkpoint_name='taxi.fail.chk',
    data_context_root_dir=ge_root_dir,
    dag=dag
)

# This runs a checkpoint that will fail, but we set a flag to exit the task successfully.
ge_checkpoint_fail_but_continue = GreatExpectationsOperator(
    task_id='ge_checkpoint_fail_but_continue',
    run_name='ge_airflow_run',
    checkpoint_name='taxi.fail.chk',
    fail_task_on_validation_failure=False,
    data_context_root_dir=ge_root_dir,
    dag=dag
)

# This runs a checkpoint and passes in a root dir.
ge_checkpoint_pass_root_dir = GreatExpectationsOperator(
    task_id='ge_checkpoint_pass_root_dir',
    run_name='ge_airflow_run',
    checkpoint_name='taxi.pass.chk',
    data_context_root_dir=ge_root_dir,
    dag=dag
)
 
ge_batch_kwargs_list_pass >> ge_checkpoint_pass_root_dir >> ge_batch_kwargs_pass >>ge_checkpoint_fail_but_continue >> ge_checkpoint_pass >> ge_checkpoint_fail
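
Since the operator does not accept `evaluation_parameters`, one possible workaround is to call Great Expectations directly from a Python callable. The sketch below is not from the provider. It assumes the batch_kwargs-style DataContext API, in which `get_batch` takes batch kwargs plus a suite name and `run_validation_operator` accepts an `evaluation_parameters` argument; verify both against your installed Great Expectations version. The function name `validate_with_evaluation_parameters` is hypothetical.

```python
def validate_with_evaluation_parameters(
    context,
    batch_kwargs,
    expectation_suite_name,
    evaluation_parameters,
    validation_operator_name="action_list_operator",
    run_name=None,
):
    """Run a validation operator directly so evaluation parameters can be passed.

    `context` is expected to behave like a Great Expectations DataContext
    using the batch_kwargs-style API (assumption; check your version).
    """
    # Load the batch of data together with the suite it should be validated against.
    batch = context.get_batch(batch_kwargs, expectation_suite_name)

    # Assumption: run_validation_operator accepts evaluation_parameters.
    results = context.run_validation_operator(
        validation_operator_name,
        assets_to_validate=[batch],
        run_name=run_name,
        evaluation_parameters=evaluation_parameters,
    )

    # Fail loudly so that a wrapping Airflow task is marked as failed.
    if not results["success"]:
        raise ValueError("Great Expectations validation failed")
    return results
```

In a DAG, a function like this could be wrapped in a PythonOperator, with the context built from `ge_root_dir` and the same `batch_kwargs` and suite name used in the tasks above.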

【Discussion】:
