【Question Title】: How to pivot streaming dataset?
【Posted】: 2018-05-15 14:34:13
【Question Description】:

I'm trying to pivot a Spark streaming Dataset (Structured Streaming), but I get an AnalysisException (excerpt below).

Can someone confirm that pivoting is indeed not supported in Structured Streaming (Spark 2.0), and perhaps suggest alternative approaches?

    Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
    kafka
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)

【Question Discussion】:

    Tags: apache-spark spark-structured-streaming apache-spark-2.0


    【Solution 1】:

    In most cases, you can use conditional aggregation as a workaround. The equivalent of

    df.groupBy("timestamp").
       pivot("name", Seq("banana", "peach")).
       sum("value")

    is

    import org.apache.spark.sql.functions.{sum, when}
    import spark.implicits._ // enables the $"column" syntax

    df.filter($"name".isin(Seq("banana", "peach"):_*)).
       groupBy("timestamp").
       agg(
         // when(...) without otherwise(...) is null for other names, and sum skips nulls
         sum(when($"name" === "banana", $"value")).alias("banana"),
         sum(when($"name" === "peach", $"value")).alias("peach")
       )
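
    Every output column is named up front, so this rewrite needs no pass over the data to discover pivot values. It also plans as a single aggregation, which Structured Streaming accepts under the usual output-mode rules for aggregations (complete or update mode, or append mode only with a watermark). The Java sketch below is a hypothetical, Spark-free model of what the query computes; the class and method names are illustrative, not Spark API. It also shows one difference from pivot: the filter drops timestamps that have no banana or peach rows at all (t3 below), whereas groupBy("timestamp").pivot(...) would keep them as all-null rows.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Spark-free model of the conditional aggregation above:
// filter(name in values).groupBy("timestamp").agg(sum(when(name === v, value)) for each v)
class ConditionalAggregationModel {

    // One input row with the columns used in the answer: timestamp, name, value.
    static final class Event {
        final String timestamp;
        final String name;
        final int value;

        Event(String timestamp, String name, int value) {
            this.timestamp = timestamp;
            this.name = name;
            this.value = value;
        }
    }

    // timestamp -> (pivot value -> sum). A cell stays null when no row matched,
    // mirroring SQL, where SUM over only NULLs is NULL.
    static Map<String, Map<String, Long>> aggregate(List<Event> events, List<String> pivotValues) {
        Map<String, Map<String, Long>> result = new TreeMap<>();
        for (Event e : events) {
            // filter($"name".isin(...)): other names never reach the aggregation
            if (!pivotValues.contains(e.name)) continue;
            Map<String, Long> cells = result.computeIfAbsent(e.timestamp, k -> {
                Map<String, Long> m = new LinkedHashMap<>(); // one column per fixed value
                for (String v : pivotValues) m.put(v, null);
                return m;
            });
            // when($"name" === v, $"value") is non-null only in the column named v;
            // merge() treats the null placeholder as absent and stores the first value as-is
            cells.merge(e.name, (long) e.value, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("t1", "banana", 3),
                new Event("t1", "banana", 2),
                new Event("t1", "peach", 7),
                new Event("t2", "peach", 1),
                new Event("t3", "apple", 9)); // only a non-pivot name: filtered out
        System.out.println(aggregate(events, Arrays.asList("banana", "peach")));
        // prints {t1={banana=5, peach=7}, t2={banana=null, peach=1}}
    }
}
```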
    


      【Solution 2】:

      Here is a simple Java example based on Jacek's answer:

      JSON array:

      [{
              "customer_id": "d6315a00",
              "product": "Super widget",
              "price": 10,
              "bought_date": "2019-01-01"
          },
          {
              "customer_id": "d6315a00",
              "product": "Super widget",
              "price": 10,
              "bought_date": "2019-01-01"
          },
          {
              "customer_id": "d6315a00",
              "product": "Super widget",
              "price": 10,
              "bought_date": "2019-01-02"
          },
          {
              "customer_id": "d6315a00",
              "product": "Food widget",
              "price": 4,
              "bought_date": "2019-08-20"
          },
          {
              "customer_id": "d6315cd0",
              "product": "Food widget",
              "price": 4,
              "bought_date": "2019-09-19"
          }, {
              "customer_id": "d6315e2e",
              "product": "Bike widget",
              "price": 10,
              "bought_date": "2019-01-01"
          }, {
              "customer_id": "d6315a00",
              "product": "Bike widget",
              "price": 10,
              "bought_date": "2019-03-10"
          },
          {
              "customer_id": "d631614e",
              "product": "Garage widget",
              "price": 4,
              "bought_date": "2019-02-15"
          }
      ]
      

      Java code:

      package io.centilliard;
      
      import static org.apache.spark.sql.functions.col;
      import static org.apache.spark.sql.functions.explode;
      import static org.apache.spark.sql.functions.from_json;
      
      import org.apache.spark.api.java.function.VoidFunction2;
      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.SparkSession;
      import org.apache.spark.sql.streaming.StreamingQuery;
      import org.apache.spark.sql.types.ArrayType;
      import org.apache.spark.sql.types.DataTypes;
      import org.apache.spark.sql.types.Metadata;
      import org.apache.spark.sql.types.StructField;
      import org.apache.spark.sql.types.StructType;
      
      public class Pivot {
      
          public static void main(String[] args) throws Exception {
      
              // Schema of one purchase record; each Kafka message is a JSON array of these
              StructType schema = new StructType(new StructField[]{
                      new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),
                      new StructField("product", DataTypes.StringType, false, Metadata.empty()),
                      new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),
                      new StructField("bought_date", DataTypes.StringType, false, Metadata.empty())
                  });
      
              ArrayType arrayType = new ArrayType(schema, false);
      
              SparkSession spark = SparkSession
                      .builder()
                      .appName("SimpleExample")
                      .getOrCreate();
      
              // Create a Dataset representing the stream of input messages from Kafka
              Dataset<Row> dataset = spark
                      .readStream()
                      .format("kafka")
                      .option("kafka.bootstrap.servers", "localhost:9092")
                      .option("subscribe", "utilization")
                      .load()
                      .selectExpr("CAST(value AS STRING) as json");
      
              // Parse the JSON array and explode it into one row per purchase
              Dataset<Row> customers = dataset
                      .select(explode(from_json(col("json"), arrayType)))
                      .select("col.*");
      
              // pivot is rejected on the streaming Dataset itself, but inside
              // foreachBatch each micro-batch is a regular (batch) Dataset
              StreamingQuery dataStream = customers.writeStream()
                      .foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
                          @Override
                          public void call(Dataset<Row> batch, Long batchId) {
                              batch
                                  .groupBy("customer_id", "product", "bought_date")
                                  .pivot("product")
                                  .sum("price")
                                  .orderBy("customer_id")
                                  .show();
                          }
                      })
                      .start();
      
              dataStream.awaitTermination();
          }
      
      }
      

      Output:

      +-----------+-------------+-----------+-----------+-----------+-------------+------------+
      |customer_id|      product|bought_date|Bike widget|Food widget|Garage widget|Super widget|
      +-----------+-------------+-----------+-----------+-----------+-------------+------------+
      |   d6315a00|  Bike widget| 2019-03-10|         20|       null|         null|        null|
      |   d6315a00| Super widget| 2019-01-02|       null|       null|         null|          20|
      |   d6315a00| Super widget| 2019-01-01|       null|       null|         null|          40|
      |   d6315a00|  Food widget| 2019-08-20|       null|          8|         null|        null|
      |   d6315cd0|  Food widget| 2019-09-19|       null|          8|         null|        null|
      |   d6315e2e|  Bike widget| 2019-01-01|         20|       null|         null|        null|
      |   d631614e|Garage widget| 2019-02-15|       null|       null|            8|        null|
      +-----------+-------------+-----------+-----------+-----------+-------------+------------+
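
      The Java sketch below is a Spark-free model of one foreachBatch call, useful as a cross-check of the output. The class and method names are hypothetical, not Spark API. Like pivot("product") with no explicit values, it first collects the sorted distinct product names from the batch, which works only because a micro-batch is bounded. It then sums price per (customer_id, product, bought_date). Because product is both a grouping column and the pivot column, each row has exactly one non-null cell. On a single copy of the sample array, the model yields 20 for Super widget on 2019-01-01 and 10 for Bike widget on 2019-03-10. Every sum in the output above is exactly double, which would fit the array having been published to the topic twice before that batch ran. That is an assumption; the answer does not say.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical, Spark-free model of one foreachBatch call in the answer:
// batch.groupBy("customer_id", "product", "bought_date").pivot("product").sum("price")
class BatchPivotModel {

    static final class Purchase {
        final String customerId;
        final String product;
        final int price;
        final String boughtDate;

        Purchase(String customerId, String product, int price, String boughtDate) {
            this.customerId = customerId;
            this.product = product;
            this.price = price;
            this.boughtDate = boughtDate;
        }
    }

    // The eight records of the sample JSON array, in order.
    static List<Purchase> sample() {
        return Arrays.asList(
                new Purchase("d6315a00", "Super widget", 10, "2019-01-01"),
                new Purchase("d6315a00", "Super widget", 10, "2019-01-01"),
                new Purchase("d6315a00", "Super widget", 10, "2019-01-02"),
                new Purchase("d6315a00", "Food widget", 4, "2019-08-20"),
                new Purchase("d6315cd0", "Food widget", 4, "2019-09-19"),
                new Purchase("d6315e2e", "Bike widget", 10, "2019-01-01"),
                new Purchase("d6315a00", "Bike widget", 10, "2019-03-10"),
                new Purchase("d631614e", "Garage widget", 4, "2019-02-15"));
    }

    // Pass 1: sorted distinct pivot values. This needs the whole batch up front,
    // which a bounded micro-batch allows and an unbounded stream does not.
    static List<String> pivotValues(List<Purchase> batch) {
        TreeSet<String> values = new TreeSet<>();
        for (Purchase p : batch) values.add(p.product);
        return new ArrayList<>(values);
    }

    // Pass 2: one output row per (customer_id, product, bought_date), one column
    // per pivot value; a cell stays null when no row contributed to it.
    static Map<List<String>, Map<String, Long>> pivot(List<Purchase> batch) {
        List<String> columns = pivotValues(batch);
        Map<List<String>, Map<String, Long>> rows = new LinkedHashMap<>();
        for (Purchase p : batch) {
            List<String> key = Arrays.asList(p.customerId, p.product, p.boughtDate);
            Map<String, Long> cells = rows.computeIfAbsent(key, k -> {
                Map<String, Long> m = new LinkedHashMap<>();
                for (String c : columns) m.put(c, null);
                return m;
            });
            // merge() treats the null placeholder as absent, so the first price is stored as-is
            cells.merge(p.product, (long) p.price, Long::sum);
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Purchase> batch = sample();
        System.out.println(pivotValues(batch));
        pivot(batch).forEach((key, cells) -> System.out.println(key + " " + cells));
    }
}
```

      Each foreachBatch call only sees the records that arrived in that micro-batch, so these sums are per batch, not running totals.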
      


        【Solution 3】:

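        tl;dr Spark Structured Streaming, up to and including 2.4.4, does not support pivot aggregation directly.

        As a workaround, use DataStreamWriter.foreachBatch or the more generic DataStreamWriter.foreach.

        I'm currently using Spark 2.4.4, the latest version at the time of writing.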

        scala> spark.version
        res0: String = 2.4.4
        

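        UnsupportedOperationChecker (which you can find in the stack trace) checks whether (the logical plan of) a streaming query uses only supported operations.

        To use pivot, you have to call groupBy first, because that is the only interface that gives you pivot.

        There are two issues with pivot:

        1. pivot wants to know how many columns to generate values for, and hence does a collect, which is not possible with streaming Datasets.

        2. pivot is actually another aggregation (on top of groupBy), and Spark Structured Streaming does not support multiple streaming aggregations.

        Let's look at issue 1, where no pivot values are given.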

        val sq = spark
          .readStream
          .format("rate")
          .load
          .groupBy("value")
          .pivot("timestamp") // <-- pivot with no values
          .count
          .writeStream
          .format("console")
        scala> sq.start
        org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
        rate
          at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:389)
          at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1(UnsupportedOperationChecker.scala:38)
          at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1$adapted(UnsupportedOperationChecker.scala:36)
          at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
          at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
          at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
          at scala.collection.immutable.List.foreach(List.scala:392)
          at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
          at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
          at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
          at scala.collection.immutable.List.foreach(List.scala:392)
          at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
          at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
          at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
          at scala.collection.immutable.List.foreach(List.scala:392)
          at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
          at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
          at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
          at scala.collection.immutable.List.foreach(List.scala:392)
          at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
          at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
          at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
          at scala.collection.immutable.List.foreach(List.scala:392)
          at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
          at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:36)
          at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
          at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
          at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
          at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
          at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
          at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
          at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
          at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
          at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
          at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
          at org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
          at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:384)
          ... 49 elided
        

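        The last two frames (Dataset.collect called from RelationalGroupedDataset.pivot) show the problem: pivot does a collect under the covers, hence the issue.

        The other issue is that even if you specify the values to pivot on, you then hit a different problem because of multiple aggregations. Note that this time the error comes from the streaming check (checkForStreaming), not the batch check (checkForBatch) as in the first case.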

        val sq = spark
          .readStream
          .format("rate")
          .load
          .groupBy("value")
          .pivot("timestamp", Seq(1)) // <-- pivot with explicit values
          .count
          .writeStream
          .format("console")
        scala> sq.start
        org.apache.spark.sql.AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;;
        Project [value#128L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#141[0] AS 1#142L]
        +- Aggregate [value#128L], [value#128L, pivotfirst(timestamp#127, count(1) AS `count`#137L, 1000000, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#141]
           +- Aggregate [value#128L, timestamp#127], [value#128L, timestamp#127, count(1) AS count(1) AS `count`#137L]
              +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dd63368,rate,List(),None,List(),None,Map(),None), rate, [timestamp#127, value#128L]
        
          at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:389)
          at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:93)
          at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:250)
          at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:326)
          at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
          ... 49 elided
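
        The plan above shows why the second attempt fails. Pivot with explicit values is planned as two stacked aggregations. The inner Aggregate is grouped by [value, timestamp]. The outer one is grouped by [value] and uses pivotfirst to spread the counts into columns. The Java sketch below is a hypothetical, Spark-free model of those two stages; the names are illustrative and rate rows are reduced to string pairs. It is meant to make the two-Aggregate shape concrete, not to reproduce Spark's own pivotfirst implementation.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Spark-free model of groupBy("value").pivot("timestamp", values).count
// as two stacked aggregations, mirroring the shape of the plan above.
class TwoStagePivotModel {

    // A rate-source row reduced to (value, timestamp), both strings for simplicity.
    static String[] row(String value, String timestamp) {
        return new String[]{value, timestamp};
    }

    // Stage 1, the inner Aggregate [value, timestamp]: count rows per pair.
    static Map<List<String>, Long> countPerPair(List<String[]> rows) {
        Map<List<String>, Long> counts = new LinkedHashMap<>();
        for (String[] r : rows) counts.merge(Arrays.asList(r[0], r[1]), 1L, Long::sum);
        return counts;
    }

    // Stage 2, the outer Aggregate [value] (pivotfirst in the plan): regroup the
    // stage-1 output by value and move each count into its timestamp's column.
    // Timestamps not listed in pivotValues get no column; missing cells stay null.
    static Map<String, Map<String, Long>> spread(Map<List<String>, Long> counts, List<String> pivotValues) {
        Map<String, Map<String, Long>> out = new TreeMap<>();
        for (Map.Entry<List<String>, Long> e : counts.entrySet()) {
            String value = e.getKey().get(0);
            String timestamp = e.getKey().get(1);
            Map<String, Long> cells = out.computeIfAbsent(value, k -> {
                Map<String, Long> m = new LinkedHashMap<>();
                for (String t : pivotValues) m.put(t, null);
                return m;
            });
            if (cells.containsKey(timestamp)) cells.put(timestamp, e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                row("0", "t1"), row("0", "t1"), row("1", "t2"), row("2", "t3"));
        Map<List<String>, Long> stage1 = countPerPair(rows);
        System.out.println(stage1);
        // prints {[0, t1]=2, [1, t2]=1, [2, t3]=1}
        System.out.println(spread(stage1, Arrays.asList("t1", "t2")));
        // prints {0={t1=2, t2=null}, 1={t1=null, t2=1}, 2={t1=null, t2=null}}
    }
}
```

        Stage 2 consumes the output of stage 1, so the plan contains two aggregations over one streaming source. The check in the stack trace above rejects exactly that.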
        

