【问题标题】:Error in returning spark rdd from a function called inside the map function从 map 函数内部调用的函数返回 spark rdd 时出错
【发布时间】:2019-05-04 05:19:32
【问题描述】:

我有一组来自 hbase 表的行键(如下所示的植物),我想创建一个 fetchData 函数,该函数从集合中返回行键的 rdd 数据。目标是从植物集合中的每个元素的 fetchData 方法中获取 RDD 的联合。我在下面给出了代码的相关部分。我的问题是,代码给出了 fetchData 返回类型的编译错误:

println("PartB: "+ hBaseRDD.getNumPartitions)

错误:值 getNumPartitions 不是 Option[org.apache.spark.rdd.RDD[it.nerdammer.spark.test.sys.Record]] 的成员

我正在使用 scala 2.11.8 spark 2.2.0 和 maven 编译

import it.nerdammer.spark.hbase._
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object sys {
  case class systems( rowkey: String, iacp: Option[String], temp: Option[String])

  val spark = SparkSession.builder().appName("myApp").config("spark.executor.cores",4).getOrCreate()
  import spark.implicits._

  type Record = (String, Option[String], Option[String])

  def fetchData(plant: String): RDD[Record] = {
    val start_index = plant
    val end_index = plant + "z"
    //The below command works fine if I run it in main function, but to get multiple rows from hbase, I am using it in a separate function
    spark.sparkContext.hbaseTable[Record]("test_table").select("iacp","temp").inColumnFamily("pp").withStartRow(start_index).withStopRow(end_index)

  }

  def main(args: Array[String]) {
    //the below elements in the collection are prefix of relevant rowkeys in hbase table ("test_table") 
    val plants = Vector("a8","cu","aw","fx")
    val hBaseRDD = plants.map( pp => fetchData(pp))
    println("Part: "+ hBaseRDD.getNumPartitions)
    /*
      rest of the code
    */
  }

}

这是代码的工作版本。这里的问题是我正在使用for循环,我必须在每个循环中从HBase请求与rowkey(植物)向量相对应的数据,而不是先获取所有数据然后执行其余代码

    import it.nerdammer.spark.hbase._
    import org.apache.spark.sql._
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
    import org.apache.log4j.Level
    import org.apache.log4j.Logger
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    object sys {
      case class systems( rowkey: String, iacp: Option[String], temp: Option[String])
      def main(args: Array[String]) {
        
        val spark = SparkSession.builder().appName("myApp").config("spark.executor.cores",4).getOrCreate()
        import spark.implicits._

        type Record = (String, Option[String], Option[String])
        val plants = Vector("a8","cu","aw","fx")
        
        for (plant <- plants){
          val start_index = plant
          val end_index = plant + "z"
          val hBaseRDD = spark.sparkContext.hbaseTable[Record]("test_table").select("iacp","temp").inColumnFamily("pp").withStartRow(start_index).withStopRow(end_index)
          println("Part: "+ hBaseRDD.getNumPartitions)
          /*
            rest of the code
          */
        }
      }
    }

尝试后,这就是我现在卡住的地方。那么如何将类型转换为必需的。

scala>   def fetchData(plant: String) = {
     |     val start_index = plant
     |     val end_index = plant + "~"
     |     val x1 = spark.sparkContext.hbaseTable[Record]("test_table").select("iacp","temp").inColumnFamily("pp").withStartRow(start_index).withStopRow(end_index)
     |     x1
     |   }
    

在 REPL 中定义函数并运行它

scala> val hBaseRDD = plants.map( pp => fetchData(pp)).reduceOption(_ union _)
<console>:39: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[(String, Option[String], Option[String])]
 required: it.nerdammer.spark.hbase.HBaseReaderBuilder[(String, Option[String], Option[String])]
       val hBaseRDD = plants.map( pp => fetchData(pp)).reduceOption(_ union _)

提前致谢!

【问题讨论】:

  • 这条消息肯定会告诉你答案。
  • 我问是因为我不清楚该消息。所以,如果你不想回答,我看不出投反对票的意义。我尝试更改 fetchData 函数的返回类型,但无法解决错误。无论如何,感谢您的反馈。
  • 好吧,我没有给出 min1。但我认为味精很清楚。
  • 稍后再看。
  • 我已经编辑了我的问题以包含使用 for 循环的工作版本。

标签: scala apache-spark hbase


【解决方案1】:

hBaseRDD 的类型是Vector[_] 而不是RDD[_],所以你不能在它上面执行方法getNumPartitions。如果我理解正确,您想合并获取的 RDD。您可以通过plants.map( pp =&gt; fetchData(pp)).reduceOption(_ union _) 来完成(我建议使用reduceOption,因为它不会在空列表上失败,但如果您确信列表不为空,您可以使用reduce

fetchData 的返回类型也是RDD[U],但我没有找到U 的任何定义。可能这就是编译器推断Vector[Nothing] 而不是Vector[RDD[Record]] 的原因。为了避免后续错误,您还应该将RDD[U] 更改为RDD[Record]

【讨论】:

  • 是的,我想要 fetchedData RDD 的联合。我已经编辑了我的问题以包含代码的工作版本。在工作版本中, hBaseRDD.getNumPartitions 方法为我提供了 RDD 的分区。我尝试将 fetchData 的返回类型更改为 Vector[RDD[Record]] 或使用plants.map( pp => fetchData(pp)).reduceOption(_ union _) 方法,但它们不起作用
  • 我已经在最后更新了这个问题。现在,我正在使用 reduceOption 函数。我想我非常接近解决方案,如果您能帮助解决问题,请告诉我。谢谢
猜你喜欢
  • 2018-01-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-08-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多