【Question title】: Avro decoding gives java.io.EOFException
【Posted】: 2016-08-09 16:11:31
【Question】:

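I am using Apache Avro schemas with Kafka 0.0.8V. I use the same schema on the producer and consumer sides, and the schema has not changed at all. Yet when I try to consume a message, the consumer throws an exception. Why does this error occur?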

Producer

public void sendFile(String topic, GenericRecord payload, Schema schema) throws CoreException, IOException {
    BinaryEncoder encoder = null;
    ByteArrayOutputStream out = null;
    try {
        DatumWriter<GenericRecord> writer = new SpecificDatumWriter<GenericRecord>(schema);
        out = new ByteArrayOutputStream();
        encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(payload, encoder);
        encoder.flush();

        byte[] serializedBytes = out.toByteArray();

        KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(topic, serializedBytes);

        producer.send(message);
    }
    // The rest of the method (catch/finally and closing brace) was not included in the post.

Consumer

public void run() {
    try {
        ConsumerIterator<byte[], byte[]> itr = stream.iterator();
        while (itr.hasNext()) {

            byte[] data = itr.next().message();

            Schema schema = new Schema.Parser()
                    .parse(new File("/Users/xx/avro_schemas/file.avsc"));

            DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
            Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);

            GenericRecord payload = reader.read(null, decoder);
            System.out.println("Message received --: " + payload);
            // The rest of the method was not included in the post.

However, when the reader tries to read the message from the decoder, I get the following exception:

java.io.EOFException
    at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
    at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:259)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
    at com.xx.KafkaMessageListenerThread.run(KafkaMessageListenerThread.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Consumer properties

enable.auto.commit=true
auto.commit.interval.ms=101
session.timeout.ms=7000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
zookeeper.connect=zookeeper.xx.com\:2181
heartbeat.interval.ms=1000
auto.offset.reset=smallest
serializer.class=kafka.serializer.DefaultEncoder
bootstrap.servers=kafka.xx.com\:9092
group.id=test
consumer.timeout.ms=-1
fetch.min.bytes=1
receive.buffer.bytes=262144

【Discussion】:

    Tags: java apache-kafka kafka-consumer-api avro


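    【Solution 1】:

    The problem comes from your Avro producer.

    In the sendFile() method you are not flushing the encoder, nor closing the ByteArrayOutputStream, which leads to the EOFException.

    Here is an example of a generic serialization class: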

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DatumWriter;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.specific.SpecificDatumWriter;

    public class TestSerializer<T> {

        private final Class<T> avroType;

        public TestSerializer(Class<T> avroType) {
            this.avroType = avroType;
        }

        public byte[] serialize(T object) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            DatumWriter<T> writer = new SpecificDatumWriter<T>(avroType);
            try {
                writer.write(object, encoder);
                // Flush the buffered encoder so that every byte reaches the
                // stream before the byte array is taken from it.
                encoder.flush();
            } catch (IOException e) {
                throw new RuntimeException(e);
            } finally {
                // Close the stream once flushing is done.
                try {
                    out.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            return out.toByteArray();
        }

    }
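
To see why a missing flush would show up as an EOFException on the reading side, here is a minimal sketch that uses only java.io as a stand-in for a buffered encoder. It does not use Avro: the class name FlushDemo and its encode/decode helpers are hypothetical, and DataOutputStream over BufferedOutputStream merely plays the role that the buffered encoder returned by EncoderFactory.get().binaryEncoder(...) plays in the code above. The point it illustrates is that bytes still sitting in a buffer never reach the ByteArrayOutputStream, so the reader runs out of input.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

public class FlushDemo {

    // Writes one string through a buffered layer (stand-in for a buffered
    // encoder) and returns whatever has reached the underlying byte stream.
    public static byte[] encode(String value, boolean flush) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream buffered =
                new DataOutputStream(new BufferedOutputStream(out, 8192));
        buffered.writeUTF(value);
        if (flush) {
            // Pushes the buffered bytes down into 'out'.
            buffered.flush();
        }
        return out.toByteArray();
    }

    // Reads the string back; throws EOFException if the payload is truncated.
    public static String decode(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readUTF();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(decode(encode("hello", true)));
        try {
            decode(encode("hello", false));
        } catch (EOFException e) {
            System.out.println("EOFException: payload was never flushed");
        }
    }
}
```

If the bytes arriving at the consumer are shorter than what the schema expects, for whatever reason, the decoder fails in the same way, so comparing the length of the byte array on the producer and consumer sides is a cheap first check.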
    

    【Comments】:

    • Thanks for your answer, this helped me too :)