【问题标题】:Airflow task triggers successfully but no file uploaded气流任务触发成功但没有上传文件
【发布时间】:2021-07-11 14:25:41
【问题描述】:

我正在尝试使用 Airflow 运行一个简单的 ETL DAG。 Airflow 显示 DAG 已成功触发,但没有任何内容上传到我的计算机。当我自己运行该函数时,它工作正常。

这是从 api 中提取数据、转换数据并将其加载到 sqlite 数据库文件的 etl 函数。

from sqlite3.dbapi2 import Cursor
import requests
import pandas as pd
from datetime import datetime
import datetime
import sqlalchemy
import sqlite3
from sqlalchemy.orm import sessionmaker
pd.options.mode.chained_assignment = None


def run_activity_etl():
    DATABASE_LOCATION = "sqlite:///run_activity.sqlite"
    today = datetime.datetime.now()
    yesterday = today - datetime.timedelta(days=1)
    yesterday = yesterday.strftime("%Y-%m-%d")

    #extract
    access_token = "****" #not relevant for my issue
    header = {'Authorization': 'Bearer {}'.format(access_token)}
    response = requests.get("https://api.fitbit.com/1/user/-/activities/list.json?afterDate=2021-07-01&sort=asc&offset=0&limit=100", headers=header).json()
    activity_data=pd.json_normalize(response['activities'], sep="_")

    #transform
    subset=['startTime', 'activityName', 'distance', 'duration', 'speed', 'averageHeartRate','calories', 'steps']
    subset_activity_data=activity_data[subset]
    subset_run = subset_activity_data[subset_activity_data['activityName']=='Run']
    subset_run["startTime"]= pd.to_datetime(subset_run["startTime"])
    subset_run["date"] = subset_run["startTime"].dt.strftime("%Y-%m-%d")

    #load
    engine= sqlalchemy.create_engine(DATABASE_LOCATION)
    conn=sqlite3.connect('run_activity.sqlite')
    cursor=conn.cursor()

    sql_query = """
        CREATE TABLE IF NOT EXISTS run_activity(
            date VARCHAR(200),
            activityName VARCHAR(200),
            distance VARCHAR(200),
            duration VARCHAR(200),
            speed VARCHAR(200),
            averageHeartRate VARCHAR(200),
            calories VARCHAR(200),
            steps VARCHAR(200),
            startTime VARCHAR(200),
            CONSTRAINT primary_key_constraint PRIMARY KEY (startTime)
        )
        """

    cursor.execute(sql_query)
    print("Opened database successfully")

    try:
        subset_run.to_sql("run_activity", engine, index=False, if_exists='append')
    except:
        print("Data already exists in the database")

    conn.close()
    print("Close database successfully")

我的 DAG 文件:

from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from fitbit_api import run_activity_etl


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(0,0,0,0,0),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    'activity_dag',
    default_args=default_args,
    description='ETL process for Fitbit on running data!',
    schedule_interval=timedelta(days=1),
)


run_etl = PythonOperator(
    task_id='whole_activity_etl',
    python_callable=run_activity_etl,
    dag=dag,
)

run_etl

这是气流的日志:

Log from airflow

感谢任何帮助!

【问题讨论】:

    标签: python sqlalchemy airflow


    【解决方案1】:

    目前你有以下:

    try:
        subset_run.to_sql("run_activity", engine, index=False, if_exists='append')
    except:
        print("Data already exists in the database")
    

    这意味着它会捕获任何存在的异常。如果您删除 try except 并运行

    subset_run.to_sql("run_activity", engine, index=False, if_exists='append')
    

    您可能会看到异常。如果您在此处发布,我们可以进一步帮助您。

    【讨论】:

    • 谢谢,但是我在更改代码时没有看到异常。据我所知,日志也没有显示任何错误。此外,当我在没有气流的情况下运行代码时,它完全可以正常工作(> 它会创建 .sqlite 文件)
    • 您的数据库位置是根文件夹中的/run_activity.sqlite,对吗?也许在那里提供另一条完整路径
    • 感谢您的帮助!也尝试过DATABASE_LOCATION = "sqlite:///C:\\Users\\jules\\OneDrive\\Documenten\\Python projects\\run_activity.sqlite" ,但结果相同:在没有气流的情况下运行该功能时,它可以工作(创建文件)。如果我用气流触发 DAG,它说它触发成功,但是没有创建文件。也试过只写一个csv文件,但同样的“结果”......
    • 您确定 Airflow 有权写入该文件夹吗?编写一个简单的 PythonOperator 来打开一个文件并写入同一个目录来测试它。这里也不确定,但我认为你的完整路径应该是sqlite://C:\Users\jules\OneDrive\Documenten\Python projects\run_activity.sqlite
    • 我必须使用双反斜杠,否则会引发错误。如果我尝试做一个简单的任务(打开并将文件写入同一目录),我在 Airflow 日志中收到此错误:FileNotFoundError: [Errno 2] No such file or directory: 'C:\\Users\\jules\\OneDrive\\Documenten\\Python projects\\airflow-docker\\dags\\test.csv'(这个文件肯定存在)这是否意味着 Airflow 没有权限?如果是这样,我该如何授予 Airflow 权限?再次感谢!
    【解决方案2】:

    我仍然需要学习很多东西,但我找到了解决问题的方法。显然,在我的情况下,数据库位置应该是:sqlite:////opt/airflow/dags/run_activity.sqlite,因为这也写在我的 docker-compose 文件中:

    volumes:
        - ./dags:/opt/airflow/dags
        - ./logs:/opt/airflow/logs
        - ./plugins:/opt/airflow/plugins
    

    现在,此文件已上传,没有任何问题。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2013-11-12
      • 2021-12-31
      • 1970-01-01
      • 2019-06-05
      • 2015-03-31
      • 2023-03-18
      • 2021-02-22
      • 1970-01-01
      相关资源
      最近更新 更多