【问题标题】:How can i transpose csv data using java spark我如何使用 java spark 转置 csv 数据
【发布时间】:2020-02-18 14:19:22
【问题描述】:

我正在使用 java spark,我想知道是否可以转换下面给出的示例数据

Incremental Cost Number | Approver Names                          
---------------------------------------------------------------------------------
S703401                  |Ryan P Cassidy|Christopher J Mattingly|Frank E 
                         LaSota|Ryan P Cassidy|Anthony L Locricchio|Jason Monte                                                                    

变成这样。

Incremental Cost Number| Approver Names                          
-------------------------------------------
S703401                | Ryan P Cassidy
S703401                | Christopher J Mattingly
S703401                | Frank E LaSota
S703401                | Ryan P Cassidy
S703401                | Anthony L Locricchio
S703401                | Jason Monte 

我导入的文件也是一个逗号分隔的 csv 文件,只是一个特定的列包含多个值,由管道符号分隔。同样,如果我有多个增量成本编号值。

【问题讨论】:

  • 嗨@yyy - 你能告诉我们你尝试了什么
  • 嗨@mrblewog 我被困在逻辑级别,我不知道如何使用 java spark 继续它。有没有办法使用 RDD 或 Dataset 来做到这一点

标签: java csv apache-spark


【解决方案1】:

如果您有多个列,您可以执行以下操作

  import org.apache.spark.sql.functions._

   val df = Seq(("S703401","Ryan P Cassidy|Christopher J Mattingly|Frank E 
   LaSota|Ryan P Cassidy|Anthony L Locricchio|Jason 
   Monte","xyz|mnp|abc")).toDF("Incremental Cost Number","Approver 
   Names","3rd Column")


  df.withColumn("Approver Names", explode(split(col("Approver Names"), "\\|")))
  .withColumn("3rd Column", explode(split(col("3rd Column"), "\\|")))
  .show()


   +-----------------------+--------------------+-----------+
   |Incremental Cost Number|      Approver Names| 3rd Column|
   +-----------------------+--------------------+-----------+
   |                S703401|Ryan P Cassidy|Ch...|xyz|mnp|abc|
   +-----------------------+--------------------+-----------+

   +-----------------------+--------------------+----------+
   |Incremental Cost Number|      Approver Names|3rd Column|
   +-----------------------+--------------------+----------+
   |                S703401|      Ryan P Cassidy|       xyz|
   |                S703401|      Ryan P Cassidy|       mnp|
   |                S703401|      Ryan P Cassidy|       abc|
   |                S703401|Christopher J Mat...|       xyz|
   |                S703401|Christopher J Mat...|       mnp|
   |                S703401|Christopher J Mat...|       abc|
   |                S703401|      Frank E LaSota|       xyz|
   |                S703401|      Frank E LaSota|       mnp|
   |                S703401|      Frank E LaSota|       abc|
   |                S703401|      Ryan P Cassidy|       xyz|
   |                S703401|      Ryan P Cassidy|       mnp|
   |                S703401|      Ryan P Cassidy|       abc|
   |                S703401|Anthony L Locricchio|       xyz|
   |                S703401|Anthony L Locricchio|       mnp|
   |                S703401|Anthony L Locricchio|       abc|
   |                S703401|         Jason Monte|       xyz|
   |                S703401|         Jason Monte|       mnp|
   |                S703401|         Jason Monte|       abc|
   +-----------------------+--------------------+----------+

【讨论】:

    【解决方案2】:

    我认为您需要用“|”分隔第二列然后使用explode()函数

    df.select(col("id"), explode(split(col("a"), "\\|")).as("a")).show()
    
    +-------+--------------------+
    |     id|                   a| 
    +-------+--------------------+
    |S703401|      Ryan P Cassidy|
    |S703401|Christopher J Mat...|
    |S703401|             Frank E|
    

    【讨论】:

    • 如果我有两列,这可以正常工作,但如果我必须以类似的方式添加 4 列,那么我仍然可以使用爆炸 fn 的功能吗?
    • 是的,您将获得更多的列副本,但它会起作用。试试吧
    【解决方案3】:

    注意:这是 RDD 的做事方式。在 Scala 和 Dataframe 中可能更容易。

    1. 使用SparkContext读取文件
    2. 更具体地说,您需要使用textFile() API,这将为您提供 RDD。
    3. 一旦你有了RDD,你就可以根据逗号对每条记录进行标记(这是通过在RDD上调用map() API并将map函数传递给它来完成的。在你的情况下,这个函数可以实现为拆分逗号将字符串分隔成多个标记。您可以使用元组数据结构来发出输出。
    4. 您可以根据您拥有的字段数选择 Tuple1 到 Tuple22。参考here
    5. 第 3 步应该再次返回元组的 RDD。您在此 RDD 上运行 flatMap 函数,它将使用元组中的第一个字段并将其连接到其他必需的元组字段。
    6. 完成后,您可以通过使用逗号分隔符连接所有元组字段来再次将所有内容重新组合在一起。 (这将是另一个地图功能)
    7. 最后你调用saveAsTextFile()保存更新的数据。

    【讨论】:

    • 能详细解释一下step3吗
    • 创建一个以逗号分隔的字符串作为输入并输出包含标记的元组的函数。将该函数传递给 rdd.map
    猜你喜欢
    • 1970-01-01
    • 2017-05-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-03-03
    • 2021-05-10
    • 2019-10-05
    • 2014-07-08
    相关资源
    最近更新 更多