【问题标题】:How to get kafka consumer lag for spark structured streaming application如何为 Spark 结构化流应用程序获取 kafka 消费者延迟
【发布时间】:2020-07-08 03:34:35
【问题描述】:

我正在为我的 spark 结构化流应用程序构建监控,并且需要获取 spark 应用程序使用的某个主题的消费者延迟。我相信火花驱动程序必须意识到这种滞后,因为它具有执行器的所有元数据。我看不到任何方法可以从任何现有的 spark 文档或资源中获取这些指标。我检查了streaminQueryListener 接口,但它的功能也有限,因为我们只能从中获取每个查询指标。

【问题讨论】:

    标签: apache-spark apache-kafka spark-structured-streaming


    【解决方案1】:

    跟踪结构化流作业的消费者滞后的挑战在于,结构化流不会向 Kafka 提交任何偏移量(有关详细信息,请参阅 here)。因此,Kafka 并不知道 Structured Streaming 作业的实际进度。

    另一方面,Spark 无法了解当前位于 Kafka 主题中的消息/偏移量。

    为了监控消费者滞后,您需要将这些信息整合在一起:

    • 不断请求 TopicPartition 中的最新偏移量
    • 不断检查结构化流应用程序处理的当前偏移量

    例如,您可以创建一个 Kafka AdminClient,并在 onQueryProgress 调用 StreamingQueryListener 期间从 Kafka 获取所需信息。在该方法中,您需要将提到的最新事件的偏移量与 Kafka 中可用的实际最高偏移量进行比较。

    【讨论】:

      【解决方案2】:

      这是一种获取有关执行程序节点的请求信息的方法。为每条消息获取信息,您可以以最适合您需求(计数、时间等)的方式减少请求量。

      下面我将监控信息发送到另一个 Kafka 主题。

      我非常频繁地在每批流式消息上打开 Kafka 消费者连接(以获取有关最大偏移量的信息)。也许你无法接受。

      final JavaInputDStream<ConsumerRecord<String, byte[]>> stream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
              ConsumerStrategies.<String, byte[]>Subscribe(topics, kafkaParams));
      
      
      JavaPairDStream<String, Income> streamPair = stream
      .mapPartitionsToPair(new PairFlatMapFunction<Iterator<ConsumerRecord<String, byte[]>>, String, Income>() {
      
          private Map<String, Object> getProps() {
              Map<String, Object> kafkaParams2 = new HashMap<>();
              kafkaParams2.put("bootstrap.servers", ApiConsts.BOOTSTRAP_SERVERS);
              kafkaParams2.put("key.deserializer", StringDeserializer.class);
              kafkaParams2.put("value.deserializer", ByteArrayDeserializer.class);
              kafkaParams2.put("group.id", "ta_calc_spark" + UUID.randomUUID().toString());
              kafkaParams2.put("auto.offset.reset", "latest");
              kafkaParams2.put("enable.auto.commit", false);
              kafkaParams2.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
              kafkaParams2.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000);
              return kafkaParams2;
          }
      
          @Override
          public Iterator<Tuple2<String, Income>> call(Iterator<ConsumerRecord<String, byte[]>> t) throws Exception {
              KafkaConsumer consumer = new KafkaConsumer<>(getProps());
      
              ArrayList<TopicPartition> partitions0 = new ArrayList<TopicPartition>();
              IntStream.range(0, consumer.partitionsFor(ApiConsts.TOPIC_TA_CALC_SPARK_TASK).size())
                      .forEach(i -> partitions0.add(new TopicPartition(ApiConsts.TOPIC_TA_CALC_SPARK_TASK, i)));
              consumer.assign(partitions0);
              KafkaProducer producerMonitoring = getKafkaProducer();
      
              List<Tuple2<String, Income>> result = new ArrayList<Tuple2<String, Income>>();
              try {
                  t.forEachRemaining(t2 -> {
                      // business logic - message handling
      
                      try {
                          Set<TopicPartition> partitions = new HashSet<TopicPartition>();
                          TopicPartition actualTopicPartition = new TopicPartition(ApiConsts.TOPIC_TA_CALC_SPARK_TASK, t2.partition());
                          partitions.add(actualTopicPartition);
                          Long actualEndOffset = (Long) consumer.endOffsets(partitions).get(actualTopicPartition);
                          long actualPosition = consumer.position(actualTopicPartition);
                          String monitorValue = String.format(
                                  "diff: %s   (partition:%s; actualEndOffsetStreaming:%s; actualEndOffset:%s; actualPosition=%s)",
                                  actualEndOffset - actualPosition, t2.partition(), t2.offset(), actualEndOffset, actualPosition);
                          ProducerRecord<String, String> pRecord = new ProducerRecord<String, String>(ApiConsts.TOPIC_TA_CALC_SPARK_TEMP_RESULT,
                                  UUID.randomUUID().toString(), monitorValue);
                          producerMonitoring.send(pRecord);
                      } catch (Exception ex) {
                          log.error("################# mapPartitionsToPair.call() ERROR", ex);
                          ex.printStackTrace();
                      }
                  });
              } finally {
                  producerMonitoring.close();
                  consumer.close();
              }
              return result.iterator();
          }
      });
      

      输出:

      Consumer Record:(f45cd24b-6232-45b2-b8f2-814753ae89bf, diff: 0   (partition:4; actualEndOffsetStreaming:1177; actualEndOffset:1178; actualPosition=1178), 2, 109)
      Consumer Record:(3ec4f576-1fff-4c91-885f-fc709f7f4531, diff: 0   (partition:4; actualEndOffsetStreaming:1176; actualEndOffset:1178; actualPosition=1178), 3, 105)
      

      【讨论】:

        猜你喜欢
        • 2019-09-24
        • 2019-08-29
        • 2019-07-22
        • 2018-06-01
        • 1970-01-01
        • 1970-01-01
        • 2018-01-16
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多