【问题标题】:flume: assign key for partition kafka sink水槽:为分区 kafka 接收器分配密钥
【发布时间】:2019-11-20 10:47:12
【问题描述】:

我正在处理一个问题,但在水槽文档中也找不到任何解决方法。 我想取尾文件的绝对路径并保存。 在我想将它作为键传递给 kafka sink 之后,以便在同一分区中拥有具有相同路径的所有事件。 我读过很多文章说这是可能的,但我找不到要分配的配置以使其工作。有人可以给我有关如何配置代理的参考或示例吗?
我有以下代理配置:

source
agent3a.sources.source3a.type = TAILDIR
agent3a.sources.source3a.filegroups= f1
agent3a.sources.source3a.filegroups.f1 = /path/local/data/source/
agent3a.sources.source3a.channels= channel3a
agent3a.sources.source3a.batchSize=1
agent3a.sources.source3a.fileHeader= True

Memory Channel
agent3a.channels.channel3a.type = memory
agent3a.channels.channel3a.transactionCapacity = 100
agent3a.channels.channel3a.capacity = 100

Sink
agent3a.sinks.sink3a.type= org.apache.flume.sink.kafka.KafkaSink 
agent3a.sinks.sink3a.kafka.bootstrap.servers =########hostname1#####:port, ########hostname2#####:port
agent3a.sinks.sink3a.kafka.topic= topic_test
agent3a.sinks.sink3a.kafka.producer.acks=1
agent3a.sinks.sink3a.channel= channel3a
agent3a.sinks.sink3a.FlumeBatchSize=1
agent3a.sinks.sink3a.useFlumeEventFormat = true
agent3a.sinks.sink3a.kafka.producer.batch.size=10

tnk :)

【问题讨论】:

    标签: apache-kafka key flume-ng


    【解决方案1】:

    最后我找到了如何配置代理,以便使用数据源中文件的绝对路径为 kafka 主题中的 partitionin 分配密钥。 更详细地说,需要设置属性'fileHeaderKey=key'。这样,当事件传递给kafka sink时,header包含对key =absolute/path/of /the/file,kafka可以使用它作为消息中的关键。

    agent3a.sources= source3a 
    agent3a.channels= channel3a
    agent3a.sinkss= sink3a
    
    source
    agent3a.sources.source3a.type = TAILDIR
    agent3a.sources.source3a.filegroups= f1
    agent3a.sources.source3a.filegroups.f1 = /path/local/data/source/
    agent3a.sources.source3a.channels= channel3a
    agent3a.sources.source3a.batchSize=1
    agent3a.sources.source3a.fileHeader= True
    agent3a.sources.source3a.fileHeaderKey= key #####property to set the fileHeader the 
                                                      key for partitioning###
    
    Memory Channel
    agent3a.channels.channel3a.type = memory
    agent3a.channels.channel3a.transactionCapacity = 100
    agent3a.channels.channel3a.capacity = 100
    
    Sink
    agent3a.sinks.sink3a.type= org.apache.flume.sink.kafka.KafkaSink 
    agent3a.sinks.sink3a.kafka.bootstrap.servers =########hostname1#####:port, ########hostname2#####:port
    agent3a.sinks.sink3a.kafka.topic= topic_test
    agent3a.sinks.sink3a.kafka.producer.acks=1
    agent3a.sinks.sink3a.channel= channel3a
    agent3a.sinks.sink3a.FlumeBatchSize=1
    agent3a.sinks.sink3a.useFlumeEventFormat = true
    agent3a.sinks.sink3a.kafka.producer.batch.size=10
    

    【讨论】:

      猜你喜欢
      • 2020-12-12
      • 1970-01-01
      • 1970-01-01
      • 2020-05-28
      • 1970-01-01
      • 2019-08-06
      • 1970-01-01
      • 1970-01-01
      • 2018-04-28
      相关资源
      最近更新 更多