【问题标题】:Spark print dataframe Without Running Out Of Memory火花打印数据帧而不会耗尽内存
【发布时间】:2019-08-03 04:29:04
【问题描述】:

如何在 Java 中打印整个数据帧而不会耗尽内存?

Dataset<Row> df = ...

我知道:

df.show() 

将显示数据框,但如果数据框足够大,则可能会耗尽内存。

我知道我可以使用以下方法限制内容:

df.show(rowCount, false)

但是想打印整个数据框,我不想限制内容...

我试过了:

df.foreachPartition(iter -> {
    while(iter.hasNext()){
       System.out.println(rowIter.next().mkString(",");)
     }
});

但这将打印在每个相应的节点上,而不是在驱动程序上...

如果有什么方法可以打印驱动程序中的所有内容而不会耗尽内存?

【问题讨论】:

    标签: java apache-spark memory dataset partitioning


    【解决方案1】:

    AFAIK,打印数据框的想法是查看数据。

    不建议打印大数据帧,因为数据帧大小可能会超出内存。

    我提供以下方式,如果您想查看内容,则可以保存在 hive 表中并查询内容。或写入可读的csv或json

    例子:

    1) 保存在 hive 表中

    df.write.mode("overwrite").saveAsTable("database.tableName")
    

    稍后从 hive 表中查询。

    2) csv 或 json

    df.write.csv("/your/location/data.csv")
     df.write.json("/your/location/data.json")
    

    如果你想要单个文件使用coalesce(1),上面将生成多个部分文件(但这会再次将数据移动到一个不鼓励的节点,除非你绝对需要它)

    其他选项是使用toLocalIterator see here 逐行打印,这也会将数据传输到节点...因此不是好主意

    【讨论】:

      【解决方案2】:

      您必须将所有数据带到驱动程序,这会有点吸你的记忆:(...

      解决方案可能是拆分您的数据帧并在驱动程序中逐个打印。当然,这取决于数据本身的结构,它看起来像:

      long count = df.count();
      long inc = count / 10;
      for (long i = 0; i < count; i += inc) {
        Dataset<Row> filteredDf =
            df.where("id>=" + i + " AND id<" + (i + inc));
      
        List<Row> rows = filteredDf.collectAsList();
        for (Row r : rows) {
          System.out.printf("%d: %s\n", r.getAs(0), r.getString(1));
        }
      }
      

      我将数据集分成 10 个,但我知道我的 id 是从 1 到 100...

      完整的例子可以是:

      package net.jgp.books.sparkWithJava.ch20.lab900_splitting_dataframe;
      
      import java.util.ArrayList;
      import java.util.List;
      
      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.RowFactory;
      import org.apache.spark.sql.SparkSession;
      import org.apache.spark.sql.types.DataTypes;
      import org.apache.spark.sql.types.StructField;
      import org.apache.spark.sql.types.StructType;
      
      /**
       * Splitting a dataframe to bring it back to the driver for local
       * processing.
       * 
       * @author jgp
       */
      public class SplittingDataframeApp {
      
        /**
         * main() is your entry point to the application.
         * 
         * @param args
         */
        public static void main(String[] args) {
          SplittingDataframeApp app = new SplittingDataframeApp();
          app.start();
        }
      
        /**
         * The processing code.
         */
        private void start() {
          // Creates a session on a local master
          SparkSession spark = SparkSession.builder()
              .appName("Splitting a dataframe to collect it")
              .master("local")
              .getOrCreate();
      
          Dataset<Row> df = createRandomDataframe(spark);
          df = df.cache();
      
          df.show();
          long count = df.count();
          long inc = count / 10;
          for (long i = 0; i < count; i += inc) {
            Dataset<Row> filteredDf =
                df.where("id>=" + i + " AND id<" + (i + inc));
      
            List<Row> rows = filteredDf.collectAsList();
            for (Row r : rows) {
              System.out.printf("%d: %s\n", r.getAs(0), r.getString(1));
            }
          }
        }
      
        private static Dataset<Row> createRandomDataframe(SparkSession spark) {
          StructType schema = DataTypes.createStructType(new StructField[] {
              DataTypes.createStructField(
                  "id",
                  DataTypes.IntegerType,
                  false),
              DataTypes.createStructField(
                  "value",
                  DataTypes.StringType,
                  false) });
      
          List<Row> rows = new ArrayList<Row>();
          for (int i = 0; i < 100; i++) {
            rows.add(RowFactory.create(i, "Row #" + i));
          }
          Dataset<Row> df = spark.createDataFrame(rows, schema);
          return df;
        }
      }
      

      你认为这有帮助吗?

      它不像将它保存在数据库中那样优雅,但它可以避免在您的架构中添加额外的组件。这段代码不是很通用,我不确定你是否可以在当前版本的 Spark 中使其通用。

      【讨论】:

        猜你喜欢
        • 2017-05-01
        • 2021-02-10
        • 1970-01-01
        • 2011-04-05
        • 1970-01-01
        • 2021-04-08
        • 1970-01-01
        • 2010-09-15
        • 1970-01-01
        相关资源
        最近更新 更多