【Posted】: 2022-08-10 23:40:36
【Question】:
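# etl.py
from airflow.operators.dummy import DummyOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# `dag` is the DAG object defined elsewhere in etl.py (not shown)
start = DummyOperator(task_id='start', dag=dag)
job1 = SparkSubmitOperator(task_id='t1', application='/home/airflow/dags/test.py',
                           name='test', conf={'spark.master': 'yarn'}, dag=dag)
start >> job1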
# test.py
import os

from pyspark.sql import SparkSession

os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-1.8.0-openjdk-amd64'
os.environ['SPARK_HOME'] = '/opt/spark3'
os.environ['YARN_CONF_DIR'] = '/opt/hadoop/etc/hadoop'
os.environ['HADOOP_CONF_DIR'] = '/opt/hadoop/etc/hadoop'
spark = SparkSession.builder.master("yarn").appName('test1').getOrCreate()
target_dir = "hdfs:/localhost:9000/hospital/data/test.csv"
file = spark.read.format('csv').options(header='True').options(inferSchema='True').load(target_dir)
I put "test.csv" on HDFS at hdfs://hospital/data/test.csv and then ran the Airflow webserver, but I get this error:
java.lang.IllegalArgumentException: Pathname /localhost:9000/hospital/data from hdfs:/localhost:9000/hospital/data is not a valid DFS filename.
I have also tried hdfs:///localhost:9000/hospital/data, hdfs::/hospital/data, and so on, but I always get the same error.
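The error text itself hints at the cause: with a single slash after `hdfs:` (or three slashes), the URI has no authority (host:port) part, so `localhost:9000` becomes the first directory of the path, and HDFS rejects path components that contain `:`. The sketch below illustrates this with Python's `urllib.parse.urlsplit`, used here as a stand-in for the `java.net.URI` parsing behind Hadoop's `Path`. `split_hdfs_uri` and `has_invalid_component` are hypothetical helpers, and the latter only approximates the real HDFS name check.

```python
from typing import Tuple
from urllib.parse import urlsplit


def split_hdfs_uri(uri: str) -> Tuple[str, str]:
    """Split a URI into (authority, path) with a generic RFC 3986-style parser."""
    parts = urlsplit(uri)
    return parts.netloc, parts.path


def has_invalid_component(path: str) -> bool:
    """Rough approximation of HDFS's name check: no component may contain ':'."""
    return any(":" in component for component in path.split("/"))


if __name__ == "__main__":
    for uri in (
        "hdfs:/localhost:9000/hospital/data/test.csv",   # one slash: no authority
        "hdfs:///localhost:9000/hospital/data",          # three slashes: empty authority
        "hdfs://localhost:9000/hospital/data/test.csv",  # two slashes: host:port authority
    ):
        authority, path = split_hdfs_uri(uri)
        print(f"{uri}: authority={authority!r} path={path!r} "
              f"rejected={has_invalid_component(path)}")
```

Both malformed forms yield an empty authority and a path starting with `/localhost:9000/`, which is exactly the "Pathname /localhost:9000/..." reported in the exception; only the two-slash form puts `localhost:9000` where the NameNode address belongs.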
How can I fix this?
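A minimal sketch of a corrected read, assuming the NameNode is reachable at localhost:9000 (the address used in the question) and that test.csv really lives under /hospital/data. The substantive change is writing two slashes after `hdfs:`, so that `localhost:9000` is parsed as the authority rather than as a directory. `NAMENODE`, `hdfs_uri`, and `read_csv_from_hdfs` are hypothetical names introduced for illustration; PySpark is imported only for type hints so the helper can be checked without a Spark installation.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # type hints only; PySpark is not imported when this module loads
    from pyspark.sql import DataFrame, SparkSession

# Assumption: the NameNode RPC address is localhost:9000, as in the question's path.
NAMENODE = "localhost:9000"


def hdfs_uri(path: str, namenode: str = NAMENODE) -> str:
    """Build a fully qualified URI: hdfs://<host>:<port>/<absolute path>."""
    return f"hdfs://{namenode}/{path.lstrip('/')}"


def read_csv_from_hdfs(spark: "SparkSession", path: str) -> "DataFrame":
    """Read a CSV with a header row from HDFS, letting Spark infer column types."""
    return (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(hdfs_uri(path))
    )
```

In test.py, the last two lines would become `file = read_csv_from_hdfs(spark, "/hospital/data/test.csv")`, which loads hdfs://localhost:9000/hospital/data/test.csv. Alternatively, if core-site.xml under HADOOP_CONF_DIR sets `fs.defaultFS` to hdfs://localhost:9000, passing the scheme-less path /hospital/data/test.csv directly to `load()` should resolve to the same file.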
Tags: apache-spark hadoop airflow