【问题标题】:org.apache.avro.UnresolvedUnionException: Not in union [{"type":"bytes","logicalType":"decimal","precision":18,"scale":4},"null"]: 0.0000（org.apache.avro.UnresolvedUnionException: 不在联合中 [{"type":"bytes","logicalType":"decimal","precision":18,"scale":4},"null"]: 0.0000）
【发布时间】:2021-04-19 01:01:18
【问题描述】:

我正在尝试读取存储在 S3 上的 Hive 表中的数据，将其转换为 Avro 格式，然后用这些 Avro 记录构建最终对象并推送到 Kafka 主题。我要发布的对象中有一个嵌套对象 (CarCostDetails)，其中包含字符串和十进制 (decimal) 类型的字段。当这个嵌套对象为空 (null) 时，记录可以正常推送到 Kafka；但只要该对象填充了任何值（0、正数或负数），调用 producer.send() 时就会抛出异常 org.apache.avro.UnresolvedUnionException: Not in union [{"type":"bytes","logicalType":"decimal","precision":18,"scale":4},"null"]: 40000.0000

我没有在自己的项目中定义 schema，而是以外部依赖的形式使用预定义的 schema。

示例: CarDataLoad.scala

/**
 * An `ApplicationRunner` (presumably Spring Boot) that reads `sample_table` from Hive,
 * stages it as Avro on S3, reads it back and publishes every row to Kafka through
 * [[CarProducerClass]].
 */
class CarDataLoad extends ApplicationRunner with Serializable {

  override def run(args: ApplicationArguments): Unit = {
    val spark = new SparkSession.Builder()
      .appName("s3-to-kafka")
      .enableHiveSupport
      .getOrCreate()
    getData(spark)
  }

  /**
   * Stages the source table as Avro, reads it back and forwards the `car_specs`,
   * `car_cost_details` and `car_key` columns to Kafka.
   *
   * @param sparkSession session with Hive support enabled
   */
  def getData(sparkSession: SparkSession): Unit = {
    val avroPath = copyToAvro(sparkSession)
    val car = sparkSession.read.avro(avroPath)
    import sparkSession.implicits._
    val avroData = car.select(
      $"car_specs",
      $"car_cost_details",
      $"car_key"
    )
    ingestDataframeToKafka(sparkSession, avroData)
  }

  /**
   * Reads `sample_table` and writes it in Avro format to a fixed S3 location.
   *
   * @param sparkSession session with Hive support enabled
   * @return the S3 path the Avro data is written to
   */
  def copyToAvro(sparkSession: SparkSession): String = {
    // Declared with `val`: the original assigned to an undeclared `sourceDf`, which does not compile.
    val sourceDf = sparkSession.read.table("sample_table")
    val targetPath = "s3://some/target/path"
    // write sourceDf to targetPath in avro format (done by internal libraries, elided here)
    targetPath
  }

  /**
   * Publishes every row of `dataframe` through a single [[CarProducerClass]] and always
   * closes the producer afterwards, even if publishing throws.
   *
   * @param sparkSession unused here; kept for interface compatibility
   * @param dataframe    rows with `car_specs`, `car_cost_details` and `car_key` columns
   */
  def ingestDataframeToKafka(sparkSession: SparkSession, dataframe: sql.DataFrame): Unit = {
    val batchProducer: CarProducerClass = new CarProducerClass(kafkaBootstapServers, kafkaSchemaRegistryUrl,
      kafkaClientIdConfig, topic)
    try {
      // NOTE(review): collect() materialises every row on the driver; fine only for small tables.
      dataframe.collect().foreach(row => batchProducer.publishRecord(row))
    } finally {
      batchProducer.closeProducer()
    }
  }
}

生产者类 - CarProducerClass.java

import org.apache.kafka.clients.producer.*;
import org.apache.spark.sql.Row;

import java.io.Serializable;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.*;

public class CarProducerClass {

private void initializeProducer() {
        log.info("Initializing producer");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstapServers);
        props.put("schema.registry.url", kafkaSchemaRegistryUrl);
        props.put("acks", "1");
        props.put("batch.size", 16384);
        props.put("buffer.memory", 33554432);
        props.put("retries",3);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaClientIdConfig);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("key.subject.name.strategy", "io.confluent.kafka.serializers.subject.TopicNameStrategy");
        props.put("value.subject.name.strategy", "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");
        log.info("Created producer");
        producer = new KafkaProducer(props);
    }
}

public Boolean publishRecord(Row row) {
    Boolean publishRecordFlag = false;
        if (producer == null) {
            initializeProducer();
        }
    Car.Builder car = new Car.newBuilder();
    car.setCarSpecs(buildCarSpecs(row.getAs("car_specs")))
    car.setCarCostDetails(buildCarCostDetails(row.getAs("car_cost_details")))
    CarKey.Builder carKey = new CarKey.Builder();
    Row car_key = row.getAs("car_key");
    carKey.setKey(car_key.getAs("car_id"))
    
        try{
             ProducerRecord<CarKey, Car> producerRecord
                    = new ProducerRecord(topic, null, System.currentTimeMillis(), carKey.build(), car.build());
                //Exception occurs here 
                RecordMetadata metadata = (RecordMetadata) producer.send(producerRecord).get();
       
          } catch (Exception e){
            log.info("Exception caught");
            e.printStackTrace();
          }
     
    public CarSpecs buildCarSpecs (Row car_specs){
        CarSpecs.Builder kafkaCarSpecs = CarSpecs.newBuilder();
         kafkaCarSpecs.setCarName("CX5");
         kafkaCarSpecs.setCarBrand("Mazda"); 
    }

    public CostDetails buildCarCostDetails (Row car_cost_details){
        CarSpecs.Builder kafkaCarSpecs = CarSpecs.newBuilder();
        kafkaCarSpecs.setPurchaseCity(car_cost_details.getAs("purchase_city"));
        kafkaCarSpecs.setPurchaseState(car_cost_details.getAs("purchase_state"));
        kafkaCarSpecs.setBasePrice((BigDecimal)car_cost_details.getAs("base_price"));
        kafkaCarSpecs.setTax((BigDecimal)car_cost_details.getAs("tax")); 
        kafkaCarSpecs.setTotalCost((BigDecimal)car_cost_details.getAs("total_cost")); 
        kafkaCarSpecs.setOtherCosts((BigDecimal)car_cost_details.getAs("other_costs")); 
    }
    public void closeProducer(){
        producer.close();
    }}

Avro Schema（预定义在另一个已投入生产的项目中）

CarSpecs.avdl

// CarSpecs schema (Avro IDL). Both fields are plain, non-nullable strings.
// Line comments are used deliberately: /** */ comments in IDL become schema "doc" attributes.
protocol CarSpecsProtocol {

  record CarSpecs {
    string name;
    string brand;

  }
}

CarCostDetails.avdl

// CarCostDetails schema (Avro IDL).
// decimal(18, 4) is the Avro `decimal` logical type encoded as `bytes` with precision 18 and
// scale 4; the generated Java class represents these fields as java.math.BigDecimal.
// NOTE(review): writing BigDecimal into the nullable unions below needs
// Conversions.DecimalConversion registered on the SpecificData used by the serializer;
// otherwise the writer reports UnresolvedUnionException.
protocol CarCostDetailsProtocol {

  record CarCostDetails {
    string purchase_city;
    string purchase_state;
    // Required (not a union with null).
    decimal(18, 4) base_price;
    // Nullable; the decimal branch is listed first and no default is declared.
    union { decimal(18,4), null} tax;
    union { decimal(18,4), null} total_cost;
    union { decimal(18,4), null} other_costs;
  }
}

Car.avdl

// Car value schema (Avro IDL). Both nested records are optional: null-first unions with a
// null default, so a Car with neither part set is valid.
protocol CarProtocol {
  import idl "CarCostDetails.avdl";
  import idl "CarSpecs.avdl";
  record Car {
    union { null, CarSpecs} car_specs = null;
    union { null, CarCostDetails} car_cost_details = null;
  }
}

CarKey.avdl

// Kafka record key schema (Avro IDL): a single required string `id`.
// The producer fills it from the Spark column car_key.car_id.
protocol CarKeyProtocol {

  record CarKey {
     string id;
  }
}

Avro 生成的 Java 对象

// Excerpt of the Avro-generated class for CarSpecs.avdl; generated members are elided.
// Regenerate from the schema rather than editing by hand.
@AvroGenerated
public class CarSpecs extends SpecificRecordBase implements SpecificRecord {
// elided: generated members such as Schema SCHEMA$, SpecificData MODEL$, builder, accessors
private String name;
private String brand;
}


@AvroGenerated
import java.math.BigDecimal;
public class CarCostDetails extends SpecificRecordBase implements SpecificRecord {
//basic generated fields like Schema SCHEMA$, SpecificData MODEL$ etc 
private String purchaseCity;
private String purchaseState;
private BigDecimal basePrice;
private BigDecimal tax;
private BigDecimal totalCost;
private BigDecimal otherCosts;

}


// Excerpt of the Avro-generated class for Car.avdl; generated members are elided.
// Both fields correspond to nullable unions in the schema and may be null.
@AvroGenerated
public class Car extends SpecificRecordBase implements SpecificRecord {
// elided: generated members such as Schema SCHEMA$, SpecificData MODEL$, builder, accessors
private CarSpecs carSpecs;
private CarCostDetails carCostDetails;

}

// Excerpt of the Avro-generated class for CarKey.avdl (the Kafka key); generated members are elided.
@AvroGenerated
public class CarKey extends SpecificRecordBase implements SpecificRecord {
// elided: generated members such as Schema SCHEMA$, SpecificData MODEL$, builder, accessors
private String id;
}

我已经尝试过的:

  1. 在 spark 命令中传入 spark-avro 包，例如 --packages org.apache.spark:spark-avro_2.11:2.4.3
  2. 按实际 schema 中的字段顺序排列字段
  3. 为所有十进制/BigDecimal 字段设置默认值 0
  4. 检查这些字段的源数据类型是否为 java.math.BigDecimal。确实是。
  5. 将值显式转换为 BigDecimal(如上例所示)

以上所有仍然导致org.apache.avro.UnresolvedUnionException

【问题讨论】:

    标签: java apache-spark amazon-s3 avro spark-avro


    【解决方案1】:

    将 decimal 转换添加到全局配置中（在运行时、向 Kafka 发送任何消息之前执行一次即可，例如放在 initializeProducer 中）：

    import org.apache.avro.specific.SpecificData;
    import org.apache.avro.Conversions;
    
    // Register Avro's BigDecimal <-> decimal(bytes) conversion on the global SpecificData
    // instance, once, before the first message is sent to Kafka.
    SpecificData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
    

    在由 Avro schema 生成的类的静态初始化块中，您可能会看到作用于 MODEL$ 的类似语句，因此请记得把消息中用到的所有转换都添加进去。

    以下观察基于 avro 1.10.1 库源和运行时行为。

    MODEL$ 中的配置本应生效（参见 SpecificData.getForClass），但如果 SpecificData 与您的消息类由不同的类加载器加载（我的应用就是这种情况：两者位于两个独立的 OSGi bundle 中），它就不会生效。此时 getForClass 会回退到全局实例。

    于是 GenericData.resolveUnion 抛出 UnresolvedUnionException。原因有二：conversionsByClass 中没有以 BigDecimal.class 为键的条目；SpecificData 中重写的 getSchemaName 会对 BigDecimal 类（以及少数其他类，参见 SpecificData.stringableClasses）返回 Schema.Type.STRING。随后，这个 STRING 会在联合 schema 中按名称查找（getIndexNamed），但找不到匹配项，因为联合中只有 "bytes" 和 "null"。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2011-05-17
      • 2016-04-04
      • 1970-01-01
      • 2011-01-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多