【问题标题】:How to map one column with other columns in an avro file?如何将一列与avro文件中的其他列映射?
【发布时间】:2017-07-05 10:28:20
【问题描述】:

我正在使用 Spark 2.1.1 和 Scala 2.11.8

这个问题是我之前的一个问题的延伸:

How to identify null fields in a csv file?

变化在于,我现在不是从 CSV 文件中读取数据,而是从 avro 文件中读取数据。这是我从中读取数据的 avro 文件的格式:

var ttime: Long = 0;
var eTime: Long = 0;
var tids: String = "";
var tlevel: Integer = 0;
var tboot: Long = 0;
var rNo: Integer = 0;
var varType: String = "";
var uids: List[TRUEntry] = Nil;

我在一个单独的类中解析 avro 文件。

我必须按照上面发布的链接的已接受答案中提到的相同方式将 tids 列与每个 uid 映射,除了这次来自 avro 文件而不是格式良好的 csv 文件。我怎样才能做到这一点?

这是我正在尝试使用的代码:

val avroRow = spark.read.avro(inputString).rdd
  val avroParsed = avroRow
    .map(x => new TRParser(x))
    .map((obj: TRParser) => ((obj.tids, obj.uId ),1))
    .reduceByKey(_+_)
    .saveAsTextFile(outputString)

在 obj.tids 之后,必须单独映射所有 uids 列,以提供与上述链接接受的答案中提到的最终输出相同的输出。

这就是我在 avro 文件解析类中解析所有 uid 的方式:

this.uids = Nil
    row.getAs[Seq[Row]]("uids")
    .foreach((objRow: Row) => 
      this.uids ::= (new TRUEntry(objRow))
    )

this.uids    
.foreach((obj:TRUEntry) => {
  uInfo += obj.uId + " , " + obj.initM.toString() + " , "
})   

P.S : 如果这个问题看起来很愚蠢,我很抱歉,但这是我第一次遇到 avro 文件

【问题讨论】:

    标签: scala apache-spark spark-avro


    【解决方案1】:

    可以通过同样的for循环处理来完成

    this.uids 
    

    在主代码中为:

     val avroParsed = avroRow
        .map(x => new TRParser(x))
        .map((obj: TRParser) => {
          val tId = obj.source.trim
          var retVal: String = ""
          obj.uids
            .foreach((obj: TRUEntry) => {
              retVal += tId + "," + obj.uId.trim + ":"
            })
            retVal.dropRight(1)
        })
    
     val flattened = avroParsed
     .flatMap(x => x.split(":"))
     .map(y => ((y),1))
    

    【讨论】:

      猜你喜欢
      • 2022-08-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-07-11
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多