【Posted】: 2020-03-23 12:43:30
【Question】:
I am passing the following Spark parameters through the conf of the SparkSubmitOperator, but they do not seem to take effect when the job runs.
my_conf = {
'spark.io.compression.codec' : 'snappy',
'spark.scheduler.listenerbus.eventqueue.size' : '30000',
'spark.yarn.queue' : 'pixel',
'spark.driver.cores' : '5',
'spark.dynamicAllocation.minExecutors' : '100',
'spark.dynamicAllocation.maxExecutors' : '300',
'spark.shuffle.compress' : 'false',
'spark.sql.tungsten.enabled' : 'true',
'spark.shuffle.spill' : 'true',
'spark.sql.parquet.compression.codec' : 'snappy',
'spark.speculation' : 'true',
'spark.kryo.referenceTracking' : 'false',
'spark.hadoop.parquet.block.size' : '134217728',
'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version' : '2',
'spark.executor.memory' : '22g',
'spark.hadoop.dfs.blocksize' : '134217728',
'spark.shuffle.manager' : 'sort',
'spark.driver.memory' : '25g',
'spark.hadoop.mapreduce.input.fileinputformat.split.minsize' : '134217728',
'spark.akka.frameSize' : '1024',
'spark.yarn.executor.memoryOverhead' : '3120',
'spark.sql.parquet.filterPushdown' : 'true',
'spark.sql.inMemoryColumnarStorage.compressed' : 'true',
'spark.hadoop.parquet.enable.summary-metadata' : 'false',
'spark.serializer' : 'org.apache.spark.serializer.KryoSerializer',
'spark.rdd.compress' : 'true',
'spark.task.maxFailures' : '50',
'spark.yarn.max.executor.failures' : '30',
'spark.yarn.maxAppAttempts' : '1',
'spark.default.parallelism' : '2001',
'spark.network.timeout' : '1200s',
'spark.hadoop.dfs.client.read.shortcircuit' : 'true',
'spark.dynamicAllocation.enabled' : 'true',
'spark.executor.cores' : '5',
'spark.yarn.driver.memoryOverhead' : '5025',
'spark.shuffle.consolidateFiles' : 'true',
'spark.sql.parquet.mergeSchema' : 'false',
'spark.sql.avro.compression.codec' : 'snappy',
'spark.hadoop.dfs.domain.socket.path' : '/var/lib/hadoop-hdfs/dn_socket',
'spark.shuffle.spill.compress' : 'false',
'spark.sql.caseSensitive' : 'true',
'spark.hadoop.mapreduce.use.directfileoutputcommitter' : 'true',
'spark.shuffle.service.enabled' : 'true',
'spark.driver.maxResultSize' : '0',
'spark.sql.shuffle.partitions' : '2001'}
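For context, the operator does not apply these settings itself. It forwards each entry of `conf` to spark-submit as a separate `--conf key=value` argument. A minimal sketch of that flattening follows; `conf_to_args` is a hypothetical helper for illustration, not Airflow's API.

```python
def conf_to_args(conf):
    """Hypothetical helper: flatten a conf dict into spark-submit arguments,
    emitting one "--conf key=value" pair per entry (dict order is kept)."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


example = {"spark.yarn.queue": "pixel", "spark.executor.cores": "5"}
print(conf_to_args(example))
# ['--conf', 'spark.yarn.queue=pixel', '--conf', 'spark.executor.cores=5']
```

So `'spark.yarn.queue' : 'pixel'` reaches spark-submit only as an ordinary `--conf` property, which other command-line options can still override.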
Below is the SparkSubmitOperator task I use in Airflow to run the Spark job:
SparkSubmitOperator(
task_id='ml_agg',
application='/home/hdfs/airflow/dags/ML_Agg/ML_Aggregation-assembly-1.0.jar',
conf=my_conf,
conn_id='spark_default',
files=None,
py_files=None,
archives=None,
driver_class_path=None,
jars=None,
java_class='com.pubmatic.ml.MLAggregation_v2',
packages='com.databricks:spark-csv_2.11:1.3.0,com.databricks:spark-avro_2.11:2.0.1',
exclude_packages=None,
repositories=None,
keytab=None,
principal=None,
name='test_airflow_ml_aggregation',
application_args=application_args,
env_vars=None,
verbose=False,
spark_binary="spark-submit",
dag=my_dag
)
I have also configured the spark_default connection with the following extras:
{"queue":"default","deploy_mode": "cluster", "spark_home": "", "spark_binary": "spark-submit", "namespace": "default"}
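One possible explanation, offered as an assumption to verify rather than a confirmed diagnosis: the connection's `"queue":"default"` extra is also passed to spark-submit, as `--queue default`, after all the `--conf` pairs, and an explicit `--queue` option takes precedence over `spark.yarn.queue` set via `--conf`. The sketch below models only that interaction. `build_command` and `effective_queue` are hypothetical helpers, not Airflow or Spark APIs, and other extras such as `deploy_mode` are ignored.

```python
import json


def build_command(conf, connection_extra, application):
    """Hypothetical, simplified spark-submit command builder.
    ASSUMPTION: mirrors the hook appending the connection's "queue" extra
    as "--queue <name>" after the "--conf" pairs."""
    extra = json.loads(connection_extra)
    cmd = ["spark-submit"]
    for key, value in conf.items():
        cmd += ["--conf", f"{key}={value}"]
    if extra.get("queue"):
        cmd += ["--queue", extra["queue"]]
    cmd.append(application)
    return cmd


def effective_queue(cmd):
    """ASSUMPTION: an explicit --queue overrides spark.yarn.queue from --conf."""
    conf_queue = cli_queue = None
    for i in range(len(cmd) - 1):
        flag, value = cmd[i], cmd[i + 1]
        if flag == "--conf" and value.startswith("spark.yarn.queue="):
            conf_queue = value.split("=", 1)[1]
        elif flag == "--queue":
            cli_queue = value
    return cli_queue or conf_queue


my_conf = {"spark.yarn.queue": "pixel"}
print(effective_queue(build_command(my_conf, '{"queue": "default"}', "app.jar")))  # default
print(effective_queue(build_command(my_conf, '{"queue": "pixel"}', "app.jar")))    # pixel
```

Under that assumption, setting the connection extra to `"queue":"pixel"` (or pointing this task at a separate connection) would send the job to the `pixel` queue.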
However, the job still runs in the default queue on YARN.
Is there anything else I need to do?
【Discussion】:
Tags: apache-spark hadoop-yarn airflow spark-submit