[Question title]: Iterate over different columns using withColumn in Java Spark
[Posted]: 2020-05-14 03:55:43
[Question]:

I have to modify a Dataset<Row> according to a set of rules stored in a List<Row>. I want to iterate over the columns of the Dataset<Row> using Dataset.withColumn(...), as in the following example:

(import the necessary libraries...)

SparkSession spark = SparkSession
                .builder()
                .appName("appname")
                .config("spark.some.config.option", "some-value")
                .getOrCreate();

Dataset<Row> dfToModify = spark.read().table("TableToModify");

List<Row> ListWithInfo = new ArrayList<>();

ListWithInfo.add(0,RowFactory.create("field1", "input1", "output1", "conditionAux1"));
ListWithInfo.add(1,RowFactory.create("field1", "input1", "output1", "conditionAux2"));
ListWithInfo.add(2,RowFactory.create("field1", "input2", "output3", "conditionAux3"));
ListWithInfo.add(3,RowFactory.create("field2", "input3", "output4", "conditionAux4"));
.
.
.

for (Row row : ListWithInfo) {

            String field = row.getString(0);
            String input = row.getString(1);
            String output = row.getString(2);
            String conditionAux = row.getString(3);

            dfToModify = dfToModify.withColumn(field,
                                    when(dfToModify.col(field).equalTo(input)
                                    .and(dfToModify.col("conditionAuxField").equalTo(conditionAux))
                                    ,output)
                                    .otherwise(dfToModify.col(field)));

        }

The code does work, but when the list contains more than 50 "rules" the program never finishes, and the following output is shown on screen:

20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1653
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1650
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1635
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1641
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1645
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1646
20/01/27 17:48:18 INFO storage.BlockManagerInfo: Removed broadcast_113_piece0 on **************** in memory (size: 14.5 KB, free: 3.0 GB)
20/01/27 17:48:18 INFO storage.BlockManagerInfo: Removed broadcast_113_piece0 on ***************** in memory (size: 14.5 KB, free: 3.0 GB)
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1639
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1649
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1651
20/01/27 17:49:18 INFO spark.ExecutorAllocationManager: Request to remove executorIds: 6
20/01/27 17:49:18 INFO cluster.YarnClientSchedulerBackend: Requesting to kill executor(s) 6
20/01/27 17:49:18 INFO cluster.YarnClientSchedulerBackend: Actual list of executor(s) to be killed is 6
20/01/27 17:49:18 INFO spark.ExecutorAllocationManager: Removing executor 6 because it has been idle for 60 seconds (new desired total will be 0)
20/01/27 17:49:19 INFO yarn.SparkRackResolver: Got an error when resolving hostNames. Falling back to /default-rack for all
20/01/27 17:49:19 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 6.
20/01/27 17:49:19 INFO scheduler.DAGScheduler: Executor lost: 6 (epoch 0)
20/01/27 17:49:19 INFO yarn.SparkRackResolver: Got an error when resolving hostNames. Falling back to /default-rack for all
20/01/27 17:49:19 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 6 from BlockManagerMaster.
20/01/27 17:49:19 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(6, *********************, 43387, None)
20/01/27 17:49:19 INFO storage.BlockManagerMaster: Removed 6 successfully in removeExecutor
20/01/27 17:49:19 INFO cluster.YarnScheduler: Executor 6 on **************** killed by driver.
20/01/27 17:49:19 INFO spark.ExecutorAllocationManager: Existing executor 6 has been removed (new total is 0)
20/01/27 17:49:20 INFO yarn.SparkRackResolver: Got an error when resolving hostNames. Falling back to /default-rack for all
20/01/27 17:49:21 INFO yarn.SparkRackResolver: Got an error when resolving hostNames. Falling back to /default-rack for all
20/01/27 17:49:22 INFO yarn.SparkRackResolver: Got an error when resolving hostNames. Falling back to /default-rack for all
.
.
.
.

Is there a more efficient way to do this in Java Spark (without a for loop or anything similar)?

[Question comments]:

    Tags: java loops apache-spark optimization calculated-columns


    [Answer 1]:

    In the end I used the withColumns method of the Dataset<Row> object. This method takes two arguments:

    .withColumns(Seq<String> ColumnsNames, Seq<Column> ColumnsValues);

    The Seq<String> must not contain duplicate names.
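Because the column names must be unique, all rules that target the same field have to be merged into a single expression before the call. A minimal sketch of that grouping step in plain Java (no Spark dependency) is shown here. The `RuleGrouping` class and `groupByField` method are hypothetical names used only for illustration, and each rule is assumed to be a four-element array laid out like the rows in the question. Unlike a comparison against the previous field, a map also works when rules for the same field are not adjacent in the list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Spark: groups rules by the column they modify.
class RuleGrouping {

    // Each rule is {field, input, output, conditionAux}, mirroring the Row layout in the question.
    static Map<String, List<String[]>> groupByField(List<String[]> rules) {
        // LinkedHashMap keeps the order in which fields first appear.
        Map<String, List<String[]>> grouped = new LinkedHashMap<>();
        for (String[] rule : rules) {
            grouped.computeIfAbsent(rule[0], k -> new ArrayList<>()).add(rule);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String[]> rules = Arrays.asList(
            new String[] {"field1", "input1", "output1", "conditionAux1"},
            new String[] {"field2", "input3", "output4", "conditionAux4"},
            new String[] {"field1", "input2", "output3", "conditionAux3"});

        // One entry per distinct field, so the key set can serve as the list of column names.
        groupByField(rules).forEach((field, fieldRules) ->
            System.out.println(field + " -> " + fieldRules.size() + " rule(s)"));
    }
}
```

Each map entry would then correspond to one column name and one chained when(...) expression.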

    The code I ended up with is as follows:

    
    SparkSession spark = SparkSession
                    .builder()
                    .appName("appname")
                    .config("spark.some.config.option", "some-value")
                    .getOrCreate();
    
    Dataset<Row> dfToModify = spark.read().table("TableToModify");
    
    List<Row> ListWithInfo = new ArrayList<>();
    
    ListWithInfo.add(0,RowFactory.create("field1", "input1", "output1", "conditionAux1"));
    ListWithInfo.add(1,RowFactory.create("field1", "input1", "output1", "conditionAux2"));
    ListWithInfo.add(2,RowFactory.create("field1", "input2", "output3", "conditionAux3"));
    ListWithInfo.add(3,RowFactory.create("field2", "input3", "output4", "conditionAux4"));
    .
    .
    .
    // initialize values for fields and conditions
    String field_ant = ListWithInfo.get(0).getString(0).toLowerCase();
    String first_input = ListWithInfo.get(0).getString(1);
    String first_output = ListWithInfo.get(0).getString(2);
    String first_conditionAux = ListWithInfo.get(0).getString(3);
    Column whenColumn = when(dfToModify.col(field_ant).equalTo(first_input)
                    .and(dfToModify.col("conditionAuxField").equalTo(lit(first_conditionAux)))
                    ,first_output);
    
    // lists with the field names and their combined conditions
    List<Column> whenColumnList = new ArrayList<>();
    List<String> fieldsNameList = new ArrayList<>();
    
    for (Row row : ListWithInfo.subList(1,ListWithInfo.size())) {
    
                String field = row.getString(0);
                String input = row.getString(1);
                String output = row.getString(2);
                String conditionAux = row.getString(3);
    
                if (field.equals(field_ant)) {
                    // same field as the previous rule: chain the new condition onto the existing one
                    whenColumn = whenColumn.when(dfToModify.col(field).equalTo(input)
                            .and(dfToModify.col("conditionAuxField").equalTo(lit(conditionAux)))
                            ,output);
                } else {
                    // the field differs from the previous one:
                    // close the conditions for the previous field
                    whenColumn = whenColumn.otherwise(dfToModify.col(field_ant));
    
                    // add the field name (String) and its conditions (Column) to the lists
                    whenColumnList.add(whenColumn);
                    fieldsNameList.add(field_ant);
    
                    // and start the conditions for the new field
                    whenColumn = when(dfToModify.col(field).equalTo(input)
                                    .and(dfToModify.col("conditionAuxField").equalTo(lit(conditionAux)))
                            ,output);
                }
    
                field_ant = field;
    
            }
    
    // add last values
    whenColumnList.add(whenColumn);
    fieldsNameList.add(field_ant);
    
    // transform list to Seq
    Seq<Column> whenColumnSeq = JavaConversions.asScalaBuffer(whenColumnList).seq();
    Seq<String> fieldsNameSeq = JavaConversions.asScalaBuffer(fieldsNameList).seq();
    
    Dataset<Row>  dfModified = dfToModify.withColumns(fieldsNameSeq, whenColumnSeq);
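
The chained when(...) calls above amount to one SQL CASE expression per field. As an illustration of that idea, here is a minimal sketch in plain Java that renders the rules for a single field as a CASE expression string. `CaseExpressionBuilder`, `quote` and `buildCase` are hypothetical names, the rules are assumed to be {input, output, conditionAux} triples for one field, and the backslash escaping is an assumption about the default Spark SQL string-literal syntax. A string like this could presumably be passed to Spark's functions.expr(...) and all columns applied in a single select, but that step has not been tested here.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Spark: renders one field's rules as a SQL CASE expression.
class CaseExpressionBuilder {

    // Escapes backslashes and single quotes for use inside a single-quoted SQL string literal.
    static String quote(String value) {
        return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'";
    }

    // rules: {input, output, conditionAux} triples that all apply to the same field.
    static String buildCase(String field, String auxField, List<String[]> rules) {
        if (rules.isEmpty()) {
            // No rules for this field: the column is left unchanged.
            return field;
        }
        StringBuilder sql = new StringBuilder("CASE");
        for (String[] rule : rules) {
            sql.append(" WHEN ").append(field).append(" = ").append(quote(rule[0]))
               .append(" AND ").append(auxField).append(" = ").append(quote(rule[2]))
               .append(" THEN ").append(quote(rule[1]));
        }
        // Rows that match no rule keep their current value.
        return sql.append(" ELSE ").append(field).append(" END").toString();
    }

    public static void main(String[] args) {
        List<String[]> field1Rules = Arrays.asList(
            new String[] {"input1", "output1", "conditionAux1"},
            new String[] {"input2", "output3", "conditionAux3"});
        System.out.println(buildCase("field1", "conditionAuxField", field1Rules));
    }
}
```

As in the chained version, the first matching rule wins and the ELSE branch preserves the original value, so the expression count grows with the number of distinct fields rather than with the number of rules.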
    
    

    [Comments]:
