【问题标题】:How to broadcast a DataFrame?如何广播 DataFrame?
【发布时间】:2019-12-17 11:15:32
【问题描述】:

我使用的是 spark-sql-2.4.1 版本。 创建如下广播变量

Broadcast<Map<String,Dataset>> bcVariable = javaSparkContext.broadcast(//read dataset);

我将 bcVariable 传递给函数

Service.calculateFunction(sparkSession, bcVariable.getValue());


public   static class Service {
        public static calculateFunction(
          SparkSession sparkSession,
          Map<String, Dataset> dataSet ) {

        System.out.println("---> size : " + dataSet.size());  //printing size 1


        for( Entry<String, Dataset> aEntry : dataSet.entrySet() ) {
           System.out.println( aEntry.getKey());   //  printing key 
            aEntry.getValue().show()   // throw null pointer exception
           }
    }

这里有什么问题?如何在函数中传递数据集/数据框?

尝试 2:

Broadcast<Dataset> bcVariable = javaSparkContext.broadcast(//read dataset);

我将 bcVariable 传递给函数

 Service.calculateFunction(sparkSession, bcVariable.getValue());

公共静态类服务{ 公共静态计算函数( 火花会话火花会话, 数据集数据集){

    System.out.println("---> size : " + dataSet.size());  // throwing null pointer exception.



}

这里有什么问题?如何在函数中传递数据集/数据框?

尝试 3:

Dataset metaData = //read dataset from oracle table i.e. meta-data.

我将元数据传递给函数

Service.calculateFunction(sparkSession, metaData);

public   static class Service {
        public static calculateFunction(
          SparkSession sparkSession,
          Dataset metaData ) {

        System.out.println("---> size : " + metaData.size());  // throwing null pointer exception.



    }

这里有什么问题?如何在函数中传递数据集/数据框?

【问题讨论】:

    标签: apache-spark apache-spark-sql


    【解决方案1】:

    要广播的值必须是任何 Scala 对象,但不能是 DataFrame

    Service.calculateFunction(sparkSession, metaData) 在执行器上执行,因此元数据是null(因为它没有被序列化并通过线路从驱动程序发送到执行器)。

    广播[T](值:T):广播[T]

    向集群广播一个只读变量,返回一个org.apache.spark.broadcast.Broadcast对象,以便在分布式函数中读取它。变量只会发送到每个集群一次。

    考虑DataFrame 数据抽象来表示以类似 SQL 的语言(数据集 API 或 SQL)描述的分布式计算。除了在驱动程序上可以提交计算以供执行(作为执行程序上的任务)之外,将它放在任何地方都没有任何意义。

    您只需使用DataFrame.collect“转换”此计算所代表的数据(以DataFrame 术语表示)。

    收集数据后,您可以使用.value 方法进行广播和引用。


    代码如下:

    val dataset = // reading data
    Broadcast<Map<String,Dataset>> bcVariable = 
      javaSparkContext.broadcast(dataset.collect);
    Service.calculateFunction(sparkSession, bcVariable.getValue());
    

    与您的代码相比,唯一的变化是collect

    【讨论】:

    • 很好的解释@Jacek Laskowski!但是我有一个问题,在从 HBase/Hive 读取表格后,如果您希望该信息存在于每个执行程序中(流式传输,不能四处询问)..唯一的方法是进行收集(和广播)?这对性能不是太不利了吗?波兹拉维亚姆 ;)
    • 我不这么认为。这正是广播哈希联接在 Spark SQL 中的工作原理,所以如果他们,Spark 开发人员这样做,我就懒得想一个更好的选择了 :)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-12-02
    • 2019-11-25
    • 1970-01-01
    • 1970-01-01
    • 2013-07-14
    • 1970-01-01
    相关资源
    最近更新 更多