【问题标题】:Gobblin JSON to Avro convert failed with not a Json Array errorGobblin JSON 到 Avro 的转换失败,不是 Json Array 错误
【发布时间】:2020-09-05 11:00:27
【问题描述】:

我是 Gobblin 的新手,我正在尝试读取 JSON Kafka 消息并将其转换为 AVRO,然后将其存储在 HDFS 中。我目前的工作档案就像是一个打击:

job.name=GobblinKafkaQuickStart
job.group=GobblinKafka
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false

kafka.brokers=localhost:9092

source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=org.apache.gobblin.extract.kafka
converter.classes=org.apache.gobblin.converter.json.JsonStringToJsonIntermediateConverter, org.apache.gobblin.converter.avro.JsonIntermediateToAvroConverter


source.schema=[{"columnName":"name", "dataType":{"type": "string"}}, {"columnName":"city", "dataType":{"type": "string"}}, {"columnName":"age", "dataType":{"type": "integer"}}, {"columnName":"ubdated_at", "dataType":{"type": "string"}}]


writer.builder.class=org.apache.gobblin.writer.AvroDataWriterBuilder
extract.namespace=gobblin.source.extractor.filebased
gobblin.converter.schemaInjector.schema=SCHEMA
# writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=text

data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher

mr.job.max.mappers=1

metrics.reporting.file.enabled=true
metrics.log.dir=${gobblin.cluster.work.dir}/metrics
metrics.reporting.file.suffix=txt

bootstrap.with.offset=earliest

Kafka 按摩示例:{"age": 36, "city": "London", "name": "John", "ubdated_at": "2020-05-19"} 但是,当我在独立模式下运行它时,出现以下错误。

ERROR [TaskExecutor-1] org.apache.gobblin.runtime.Task 551 - Task 
task_GobblinKafkaQuickStart_1589884160573_0 failed java.lang.IllegalStateException: This is not a JSON
Array. at com.google.gson.JsonElement.getAsJsonArray(JsonElement.java:106) at
org.apache.gobblin.converter.json.JsonStringToJsonIntermediateConverter.convertSchema(JsonStringToJsonIn
termediateConverter.java:71) at
org.apache.gobblin.converter.json.JsonStringToJsonIntermediateConverter.convertSchema(JsonStringToJsonIn
termediateConverter.java:48) at
org.apache.gobblin.instrumented.converter.InstrumentedConverterDecorator.convertSchema(InstrumentedConve
rterDecorator.java:79) at
org.apache.gobblin.runtime.MultiConverter.convertSchema(MultiConverter.java:76) at
org.apache.gobblin.runtime.Task.runSynchronousModel(Task.java:417) at
org.apache.gobblin.runtime.Task.run(Task.java:368) at org.apache.gobblin.runtime.TaskExecutor$TrackingTask.run(TaskExecutor.java:443) at
org.apache.gobblin.util.executors.MDCPropagatingRunnable.run(MDCPropagatingRunnable.java:39) at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
java.util.concurrent.FutureTask.run(FutureTask.java:266) at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
java.util.concurrent.FutureTask.run(FutureTask.java:266) at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
java.util.concurrent.FutureTask.run(FutureTask.java:266) at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at
java.lang.Thread.run(Thread.java:748)

有人可以帮我吗?

【问题讨论】:

    标签: gobblin


    【解决方案1】:

    您的gobblin.converter.schemaInjector.schema 属性只是说SCHEMA,并且库需要在那个地方实际的JSON 模式。您可以提供它,也可以像这样引用上面定义的架构:

    gobblin.converter.schemaInjector.schema='${source.schema}'
    

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-02-10
    • 1970-01-01
    • 2012-08-12
    • 2017-04-02
    • 2021-10-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多