【问题标题】:Cannot produce Message when Main Thread sleep less than 1000主线程睡眠小于 1000 时无法产生消息
【发布时间】:2017-12-01 20:06:20
【问题描述】:

当我使用 Kafka 的 Java API 时,如果我让我的主线程睡眠时间少于 2000ns,它就无法生成任何消息。我真的很想知道为什么会这样?

这是我的制作人:

public class Producer {
    private final KafkaProducer<String, String> producer;
    private final String topic;

    public Producer(String topic, String[] args) {
        //......
        //......
        producer = new KafkaProducer<>(props);
        this.topic = topic;
    }

    public void producerMsg() throws InterruptedException {
        String data = "Apache Storm is a free and open source distributed";
        data = data.replaceAll("[\\pP‘’“”]", "");
        String[] words = data.split(" ");
        Random _rand = new Random();

        Random rnd = new Random();
        int events = 10;
        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            int lastIPnum = rnd.nextInt(255);
            String ip = "192.168.2." + lastIPnum;
            String msg = words[_rand.nextInt(words.length)];
            try {
                producer.send(new ProducerRecord<>(topic, ip, msg));
                System.out.println("Sent message: (" + ip + ", " + msg + ")");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Producer producer = new Producer(Constants.TOPIC, args);
        producer.producerMsg();
        //If I write Thread.sleep(1000),It will not work!!!!!!!!!!!!!!!!!!!!
        Thread.sleep(2000);
    }
}

欣赏一下

【问题讨论】:

    标签: apache-kafka kafka-consumer-api kafka-producer-api


    【解决方案1】:

    你能展示你用来配置 Producer 的道具吗?我只是猜测有可能......

    在 producerMsg() 中,您使用异步方式来使用生产者,因此只需 producer.send(),这意味着将消息放在内部缓冲区中,用于制作稍后发送的批次。生产者有一个内部线程从缓冲区获取并发送批处理。也许只有 1000 毫秒不足以达到生产者真正发送消息的条件(参见 batch.size 和 linger.ms),主应用程序结束,生产者死亡而不发送消息。给它更多时间(2000 ms),它可以工作。顺便说一句,我没有尝试代码。

    所以原因似乎是你的:

    props.put("linger.ms", 1000);

    与您的睡眠相匹配。因此生产者将在 1000 毫秒后开始发送消息,因为批处理尚未满(batch.size 为 16 MB)。同时,主线程在休眠 1 秒后结束,生产者不发送消息。您必须使用较低的 linger.ms 时间。

    【讨论】:

    • 好的,你能看到我的道具吗
    • 对对对,就是这个原因
    • 很高兴为您提供帮助!那么你能把这个标记为其他有同样问题的人的答案吗?
    猜你喜欢
    • 2012-04-05
    • 2012-07-07
    • 1970-01-01
    • 2018-12-04
    • 1970-01-01
    • 2014-07-05
    • 1970-01-01
    • 2018-08-09
    • 1970-01-01
    相关资源
    最近更新 更多