【问题标题】:Spark Row keyBy values(Spark Row keyBy 值)
【发布时间】:2018-04-27 13:55:29
【问题描述】:

我有以下数据条目:

{"uid":"XA8O3jlfAxxc","events":{"profile":{"last_update":1502287200,"clusters":{"129":11,"17":13,"99":4 ,"1":9,"162":12,"161":11,"233":11,"120":6,"61":12,"115":8,"168":10," 220":10,"135":6,"231":10,"109":3,"89":9,"140":11,"113":9,"124":3,"35" :10,"155":8,"131":7,"11":2,"207":3,"91":2,"167":3,"212":12,"77":11 ,"174":13,"154":11,"23":12,"13":6,"157":12,"235":11,"159":12,"138":13," 199":11,"111":1,"41":6,"211":12,"15":10,"47":3,"209":10,"173":13,"56" :14,"101":13,"45":2,"169":14,"86":12},"segments":{"11":6,"21":9,"7":12 ,"17":13,"22":13,"1":10,"18":14,"16":13,"13":12,"23":11,"6":8," 3":11,"9":12,"12":13,"15":2,"14":8,"8":14,"4":12,"10":6,"5" :12},"geoloc":{"country":"ES","longitude":2.81908,"latitude":41.9781},"sociodemos":{"11":6,"21":11,"7" :12,"2":5,"22":5,"18":3,"16":10,"13":4,"23":10,"6":11,"3":12 ,"9":4,"12":4,"20":3,"15":6,"14":6,"8":4,"4":9,"24":10," 5":11}},"wam":{"techno":{"browser":"Other","device":"Mobile","os":"Android","isp":"Telefonica"}, "last_update":1502568000,"wcm":{"conversion": [{"last_update":1502564400,"id":"1"}]}}}} {"uid":"Mq0tCKsYwzMy","events":{"profile":{"last_update":1502456400,"clusters":{"170":10,"32":6,"63":10,"90 ":2,"7":2,"227":5,"119":4,"200":5,"180":4,"18":1,"179":2,"162": 2,"125":1,"16":8,"84":9,"190":7,"161":10,"61":7,"115":5,"220":12, "20":8,"92":2,"231":2,"109":7,"103":9,"151":4,"89":2,"113":8,"35 ":3,"189":9,"11":14,"207":11,"91":3,"167":7,"77":10,"174":3,"157": 4,"29":7,"203":11,"210":7,"138":12,"97":3,"199":8,"41":13,"15":7, "153":4,"56":6,"45":10,"101":8,"86":2,"54":5,"237":4,"67":9,"129 ":5,"2":10,"17":1,"1":6,"136":5,"186":10,"110":3,"82":9,"25": 2,"28":12,"120":4,"75":6,"168":8,"177":2,"140":5,"124":8,"155":12, "131":2,"53":10,"181":10,"122":11,"79":3,"212":6,"154":3,"13":10,"23 ":8,"235":7,"126":3,"159":2,"85":4,"3":10,"185":11,"183":13,"111": 3,"9":13,"51":8,"47":3,"209":3,"216":3,"1000":3,"37":11,"132":3, "169":2,"117":5,"5":10},"segments":{"11":10,"21":8,"7":10,"17":13,"2 
":9,"22":13,"1":11,"18":2,"16":14,"13":9,"23":5,"6":5,"25": 3,"3":10,"9":8,"12":10,"15":10,"14":12,"8":6,"4":13,"1 0":4,"19":10,"5":10},"geoloc":{"country":"ES","longitude":-3.70358,"latitude":40.4167},"sociodemos":{ "11":3,"21":6,"7":10,"2":10,"22":5,"18":6,"23":6,"16":6,"13 ":7,"6":6,"3":11,"9":7,"12":4,"14":4,"15":3,"20":7,"8": 9,"4":12,"24":14,"5":12}},"wam":{"techno":{"browser":"Chrome","device":"Mobile","os ":"Android","isp":"Telefonica"},"last_update":1502575200,"wcm":{"conversion":[{"last_update":1502560800,"id":"1"}]}}} } {"uid":"1NaQF91h10rU","events":{"wam":{"techno":{"browser":"Chrome","device":"Mobile","os":"Android","isp ":"Other"},"last_update":1502571600,"wcm":{"conversion":[{"last_update":1502568000,"id":"1"}]}}}}

我只对集群的信息感兴趣:"clusters":{"number": affinity,...}

我收集了以下信息:

val trafico = sqlContext.read.json("/weborama/WAM_files/*/*")
val traficoRDD  = trafico.selectExpr(List("events.profile.clusters"): _*).filter("clusters is not null").rdd

输出:

[[9,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,11,2,null,null,null,4,null ,5,null,4,null,5,null,3,null,null,null,8,null,null,null,null,null,6,null,null,null,null,null,10,null,null ,null,null,null,null,null,null,null,null,null,null,null,null,null,11,7,null,null,null,null,null,null,null,null,null,null ,null,null,3,13,7,5,6,null,null,8,11,null,null,null,null,null,null,null,null,null,null,null,null,null,null ,null,null,null,null,null,null,null,null,null,null,null,null,null,null,8,null,null,8,null,null,null,null,12,null,null ,null,null,null,null,null,null,null,null,12,3,null,null,null,null,null,null,null,null,null,10,null,null,11,5,null ,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null ,null,null,null,null,null,6,null,null,null,null,null,null,6,null,null,null,12,null,null,null,null,null,null,9,null ,5,null,null,8,null,6,5,10,null,6,null,null,null,null,null,13,12,null,null,null,null,null,nu ll,null,null,null,null,8,null,7,6,null,null,null,null,null,9,null,null,null,null,null,null,6,3,null,null,空,空,空,空,空]] [[2,null,null,8,null,null,11,null,8,null,null,null,null,null,null,null,12,null,null,null,null,null,null,null,空,空,空,空,空,1,空,空,空,1,3,空,空,空,空,10,空,1,空,空,空,空,空,空,8,空值,12,空值,空值,8,空值,空值,空值,空值,12,空值,空值,4,空值,空值,空值,空值,4,空值,空值,12,空值,8,空值,空值,空,空,3,13,空,空,空,空,空,空,空,空,空,空,空,空,空,4,13,空,空,空,空,空,空,空,空,空,空,空,空,空,空,空,空,空,空,空,7,空,空,空,空,4,空,空,空,空,空,空,空,空,空,空,空,空,空,空,空,9,空,空,空,空,空,空,空,空,空,9,空,3,空,6,空,空,空,空,空,空,空,空,空,空,空,空,空,空,空,空,空,空,空,空,空,空,空,空,空,空,空,空值,空值,空值,空值,空值,空值,13,空值,空值,空值,空值,空值,空值,空值,空值,空值,空值,空值,空值,空值,空值,12,空值,1,空,空,4,空,2,4,空,空,空,空,空,空,空,空,2,空,空,空,空,空l,null,null,null,null,null,null,4,null,null,null,null,null,null,null,null,1,null,null,null,null,null,13,14,null,空,空,空,空,空,空,9]]

我想针对每个亲和度(affinity)值,分析有多少个 cluster 重复出现,以及其他类似的统计。

为此,我想创建一个键值对 RDD(pair RDD),形式为 (亲和度, cluster 编号)。有人可以帮帮我吗?

(1,[129,99,17])
(2,[63,80,3])
.
.
.
(14,[222,69])

谢谢!

【问题讨论】:

  • 您尝试了哪些方法,为什么没有成功?

标签: sql scala apache-spark dataframe rdd


【解决方案1】:

您读取 json 的方式是正确的,这会创建一个 dataframe。所以

val trafico = sqlContext.read.json("/weborama/WAM_files/*/*")

将创建一个具有如下 schema 的 dataframe

root
 |-- events: struct (nullable = true)
 |    |-- profile: struct (nullable = true)
 |    |    |-- clusters: struct (nullable = true)
 |    |    |    |-- 1: long (nullable = true)
 |    |    |    |-- 101: long (nullable = true)
 |    |    |    |-- 109: long (nullable = true)
 |    |    |    |-- 11: long (nullable = true)
 |    |    |    |-- 111: long (nullable = true)
 |    |    |    |-- 113: long (nullable = true)
 |    |    |    |-- 115: long (nullable = true)
 |    |    |    |-- 120: long (nullable = true)
 |    |    |    |-- 124: long (nullable = true)
 |    |    |    |-- 129: long (nullable = true)
 |    |    |    |-- 13: long (nullable = true)
 |    |    |    |-- 131: long (nullable = true)
 |    |    |    |-- 135: long (nullable = true)
 |    |    |    |-- 138: long (nullable = true)
 |    |    |    |-- 140: long (nullable = true)
 |    |    |    |-- 15: long (nullable = true)
 |    |    |    |-- 154: long (nullable = true)
 |    |    |    |-- 155: long (nullable = true)
 |    |    |    |-- 157: long (nullable = true)
 |    |    |    |-- 159: long (nullable = true)
 |    |    |    |-- 161: long (nullable = true)
 |    |    |    |-- 162: long (nullable = true)
 |    |    |    |-- 167: long (nullable = true)
 |    |    |    |-- 168: long (nullable = true)
 |    |    |    |-- 169: long (nullable = true)
 |    |    |    |-- 17: long (nullable = true)
 |    |    |    |-- 173: long (nullable = true)
 |    |    |    |-- 174: long (nullable = true)
 |    |    |    |-- 199: long (nullable = true)
 |    |    |    |-- 207: long (nullable = true)
 |    |    |    |-- 209: long (nullable = true)
 |    |    |    |-- 211: long (nullable = true)
 |    |    |    |-- 212: long (nullable = true)
 |    |    |    |-- 220: long (nullable = true)
 |    |    |    |-- 23: long (nullable = true)
 |    |    |    |-- 231: long (nullable = true)
 |    |    |    |-- 233: long (nullable = true)
 |    |    |    |-- 235: long (nullable = true)
 |    |    |    |-- 35: long (nullable = true)
 |    |    |    |-- 41: long (nullable = true)
 |    |    |    |-- 45: long (nullable = true)
 |    |    |    |-- 47: long (nullable = true)
 |    |    |    |-- 56: long (nullable = true)
 |    |    |    |-- 61: long (nullable = true)
 |    |    |    |-- 77: long (nullable = true)
 |    |    |    |-- 86: long (nullable = true)
 |    |    |    |-- 89: long (nullable = true)
 |    |    |    |-- 91: long (nullable = true)
 |    |    |    |-- 99: long (nullable = true)
 |    |    |-- geoloc: struct (nullable = true)
 |    |    |    |-- country: string (nullable = true)
 |    |    |    |-- latitude: double (nullable = true)
 |    |    |    |-- longitude: double (nullable = true)
 |    |    |-- last_update: long (nullable = true)
 |    |    |-- segments: struct (nullable = true)
 |    |    |    |-- 1: long (nullable = true)
 |    |    |    |-- 10: long (nullable = true)
 |    |    |    |-- 11: long (nullable = true)
 |    |    |    |-- 12: long (nullable = true)
 |    |    |    |-- 13: long (nullable = true)
 |    |    |    |-- 14: long (nullable = true)
 |    |    |    |-- 15: long (nullable = true)
 |    |    |    |-- 16: long (nullable = true)
 |    |    |    |-- 17: long (nullable = true)
 |    |    |    |-- 18: long (nullable = true)
 |    |    |    |-- 21: long (nullable = true)
 |    |    |    |-- 22: long (nullable = true)
 |    |    |    |-- 23: long (nullable = true)
 |    |    |    |-- 3: long (nullable = true)
 |    |    |    |-- 4: long (nullable = true)
 |    |    |    |-- 5: long (nullable = true)
 |    |    |    |-- 6: long (nullable = true)
 |    |    |    |-- 7: long (nullable = true)
 |    |    |    |-- 8: long (nullable = true)
 |    |    |    |-- 9: long (nullable = true)
 |    |    |-- sociodemos: struct (nullable = true)
 |    |    |    |-- 11: long (nullable = true)
 |    |    |    |-- 12: long (nullable = true)
 |    |    |    |-- 13: long (nullable = true)
 |    |    |    |-- 14: long (nullable = true)
 |    |    |    |-- 15: long (nullable = true)
 |    |    |    |-- 16: long (nullable = true)
 |    |    |    |-- 18: long (nullable = true)
 |    |    |    |-- 2: long (nullable = true)
 |    |    |    |-- 20: long (nullable = true)
 |    |    |    |-- 21: long (nullable = true)
 |    |    |    |-- 22: long (nullable = true)
 |    |    |    |-- 23: long (nullable = true)
 |    |    |    |-- 24: long (nullable = true)
 |    |    |    |-- 3: long (nullable = true)
 |    |    |    |-- 4: long (nullable = true)
 |    |    |    |-- 5: long (nullable = true)
 |    |    |    |-- 6: long (nullable = true)
 |    |    |    |-- 7: long (nullable = true)
 |    |    |    |-- 8: long (nullable = true)
 |    |    |    |-- 9: long (nullable = true)
 |    |-- wam: struct (nullable = true)
 |    |    |-- last_update: long (nullable = true)
 |    |    |-- techno: struct (nullable = true)
 |    |    |    |-- browser: string (nullable = true)
 |    |    |    |-- device: string (nullable = true)
 |    |    |    |-- isp: string (nullable = true)
 |    |    |    |-- os: string (nullable = true)
 |    |    |-- wcm: struct (nullable = true)
 |    |    |    |-- conversion: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |    |-- last_update: long (nullable = true)
 |-- uid: string (nullable = true)

现在你只对clusters 字段感兴趣,你可以这样做

val clusters = trafico.select("events.profile.clusters.*")

这会得到一个具有如下 schema 的 dataframe

root
 |-- 1: long (nullable = true)
 |-- 101: long (nullable = true)
 |-- 109: long (nullable = true)
 |-- 11: long (nullable = true)
 |-- 111: long (nullable = true)
 |-- 113: long (nullable = true)
 |-- 115: long (nullable = true)
 |-- 120: long (nullable = true)
 |-- 124: long (nullable = true)
 |-- 129: long (nullable = true)
 |-- 13: long (nullable = true)
 |-- 131: long (nullable = true)
 |-- 135: long (nullable = true)
 |-- 138: long (nullable = true)
 |-- 140: long (nullable = true)
 |-- 15: long (nullable = true)
 |-- 154: long (nullable = true)
 |-- 155: long (nullable = true)
 |-- 157: long (nullable = true)
 |-- 159: long (nullable = true)
 |-- 161: long (nullable = true)
 |-- 162: long (nullable = true)
 |-- 167: long (nullable = true)
 |-- 168: long (nullable = true)
 |-- 169: long (nullable = true)
 |-- 17: long (nullable = true)
 |-- 173: long (nullable = true)
 |-- 174: long (nullable = true)
 |-- 199: long (nullable = true)
 |-- 207: long (nullable = true)
 |-- 209: long (nullable = true)
 |-- 211: long (nullable = true)
 |-- 212: long (nullable = true)
 |-- 220: long (nullable = true)
 |-- 23: long (nullable = true)
 |-- 231: long (nullable = true)
 |-- 233: long (nullable = true)
 |-- 235: long (nullable = true)
 |-- 35: long (nullable = true)
 |-- 41: long (nullable = true)
 |-- 45: long (nullable = true)
 |-- 47: long (nullable = true)
 |-- 56: long (nullable = true)
 |-- 61: long (nullable = true)
 |-- 77: long (nullable = true)
 |-- 86: long (nullable = true)
 |-- 89: long (nullable = true)
 |-- 91: long (nullable = true)
 |-- 99: long (nullable = true)

这些 column names 就是您想要的 paired RDD 中的键(keys)。所以你可以将它们保存为

val clusterNames = clusters.schema.fieldNames

应该是

Array[1, 101, 109, 11, 111, 113, 115, 120, 124, 129, 13, 131, 135, 138, 140, 15, 154, 155, 157, 159, 161, 162, 167, 168, 169, 17, 173, 174, 199, 207, 209, 211, 212, 220, 23, 231, 233, 235, 35, 41, 45, 47, 56, 61, 77, 86, 89, 91, 99]

您所需的配对 RDD 中的值(values)是上面 clusters dataframe 的每一列(column)收集而成的列表,所以你可以执行以下操作来获得它

import org.apache.spark.sql.functions._
val collectedClusters = clusters.select(clusterNames.map(x => collect_list(col(x))) : _*).rdd.flatMap(_.toSeq.toList).collect

应该是

Array[WrappedArray(9), WrappedArray(13), WrappedArray(3), WrappedArray(2), WrappedArray(1), WrappedArray(9), WrappedArray(8), WrappedArray(6), WrappedArray(3), WrappedArray(11), WrappedArray(6), WrappedArray(7), WrappedArray(6), WrappedArray(13), WrappedArray(11), WrappedArray(10), WrappedArray(11), WrappedArray(8), WrappedArray(12), WrappedArray(12), WrappedArray(11), WrappedArray(12), WrappedArray(3), WrappedArray(10), WrappedArray(14), WrappedArray(13), WrappedArray(13), WrappedArray(13), WrappedArray(11), WrappedArray(3), WrappedArray(10), WrappedArray(12), WrappedArray(12), WrappedArray(10), WrappedArray(12), WrappedArray(10), WrappedArray(11), WrappedArray(11), WrappedArray(10), WrappedArray(6), WrappedArray(2), WrappedArray(3), WrappedArray(14), WrappedArray(12), WrappedArray(11), WrappedArray(12), WrappedArray(9), WrappedArray(2), WrappedArray(4)]

最后一步是创建配对的RDD,这可以通过使用zip来实现

clusterNames.zip(collectedClusters)

这样你就得到了所需的键值对(注意:zip 的结果是驱动端本地的 Array,而不是真正的 RDD;如果需要 RDD,可以再用 sqlContext.sparkContext.parallelize(...) 将其转换)

(1,WrappedArray(9))
(101,WrappedArray(13))
(109,WrappedArray(3))
(11,WrappedArray(2))
(111,WrappedArray(1))
(113,WrappedArray(9))
(115,WrappedArray(8))
(120,WrappedArray(6))
(124,WrappedArray(3))
(129,WrappedArray(11))
(13,WrappedArray(6))
(131,WrappedArray(7))
(135,WrappedArray(6))
(138,WrappedArray(13))
(140,WrappedArray(11))
(15,WrappedArray(10))
(154,WrappedArray(11))
(155,WrappedArray(8))
(157,WrappedArray(12))
(159,WrappedArray(12))
(161,WrappedArray(11))
(162,WrappedArray(12))
(167,WrappedArray(3))
(168,WrappedArray(10))
(169,WrappedArray(14))
(17,WrappedArray(13))
(173,WrappedArray(13))
(174,WrappedArray(13))
(199,WrappedArray(11))
(207,WrappedArray(3))
(209,WrappedArray(10))
(211,WrappedArray(12))
(212,WrappedArray(12))
(220,WrappedArray(10))
(23,WrappedArray(12))
(231,WrappedArray(10))
(233,WrappedArray(11))
(235,WrappedArray(11))
(35,WrappedArray(10))
(41,WrappedArray(6))
(45,WrappedArray(2))
(47,WrappedArray(3))
(56,WrappedArray(14))
(61,WrappedArray(12))
(77,WrappedArray(11))
(86,WrappedArray(12))
(89,WrappedArray(9))
(91,WrappedArray(2))
(99,WrappedArray(4))

希望回答对你有帮助

【讨论】:

    猜你喜欢
    • 2021-11-30
    • 2015-05-12
    • 2018-03-04
    • 1970-01-01
    • 2017-08-13
    • 1970-01-01
    • 2020-09-07
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多