【问题标题】:How to write a file to Kafka Producer如何将文件写入 Kafka Producer
【发布时间】:2016-01-21 07:01:42
【问题描述】:

我正在尝试在 Kafka 中加载一个简单的文本文件而不是标准输入。 下载Kafka后,我执行了以下步骤:

启动 zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

启动服务器

bin/kafka-server-start.sh config/server.properties

创建了一个名为“test”的主题:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

运行制片人:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
Test1
Test2

消费者聆听:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Test1
Test2

我想将一个数据文件甚至一个简单的文本文件传递给消费者可以直接看到的生产者,而不是标准输入。任何帮助将不胜感激。谢谢!

【问题讨论】:

    标签: file apache-kafka kafka-consumer-api kafka-producer-api


    【解决方案1】:

    你可以用管道输入:

    kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
    --new-producer < my_file.txt
    

    找到here

    从 0.9.0 开始:

    kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt
    

    【讨论】:

    • 我使用的是 Kafka-0.9。 --new-producer 在 kafka-console-producer.sh 中不受支持,相反,$ kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
    • 我认为 --new-producer 是 0.9 的实际生产者 :)
    • @BalázsMáriaNémeth,如何在固定时间间隔后仅传递 n 行(例如传递前 10 行,等待 5 秒,传递 10-20 行等)。在所有行结束后,它应该从第 1 行重新开始?
    • @vdep 你应该可以使用 kafkacat 做到这一点:github.com/edenhill/kafkacat
    • 这不适用于 0.11 .. 对新版本有什么建议吗?
    【解决方案2】:
    $ kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt
    

    在 Kafka-0.9.0 中为我工作

    【讨论】:

      【解决方案3】:

      这里有一些比较通用的方法,但对于简单的文件来说可能是多余的

      尾巴

      tail -n0 -F my_file.txt | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic

      解释

      1. tail 随着文件的增长或不断向其添加日志,从文件末尾读取
      2. -n0 表示输出最后 0 行,所以只选择新行
      3. -F 按名称而不是描述符跟随文件,因此即使旋转它也可以工作

      系统日志-ng

      options {                                                                                                                             
          flush_lines (0);                                                                                                                
          time_reopen (10);                                                                                                               
          log_fifo_size (1000);                                                                                                          
          long_hostnames (off);                                                                                                           
          use_dns (no);                                                                                                                   
          use_fqdn (no);                                                                                                                  
          create_dirs (no);                                                                                                               
          keep_hostname (no);                                                                                                             
      };
      
      source s_file {
          file("path to my-file.txt" flags(no-parse));
      }
      
      
      destination loghost {
          tcp("*.*.*.*" port(5140));
      } 
      

      消耗

      nc -k -l 5140 | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic

      解释(来自man nc

      -k' Forces nc to stay listening for another connection after its current connection is completed. It is an error to use this option without the -l option.
      
      -l' Used to specify that nc should listen for an incoming connection rather than initiate a connection to a remote host. It is an error to use this option in conjunction with the -p, -s, or -z options. Additionally, any timeouts specified with the -w option are ignored.
      

      参考

      Syslog-ng

      【讨论】:

        【解决方案4】:
        echo "Hello" | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
        

        【讨论】:

        • 虽然此代码 sn-p 可以解决问题,包括解释 really helps 以提高您的帖子质量。请记住,您正在为将来的读者回答问题,而不仅仅是现在提问的人!请edit您的答案添加解释,并说明适用的限制和假设。
        • 有时代码就是文档。很明显,它只是在一些文本中进行管道传输。
        猜你喜欢
        • 2019-04-23
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-07-31
        • 1970-01-01
        • 1970-01-01
        • 2017-10-01
        • 1970-01-01
        相关资源
        最近更新 更多