【问题标题】:Difference between Kafka Consumer and Spark-Kafka-ConsumerKafka Consumer 和 Spark-Kafka-Consumer 的区别
【发布时间】:2023-03-02 23:26:01
【问题描述】:

我有一个 kafka 主题,我通过 Kafka Producer 发送数据。现在在消费者方面,我有两种选择。

1.使用 KafkaConsumer - kafkaConsumer 的代码如下,它从主题中读取数据并且工作正常。

  @EnableKafka
@Configuration
@PropertySource("kaafka.properties")
public class RawEventKafkaConsumer {

    private static final Logger logger = LoggerFactory.getLogger(RawEventKafkaConsumer.class);
    @Autowired
    private DataModelServiceImpl dataModelServiceImpl;

    private PolicyExecutor policyExecutor;

    public RawEventKafkaConsumer() {
         policyExecutor = new PolicyExecutor();
    }


    @Value("${spring.kafka.topic}")
    private String rawEventTopicName;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootStrapServer;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;



    @Bean
    public DefaultKafkaConsumerFactory<String, BaseDataModel> rawEventConsumer() {
        Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return new DefaultKafkaConsumerFactory<>(consumerProperties);
    }

    @Bean(name="kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, BaseDataModel> kafkaListenerContainerFactory() {
         logger.info("kafkaListenerContainerFactory called..");
        ConcurrentKafkaListenerContainerFactory<String, BaseDataModel> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(rawEventConsumer());
        return factory;
    }

    @KafkaListener(topics = "rawEventTopic",  containerFactory = "kafkaListenerContainerFactory")
    public void listen(String baseDataModel) {

        ObjectMapper mapper = new ObjectMapper();
        BaseDataModel csvDataModel;
        try {
            csvDataModel = mapper.readValue(baseDataModel, BaseDataModel.class);

            //saving the datamodel in elastic search.
            //dataModelServiceImpl.save(csvDataModel);
            System.out.println("Message received " + csvDataModel.toString());
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

}

2。使用 Spark Stream 使用 kafkaTopic 数据 - 代码如下 -

 @Service
    public class RawEventSparkStreamConsumer {

        private final Logger logger = LoggerFactory.getLogger(RawEventSparkStreamConsumer.class);

        @Autowired
        private DataModelServiceImpl dataModelServiceImpl;


        @Autowired
        private JavaStreamingContext streamingContext;

        @Autowired
        private JavaInputDStream<ConsumerRecord<String, String>> messages;


        @PostConstruct
        private void sparkRawEventConsumer() {

            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.execute(()->{
                messages.foreachRDD((rdd) -> {
                    System.out.println("RDD coming *************************______________________________---------------------.." + rdd.count());
                    rdd.foreach(record -> {
                        System.out.println("Data is comming...." + record);
                    });
                });

                streamingContext.start();

                try {
                    streamingContext.awaitTermination();
                } catch (InterruptedException e) { // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            });

        }
    }

consumer kafka consumer 和 Spark streaming 都成功地从 topic 中读取数据。现在我有一个问题,如果两者都在做同样的事情(从主题中读取数据)那么

  1. 两者有什么区别?
  2. 另外我还面临一个问题,kafka 消费类和 Spark 消费类都在同一个代码库中,所以如果我同时使用这两个类,那么 kafkaConsumer 代码将无法正常工作。

谢谢。

【问题讨论】:

    标签: spring apache-spark apache-kafka spark-streaming kafka-consumer-api


    【解决方案1】:

    简短的回答是,您需要一个 Spark 集群以分布式方式运行 Spark 代码,而 Kafka Consumer 仅在单个 JVM 中运行,并且您手动运行同一应用程序的多个实例以将其扩展。

    换句话说,你会以不同的方式运行它们。 spark-submitjava -jar。我不相信使用 Spring 的更改会发生

    另一个区别是“普通消费者”对 Kafka 配置有更多的控制权,并且您一次只能获得一条记录。 Spark RDD 可以是许多事件,并且它们都必须具有相同的“模式”,除非您想要复杂的解析逻辑,使用 RDD 对象比使用为您提取的 ConsumerRecord 值更难编写。


    总的来说,我认为将它们结合起来并不是一个好主意。

    如果他们从同一个主题读取,那么 Kafka 消费者协议只能为每个分区分配一个消费者......目前尚不清楚您的主题有多少个分区,但这可以解释为什么一个可以工作,但不能另一个

    【讨论】:

    • 主题只有 1 个分区。我认为这就是如果我同时使用两者的原因,那么 Kafka 消费者无法正常工作。感谢您的澄清。
    猜你喜欢
    • 2016-11-03
    • 2017-06-20
    • 2016-11-03
    • 2017-08-15
    • 2018-07-15
    • 1970-01-01
    • 2018-12-25
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多