Kafka作为source

配置文件:

#定义各个模块

a1.sources = kafka

a1.sinks = log

a1.channels = c1

 

#配置kafka source

#source的类型为kafkaSource

a1.sources.kafka.type = org.apache.flume.source.kafka.KafkaSource

#消费者连接的zk集群地址

a1.sources.kafka.zookeeperConnect = chd01:2181,cdh02:2181,chd03:2181

#消费者消费的topic,只能是一个。

a1.sources.kafka.topic = hello

#kafka的组id

a1.sources.kafka.groupId = flume

#kafka的消费者连接超时时间单位毫秒

a1.sources.kafka.kafka.consumer.timeout.ms = 3000

 

 

# 配置logger sink

a1.sinks.log.type = logger

 

# 配置 memory channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# 绑定三种组件的关系

a1.sources.kafka.channels = c1

 

a1.sinks.log.channel = c1

 

 

 

 

Kafka作为source

配置文件:

#定义各个模块

a1.sources = kafka

a1.sinks = log

a1.channels = c1

 

#配置kafka source

#source的类型为kafkaSource

a1.sources.kafka.type = org.apache.flume.source.kafka.KafkaSource

#消费者连接的zk集群地址

a1.sources.kafka.zookeeperConnect = node0:2181,node1:2181,node2:2181

#消费者消费的topic,只能是一个。

a1.sources.kafka.topic = hello

#kafka的组id

a1.sources.kafka.groupId = flume

#kafka的消费者连接超时时间单位毫秒

a1.sources.kafka.kafka.consumer.timeout.ms = 3000

 

 

# 配置logger sink

a1.sinks.log.type = logger

 

# 配置 memory channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# 绑定三种组件的关系

a1.sources.kafka.channels = c1

a1.sinks.log.channel = c1

相关文章:

  • 2021-05-14
  • 2018-12-18
  • 2021-06-10
  • 2021-11-15
  • 2021-11-07
  • 2021-05-20
  • 2022-01-23
猜你喜欢
  • 2022-12-23
  • 2021-07-22
  • 2021-09-03
  • 2021-10-13
  • 2022-02-26
  • 2021-09-24
  • 2022-02-01
相关资源
相似解决方案