【发布时间】:2021-06-11 06:03:10
【问题描述】:
我是数据块 spark SQL 的新手。我正在寻找嵌套的 collect_list 并试图找出答案。 下面是我的 spark 实际 sql 查询
select
policy.CustomerId,
collect_list(struct(Number, Type, Id, Product.product))as policydetail
from
policy
Left Join
(
SELECT
policy.CustomerId,
Collect_list(struct(ProductId, productname)) as Product
FROM
policy
group by
CustomerId
)
product
on product.CustomerId = policy.CustomerId
group by
policy.CustomerId
我修改如下
select
policy.id,
collect_list(struct(Number, Type, Id, Collect_list(struct(ProductId, productname))))as policydetail
from
policy
group by
policy.CustomerPartyId
修改查询后,出现如下错误
不允许在参数中使用聚合函数 另一个聚合函数。请使用内部聚合函数 一个子查询。;
是否有任何替代方法可以从表列中接近 Json。
"Policy":[
{
"Number":"123456",
"Type":"new",
"Id":"34355656",
"Product":[
{
"ProductId":"2526",
"ProductName":"abc"
}
]
}
]
输入到 sql 查询:
包含列(id、number、type、transactionid、productid、productname)的策略表
sql 查询输出:
[Row(Number='POL-CP-155', Type='Applicn', Id=14924102, product=[Row(ProductId=2526, productname='Commercial Property')])]
这是我运行 spark sql 查询的代码
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql import SQLContext
sc = sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
sc_sql = SQLContext(sc)
#ExtractConfig
policy = spark.read.format('csv').options(header='true', inferSchema='true').load("D:/policy.csv")
#RawPath
policy.createOrReplaceTempView("policy")
policydetails = sc_sql.sql("select policy.CustomerId, collect_list(struct(Number, Type, Id, Collect_list(struct(ProductId, productname))))as policydetail from policy group by policy.CustomerId")
pandasDF = policydetails.toPandas()
pandasDF = policydetails.toPandas().to_csv('data1.csv')
提前致谢!
【问题讨论】:
-
你能添加一些示例输入数据和预期输出吗?
-
@Srinivas,感谢您的即时回复。我已经编辑了这个问题。你能看看 spark sql 的输入和预期输出吗
-
能否请您从 csv 文件中发布几条消息?
-
@Srinivas,我已经更新了问题中的 policy.csv。请看一下
-
在您没有
CustomerId列的示例数据中?
标签: apache-spark pyspark apache-spark-sql