【问题标题】:How to use a growing file as Apache Kafka producer and read only the newly appended data如何使用不断增长的文件作为 Apache Kafka 生产者并仅读取新附加的数据
【发布时间】:2018-03-18 06:38:25
【问题描述】:

我正在尝试使用 file 作为我的 kafka 生产者。源文件不断增长(比如每秒 20 条记录/行)。以下是与我的问题类似的帖子:

How to write a file to Kafka Producer

但在这种情况下,每次在文件中插入新行时,都会读取整个文件并将其添加到 Kafka 主题中。我只希望将新附加的行发送到主题(即,如果文件已经包含 10 行并且附加了 4 行,则只需将这 4 行发送到主题)。

有没有办法做到这一点??

尝试了其他解决方案:

  1. Apache Flume 使用 source type 作为 'spooldir'。但它没有用,因为它从添加到目录的新文件中读取数据,而不是在将数据附加到已读取文件时读取数据。

  2. 我们还尝试将 Flume source type 作为“exec”,将 command 作为“tail –F /path/file-name”。这似乎也不起作用。

也欢迎使用任何其他工具的建议,因为我的目标是实时读取文件中的数据(即,我需要数据一插入文件) .

【问题讨论】:

  • 你尝试过来自 Kafka Connect 的 FileSource Connector 吗?
  • 感谢黄钦。我不知道连接器。它工作:)

标签: apache-kafka real-time flume


【解决方案1】:

您可以查看几个选项,具体取决于您的具体需求。

Kafka 连接

正如Chin Huang 所述,来自 Kafka Connect 的 FileSource 连接器应该能够在不安装其他软件的情况下执行您想要的操作。查看Connect Quickstart 以获取有关如何启动和运行它的指导,他们实际上有一个将文件读入 Kafka 的示例。

Logstash

Logstash 是此类事情的经典选择,它的Kafka 输出将按照您的意愿执行一个或多个文件。下面的配置应该给你大致的你想要的。

input {
  file {
    path => "/path/to/your/file"
  }
output {
    kafka {
        bootstrap_servers => "127.0.0.1:9092"
        topic_id => "topicname"
    }
}

文件节拍

Filebeat 与 Logstash 非常相似,如果您想对从文件读取的数据执行额外处理,它提供的功能较少。此外,它是用 go 而不是 java 编写的,因此运行它的机器上的占用空间应该更小。 以下应该是让您入门的最小配置(根据记忆,如果它们是必需的,您可能需要添加一个或两个参数):

filebeat.prospectors:
- type: log
  paths:
    - /path/to/your/file

output.kafka:
  hosts: ["127.0.0.1:9092"]
  topic: 'topicname'

水槽

如果您想重新访问您的 Flume 选项,请查看 TaildirSource,我没有使用它,但听起来它应该非常适合您的用例。

【讨论】:

  • 所有建议的解决方案都对我有用 :) 非常感谢 :)
  • 我们可以为此使用 Kafka SpoolDir 连接器吗?我只想从特定目录中的多个文件中读取更新/新记录。
猜你喜欢
  • 2016-05-24
  • 2017-10-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-12-17
  • 1970-01-01
  • 1970-01-01
  • 2015-07-02
相关资源
最近更新 更多