原博客地址:http://www.2cto.com/kf/201612/555835.html
1.编写java代码,随机生成用户ID号码,区县号码,乡镇号码(区县和乡镇号码用随机的三位字母表示)和个人总收入格式样例:779362a1-bf04-468a-91b6-a19d772f41fa####AFC####sfe####8091748。
2.用一个线程循环执行,用Thread.sleep(100)来控制线程执行一次停止100ms,防止cpu在死循环中过载,一秒生成10条数据,用log4j生成相应的日志到指定的目录下面,其中日志每分钟就生成一个格式为yyyy-MM-dd-HH-mm 例如:service.log.2016-10-13-11-32,最后在linux下用shell脚本启动这个java程序。
3.编写shell脚本,定时每分钟从log4j生成的脚本copy当前时间前一分钟产生的日志文件到被flume监控的文件夹内,注意copy过去应该在文件名后面加上.COMPLETED,copy完成后又把这个文件名的.COMPLETED去掉。
例如:
|
1
2
3
4
5
|
#首先
cp
./log4j/service.log.2016-10-13-11-37
./monitor/service.log.2016-10-13-11-37.COMPLETED
#然后
mv
./monitor/service.log.2016-10-13-11-37.COMPLETED
./monitor/service.log.2016-10-13-11-37
|
主要是防止源日志文件太大copy的时候会花比较长的时间,到时候flume会抛异常,当然你还可以使用另外一种解决方案:直接move源日志文件到被flume监控的目录中,不过这种方案没有上面的方案优。
4.配置flume的conf文件
5.编辑crontab每分钟执行这个脚本来拉取源日志文件。
环境:
1.使用的虚拟机为:vmware12
2.centOS6.5
3.hadoop2.2.0 单节点(主要测试用,所以直接用的单节点)
4.Flume 1.6.0 (刚开始用的flume-ng-1.5.0-cdh5.4.5,结果配置中的一个方法在这个版本的flume包里找不到抛异常,就换了个版本搞定)
java代码如下:
其中需要配置log4j配置文件,以及添加log4j的依赖jar包
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
package
com.lijie.test;
import
java.util.UUID;
import
org.apache.log4j.Logger;
public
class
DataProduct {
public
static
void
main(String[] args) {
Thread
t1 = new
Thread(new
A());
t1.start();
}
}
class
A extends
Thread {
private
final
Logger log = Logger.getLogger(A.class);
public
void
run() {
//无限循环
while
(true)
{
//随机产生一个用户uuid
UUID
userId = UUID.randomUUID();
//产生一个随机的用户总资产
int
num = (int)
(Math.random() * 10000000)
+ 100000;
//产生一个随意的县名
StringBuilder
sb = new
StringBuilder();
for
(int
i = 0;
i < 3;
i++) {
char
a = (char)
(Math.random() * (90
- 65)
+ 65);
sb.append(a);
}
String
xian = sb.toString();
//产生一个随机的镇名
StringBuilder
sb1 = new
StringBuilder();
for
(int
i = 0;
i < 3;
i++) {
char
a = (char)
(Math.random() * (122
- 97)
+ 97);
sb1.append(a);
}
String
zhen = sb1.toString();
//生成日志
log.info(userId
+ "####"
+ xian + "####"
+ zhen + "####"
+ num);
//停0.1秒钟
try
{
Thread.sleep(100);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
}
}
|
log4j的配置文件:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
log4j.rootCategory=INFO,
stdout , R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d
%p [%t] %C.%M(%L) | %m%n
log4j.appender.R=org.apache.log4j.DailyRollingFileAppender
log4j.appender.R.File=/home/hadoop/log4j/service.log
log4j.appender.R.DatePattern
= '.'yyyy-MM-dd-HH-mm
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d
%p [%t] %C.%M(%L) | %m%n
log4j.logger.com.xxx=DEBUG
log4j.logger.controllers=DEBUG
log4j.logger.vo=DEBUG
log4j.logger.notifiers=DEBUG
log4j.logger.com.opensymphony.oscache=WARN
log4j.logger.net.sf.navigator=WARN
log4j.logger.org.apache.commons=WARN
log4j.logger.org.apache.struts=WARN
log4j.logger.org.displaytag=WARN
log4j.logger.org.springframework=WARN
log4j.logger.org.apache.velocity=FATAL
|
启动java程序的shell脚本 start.sh
|
1
2
3
4
5
6
7
8
9
10
11
12
13
|
APP_HOME=/home/hadoop/myjar
APP_CLASSPATH=$APP_HOME/bin
jarList=$(ls
$APP_CLASSPATH|grep jar)
echo
$jarList
for
i in $jarList
do
APP_CLASSPATH="$APP_CLASSPATH/$i":
done
echo
$APP_CLASSPATH
export
CLASSPATH=$CLASSPATH:$APP_CLASSPATH
echo
$CLASSPATH
java
-Xms50m -Xmx250m com.lijie.test.DataProduct
echo
Linux Test End
|
定时拉取源日志的shell脚本 mvlog.sh
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
#!
/bin/bash
DIR=$(cd
`dirname $0`;
pwd)
mydate=`date
+%Y-%m-%d-%H-%M -d '-1
minutes'`
logName="service.log"
monitorDir="/home/hadoop/monitor/"
filePath="${DIR}"/log4j/""
fileName="${logName}"".""${mydate}"
echo
"文件地址:${filePath}"
echo
"文件名字:${fileName}"
if
[ -f "${monitorDir}""${fileName}"
]
then
echo
"文件存在,删除文件"
rm
-rf "${monitorDir}""${fileName}"
fi
echo
"开始复制文件"
cp
"${filePath}${fileName}"
"${monitorDir}${fileName}"".COMPLETED"
echo
"日志复制完成,更改名字"
mv
"${monitorDir}${fileName}"".COMPLETED"
"${monitorDir}${fileName}"
echo
"日志改名完成"
exit
|
flume的配置文件:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
#agent名,
source、channel、sink的名称
a1.sources
= r1
a1.channels
= c1
a1.sinks
= k1
#具体定义source
a1.sources.r1.type
= spooldir
a1.sources.r1.spoolDir
= /home/hadoop/monitor
#具体定义channel
a1.channels.c1.type
= memory
a1.channels.c1.capacity
= 10000
a1.channels.c1.transactionCapacity
= 100
#具体定义sink
a1.sinks.k1.type
= hdfs
a1.sinks.k1.hdfs.path
= hdfs://192.168.80.123:9000/flume/%Y%m%d
a1.sinks.k1.hdfs.filePrefix
= events-
a1.sinks.k1.hdfs.fileType
= DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp
= true
#不按照条数生成文件
a1.sinks.k1.hdfs.rollCount
= 0
#HDFS上的文件达到128M时生成一个文件
a1.sinks.k1.hdfs.rollSize
= 134217728
#HDFS上的文件达到60秒生成一个文件
a1.sinks.k1.hdfs.rollInterval
= 60
#组装source、channel、sink
a1.sources.r1.channels
= c1
a1.sinks.k1.channel
= c1
|
启动flume的命令:
|
1
|
../bin/flume-ng
agent -n a1 -c conf -f ./flume-conf.properties -Dflume.root.logger=DEBUG,console
|
crontab的配置
|
1
2
3
4
5
|
#首先crontab
-e编辑下面的代码然后保存
*
* * * * sh /home/hadoop/mvlog.sh
#然后启动crontab服务
service
crond start
|
准备工作进行好之后,执行java程序
sh ./start.sh
产生如下日志文件:
日志的内容:
定时任务会拉取这个目录下的日志到monitor目录下,flume就会收集,手机完成后会在文件名添加.COMPLETED后缀:
hdfs的flume下面就会生成当天时间格式化的目录,并且收集的数据会被put到该目录下:
java代码一直生成日志文件,crontab每隔一分钟都会拉取日志到flume监控的目录下面,flume就会把该文件收集到hdfs,这样一个简单的flume监控spoolDir日志到HDFS整个流程的小Demo就实现了。