【发布时间】:2021-07-11 14:25:41
【问题描述】:
我正在尝试使用 Airflow 运行一个简单的 ETL DAG。 Airflow 显示 DAG 已成功触发,但没有任何内容上传到我的计算机。当我自己运行该函数时,它工作正常。
这是从 api 中提取数据、转换数据并将其加载到 sqlite 数据库文件的 etl 函数。
from sqlite3.dbapi2 import Cursor
import requests
import pandas as pd
from datetime import datetime
import datetime
import sqlalchemy
import sqlite3
from sqlalchemy.orm import sessionmaker
pd.options.mode.chained_assignment = None
def run_activity_etl():
DATABASE_LOCATION = "sqlite:///run_activity.sqlite"
today = datetime.datetime.now()
yesterday = today - datetime.timedelta(days=1)
yesterday = yesterday.strftime("%Y-%m-%d")
#extract
access_token = "****" #not relevant for my issue
header = {'Authorization': 'Bearer {}'.format(access_token)}
response = requests.get("https://api.fitbit.com/1/user/-/activities/list.json?afterDate=2021-07-01&sort=asc&offset=0&limit=100", headers=header).json()
activity_data=pd.json_normalize(response['activities'], sep="_")
#transform
subset=['startTime', 'activityName', 'distance', 'duration', 'speed', 'averageHeartRate','calories', 'steps']
subset_activity_data=activity_data[subset]
subset_run = subset_activity_data[subset_activity_data['activityName']=='Run']
subset_run["startTime"]= pd.to_datetime(subset_run["startTime"])
subset_run["date"] = subset_run["startTime"].dt.strftime("%Y-%m-%d")
#load
engine= sqlalchemy.create_engine(DATABASE_LOCATION)
conn=sqlite3.connect('run_activity.sqlite')
cursor=conn.cursor()
sql_query = """
CREATE TABLE IF NOT EXISTS run_activity(
date VARCHAR(200),
activityName VARCHAR(200),
distance VARCHAR(200),
duration VARCHAR(200),
speed VARCHAR(200),
averageHeartRate VARCHAR(200),
calories VARCHAR(200),
steps VARCHAR(200),
startTime VARCHAR(200),
CONSTRAINT primary_key_constraint PRIMARY KEY (startTime)
)
"""
cursor.execute(sql_query)
print("Opened database successfully")
try:
subset_run.to_sql("run_activity", engine, index=False, if_exists='append')
except:
print("Data already exists in the database")
conn.close()
print("Close database successfully")
我的 DAG 文件:
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from fitbit_api import run_activity_etl
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(0,0,0,0,0),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1)
}
dag = DAG(
'activity_dag',
default_args=default_args,
description='ETL process for Fitbit on running data!',
schedule_interval=timedelta(days=1),
)
run_etl = PythonOperator(
task_id='whole_activity_etl',
python_callable=run_activity_etl,
dag=dag,
)
run_etl
这是气流的日志:
感谢任何帮助!
【问题讨论】:
标签: python sqlalchemy airflow