flume-kafka-storm

Flume reads log data and sends it to Kafka; Storm then consumes the data from Kafka.

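1. Flume configuration file (conf/flume-kafka.conf): an exec source tails the log file, a memory channel buffers the events, and a KafkaSink publishes them to the stormTopic topic.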

agent.sources = kafkaSource
agent.channels = kafkaChannel
agent.sinks = kafkaSink

agent.sources.kafkaSource.type = exec
agent.sources.kafkaSource.command = tail -F /home/hadoop/kafkaData/kafka.log
agent.sources.kafkaSource.channels = kafkaChannel

agent.sinks.kafkaSink.channel = kafkaChannel
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.topic = stormTopic
agent.sinks.kafkaSink.brokerList = 192.168.25.151:9092,192.168.25.152:9092,192.168.25.153:9092
agent.sinks.kafkaSink.kafka.flumeBatchSize = 20
agent.sinks.kafkaSink.kafka.producer.acks = 1
agent.sinks.kafkaSink.kafka.producer.linger.ms = 1
agent.sinks.kafkaSink.kafka.producer.compression.type = snappy
agent.sinks.kafkaSink.serializer.class=kafka.serializer.StringEncoder

agent.channels.kafkaChannel.type=memory
agent.channels.kafkaChannel.capacity=10000
agent.channels.kafkaChannel.transactionCapacity=100
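The sink takes events from the channel in batches, and each batch has to fit inside one channel transaction. Flume's memory channel also rejects a transactionCapacity larger than its capacity. The hypothetical helper below is not part of Flume. It checks batch ≤ transactionCapacity ≤ capacity for the file above, reading the keys exactly as they are spelled there. When a key is absent, it falls back to Flume's documented default of 100.

```java
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of Flume: checks that one sink batch fits in a
// memory-channel transaction and that a transaction fits in the channel.
public class ChannelSizingCheck {

    // Flume's documented default for flumeBatchSize, capacity and transactionCapacity.
    static final int FLUME_DEFAULT = 100;

    static boolean sizingOk(int sinkBatch, int transactionCapacity, int capacity) {
        return sinkBatch <= transactionCapacity && transactionCapacity <= capacity;
    }

    static int intProp(Properties p, String key) {
        String v = p.getProperty(key);
        // trim() because Properties keeps trailing spaces in values
        return v == null ? FLUME_DEFAULT : Integer.parseInt(v.trim());
    }

    // Reads the three keys as spelled in the configuration above.
    static boolean checkSizing(Properties p) {
        int batch = intProp(p, "agent.sinks.kafkaSink.kafka.flumeBatchSize");
        int tx = intProp(p, "agent.channels.kafkaChannel.transactionCapacity");
        int cap = intProp(p, "agent.channels.kafkaChannel.capacity");
        return sizingOk(batch, tx, cap);
    }

    static Properties parse(Reader r) {
        Properties p = new Properties();
        try {
            p.load(r);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java ChannelSizingCheck conf/flume-kafka.conf
        try (Reader r = new FileReader(args[0])) {
            System.out.println(checkSizing(parse(r))
                    ? "sizing ok"
                    : "batch > transactionCapacity or transactionCapacity > capacity");
        }
    }
}
```

With the values above (20, 100, 10000), running it against conf/flume-kafka.conf prints "sizing ok".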

2. Start Flume (the --name value must match the agent prefix used in the configuration file, here agent)

bin/flume-ng agent --conf-file conf/flume-kafka.conf -c conf/ --name agent -Dflume.root.logger=DEBUG,console
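Flume only reads the keys whose prefix matches the name given with --name. With a mismatched name it finds no sources, channels or sinks, and nothing is sent. The hypothetical helper below lists the agent names a properties-style config defines. It assumes the usual <agent>.sources, <agent>.channels and <agent>.sinks keys.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: collects agent names from keys like "<agent>.sources".
public class AgentNames {

    static Set<String> definedAgents(Properties p) {
        Set<String> agents = new TreeSet<>();
        for (String key : p.stringPropertyNames()) {
            int dot = key.indexOf('.');
            if (dot <= 0) continue;
            String rest = key.substring(dot + 1);
            // only the top-level component lists identify an agent
            if (rest.equals("sources") || rest.equals("channels") || rest.equals("sinks")) {
                agents.add(key.substring(0, dot));
            }
        }
        return agents;
    }

    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java AgentNames conf/flume-kafka.conf agent
        String text = new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
        Set<String> agents = definedAgents(parse(text));
        System.out.println(agents.contains(args[1])
                ? "agent found: " + args[1]
                : "no agent named " + args[1] + ", defined: " + agents);
    }
}
```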

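3. On the Flume machine, edit the hosts file (/etc/hosts on Linux) and add hostname-to-IP mappings for the Kafka brokers. The IP addresses in brokerList are only used for the initial connection. By default, the producer then connects to the host names that the brokers advertise in the cluster metadata, so those names must resolve on the Flume machine.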
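One quick way to confirm the mapping took effect is to resolve each broker host name from the Flume machine. The minimal sketch below does that with InetAddress.getByName, which goes through the system resolver (on Linux that normally includes /etc/hosts). The class name is hypothetical, and the broker host names are yours to supply.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: reports host names that do not resolve on this machine.
public class HostCheck {

    static List<String> unresolved(List<String> hosts) {
        List<String> missing = new ArrayList<>();
        for (String h : hosts) {
            try {
                InetAddress.getByName(h); // consults the system resolver
            } catch (UnknownHostException e) {
                missing.add(h);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Usage: java HostCheck <broker-host> [<broker-host> ...]
        List<String> missing = unresolved(Arrays.asList(args));
        System.out.println(missing.isEmpty() ? "all host names resolve" : "unresolved: " + missing);
    }
}
```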

4. Create the topic in Kafka (3 partitions, replication factor 1)

bin/kafka-topics.sh --create --zookeeper hadoop2:2181 --replication-factor 1 --partitions 3 --topic stormTopic
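With three partitions, the console consumer in step 6 may not print the lines in the order they were written, because Kafka only guarantees ordering within a partition. The simplified model below shows how older Kafka producers spread keyless records: round-robin over the available partitions, starting from a random counter. This is the behavior before the sticky partitioner introduced in Kafka 2.4.

The model assumes the sink sends these lines without a key and that all partitions are available. The class name is hypothetical, and this is not Kafka's partitioner code.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model, not Kafka's code: keyless records assigned round-robin.
public class PartitionModel {

    // Returns, per partition, the messages it would receive, in send order.
    static List<List<String>> distribute(int messages, int partitions, int startCounter) {
        List<List<String>> byPartition = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            byPartition.add(new ArrayList<>());
        }
        for (int i = 0; i < messages; i++) {
            int partition = Math.floorMod(startCounter + i, partitions);
            byPartition.get(partition).add("kafka_test-" + i);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        List<List<String>> parts = distribute(7, 3, 0);
        for (int p = 0; p < parts.size(); p++) {
            System.out.println("partition " + p + ": " + parts.get(p));
        }
    }
}
```

Each partition's list stays in send order. A consumer reading all three partitions interleaves them in whatever order it fetches them.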

5. Script that simulates log generation (it appends 10,001 lines, kafka_test-0 through kafka_test-10000)

for ((i=0; i<=10000; i++)); do
  echo "kafka_test-$i" >> /home/hadoop/kafkaData/kafka.log
done
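The shell loop writes as fast as it can. If you would rather watch a slower, steadier stream arrive, the Java sketch below does the same job with an optional pause between lines. The class name and the delay parameter are hypothetical additions.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

// Hypothetical generator mirroring the shell loop, with optional pacing.
public class LogGenerator {

    // Appends "kafka_test-<i>" for i = from..toInclusive, one line each.
    // The file is reopened in append mode per line, like ">>" in the loop,
    // so tail -F sees every line as soon as it is written.
    static void append(Path log, int from, int toInclusive, long delayMillis) {
        for (int i = from; i <= toInclusive; i++) {
            try (BufferedWriter w = Files.newBufferedWriter(log, StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
                w.write("kafka_test-" + i);
                w.newLine();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            if (delayMillis > 0) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop early, keep the interrupt flag
                    return;
                }
            }
        }
    }

    public static void main(String[] args) {
        // Same file and range as the shell loop; 10 ms between lines (hypothetical pacing).
        append(Paths.get("/home/hadoop/kafkaData/kafka.log"), 0, 10000, 10);
    }
}
```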

6. Start a console consumer in Kafka to check that the messages arrive

bin/kafka-console-consumer.sh --zookeeper hadoop2:2181 --from-beginning --topic stormTopic

[Figure: Kafka Series Part 4, flume-kafka-storm integration]
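Because the three partitions are interleaved, and retries can occasionally deliver a message twice, it is easier to check the consumer output for completeness than for order. The hypothetical checker below does that. It reads saved consumer output and reports which indices between 0 and 10000 never showed up, ignoring order and duplicates.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical checker: which generated lines never reached the consumer?
public class ConsumerCheck {

    static final String PREFIX = "kafka_test-";

    // Returns the indices in [0, expectedMax] that never appeared.
    static List<Integer> missing(Iterable<String> consumed, int expectedMax) {
        BitSet seen = new BitSet(expectedMax + 1);
        for (String line : consumed) {
            String s = line.trim();
            if (!s.startsWith(PREFIX)) continue; // skip consumer log noise
            try {
                int i = Integer.parseInt(s.substring(PREFIX.length()));
                if (i >= 0 && i <= expectedMax) seen.set(i);
            } catch (NumberFormatException ignored) {
                // not one of the generated lines
            }
        }
        List<Integer> gaps = new ArrayList<>();
        for (int i = seen.nextClearBit(0); i <= expectedMax; i = seen.nextClearBit(i + 1)) {
            gaps.add(i);
        }
        return gaps;
    }

    public static void main(String[] args) throws IOException {
        // Reads saved consumer output from standard input, one message per line.
        try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            List<String> lines = in.lines().collect(Collectors.toList());
            List<Integer> gaps = missing(lines, 10000);
            System.out.println(gaps.isEmpty()
                    ? "all messages 0..10000 seen"
                    : gaps.size() + " missing, first: " + gaps.get(0));
        }
    }
}
```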

At this point, the Flume -> Kafka data flow works.

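7. Integrate Storm: use Kafka as Storm's data source through KafkaSpout from the storm-kafka module. The Maven project is configured as follows: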

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.itcast</groupId>
    <artifactId>kafkaStorm</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.2.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>1.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Resource copying plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>2.7</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>cn.itcast.kafka.Kafka2Storm</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
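pom.xml. storm-core uses provided scope because the Storm cluster already supplies it at runtime. Bundling it into the jar-with-dependencies built by maven-assembly-plugin would typically make topology submission fail with a "multiple defaults.yaml" error. The jar's main class is cn.itcast.kafka.Kafka2Storm.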
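In storm-kafka 1.x, the spout is usually built from three pieces:

- A ZkHosts pointing at ZooKeeper.
- A SpoutConfig holding the topic, a zkRoot and an id. The spout stores its consumed offsets in ZooKeeper under zkRoot/id.
- A scheme set to new SchemeAsMultiScheme(new StringScheme()), so each Kafka message becomes a one-field tuple.

The Kafka2Storm topology itself is not shown here. As a dependency-free illustration of the scheme step only, the hypothetical stand-in below decodes a message payload the way StringScheme does: UTF-8, one output field named "str". It is not the storm-kafka class.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for storm-kafka's StringScheme, for illustration only.
public class StringSchemeModel {

    // Name of the single output field StringScheme declares.
    static final String FIELD = "str";

    static String decode(ByteBuffer payload) {
        // duplicate() so decoding does not move the caller's buffer position
        return StandardCharsets.UTF_8.decode(payload.duplicate()).toString();
    }

    // One message in, one single-field tuple out.
    static List<Object> toTuple(ByteBuffer payload) {
        return Collections.<Object>singletonList(decode(payload));
    }

    public static void main(String[] args) {
        ByteBuffer msg = ByteBuffer.wrap("kafka_test-42".getBytes(StandardCharsets.UTF_8));
        System.out.println(FIELD + " = " + toTuple(msg).get(0));
    }
}
```

A bolt downstream of the real KafkaSpout would then read each line with tuple.getStringByField("str").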
