【问题标题】:SparkSubmitOperator raises “dag_id could not be found" errors when runSparkSubmitOperator 在运行时引发“dag_id could not be found”错误
【发布时间】:2018-03-27 08:06:10
【问题描述】:

所以我有一个 Spark 作业来从 AWS 中提取一些域,然后是三个不同的作业,每个作业都获取所述域并从站点中提取各种数据。由于某种原因,此工作流在 ImportS3CrawlData 处停止并出现以下错误:

[2018-03-22 13:37:02,762] {models.py:1428} INFO - Executing <Task(SparkSubmitOperator): ImportCrawlJob> on 2018-03-22 13:37:00
[2018-03-22 13:37:02,763] {base_task_runner.py:115} INFO - Running: ['bash', '-c', 'sudo -H -u hdfs airflow run dag_extract_jobs ImportCrawlJob 2018-03-22T13:37:00 --job_id 21 --raw -sd DAGS_FOLDER/run_extract_jobs.py --cfg_path /tmp/tmpir3e3r32']
[2018-03-22 13:37:04,194] {base_task_runner.py:98} INFO - Subtask: [2018-03-22 13:37:04,193] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-03-22 13:37:04,356] {base_task_runner.py:98} INFO - Subtask: [2018-03-22 13:37:04,356] {models.py:189} INFO - Filling up the DagBag from /home/airflow/airflow/dags/run_extract_jobs.py
[2018-03-22 13:37:04,451] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-03-22 13:37:04,451] {base_task_runner.py:98} INFO - Subtask:   File "/usr/bin/airflow", line 27, in <module>
[2018-03-22 13:37:04,451] {base_task_runner.py:98} INFO - Subtask:     args.func(args)
[2018-03-22 13:37:04,452] {base_task_runner.py:98} INFO - Subtask:   File "/usr/lib/python3.5/site-packages/airflow/bin/cli.py", line 353, in run
[2018-03-22 13:37:04,452] {base_task_runner.py:98} INFO - Subtask:     dag = get_dag(args)
[2018-03-22 13:37:04,452] {base_task_runner.py:98} INFO - Subtask:   File "/usr/lib/python3.5/site-packages/airflow/bin/cli.py", line 130, in get_dag
[2018-03-22 13:37:04,452] {base_task_runner.py:98} INFO - Subtask:     'parse.'.format(args.dag_id))
[2018-03-22 13:37:04,452] {base_task_runner.py:98} INFO - Subtask: airflow.exceptions.AirflowException: dag_id could not be found: dag_extract_jobs. Either the dag did not exist or it failed to parse.

run_extract_jobs.py 的代码可以在下面找到,删除了敏感/不必要的位。

# Parameters to initialize Spark:
access_id = Variable.get("AWS_ACCESS_KEY")
bucket_name = 'cb-scrapinghub'
secret_key = Variable.get("AWS_SECRET_KEY")
timestamp = datetime.now().strftime("%Y-%m-%d-%H:%M:%S")


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

DAG = DAG(
    dag_id='dag_extract_jobs',
    description='Run Extract Jobs',
    schedule_interval='@once',
    start_date=datetime(2018, 1, 1),
    catchup=False,
    default_args=default_args,
)

# Spark Job that runs ImportS3CrawlData:
importCrawlJob = SparkSubmitOperator(
    task_id='ImportCrawlJob',
    ...
    run_as_user='hdfs',
    dag=DAG,
)

# Spark Job that runs ExtractAboutText:
extractAboutText = SparkSubmitOperator(
    task_id='ExtractAboutText',
    ...
    run_as_user='hdfs',
    dag=DAG
)
extractAboutText.set_upstream(importCrawlJob)

# Spark Job that runs ExtractCompanyInfo:
extractCompanyInfo = SparkSubmitOperator(
    task_id='ExtractCompanyInfo',
    ...
    run_as_user='hdfs',
    dag=DAG
)
extractCompanyInfo.set_upstream(importCrawlJob)

# Spark Job that runs ExtractWebPeople:
extractWebPeople = SparkSubmitOperator(
    task_id='ExtractWebPeople',
    ...
    run_as_user='hdfs',
    dag=DAG
)
extractWebPeople.set_upstream(importCrawlJob)

我已确保 Airflow 和 Spark 都是最新的。我的 dag 文件夹设置正确。 Airflow 可以很好地运行教程文件。

这几天我一直在搞砸这个问题,我完全感到困惑。提前感谢您的帮助。

【问题讨论】:

  • 在我看来,DAG 本身有问题。你能毫无问题地用airflow list_dags 列出 DAG 吗?如果可行,airflow list_tasks &lt;dag-name&gt; 呢?
  • 是的,这两个命令都可以正常工作。这就是我感到困惑的原因。
  • DAGS_FOLDER有关系吗?你写的还是这是一个未设置的变量?如果您自己在 shell 中准确执行 sudo -H -u hdfs airflow run dag_extract_jobs ImportCrawlJob 2018-03-22T13:37:00 --job_id 21 --raw -sd DAGS_FOLDER/run_extract_jobs.py --cfg_path /tmp/tmpir3e3r32 会发生什么?
  • 不适用于“--cfg_path /tmp/tmpir3e3r32”,没有这样的文件或目录。我完全删除了该变量并得到“dag_id not found”错误。然后我将“/tmp/tmpir3e3r32”替换为我的配置文件的名称“airflow.cfg”,并收到以下错误消息:File "/usr/lib64/python3.5/json/decoder.py", line 357, in raw_decode raise JSONDecodeError("Expecting value", s, err.value) from None json.decoder.JSONDecodeError: Expecting value: line 1 column 2 (char 1)
  • 我不明白你为什么用文件名交换配置路径。这一切似乎都指向缺少配置。

标签: apache-spark airflow


【解决方案1】:

您的配置参数好像没有正确设置。

确保完成https://airflow.apache.org/configuration.html的第一部分

同样在airflow.cfg 中确保您已将dags_folder 设置为文件路径。同时,您还可以检查是否需要设置其他设置和其他路径。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-10-29
    • 2012-03-20
    • 1970-01-01
    • 2020-12-18
    • 2020-10-15
    • 2020-08-05
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多