Java Kafka 消费积压监控
后端代码:
Monitor.java代码:
package com.suncreate.kafkaConsumerMonitor.service; import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.*; /** * kafka消费监控 * * @author suxiang */ public class Monitor { private static final Logger log = LoggerFactory.getLogger(Monitor.class); private String servers; private String topic; private String groupId; private long lastTime; private long lastTotalLag = 0L; private long lastLogSize = 0L; private long lastOffset = 0L; private double lastRatio = 0; private long speedLogSize = 0L; private long speedOffset = 0L; private String time; private List<ConsumerInfo> list; private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public String getTime() { return time; } public void setTime(String time) { this.time = time; } public long getLastTotalLag() { return lastTotalLag; } public double getLastRatio() { return lastRatio; } public String getTopic() { return topic; } public String getGroupId() { return groupId; } public long getSpeedLogSize() { return speedLogSize; } public long getSpeedOffset() { return speedOffset; } public List<ConsumerInfo> getList() { return list; } public void setList(List<ConsumerInfo> list) { this.list = list; } private KafkaConsumer<String, String> consumer; private List<TopicPartition> topicPartitionList; private final DecimalFormat decimalFormat = new DecimalFormat("0.00"); private ConsumerGroupsService consumerGroupsService; private String groupIdShort; private boolean needUpdate; /** * kafka消费监控 * * @param servers * @param consumerGroupsService * @param topic * @param groupId * @param needUpdate true:需要更新 groupId 和 KafkaConsumer,groupId传递前缀即可;false:不需要更新 groupId 和 KafkaConsumer,groupId传递全称 */ public Monitor(String servers, ConsumerGroupsService consumerGroupsService, String topic, String groupId, boolean needUpdate) { this.servers = servers; this.topic = topic; this.groupIdShort = groupId; this.groupId = consumerGroupsService.getGroupId(topic, groupId); this.consumerGroupsService = consumerGroupsService; this.needUpdate = needUpdate; this.list = new ArrayList<>(); //消费者 consumer = createConsumer(); //查询 topic partitions topicPartitionList = new ArrayList<>(); List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic); for (PartitionInfo partitionInfo : partitionInfoList) { TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition()); topicPartitionList.add(topicPartition); } } public void monitor(boolean addToList) { try { long startTime = System.currentTimeMillis(); //查询 log size Map<Integer, Long> endOffsetMap = new HashMap<>(); Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitionList); for (TopicPartition partitionInfo : endOffsets.keySet()) { endOffsetMap.put(partitionInfo.partition(), endOffsets.get(partitionInfo)); } //查询消费 offset Map<Integer, Long> commitOffsetMap = new HashMap<>(); for (TopicPartition topicAndPartition : topicPartitionList) { OffsetAndMetadata committed = consumer.committed(topicAndPartition); commitOffsetMap.put(topicAndPartition.partition(), committed.offset()); } long endTime = System.currentTimeMillis(); log.info("查询logSize和offset耗时:" + (new DecimalFormat("0.000")).format((endTime - startTime) / 1000.0) + " 秒"); //累加lag long totalLag = 0L; long logSize = 0L; long offset = 0L; if (endOffsetMap.size() == commitOffsetMap.size()) { for (Integer partition : endOffsetMap.keySet()) { long endOffset = endOffsetMap.get(partition); long commitOffset = commitOffsetMap.get(partition); long diffOffset = endOffset - commitOffset; totalLag += diffOffset; logSize += endOffset; offset += commitOffset; } } else { log.error("Topic:" + topic + " consumer:" + consumer + " topic partitions lost"); } log.info("Topic:" + topic + " logSize:" + logSize + " offset:" + offset + " totalLag:" + totalLag); if (lastTime > 0) { if (System.currentTimeMillis() - lastTime > 0) { speedLogSize = (long) ((logSize - lastLogSize) / ((System.currentTimeMillis() - lastTime) / 1000.0)); speedOffset = (long) ((offset - lastOffset) / ((System.currentTimeMillis() - lastTime) / 1000.0)); } if (speedLogSize > 0) { String strRatio = decimalFormat.format(speedOffset * 100 / (speedLogSize * 1.0)); lastRatio = Double.parseDouble(strRatio); log.info("Topic:" + topic + " speedLogSize:" + speedLogSize + " speedOffset:" + speedOffset + " 百分比:" + strRatio + "%"); } } lastTime = System.currentTimeMillis(); lastTotalLag = totalLag; lastLogSize = logSize; lastOffset = offset; if (addToList) { this.setTime(simpleDateFormat.format(new Date())); this.list.add(new ConsumerInfo(this.getTopic(), this.getGroupId(), this.getLastTotalLag(), this.getLastRatio(), this.getSpeedLogSize(), this.getSpeedOffset(), this.getTime())); if (this.list.size() > 500) { this.list.remove(0); } } } catch (Exception e) { log.error("Monitor error", e); } } private KafkaConsumer<String, String> createConsumer() { //消费者 Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.servers); properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new KafkaConsumer<String, String>(properties); } /** * 更新 groupId 和 KafkaConsumer */ public void update() { if (needUpdate) { try { String oldGroupId = this.groupId; this.groupId = consumerGroupsService.getGroupId(topic, groupIdShort); log.info("groupId 已更新 旧groupId=" + oldGroupId + " 新groupId=" + this.groupId); if (this.consumer != null) { try { this.consumer.close(); } catch (Exception e) { log.error("consumer close error", e); } this.consumer = null; } this.consumer = createConsumer(); log.info("KafkaConsumer 已更新"); } catch (Exception e) { log.error("Monitor update error", e); } } } }