【发布时间】:2018-03-27 08:06:10
【问题描述】:
所以我有一个 Spark 作业来从 AWS 中提取一些域,然后是三个不同的作业,每个作业都获取所述域并从站点中提取各种数据。由于某种原因,此工作流在 ImportS3CrawlData 处停止并出现以下错误:
[2018-03-22 13:37:02,762] {models.py:1428} INFO - Executing <Task(SparkSubmitOperator): ImportCrawlJob> on 2018-03-22 13:37:00
[2018-03-22 13:37:02,763] {base_task_runner.py:115} INFO - Running: ['bash', '-c', 'sudo -H -u hdfs airflow run dag_extract_jobs ImportCrawlJob 2018-03-22T13:37:00 --job_id 21 --raw -sd DAGS_FOLDER/run_extract_jobs.py --cfg_path /tmp/tmpir3e3r32']
[2018-03-22 13:37:04,194] {base_task_runner.py:98} INFO - Subtask: [2018-03-22 13:37:04,193] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-03-22 13:37:04,356] {base_task_runner.py:98} INFO - Subtask: [2018-03-22 13:37:04,356] {models.py:189} INFO - Filling up the DagBag from /home/airflow/airflow/dags/run_extract_jobs.py
[2018-03-22 13:37:04,451] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-03-22 13:37:04,451] {base_task_runner.py:98} INFO - Subtask: File "/usr/bin/airflow", line 27, in <module>
[2018-03-22 13:37:04,451] {base_task_runner.py:98} INFO - Subtask: args.func(args)
[2018-03-22 13:37:04,452] {base_task_runner.py:98} INFO - Subtask: File "/usr/lib/python3.5/site-packages/airflow/bin/cli.py", line 353, in run
[2018-03-22 13:37:04,452] {base_task_runner.py:98} INFO - Subtask: dag = get_dag(args)
[2018-03-22 13:37:04,452] {base_task_runner.py:98} INFO - Subtask: File "/usr/lib/python3.5/site-packages/airflow/bin/cli.py", line 130, in get_dag
[2018-03-22 13:37:04,452] {base_task_runner.py:98} INFO - Subtask: 'parse.'.format(args.dag_id))
[2018-03-22 13:37:04,452] {base_task_runner.py:98} INFO - Subtask: airflow.exceptions.AirflowException: dag_id could not be found: dag_extract_jobs. Either the dag did not exist or it failed to parse.
run_extract_jobs.py 的代码可以在下面找到,删除了敏感/不必要的位。
# Parameters to initialize Spark:
access_id = Variable.get("AWS_ACCESS_KEY")
bucket_name = 'cb-scrapinghub'
secret_key = Variable.get("AWS_SECRET_KEY")
timestamp = datetime.now().strftime("%Y-%m-%d-%H:%M:%S")
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
DAG = DAG(
dag_id='dag_extract_jobs',
description='Run Extract Jobs',
schedule_interval='@once',
start_date=datetime(2018, 1, 1),
catchup=False,
default_args=default_args,
)
# Spark Job that runs ImportS3CrawlData:
importCrawlJob = SparkSubmitOperator(
task_id='ImportCrawlJob',
...
run_as_user='hdfs',
dag=DAG,
)
# Spark Job that runs ExtractAboutText:
extractAboutText = SparkSubmitOperator(
task_id='ExtractAboutText',
...
run_as_user='hdfs',
dag=DAG
)
extractAboutText.set_upstream(importCrawlJob)
# Spark Job that runs ExtractCompanyInfo:
extractCompanyInfo = SparkSubmitOperator(
task_id='ExtractCompanyInfo',
...
run_as_user='hdfs',
dag=DAG
)
extractCompanyInfo.set_upstream(importCrawlJob)
# Spark Job that runs ExtractWebPeople:
extractWebPeople = SparkSubmitOperator(
task_id='ExtractWebPeople',
...
run_as_user='hdfs',
dag=DAG
)
extractWebPeople.set_upstream(importCrawlJob)
我已确保 Airflow 和 Spark 都是最新的。我的 dag 文件夹设置正确。 Airflow 可以很好地运行教程文件。
这几天我一直在搞砸这个问题,我完全感到困惑。提前感谢您的帮助。
【问题讨论】:
-
在我看来,DAG 本身有问题。你能毫无问题地用
airflow list_dags列出 DAG 吗?如果可行,airflow list_tasks <dag-name>呢? -
是的,这两个命令都可以正常工作。这就是我感到困惑的原因。
-
和
DAGS_FOLDER有关系吗?你写的还是这是一个未设置的变量?如果您自己在 shell 中准确执行sudo -H -u hdfs airflow run dag_extract_jobs ImportCrawlJob 2018-03-22T13:37:00 --job_id 21 --raw -sd DAGS_FOLDER/run_extract_jobs.py --cfg_path /tmp/tmpir3e3r32会发生什么? -
不适用于“--cfg_path /tmp/tmpir3e3r32”,没有这样的文件或目录。我完全删除了该变量并得到“dag_id not found”错误。然后我将“/tmp/tmpir3e3r32”替换为我的配置文件的名称“airflow.cfg”,并收到以下错误消息:
File "/usr/lib64/python3.5/json/decoder.py", line 357, in raw_decode raise JSONDecodeError("Expecting value", s, err.value) from None json.decoder.JSONDecodeError: Expecting value: line 1 column 2 (char 1) -
我不明白你为什么用文件名交换配置路径。这一切似乎都指向缺少配置。
标签: apache-spark airflow