如果你只有字符串和原语,你可以很容易地把这样一个粗略的实现放在一起:
def csvToAvro(file: Sting, schema: Schema) = {
val rec = new GenericData.Record(schema)
val types = schema
.getFields
.map { f => f.pos -> f.schema.getType }
Source.fromFile(file)
.getLines
.map(_.split("_").toSeq)
.foreach { data =>
(data zip types)
.foreach {
case (str, (idx, STRING)) => rec.put(idx, str)
case (str, (idx, INT)) => rec.put(idx, str.toInt)
case (str, (idx, LONG)) => rec.put(idx, str.toLong)
case (str, (idx, FLOAT)) => rec.put(idx, str.toFloat)
case (str, (idx, DOUBLE)) => rec.put(idx, str.toDouble)
case (str, (idx, BOOLEAN)) => rec.put(idx, str.toBoolean)
case (str, (idx, unknown)) => throw new IllegalArgumentException(s"Don't know how to convert $str to $unknown at $idx))
}
}
rec
}
注意这不处理可为空的字段:对于那些类型将是 UNION 的字段,您必须查看架构内部以找出实际的数据类型。
此外,“解析 csv”在这里非常粗略(仅以逗号分隔并不是一个好主意,因为如果字符串字段恰好在数据中包含 ,,或者如果字段是用双引号转义)。
此外,您可能还需要添加一些完整性检查,以确保例如 csv 行中的字段数与架构中的字段数等匹配。
尽管有上述考虑,但这应该足以说明方法并帮助您入门。