您可以使用水槽通道选择器将事件简单地路由到不同的目的地。或者您可以将多个 Flume 代理链接在一起以实现复杂的路由功能。
但是链式水槽代理将变得有点难以维护(资源使用和水槽拓扑)。
你可以看看flume-ng router sink,它可能会提供你想要的一些功能。
首先,通过flume interceptor在事件头中添加特定字段
a1.sources = r1 r2
a1.channels = c1 c2
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK
a1.sources.r2.channels = c2
a1.sources.r2.type = seq
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = datacenter
a1.sources.r2.interceptors.i2.value = BERKELEY
然后,您可以像这样设置您的水槽通道选择器:
a2.sources = r2
a2.sources.channels = c1 c2 c3 c4
a2.sources.r2.selector.type = multiplexing
a2.sources.r2.selector.header = datacenter
a2.sources.r2.selector.mapping.NEW_YORK = c1
a2.sources.r2.selector.mapping.BERKELEY= c2 c3
a2.sources.r2.selector.default = c4
或者,您可以像这样设置 avro-router 接收器:
agent.sinks.routerSink.type = com.datums.stream.AvroRouterSink
agent.sinks.routerSink.hostname = test_host
agent.sinks.routerSink.port = 34541
agent.sinks.routerSink.channel = memoryChannel
# Set sink name
agent.sinks.routerSink.component.name = AvroRouterSink
# Set header name for routing
agent.sinks.routerSink.condition = datacenter
# Set routing conditions
agent.sinks.routerSink.conditions = east,west
agent.sinks.routerSink.conditions.east.if = ^NEW_YORK
agent.sinks.routerSink.conditions.east.then.hostname = east_host
agent.sinks.routerSink.conditions.east.then.port = 34542
agent.sinks.routerSink.conditions.west.if = ^BERKELEY
agent.sinks.routerSink.conditions.west.then.hostname = west_host
agent.sinks.routerSink.conditions.west.then.port = 34543