【问题标题】:Delay in Golang Consumer Receiving Kafka Messages After Connecting to KafkaGolang Consumer 连接 Kafka 后接收 Kafka 消息延迟
【发布时间】:2018-12-07 14:05:03
【问题描述】:

我是 Golang 和 Kafa 的新手,所以这似乎是一个愚蠢的问题。

我的 Kafka 消费者第一次连接到 Kafka 服务器后,为什么在与 Kafka 服务器建立连接和收到第一条消息之间会有延迟(约 20 秒)?

它在consumer.Messages() 之前打印一条消息,并为收到的每条消息打印另一条消息。约 20 秒的延迟介于第一个 fmt.Println 和第二个 fmt.Println 之间。

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
    cluster "github.com/bsm/sarama-cluster"
)

func main() {

    // Create the consumer and listen for new messages
    consumer := createConsumer()

    // Create a signal channel to know when we are done
    done := make(chan bool)

    // Start processing messages
    go func() { 
        fmt.Println("Start consuming Kafka messages")
        for msg := range consumer.Messages() {
            s := string(msg.Value[:])
            fmt.Println("Msg: ", s)
        }
    }()

    <-done

}

func createConsumer() *cluster.Consumer {
    // Define our configuration to the cluster
    config := cluster.NewConfig()
    config.Consumer.Return.Errors = false
    config.Group.Return.Notifications = false
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    // Create the consumer
    brokers := []string{"127.0.0.1:9092"}
    topics := []string{"orders"}
    consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
    if err != nil {
        log.Fatal("Unable to connect consumer to Kafka")
    }
    go handleErrors(consumer)
    go handleNotifications(consumer)
    return consumer
}

docker-compose.yml

version: '2'
services:
zookeeper:
    image: "confluentinc/cp-zookeeper:5.0.1"
    hostname: zookeeper
    ports:
    - "2181:2181"
    environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000

broker-1:
    image: "confluentinc/cp-enterprise-kafka:5.0.1"
    hostname: broker-1
    depends_on:
    - zookeeper
    ports:
    - "9092:9092"
    environment:
    KAFKA_BROKER_ID: 1
    KAFKA_BROKER_RACK: rack-a
    KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
    KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://127.0.0.1:9092'
    KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
    KAFKA_DELETE_TOPIC_ENABLE: "true"
    KAFKA_JMX_PORT: 9999
    KAFKA_JMX_HOSTNAME: 'broker-1'
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker-1:9092
    CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
    CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
    CONFLUENT_METRICS_ENABLE: 'true'
    CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    KAFKA_CREATE_TOPICS: "orders:1:1"

【问题讨论】:

  • 您的主题中有多少条消息?如果要处理大量数据,Kafka 可能需要一段时间才能找到起点。
  • @Peter 该主题中的消息少于 100 条。共有 2 个主题,第二个主题的消息少于 10 条。 Kafka 和 Zookeeper 在 16 GB 内存的 2018 i7 Macbook Pro 上的 Docker 容器中运行
  • @Peter 执行 docker-compose downdocker-compose up 清除所有 Kafka 消息,延迟仍然不到 20 秒。
  • 我在我的应用程序中使用了与您类似的配置。从我所见,每当我启动我的应用程序并尝试连接到 Kafka 时,Kafka 的 GroupCoordinator 需要一些时间来重新平衡/重新稳定组消费者。就我而言,这就是延迟的原因,也是 Kafka 的工作方式……我认为代码没有问题。希望这有帮助:)

标签: go apache-kafka kafka-consumer-api sarama


【解决方案1】:

我的 Kafka 消费者首次连接到 Kafka 服务器后,为什么在与 Kafka 服务器建立连接和接收到第一条消息之间会有延迟(约 20 秒)?

因为消费者使用了消息通道,所以不会有那么大的延迟 接收来自 kafka 的消息。消息一出 在 kafka 队列中,它将被发送到消费者可以使用的消息通道 收到。

来给你代码实现:-

for msg := range consumer.Messages() {
    s := string(msg.Value[:])
    fmt.Println("Msg: ", s)
}

consumer.Messages() 返回一个频道,for 在频道上循环,该频道在频道内可用时返回一条消息。

请参阅此问题How to create a kafka consumer group in Golang? 以使用 sarama 进行连接。您不需要 sarama-cluster 进行连接。

【讨论】:

    猜你喜欢
    • 2017-02-15
    • 2019-01-23
    • 1970-01-01
    • 2018-11-28
    • 1970-01-01
    • 2020-01-25
    • 2019-07-26
    • 2019-11-20
    • 2018-06-08
    相关资源
    最近更新 更多