【问题标题】:Creating Dataframe out of two different arrays spark用两个不同的数组创建 Dataframe
【发布时间】:2017-02-24 22:25:37
【问题描述】:

我正在使用火花流,并从 kafka 消息中创建此数据帧:

|customer|initialLoadComplete|initialLoadRunning|      messageContent|        tableName|
+--------+-------------------+------------------+--------------------+-----------------+
|   A|              false|              true|TEFault_IdReason...|Timed_Event_Fault|
|   A|              false|              true|TEFault_IdReason...|Timed_Event_Fault|
+--------+-------------------+------------------+--------------------+-----------------+

现在我想提取 messageContent,messageContent 基本上就像一个包含原始数据的 CSV,第一行是列。 我可以通过以下方式从 messageContent 字段中提取标题。

 val Array1 = ssc.sparkContext.parallelize(rowD.getString(2).split("\u0002")(0))

所以 Array1 看起来像这样:

 Array1: col1^Acol2^Acol3

Array2 基本上是原始数据,每列值由 ^A 分隔,记录由 ^B 分隔。

^A 是一个列分隔符。 ^B 是记录分隔符

所以这就是 array2 的样子:

Array2 = value1^Avalue2^Avalue3^Bvalue4^Avalue5^Avalue6^Bvalue7^Avalue8^Avalue9

基本上我想用这个创建一个数据框,所以它看起来像这样:

col1   | col2   | col3
-------------------------
value1 | value2 | value3
value4 | value5 | value6
value7 | value8 | value9

^B 是记录分隔符。

当我们从 hdfs 文件中读取数据时,我们通过以下命令创建了一个数据帧:

  val df = csc.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").option("delimiter", "\u0001").load(hdfsFile)

但是这次我从内存中的两个数组创建一个数据框。 Array1 是 array2 中值的标头,array2 是由 ^B 分隔的记录。

在这种方法中创建数据框的效果与我从文件中创建数据框的效果相同。

【问题讨论】:

  • so Array1 和 Array2 只有一个 String 类型的元素,分隔符为 ^A 和 ^B?
  • 这是一个并行集合,更新了我的问题
  • 我只需要知道 Array1 和 Array2 中的每一项是什么样子的.. 这样我们就可以在没有任何假设的情况下提供解决方案..
  • 添加了更多细节,希望对您有所帮助

标签: scala apache-spark


【解决方案1】:

我从您的问题中推断出以下内容。

Array1 是一个只有一个条目的 rdd col1^Acol2^Acol3

Array2 是一个 rdd,每个条目看起来像这样。 value1^Avalue2^Avalue3^Bvalue4^Avalue5^Avalue6^Bvalue7^Avalue8^Avalue9

有了这些假设,以下应该可行。

val array1 = sc.parallelize(Seq("col1\u0002col2\u0002col3"))
val array2 = sc.parallelize(Seq("value1\u0001value2\u0001value3\u0002value4\u0001value5\u0001value6\u0002value7\u0001value8\u0001value9"))
val data = array2.flatMap(x => x.split("\u0002")).map(x => x.split('\u0001')).collect()

val result = array2
              .flatMap(x => x.split("\u0002"))
              .map(x => x.split('\u0001'))
              .map({ case Array(x,y,z) => (x,y,z)})
              .toDF(array1.flatMap(x => x.split('\u0002')).collect(): _*)

result.show()
+------+------+------+
|  col1|  col2|  col3|
+------+------+------+
|value1|value2|value3|
|value4|value5|value6|
|value7|value8|value9|
+------+------+------+

【讨论】:

  • 嗨,是的,我能够自己实施类似的解决方案并使其正常工作。谢谢
猜你喜欢
  • 1970-01-01
  • 2017-05-10
  • 2020-11-13
  • 2015-04-28
  • 1970-01-01
  • 2020-01-14
  • 1970-01-01
  • 1970-01-01
  • 2019-01-25
相关资源
最近更新 更多