【问题标题】:Apache Flume - send only new file contentsApache Flume - 仅发送新文件内容
【发布时间】:2017-06-08 20:51:48
【问题描述】:

我是 Flume 的新用户,请把我当作一个绝对的菜鸟。我在为特定用例配置 Flume 时遇到了一个小问题,希望您能提供帮助。请注意,我没有使用 HDFS,这就是为什么这个问题与您在论坛上看到的其他问题不同。

我有两台虚拟机 (VM) 通过 Oracle Virtual Box 上的内部网络相互连接。我的目标是让一个 VM 监视一个特定目录,该目录中只有一个文件。当文件改变时,我希望 Flume 只发送新的行/数据。我希望其他 VM 接收此数据并将数据更新/连接到其上特定目录中的单个文件。

到目前为止,我的这个过程非常接近工作。每当在 VM1 中进行更改时,它们都会在 VM2 上更新。但是,VM1 上的整个文件每次都发送到 VM2,而不是新行。例如,如果我写了“Test1”,然后在下面写了“Test2”到 VM1 上的文件,那么 VM2 上的输出将是:

测试1

测试1

测试2

我想看到的是:

            Test1

            Test2

我不确定如何实现这一点,并在彻底检查 Flume 用户指南文档和有关 stackoverflow/stackexchange 的大多数相关文章后发送此电子邮件。供您参考,以下是当前配置(它们以我上面提到的方式工作)。

VM1 configuration

VM2 configuration

我意识到另一种解决方案是将配置保留在 VM1 上,并在每次检测到新内容时覆盖 VM2 上的文件。但是,我也不确定如何实现这一点。

非常感谢您提供的任何帮助!

【问题讨论】:

  • 请忽略我代码中的一些 cmets,我正在尝试。忘了里面有:)
  • 我现在在想,我也可以通过定期删除最终目录中的文件来解决这个问题。
  • 作为另一种可能的解决方法,我正在考虑将目标目录附加到具有假脱机目录源和 file_roll 接收器的代理,而 file_roll 的目标是同一目录。通过这种方式,我实际上将执行刷新操作,因为假脱机目录源允许您在将文件读入通道后删除它们。
  • 我不确定,但TAILDIR source 可能更适合这个用例
  • @gorros 您对我在下面列出的内容有任何见解吗?不幸的是,我的时间不多了,可以使用所有可能的帮助吗?简而言之,使用taildir 源,但只要附加一行,整个文件仍会被发送。我只想发送单行。

标签: apache server virtual-machine flume flume-ng


【解决方案1】:

使用 Flume 中提供的 TailDir 源。它会定期写入位置文件中读取的最后位置,并且它比 exec 源更可靠,因为即使在代理崩溃或由于某种原因停止的情况下,它也会从位置文件中保存的最后位置开始读取.

agent1.sources.src1.type = TAILDIR 
agent1.sources.src1.channels = ch1 
agent1.sources.src1.filegroups =f1
agent1.sources.src1.filegroups.f1= //path to log file 
agent1.sources.src1.maxBackoffSleep = 10000

根据您的需要设置 maxBackoffSleep 值,这意味着代理在轮询日志文件更改之前应该等待多长时间,当它在上次尝试中没有发现任何更改时。

【讨论】:

  • 自从项目延迟以来,我已经有一段时间没有检查这个线程了。我现在要试试这个。这仅适用于 .json 格式吗?如果我想传输 .txt 或 .csv 怎么办?
  • 所以它确实适用于 .txt 格式。但是,每次我只在文件中添加一行时,它仍然会发送文件中的所有内容。我误会了吗?所以在 VM1 上,我有一个 TAILDIR 源和 avro 接收器。在 VM2 上,我有一个 avro 源和一个 file_roll 接收器。
  • 请检查位置是否写入文件~/.flume/taildir_position.json
  • 没有 .json 文件被创建,或者在任何水槽目录中可用。如果有帮助,这些是我的配置:
  • agent.sources=r1 agent.channels=k1 agent.sinks=c1 agent.channels.k1.type=memory agent.channels.k1.capacity=1000 agent.channels.k1.transactionCapacity=100 #connect source, channel,sink agent.sources.r1.channels=k1 agent.sinks.c1.channel=k1 agent.sources.r1.type=TAILDIR agent.sources.r1.channels=k1 agent.sources.r1.filegroups =f1 agent.sources.r1.filegroups.f1=/tail_test_dir/test.txt agent.sources.r1.maxBackoffSleep=1000 agent.sinks.c1.type=avro agent.sinks.c1.hostname=10.10.10.4 agent.sinks .c1.port=4545
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-05-10
  • 2015-12-16
  • 1970-01-01
  • 2020-10-21
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多