# Airflow needs a home directory. ~/airflow is the default,
# but you can put it somewhere else if you prefer (optional).
export AIRFLOW_HOME=~/airflow

# Install from PyPI using pip.
pip install apache-airflow

# Initialize the metadata database.
airflow initdb

# Start the web server (default port is 8080).
# Without -D the webserver stays in the foreground and the scheduler command
# below would never be reached; -D daemonizes it so the sequence continues.
# Drop -D if you prefer to run it in its own terminal instead.
airflow webserver -D -p 8080
# Alternative: nohup airflow webserver -p 8080 > ~/airflow/active.log 2>&1 &

# Start the scheduler (runs in the foreground).
airflow scheduler

# Delete a DAG (first remove its .py file from ~/airflow/dags).
# -y skips the confirmation prompt. {dag_id} is a placeholder to replace with
# the real DAG id; bash does not expand single-word braces, so it would be
# passed literally if left as-is.
airflow delete_dag -y {dag_id}

安装路径: /root/anaconda3/envs/he_test/lib/python3.7/site-packages/

# Daemon mode (-D / --daemon): the process detaches instead of holding the
# terminal in the foreground.
# Webserver on port 8080:
airflow webserver --port 8080 --daemon
# Scheduler:
airflow scheduler --daemon

在~/airflow/dags目录下添加py文件后,需要等待一会儿(等scheduler下一次扫描dags目录),才会在web界面中显示;
重新启动airflow scheduler可以让新添加的py文件立刻在web中显示

2. XCOM

可以使用XCom在不同的operator(task)之间传递变量。xcom_pull带有dag_id参数,因此也可以读取其他DAG推送的值(尚未实际验证,需要试一下)。
xcom_pull(self, task_ids=None, dag_id=None, key=XCOM_RETURN_KEY, include_prior_dates=False)

def processing_data(**kwargs):
    """Publish the prepared data to XCom for downstream tasks.

    Expects the task context (``ti`` in particular) in ``kwargs``.
    NOTE(review): ``X`` and ``str_with_trx_with_retail_with_corporate_with_account``
    are not defined in this snippet; presumably they are computed earlier in the
    real task body.
    """
    task_instance = kwargs['ti']
    task_instance.xcom_push(key='X', value=X)
    task_instance.xcom_push(
        key='str_with_trx_with_retail_with_corporate_with_account',
        value=str_with_trx_with_retail_with_corporate_with_account,
    )


# Task that runs processing_data. provide_context=True makes Airflow pass the
# task context (including 'ti') to the callable as keyword arguments.
processing_data_operator = PythonOperator(
    dag=dag,
    python_callable=processing_data,
    provide_context=True,
    task_id='processing_data_operator',
)


def predict(**kwargs):
    """Fetch ``X`` pushed to XCom by ``processing_data_operator``.

    The snippet stops after the pull; the prediction step itself is not shown.
    """
    X = kwargs['ti'].xcom_pull(key='X', task_ids='processing_data_operator')
    
# Task that runs predict; it needs the context ('ti') to pull X from XCom.
predict_operator = PythonOperator(
    dag=dag,
    python_callable=predict,
    provide_context=True,
    task_id='predict_operator',
)

3. 几个概念

https://www.cnblogs.com/piperck/p/10101423.html

  • DAG
  • DAG 意为有向无环图(Directed Acyclic Graph),在 Airflow 中定义了一个完整的作业。同一个 DAG 中的所有 Task 拥有相同的调度时间。
  • Task
  • Task 为 DAG 中具体的作业任务,它必须存在于某一个 DAG 之中。Task 在 DAG 中配置依赖关系,跨 DAG 的依赖是可行的,但是并不推荐。跨 DAG 依赖会导致 DAG 图的直观性降低,并给依赖管理带来麻烦。
  • DAG Run
  • 当一个 DAG 满足它的调度时间,或者被外部触发时,就会产生一个 DAG Run。可以理解为由 DAG 实例化的实例。
  • Task Instance
  • 当一个 Task 被调度启动时,就会产生一个 Task Instance。可以理解为由 Task 实例化的实例。

4. 常用命令

airflow test dag_id task_id execution_date   测试task
示例: airflow test example_hello_world_dag hello_task 20180516

airflow run dag_id task_id execution_date 运行task

airflow run -A dag_id task_id execution_date 忽略依赖task运行task

airflow trigger_dag dag_id -r RUN_ID -e EXEC_DATE  触发运行整个DAG

airflow webserver -D  守护进程运行webserver

airflow scheduler -D  守护进程运行调度

airflow worker -D 守护进程运行celery worker

airflow worker -c 1 -D 守护进程运行celery worker并指定任务并发数为1

airflow pause dag_id  暂停任务

airflow unpause dag_id 取消暂停,等同于在管理界面将该DAG的开关由Off切换为On

airflow list_tasks dag_id 查看task列表

airflow clear dag_id 清除任务实例的运行状态(相当于这些任务从未运行过,之后会被重新调度)

5. 运行一个dag的流程

  • 在~/airflow/dags目录下添加py文件(需要等待一会儿才会在web中显示;即使未开启webserver,DAG也可以运行)
  • airflow unpause dag_id(取消暂停任务,任务会按照设定时间周期执行)
  • airflow trigger_dag dag_id(立刻运行整个dag)

6. 重启一个dag的流程

# Restart the aml_sl_with_config DAG after changing its definition.
dag_file=~/airflow/dags/aml_sl_with_config.py

# 1. Remove the old DAG file and its metadata (-y: no confirmation prompt).
rm -f -- "$dag_file"
airflow delete_dag -y aml_sl_with_config

# 2. Stop the running scheduler. pkill/pgrep -f match the full command line.
#    Unlike `ps -ef | grep | awk | xargs kill`, they never target the grep
#    process itself and do not run a bare `kill` when nothing matches.
#    Ask with SIGTERM first, wait up to 10 s, then force-kill any leftovers.
pkill -TERM -f 'airflow scheduler'
for _ in {1..10}; do
  pgrep -f 'airflow scheduler' > /dev/null || break
  sleep 1
done
pkill -KILL -f 'airflow scheduler'

# 3. Write the new DAG file, then restart the scheduler in the background.
#    Log to a file instead of ./nohup.out.
vi "$dag_file"
nohup airflow scheduler > ~/airflow/scheduler.log 2>&1 &

7. json配置文件格式

{
  "hdfs_url": "localhost:50070",
  "hdfs_user": "hdfs",
  "daily_dir_list": [
    "trx",
    "str"
  ],
  "static_dir_list": [
    "retail",
    "corporate",
    "account"
  ],
  "base_local_path": "/root/airflow/aml_data/sl_data/{}/",
  "base_local_metrics_path": "/root/airflow/aml_data/sl_data/{}/for_metrics/",
  "base_local_model_path": "/root/airflow/aml_data/model/{}",
  "base_local_predict_res_path": "/root/airflow/aml_data/bp_data/res/{}",
  "model_prefix": "he_test_xgboost",
  "predict_res_prefix": "pred_full_table",
  "base_remote_daily_path": "/anti-money/daily_data_group/{}/daily/{}",
  "base_remote_static_path": "/anti-money/daily_data_group/{}/all",
  "base_remote_model_path": "/anti-money/he_test/model/{}",
  "base_remote_predict_res_path": "/anti-money/he_test/predict_res/{}",
  "specified_model_path":"",
  "start_time": "2018-05-01",
  "end_time": "2018-05-27",
  "metrics_start_time": "2018-05-28",
  "metrics_end_time": "2018-05-30"
}

8. 配置参数方式

  • Menu -> Admin -> Variables:新建key为sl_config的变量,值为上面第7节的json(DAG中通过Variable.get('sl_config', deserialize_json=True)读取)

(原文此处为两张标题为“Airflow实战”的截图)

9. 一个可配置参数的自学习demo

# -*- coding: utf-8 -*-

from __future__ import print_function

import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable

from hdfs import *
import datetime
import pandas as pd
from sklearn.preprocessing import LabelEncoder
import xgboost as xgb
import os
import json
import shutil
from sklearn import metrics

# Default arguments inherited by every task of the DAG: 3 retries two minutes
# apart, with e-mail alerts on both failure and retry.
# NOTE(review): Airflow recommends a static start_date; days_ago(0, hour=0)
# moves to the current day's 00:00 every time the file is parsed. Confirm
# this is intended.
args = dict(
    owner='aml',
    start_date=airflow.utils.dates.days_ago(0, hour=0),
    retries=3,
    retry_delay=datetime.timedelta(seconds=120),
    email=['maotao@4paradigm.com', 'maopengyu@4paradigm.com'],
    email_on_failure=True,
    email_on_retry=True,
)

# Hourly DAG (minute 0 of every hour). catchup=False: missed intervals are not
# back-filled. dagrun_timeout caps each run at one hour.
dag = DAG(
    dag_id='aml_sl_with_config',
    default_args=args,
    schedule_interval='0 * * * *',
    catchup=False,
    dagrun_timeout=datetime.timedelta(hours=1),
)


def clear_local_path(local_path):
    """Reset ``local_path`` so a task starts from a clean state.

    * missing path       -> created as an empty directory, including any
      missing parent directories;
    * existing file      -> deleted (nothing is created in its place);
    * existing directory -> deleted with all its contents, then recreated empty.

    :param local_path: filesystem path to reset.
    """
    if not os.path.exists(local_path):
        # makedirs rather than mkdir: nested paths such as
        # ".../sl_data/<date>/for_metrics/" must work even when the parent
        # directory does not exist yet.
        os.makedirs(local_path)
    elif os.path.isfile(local_path):
        os.remove(local_path)
    else:
        # Delete the directory tree, then recreate it empty.
        shutil.rmtree(local_path)
        os.makedirs(local_path)


def get_config_from_variables(**kwargs):
    """Load the self-learning pipeline settings from the ``sl_config`` Variable.

    NOTE(review): this excerpt is truncated (it stops mid-statement below), so
    what the function finally returns or pushes is not visible here.
    """
    # Read the parameters from the JSON stored in the Airflow Variable
    # 'sl_config'; default_var is used when that Variable does not exist.
    # NOTE(review): the inline default lacks keys present in the sample JSON
    # config (base_local_predict_res_path, predict_res_prefix,
    # base_remote_predict_res_path, specified_model_path). Keep them in sync.
    sl_config = Variable.get('sl_config', deserialize_json=True, default_var={"hdfs_url":"localhost:50070","hdfs_user":"hdfs","daily_dir_list":["trx","str"],"static_dir_list":["retail","corporate","account"],"base_local_path":"/root/airflow/aml_data/sl_data/{}/","base_local_metrics_path":"/root/airflow/aml_data/sl_data/{}/for_metrics/","base_local_model_path":"/root/airflow/aml_data/model/{}","model_prefix":"he_test_xgboost","base_remote_daily_path":"/anti-money/daily_data_group/{}/daily/{}","base_remote_static_path":"/anti-money/daily_data_group/{}/all","base_remote_model_path":"/anti-money/he_test/model/{}","start_time":"2018-05-01","end_time":"2018-05-27","metrics_start_time":"2018-05-28","metrics_end_time":"2018-05-30"})
    print('config: {}'.format(sl_config))
    hdfs_url = sl_config['hdfs_url']
    hdfs_user = sl_config['hdfs_user']
    daily_dir_list = sl_config['daily_dir_list']
    static_dir_list = sl_config['static_dir_list']
    # NOTE(review): truncated statement. The bare name below is never assigned
    # in this function, so it raises NameError unless a global of that name
    # exists.
    base_local_path 

相关文章: