【Question title】:Avro deserialization error - ArrayIndexOutOfBoundsException
【Posted】:2017-10-06 21:44:30
【Question】:

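My workflow is: I create an avsc file, generate C++ classes from it with the avrogencpp tool, and produce Avro binary-encoded data in my C++ application.

I am trying to figure out why scenario 2 does not work.

Scenario 1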

test.avsc

{
"namespace": "com.company.project",
"name": "Component_DeviceInfo",
"type": "record",
"doc": "Identifies a client device",
"fields": [
    {
        "name": "deviceId",
        "type": [
            "null",
            "string"
        ],
        "default": null,
        "doc": "Multicast Data Client Device Id. Usually unique MAC address"
    },
    {
        "name": "zoneId",
        "type": [
            "null",
            "string"
        ],
        "default": null,
        "doc": "Zone id where device belongs to"
    }
]
}

Encoder - C++

// Populate the generated record.
Component_DeviceInfo deviceInfo;
deviceInfo.deviceId.set_string("device1");
deviceInfo.zoneId.set_string("zone1");
std::vector<char> tele_bytes_;

// Encode into an in-memory stream with the Avro binary encoder.
std::auto_ptr<avro::OutputStream> out = avro::memoryOutputStream(1);
avro::EncoderPtr enc = avro::binaryEncoder();
enc->init(*out);
avro::encode(*enc, deviceInfo);
out->flush();

size_t byte_count = out->byteCount();
DBG("BYTE COUNT " << byte_count);

// Copy the encoded bytes back out of the stream.
std::auto_ptr<avro::InputStream> in = avro::memoryInputStream(*out);
avro::StreamReader reader(*in);
std::vector<uint8_t> row_data(byte_count);
reader.readBytes(&row_data[0], byte_count);

Decoder - Java

@Override
public Object deserializeByteArr(Schema schema, final byte[] data) {
    DatumReader<GenericRecord> genericDatumReader = new SpecificDatumReader<>(schema);
    Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
    try {
        GenericRecord userData = genericDatumReader.read(null, decoder);
        System.out.println(userData);
    } catch (IOException e) {
        e.printStackTrace();
    }
    return null;
}

Scenario 2 - does not work

Note that I updated the schema and regenerated the corresponding C++ files.

test.avsc

[
    {
        "namespace": "com.company.project",
        "name": "Component_DeviceInfo",
        "type": "record",
        "doc": "Identifies a client device",
        "fields": [
            {
                "name": "deviceId",
                "type": [
                    "null",
                    "string"
                ],
                "default": null,
                "doc": "Unique MAC address"
            },
            {
                "name": "zoneId",
                "type": [
                    "null",
                    "string"
                ],
                "default": null,
                "doc": "Zone id where Client device belongs to"
            }
        ]
    },
    {
        "namespace": "com.company.project",
        "name": "Component_EventList",
        "type": "record",
        "doc": "Component Event list",
        "fields": [
            {
                "name": "deviceInfo",
                "type": [
                    "null",
                    "com.company.project.Component_DeviceInfo"
                ],
                "default": null,
                "doc": "Device information such as device id and zone id"
            }
        ]
    }
]

Encoder - C++

// Populate the inner record.
Component_DeviceInfo deviceInfo;
deviceInfo.deviceId.set_string("device1");
deviceInfo.zoneId.set_string("zone1");

std::vector<char> tele_bytes_;

// Wrap it in the outer record through the generated union setter.
Component_EventList ComponentEventList;
ComponentEventList.deviceInfo.set_Component_DeviceInfo(deviceInfo);

// Encode into an in-memory stream with the Avro binary encoder.
std::auto_ptr<avro::OutputStream> out = avro::memoryOutputStream(1);
avro::EncoderPtr enc = avro::binaryEncoder();
enc->init(*out);
avro::encode(*enc, ComponentEventList);
out->flush();

size_t byte_count = out->byteCount();
DBG("BYTE COUNT " << byte_count);

// Copy the encoded bytes back out of the stream.
std::auto_ptr<avro::InputStream> in = avro::memoryInputStream(*out);
avro::StreamReader reader(*in);
std::vector<uint8_t> row_data(byte_count);
reader.readBytes(&row_data[0], byte_count);

Output

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.company.telemetry.services.consumer.TelemetryConsumerService.consume(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, byte[]>)' threw exception; nested exception is java.lang.ArrayIndexOutOfBoundsException: 7
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188) ~[spring-kafka-1.1.6.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.6.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.6.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794) [spring-kafka-1.1.6.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738) [spring-kafka-1.1.6.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:570) [spring-kafka-1.1.6.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_91]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_91]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
Caused by: java.lang.ArrayIndexOutOfBoundsException: 7
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402) ~[avro-1.7.7.jar:1.7.7]
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290) ~[avro-1.7.7.jar:1.7.7]
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88) ~[avro-1.7.7.jar:1.7.7]
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267) ~[avro-1.7.7.jar:1.7.7]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155) ~[avro-1.7.7.jar:1.7.7]
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) ~[avro-1.7.7.jar:1.7.7]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) ~[avro-1.7.7.jar:1.7.7]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) ~[avro-1.7.7.jar:1.7.7]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155) ~[avro-1.7.7.jar:1.7.7]
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) ~[avro-1.7.7.jar:1.7.7]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) ~[avro-1.7.7.jar:1.7.7]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) ~[avro-1.7.7.jar:1.7.7]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155) ~[avro-1.7.7.jar:1.7.7]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) ~[avro-1.7.7.jar:1.7.7]
    at com.company.telemetry.services.serde.AvroByteArrDeserializer.deserializeByteArr(AvroByteArrDeserializer.java:32) ~[classes/:na]
    at com.company.telemetry.services.TelemetryService.handleByteArr(TelemetryService.java:59) ~[classes/:na]
    at com.company.telemetry.services.consumer.TelemetryConsumerService.consume(TelemetryConsumerService.java:39) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_91]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_91]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_91]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_91]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:180) ~[spring-messaging-4.3.11.RELEASE.jar:4.3.11.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:112) ~[spring-messaging-4.3.11.RELEASE.jar:4.3.11.RELEASE]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-1.1.6.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:174) ~[spring-kafka-1.1.6.RELEASE.jar:na]
    ... 8 common frames omitted

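I would appreciate it if someone could tell me what is going on. Thanks!

【Comments】:

  • Hi Karthik, did you solve this? Any input would help. I am facing the same problem right now.
  • Yes. Please see my accepted answer, in particular issues.apache.org/jira/browse/AVRO-2095

Tags: java c++ deserialization avro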


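【Solution 1】:

The problem was that I was using the wrong class to create the Avro-encoded data. But the bigger problem turned out to be top-level unions - https://issues.apache.org/jira/browse/AVRO-2095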
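
One reading that is consistent with the stack trace, though the thread does not confirm it: the C++ side encodes a bare Component_EventList record, while the Java reader is handed the whole schema file, which is a top-level union, and so consumes one extra union index before the record. The sketch below hand-encodes the bytes with plain Java (no Avro dependency) following the Avro binary rules (zigzag varints for union indexes and string lengths) and then walks them the way such a reader would. The class and method names are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class TopLevelUnionMismatch {

    // Avro writes int/long values as zigzag-encoded variable-length integers.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // A string is its UTF-8 length (as a long) followed by the bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Bytes for a bare Component_EventList record, with NO top-level union index.
    static byte[] encodeBareEventList(String deviceId, String zoneId) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLong(out, 1);           // deviceInfo: branch 1 of ["null", Component_DeviceInfo]
        writeLong(out, 1);           // deviceId:   branch 1 of ["null", "string"]
        writeString(out, deviceId);
        writeLong(out, 1);           // zoneId:     branch 1 of ["null", "string"]
        writeString(out, zoneId);
        return out.toByteArray();
    }

    // Reads one zigzag varint at pos[0] and advances pos[0].
    static long readLong(byte[] buf, int[] pos) {
        long z = 0;
        int shift = 0;
        int b;
        do {
            b = buf[pos[0]++] & 0xFF;
            z |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (z >>> 1) ^ -(z & 1);
    }

    // What a reader that expects a top-level union would take as deviceId's union index.
    static long deviceIdIndexSeenByUnionReader(byte[] buf) {
        int[] pos = {0};
        readLong(buf, pos);          // taken as the top-level union index (really deviceInfo's index)
        readLong(buf, pos);          // taken as deviceInfo's index (really deviceId's index)
        return readLong(buf, pos);   // taken as deviceId's index (really the string length)
    }

    public static void main(String[] args) {
        byte[] bytes = encodeBareEventList("device1", "zone1");
        System.out.println(deviceIdIndexSeenByUnionReader(bytes)); // prints 7
    }
}
```

Under that assumption the reader ends up treating the length of "device1" as a union index, which would explain the 7 in ArrayIndexOutOfBoundsException: 7 for a two-branch union.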


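【Solution 2】:

Does it work if you define DeviceInfo inline in the avsc instead of as an element of the array?

【Comments】:

  • Hi @JohnM, yes, it works when I define DeviceInfo inline!
  • However, the problem is that I cannot define all of my schemas inline. Any clue as to what causes this?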
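
For reference, "inline" here would mean something like the schema below, where the Component_DeviceInfo record is declared inside the deviceInfo field of Component_EventList so that the file's top level is a single record rather than an array. This is a sketch assembled from the two schemas in the question (per-field docs omitted), not the exact file the asker tested.

```json
{
    "namespace": "com.company.project",
    "name": "Component_EventList",
    "type": "record",
    "doc": "Component Event list",
    "fields": [
        {
            "name": "deviceInfo",
            "type": [
                "null",
                {
                    "name": "Component_DeviceInfo",
                    "type": "record",
                    "doc": "Identifies a client device",
                    "fields": [
                        {"name": "deviceId", "type": ["null", "string"], "default": null},
                        {"name": "zoneId", "type": ["null", "string"], "default": null}
                    ]
                }
            ],
            "default": null,
            "doc": "Device information such as device id and zone id"
        }
    ]
}
```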
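【Solution 3】:

I believe what you have done in your second schema is define a union.

You may want to take a look at this, although to be honest I did not find it very helpful.

http://apache-avro.679487.n3.nabble.com/avrogencpp-and-multiple-avsc-td4028378.html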


【Solution 4】:

I ran into this as well; it may be caused by https://issues.apache.org/jira/browse/AVRO-1953

As a workaround, I did the following:

public static SpecificRecord deserializeAvroUsingFile(byte[] bytes, String napId, Schema schema) throws IOException {
    if (shouldCompress) {
        bytes = Snappy.uncompress(bytes);
    }
    // Write the payload to a temporary file so DataFileReader can read it.
    File file = File.createTempFile("avro", null);
    try {
        try (FileOutputStream fos = new FileOutputStream(file)) {
            fos.write(bytes);
        }
        SpecificDatumReader<SpecificRecord> datumReader = new SpecificDatumReader<>(schema);
        try (DataFileReader<SpecificRecord> reader = new DataFileReader<>(file, datumReader)) {
            // Only the first record in the file is returned.
            if (reader.hasNext()) {
                return reader.next(null);
            }
        }
        throw new IllegalStateException("No record found in Avro payload");
    } finally {
        // Remove the temporary file whether or not decoding succeeded.
        file.delete();
    }
}
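
Note that DataFileReader reads Avro object container files, which begin with the four magic bytes 'O', 'b', 'j', 1 and carry their own schema. So this workaround presumably applies only when the producer wrote a container file; raw binaryEncoder output like the question's has no such header. A minimal, dependency-free check for that header might look like the following (the class and method names are hypothetical):

```java
import java.util.Arrays;

public class AvroContainerCheck {

    // Avro object container files start with the bytes 'O', 'b', 'j', 1.
    private static final byte[] MAGIC = {'O', 'b', 'j', 1};

    // True if the payload begins with the container-file magic.
    static boolean looksLikeContainerFile(byte[] payload) {
        return payload != null
                && payload.length >= MAGIC.length
                && Arrays.equals(Arrays.copyOf(payload, MAGIC.length), MAGIC);
    }

    public static void main(String[] args) {
        byte[] raw = {0x02, 0x02, 0x0e};              // start of a raw binary-encoded record
        byte[] container = {'O', 'b', 'j', 1, 0x04};  // start of a container file
        System.out.println(looksLikeContainerFile(raw));        // prints false
        System.out.println(looksLikeContainerFile(container));  // prints true
    }
}
```

A consumer could use a check like this to decide between a DataFileReader and a plain binaryDecoder before attempting to decode.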
      
