【问题标题】:how to do nested collect_list in spark sql?如何在 spark sql 中做嵌套的 collect_list?
【发布时间】:2021-06-11 06:03:10
【问题描述】:

我是数据块 spark SQL 的新手。我正在寻找嵌套的 collect_list 并试图找出答案。 下面是我的 spark 实际 sql 查询

select
            policy.CustomerId,
            collect_list(struct(Number, Type, Id, Product.product))as policydetail 
         from
            policy 
            Left Join
               (
                  SELECT
                     policy.CustomerId,
                     Collect_list(struct(ProductId, productname)) as Product 
                  FROM
                     policy 
                  group by 
                     CustomerId
               )
               product 
               on product.CustomerId = policy.CustomerId
               group by 
               policy.CustomerId

我修改如下

select
                policy.id,
                collect_list(struct(Number, Type, Id, Collect_list(struct(ProductId, productname))))as policydetail 
            from
                policy             
            group by 
               policy.CustomerPartyId

修改查询后,出现如下错误

不允许在参数中使用聚合函数 另一个聚合函数。请使用内部聚合函数 一个子查询。;

是否有任何替代方法可以从表列中接近 Json。

"Policy":[
            {
               "Number":"123456",
               "Type":"new",
               "Id":"34355656",
               "Product":[
                  {
                     "ProductId":"2526",
                     "ProductName":"abc"
                  }
               ]
            }
         ]

输入到 sql 查询
包含列(id、number、type、transactionid、productid、productname)的策略表
sql 查询输出
[Row(Number='POL-CP-155', Type='Applicn', Id=14924102, product=[Row(ProductId=2526, productname='Commercial Property')])]

这是我运行 spark sql 查询的代码

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql import SQLContext

sc = sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
sc_sql = SQLContext(sc)

#ExtractConfig
policy = spark.read.format('csv').options(header='true', inferSchema='true').load("D:/policy.csv")
#RawPath


policy.createOrReplaceTempView("policy")

policydetails = sc_sql.sql("select policy.CustomerId, collect_list(struct(Number, Type, Id, Collect_list(struct(ProductId, productname))))as policydetail from policy group by policy.CustomerId")

pandasDF = policydetails.toPandas()
pandasDF = policydetails.toPandas().to_csv('data1.csv')

请找到policy.csv的图片

提前致谢!

【问题讨论】:

  • 你能添加一些示例输入数据和预期输出吗?
  • @Srinivas,感谢您的即时回复。我已经编辑了这个问题。你能看看 spark sql 的输入和预期输出吗
  • 能否请您从 csv 文件中发布几条消息?
  • @Srinivas,我已经更新了问题中的 policy.csv。请看一下
  • 在您没有CustomerId 列的示例数据中?

标签: apache-spark pyspark apache-spark-sql


【解决方案1】:

我没有测试,因为我没有样本数据。

试试下面的查询。

select 
    policy.CustomerId, 
    collect_list(
       struct(
          Number, 
          Type, 
          Id, 
          array(
              struct(
                  ProductId, 
                  productname)
              )
       )
) as policydetail 
from policy 
group by policy.CustomerId

或者

select 
    collect_list(
           struct(
               Number, 
               Type, 
               Id, 
               array(
                  struct(
                      ProductId, 
                      productname
                  )
               )
           )
) as policydetail 
from policy 

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-12-03
    • 2016-08-17
    • 2015-07-09
    • 2017-03-17
    • 1970-01-01
    • 2021-06-22
    相关资源
    最近更新 更多