【Question title】: Flink Stream Sink ParquetWriter Failing with ClassCastException
【Posted】: 2020-02-28 19:48:00
【Question】:

I have written a consumer that reads from a Kafka topic and writes the data in Parquet format using a stream sink (StreamingFileSink). However, I get the following error:

java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.CharSequence
    at org.apache.parquet.avro.AvroWriteSupport.fromAvroString(AvroWriteSupport.java:371)
    at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:346)
    at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
    at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
    at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
    at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
    at org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:50)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:214)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)

The code is simple:

// Read GenericRecord elements from the Kafka consumer.
DataStream<GenericRecord> sourceStream = env.addSource(bikeDetailsKafkaConsumer010);

// Bulk-encode the records as Parquet, using the schema fetched for the subject.
final StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(schemaSubject)))
        .withBucketAssigner(new EventTimeBucketAssigner())
        .build();

sourceStream.addSink(sink).setParallelism(parallelism);

Is there a problem with some of the records, and how can I find out which record triggers the exception?

【Discussion】:

    Tags: java apache-kafka apache-flink parquet


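    【Solution 1】:

    If you get a ClassCastException while writing Parquet/Avro, it is almost always a mismatch between the schema and the data. In this stack trace, AvroWriteSupport.fromAvroString fails because a field that the schema declares as a string holds a java.util.ArrayList in the record.

    I suggest printing the expected schema and the actual record formatted as JSON (using toString or, better, with type information):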

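    // Encodes the record as JSON against the given schema (method name is illustrative).
    // A schema/data mismatch typically surfaces here as an exception.
    static String toJson(GenericRecord genericRecord, Schema schema) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
        JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, baos);
        writer.write(genericRecord, encoder);
        encoder.flush(); // push buffered JSON into the byte stream
        return new String(baos.toByteArray(), StandardCharsets.UTF_8);
    }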
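
To narrow a mismatch down to a single field, it can help to compare each value's runtime type with the type the schema expects. The following is only a minimal stand-alone sketch of that idea using plain Java collections: the class name FieldTypeCheck, the method mismatchedFields, and the field names bikeId and stations are all hypothetical and do not come from the question. In a real job the same check can be done against the Avro schema with GenericData.get().validate(schema, record).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FieldTypeCheck {

    // Returns one description per field whose value is missing or is not an
    // instance of the expected type. An empty list means the record matches.
    public static List<String> mismatchedFields(Map<String, Class<?>> expected,
                                                Map<String, Object> record) {
        List<String> bad = new ArrayList<>();
        for (Map.Entry<String, Class<?>> e : expected.entrySet()) {
            Object value = record.get(e.getKey());
            if (value == null || !e.getValue().isInstance(value)) {
                String actual = (value == null) ? "null" : value.getClass().getSimpleName();
                bad.add(e.getKey() + " (expected " + e.getValue().getSimpleName()
                        + ", got " + actual + ")");
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        // Hypothetical expected types, standing in for the Avro schema.
        Map<String, Class<?>> expected = new LinkedHashMap<>();
        expected.put("bikeId", CharSequence.class);
        expected.put("stations", List.class);

        // Hypothetical record with a list where a string is expected,
        // the same kind of mismatch as in the stack trace.
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("bikeId", new ArrayList<>(Arrays.asList("b-1")));
        record.put("stations", Arrays.asList("s-1", "s-2"));

        System.out.println(mismatchedFields(expected, record));
    }
}
```

Running a check like this on each record before it reaches the sink, and logging the result, shows which record and which field are at fault rather than only the failing cast.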
    

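    【Comments】:

    • But if the schema and the record do not match, how did the record pass validation and get pushed to the topic? It should have failed on the producer side and never reached the topic.
    • I am also able to parse the record with the same schema; the error only occurs on write.
    • The record is fine as far as Kafka is concerned, but it does not match the schema used when writing Parquet.
    • OK Arvid, but it happens very randomly. I use the same schema to write to Kafka, read it back, and write it to Parquet, so how is this possible? The schema lives in the Kafka schema registry, and I take it from there to write Parquet, like this: List<Integer> versions = registryClient.getAllVersions(subjectName); SchemaMetadata schemaMeta = registryClient.getSchemaMetadata(subjectName, versions.get(versions.size() - 1)); Schema schema = new Schema.Parser().parse(schemaMeta.getSchema());
    • Even if it does happen, how can I skip that record and continue, so that my whole job does not fail and stop consuming because of one record?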