druid 是一个基于列存储的适合实时数据分析的分布式系统,所有节点可以分成3个部分:master、query、data,分别运行相应的服务,如下图所示:
Historical: 历史节点的职责主要是对历史的数据进行存储和查询。它们通过Zookeeper来声明自己存储的节点,同时也通过zookeeper来监听加载或删除Segment的信号。Segment是按时间段划分的一个个数据块。
Coordinator:协调节点监测一组历史节点来保证数据的可用和冗余。通过zk来感知Historical节点的存在,通过在Zookeeper上创建entry来和Historical节点通信来告诉他们加载或者删除Segment。
Overlord:节点负责接受任务,向MiddleManager分发任务,创建锁,和返回状态给调用者。
Middle Manager:执行提交任务的工作节点。将任务分发到peons,peon组件用来跑索引任务。
Broker:节点接收外部客户端的查询,并且将查询路由到历史节点和实时节点。
druid依赖一些外部组件:
- Zookeeper集群:用于节点之间的通信。
- 元数据存储实例:Mysql,用于存储segments所在位置,任务的进度等。
- Deep Storage(可选):HDFS,用于存储历史不变的segments。
数据结构:
导入的数据按照时间被存储到不同的segment中,segment中的每一列主要分三种类型,时间列,维度列和指标列。druid的一个特性是可以在导入时通过配置文件,选择demensions进行聚合,类似sql语句中的count(*),生成新的metrics字段,从而减少存储空间和提高查询效率。
数据导入:
主要是通过overlord注册任务,然后分发给不同的Middle Manager,所有传入的事件数据会先维持一个内存中的索引缓存, 随着事件数据的传入,这些索引会逐步递增,并且这些索引是可以立即查询的,达到一定条件如定期一小时或者最大的事件条数等限制,会打包成一个segment,存储到historical节点中。
安装部署:
参考官方文档,druid对机器性能要求比较高,一般最少3台机器,分别用于部署master、data和query,自己在4核8G的机器上单机部署时,内存基本就被占满了,所以分在了两台机器上做测试,网页打开 broker_ip:8888,可以看到部署后的服务运行情况。
部署可参考文档:https://www.jianshu.com/p/10ad2d9d92d0
数据可以从kafka或者hdfs中导入,自己主要测试了通过kafka实时导入的方式:
首先配置一个监听kafka topic的配置文件ad-test.json,其中用到了dataSketch这个组件进行uv的精确统计:
1{
2 "type": "kafka",
3 "dataSchema": {
4 "dataSource": "ad-test", //写入druid的表名
5 "parser": {
6 "type": "string",
7 "parseSpec": {
8 "format": "json", //表示每条消息的格式为json
9 "timestampSpec": {
10 "column": "log_time",
11 "format": "posix"
12 },
13 "columns": [
14 ],
15 "dimensionsSpec": {
16 "dimensions": [
17 ],
18 "dimensionExclusions": ["level", "msg"]
19 }
20 }
21 },
22 "metricsSpec": [
23 {
24 "name" : "totalcount",
25 "type" : "count"
26 },
27 {
28 "name":"sketch_uid",
29 "type":"thetaSketch",
30 "fieldName":"copy_uid",
31 "isInputThetaSketch":"false",
32 "size":"1048576"
33 }
34 ],
35 "granularitySpec": {
36 "type": "uniform",
37 "segmentGranularity": "HOUR",
38 "queryGranularity": "MINUTE"
39 }
40 },
41 "tuningConfig": {
42 "type": "kafka",
43 "maxRowsPerSegment": 5000000
44 },
45 "ioConfig": {
46 "topic": "ad-test", //监听的kafka topic
47 "consumerProperties": {
48 "bootstrap.servers": "localhost:9092", //监听的kafka端口
49 "group.id": "supervisorGroup"
50 },
51 "taskCount": 1,
52 "replicas": 1,
53 "taskDuration": "PT1H"
54 }
55}
然后向overlord服务注册一个任务:
curl -X POST -H 'Content-Type: application/json' -d @ad-test.json http://overload_ip:8080/druid/indexer/v1/supervisor
在网页上task下可以看到正在运行的任务情况。