【问题标题】:Kafka topic details not displaying in sparkKafka主题详细信息未在火花中显示
【发布时间】:2018-02-12 08:23:23
【问题描述】:

我在 Kafka 中写了一个主题为my-topic,我正在尝试获取 spark 中的主题信息。但是我在显示 Kafka 主题详细信息时遇到了一些困难,因为我收到了一长串错误。我正在使用 java 来获取数据。

下面是我的代码:

public static void main(String s[]) throws InterruptedException{
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Sampleapp");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "Different id is allotted for different stream");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);

    Collection<String> topics = Arrays.asList("my-topic");

    final JavaInputDStream<ConsumerRecord<String, String>> stream =
      KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
      );

    JavaPairDStream<String, String> jPairDStream =  stream.mapToPair(
            new PairFunction<ConsumerRecord<String, String>, String, String>() {
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
                    return new Tuple2<>(record.key(), record.value());
                }
            });

    jPairDStream.foreachRDD(jPairRDD -> {
           jPairRDD.foreach(rdd -> {
                System.out.println("key="+rdd._1()+" value="+rdd._2());
            });
        });

    jssc.start();            
    jssc.awaitTermination(); 

    stream.mapToPair(
            new PairFunction<ConsumerRecord<String, String>, String, String>() {
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
                    return new Tuple2<>(record.key(), record.value());
                }
            });
}

以下是我得到的错误:

使用 Spark 的默认 log4j 配置文件: org/apache/spark/log4j-defaults.properties 17/09/04 11:41:15 信息 SparkContext:运行 Spark 版本 2.1.0 17/09/04 11:41:15 WARN NativeCodeLoader:无法为您的本地 Hadoop 库加载 平台...在适用的情况下使用内置 java 类 17/09/04 11:41:15 INFO SecurityManager:将视图 acls 更改为:11014525 17/09/04 11:41:15 INFO SecurityManager:将修改 acls 更改为: 11014525 17/09/04 11:41:15 INFO SecurityManager:更改视图 acl 组到:17/09/04 11:41:15 INFO SecurityManager:更改修改 acls 组到:17/09/04 11:41:15 INFO SecurityManager: SecurityManager:禁用身份验证; ui acls 禁用;用户 有查看权限:Set(11014525);具有查看权限的组: 放();具有修改权限的用户:Set(11014525);团体与 修改权限:Set() 17/09/04 11:41:15 INFO Utils:成功 在端口 56668 上启动服务“sparkDriver”。17/09/04 11:41:15 信息 SparkEnv:注册 MapOutputTracker 17/09/04 11:41:15 INFO SparkEnv:注册 BlockManagerMaster 17/09/04 11:41:15 INFO BlockManagerMasterEndpoint:使用 org.apache.spark.storage.DefaultTopologyMapper 用于获取拓扑 信息 17/09/04 11:41:15 信息 BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 17/09/04 11:41:15 INFO DiskBlockManager: 创建本地目录 C:\Users\11014525\AppData\Local\Temp\blockmgr-cba489b9-2458-455a-8c03-4c4395a01d44 17/09/04 11:41:15 信息 MemoryStore:MemoryStore 以容量启动 896.4 MB 17/09/04 11:41:16 信息 SparkEnv:注册 OutputCommitCoordinator 17/09/04 11:41:16 信息实用程序:成功 在端口 4040 上启动服务“SparkUI”。17/09/04 11:41:16 信息 SparkUI:将 SparkUI 绑定到 0.0.0.0,并开始于 http://172.16.202.21:4040 17/09/04 11:41:16 INFO 执行者:开始 主机 localhost 上的执行程序 ID 驱动程序 17/09/04 11:41:16 INFO Utils: 成功启动服务 端口上的“org.apache.spark.network.netty.NettyBlockTransferService” 56689. 17/09/04 11:41:16 信息 NettyBlockTransferService:服务器创建于 172.16.202.21:56689 17/09/04 11:41:16 信息块管理器: 使用 org.apache.spark.storage.RandomBlockReplicationPolicy 块 复制策略 17/09/04 11:41:16 信息 BlockManagerMaster: 注册 BlockManager BlockManagerId(driver, 172.16.202.21, 56689, 无)17/09/04 11:41:16 信息 BlockManagerMasterEndpoint:注册 具有 896.4 MB RAM 的块管理器 172.16.202.21:56689, BlockManagerId(驱动程序,172.16.202.21,56689,无)17/09/04 11:41:16 INFO BlockManagerMaster:已注册的 BlockManager BlockManagerId(驱动程序,172.16.202.21,56689,无)17/09/04 11:41:16 INFO BlockManager: 初始化 BlockManager: BlockManagerId(driver, 172.16.202.21、56689、无)17/09/04 11:41:16 警告 KafkaUtils:将执行程序的 enable.auto.commit 覆盖为 false 17/09/04 11:41:16 警告 KafkaUtils:将执行程序的 auto.offset.reset 覆盖为 none 17/09/04 11:41:16 警告 KafkaUtils:覆盖执行程序 group.id 到 spark-executor-不同的 id 分配给不同的流 17/09/04 11:41:16 警告 KafkaUtils:将 receive.buffer.bytes 覆盖到 65536 见 KAFKA-3135 17/09/04 11:41:16 信息 DirectKafkaInputDStream:滑动时间 = 10000 毫秒 17/09/04 11:41:16 信息 DirectKafkaInputDStream:存储级别 = 序列化 1x 复制 17/09/04 11:41:16 信息 DirectKafkaInputDStream:检查点间隔 = null 17/09/04 11:41:16 信息 DirectKafkaInputDStream:记住间隔 = 10000 毫秒 17/09/04 11:41:16 INFO DirectKafkaInputDStream:初始化和验证 org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@23a3407b 17/09/04 11:41:16 信息 MappedDStream:滑动时间 = 10000 毫秒 17/09/04 11:41:16 信息 MappedDStream:存储级别 = 序列化 1x 复制 17/09/04 11:41:16 信息 MappedDStream:检查点间隔 = null 2004 年 9 月 17 日 11:41:16 信息 MappedDStream:记住间隔 = 10000 毫秒 17/09/04 11:41:16 信息 MappedDStream:已初始化和验证 org.apache.spark.streaming.dstream.MappedDStream@140030a9 17/09/04 11:41:16 信息 ForEachDStream:滑动时间 = 10000 毫秒 17/09/04 11:41:16 信息 ForEachDStream:存储级别 = 序列化 1x 复制 17/09/04 11:41:16 信息 ForEachDStream:检查点间隔 = null 17/09/04 11:41:16 信息 ForEachDStream:记住间隔 = 10000 毫秒 17/09/04 11:41:16 INFO ForEachDStream:初始化和验证 org.apache.spark.streaming.dstream.ForEachDStream@65041548 17/09/04 11:41:16 ERROR StreamingContext:启动上下文时出错,标记 它停止了 org.apache.kafka.common.config.ConfigException: Missing 所需的配置“partition.assignment.strategy”没有 默认值。在 org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124) 在 org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:48) 在 org.apache.kafka.clients.consumer.ConsumerConfig.(ConsumerConfig.java:194) 在 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:380) 在 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:363) 在 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:350) 在 org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83) 在 org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75) 在 org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243) 在 org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49) 在 org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49) 在 scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143) 在 scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136) 在 scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972) 在 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49) 在 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48) 在 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48) 在 scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51) 在 scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969) 在 scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152) 在 scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443) 在 scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160) 在 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 在 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 在 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 在 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) at ... 使用 org.apache.spark.util.ThreadUtils 在单独的线程中运行 ... () 在 org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578) 在 org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572) 在 org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556) 在 Json.ExcelToJson.SparkConsumingKafka.main(SparkConsumingKafka.java:56) 17/09/04 11:41:16 INFO ReceiverTracker:ReceiverTracker 已停止 17/09/04 11:41:16 INFO JobGenerator:立即停止 JobGenerator 17/09/04 11:41:16 INFO RecurringTimer:JobGenerator 的已停止计时器 时间-1 17/09/04 11:41:16 之后信息 JobGenerator:停止 JobGenerator 17/09/04 11:41:16 信息 JobScheduler:已停止 JobScheduler 线程“主”中的异常 org.apache.kafka.common.config.ConfigException:缺少必需的 没有默认值的配置“partition.assignment.strategy” 价值。在 org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124) 在 org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:48) 在 org.apache.kafka.clients.consumer.ConsumerConfig.(ConsumerConfig.java:194) 在 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:380) 在 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:363) 在 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:350) 在 org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83) 在 org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75) 在 org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243) 在 org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49) 在 org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49) 在 scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143) 在 scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136) 在 scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972) 在 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49) 在 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48) 在 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48) 在 scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51) 在 scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969) 在 scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152) 在 scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443) 在 scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160) 在 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 在 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 在 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 在 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) at ... 使用 org.apache.spark.util.ThreadUtils 在单独的线程中运行 ... () 在 org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578) 在 org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572) 在 org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556) 在 Json.ExcelToJson.SparkConsumingKafka.main(SparkConsumingKafka.java:56) 17/09/04 11:41:16 信息 SparkContext:从关机调用 stop() 钩子 17/09/04 11:41:16 信息 SparkUI:在 http://172.16.202.21:4040 17/09/04 11:41:16 信息 MapOutputTrackerMasterEndpoint:MapOutputTrackerMasterEndpoint 停了! 17/09/04 11:41:16 信息 MemoryStore:MemoryStore 已清除 17/09/04 11:41:16 信息块管理器:块管理器停止 17/09/04 11:41:16 信息 BlockManagerMaster:BlockManagerMaster 停止 17/09/04 11:41:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator 停止了! 17/09/04 11:41:16 信息 SparkContext: 成功停止 SparkContext 17/09/04 11:41:16 INFO ShutdownHookManager:称为 17/09/04 11:41:16 的关闭挂钩 ShutdownHookManager:删除目录 C:\Users\11014525\AppData\Local\Temp\spark-37334cdc-9680-4801-8e50-ef3024ed1d8a

pom.xml

org.apache.spark 火花流媒体_2.11 2.1.0 公共语言 公共语言 2.6 org.apache.kafka kafka_2.10 0.8.2.0 org.apache.spark spark-streaming-kafka-0-10_2.10 2.1.1

【问题讨论】:

  • org.apache.kafka.common.config.ConfigException:缺少所需的配置“partition.assignment.strategy”,它没有默认值。这可能与它有关
  • @bleedcode 我根据您提供的 pom.xml 详细信息在下面编辑了我的答案。请尝试一下,如果它可以解决您的问题,请告诉我。

标签: java apache-spark apache-kafka spark-streaming


【解决方案1】:

从日志中,您的 spark 版本是 2.1.0。您尚未共享具有其他依赖项的构建文件。看起来您在类路径中同时拥有 spark-streaming-kafka-0-8_2.11-2.1.0.jarspark-streaming-kafka-0-10_2.11-2.1.0.jar 并且它正在加载错误的类。如果您使用的是 maven,那么您将需要如下所示的依赖项。请检查并更新您的项目。

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
</dependency>
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.0</version>
</dependency>
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
</dependency>  
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.0</version>
</dependency> 

编辑

由于您已经编辑了问题并发布了依赖项,因此我正在编辑我的答案。您使用的是 Kafka 版本 0.8.*,而您的 spark-streaming-kafka 版本是 0.10.*。请为 Kafka 依赖项使用相同的版本。请为org.apache.kafka使用以下依赖项

<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.2.0</version>
</dependency>

【讨论】:

  • 您遇到什么错误?如果旧 jar 仍然存在,请检查您的构建。请使用 Maven 更新项目并清理构建。
猜你喜欢
  • 2018-06-02
  • 2021-07-01
  • 2014-09-26
  • 1970-01-01
  • 2019-01-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-07-05
相关资源
最近更新 更多