【问题标题】:Spark/Scala iterating two dataframesSpark/Scala 迭代两个数据帧
【发布时间】:2017-08-10 07:38:51
【问题描述】:

我是spark/scala 世界的新手。我有两个数据来源

  1. 具有 URL 和主机名的流量数据

  2. 定义流量 url 规则的属性数据。规则是匹配域名的正则表达式模式。一个属性 ID 可能有一个或多个规则。

如果一个 URL 符合条件,我必须分配一个属性 ID。流量中的每一行都可以匹配零个或多个属性条件

样本输入 traffic-data

 visitor_id | url                      
 1000-abc10 | www.motor.com/index.html
 2000-fe30a | www.lifestyle.com/cooking/pasta.html 

`属性数据

attribute_id | rule                               | describtion
101          | motor.com, auto*.com, vehicles.com | "vehicles"
102          | motor.com                          | "auto site"

预期输出:

visitor_id  | attribute_id
1000-abc10  | 101
1000-abc10  | 202

我尝试了以下方法:

val traffic_df = spark.read.parquet(<traffic-path>).as[Traffic]
val attribute_df = spark.read.parquet(<attribute-path>).as[Attribute]

traffic_df.map(row => attribute_df.map(r => TrafficAttribute(row.visitor_id, r.attribute_id)))

【问题讨论】:

  • 您应该添加输入示例、您想要的输出以及您的代码无法使用的输出
  • 我已经按照建议添加了输入和输出。
  • 我可以为我发布的问题找到解决方案。为了完成循环,我在这里发布了下面对我有用的代码 sn-p。

标签: scala apache-spark apache-spark-sql


【解决方案1】:
case class Traffic(visitor_id: String, page_url : String)
case class ConfigRow(attribute_id: String, rule: String, description: String)
case class OutputRow(visitor_id: String, attribute_id)

val configList = spark.sqlContext.read.json(<config-path>).as[ConfigRow].collect().toList
val trafficDF = spark.read.json(<traffic-path>).as[Traffic]

def determineAttributes(row: Traffic, configList: List[ConfigRow]): ListBuffer[String] = {
    val attributeList = new ListBuffer[String]
    for (c <- configList) {
      rule = c.rule;
      if (<rule matches>) attributeList += c.attribute_id
   }
   attributeList
}

for r = trafficDF.flatMap((row:Traffic) => {
   for (attributeId <- determineAttributes(row, configList)) yield {
      OutputRow(row.visitor_id, attributeId)
   }
}) 

【讨论】:

    【解决方案2】:

    您可以在 2 个数据集上使用具有特殊连接条件的连接:

    val joinCondition = $"a.url".contains($"b.rule")
    var joinedDf = trafficDf.as('a).join(attributeDf.as('b),joinCondition)
    joinedDf.show()
    
    +----------+--------------------+------------+---------+-----------+
    |visitor_id|                 url|attribute_id|     rule|describtion|
    +----------+--------------------+------------+---------+-----------+
    |1000-abc10|www.motor.com/ind...|         101|motor.com|   vehicles|
    |1000-abc10|www.motor.com/ind...|         102|motor.com|  auto site|
    +----------+--------------------+------------+---------+-----------+
    

    然后您可以使用joinedDf.select("visitor_id","attribute_id") 选择所需的列

    【讨论】:

    • Fabich,感谢您的回复,非常感谢。实际上,为简洁起见,我添加了简单的规则。它可以有正则表达式模式我不确定这是否可以在正常的连接条件下完成。请参阅我已经更新了我的输入表。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-06-06
    • 2016-05-30
    • 2018-11-09
    • 2019-05-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多