假设您使用来自https://github.com/great-expectations/airflow-provider-great-expectations repo 的GreatExpectationsOperator,它仅支持https://registry.astronomer.io/providers/great-expectations/modules/greatexpectationsoperator 中列出并在此处列出的以下参数:
:param run_name: Identifies the validation run (defaults to timestamp if not specified)
:type run_name: Optional[str]
:param data_context_root_dir: Path of the great_expectations directory
:type data_context_root_dir: str
:param data_contex: A great_expectations DataContext object
:type data_contex: dict
:param expectation_suite_name: The name of the Expectation Suite to use for validation
:type expectation_suite_name: str
:param batch_kwargs: The batch_kwargs to use for validation
:type batch_kwargs: dict
:param assets_to_validate: A list of dictionaries of batch_kwargs + Expectation Suites to use for validation
:type assets_to_validate: iterable
:param checkpoint_name: A Checkpoint name to use for validation
:type checkpoint_name: str
:param fail_task_on_validation_failure: Fail the Airflow task if the Great Expectation validation fails
:type fail_task_on_validation_failure: bool
:param validation_operator_name: name of a Great Expectations validation operator, defaults to action_list_operator
:type validation_operator_name: Optional[str]
:param **kwargs: kwargs
:type **kwargs: Optional[dict]
检查来自link的以下example_dag:
"""
A DAG that demonstrates implementation of the GreatExpectationsOperator.
Note: you wil need to reference the necessary data assets and expectations suites in your project. You can find samples available in the provider source directory.
To view steps on running this DAG, check out the Provider Readme: https://github.com/great-expectations/airflow-provider-great-expectations#examples
"""
import logging
import os
import airflow
from airflow import DAG
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator
from great_expectations_provider.operators.great_expectations_bigquery import GreatExpectationsBigQueryOperator
default_args = {
"owner": "Airflow",
"start_date": airflow.utils.dates.days_ago(1)
}
dag = DAG(
dag_id='example_great_expectations_dag',
default_args=default_args
)
# This runs an expectation suite against a sample data asset. You may need to change these paths if you do not have your `data`
# directory living in a top-level `include` directory. Ensure the checkpoint yml files have the correct path to the data file.
base_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
data_file = os.path.join(base_path, 'include',
'data/yellow_tripdata_sample_2019-01.csv')
ge_root_dir = os.path.join(base_path, 'include', 'great_expectations')
ge_batch_kwargs_pass = GreatExpectationsOperator(
task_id='ge_batch_kwargs_pass',
expectation_suite_name='taxi.demo',
batch_kwargs={
'path': data_file,
'datasource': 'data__dir'
},
data_context_root_dir=ge_root_dir,
dag=dag,
)
# This runs an expectation suite against a data asset that passes the tests
ge_batch_kwargs_list_pass = GreatExpectationsOperator(
task_id='ge_batch_kwargs_list_pass',
assets_to_validate=[
{
'batch_kwargs': {
'path': data_file,
'datasource': 'data__dir'
},
'expectation_suite_name': 'taxi.demo'
}
],
data_context_root_dir=ge_root_dir,
dag=dag,
)
# This runs a checkpoint that will pass. Make sure the checkpoint yml file has the correct path to the data file.
ge_checkpoint_pass = GreatExpectationsOperator(
task_id='ge_checkpoint_pass',
run_name='ge_airflow_run',
checkpoint_name='taxi.pass.chk',
data_context_root_dir=ge_root_dir,
dag=dag
)
# This runs a checkpoint that will fail. Make sure the checkpoint yml file has the correct path to the data file.
ge_checkpoint_fail = GreatExpectationsOperator(
task_id='ge_checkpoint_fail',
run_name='ge_airflow_run',
checkpoint_name='taxi.fail.chk',
data_context_root_dir=ge_root_dir,
dag=dag
)
# This runs a checkpoint that will fail, but we set a flag to exit the task successfully.
ge_checkpoint_fail_but_continue = GreatExpectationsOperator(
task_id='ge_checkpoint_fail_but_continue',
run_name='ge_airflow_run',
checkpoint_name='taxi.fail.chk',
fail_task_on_validation_failure=False,
data_context_root_dir=ge_root_dir,
dag=dag
)
# This runs a checkpoint and passes in a root dir.
ge_checkpoint_pass_root_dir = GreatExpectationsOperator(
task_id='ge_checkpoint_pass_root_dir',
run_name='ge_airflow_run',
checkpoint_name='taxi.pass.chk',
data_context_root_dir=ge_root_dir,
dag=dag
)
ge_batch_kwargs_list_pass >> ge_checkpoint_pass_root_dir >> ge_batch_kwargs_pass >>ge_checkpoint_fail_but_continue >> ge_checkpoint_pass >> ge_checkpoint_fail