【问题标题】:How to read a csv file and assign values to the variable in spark scala如何读取 csv 文件并将值分配给 spark scala 中的变量
【发布时间】:2021-02-17 17:35:09
【问题描述】:

我正在努力实现以下目标。 我有包含以下内容的 csv 文件

Sno,ColumnName,ColumnValue
——————————-———————————
1,svEmp,(Case when col1 = 1 then 2 else 1 end)
2,svCol,(Case when col2 = 2 then 3 else 6 end)

我有一个要求,我需要在 spark scala 中读取 csv 文件并将columnValue 分配给ColumnName(创建稍后可以在 spark.sql 语句中使用的暂存变量),如下所示

val svEmp = "(Case when col1 = 1 then 2 else 1 end)"
//and so on

spark.sql("select $svEmp as CalCol from tableName")

这是如何实现的? 任何帮助将不胜感激。

【问题讨论】:

  • 你愿意分享你的代码吗?

标签: scala apache-spark apache-spark-sql


【解决方案1】:

您可以将包含 select 表达式的 CSV 读取到数据框中,然后聚合以连接列 ColumnValueColumnName,最后将生成的 select 语句收集到字符串变量中:

val df = spark.read.option("header", "true").csv(path)

val selectStm = df.agg(
    concat_ws(
      ", ",
      collect_list(concat(col("ColumnValue"), lit(" as "), col("ColumnName")))
    )
  ).first.getString(0)

//selectStm: String = (Case when col1 = 1 then 2 else 1 end) as svEmp, (Case when col2 = 2 then 3 else 6 end) as svCol

val sqlQuery = s"select $selectStm from tableName"
//sqlQuery: String = select (Case when col1 = 1 then 2 else 1 end) as svEmp, (Case when col2 = 2 then 3 else 6 end) as svCol from tableName

val df2 = spark.sql(sqlQuery)

【讨论】:

  • 谢谢黑主教!!当我们选择变量而不对它们执行任何转换时,下面的解决方案会有所帮助。如果我们有一个语句作为 spark.sql("select $svEmp+4 as col1, col3, $svCol+$svEmp as CalCol from tableName")
猜你喜欢
  • 1970-01-01
  • 2021-02-22
  • 1970-01-01
  • 1970-01-01
  • 2019-05-11
  • 1970-01-01
  • 2023-01-20
  • 2012-05-18
  • 1970-01-01
相关资源
最近更新 更多