【发布时间】:2020-02-19 09:43:22
【问题描述】:
我们使用 Kafka Connect 从 Oracle 数据源摄取数据并以 AVRO 格式写入 HDFS。 在 Kafka Schema Registry 中,其中一个数据源的模式如下所示:
{
"type": "record",
"name": "ConnectDefault",
"namespace": "io.confluent.connect.avro",
"fields": [
{
"name": "ID",
"type": [
"null",
{
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}....
}
这意味着 ID 列的精度为 64。 当我尝试这些 AVRO 文件时,它会抛出:
引起:org.apache.spark.sql.AnalysisException:十进制只能 支持精度高达38;在 org.apache.spark.sql.types.DecimalType.(DecimalType.scala:51) 在 org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:60) 在 org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105) 在 org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82) 在 org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81) 在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 在 scala.collection.Iterator$class.foreach(Iterator.scala:891) 在 scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 在 scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 在 scala.collection.AbstractIterable.foreach(Iterable.scala:54) 在 scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 在 scala.collection.AbstractTraversable.map(Traversable.scala:104) 在 org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81) 在 org.apache.spark.sql.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:46) 在 org.apache.spark.sql.avro.AvroFileFormat.inferSchema(AvroFileFormat.scala:93) 在 org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180) 在 org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
我读取AVO文件的代码sn-p是:
def readSchemaOfAvroPartition(avroLocation: String, partitionColumn: String, partitionValue: String): StructType = {
sparkSession.read.format(AVRO)
.load(s"${avroLocation}/${partitionColumn}=${partitionValue}")
.schema
}
根据 Oracle 文档,最大精度应该是 38。https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT313
如何强制 Kafka Connect 将此模式值注册为 38 而不是 64?
【问题讨论】:
标签: oracle apache-spark apache-kafka-connect confluent-platform confluent-schema-registry