1.创建logtopic
[[email protected] kafka]# bin/kafka-topics.sh --create --zookeeper 172.16.101.58:2181,172.16.101.59:2181,172.16.101.60:2181/kafka --replication-factor 3 --partitions 1 --topic logtopic


2.创建avro_memory_kafka.properties (kafka sink)
[[email protected] ~]# cd /tmp/flume-ng/conf
[[email protected] conf]# cp avro_memory_hdfs.properties avro_memory_kafka.properties
[[email protected] conf]# vi avro_memory_kafka.properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 172.16.101.54
a1.sources.r1.port = 4545


# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = logtopic
a1.sinks.k1.kafka.bootstrap.servers = 172.16.101.58:9092,172.16.101.59:9092,172.16.101.60:9092
a1.sinks.k1.flumeBatchSize = 6000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.keep-alive = 90
a1.channels.c1.capacity = 2000000
a1.channels.c1.transactionCapacity = 6000


# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


3.后台启动 flume-ng agent(聚合节点)和查看nohup.out
[[email protected] ~]# source /etc/profile
[[email protected] ~]# cd /tmp/flume-ng/
[[email protected] flume-ng]# nohup  flume-ng agent -c conf -f /tmp/flume-ng/conf/avro_memory_kafka.properties  -n a1 -Dflume.root.logger=INFO,console &
[1] 4971
[[email protected] flume-ng]# nohup: ignoring input and appending output to `nohup.out'

[[email protected] flume-ng]#
[[email protected] flume-ng]#
[[email protected] flume-ng]# cat nohup.out


4.检查三台log收集节点(收集节点)上的flume-ng agent是否已启动,未启动则在各节点后台启动
[[email protected] flume-ng]$ . ~/.bash_profile
[[email protected] flume-ng]$ . ~/.bash_profile
[[email protected] flume-ng]$ . ~/.bash_profile


[[email protected] flume-ng]$ nohup  flume-ng agent -c /tmp/flume-ng/conf -f /tmp/flume-ng/conf/exec_memory_avro.properties -n a1 -Dflume.root.logger=INFO,console &
[[email protected] flume-ng]$ nohup  flume-ng agent -c /tmp/flume-ng/conf -f /tmp/flume-ng/conf/exec_memory_avro.properties -n a1 -Dflume.root.logger=INFO,console &
[[email protected] flume-ng]$ nohup  flume-ng agent -c /tmp/flume-ng/conf -f /tmp/flume-ng/conf/exec_memory_avro.properties -n a1 -Dflume.root.logger=INFO,console &


5.打开kafka manager监控
http://172.16.101.55:9999

08【在线日志分析】之Flume Agent(聚合节点) sink to kafka cluster

相关文章:

  • 2022-01-12
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-02-15
  • 2022-12-23
  • 2021-11-17
猜你喜欢
  • 2021-07-27
  • 2021-11-29
  • 2022-12-23
  • 2021-10-23
  • 2021-10-10
  • 2021-09-11
相关资源
相似解决方案