[Question title]: NoSuchMethodError occurs when running a spark-streaming job on Kafka
[Posted]: 2019-11-07 13:59:58
[Question]:

I am using spark-streaming to consume protobuf-formatted messages from Kafka.

It works fine when master is set to "local[2]", but when I change the master URL to that of a real Spark cluster, I get the following exception:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 20.0 failed 4 times, most recent failure: Lost task 0.3 in stage 20.0 (TID 58, 10.0.5.155): java.lang.NoSuchMethodError: com.google.protobuf.CodedInputStream.readStringRequireUtf8()Ljava/lang/String;
    at cn.xiaoman.eagleeye.Agent$Tag.<init>(Agent.java:83)
    at cn.xiaoman.eagleeye.Agent$Tag.<init>(Agent.java:44)
    at cn.xiaoman.eagleeye.Agent$Tag$1.parsePartialFrom(Agent.java:638)
    at cn.xiaoman.eagleeye.Agent$Tag$1.parsePartialFrom(Agent.java:633)
    at com.google.protobuf.CodedInputStream.readMessage(CodedInputStream.java:309)
    at cn.xiaoman.eagleeye.Agent$Metric.<init>(Agent.java:797)
    at cn.xiaoman.eagleeye.Agent$Metric.<init>(Agent.java:718)
    at cn.xiaoman.eagleeye.Agent$Metric$1.parsePartialFrom(Agent.java:1754)
    at cn.xiaoman.eagleeye.Agent$Metric$1.parsePartialFrom(Agent.java:1749)
    at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
    at cn.xiaoman.eagleeye.Agent$Metric.parseFrom(Agent.java:1058)
    at cn.xiaoman.eagleeye.rtmetricprocessor.MetricDeserializer.deserialize(MetricDeserializer.java:25)
    at cn.xiaoman.eagleeye.rtmetricprocessor.MetricDeserializer.deserialize(MetricDeserializer.java:14)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:627)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:548)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Versions: Spark: 2.11-2.0.2, Kafka: 2.11-0.10.1.0, protobuf: 3.0.2

[Comments]:

    Tags: apache-spark protocol-buffers spark-streaming


    [Answer 1]:

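    The job depends on protobuf 3, while the Spark runtime depends on a different protobuf version. On the cluster, the runtime's copy is the one that gets loaded, so the generated code calls a method (readStringRequireUtf8) that the loaded CodedInputStream class does not have.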
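To confirm this kind of conflict before changing the build, it can help to print which jar a class is actually loaded from and whether the missing method exists there. The following is a minimal sketch using only the JDK; the class name `ProtobufProbe` and its helper methods are hypothetical and not part of the original job. It assumes the probe is called from code that runs on the executor (for example inside a task), since the driver and executor classpaths may differ.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;

public class ProtobufProbe {
    /** Returns the jar or directory a class was loaded from, or a note when unknown. */
    static String locationOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        // Classes from the JDK's bootstrap loader typically have no code source.
        return src == null ? "(bootstrap/unknown)" : src.getLocation().toString();
    }

    /** True if the named class is loadable and exposes a public method with that name. */
    static boolean hasMethod(String className, String methodName) {
        try {
            for (Method m : Class.forName(className).getMethods()) {
                if (m.getName().equals(methodName)) {
                    return true;
                }
            }
            return false;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** One-line report: where the class came from and whether the method is present. */
    static String describe(String className, String methodName) {
        try {
            Class<?> cls = Class.forName(className);
            return className + " loaded from " + locationOf(cls)
                    + ", " + methodName + " present: " + hasMethod(className, methodName);
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " not loadable: " + e;
        }
    }

    public static void main(String[] args) {
        // Class and method names are taken from the stack trace in the question.
        System.out.println(describe("com.google.protobuf.CodedInputStream", "readStringRequireUtf8"));
    }
}
```

If the reported location is a jar from the Spark installation rather than the application jar, and the method is reported as absent, that is consistent with the version conflict described above.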

    Solution: edit build.gradle to use the "com.github.johnrengelman.shadow" plugin to relocate com.google.protobuf to a different package name.

    shadowJar {
        relocate 'com.google.protobuf', 'shadow.google.protobuf'
    }
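If the error persists after adding the relocation, one thing worth checking is whether the jar being submitted is really the shaded one. The sketch below, using only the JDK, counts class entries under the original and the relocated package paths in a jar file. The class name `ShadedJarCheck` and its methods are hypothetical helpers; the relocated prefix `shadow/google/protobuf/` is derived from the `relocate` rule above, and the jar path is whatever your build produces.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class ShadedJarCheck {
    /** Counts .class entries in the jar whose path starts with the given prefix. */
    static long countClasses(Path jar, String prefix) throws IOException {
        try (JarFile jf = new JarFile(jar.toFile())) {
            return jf.stream()
                    .map(JarEntry::getName)
                    .filter(n -> n.startsWith(prefix) && n.endsWith(".class"))
                    .count();
        }
    }

    /** Relocation looks applied when relocated classes exist and no originals remain. */
    static boolean looksRelocated(Path jar) throws IOException {
        return countClasses(jar, "shadow/google/protobuf/") > 0
                && countClasses(jar, "com/google/protobuf/") == 0;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: ShadedJarCheck <path-to-jar>");
            return;
        }
        Path jar = Paths.get(args[0]);
        System.out.println("relocated classes: " + countClasses(jar, "shadow/google/protobuf/"));
        System.out.println("original classes:  " + countClasses(jar, "com/google/protobuf/"));
        System.out.println("looks relocated:   " + looksRelocated(jar));
    }
}
```

A jar that still contains `com/google/protobuf/` classes and no relocated ones was most likely built by the plain `jar` task rather than `shadowJar`, in which case the relocation never reached the cluster.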
    

    Edit: added the complete build.gradle

    group 'xxx'
    version '1.0-SNAPSHOT'
    
    apply plugin: 'java'
    apply plugin: 'idea'
    apply plugin: 'application'
    apply plugin: 'com.google.protobuf'
    apply plugin: 'com.github.johnrengelman.shadow'
    sourceCompatibility = 1.8
    
    buildscript {
        repositories {
            mavenLocal()
            mavenCentral()
            jcenter()
        }
        dependencies {
            // ASSUMES GRADLE 2.12 OR HIGHER. Use plugin version 0.7.5 with earlier
            // gradle versions
            classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.0'
            classpath 'com.github.jengelman.gradle.plugins:shadow:1.2.3'
        }
    }
    
    
    def grpcVersion = '1.0.2'
    
    repositories {
        mavenLocal()
        mavenCentral()
        jcenter()
    }
    
    def sparkVersion = '2.0.2'
    dependencies {
        compile "org.apache.spark:spark-streaming_2.11:${sparkVersion}"
        compile "org.apache.spark:spark-streaming-kafka-0-10_2.11:${sparkVersion}"
        compile "org.apache.spark:spark-core_2.11:${sparkVersion}"
        compile 'com.google.protobuf:protobuf-java:3.1.0'
    
        compile group: 'org.mongodb', name: 'mongo-java-driver', version: '3.4.0'
    
        testCompile 'junit:junit:4.11'
    }
    
    protobuf {
        protoc {
            // The version of protoc must match protobuf-java. If you don't depend on
            // protobuf-java directly, you will be transitively depending on the
            // protobuf-java version that grpc depends on.
            artifact = 'com.google.protobuf:protoc:3.0.2'
        }
    //    plugins {
    //        grpc {
    //            artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}"
    //        }
    //    }
    //    generateProtoTasks {
    //        all()*.plugins {
    //            grpc {
    //                // To generate deprecated interfaces and static bindService method,
    //                // turn the enable_deprecated option to true below:
    //                option 'enable_deprecated=false'
    //            }
    //        }
    //    }
    }
    
    idea {
        module {
            // Not using generatedSourceDirs because of
            // https://discuss.gradle.org/t/support-for-intellij-2016/15294/8
            sourceDirs += file("${projectDir}/build/generated/source/proto/main/java");
        }
    }
    
    shadowJar {
        zip64 true
        relocate 'com.google.protobuf', 'shadow.google.protobuf'
    }
    
    mainClassName = "xxx.Main"
    

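    [Comments]:

    • I tried this approach but it still fails. Is there anything special to know about the shading plugin in Gradle?
    • @NowaConcordia I have posted my complete build.gradle; it may help.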