【发布时间】:2021-01-04 09:50:47
【问题描述】:
我给 spark2 提交的命令为:
value = Varibale.get('value')
cmd = """
spark2-submit --master yarn --deploy-mode cluster
--driver-memory=10G
--conf spark.dynamicAllocation.minExecutors=5
--conf spark.dynamicAllocation.maxExecutors=10
--queue test
--executor-memory=10G
--executor-cores=2
--conf spark.yarn.driver.memoryOverhead=5120
--conf spark.driver.maxResultSize=2G
--conf spark.yarn.executor.memoryOverhead=5120
--conf spark.kryoserializer.buffer.max=1000m
--conf spark.executor.extraJavaOptions=-XX:+UseG1GC
--conf spark.network.timeout=15000s
--conf spark.executor.heartbeatInterval=1500s
--conf spark.task.maxDirectResultSize=8G
--principal test-host@test
--keytab /home/test-host.keytab
--conf spark.ui.view.acls="*"
/home/test/test.py {0}
""".format(value)
test = SSHOperator(task_id='TEST',
ssh_conn_id='test-conn',
command=cmd
)
我希望将其转换为 SparkSubmitOperator。另外,我需要 spark2 提交。
如何将上述转换为 SparkSubmitOperator? 到目前为止,我已经尝试过:
SparkSubmitOperator(task_id='TEST',
conn_id='test-conn',
application=f'/home/test/test.py {0}'.format(value),
executor_cores=2,
executor_memory='10g',
)
【问题讨论】:
标签: pyspark airflow-scheduler airflow