【问题标题】:BroadCast Variable publish in Spark Program在 Spark 程序中发布广播变量
【发布时间】:2015-11-13 17:49:33
【问题描述】:

在 spark-java 程序中,我需要读取一个配置文件并填充一个 HashMap,我需要将其发布为广播变量,以便它可以在所有数据节点中使用。

我需要在将在 datanodes 中运行的 CustomInputFormat 类中获取此广播变量的值。由于广播变量是在我的驱动程序中声明的,如何在我的 CustomInputFormat 类中指定从特定广播变量中获取值?

我正在添加一些代码来解释它:

在这种情况下,我在驱动程序本身中使用它,即变量在同一个类中使用:这里我可以使用 Broadcat.value() 方法

> final Broadcast<String[]> signPrefixes =
> sc.broadcast(loadCallSignTable());
>     JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(
>       new PairFunction<Tuple2<String, Integer>, String, Integer> (){
>         public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {
>           String sign = callSignCount._1();
>           String country = lookupCountry(sign, signPrefixes.value());
>           return new Tuple2(country, callSignCount._2());
>         }}).reduceByKey(new SumInts());

在场景 2 中,我将在自定义输入格式类中使用广播变量:

驱动程序:

> final JavaSparkContext sc=    new
> JavaSparkContext(sConf.setAppName("ParserSpark").setMaster("yarn-cluster"));
> Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
> 
> JavaPairRDD<NullWritable, ArrayList<Record>> baseRDD =
> sc.newAPIHadoopFile(args[2], InputFormat.class, NullWritable.class,
> ArrayList.class, conf);

InputFormat.class

> public class InputFormat extends  FileInputFormat {
> 
>   @Override   public RecordReader<NullWritable, ArrayList<Record>> 
>   createRecordReader(InputSplit split,            TaskAttemptContext context)
> throws IOException,           InterruptedException{
>       //I want to get the Broadcast Variable Here -- How will I do it 
>       
>         RecordReader reader = new RecordReader();         reader.initialize(split, context);      return reader;  }   @Override
>   protected boolean isSplitable(JobContext context, Path file) {
>       return false;    } }

【问题讨论】:

  • 我在驱动程序以外的另一个java类中需要这个广播值。
  • 在此期间您是否设法解决了这个问题?

标签: java apache-spark rdd


【解决方案1】:

我最近自己遇到了这个问题。最终变得相当简单(几个小时后,然后……哈哈!)

创建一个新配置,设置您的变量,并将其传递给 newAPIHadoopFile 函数的稍微不同的实现。

来自驱动程序(此处使用 Scala):

val myConf = new Configuration();
    myConf.set("var1", v1)
    myConf.set("var2", v2)
    myConf.set("var3", v3)

val yourFile = sc.newAPIHadoopFile("yourFilePath", classOf[MyFileInputFormat],classOf[org.apache.hadoop.io.Text], classOf[org.apache.hadoop.io.DoubleWritable],myConf)

从您的 InputFormat 或 InputReader..或任何您有上下文的地方(这次是 Java)

context.getConfiguration().get("var1");

或许

job.getConfiguration().get("var2");

【讨论】:

    【解决方案2】:

    您可以使用val bcVariable = sc.broadcast(myVariableToBroadcast) 在驱动程序上创建广播变量,稍后使用bcVariable.value 访问它

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-08-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-11-13
      • 1970-01-01
      相关资源
      最近更新 更多