流程简介:

Nginx+Flume+Hive日志采集

 

1.Nginx数据格式

Nginx定义日志格式:

$remote_addr 客户端IP

$time_local 通用日志格式下的本地时间

$status 状态码

$body_bytes_sent 发送给客户端的字节数,不包括响应头的大小

$http_user_agent 客户端浏览器信息

$http_referer 请求的referer地址。

$request 完整的原始请求

$request_method #HTTP请求方法,通常为"GET"或"POST"

$request_time 请求处理时长

$request_uri 完整的请求地址

$server_protocol #服务器的HTTP版本,通常为 "HTTP/1.0" 或 "HTTP/1.1"

$request_body POST请求参数,参数需放form中

 

token $http_token (自定义header字段前加http_,即可将指定的自定义header字段打印到log中。)

version $arg_version (自定义body字段前加arg_,即可将指定的自定义header字段打印到log中。)

 

Nginx配置文件中配置输出日志格式:

log_format user_log_format "$remote_addr,$time_local,$status,$body_bytes_sent,$http_user_agent,$http_referer,$request_method,$request_time,$request_uri,$server_protocol,$request_body,$http_token";

示例:

1.119.140.194,29/Dec/2018:02:08:50 +0000,200,556,okhttp/3.8.1,-,GET,0.028,/tv/phone/resource_types?type=3,HTTP/1.1,-,-

 

2.Flume采集清洗

Nginx+Flume+Hive日志采集

flume依赖的包:

Nginx+Flume+Hive日志采集

flume-conf.properties:

# agent
a1.sources = s1
a1.channels = c1
a1.sinks = k1

# sources
a1.sources.s1.type = spooldir
a1.sources.s1.channels = c1
# FTP路径
a1.sources.s1.spoolDir = /var/ftp/
# 自定义拦截器
a1.sources.s1.interceptors = f1
a1.sources.s1.interceptors.f1.type = com.hx.common.flume.FlumeBuilder

# sink
a1.sinks.k1.type=hive
a1.sinks.k1.channel = c1
# hive地址
a1.sinks.k1.hive.metastore=thrift://*.*.*.*:9083
a1.sinks.k1.hive.database=hive_test
a1.sinks.k1.hive.table=nginx_acc_log
a1.sinks.k1.serializer=delimited
# 输入分隔符
a1.sinks.k1.serializer.delimiter=","
# 输出分隔符
a1.sinks.k1.serializer.serdeSeparator=','
a1.sinks.k1.serializer.fieldnames=remote_addr,time_local,status,body_bytes_sent,http_user_agent,http_referer,request_method,request_time,request_uri,server_protocol,request_body,http_token,id,appkey,sing,version

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100

 

启动flume:

nohup ./flume-ng agent -c /opt/flume/apache-flume/conf -f /opt/flume/apache-flume/conf/flume-conf.properties -n a1 -Dflume.root.logger=INFO,console &

 

经过Flume拦截器处理后字段:

remote_addr,time_local,status,body_bytes_sent,http_user_agent,http_referer,request_method,request_time,request_uri,server_protocol,request_body,http_token,id,appkey,sing,version

 

3.Hive操作

先切换账户:

su hadoop

启动hive:

nohup hive --service metastore >> ~/metastore.log 2>&1 &        

nohup  hive --service hiveserver2 >> ~/hiveserver2.log 2>&1 &   

 

修改权限:

hdfs dfs -ls /user/hive/warehouse

hadoop dfs -chmod 777 /user/hive/warehouse/hive_test.db

 

建表语句:(与flume输出一致)

DROP TABLE IF EXISTS nginx_acc_log;

 

create table nginx_acc_log(remote_addr string,time_local string,status string,body_bytes_sent string,http_user_agent string,http_referer string,request_method string,request_time string,request_uri string,server_protocol string,request_body string,http_token string,id string,appkey string,sing string,version string) clustered by (id) into 5 buckets stored as orc TBLPROPERTIES ('transactional'='true');

 

设置hive事务:

<!--支持事务-->
<property>
    <name>hive.support.concurrency</name>
    <value>true</value>
</property>
<property>
    <name>hive.exec.dynamic.partition.mode</name>
    <value>nonstrict</value>
</property>
<property>
    <name>hive.txn.manager</name>
    <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
    <name>hive.compactor.initiator.on</name>
    <value>true</value>
</property>
<property>
    <name>hive.compactor.worker.threads</name>
    <value>5</value>
</property>
<property>
    <name>hive.enforce.bucketing</name>
    <value>true</value>
</property>

 

显示表:

hive> show databases;

hive> use hive_test;

hive> show tables;

hive> select * from nginx_acc_log;

 

 

 

相关文章: