【Question Title】: Join Apache Spark DataFrames properly with Scala, avoiding null values
【Posted】: 2019-06-07 23:44:28
【Question Description】:

Hi everyone!

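I have two DataFrames in Apache Spark (2.3) and I want to join them properly. I'll explain below what I mean by "properly". First, the two DataFrames contain the following information: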

nodeDf: (id, year, title, authors, journal, abstract)
edgeDf: (srcId, dstId, label)

The label is 0 or 1, depending on whether node1 is connected to node2.

I want to combine these two DataFrames into a single DataFrame with the following information:

JoinedDF: (id_from, year_from, title_from, journal_from, abstract_from, id_to, year_to, title_to, journal_to, abstract_to, time_dist)

time_dist = abs(year_from - year_to)
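As a small illustration, time_dist can be computed as a column expression with Spark's built-in abs function. The object TimeDistExample, the helper addTimeDist and the sample years are hypothetical, and the sketch assumes year_from and year_to are integer columns.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{abs, col}

// Hypothetical helper object illustrating time_dist = abs(year_from - year_to).
object TimeDistExample {
  // Adds the time_dist column; assumes year_from and year_to are integer columns.
  def addTimeDist(df: DataFrame): DataFrame =
    df.withColumn("time_dist", abs(col("year_from") - col("year_to")))

  // Applies addTimeDist to two made-up rows and returns the computed distances.
  def run(): List[Int] = {
    val spark = SparkSession.builder().master("local[1]").appName("time_dist").getOrCreate()
    import spark.implicits._
    val pairs = Seq((1L, 1999, 2L, 2003), (3L, 2001, 4L, 1998))
      .toDF("id_from", "year_from", "id_to", "year_to")
    addTimeDist(pairs).select("time_dist").as[Int].collect().toList
  }

  def main(args: Array[String]): Unit = println(run())
}
```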

By "properly" I mean that the query must be as fast as possible, and I don't want any rows or cells (values in a row) that are null.

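I tried the approach below, but the query took 500-540 seconds to run and the final DataFrame contains null values. I'm not even sure the DataFrames are joined correctly.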
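To check whether the join is correct and where the nulls come from, one hedged approach is to count the nulls in each column of an input and to list the edges whose endpoints have no matching node, since an inner join silently drops those. The object JoinDiagnostics, the node id column name id (the question's code calls it srcId) and the sample rows are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, count, when}

// Hypothetical diagnostics for the edge/node join.
object JoinDiagnostics {
  // Counts null cells per column: count() skips the nulls that when() produces without otherwise().
  def nullCounts(df: DataFrame): Map[String, Long] = {
    val exprs = df.columns.map(c => count(when(col(c).isNull, 1)).as(c))
    val row = df.select(exprs: _*).head()
    df.columns.map(c => c -> row.getAs[Long](c)).toMap
  }

  // Edges whose srcId or dstId has no matching node id (assumed column "id").
  def danglingEdges(edges: DataFrame, nodes: DataFrame): DataFrame = {
    val ids = nodes.select(col("id"))
    val missingSrc = edges.join(ids, edges("srcId") === ids("id"), "left_anti")
    val missingDst = edges.join(ids, edges("dstId") === ids("id"), "left_anti")
    missingSrc.union(missingDst).distinct()
  }

  def run(): (Map[String, Long], Long) = {
    val spark = SparkSession.builder().master("local[1]").appName("join_diagnostics").getOrCreate()
    import spark.implicits._
    // Made-up nodes: node 3 has no journal.
    val nodes = Seq[(Long, Option[String])]((1L, Some("J1")), (2L, Some("J2")), (3L, None))
      .toDF("id", "journal")
    // Made-up edges: 2 -> 9 points to a node id that does not exist.
    val edges = Seq((1L, 2L, 1), (2L, 9L, 0), (3L, 1L, 1)).toDF("srcId", "dstId", "label")
    (nullCounts(nodes), danglingEdges(edges, nodes).count())
  }

  def main(args: Array[String]): Unit = println(run())
}
```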

I should mention that the node file I create nodeDF from has 27,770 rows, while the edge file (edgeDf) has 615,512 rows.
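Since nodeDf is small compared to edgeDf, one reasonable approach, sketched here as an assumption rather than a measured fix, is to join the edge table twice against the node table (once per endpoint), rename each copy's columns up front, and broadcast the node table so Spark does not have to shuffle the large edge table. The sketch assumes the node columns are named id, year, title, journal and abstract (the question's code uses srcId and jurnal), that year is an integer, and that any row still containing a null should be discarded. EdgeNodeJoin, joinEdges and the sample rows are hypothetical. It keeps the label column, which the target schema above omits.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{abs, broadcast, col}

// Hypothetical sketch: attach node attributes to both endpoints of every edge.
object EdgeNodeJoin {
  // Node attribute columns copied onto each endpoint (names assumed, see lead-in).
  private val nodeCols = Seq("year", "title", "journal", "abstract")

  // Selects the node table with every column suffixed, e.g. id -> id_from, year -> year_from.
  private def suffixed(nodes: DataFrame, suffix: String): DataFrame = {
    val cols: Seq[Column] = ("id" +: nodeCols).map(c => col(c).as(c + suffix))
    nodes.select(cols: _*)
  }

  def joinEdges(edges: DataFrame, nodes: DataFrame): DataFrame = {
    // broadcast() hints Spark to ship the small node table to every executor
    // instead of shuffling the large edge table.
    val from = broadcast(suffixed(nodes, "_from"))
    val to = broadcast(suffixed(nodes, "_to"))
    edges
      .join(from, col("srcId") === col("id_from")) // inner join: drops edges with an unknown source
      .join(to, col("dstId") === col("id_to"))     // inner join: drops edges with an unknown target
      .drop("srcId", "dstId")                      // duplicates of id_from / id_to
      .withColumn("time_dist", abs(col("year_from") - col("year_to")))
      .na.drop()                                   // removes rows that still contain a null
  }

  def run(): List[(Long, Long, Int)] = {
    val spark = SparkSession.builder().master("local[1]").appName("edge_node_join").getOrCreate()
    import spark.implicits._
    // Made-up nodes: node 3 has no journal, so edges touching it are removed by na.drop().
    val nodes = Seq[(Long, Int, String, Option[String], String)](
      (1L, 1999, "A", Some("J1"), "abstract A"),
      (2L, 2003, "B", Some("J2"), "abstract B"),
      (3L, 2001, "C", None, "abstract C")
    ).toDF("id", "year", "title", "journal", "abstract")
    val edges = Seq((1L, 2L, 1), (2L, 3L, 0), (2L, 1L, 0)).toDF("srcId", "dstId", "label")
    joinEdges(edges, nodes)
      .select("id_from", "id_to", "time_dist")
      .as[(Long, Long, Int)]
      .collect()
      .toList
      .sortBy(t => (t._1, t._2))
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Whether the explicit broadcast hint actually speeds things up depends on the size of the node table (titles and abstracts included) and on the available memory, so it is worth comparing against the plain join.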

Code:

val spark = SparkSession.builder().master("local[*]").appName("Logistic Regression").getOrCreate()
val sc = spark.sparkContext

val data = sc.textFile("resources/data/training_set.txt").map(line =>{
  val fields = line.split(" ")
  (fields(0),fields(1), fields(2).toInt)
})

val data2 = sc.textFile("resources/data/test_set.txt").map(line =>{
  val fields = line.split(" ")
  (fields(0),fields(1))
})

import spark.implicits._
val trainingDF = data.toDF("srcId","dstId", "label")
val testDF = data2.toDF("srcId","dstId")

val infoRDD = spark.read.option("header","false").option("inferSchema","true").format("csv").load("resources/data/node_information.csv")

val infoDF = infoRDD.toDF("srcId","year","title","authors","jurnal","abstract")

println("Showing linksDF sample...")
trainingDF.show(5)
println(s"Rows of linksDF: ${trainingDF.count()}")

println("Showing infoDF sample...")
infoDF.show(2)
println(s"Rows of infoDF: ${infoDF.count()}")

println("Joining linksDF and infoDF...")
var joinedDF = trainingDF.as("a").join(infoDF.as("b"),$"a.srcId" === $"b.srcId")

println(joinedDF.count())

joinedDF = joinedDF.select($"a.srcId",$"a.dstId",$"a.label",$"b.year",$"b.title",$"b.authors",$"b.jurnal",$"b.abstract")

joinedDF.show(5)


val graphX = new GraphX()
val pageRankDf =graphX.computePageRank(spark,"resources/data/training_set.txt",0.0001)

println("Joining joinedDF and pageRankDf...")
joinedDF = joinedDF.as("a").join(pageRankDf.as("b"),$"a.srcId" === $"b.nodeId")

var dfWithRanks = joinedDF.select("srcId","dstId","label","year","title","authors","jurnal","abstract","rank").withColumnRenamed("rank","pgRank")
dfWithRanks.show(5)

println("Renaming joinedDF...")
dfWithRanks = dfWithRanks
  .withColumnRenamed("srcId","id_from")
  .withColumnRenamed("dstId","id_to")
  .withColumnRenamed("year","year_from")
  .withColumnRenamed("title","title_from")
  .withColumnRenamed("authors","authors_from")
  .withColumnRenamed("jurnal","jurnal_from")
  .withColumnRenamed("abstract","abstract_from")

var infoDfRenamed = dfWithRanks
  .withColumnRenamed("id_from","id_from")
  .withColumnRenamed("id_to","id_to")
  .withColumnRenamed("year_from","year_to")
  .withColumnRenamed("title_from","title_to")
  .withColumnRenamed("authors_from","authors_to")
  .withColumnRenamed("jurnal_from","jurnal_to")
  .withColumnRenamed("abstract_from","abstract_to").select("id_to","year_to","title_to","authors_to","jurnal_to","abstract_to")

var finalDF = dfWithRanks.as("a").join(infoDF.as("b"),$"a.id_to" === $"b.srcId")

finalDF = finalDF
  .withColumnRenamed("year","year_to")
  .withColumnRenamed("title","title_to")
  .withColumnRenamed("authors","authors_to")
  .withColumnRenamed("jurnal","jurnal_to")
  .withColumnRenamed("abstract","abstract_to")

println("Dropping unused columns from joinedDF...")
finalDF = finalDF.drop("srcId")

finalDF.show(5)  

Here is my result!

Ignore all the pgRank-related computation and code! Is there a proper way to do this join?

【Question Discussion】:

    Tags: apache-spark dataframe join apache-spark-sql


    【Solution 1】:

    You can filter your data first and then join it; that way you avoid the null values:

    df.filter($"ColumnName".isNotNull)
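A minimal sketch of this idea, assuming hypothetical title and journal columns: chaining isNotNull filters over several columns keeps the same rows as DataFrame.na.drop with a column list. Filtering the node table this way before an inner join means every joined row gets non-null node columns, at the cost of also dropping the edges that touch the filtered-out nodes. The object FilterBeforeJoin and the sample rows are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical illustration of filtering nulls before a join.
object FilterBeforeJoin {
  // Keeps rows where every listed column is non-null by chaining isNotNull filters.
  def dropNulls(df: DataFrame, cols: Seq[String]): DataFrame =
    cols.foldLeft(df)((acc, c) => acc.filter(col(c).isNotNull))

  // Compares the chained filters with the built-in na.drop on the same columns.
  def run(): (Long, Long) = {
    val spark = SparkSession.builder().master("local[1]").appName("filter_before_join").getOrCreate()
    import spark.implicits._
    val nodes = Seq[(Long, Option[String], Option[String])](
      (1L, Some("T1"), Some("J1")),
      (2L, Some("T2"), None),
      (3L, None, Some("J3"))
    ).toDF("id", "title", "journal")
    val viaFilter = dropNulls(nodes, Seq("title", "journal")).count()
    val viaNaDrop = nodes.na.drop(Seq("title", "journal")).count()
    (viaFilter, viaNaDrop)
  }

  def main(args: Array[String]): Unit = println(run())
}
```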

    【Discussion】:

      【Solution 2】:

      Use the <=> operator in the join condition:

      var joinedDF = trainingDF.as("a").join(infoDF.as("b"),$"a.srcId" <=> $"b.srcId") 
      

      In Spark 2.1 or later there is also a function, eqNullSafe, that does the same:

      var joinedDF = trainingDF.join(infoDF,trainingDF("srcId").eqNullSafe(infoDF("srcId")))
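A minimal sketch, with hypothetical data, of how === and <=> differ as join conditions: with <=> two null keys are treated as equal, so a null key on both sides produces a matched row instead of being dropped. In other words, the null-safe operator changes which rows match; it does not by itself remove null cells from the result. The object NullSafeJoinDemo and its column names are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical comparison of === and <=> as join conditions.
object NullSafeJoinDemo {
  // Returns (rows matched with ===, rows matched with <=>) for the same inputs.
  def run(): (Long, Long) = {
    val spark = SparkSession.builder().master("local[1]").appName("null_safe_join").getOrCreate()
    import spark.implicits._
    val left = Seq[(Option[Long], String)]((Some(1L), "a"), (None, "b")).toDF("k", "lv")
    val right = Seq[(Option[Long], String)]((Some(1L), "one"), (None, "missing")).toDF("k", "rv")
    // null === null evaluates to null, so the null keys never match.
    val plain = left.join(right, left("k") === right("k")).count()
    // null <=> null evaluates to true, so the null keys match each other.
    val nullSafe = left.join(right, left("k") <=> right("k")).count()
    (plain, nullSafe)
  }

  def main(args: Array[String]): Unit = println(run())
}
```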
      

        【Discussion】:
