【问题标题】:Filtering log files in Flume using interceptors使用拦截器过滤 Flume 中的日志文件
【发布时间】:2016-08-27 23:04:33
【问题描述】:

我有一个 http 服务器写入日志文件,然后我使用 Flume 将其加载到 HDFS 首先,我想根据标题或正文中的数据过滤数据。我读到我可以使用带有正则表达式的拦截器来做到这一点,有人可以准确解释我需要做什么吗?我是否需要编写覆盖 Flume 代码的 Java 代码?

我还想获取数据并根据标头将其发送到不同的接收器(即 source=1 到 sink1,source=2 到 sink2)这是如何完成的?

谢谢你,

西蒙

【问题讨论】:

    标签: hadoop flume


    【解决方案1】:

    您无需编写 Java 代码来过滤事件。使用Regex Filtering Interceptor 过滤正文匹配某个正则表达式的事件:

    agent.sources.logs_source.interceptors = regex_filter_interceptor
    agent.sources.logs_source.interceptors.regex_filter_interceptor.type = regex_filter
    agent.sources.logs_source.interceptors.regex_filter_interceptor.regex = <your regex>
    agent.sources.logs_source.interceptors.regex_filter_interceptor.excludeEvents = true
    

    要根据标头路由事件,请使用Multiplexing Channel Selector:

    a1.sources = r1
    a1.channels = c1 c2 c3 c4
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = state
    a1.sources.r1.selector.mapping.CZ = c1
    a1.sources.r1.selector.mapping.US = c2 c3
    a1.sources.r1.selector.default = c4
    

    此处,标题为“state”="CZ" 的事件转到频道“c1”,“state”="US" - 转到“c2”和“c3”,所有其他 - 转到“c4”。

    这样您还可以按标头过滤事件 - 只需将特定标头值路由到指向 Null Sink 的通道。

    【讨论】:

      【解决方案2】:

      您可以使用水槽通道选择器将事件简单地路由到不同的目的地。或者您可以将多个 Flume 代理链接在一起以实现复杂的路由功能。 但是链式水槽代理将变得有点难以维护(资源使用和水槽拓扑)。 你可以看看flume-ng router sink,它可能会提供你想要的一些功能。

      首先,通过flume interceptor在事件头中添加特定字段

      a1.sources = r1 r2
      a1.channels = c1 c2
      a1.sources.r1.channels =  c1
      a1.sources.r1.type = seq
      a1.sources.r1.interceptors = i1
      a1.sources.r1.interceptors.i1.type = static
      a1.sources.r1.interceptors.i1.key = datacenter
      a1.sources.r1.interceptors.i1.value = NEW_YORK
      a1.sources.r2.channels =  c2
      a1.sources.r2.type = seq
      a1.sources.r2.interceptors = i2
      a1.sources.r2.interceptors.i2.type = static
      a1.sources.r2.interceptors.i2.key = datacenter
      a1.sources.r2.interceptors.i2.value = BERKELEY
      

      然后,您可以像这样设置您的水槽通道选择器:

      a2.sources = r2
      a2.sources.channels = c1 c2 c3 c4
      a2.sources.r2.selector.type = multiplexing
      a2.sources.r2.selector.header = datacenter
      a2.sources.r2.selector.mapping.NEW_YORK = c1
      a2.sources.r2.selector.mapping.BERKELEY= c2 c3
      a2.sources.r2.selector.default = c4
      

      或者,您可以像这样设置 avro-router 接收器:

      agent.sinks.routerSink.type = com.datums.stream.AvroRouterSink
      agent.sinks.routerSink.hostname = test_host
      agent.sinks.routerSink.port = 34541
      agent.sinks.routerSink.channel = memoryChannel
      
      # Set sink name
      agent.sinks.routerSink.component.name = AvroRouterSink
      
      # Set header name for routing
      agent.sinks.routerSink.condition = datacenter
      
      # Set routing conditions
      agent.sinks.routerSink.conditions = east,west
      agent.sinks.routerSink.conditions.east.if = ^NEW_YORK
      agent.sinks.routerSink.conditions.east.then.hostname = east_host
      agent.sinks.routerSink.conditions.east.then.port = 34542
      agent.sinks.routerSink.conditions.west.if = ^BERKELEY
      agent.sinks.routerSink.conditions.west.then.hostname = west_host
      agent.sinks.routerSink.conditions.west.then.port = 34543
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2014-06-16
        • 2019-05-10
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多