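Big Data Series: A Data Warehouse Case Study
Offline Data Warehouse
A data warehouse (Data Warehouse) is a strategic collection of data that supplies every decision-making process in an enterprise with data from all of its systems.
Analyzing the data in a data warehouse helps an enterprise improve business processes, control costs, raise product quality, and so on.
A data warehouse is not the final destination of the data. It prepares the data for that destination through cleaning, transformation, classification, reorganization, merging, splitting, aggregation, and similar steps.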
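1 Project Overview
1.1 Requirements
- Build a collection platform for user behavior data
- Build a collection platform for business data
- Dimensional modeling of the data warehouse
- Subject-area analysis: users, traffic, members, products, sales, regions, campaigns, etc.
- Use an ad hoc query tool for on-demand metric analysis
- Monitor cluster performance and raise alerts on anomalies
- Metadata management
- Data quality monitoring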
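1.2 Technology Selection
- Data collection
| Collection framework | Main function |
|---|---|
| Sqoop | Import/export between the big data platform and relational databases |
| DataX | Import/export between the big data platform and relational databases |
| Flume | Good at collecting and parsing log data |
| Logstash | Good at collecting and parsing log data |
| Maxwell | Commonly used to parse the MySQL binlog in real time |
| Canal | Commonly used to parse the MySQL binlog in real time |
| WaterDrop | Data import/export tool |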
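- Message middleware
| Open-source MQ | Overview |
|---|---|
| RabbitMQ | Implemented by LShift in Erlang; multi-protocol; broker architecture; heavyweight |
| ZeroMQ | Implemented by iMatix, the original designer of AMQP; lightweight messaging kernel with a brokerless design; written in C++ |
| Kafka | Implemented by LinkedIn in Scala; supports parallel loading of data into Hadoop |
| ActiveMQ | An Apache implementation of JMS; supports broker and peer-to-peer deployment; multi-protocol; written in Java |
| Redis | Key-value NoSQL database that also offers MQ features |
| MemcacheQ | Message queue built by Chinese developers on the memcache protocol; written in C/C++ |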
- Persistent storage
| Framework | Main use |
|---|---|
| HDFS | Distributed file system |
| HBase | Key-value NoSQL database |
| Kudu | HBase-like storage engine open-sourced by Cloudera |
| Hive | Data warehouse tool built on MapReduce |
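- Offline computing framework (Hive execution engine)
| Framework | Description |
|---|---|
| MapReduce | The earliest-generation distributed computing framework |
| Spark | One engine for both batch and stream processing |
| Flink | One engine for both batch and stream processing |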
- Analytical database
| Comparison | Druid | Kylin | Presto | Impala | ES |
|---|---|---|---|---|---|
| Sub-second response | √ | √ | × | × | × |
| Ten-billion-row datasets | √ | √ | √ | √ | √ |
| SQL support | √ | √ | √ | √ | √ (plugin required) |
| Offline | √ | √ | √ | √ | √ |
| Real-time | √ | √ | × | × | × |
| Exact distinct count | × | √ | √ | √ | × |
| Multi-table join | × | √ | √ | √ | × |
| JDBC for BI | × | √ | √ | √ | × |
- Other choices
- Task scheduling: DolphinScheduler
- Cluster monitoring: CM + CDH
- Metadata management: Atlas
- BI tools: Zeppelin, Superset
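1.3 Architecture
1.4 Cluster Resource Planning
- How to size the cluster (assuming each server has 8 TB of disk and 128 GB of memory)
- 1 million daily active users, each producing about 100 log entries per day: 1 million × 100 = 100 million entries
- About 1 KB per entry and 100 million entries per day: 100 million KB / 1024 / 1024 ≈ 95 GB, roughly 100 GB
- No server expansion for six months: 100 GB × 180 days ≈ 18 TB
- 3 replicas: 18 TB × 3 = 54 TB
- Reserve 20% to 30% headroom: 54 TB / 0.7 ≈ 77 TB
- Conclusion: 77 TB / 8 TB per server ≈ 9.6, so about 10 servers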
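The estimate above fits in a few lines of Java. This is a sketch with hypothetical class and method names. Like the text, it uses the rounded 100 GB/day and decimal TB (1 TB = 1000 GB) for the later steps, which is why it arrives at 10 servers rather than 9. The exact KB-to-GB step is a separate method.

```java
public class ClusterSizing {
    /** Raw log volume per day in GB (binary units: 1 GB = 1024 * 1024 KB). */
    public static double dailyGb(long users, int entriesPerUser, double kbPerEntry) {
        double totalKb = (double) users * entriesPerUser * kbPerEntry;
        return totalKb / 1024 / 1024;
    }

    /**
     * Servers needed to hold `days` of data with `replicas` copies while keeping
     * `headroom` (e.g. 0.3 = 30%) free. Uses decimal TB, as the estimate in the text does.
     */
    public static int servers(double dailyGb, int days, int replicas,
                              double headroom, double tbPerServer) {
        double rawTb = dailyGb * days / 1000;                  // 100 GB * 180 = 18 TB
        double replicatedTb = rawTb * replicas;                // 18 TB * 3 = 54 TB
        double provisionedTb = replicatedTb / (1 - headroom);  // 54 TB / 0.7 ≈ 77 TB
        return (int) Math.ceil(provisionedTb / tbPerServer);   // 77 / 8 ≈ 9.6, rounded up
    }

    public static void main(String[] args) {
        System.out.printf("exact daily volume: %.1f GB%n", dailyGb(1_000_000L, 100, 1.0));
        System.out.println("servers: " + servers(100, 180, 3, 0.3, 8));
    }
}
```

The code rounds up with `Math.ceil` because a fractional server cannot be bought. Changing `headroom` to 0.2 shows how sensitive the result is to the reserve you choose.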
Because resources are limited, this walkthrough uses 3 servers.
| Service | Sub-service | cdh01.cm | cdh02.cm | cdh03.cm |
|---|---|---|---|---|
| HDFS | NameNode | √ | | |
| HDFS | DataNode | √ | √ | √ |
| HDFS | SecondaryNameNode | | | √ |
| YARN | NodeManager | √ | √ | √ |
| YARN | ResourceManager | | √ | |
| ZooKeeper | ZooKeeper Server | √ | √ | √ |
| Flume | Flume | √ | √ | |
| Flume | Flume (consuming Kafka) | | | √ |
| Kafka | Kafka | √ | √ | √ |
| Hive | Hive | √ | | |
| MySQL | MySQL | √ | | |
| Sqoop | Sqoop | √ | | |
| Presto | Coordinator | √ | | |
| Presto | Worker | | √ | √ |
| DolphinScheduler | DolphinScheduler | √ | | |
| Druid | Druid | √ | √ | √ |
| Kylin | Kylin | √ | | |
| HBase | HMaster | √ | | |
| HBase | HRegionServer | √ | √ | √ |
| Superset | Superset | √ | | |
| Atlas | Atlas | √ | | |
| Solr | Solr | √ | | |
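2 Data Generation Module
This module focuses on collecting user behavior data. Why collect it?
For an enterprise, users are revenue. User habits and similar data have to be collected so that downstream big data products, such as a user-profile tagging system, can analyze them. User information is usually analyzed offline. The results can later be stored in an inverted-index system such as Elasticsearch (ES) and matched against user habits in real time for personalized recommendations. Going further, deep learning can be used to recommend items to similar users.
2.1 Basic Format of Tracking Data
- Common fields: fields that almost every Android phone reports
- Business fields: fields reported by tracking events, each tied to a specific business type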
{
  "ap": "xxxxx", // data source of the project: app, pc
  "cm": { // common fields
    "mid": "", // (String) unique device ID
    "uid": "", // (String) user ID
    "vc": "1", // (String) versionCode, app version number
    "vn": "1.0", // (String) versionName, app version name
    "l": "zh", // (String) system language
    "sr": "", // (String) channel the app was installed from
    "os": "7.1.1", // (String) Android OS version
    "ar": "CN", // (String) area / region
    "md": "BBB100-1", // (String) phone model
    "ba": "blackberry", // (String) phone brand
    "sv": "V2.2.1", // (String) sdkVersion
    "g": "", // (String) gmail
    "hw": "1620x1080", // (String) heightXwidth, screen height and width
    "t": "1506047606608", // (String) client time when the log was produced
    "nw": "WIFI", // (String) network mode
    "ln": 0, // (double) lng, longitude
    "la": 0 // (double) lat, latitude
  },
  "et": [ // events
    {
      "ett": "1506047605364", // client time when the event occurred
      "en": "display", // event name
      "kv": { // event payload, custom key-value pairs
        "goodsid": "236",
        "action": "1",
        "extend1": "1",
        "place": "2",
        "category": "75"
      }
    }
  ]
}
- Sample log line (server timestamp | log). Comparing the server timestamp with the client time `t` gives a rough measure of network transfer time:
1540934156385|{
  "ap": "gmall", // data warehouse name
  "cm": {
    "uid": "1234",
    "vc": "2",
    "vn": "1.0",
    "l": "EN",
    "sr": "",
    "os": "7.1.1",
    "ar": "CN",
    "md": "BBB100-1",
    "ba": "blackberry",
    "sv": "V2.2.1",
    "g": "abc@gmail.com",
    "hw": "1620x1080",
    "t": "1506047606608",
    "nw": "WIFI",
    "ln": 0,
    "la": 0
  },
  "et": [
    {
      "ett": "1506047605364", // client time when the event occurred
      "en": "display", // event name
      "kv": { // event payload, custom key-value pairs
        "goodsid": "236",
        "action": "1",
        "extend1": "1",
        "place": "2",
        "category": "75"
      }
    },
    {
      "ett": "1552352626835",
      "en": "active_background",
      "kv": {
        "active_source": "1"
      }
    }
  ]
}
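A consumer of these logs has to split each line into the server timestamp and the JSON body. One example would be a Flume interceptor, which this section does not show, so treat that use as an assumption. The sketch below uses only the JDK. The class name `LogLine` is hypothetical, and the JSON body itself is left to a library such as fastjson. Splitting on the first `|` is safe because the numeric prefix cannot contain one.

```java
public final class LogLine {
    public final long serverTs; // server-side timestamp in milliseconds
    public final String json;   // the JSON payload after the first '|'

    private LogLine(long serverTs, String json) {
        this.serverTs = serverTs;
        this.json = json;
    }

    /** Parses "serverTs|{...}"; throws IllegalArgumentException on a malformed line. */
    public static LogLine parse(String line) {
        int sep = line.indexOf('|');
        if (sep <= 0) {
            throw new IllegalArgumentException("missing 'timestamp|' prefix: " + line);
        }
        // NumberFormatException (an IllegalArgumentException) if the prefix is not numeric
        long ts = Long.parseLong(line.substring(0, sep).trim());
        String body = line.substring(sep + 1).trim();
        if (!body.startsWith("{")) {
            throw new IllegalArgumentException("payload is not a JSON object");
        }
        return new LogLine(ts, body);
    }

    /** Server time minus client time `t`: transfer delay plus any client clock skew. */
    public static long delayMillis(long serverTs, long clientTs) {
        return serverTs - clientTs;
    }
}
```

Because client clocks are not synchronized, `delayMillis` is only a rough indicator. It is more useful in aggregate than for a single record.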
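2.2 Event Log Data
2.2.1 Product List Page
- Event name: loading
| Tag | Meaning |
|---|---|
| action | Action: start loading=1, load succeeded=2, load failed=3 |
| loading_time | Loading duration: time from the start of the pull-down until the API returns data (report 0 when loading starts; report the duration only on success or failure) |
| loading_way | Loading way: 1 = read from cache, 2 = fetch fresh data from the API (reported only on success) |
| extend1 | Extension field Extend1 |
| extend2 | Extension field Extend2 |
| type | Loading trigger: automatic=1, user pull-down=2, bottom loading=3 (tapping the bottom prompt bar or the back-to-top button) |
| type1 | Failure code: the status code of a failed load (empty means success, no failure) |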
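The reporting rules in this table can be checked mechanically. Below is a sketch of a validator for one `loading` event. It assumes the fields arrive as strings, as in the generated logs. The class and its messages are hypothetical. The generator in 2.4.4 fills these fields independently at random, so its output would not pass such a check.

```java
import java.util.ArrayList;
import java.util.List;

public class LoadingEventCheck {
    private static boolean blank(String s) {
        return s == null || s.isEmpty();
    }

    /** Returns the rule violations for one loading event; an empty list means consistent. */
    public static List<String> check(String action, String loadingTime,
                                     String loadingWay, String type1) {
        List<String> problems = new ArrayList<>();
        if ("1".equals(action)) {            // start loading
            if (!"0".equals(loadingTime)) problems.add("start must report loading_time=0");
            if (!blank(loadingWay)) problems.add("loading_way is only reported on success");
        } else if ("2".equals(action)) {     // load succeeded
            if (blank(loadingWay)) problems.add("success must report loading_way");
            if (!blank(type1)) problems.add("success must leave type1 empty");
        } else if ("3".equals(action)) {     // load failed
            if (blank(type1)) problems.add("failure must report a type1 code");
            if (!blank(loadingWay)) problems.add("loading_way is only reported on success");
        } else {
            problems.add("unknown action: " + action);
        }
        return problems;
    }
}
```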
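2.2.2 Product Click
- Event tag: display
| Tag | Meaning |
|---|---|
| action | Action: product impression=1, product click=2 |
| goodsid | Product ID (issued by the server) |
| place | Position (the first item is 0, the second is 1, and so on) |
| extend1 | Impression type: 1 = first impression, 2 = repeat impression |
| category | Category ID (defined by the server) |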
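2.2.3 Product Detail Page
- Event tag: newsdetail
| Tag | Meaning |
|---|---|
| entry | Page entry source: app home page=1, push=2, related recommendation on a detail page=3 |
| action | Action: start loading=1, load succeeded=2 (pv), load failed=3, exit page=4 |
| goodsid | Product ID (issued by the server) |
| show_style | Product style (the generator in 2.4 emits this tag as `showtype`): 0 = no image, 1 = one large image, 2 = two images, 3 = three small images, 4 = one small image, 5 = one large image plus two small images |
| news_staytime | Page stay time: measured from when the product starts loading until the user closes the page. If the user jumps to another page, the timer pauses and resumes when the user returns to the detail page. If the user is away for more than 10 minutes, this measurement is discarded and not reported. If the user leaves before the page loads successfully, report empty. |
| loading_time | Loading duration: time from when the page starts loading until the API returns data (report 0 when loading starts; report the duration only on success or failure) |
| type1 | Failure code: the status code of a failed load (empty means success, no failure) |
| category | Category ID (defined by the server) |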
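The `news_staytime` rule describes a pausable timer. Below is a minimal sketch. It assumes the client gets callbacks for page start, successful load, leaving, returning, and close; the class `StayTimer` and its method names are hypothetical. `stop` returns `null` when the measurement must be discarded and an empty string when the page never loaded, matching the two special cases in the table.

```java
public class StayTimer {
    private static final long MAX_AWAY_MS = 10 * 60 * 1000L; // the 10-minute rule

    private long accumulatedMs = 0;  // time counted so far
    private long runningSince = -1;  // start of the current segment, -1 while paused
    private long awaySince = -1;     // when the user left, -1 while on the page
    private boolean loaded = false;
    private boolean discarded = false;

    /** The product starts loading: timing begins here. */
    public void start(long now) {
        runningSince = now;
    }

    public void markLoaded() {
        loaded = true;
    }

    /** The user jumps to another page: pause the timer. */
    public void leave(long now) {
        if (runningSince >= 0) {
            accumulatedMs += now - runningSince;
            runningSince = -1;
            awaySince = now;
        }
    }

    /** The user comes back: resume timing, or invalidate after more than 10 minutes away. */
    public void returnTo(long now) {
        if (awaySince >= 0) {
            if (now - awaySince > MAX_AWAY_MS) {
                discarded = true;
            }
            awaySince = -1;
            runningSince = now;
        }
    }

    /** The page is closed: null = do not report, "" = never loaded, otherwise milliseconds. */
    public String stop(long now) {
        if (awaySince >= 0 && now - awaySince > MAX_AWAY_MS) {
            discarded = true; // closed while still away for too long
        }
        if (runningSince >= 0) {
            accumulatedMs += now - runningSince;
            runningSince = -1;
        }
        if (discarded) return null;
        if (!loaded) return "";
        return Long.toString(accumulatedMs);
    }
}
```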
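2.2.4 Advertisement
- Event name: ad
| Tag | Meaning |
|---|---|
| entry | Entry: product list page=1, app home page=2, product detail page=3 |
| action | Action: ad shown=1, ad clicked=2 |
| contentType | Type: 1 = product, 2 = marketing campaign |
| displayMills | Display duration in milliseconds |
| itemId | Product ID |
| activityId | Marketing campaign ID |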
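2.2.5 Notifications
- Event tag: notification
| Tag | Meaning |
|---|---|
| action | Action: notification created=1, notification popped up=2, notification clicked=3, persistent notification shown (not repeated; reported at most once a day)=4 |
| type | Notification ID: alert=1, weather forecast (morning=2, evening=3), persistent=4 |
| ap_time | Client pop-up time |
| content | Reserved field |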
2.2.6 Background Activity
- Event tag: active_background
| Tag | Meaning |
|---|---|
| active_source | 1=upgrade, 2=download, 3=plugin_upgrade |
2.2.7 Comments
- Description: comment table (comment)
| No. | Field | Description | Type | Length | Nullable | Default |
|---|---|---|---|---|---|---|
| 1 | comment_id | Comment ID | int | 10,0 | | |
| 2 | userid | User ID | int | 10,0 | √ | 0 |
| 3 | p_comment_id | Parent comment ID (0 means a top-level comment; non-zero means a reply) | int | 10,0 | √ | |
| 4 | content | Comment text | string | 1000 | √ | |
| 5 | addtime | Creation time | string | | √ | |
| 6 | other_id | Related ID of the comment | int | 10,0 | √ | |
| 7 | praise_count | Number of likes | int | 10,0 | √ | 0 |
| 8 | reply_count | Number of replies | int | 10,0 | √ | 0 |
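`p_comment_id` is enough to rebuild reply threads. Below is a sketch of how a downstream job might group replies under their parent comments. The nested `Comment` class is a hypothetical stand-in for a row that keeps only the two IDs needed. If `reply_count` counts direct replies (an assumption), the list sizes could be cross-checked against it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CommentThreads {
    /** Only the two IDs needed for threading; a hypothetical stand-in for a comment row. */
    public static final class Comment {
        final int commentId;
        final int parentId; // p_comment_id: 0 for a top-level comment

        public Comment(int commentId, int parentId) {
            this.commentId = commentId;
            this.parentId = parentId;
        }
    }

    /** Maps each parent comment ID to the IDs of its direct replies, in input order. */
    public static Map<Integer, List<Integer>> repliesByParent(List<Comment> comments) {
        Map<Integer, List<Integer>> replies = new TreeMap<>();
        for (Comment c : comments) {
            if (c.parentId == 0) {
                continue; // top-level comment, not a reply
            }
            replies.computeIfAbsent(c.parentId, k -> new ArrayList<>()).add(c.commentId);
        }
        return replies;
    }
}
```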
2.2.8 Favorites
- Description: favorites table (favorites)
| No. | Field | Description | Type | Length | Nullable | Default |
|---|---|---|---|---|---|---|
| 1 | id | Primary key | int | 10,0 | | |
| 2 | course_id | Product ID | int | 10,0 | √ | 0 |
| 3 | userid | User ID | int | 10,0 | √ | 0 |
| 4 | add_time | Creation time | string | | √ | |
2.2.9 Likes
- Description: table of all likes (praise)
| No. | Field | Description | Type | Length | Nullable | Default |
|---|---|---|---|---|---|---|
| 1 | id | Primary key ID | int | 10,0 | | |
| 2 | userid | User ID | int | 10,0 | √ | |
| 3 | target_id | ID of the liked object | int | 10,0 | √ | |
| 4 | type | Like type: 1 = Q&A like, 2 = Q&A comment like, 3 = article like, 4 = comment like | int | 10,0 | √ | |
| 5 | add_time | Creation time | string | | √ | |
2.2.10 Error Log
- Event tag: error
| Tag | Meaning |
|---|---|
| errorBrief | Error summary |
| errorDetail | Error details |
2.3 Startup Log Data
{
  "action": "1",
  "ar": "MX",
  "ba": "HTC",
  "detail": "",
  "en": "start",
  "entry": "2",
  "extend1": "",
  "g": "43R2SEQX@gmail.com",
  "hw": "640*960",
  "l": "en",
  "la": "20.4",
  "ln": "-99.3",
  "loading_time": "2",
  "md": "HTC-2",
  "mid": "995",
  "nw": "4G",
  "open_ad_type": "2",
  "os": "8.1.2",
  "sr": "B",
  "sv": "V2.0.6",
  "t": "1561472502444",
  "uid": "995",
  "vc": "10",
  "vn": "1.3.4"
}
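- Event tag: start
| Tag | Meaning |
|---|---|
| entry | Entry: push=1, widget=2, icon=3, notification=4, lockscreen_widget=5 |
| open_ad_type | Splash ad type: native splash ad=1, interstitial splash ad=2 |
| action | Status: success=1, failure=2 |
| loading_time | Loading duration: time from the start of the pull-down until the API returns data (report 0 when loading starts; report the duration only on success or failure) |
| detail | Failure code (empty if none) |
| extend1 | Failure message (empty if none) |
| en | Log type: start |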
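2.4 Data Generation Script
This case skips the front-end log-generation step marked in red in the original architecture diagram. Instead, a Java program writes the log files directly.
2.4.1 Generated Data Format
- Startup log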
{"action":"1","ar":"MX","ba":"Sumsung","detail":"201","en":"start","entry":"4","extend1":"","g":"69021X1Q@gmail.com","hw":"1080*1920","l":"pt","la":"-11.0","ln":"-70.0","loading_time":"9","md":"sumsung-5","mid":"244","nw":"3G","open_ad_type":"1","os":"8.2.3","sr":"D","sv":"V2.1.3","t":"1589612165914","uid":"244","vc":"16","vn":"1.2.1"}
- Event log (each line is prefixed with the server timestamp followed by `|`)
1589695383284|{"cm":{"ln":"-79.4","sv":"V2.5.3","os":"8.0.6","g":"81614U54@gmail.com","mid":"245","nw":"WIFI","l":"pt","vc":"6","hw":"1080*1920","ar":"MX","uid":"245","t":"1589627025851","la":"-39.6","md":"HTC-7","vn":"1.3.5","ba":"HTC","sr":"N"},"ap":"app","et":[{"ett":"1589650631883","en":"display","kv":{"goodsid":"53","action":"2","extend1":"2","place":"3","category":"50"}},{"ett":"1589690866312","en":"newsdetail","kv":{"entry":"3","goodsid":"54","news_staytime":"1","loading_time":"6","action":"4","showtype":"0","category":"78","type1":""}},{"ett":"1589641734037","en":"loading","kv":{"extend2":"","loading_time":"0","action":"1","extend1":"","type":"2","type1":"201","loading_way":"2"}},{"ett":"1589687684878","en":"ad","kv":{"activityId":"1","displayMills":"92030","entry":"3","action":"5","contentType":"0"}},{"ett":"1589632980772","en":"active_background","kv":{"active_source":"1"}},{"ett":"1589682030324","en":"error","kv":{"errorDetail":"java.lang.NullPointerException\n at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\n at cn.lift.dfdf.web.AbstractBaseController.validInbound","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)"}},{"ett":"1589675065650","en":"comment","kv":{"p_comment_id":2,"addtime":"1589624299628","praise_count":509,"other_id":6,"comment_id":7,"reply_count":35,"userid":3,"content":"关色芦候佰间纶珊斑禁尹赞涤仇彭企呵姜毅"}},{"ett":"1589631359459","en":"favorites","kv":{"course_id":7,"id":0,"add_time":"1589681240066","userid":7}},{"ett":"1589616574187","en":"praise","kv":{"target_id":1,"id":7,"type":3,"add_time":"1589642497314","userid":8}}]}
2.4.2 Creating the Maven Projects
- data-producer: pom.xml
<!-- Centralized version numbers -->
<properties>
    <slf4j.version>1.7.20</slf4j.version>
    <logback.version>1.0.7</logback.version>
</properties>
<dependencies>
    <!-- Alibaba's open-source JSON library -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.51</version>
    </dependency>
    <!-- Logging framework -->
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>${logback.version}</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>${logback.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.10</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <!-- Main class -->
                        <mainClass>com.heaton.bigdata.datawarehouse.app.App</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
- data-producer: logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
    <!-- Where log files are stored. Do not use relative paths in a logback configuration. -->
    <property name="LOG_HOME" value="/root/logs/"/>
    <!-- Console output -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <!-- Pattern: %d is the date, %thread the thread name, %-5level the level left-aligned in 5 characters, %msg the message, %n a newline -->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
        </encoder>
    </appender>
    <!-- One log file per day; stores the event logs -->
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- <File>${LOG_HOME}/app.log</File> would set the active file path. Note that in a web project a relative path ends up under Tomcat's bin directory. -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Output file name pattern -->
            <FileNamePattern>${LOG_HOME}/app-%d{yyyy-MM-dd}.log</FileNamePattern>
            <!-- Number of days of logs to keep -->
            <MaxHistory>30</MaxHistory>
        </rollingPolicy>
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <!-- Message only, so each line is exactly one generated record -->
            <pattern>%msg%n</pattern>
        </encoder>
        <!-- Maximum size of a log file -->
        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
            <MaxFileSize>10MB</MaxFileSize>
        </triggeringPolicy>
    </appender>
    <!-- Asynchronous logging -->
    <appender name="ASYNC_FILE" class="ch.qos.logback.classic.AsyncAppender">
        <!-- Do not drop logs. By default, once the queue is 80% full, TRACE, DEBUG and INFO events are discarded. -->
        <discardingThreshold>0</discardingThreshold>
        <!-- Queue depth; affects performance. The default is 256. -->
        <queueSize>512</queueSize>
        <!-- The appender to forward to; at most one can be attached -->
        <appender-ref ref="FILE"/>
    </appender>
    <!-- Log level -->
    <root level="INFO">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="ASYNC_FILE"/>
    </root>
</configuration>
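The `discardingThreshold` comment can be made concrete. Below is a simplified model, not logback's source code, of the decision `AsyncAppender` makes for each event. When fewer than `discardingThreshold` free slots remain in the queue, events at INFO level or below are dropped. By default the threshold is `queueSize / 5`, which means the queue is 80% full. With a threshold of 0 the queue-space condition can never hold, so nothing is dropped. The class and enum names are hypothetical.

```java
public class AsyncDiscardRule {
    public enum Level { TRACE, DEBUG, INFO, WARN, ERROR }

    /** Threshold used when discardingThreshold is not configured. */
    public static int defaultThreshold(int queueSize) {
        return queueSize / 5;
    }

    /** True if an event would be dropped instead of queued. */
    public static boolean shouldDiscard(Level level, int remainingCapacity, int discardingThreshold) {
        boolean queueNearlyFull = remainingCapacity < discardingThreshold;
        boolean discardable = level.compareTo(Level.INFO) <= 0; // TRACE, DEBUG, INFO
        return queueNearlyFull && discardable;
    }
}
```

Because the generator writes every record at INFO, the default threshold could silently drop records under load. That is why the configuration sets it to 0.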
- data-flume: pom.xml
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
- hive-function: pom.xml
<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.1.1</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
2.4.3 Event Beans
In the data-producer project
2.4.3.1 Common Log Class
import lombok.Data;
/**
 * @author Heaton
 * @email 70416450@qq.com
 * @date 2020/4/25 14:54
 * @describe Common log class
 */
@Data
public class AppBase {
    private String mid; // (String) unique device ID
    private String uid; // (String) user uid
    private String vc; // (String) versionCode, app version number
    private String vn; // (String) versionName, app version name
    private String l; // (String) system language
    private String sr; // (String) channel the app was installed from
    private String os; // (String) Android OS version
    private String ar; // (String) region
    private String md; // (String) phone model
    private String ba; // (String) phone brand
    private String sv; // (String) sdkVersion
    private String g; // (String) gmail
    private String hw; // (String) heightXwidth, screen height and width
    private String t; // (String) client time when the log was produced
    private String nw; // (String) network mode
    private String ln; // longitude (a double value carried as a String)
    private String la; // latitude (a double value carried as a String)
}
2.4.3.2 Startup Log Class
import lombok.Data;
/**
 * @author Heaton
 * @email 70416450@qq.com
 * @date 2020/4/25 14:54
 * @describe Startup log class
 */
@Data
public class AppStart extends AppBase {
    private String entry; // entry: push=1, widget=2, icon=3, notification=4, lockscreen_widget=5
    private String open_ad_type; // splash ad type: native splash ad=1, interstitial splash ad=2
    private String action; // status: success=1, failure=2
    private String loading_time; // loading duration from the start of the pull-down until the API returns data (0 when loading starts; reported only on success or failure)
    private String detail; // failure code (empty if none)
    private String extend1; // failure message (empty if none)
    private String en; // log type marker, "start" for startup logs
}
2.4.3.3 Error Log Class
import lombok.Data;
/**
 * @author Heaton
 * @email 70416450@qq.com
 * @date 2020/4/25 14:54
 * @describe Error log class
 */
@Data
public class AppErrorLog {
    private String errorBrief; // error summary
    private String errorDetail; // error details
}
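The beans rely on Lombok's `@Data`, which generates getters, setters, `equals`, `hashCode` and `toString` at compile time. fastjson's `JSON.toJSON` then reads the bean properties through those getters. For readers without Lombok, below is a hand-written approximation of what `@Data` produces for `AppErrorLog`. It is renamed `AppErrorLogPlain` (a hypothetical name) to avoid a clash. It is a simplification: Lombok's generated `equals`/`hashCode` differ in details, such as a `canEqual` method.

```java
import java.util.Objects;

public class AppErrorLogPlain {
    private String errorBrief;  // error summary
    private String errorDetail; // error details

    public String getErrorBrief() { return errorBrief; }
    public void setErrorBrief(String errorBrief) { this.errorBrief = errorBrief; }
    public String getErrorDetail() { return errorDetail; }
    public void setErrorDetail(String errorDetail) { this.errorDetail = errorDetail; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof AppErrorLogPlain)) return false;
        AppErrorLogPlain other = (AppErrorLogPlain) o;
        return Objects.equals(errorBrief, other.errorBrief)
                && Objects.equals(errorDetail, other.errorDetail);
    }

    @Override
    public int hashCode() {
        return Objects.hash(errorBrief, errorDetail);
    }

    @Override
    public String toString() {
        // Lombok's format: ClassName(field=value, ...)
        return "AppErrorLogPlain(errorBrief=" + errorBrief + ", errorDetail=" + errorDetail + ")";
    }
}
```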
2.4.3.4 Product Click Log Class
import lombok.Data;
/**
 * @author Heaton
 * @email 70416450@qq.com
 * @date 2020/4/25 14:54
 * @describe Product click log class
 */
@Data
public class AppDisplay {
    private String action; // action: product impression=1, product click=2
    private String goodsid; // product ID (issued by the server)
    private String place; // position (the first item is 0, the second is 1, and so on)
    private String extend1; // impression type: 1 = first impression, 2 = repeat impression (not used)
    private String category; // category ID (defined by the server)
}
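2.4.3.5 Product Detail Class
import lombok.Data;
/**
 * @author Heaton
 * @email 70416450@qq.com
 * @date 2020/4/25 14:54
 * @describe Product detail class
 */
@Data
public class AppNewsDetail {
    private String entry; // page entry source: app home page=1, push=2, related recommendation on a detail page=3
    private String action; // action: start loading=1, load succeeded=2 (pv), load failed=3, exit page=4
    private String goodsid; // product ID (issued by the server)
    private String showtype; // product style: 0 no image, 1 one large image, 2 two images, 3 three small images, 4 one small image, 5 one large image plus two small images; items from related recommendations on a detail page always report 0 (text left, image right)
    private String news_staytime; // page stay time from the start of loading until the page is closed; paused while the user is on another page, discarded if the user is away for more than 10 minutes, empty if the page never loaded
    private String loading_time; // loading duration from when the page starts loading until the API returns data (0 when loading starts; reported only on success or failure)
    private String type1; // failure code: the status code of a failed load (empty means success)
    private String category; // category ID (defined by the server)
}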
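2.4.3.6 Product List Class
import lombok.Data;
/**
 * @author Heaton
 * @email 70416450@qq.com
 * @date 2020/4/25 14:54
 * @describe Product list class
 */
@Data
public class AppLoading {
    private String action; // action: start loading=1, load succeeded=2, load failed=3
    private String loading_time; // loading duration from the start of the pull-down until the API returns data (0 when loading starts; reported only on success or failure)
    private String loading_way; // loading way: 1 = read from cache, 2 = fetch fresh data from the API (reported only on success)
    private String extend1; // extension field Extend1
    private String extend2; // extension field Extend2
    private String type; // loading trigger: automatic=1, user pull-down=2, bottom loading=3 (tapping the bottom prompt bar or back-to-top)
    private String type1; // failure code: the status code of a failed load (empty means success)
}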
2.4.3.7 Ad Class
import lombok.Data;
/**
 * @author Heaton
 * @email 70416450@qq.com
 * @date 2020/4/25 14:54
 * @describe Ad class
 */
@Data
public class AppAd {
    private String entry; // entry: product list page=1, app home page=2, product detail page=3
    private String action; // action: ad shown=1, ad clicked=2
    private String contentType; // type: 1 = product, 2 = marketing campaign
    private String displayMills; // display duration in milliseconds
    private String itemId; // product id
    private String activityId; // marketing campaign id
}
2.4.3.8 Notification Log Class
import lombok.Data;
/**
 * @author Heaton
 * @email 70416450@qq.com
 * @date 2020/4/25 14:54
 * @describe Notification log class
 */
@Data
public class AppNotification {
    private String action; // action: created=1, popped up=2, clicked=3, persistent notification shown (reported at most once a day)=4
    private String type; // notification id: alert=1, weather forecast (morning=2, evening=3), persistent=4
    private String ap_time; // client pop-up time
    private String content; // reserved field
}
2.4.3.9 Background Activity Class
import lombok.Data;
/**
 * @author Heaton
 * @email 70416450@qq.com
 * @date 2020/4/25 14:54
 * @describe Background activity class
 */
@Data
public class AppActive {
    private String active_source; // 1=upgrade, 2=download, 3=plugin_upgrade
}
2.4.3.10 Comment Class
import lombok.Data;
/**
 * @author Heaton
 * @email 70416450@qq.com
 * @date 2020/4/25 14:54
 * @describe Comment class
 */
@Data
public class AppComment {
    private int comment_id; // comment ID
    private int userid; // user id
    private int p_comment_id; // parent comment id (0 means a top-level comment, non-zero means a reply)
    private String content; // comment text
    private String addtime; // creation time
    private int other_id; // related id of the comment
    private int praise_count; // number of likes
    private int reply_count; // number of replies
}
2.4.3.11 Favorites Class
import lombok.Data;
/**
 * @author Heaton
 * @email 70416450@qq.com
 * @date 2020/4/25 14:54
 * @describe Favorites class
 */
@Data
public class AppFavorites {
    private int id; // primary key
    private int course_id; // product id
    private int userid; // user ID
    private String add_time; // creation time
}
2.4.3.12 Like Class
import lombok.Data;
/**
 * @author Heaton
 * @email 70416450@qq.com
 * @date 2020/4/25 14:54
 * @describe Like class
 */
@Data
public class AppPraise {
    private int id; // primary key id
    private int userid; // user id
    private int target_id; // id of the liked object
    private int type; // like type: 1 Q&A like, 2 Q&A comment like, 3 article like, 4 comment like
    private String add_time; // creation time
}
2.4.4 Main Class
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.util.Random;
/**
 * @author Heaton
 * @email 70416450@qq.com
 * @date 2020/4/25 14:54
 * @describe Main class
 */
public class App {
    private final static Logger logger = LoggerFactory.getLogger(App.class);
    private static Random rand = new Random();
    // device id
    private static int s_mid = 0;
    // user id
    private static int s_uid = 0;
    // product id
    private static int s_goodsid = 0;
    public static void main(String[] args) {
        // argument 1: delay in milliseconds after each record, default 0
        Long delay = args.length > 0 ? Long.parseLong(args[0]) : 0L;
        // argument 2: number of iterations, default 1000
        int loop_len = args.length > 1 ? Integer.parseInt(args[1]) : 1000;
        // generate the data
        generateLog(delay, loop_len);
    }
    private static void generateLog(Long delay, int loop_len) {
        for (int i = 0; i < loop_len; i++) {
            // 0 = startup log, 1 = event log
            int flag = rand.nextInt(2);
            switch (flag) {
                case (0):
                    // app startup
                    AppStart appStart = generateStart();
                    String jsonString = JSON.toJSONString(appStart);
                    // log it (console and log file, per logback.xml)
                    logger.info(jsonString);
                    break;
                case (1):
                    JSONObject json = new JSONObject();
                    json.put("ap", "app");
                    json.put("cm", generateComFields());
                    JSONArray eventsArray = new JSONArray();
                    // event logs: each event type is included with probability 1/2
                    // product impression / click
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateDisplay());
                        json.put("et", eventsArray);
                    }
                    // product detail page
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateNewsDetail());
                        json.put("et", eventsArray);
                    }
                    // product list page
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateNewList());
                        json.put("et", eventsArray);
                    }
                    // ad
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateAd());
                        json.put("et", eventsArray);
                    }
                    // notification
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateNotification());
                        json.put("et", eventsArray);
                    }
                    // background activity
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateBackground());
                        json.put("et", eventsArray);
                    }
                    // error log
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateError());
                        json.put("et", eventsArray);
                    }
                    // comment
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateComment());
                        json.put("et", eventsArray);
                    }
                    // favorites
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateFavorites());
                        json.put("et", eventsArray);
                    }
                    // like
                    if (rand.nextBoolean()) {
                        eventsArray.add(generatePraise());
                        json.put("et", eventsArray);
                    }
                    // server timestamp
                    long millis = System.currentTimeMillis();
                    // log it as "timestamp|json"
                    logger.info(millis + "|" + json.toJSONString());
                    break;
            }
            // delay
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * Set the common fields
     */
    private static JSONObject generateComFields() {
        AppBase appBase = new AppBase();
        // device id
        appBase.setMid(s_mid + "");
        s_mid++;
        // user id
        appBase.setUid(s_uid + "");
        s_uid++;
        // app version code, e.g. 5, 6
        appBase.setVc("" + rand.nextInt(20));
        // app version name, e.g. v1.1.1
        appBase.setVn("1." + rand.nextInt(4) + "." + rand.nextInt(10));
        // Android OS version
        appBase.setOs("8." + rand.nextInt(3) + "." + rand.nextInt(10));
        // language: es, en, pt
        int flag = rand.nextInt(3);
        switch (flag) {
            case (0):
                appBase.setL("es");
                break;
            case (1):
                appBase.setL("en");
                break;
            case (2):
                appBase.setL("pt");
                break;
        }
        // channel the app came from
        appBase.setSr(getRandomChar(1));
        // region (break prevents case 0 from falling through to "MX")
        flag = rand.nextInt(2);
        switch (flag) {
            case 0:
                appBase.setAr("BR");
                break;
            case 1:
                appBase.setAr("MX");
                break;
        }
        // phone brand (ba) and model (md); the model number stays below 20
        flag = rand.nextInt(3);
        switch (flag) {
            case 0:
                appBase.setBa("Sumsung");
                appBase.setMd("sumsung-" + rand.nextInt(20));
                break;
            case 1:
                appBase.setBa("Huawei");
                appBase.setMd("Huawei-" + rand.nextInt(20));
                break;
            case 2:
                appBase.setBa("HTC");
                appBase.setMd("HTC-" + rand.nextInt(20));
                break;
        }
        // embedded SDK version
        appBase.setSv("V2." + rand.nextInt(10) + "." + rand.nextInt(10));
        // gmail
        appBase.setG(getRandomCharAndNumr(8) + "@gmail.com");
        // screen size hw
        flag = rand.nextInt(4);
        switch (flag) {
            case 0:
                appBase.setHw("640*960");
                break;
            case 1:
                appBase.setHw("640*1136");
                break;
            case 2:
                appBase.setHw("750*1134");
                break;
            case 3:
                appBase.setHw("1080*1920");
                break;
        }
        // client time when the log was produced (up to about 28 hours before now)
        long millis = System.currentTimeMillis();
        appBase.setT("" + (millis - rand.nextInt(99999999)));
        // network mode: 3G, 4G, WIFI
        flag = rand.nextInt(3);
        switch (flag) {
            case 0:
                appBase.setNw("3G");
                break;
            case 1:
                appBase.setNw("4G");
                break;
            case 2:
                appBase.setNw("WIFI");
                break;
        }
        // Latin America: 34°46′W to 117°09′W; 32°42′N to 53°54′S
        // longitude
        appBase.setLn((-34 - rand.nextInt(83) - rand.nextInt(60) / 10.0) + "");
        // latitude
        appBase.setLa((32 - rand.nextInt(85) - rand.nextInt(60) / 10.0) + "");
        return (JSONObject) JSON.toJSON(appBase);
    }
    /**
     * Product impression event
     */
    private static JSONObject generateDisplay() {
        AppDisplay appDisplay = new AppDisplay();
        // about 70% impressions, 30% clicks
        boolean boolFlag = rand.nextInt(10) < 7;
        // action: product impression=1, product click=2
        if (boolFlag) {
            appDisplay.setAction("1");
        } else {
            appDisplay.setAction("2");
        }
        // product id
        String goodsId = s_goodsid + "";
        s_goodsid++;
        appDisplay.setGoodsid(goodsId);
        // position: assume 6 items (0 to 5)
        int flag = rand.nextInt(6);
        appDisplay.setPlace("" + flag);
        // impression type
        flag = 1 + rand.nextInt(2);
        appDisplay.setExtend1("" + flag);
        // category
        flag = 1 + rand.nextInt(100);
        appDisplay.setCategory("" + flag);
        JSONObject jsonObject = (JSONObject) JSON.toJSON(appDisplay);
        return packEventJson("display", jsonObject);
    }
    /**
     * Product detail page
     */
    private static JSONObject generateNewsDetail() {
        AppNewsDetail appNewsDetail = new AppNewsDetail();
        // page entry source
        int flag = 1 + rand.nextInt(3);
        appNewsDetail.setEntry(flag + "");
        // action
        appNewsDetail.setAction("" + (rand.nextInt(4) + 1));
        // product id
        appNewsDetail.setGoodsid(s_goodsid + "");
        // product style (set once; a second random assignment used to overwrite it)
        flag = rand.nextInt(6);
        appNewsDetail.setShowtype("" + flag);
        // page stay time
        flag = rand.nextInt(10) * rand.nextInt(7);
        appNewsDetail.setNews_staytime(flag + "");
        // loading duration
        flag = rand.nextInt(10) * rand.nextInt(7);
        appNewsDetail.setLoading_time(flag + "");
        // failure code
        flag = rand.nextInt(10);
        switch (flag) {
            case 1:
                appNewsDetail.setType1("102");
                break;
            case 2:
                appNewsDetail.setType1("201");
                break;
            case 3:
                appNewsDetail.setType1("325");
                break;
            case 4:
                appNewsDetail.setType1("433");
                break;
            case 5:
                appNewsDetail.setType1("542");
                break;
            default:
                appNewsDetail.setType1("");
                break;
        }
        // category
        flag = 1 + rand.nextInt(100);
        appNewsDetail.setCategory("" + flag);
        JSONObject eventJson = (JSONObject) JSON.toJSON(appNewsDetail);
        return packEventJson("newsdetail", eventJson);
    }
    /**
     * Product list
     */
    private static JSONObject generateNewList() {
        AppLoading appLoading = new AppLoading();
        // action
        int flag = rand.nextInt(3) + 1;
        appLoading.setAction(flag + "");
        // loading duration
        flag = rand.nextInt(10) * rand.nextInt(7);
        appLoading.setLoading_time(flag + "");
        // failure code
        flag = rand.nextInt(10);
        switch (flag) {
            case 1:
                appLoading.setType1("102");
                break;
            case 2:
                appLoading.setType1("201");
                break;
            case 3:
                appLoading.setType1("325");
                break;
            case 4:
                appLoading.setType1("433");
                break;
            case 5:
                appLoading.setType1("542");
                break;
            default:
                appLoading.setType1("");
                break;
        }
        // loading way
        flag = 1 + rand.nextInt(2);
        appLoading.setLoading_way("" + flag);
        // extension field 1
        appLoading.setExtend1("");
        // extension field 2
        appLoading.setExtend2("");
        // loading trigger type
        flag = 1 + rand.nextInt(3);
        appLoading.setType("" + flag);
        JSONObject jsonObject = (JSONObject) JSON.toJSON(appLoading);
        return packEventJson("loading", jsonObject);
    }
    /**
     * Ad fields
     */
    private static JSONObject generateAd() {
        AppAd appAd = new AppAd();
        // entry: 1 to 3
        int flag = rand.nextInt(3) + 1;
        appAd.setEntry(flag + "");
        // action: ad shown=1, ad clicked=2
        flag = rand.nextInt(2) + 1;
        appAd.setAction(flag + "");
        // display duration in milliseconds
        flag = rand.nextInt(120000) + 1000;
        appAd.setDisplayMills(flag + "");
        // content type: 1 = product (set itemId), 2 = marketing campaign (set activityId)
        flag = rand.nextInt(2) + 1;
        if (flag == 1) {
            appAd.setContentType(flag + "");
            flag = rand.nextInt(6);
            appAd.setItemId(flag + "");
        } else {
            appAd.setContentType(flag + "");
            flag = rand.nextInt(1) + 1;
            appAd.setActivityId(flag + "");
        }
        JSONObject jsonObject = (JSONObject) JSON.toJSON(appAd);
        return packEventJson("ad", jsonObject);
    }
    /**
     * Startup log
     */
    private static AppStart generateStart() {
        AppStart appStart = new AppStart();
        // device id
        appStart.setMid(s_mid + "");
        s_mid++;
        // user id
        appStart.setUid(s_uid + "");
        s_uid++;
        // app version code, e.g. 5, 6
        appStart.setVc("" + rand.nextInt(20));
        // app version name, e.g. v1.1.1
        appStart.setVn("1." + rand.nextInt(4) + "." + rand.nextInt(10));
        // Android OS version
        appStart.setOs("8." + rand.nextInt(3) + "." + rand.nextInt(10));
        // log type
        appStart.setEn("start");
        // language: es, en, pt
        int flag = rand.nextInt(3);
        switch (flag) {
            case (0):
                appStart.setL("es");
                break;
            case (1):
                appStart.setL("en");
                break;
            case (2):
                appStart.setL("pt");
                break;
        }
        // channel the app came from
        appStart.setSr(getRandomChar(1));
        // region (break prevents case 0 from falling through to "MX")
        flag = rand.nextInt(2);
        switch (flag) {
            case 0:
                appStart.setAr("BR");
                break;
            case 1:
                appStart.setAr("MX");
                break;
        }
        // phone brand (ba) and model (md); the model number stays below 20
        flag = rand.nextInt(3);
        switch (flag) {
            case 0:
                appStart.setBa("Sumsung");
                appStart.setMd("sumsung-" + rand.nextInt(20));
                break;
            case 1:
                appStart.setBa("Huawei");
                appStart.setMd("Huawei-" + rand.nextInt(20));
                break;
            case 2:
                appStart.setBa("HTC");
                appStart.setMd("HTC-" + rand.nextInt(20));
                break;
        }
        // embedded SDK version
        appStart.setSv("V2." + rand.nextInt(10) + "." + rand.nextInt(10));
        // gmail
        appStart.setG(getRandomCharAndNumr(8) + "@gmail.com");
        // screen size hw
        flag = rand.nextInt(4);
        switch (flag) {
            case 0:
                appStart.setHw("640*960");
                break;
            case 1:
                appStart.setHw("640*1136");
                break;
            case 2:
                appStart.setHw("750*1134");
                break;
            case 3:
                appStart.setHw("1080*1920");
                break;
        }
        // client time when the log was produced (up to about 28 hours before now)
        long millis = System.currentTimeMillis();
        appStart.setT("" + (millis - rand.nextInt(99999999)));
        // network mode: 3G, 4G, WIFI
        flag = rand.nextInt(3);
        switch (flag) {
            case 0:
                appStart.setNw("3G");
                break;
            case 1:
                appStart.setNw("4G");
                break;
            case 2:
                appStart.setNw("WIFI");
                break;
        }
        // Latin America: 34°46′W to 117°09′W; 32°42′N to 53°54′S
        // longitude
        appStart.setLn((-34 - rand.nextInt(83) - rand.nextInt(60) / 10.0) + "");
        // latitude
        appStart.setLa((32 - rand.nextInt(85) - rand.nextInt(60) / 10.0) + "");
        // entry: 1 to 5
        flag = rand.nextInt(5) + 1;
        appStart.setEntry(flag + "");
        // splash ad type
        flag = rand.nextInt(2) + 1;
        appStart.setOpen_ad_type(flag + "");
        // status: about 90% success (1), 10% failure (2)
        flag = rand.nextInt(10) > 8 ? 2 : 1;
        appStart.setAction(flag + "");
        // loading duration
        appStart.setLoading_time(rand.nextInt(20) + "");
        // failure code
        flag = rand.nextInt(10);
        switch (flag) {
            case 1:
                appStart.setDetail("102");
                break;
            case 2:
                appStart.setDetail("201");
                break;
            case 3:
                appStart.setDetail("325");
                break;
            case 4:
                appStart.setDetail("433");
                break;
            case 5:
                appStart.setDetail("542");
                break;
            default:
                appStart.setDetail("");
                break;
        }
        // extension field
        appStart.setExtend1("");
        return appStart;
    }
    /**
     * Notification
     */
    private static JSONObject generateNotification() {
        AppNotification appNotification = new AppNotification();
        int flag = rand.nextInt(4) + 1;
        // action
        appNotification.setAction(flag + "");
        // notification id
        flag = rand.nextInt(4) + 1;
        appNotification.setType(flag + "");
        // client pop-up time
        appNotification.setAp_time((System.currentTimeMillis() - rand.nextInt(99999999)) + "");
        // reserved field
        appNotification.setContent("");
        JSONObject jsonObject = (JSONObject) JSON.toJSON(appNotification);
        return packEventJson("notification", jsonObject);
    }
    /**
     * Background activity
     */
    private static JSONObject generateBackground() {
        AppActive appActive_background = new AppActive();
        // activation source
        int flag = rand.nextInt(3) + 1;
        appActive_background.setActive_source(flag + "");
        JSONObject jsonObject = (JSONObject) JSON.toJSON(appActive_background);
        return packEventJson("active_background", jsonObject);
    }
    /**
     * Error log data
     */
    private static JSONObject generateError() {
        AppErrorLog appErrorLog = new AppErrorLog();
        // candidate error summaries
        String[] errorBriefs = {"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)", "at cn.lift.appIn.control.CommandUtil.getInfo(CommandUtil.java:67)"};
        // candidate error details; "\\n" is a literal backslash-n, so each record stays on one log line
        String[] errorDetails = {"java.lang.NullPointerException\\n " + "at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n " + "at cn.lift.dfdf.web.AbstractBaseController.validInbound", "at cn.lift.dfdfdf.control.CommandUtil.getInfo(CommandUtil.java:67)\\n " + "at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\\n" + " at java.lang.reflect.Method.invoke(Method.java:606)\\n"};
        // error summary
        appErrorLog.setErrorBrief(errorBriefs[rand.nextInt(errorBriefs.length)]);
        // error details
        appErrorLog.setErrorDetail(errorDetails[rand.nextInt(errorDetails.length)]);
        JSONObject jsonObject = (JSONObject) JSON.toJSON(appErrorLog);
        return packEventJson("error", jsonObject);
    }
    /**
     * Wrap an event with the fields every event type shares: time (ett), event name (en) and JSON payload (kv)
     */
    private static JSONObject packEventJson(String eventName, JSONObject jsonObject) {
        JSONObject eventJson = new JSONObject();
        eventJson.put("ett", (System.currentTimeMillis() - rand.nextInt(99999999)) + "");
        eventJson.put("en", eventName);
        eventJson.put("kv", jsonObject);
        return eventJson;
    }
    /**
     * Random combination of letters
     *
     * @param length string length
     */
    private static String getRandomChar(Integer length) {
        StringBuilder str = new StringBuilder();
        Random random = new Random();
        for (int i = 0; i < length; i++) {
            // uppercase letter A to Z
            str.append((char) (65 + random.nextInt(26)));
        }
        return str.toString();
    }
    /**
     * Random combination of letters and digits
     *
     * @param length string length
     */
    private static String getRandomCharAndNumr(Integer length) {
        StringBuilder str = new StringBuilder();
        Random random = new Random();
        for (int i = 0; i < length; i++) {
            boolean b = random.nextBoolean();
            if (b) { // letter
                // int choice = random.nextBoolean() ? 65 : 97; would choose uppercase (65) or lowercase (97)
                str.append((char) (65 + random.nextInt(26))); // uppercase letter
            } else { // digit
                str.append(String.valueOf(random.nextInt(10)));
            }
        }
        return str.toString();
    }
/**
* Favorite
*/
private static JSONObject generateFavorites() {
AppFavorites favorites = new AppFavorites();
favorites.setCourse_id(rand.nextInt(10));
favorites.setUserid(rand.nextInt(10));
favorites.setAdd_time((System.currentTimeMillis() - rand.nextInt(99999999)) + "");
JSONObject jsonObject = (JSONObject) JSON.toJSON(favorites);
return packEventJson("favorites", jsonObject);
}
/**
* Like (praise)
*/
private static JSONObject generatePraise() {
AppPraise praise = new AppPraise();
praise.setId(rand.nextInt(10));
praise.setUserid(rand.nextInt(10));
praise.setTarget_id(rand.nextInt(10));
praise.setType(rand.nextInt(4) + 1);
praise.setAdd_time((System.currentTimeMillis() - rand.nextInt(99999999)) + "");
JSONObject jsonObject = (JSONObject) JSON.toJSON(praise);
return packEventJson("praise", jsonObject);
}
/**
* Comment
*/
private static JSONObject generateComment() {
AppComment comment = new AppComment();
comment.setComment_id(rand.nextInt(10));
comment.setUserid(rand.nextInt(10));
comment.setP_comment_id(rand.nextInt(5));
comment.setContent(getCONTENT());
comment.setAddtime((System.currentTimeMillis() - rand.nextInt(99999999)) + "");
comment.setOther_id(rand.nextInt(10));
comment.setPraise_count(rand.nextInt(1000));
comment.setReply_count(rand.nextInt(200));
JSONObject jsonObject = (JSONObject) JSON.toJSON(comment);
return packEventJson("comment", jsonObject);
}
/**
* Generate a single random Chinese character
*/
private static char getRandomChar() {
String str = "";
int highPos; // high byte of the GB2312 code
int lowPos; // low byte of the GB2312 code
Random random = new Random();
// Randomly generate the two bytes of a GB2312 level-1 Chinese character
highPos = 176 + random.nextInt(39); // 0xB0-0xD6
lowPos = 161 + random.nextInt(93); // 0xA1-0xFD
byte[] b = new byte[2];
b[0] = (byte) highPos;
b[1] = (byte) lowPos;
try {
str = new String(b, "GBK");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
System.out.println("Error");
}
return str.charAt(0);
}
/**
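* Concatenate a random number (0-99) of Chinese characters
*/
private static String getCONTENT() {
StringBuilder str = new StringBuilder();
// Draw the length once: calling rand.nextInt(100) in the loop condition would re-roll the bound on every iteration
int length = rand.nextInt(100);
for (int i = 0; i < length; i++) {
str.append(getRandomChar());
}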
return str.toString();
}
}
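Why `getRandomChar()` always yields a valid character: it builds a two-byte GB2312 code whose high byte lies in 0xB0-0xD6 and low byte in 0xA1-0xFD, which falls inside the level-1 Chinese character area, where rows 0xB0-0xD6 are completely filled. The standalone sketch below (class and method names are illustrative, and it assumes the JDK provides the GBK charset, as standard builds do) decodes every code in those ranges and checks that each one is a Han ideograph.

```java
import java.nio.charset.Charset;
import java.util.Random;

class Gb2312CharDemo {
    // Assumption: the running JDK ships the GBK charset
    private static final Charset GBK = Charset.forName("GBK");

    /** Decodes one two-byte GB2312 code given as unsigned byte values. */
    static char decode(int high, int low) {
        byte[] b = {(byte) high, (byte) low};
        return new String(b, GBK).charAt(0);
    }

    /** Same ranges as getRandomChar(): high 0xB0-0xD6, low 0xA1-0xFD. */
    static char randomHanzi(Random random) {
        return decode(0xB0 + random.nextInt(39), 0xA1 + random.nextInt(93));
    }

    /** Checks that every code in those ranges decodes to a Han ideograph. */
    static boolean allHanInRange() {
        for (int high = 0xB0; high <= 0xD6; high++) {
            for (int low = 0xA1; low <= 0xFD; low++) {
                char c = decode(high, low);
                if (Character.UnicodeScript.of(c) != Character.UnicodeScript.HAN) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(allHanInRange());
        System.out.println(randomHanzi(new Random(42)));
    }
}
```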
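2.4.5 Startup Test
Note: the log simulator must run on 2 servers. The simulated logs contain both start (common) logs and event logs, so a Flume interceptor is needed to route them, and two flume-ng agents (one per server) do this job.
Package the project and upload it to the 2 server nodes; the data it produces prepares for the later tests. Here the jar is placed in the test folder under the user's home directory.
Arguments control the generation interval and the number of records (below: one record every 2 seconds, 1000 records in total):
```bash
# interval in ms, then number of records
nohup java -jar data-producer-1.0-SNAPSHOT-jar-with-dependencies.jar 2000 1000 &
# follow the generated logs
tail -F /root/logs/*.log
```
Use www.json.cn to inspect the data format.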
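The generator's `main` method is not shown in this section, so the following is only a hypothetical sketch of how the two arguments (`2000 1000`) could drive the output: emit a fixed number of lines, pausing a given number of milliseconds between them. `ProducerLoopSketch`, `run`, and the placeholder line format are assumptions; the real tool appears to write to `/root/logs` rather than stdout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class ProducerLoopSketch {
    /**
     * Emits `count` lines from `generator`, pausing `intervalMs` between lines.
     * Hypothetical: the real data-producer may handle its arguments differently.
     */
    static List<String> run(long intervalMs, int count, Supplier<String> generator) {
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            String line = generator.get();
            System.out.println(line); // stand-in for writing to the log file
            emitted.add(line);
            if (i < count - 1 && intervalMs > 0) {
                try {
                    Thread.sleep(intervalMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag and stop early
                    break;
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Same argument order as the nohup example: interval in ms, then record count
        long intervalMs = args.length > 0 ? Long.parseLong(args[0]) : 0L;
        int count = args.length > 1 ? Integer.parseInt(args[1]) : 3;
        run(intervalMs, count, () -> System.currentTimeMillis() + "|{}");
    }
}
```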
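3 Create Kafka Topics
- Start-log topic: topic_start
- Event-log topic: topic_event
4 Flume Setup
Flume runs as two groups:
Group 1: collects the server log files and uses Kafka channels to send the data to different Kafka topics; interceptors route the start (common) logs and the event logs.
Group 2: consumes the Kafka data, buffers it in file channels, and finally writes it to HDFS.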
4.1 Flume: File->Kafka Configuration
- vim /root/test/file-flume-kafka.conf
```properties
# 1 Define the components
a1.sources=r1
a1.channels=c1 c2
# 2 Source: type; positionFile records the read position of each file; filegroups lists the files to tail (app.+ matches file names starting with "app"); channels lists the destinations
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/test/flume/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /root/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
# 3 Interceptors: i1 and i2 are custom; the multiplexing selector routes each event by the "topic" header, and each mapping entry maps a header value to a channel
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.heaton.bigdata.flume.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.heaton.bigdata.flume.LogTypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2
# 4 Channels: Kafka channels write events straight into the Kafka topics, so no sink is needed
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
```
Create this Flume configuration file on both log-producing server nodes.
LogETLInterceptor and LogTypeInterceptor are custom interceptors.
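The multiplexing selector in the configuration above is essentially a lookup from the value of one header to a list of channels. This plain-Java sketch (not Flume's own implementation; the class name is illustrative) mirrors `selector.header = topic` and the two `selector.mapping.*` lines. In this sketch an unmapped event goes nowhere; Flume also lets you name fallback channels with `selector.default`.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

class MultiplexingSelectorSketch {
    // Mirrors selector.header = topic and the selector.mapping.* entries
    static final String HEADER = "topic";
    static final Map<String, List<String>> MAPPING = Map.of(
            "topic_start", List.of("c1"),
            "topic_event", List.of("c2"));

    /** Returns the channels an event with these headers is sent to. */
    static List<String> select(Map<String, String> headers) {
        String value = headers.get(HEADER);
        if (value == null || !MAPPING.containsKey(value)) {
            // Unmapped value: dropped in this sketch (Flume would use selector.default if set)
            return Collections.emptyList();
        }
        return MAPPING.get(value);
    }

    public static void main(String[] args) {
        System.out.println(select(Map.of("topic", "topic_start"))); // [c1]
        System.out.println(select(Map.of("topic", "topic_event"))); // [c2]
    }
}
```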
4.2 Custom Interceptors
data-flume project
- LogUtils
```java
package com.heaton.bigdata.flume;

import org.apache.commons.lang.math.NumberUtils;

public class LogUtils {

    public static boolean validateEvent(String log) {
        /* Format: server time | json
        1588319303710|{
            "cm":{
                "ln":"-51.5","sv":"V2.0.7","os":"8.0.8","g":"L1470998@gmail.com","mid":"13",
                "nw":"4G","l":"en","vc":"7","hw":"640*960","ar":"MX","uid":"13","t":"1588291826938",
                "la":"-38.2","md":"Huawei-14","vn":"1.3.6","ba":"Huawei","sr":"Y"
            },
            "ap":"app",
            "et":[{
                "ett":"1588228193191","en":"ad","kv":{"activityId":"1","displayMills":"113201","entry":"3","action":"5","contentType":"0"}
            },{
                "ett":"1588300304713","en":"notification","kv":{"ap_time":"1588277440794","action":"2","type":"3","content":""}
            },{
                "ett":"1588249203743","en":"active_background","kv":{"active_source":"3"}
            },{
                "ett":"1588254200122","en":"favorites","kv":{"course_id":5,"id":0,"add_time":"1588264138625","userid":0}
            },{
                "ett":"1588281152824","en":"praise","kv":{"target_id":4,"id":3,"type":3,"add_time":"1588307696417","userid":8}
            }]
        }
        */
        // 1 Split into server time and JSON
        String[] logContents = log.split("\\|");
        // 2 There must be exactly two parts
        if (logContents.length != 2) {
            return false;
        }
        // 3 Server time must be a 13-digit number (epoch milliseconds)
        if (logContents[0].length() != 13 || !NumberUtils.isDigits(logContents[0])) {
            return false;
        }
        // 4 The body must look like a JSON object
        if (!logContents[1].trim().startsWith("{")
                || !logContents[1].trim().endsWith("}")) {
            return false;
        }
        return true;
    }

    public static boolean validateStart(String log) {
        /* Example start log:
        {
            "action":"1","ar":"MX","ba":"HTC","detail":"201","en":"start","entry":"4","extend1":"",
            "g":"4Z174142@gmail.com","hw":"750*1134","l":"pt","la":"-29.7","ln":"-48.1","loading_time":"0",
            "md":"HTC-18","mid":"14","nw":"3G","open_ad_type":"2","os":"8.0.8","sr":"D","sv":"V2.8.2",
            "t":"1588251833523","uid":"14","vc":"15","vn":"1.2.9"
        }
        */
        if (log == null) {
            return false;
        }
        // The log must look like a JSON object
        if (!log.trim().startsWith("{") || !log.trim().endsWith("}")) {
            return false;
        }
        return true;
    }
}
```
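If you want to unit-test the event check without pulling in commons-lang, the same rules can be written with the JDK alone. This is a sketch, not the project's code: `Character.isDigit` stands in for `NumberUtils.isDigits` and should behave roughly the same, and this version additionally treats a null log as invalid.

```java
class LogValidationSketch {
    /** Same rules as LogUtils.validateEvent, using only the JDK. */
    static boolean validateEvent(String log) {
        if (log == null) {
            return false;
        }
        // 1 Split into "server time" and "json"
        String[] parts = log.split("\\|");
        if (parts.length != 2) {
            return false;
        }
        // 2 Server time must be a 13-digit epoch-millisecond value
        String serverTime = parts[0];
        if (serverTime.length() != 13) {
            return false;
        }
        for (int i = 0; i < serverTime.length(); i++) {
            if (!Character.isDigit(serverTime.charAt(i))) {
                return false;
            }
        }
        // 3 Body must look like a JSON object
        String json = parts[1].trim();
        return json.startsWith("{") && json.endsWith("}");
    }

    public static void main(String[] args) {
        System.out.println(validateEvent("1588319303710|{\"ap\":\"app\"}")); // true
        System.out.println(validateEvent("158831930371|{}"));                // false: only 12 digits
    }
}
```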
- LogETLInterceptor
```java
package com.heaton.bigdata.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LogETLInterceptor implements Interceptor {

    @Override
    public void initialize() {
        // nothing to initialize
    }

    @Override
    public Event intercept(Event event) {
        // 1 Read the event body
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        // 2 Validate according to the log type (start log or event log)
        if (log.contains("start")) {
            if (LogUtils.validateStart(log)) {
                return event;
            }
        } else {
            if (LogUtils.validateEvent(log)) {
                return event;
            }
        }
        // 3 Invalid record: returning null drops it
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> valid = new ArrayList<>();
        for (Event event : events) {
            Event checked = intercept(event);
            if (checked != null) {
                valid.add(checked);
            }
        }
        return valid;
    }

    @Override
    public void close() {
        // nothing to release
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
```
- LogTypeInterceptor
```java
package com.heaton.bigdata.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LogTypeInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Determine the log type from the body and record it in the header
        // 1 Read the body
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        // 2 Get the headers
        Map<String, String> headers = event.getHeaders();
        // 3 Set the "topic" header that the multiplexing selector routes on
        if (log.contains("start")) {
            headers.put("topic", "topic_start");
        } else {
            headers.put("topic", "topic_event");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> result = new ArrayList<>();
        for (Event event : events) {
            result.add(intercept(event));
        }
        return result;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
```
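Taken together, i1 (ETL) drops malformed lines and i2 (type) tags the survivors with a `topic` header. The Flume-free sketch below simulates that chain on plain strings so it can run without Flume; `SimpleEvent` is a hypothetical stand-in for a Flume event and the validation is simplified. It also exposes a caveat of the design: because both interceptors use `log.contains("start")`, any event line whose text happens to contain "start" would be treated as a start log.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class InterceptorChainSketch {
    /** Minimal stand-in for a Flume event: headers plus a body. */
    static final class SimpleEvent {
        final Map<String, String> headers = new HashMap<>();
        final String body;

        SimpleEvent(String body) {
            this.body = body;
        }
    }

    static boolean looksLikeJsonObject(String s) {
        String t = s.trim();
        return t.startsWith("{") && t.endsWith("}");
    }

    /** Simplified event check: "13-digit time|{json}". */
    static boolean looksLikeEventLog(String s) {
        String[] parts = s.split("\\|");
        return parts.length == 2 && parts[0].matches("\\d{13}") && looksLikeJsonObject(parts[1]);
    }

    /** i1 (drop invalid lines) followed by i2 (set the "topic" header). */
    static List<SimpleEvent> process(List<String> lines) {
        List<SimpleEvent> out = new ArrayList<>();
        for (String line : lines) {
            boolean isStart = line.contains("start"); // same substring test as both interceptors
            boolean valid = isStart ? looksLikeJsonObject(line) : looksLikeEventLog(line);
            if (!valid) {
                continue; // LogETLInterceptor returns null, so the event is dropped
            }
            SimpleEvent event = new SimpleEvent(line);
            event.headers.put("topic", isStart ? "topic_start" : "topic_event");
            out.add(event);
        }
        return out;
    }

    public static void main(String[] args) {
        List<SimpleEvent> result = process(List.of(
                "{\"en\":\"start\",\"mid\":\"14\"}",
                "1588319303710|{\"ap\":\"app\",\"et\":[]}",
                "not a log line"));
        for (SimpleEvent e : result) {
            System.out.println(e.headers.get("topic") + " <- " + e.body);
        }
    }
}
```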
Package the project and copy the jar into Flume's lib directory (on all nodes):
Reference path on CDH: /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/flume-ng/lib
4.3 Flume Start/Stop Script
- vim /root/log-kafka-flume.sh
```bash
#! /bin/bash
case $1 in
"start"){
    for i in cdh02.cm cdh03.cm
    do
        echo " -------- Starting Flume on $i -------"
        ssh $i "nohup flume-ng agent --conf-file /root/test/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/root/test/file-flume-kafka.log 2>&1 &"
    done
};;
"stop"){
    for i in cdh02.cm cdh03.cm
    do
        echo " -------- Stopping Flume on $i -------"
        ssh $i "ps -ef | grep file-flume-kafka | grep -v grep | awk '{print \$2}' | xargs kill"
    done
};;
esac
```
4.4 Flume: Kafka->HDFS Configuration
Prepare this on the third server (cdh01.cm, the node that the start script in 4.5 targets).
- vim /root/test/kafka-flume-hdfs.conf
```properties
## Components
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2
## Kafka source 1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
a1.sources.r1.kafka.topics = topic_start
## Kafka source 2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
a1.sources.r2.kafka.topics = topic_event
## channel1
a1.channels.c1.type = file
## checkpoint (index) directory
a1.channels.c1.checkpointDir=/root/test/flume/checkpoint/behavior1
## data (persistence) directory
a1.channels.c1.dataDirs = /root/test/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## channel2
a1.channels.c2.type = file
## checkpoint (index) directory
a1.channels.c2.checkpointDir=/root/test/flume/checkpoint/behavior2
## data (persistence) directory
a1.channels.c2.dataDirs = /root/test/flume/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6
## HDFS sink 1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=/origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
## HDFS sink 2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
## Avoid producing lots of small files: roll by time or at 128 MB, never by event count
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 50
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
## Write compressed output files (Snappy)
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = snappy
a1.sinks.k2.hdfs.codeC = snappy
## Wire sources, channels, and sinks together
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
```
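Two HDFS sink behaviours drive this configuration. First, `%Y-%m-%d` in `hdfs.path` is filled in from the event's `timestamp` header (or local time if `hdfs.useLocalTimeStamp = true`), so each day's data lands in its own directory; the Kafka source is expected to supply that header. Second, a file is closed and a new one started when `rollInterval` seconds, `rollSize` bytes, or `rollCount` events is reached, and a value of 0 disables that trigger. The sketch below models both rules in plain Java under those assumptions; it is not Flume code and the names are illustrative.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

class HdfsSinkSketch {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    /** Expands "<basePath>/%Y-%m-%d" for an event timestamp (epoch millis) in the given zone. */
    static String bucketPath(String basePath, long timestampMillis, ZoneId zone) {
        return basePath + "/" + DAY.format(Instant.ofEpochMilli(timestampMillis).atZone(zone));
    }

    /** Roll when any enabled trigger fires; a limit of 0 disables that trigger. */
    static boolean shouldRoll(long openSeconds, long bytesWritten, long eventsWritten,
                              long rollInterval, long rollSize, long rollCount) {
        boolean byTime = rollInterval > 0 && openSeconds >= rollInterval;
        boolean bySize = rollSize > 0 && bytesWritten >= rollSize;
        boolean byCount = rollCount > 0 && eventsWritten >= rollCount;
        return byTime || bySize || byCount;
    }

    public static void main(String[] args) {
        System.out.println(bucketPath("/origin_data/gmall/log/topic_start",
                1588319303710L, ZoneId.of("Asia/Shanghai"))); // /origin_data/gmall/log/topic_start/2020-05-01
        // k1 settings: rollInterval = 10, rollSize = 134217728, rollCount = 0
        System.out.println(shouldRoll(10, 1_000, 5_000, 10, 134_217_728L, 0)); // true (time limit reached)
        System.out.println(shouldRoll(5, 1_000, 5_000, 10, 134_217_728L, 0));  // false (count trigger disabled)
    }
}
```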
4.5 Flume Start/Stop Script
Prepare this on the third server as well.
- vim /root/test/kafka-hdfs-flume.sh
```bash
#! /bin/bash
case $1 in
"start"){
    for i in cdh01.cm
    do
        echo " -------- Starting consumer Flume on $i -------"
        ssh $i "nohup flume-ng agent --conf-file /root/test/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/root/test/kafka-flume-hdfs.log 2>&1 &"
    done
};;
"stop"){
    for i in cdh01.cm
    do
        echo " -------- Stopping consumer Flume on $i -------"
        ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep | awk '{print \$2}' | xargs kill"
    done
};;
esac
```
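5 Business Data
This module mainly supports enterprise reporting and decision-making: it provides data for analysis, makes it possible to produce reports quickly over large data volumes, and supports the fast presentation of ad-hoc business requests. Offline and real-time business are separated; the offline approach presents the data in a clear, manageable way and lays a solid foundation for the real-time solution.
5.1 E-commerce Business Flow
5.2 SKU and SPU
- SKU (Stock Keeping Unit): the basic unit of inventory; the term now commonly refers to a product's unique code, and every product variant has its own SKU.
- SPU (Standard Product Unit): the smallest unit for aggregating product information; a reusable, easily searchable set of standardized information.
- Summary: the Black Shark 3 phone is an SPU; an Armor Gray unit with 256 GB of storage is a SKU.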
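In table terms the relationship is one-to-many: each SKU row points at its SPU, as `sku_info.spu_id` does later in this tutorial. The sketch below models that link with two small classes; the IDs and variant attributes are made-up illustrations, not data from the project.

```java
import java.util.List;

class SkuSpuSketch {
    /** One SPU: the product model shared by all its variants. */
    static final class Spu {
        final long id;
        final String name;

        Spu(long id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    /** One SKU: a concrete sellable variant, linked to its SPU by spuId (like sku_info.spu_id). */
    static final class Sku {
        final long id;
        final long spuId;
        final String color;
        final String storage;

        Sku(long id, long spuId, String color, String storage) {
            this.id = id;
            this.spuId = spuId;
            this.color = color;
            this.storage = storage;
        }
    }

    /** Counts the SKUs that belong to one SPU. */
    static long countSkusOf(Spu spu, List<Sku> skus) {
        return skus.stream().filter(s -> s.spuId == spu.id).count();
    }

    public static void main(String[] args) {
        Spu blackShark3 = new Spu(1, "Black Shark 3");
        List<Sku> skus = List.of(
                new Sku(101, 1, "Armor Gray", "256GB"),
                new Sku(102, 1, "Armor Gray", "128GB"),
                new Sku(201, 2, "Black", "128GB")); // belongs to another SPU
        System.out.println(countSkusOf(blackShark3, skus)); // 2
    }
}
```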
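5.3 Business Table Schemas
5.3.1 Order table (order_info)
5.3.2 Order detail table (order_detail)
5.3.3 SKU product table (sku_info)
5.3.4 User table (user_info)
5.3.5 Level-1 product category table (base_category1)
5.3.6 Level-2 product category table (base_category2)
5.3.7 Level-3 product category table (base_category3)
5.3.8 Payment table (payment_info)
5.3.9 Province table (base_province)
5.3.10 Region table (base_region)
5.3.11 Brand table (base_trademark)
5.3.12 Order status table (order_status_log)
5.3.13 SPU product table (spu_info)
5.3.14 Product comment table (comment_info)
5.3.15 Refund table (order_refund_info)
5.3.16 Shopping cart table (cart_info)
5.3.17 Product favorites table (favor_info)
5.3.18 Coupon usage table (coupon_use)
5.3.19 Coupon table (coupon_info)
5.3.20 Activity table (activity_info)
5.3.21 Activity-order relation table (activity_order)
5.3.22 Discount rule table (activity_rule)
5.3.23 Code dictionary table (base_dic)
5.3.24 Activity SKU table (activity_sku)
5.4 Date Table Schemas
5.4.1 Date table (date_info)
5.4.2 Holiday table (holiday_info)
5.4.3 Holiday year table (holiday_year)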
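6 Sync Strategies and Warehouse Layers
Data sync strategies come in four types: full tables, incremental tables, new-and-changed tables, and special tables.
- Full table: one partition per day, holding the complete data.
- Incremental table: one partition per day, holding only the rows added that day.
- New-and-changed table: one partition per day, holding the rows added or modified that day.
- Special table: no partitions; stored only once.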
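Each strategy boils down to the MySQL filter used when extracting one day's partition: the full and special snippets in 8.2 use `where 1=1`, and the incremental ones filter the table's own time column with `date_format(...)`. The sketch below expresses that mapping in Java. The new-and-changed filter (creation time or modification time on that day) is an assumption here, as is the requirement that such tables have `create_time` and `operate_time`; `day` is assumed to be a trusted `yyyy-MM-dd` value, since it is concatenated into SQL.

```java
class SyncStrategySketch {
    enum Strategy { FULL, INCREMENTAL, NEW_AND_CHANGED, SPECIAL }

    /**
     * WHERE clause for extracting one day's partition from MySQL.
     * dateColumn is the table's own time column (create_time, operate_time, payment_time, ...).
     */
    static String whereClause(Strategy strategy, String dateColumn, String day) {
        switch (strategy) {
            case FULL:
            case SPECIAL:
                return "1=1"; // whole table; SPECIAL tables are simply loaded once
            case INCREMENTAL:
                return dayFilter(dateColumn, day);
            case NEW_AND_CHANGED:
                // Assumption: a row counts if it was created or modified that day
                return "(" + dayFilter("create_time", day) + " or " + dayFilter("operate_time", day) + ")";
            default:
                throw new IllegalArgumentException("Unknown strategy: " + strategy);
        }
    }

    private static String dayFilter(String column, String day) {
        return "date_format(" + column + ",'%Y-%m-%d')='" + day + "'";
    }

    public static void main(String[] args) {
        System.out.println(whereClause(Strategy.INCREMENTAL, "payment_time", "2020-03-15"));
        System.out.println(whereClause(Strategy.NEW_AND_CHANGED, null, "2020-03-15"));
    }
}
```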
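6.1 Full Strategy
Full daily sync: store one complete copy of the data each day as a partition.
Suitable when: the table is small and rows are both inserted and updated.
Examples: brand table, code table, product category tables, discount rule table, activity table, product table, shopping cart table, favorites table, SKU/SPU tables.
6.2 Incremental Strategy
Incremental daily sync: store each day's new data as one partition.
Suitable when: the table is large and only ever receives inserts.
Examples: refund table, order status table, payment table, order detail table, activity-order relation table, product comment table.
6.3 New-and-Changed Strategy
Daily new and changed data: store the rows whose creation time or modification time falls on that day as one partition.
Suitable when: the table is large and receives both inserts and updates.
Examples: user table, order table, coupon usage table.
6.4 Special Strategy
Some special dimension tables need not follow the strategies above: they are synced into the warehouse only once and never updated, because the data does not change.
Suitable when: the table data almost never changes.
1. Real-world dimensions: dimensions of the objective world that do not change (gender, region, ethnicity, political status, shoe size, etc.) can be stored once as fixed values.
2. Date dimension: one or several years of dates can be loaded at once.
3. Region dimension: the province table and the region table.
6.5 Sync Strategies for the Analyzed Business Tables
Because special tables may still change slowly (for example, a territorial dispute could change the region table), the region tables also use the partitioned full-sync strategy.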
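6.6 Warehouse Layers
- Why use layers:
  - Simplicity: break a complex job into layers, each handling its own task, which makes problems easier to locate.
  - Less duplicated work: with standardized layers, intermediate-layer data greatly reduces repeated computation and increases reuse of results.
  - Data isolation: whether the concern is data anomalies or sensitive data, real data is decoupled from statistical data.
- Dimensional modeling is generally done in the DWD layer.
- ODS layer: raw data layer; stores the original data.
- DWD layer: cleans the ODS data (removing nulls and dirty data, converting types), degenerates dimensions, and masks sensitive data (privacy protection).
- DWS layer: daily summaries built on DWD.
- DWT layer: subject-level summaries built on DWS.
- ADS layer: provides data for the various analysis reports.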
7 Syncing Data with Sqoop
Sqoop notes:
Hive stores NULL on disk as "\N", while MySQL stores a real NULL. To keep the data consistent on both sides:
- When exporting data, use --input-null-string and --input-null-non-string.
- When importing data, use --null-string and --null-non-string.
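Hive's default text SerDe reads the two-character string `\N` as NULL, which is why these Sqoop options are usually written as `'\\N'` (the doubled backslash is what the Sqoop documentation recommends for Hive). This marker matters for text-format files; Parquet records NULLs natively. The sketch below shows the encoding and decoding rule for a tab-separated text row; the class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

class HiveNullEncodingSketch {
    /** Hive's default text null marker: the two characters backslash and N. */
    static final String HIVE_NULL = "\\N";

    static String encode(String value) {
        return value == null ? HIVE_NULL : value;
    }

    static String decode(String field) {
        return HIVE_NULL.equals(field) ? null : field;
    }

    /** Builds one tab-separated text row, writing NULLs the way Hive expects. */
    static String toTextRow(String... values) {
        List<String> fields = new ArrayList<>();
        for (String v : values) {
            fields.add(encode(v));
        }
        return String.join("\t", fields);
    }

    public static void main(String[] args) {
        String row = toTextRow("1", null, "Beijing");
        System.out.println(row);
        System.out.println(decode(row.split("\t")[1])); // null
    }
}
```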
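Approach in this example: Sqoop extracts the MySQL data to HDFS as Parquet files, and Hive ODS tables are then created over that data.
The scripts are scheduled with DolphinScheduler.
- Column type mapping between MySQL and Hive
| MySQL type | Hive ODS type | Hive DWD-ADS type |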
|---|---|---|
| tinyint | tinyint | tinyint |
| int | int | int |
| bigint | bigint | bigint |
| varchar | string | string |
| datetime | bigint | string |
| bit | boolean | int |
| double | double | double |
| decimal | decimal | decimal |
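In the mapping above, a MySQL `datetime` becomes a `bigint` in ODS and a `string` from DWD on. Assuming the bigint holds epoch milliseconds (an assumption about the Parquet import, not stated in the source), the DWD conversion amounts to formatting that instant in a chosen time zone, as in this sketch; in HiveQL the usual equivalent is `from_unixtime`, which takes seconds.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

class DatetimeColumnSketch {
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** ODS bigint (assumed epoch millis) to DWD string; NULL stays NULL. */
    static String toDwdString(Long epochMillis, ZoneId zone) {
        if (epochMillis == null) {
            return null;
        }
        return FMT.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
    }

    public static void main(String[] args) {
        System.out.println(toDwdString(1588319303710L, ZoneId.of("Asia/Shanghai"))); // 2020-05-01 15:48:23
    }
}
```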
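8 Building the ODS Layer
8.1 Creating the ODS Tables
Create the ods database in Hive, then create a Hive data source in DolphinScheduler; select this Hive database when creating the DAG.
Create the dwd, dws, dwt, and ads databases at the same time.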
- base_dic
```sql
drop table if exists ods.mall__base_dic;
CREATE EXTERNAL TABLE `ods.mall__base_dic`(
    `dic_code` string COMMENT 'code',
    `dic_name` string COMMENT 'code name',
    `parent_code` string COMMENT 'parent code',
    `create_time` bigint COMMENT 'create time',
    `operate_time` bigint COMMENT 'update time'
) COMMENT 'code dictionary table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_dic/'
tblproperties ("parquet.compression"="snappy");
```
- base_trademark
```sql
drop table if exists ods.mall__base_trademark;
CREATE EXTERNAL TABLE `ods.mall__base_trademark`(
    `tm_id` string COMMENT 'brand id',
    `tm_name` string COMMENT 'brand name'
) COMMENT 'brand table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_trademark/'
tblproperties ("parquet.compression"="snappy");
```
- base_category3
```sql
drop table if exists ods.mall__base_category3;
CREATE EXTERNAL TABLE `ods.mall__base_category3`(
    `id` bigint COMMENT 'id',
    `name` string COMMENT 'level-3 category name',
    `category2_id` bigint COMMENT 'level-2 category id'
) COMMENT 'level-3 category table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_category3/'
tblproperties ("parquet.compression"="snappy");
```
- base_category2
```sql
drop table if exists ods.mall__base_category2;
CREATE EXTERNAL TABLE `ods.mall__base_category2`(
    `id` bigint COMMENT 'id',
    `name` string COMMENT 'level-2 category name',
    `category1_id` bigint COMMENT 'level-1 category id'
) COMMENT 'level-2 category table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_category2/'
tblproperties ("parquet.compression"="snappy");
```
- base_category1
```sql
drop table if exists ods.mall__base_category1;
CREATE EXTERNAL TABLE `ods.mall__base_category1`(
    `id` bigint COMMENT 'id',
    `name` string COMMENT 'category name'
) COMMENT 'level-1 category table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_category1/'
tblproperties ("parquet.compression"="snappy");
```
- activity_rule
```sql
drop table if exists ods.mall__activity_rule;
CREATE EXTERNAL TABLE `ods.mall__activity_rule`(
    `id` int COMMENT 'id',
    `activity_id` int COMMENT 'activity id',
    `condition_amount` decimal(16,2) COMMENT 'threshold amount',
    `condition_num` bigint COMMENT 'threshold quantity',
    `benefit_amount` decimal(16,2) COMMENT 'discount amount',
    `benefit_discount` bigint COMMENT 'discount rate',
    `benefit_level` bigint COMMENT 'benefit level'
) COMMENT 'discount rule table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/activity_rule/'
tblproperties ("parquet.compression"="snappy");
```
- activity_info
```sql
drop table if exists ods.mall__activity_info;
CREATE EXTERNAL TABLE `ods.mall__activity_info`(
    `id` bigint COMMENT 'activity id',
    `activity_name` string COMMENT 'activity name',
    `activity_type` string COMMENT 'activity type',
    `start_time` bigint COMMENT 'start time',
    `end_time` bigint COMMENT 'end time',
    `create_time` bigint COMMENT 'create time'
) COMMENT 'activity table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/activity_info/'
tblproperties ("parquet.compression"="snappy");
```
- activity_sku
```sql
drop table if exists ods.mall__activity_sku;
CREATE EXTERNAL TABLE `ods.mall__activity_sku`(
    `id` bigint COMMENT 'id',
    `activity_id` bigint COMMENT 'activity id',
    `sku_id` bigint COMMENT 'sku_id',
    `create_time` bigint COMMENT 'create time'
) COMMENT 'activity SKU table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/activity_sku/'
tblproperties ("parquet.compression"="snappy");
```
- cart_info
```sql
drop table if exists ods.mall__cart_info;
CREATE EXTERNAL TABLE `ods.mall__cart_info`(
    `id` bigint COMMENT 'id',
    `user_id` bigint COMMENT 'user id',
    `sku_id` bigint COMMENT 'sku_id',
    `cart_price` decimal(10,2) COMMENT 'price when added to cart',
    `sku_num` bigint COMMENT 'quantity',
    `sku_name` string COMMENT 'sku name',
    `create_time` bigint COMMENT 'create time',
    `operate_time` bigint COMMENT 'update time',
    `is_ordered` bigint COMMENT 'whether ordered',
    `order_time` bigint COMMENT 'order time'
) COMMENT 'shopping cart table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/cart_info/'
tblproperties ("parquet.compression"="snappy");
```
- favor_info
```sql
drop table if exists ods.mall__favor_info;
CREATE EXTERNAL TABLE `ods.mall__favor_info`(
    `id` bigint COMMENT 'id',
    `user_id` bigint COMMENT 'user id',
    `sku_id` bigint COMMENT 'sku_id',
    `spu_id` bigint COMMENT 'spu id',
    `is_cancel` string COMMENT 'canceled: 0 active, 1 canceled',
    `create_time` bigint COMMENT 'create time',
    `cancel_time` bigint COMMENT 'cancel time'
) COMMENT 'product favorites table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/favor_info/'
tblproperties ("parquet.compression"="snappy");
```
- coupon_info
```sql
drop table if exists ods.mall__coupon_info;
CREATE EXTERNAL TABLE `ods.mall__coupon_info`(
    `id` bigint COMMENT 'coupon id',
    `coupon_name` string COMMENT 'coupon name',
    `coupon_type` string COMMENT 'coupon type: 1 cash, 2 discount, 3 amount-off over threshold, 4 discount over item count',
    `condition_amount` decimal(10,2) COMMENT 'threshold amount',
    `condition_num` bigint COMMENT 'threshold quantity',
    `activity_id` bigint COMMENT 'activity id',
    `benefit_amount` decimal(16,2) COMMENT 'amount off',
    `benefit_discount` bigint COMMENT 'discount',
    `create_time` bigint COMMENT 'create time',
    `range_type` string COMMENT 'scope type: 1 product, 2 category, 3 brand',
    `spu_id` bigint COMMENT 'spu id',
    `tm_id` bigint COMMENT 'brand id',
    `category3_id` bigint COMMENT 'category id',
    `limit_num` int COMMENT 'maximum number of claims',
    `operate_time` bigint COMMENT 'update time',
    `expire_time` bigint COMMENT 'expire time'
) COMMENT 'coupon table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/coupon_info/'
tblproperties ("parquet.compression"="snappy");
```
- sku_info
```sql
drop table if exists ods.mall__sku_info;
CREATE EXTERNAL TABLE `ods.mall__sku_info`(
    `id` bigint COMMENT 'sku id',
    `spu_id` bigint COMMENT 'spu id',
    `price` decimal(10,0) COMMENT 'price',
    `sku_name` string COMMENT 'sku name',
    `sku_desc` string COMMENT 'specification description',
    `weight` decimal(10,2) COMMENT 'weight',
    `tm_id` bigint COMMENT 'brand id',
    `category3_id` bigint COMMENT 'level-3 category id',
    `create_time` bigint COMMENT 'create time'
) COMMENT 'stock keeping unit table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/sku_info/'
tblproperties ("parquet.compression"="snappy");
```
- spu_info
```sql
drop table if exists ods.mall__spu_info;
CREATE EXTERNAL TABLE `ods.mall__spu_info`(
    `id` bigint COMMENT 'spu id',
    `spu_name` string COMMENT 'spu name',
    `category3_id` bigint COMMENT 'level-3 category id',
    `tm_id` bigint COMMENT 'brand id'
) COMMENT 'product (SPU) table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/spu_info/'
tblproperties ("parquet.compression"="snappy");
```
- base_province
```sql
drop table if exists ods.mall__base_province;
CREATE EXTERNAL TABLE `ods.mall__base_province`(
    `id` bigint COMMENT 'id',
    `name` string COMMENT 'province name',
    `region_id` string COMMENT 'region id',
    `area_code` string COMMENT 'administrative area code',
    `iso_code` string COMMENT 'ISO code'
) COMMENT 'province table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_province/'
tblproperties ("parquet.compression"="snappy");
```
- base_region
```sql
drop table if exists ods.mall__base_region;
CREATE EXTERNAL TABLE `ods.mall__base_region`(
    `id` string COMMENT 'region id',
    `region_name` string COMMENT 'region name'
) COMMENT 'region table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_region/'
tblproperties ("parquet.compression"="snappy");
```
- order_refund_info
```sql
drop table if exists ods.mall__order_refund_info;
CREATE EXTERNAL TABLE `ods.mall__order_refund_info`(
    `id` bigint COMMENT 'id',
    `user_id` bigint COMMENT 'user id',
    `order_id` bigint COMMENT 'order id',
    `sku_id` bigint COMMENT 'sku id',
    `refund_type` string COMMENT 'refund type',
    `refund_num` bigint COMMENT 'refund quantity',
    `refund_amount` decimal(16,2) COMMENT 'refund amount',
    `refund_reason_type` string COMMENT 'refund reason type',
    `create_time` bigint COMMENT 'create time'
) COMMENT 'refund table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/order_refund_info/'
tblproperties ("parquet.compression"="snappy");
```
- order_status_log
```sql
drop table if exists ods.mall__order_status_log;
CREATE EXTERNAL TABLE `ods.mall__order_status_log`(
    `id` bigint COMMENT 'id',
    `order_id` bigint COMMENT 'order id',
    `order_status` string COMMENT 'order status',
    `operate_time` bigint COMMENT 'operate time'
) COMMENT 'order status table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/order_status_log/'
tblproperties ("parquet.compression"="snappy");
```
- payment_info
```sql
drop table if exists ods.mall__payment_info;
CREATE EXTERNAL TABLE `ods.mall__payment_info`(
    `id` bigint COMMENT 'id',
    `out_trade_no` string COMMENT 'external trade number',
    `order_id` bigint COMMENT 'order id',
    `user_id` bigint COMMENT 'user id',
    `alipay_trade_no` string COMMENT 'Alipay trade number',
    `total_amount` decimal(16,2) COMMENT 'payment amount',
    `subject` string COMMENT 'payment subject',
    `payment_type` string COMMENT 'payment method',
    `payment_time` bigint COMMENT 'payment time'
) COMMENT 'payment table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/payment_info/'
tblproperties ("parquet.compression"="snappy");
```
- order_detail
```sql
drop table if exists ods.mall__order_detail;
CREATE EXTERNAL TABLE `ods.mall__order_detail`(
    `id` bigint COMMENT 'id',
    `order_id` bigint COMMENT 'order id',
    `user_id` bigint COMMENT 'user id',
    `sku_id` bigint COMMENT 'sku_id',
    `sku_name` string COMMENT 'sku name',
    `order_price` decimal(10,2) COMMENT 'purchase price (SKU price at order time)',
    `sku_num` string COMMENT 'quantity purchased',
    `create_time` bigint COMMENT 'create time'
) COMMENT 'order detail table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/order_detail/'
tblproperties ("parquet.compression"="snappy");
```
- activity_order
```sql
drop table if exists ods.mall__activity_order;
CREATE EXTERNAL TABLE `ods.mall__activity_order`(
    `id` bigint COMMENT 'id',
    `activity_id` bigint COMMENT 'activity id',
    `order_id` bigint COMMENT 'order id',
    `create_time` bigint COMMENT 'create time'
) COMMENT 'activity-order relation table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/activity_order/'
tblproperties ("parquet.compression"="snappy");
```
- comment_info
```sql
drop table if exists ods.mall__comment_info;
CREATE EXTERNAL TABLE `ods.mall__comment_info`(
    `id` bigint COMMENT 'id',
    `user_id` bigint COMMENT 'user id',
    `sku_id` bigint COMMENT 'sku id',
    `spu_id` bigint COMMENT 'spu id',
    `order_id` bigint COMMENT 'order id',
    `appraise` string COMMENT 'rating: 1 positive, 2 neutral, 3 negative',
    `comment_txt` string COMMENT 'comment text',
    `create_time` bigint COMMENT 'create time'
) COMMENT 'product comment table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/comment_info/'
tblproperties ("parquet.compression"="snappy");
```
- coupon_use
```sql
drop table if exists ods.mall__coupon_use;
CREATE EXTERNAL TABLE `ods.mall__coupon_use`(
    `id` bigint COMMENT 'id',
    `coupon_id` bigint COMMENT 'coupon id',
    `user_id` bigint COMMENT 'user id',
    `order_id` bigint COMMENT 'order id',
    `coupon_status` string COMMENT 'coupon status',
    `get_time` bigint COMMENT 'claim time',
    `using_time` bigint COMMENT 'use time',
    `used_time` bigint COMMENT 'expire time'
) COMMENT 'coupon usage table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/coupon_use/'
tblproperties ("parquet.compression"="snappy");
```
- user_info
```sql
drop table if exists ods.mall__user_info;
CREATE EXTERNAL TABLE `ods.mall__user_info`(
    `id` bigint COMMENT 'id',
    `name` string COMMENT 'user name',
    `email` string COMMENT 'email',
    `user_level` string COMMENT 'user level',
    `birthday` bigint COMMENT 'birthday',
    `gender` string COMMENT 'gender: M male, F female',
    `create_time` bigint COMMENT 'create time',
    `operate_time` bigint COMMENT 'update time'
) COMMENT 'user table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/user_info/'
tblproperties ("parquet.compression"="snappy");
```
- order_info
```sql
drop table if exists ods.mall__order_info;
CREATE EXTERNAL TABLE `ods.mall__order_info`(
    `id` bigint COMMENT 'id',
    `final_total_amount` decimal(16,2) COMMENT 'total amount',
    `order_status` string COMMENT 'order status',
    `user_id` bigint COMMENT 'user id',
    `out_trade_no` string COMMENT 'trade number (for third-party payment)',
    `create_time` bigint COMMENT 'create time',
    `operate_time` bigint COMMENT 'operate time',
    `province_id` int COMMENT 'province id',
    `benefit_reduce_amount` decimal(16,2) COMMENT 'discount amount',
    `original_total_amount` decimal(16,2) COMMENT 'original amount',
    `feight_fee` decimal(16,2) COMMENT 'shipping fee'
) COMMENT 'order table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/order_info/'
tblproperties ("parquet.compression"="snappy");
```
- start_log
This is the start-log table for the tracked client logs.
```sql
drop table if exists ods.mall__start_log;
CREATE EXTERNAL TABLE `ods.mall__start_log`(
    `line` string COMMENT 'start log'
) COMMENT 'start log table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
location '/warehouse/ods/mall/start_log/';
```
- event_log
This is the event-log table for the tracked client logs.
```sql
drop table if exists ods.mall__event_log;
CREATE EXTERNAL TABLE `ods.mall__event_log`(
    `line` string COMMENT 'event log'
) COMMENT 'event log table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
location '/warehouse/ods/mall/event_log/';
```
- date_info
This is the date table.
```sql
drop table if exists ods.mall__date_info;
CREATE EXTERNAL TABLE `ods.mall__date_info`(
    `date_id` int COMMENT 'date',
    `week_id` int COMMENT 'week',
    `week_day` int COMMENT 'day of week',
    `day` int COMMENT 'day of month',
    `month` int COMMENT 'month',
    `quarter` int COMMENT 'quarter',
    `year` int COMMENT 'year',
    `is_workday` int COMMENT 'workday flag',
    `holiday_id` int COMMENT 'holiday flag'
) COMMENT 'date dimension table'
PARTITIONED BY (
    `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/date_info/'
tblproperties ("parquet.compression"="snappy");
```
8.2 Extracting the MySQL Data
- Base Sqoop extraction script
```bash
#!/bin/bash
db_date=${date}
mysql_db_name=${db_name}
mysql_db_addr=${db_addr}
mysql_db_user=${db_user}
mysql_db_password=${db_password}
# Use the date passed in if there is one; otherwise use the day before today
if [ -n "${date}" ] ;then
    db_date=${date}
else
    db_date=`date -d "-1 day" +%F`
fi
echo "Date: "$db_date
echo "MySQL database: "$mysql_db_name
import_data() {
/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/sqoop import \
--connect "jdbc:mysql://$mysql_db_addr:3306/$mysql_db_name?tinyInt1isBit=false" \
--username $mysql_db_user \
--password $mysql_db_password \
--target-dir /origin_data/$mysql_db_name/$1/$db_date \
--delete-target-dir \
--num-mappers 1 \
--null-string '' \
--null-non-string '\\N' \
--fields-terminated-by "\t" \
--query "$2"' and $CONDITIONS;' \
--as-parquetfile
}
```
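The date rule at the top of the script (use the `date` parameter if it is non-empty, otherwise yesterday in `yyyy-MM-dd` form, like `date -d "-1 day" +%F`) is easy to get wrong around month and year boundaries. This Java sketch restates it with `today` passed in explicitly so it can be tested; the class and method names are illustrative.

```java
import java.time.LocalDate;

class DefaultDateSketch {
    /**
     * Mirrors the script's rule: use the date parameter when it is non-empty,
     * otherwise the day before `today`, formatted like `date +%F` (yyyy-MM-dd).
     */
    static String resolveDate(String date, LocalDate today) {
        if (date != null && !date.isEmpty()) {
            return date;
        }
        return today.minusDays(1).toString();
    }

    public static void main(String[] args) {
        System.out.println(resolveDate("", LocalDate.of(2020, 3, 16)));  // 2020-03-15
        System.out.println(resolveDate("2020-03-20", LocalDate.now()));  // 2020-03-20
    }
}
```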
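- DolphinScheduler global parameters

| Parameter | Description |
|---|---|
| date | Date to extract; defaults to yesterday if not set |
| db_name | Database name |
| db_addr | Database IP address |
| db_user | Database user |
| db_password | Database password |

The source data starts on 2020-03-15.
Append the import snippets below to the base script above and run them together.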
- Full-table snippets
import_data "base_dic" "select
dic_code,
dic_name,
parent_code,
create_time,
operate_time
from base_dic
where 1=1"
import_data "base_trademark" "select
tm_id,
tm_name
from base_trademark
where 1=1"
import_data "base_category3" "select
id,
name,
category2_id
from base_category3 where 1=1"
import_data "base_category2" "select
id,
name,
category1_id
from base_category2 where 1=1"
import_data "base_category1" "select
id,
name
from base_category1 where 1=1"
import_data "activity_rule" "select
id,
activity_id,
condition_amount,
condition_num,
benefit_amount,
benefit_discount,
benefit_level
from activity_rule
where 1=1"
import_data "activity_info" "select
id,
activity_name,
activity_type,
start_time,
end_time,
create_time
from activity_info
where 1=1"
import_data "activity_sku" "select
id,
activity_id,
sku_id,
create_time
FROM
activity_sku
where 1=1"
import_data "cart_info" "select
id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
create_time,
operate_time,
is_ordered,
order_time
from cart_info
where 1=1"
import_data "favor_info" "select
id,
user_id,
sku_id,
spu_id,
is_cancel,
create_time,
cancel_time
from favor_info
where 1=1"
import_data "coupon_info" "select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
spu_id,
tm_id,
category3_id,
limit_num,
operate_time,
expire_time
from coupon_info
where 1=1"
import_data "sku_info" "select
id,
spu_id,
price,
sku_name,
sku_desc,
weight,
tm_id,
category3_id,
create_time
from sku_info where 1=1"
import_data "spu_info" "select
id,
spu_name,
category3_id,
tm_id
from spu_info
where 1=1"
- Special table snippets
import_data "base_province" "select
id,
name,
region_id,
area_code,
iso_code
from base_province
where 1=1"
import_data "base_region" "select
id,
region_name
from base_region
where 1=1"
import_data "date_info" "select
date_id,
week_id,
week_day,
day,
month,
quarter,
year,
is_workday,
holiday_id
from date_info
where 1=1"
- Incremental table snippets (rows whose time column falls on $db_date)
import_data "order_refund_info" "select
id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
create_time
from order_refund_info
where
date_format(create_time,'%Y-%m-%d')='$db_date'"
import_data "order_status_log" "select
id,
order_id,
order_status,
operate_time
from order_status_log
where
date_format(operate_time,'%Y-%m-%d')='$db_date'"
import_data "payment_info" "select
id,
out_trade_no,
order_id,
user_id,
alipay_trade_no,
total_amount,
subject,
payment_type,
payment_time
from payment_info
where
DATE_FORMAT(payment_time,'%Y-%m-%d')='$db_date'"
import_data "order_detail" "select
od.id,
od.order_id,
oi.user_id,
od.sku_id,
od.sku_name,
od.order_price,
od.sku_num,
od.create_time
from order_detail od
join order_info oi
on od.order_id=oi.id
where
DATE_FORMAT(od.create_time,'%Y-%m-%d')='$db_date'"
import_data "activity_order" "select
id,
activity_id,
order_id,
create_time
from activity_order
where
date_format(create_time,'%Y-%m-%d')='$db_date'"
import_data "comment_info" "select
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
comment_txt,
create_time
from comment_info
where date_format(create_time,'%Y-%m-%d')='$db_date'"
- New and changed table snippets (rows created or changed on $db_date)
import_data "coupon_use" "select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from coupon_use
where (date_format(get_time,'%Y-%m-%d')='$db_date'
or date_format(using_time,'%Y-%m-%d')='$db_date'
or date_format(used_time,'%Y-%m-%d')='$db_date')"
import_data "user_info" "select
id,
name,
birthday,
gender,
email,
user_level,
create_time,
operate_time
from user_info
where (DATE_FORMAT(create_time,'%Y-%m-%d')='$db_date'
or DATE_FORMAT(operate_time,'%Y-%m-%d')='$db_date')"
import_data "order_info" "select
id,
final_total_amount,
order_status,
user_id,
out_trade_no,
create_time,
operate_time,
province_id,
benefit_reduce_amount,
original_total_amount,
feight_fee
from order_info
where (date_format(create_time,'%Y-%m-%d')='$db_date'
or date_format(operate_time,'%Y-%m-%d')='$db_date')"
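The new-and-changed snippets all follow one pattern: a row is exported if any of its time columns falls on `$db_date`. A small sketch of a hypothetical `changed_filter` helper that builds that MySQL predicate from a list of column names, so the three filters above could be generated rather than typed by hand:

```shell
#!/bin/sh
# Sketch: changed_filter is a hypothetical helper. $1 = day, remaining
# arguments = time columns; prints the OR-ed MySQL predicate.
changed_filter() {
  day="$1"; shift
  clause=""
  for col in "$@"; do
    if [ -n "$clause" ]; then
      clause="$clause or "
    fi
    clause="${clause}date_format(${col},'%Y-%m-%d')='${day}'"
  done
  printf '(%s)\n' "$clause"
}

changed_filter 2020-03-15 create_time operate_time
```

With `get_time using_time used_time` as the columns it reproduces the `coupon_use` filter.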
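8.3 Loading data into the ODS layer
- The script is the same for every table; only $table_name needs to change.
Note: pay attention to the data export directories of the two tracking-log tables.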
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ods
table_name=base_dic
hive_table_name=$APP2.mall__$table_name
# Use the date passed in if given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
load data inpath '/origin_data/$APP1/$table_name/$db_date' OVERWRITE into table $hive_table_name partition(dt='$db_date');
"
$hive -e "$sql"
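Because only `$table_name` varies, one loop can build the statements for several tables and submit them in a single `hive -e` call. A sketch that prints the HiveQL instead of running it; the table list is illustrative and leaves out the two tracking-log tables, which need their own paths.

```shell
#!/bin/sh
# Sketch: build one HiveQL batch for several ODS tables and print it.
APP1=mall
APP2=ods
# Same defaulting as the scripts: use $date if set, otherwise yesterday
db_date=${date:-$(date -d "-1 day" +%F)}

sql=""
for table_name in base_dic base_trademark sku_info; do
  sql="${sql}load data inpath '/origin_data/$APP1/$table_name/$db_date' OVERWRITE into table $APP2.mall__$table_name partition(dt='$db_date');
"
done

# In the real script this would be: $hive -e "$sql"
printf '%s' "$sql"
```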
9 Building the DWD layer
9.1 Building the DWD layer (startup and event logs)
9.1.1 Startup log table
- Create table
drop table if exists dwd.mall__start_log;
CREATE EXTERNAL TABLE `dwd.mall__start_log`(
`mid_id` string COMMENT 'Unique device ID',
`user_id` string COMMENT 'User ID',
`version_code` string COMMENT 'App version code',
`version_name` string COMMENT 'App version name',
`lang` string COMMENT 'System language',
`source` string COMMENT 'Channel',
`os` string COMMENT 'OS version',
`area` string COMMENT 'Region',
`model` string COMMENT 'Phone model',
`brand` string COMMENT 'Phone brand',
`sdk_version` string COMMENT 'sdkVersion',
`gmail` string COMMENT 'gmail',
`height_width` string COMMENT 'Screen height and width',
`app_time` string COMMENT 'Time the client generated the log',
`network` string COMMENT 'Network type',
`lng` string COMMENT 'Longitude',
`lat` string COMMENT 'Latitude',
`entry` string COMMENT 'Entry: push=1,widget=2,icon=3,notification=4,lockscreen_widget=5',
`open_ad_type` string COMMENT 'Splash ad type: native splash ad=1, interstitial splash ad=2',
`action` string COMMENT 'Status: success=1 failure=2',
`loading_time` string COMMENT 'Loading duration',
`detail` string COMMENT 'Failure code',
`extend1` string COMMENT 'Failure message'
) COMMENT 'Startup log table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/start_log/'
tblproperties ("parquet.compression"="snappy");
- Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=start_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in if given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
get_json_object(line,'$.mid') mid_id,
get_json_object(line,'$.uid') user_id,
get_json_object(line,'$.vc') version_code,
get_json_object(line,'$.vn') version_name,
get_json_object(line,'$.l') lang,
get_json_object(line,'$.sr') source,
get_json_object(line,'$.os') os,
get_json_object(line,'$.ar') area,
get_json_object(line,'$.md') model,
get_json_object(line,'$.ba') brand,
get_json_object(line,'$.sv') sdk_version,
get_json_object(line,'$.g') gmail,
get_json_object(line,'$.hw') height_width,
get_json_object(line,'$.t') app_time,
get_json_object(line,'$.nw') network,
get_json_object(line,'$.ln') lng,
get_json_object(line,'$.la') lat,
get_json_object(line,'$.entry') entry,
get_json_object(line,'$.open_ad_type') open_ad_type,
get_json_object(line,'$.action') action,
get_json_object(line,'$.loading_time') loading_time,
get_json_object(line,'$.detail') detail,
get_json_object(line,'$.extend1') extend1
from $hive_origin_table_name
where dt='$db_date';
"
$hive -e "$sql"
9.1.2 Event log table
- Create table
drop table if exists dwd.mall__event_log;
CREATE EXTERNAL TABLE `dwd.mall__event_log`(
`mid_id` string COMMENT 'Unique device ID',
`user_id` string COMMENT 'User ID',
`version_code` string COMMENT 'App version code',
`version_name` string COMMENT 'App version name',
`lang` string COMMENT 'System language',
`source` string COMMENT 'Channel',
`os` string COMMENT 'OS version',
`area` string COMMENT 'Region',
`model` string COMMENT 'Phone model',
`brand` string COMMENT 'Phone brand',
`sdk_version` string COMMENT 'sdkVersion',
`gmail` string COMMENT 'gmail',
`height_width` string COMMENT 'Screen height and width',
`app_time` string COMMENT 'Time the client generated the log',
`network` string COMMENT 'Network type',
`lng` string COMMENT 'Longitude',
`lat` string COMMENT 'Latitude',
`event_name` string COMMENT 'Event name',
`event_json` string COMMENT 'Event details',
`server_time` string COMMENT 'Server time'
) COMMENT 'Event log table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/event_log/'
tblproperties ("parquet.compression"="snappy");
9.1.2.1 Writing the UDF and UDTF
- udf
package com.heaton.bigdata.udf;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.json.JSONException;
import org.json.JSONObject;
public class BaseFieldUDF extends UDF {
public String evaluate(String line, String key) throws JSONException {
if (line == null || key == null) {
return "";
}
// A raw line has the form "server_time|json"
String[] log = line.split("\\|");
if (log.length != 2 || StringUtils.isBlank(log[1])) {
return "";
}
JSONObject baseJson = new JSONObject(log[1].trim());
String result = "";
if ("st".equals(key)) {
// Server time: the part before the "|"
result = log[0].trim();
} else if ("et".equals(key)) {
// Event array as a JSON string; get().toString() also works on org.json
// versions whose getString() rejects non-string values
if (baseJson.has("et")) {
result = baseJson.get("et").toString();
}
} else {
// Value of the common field (cm) for this key
JSONObject cm = baseJson.getJSONObject("cm");
if (cm.has(key)) {
result = cm.getString(key);
}
}
return result;
}
public static void main(String[] args) throws JSONException {
String line = " 1588319303710|{\n" +
" \"cm\":{\n" +
" \"ln\":\"-51.5\",\"sv\":\"V2.0.7\",\"os\":\"8.0.8\",\"g\":\"L1470998@gmail.com\",\"mid\":\"13\",\n" +
" \"nw\":\"4G\",\"l\":\"en\",\"vc\":\"7\",\"hw\":\"640*960\",\"ar\":\"MX\",\"uid\":\"13\",\"t\":\"1588291826938\",\n" +
" \"la\":\"-38.2\",\"md\":\"Huawei-14\",\"vn\":\"1.3.6\",\"ba\":\"Huawei\",\"sr\":\"Y\"\n" +
" },\n" +
" \"ap\":\"app\",\n" +
" \"et\":[{\n" +
" \"ett\":\"1588228193191\",\"en\":\"ad\",\"kv\":{\"activityId\":\"1\",\"displayMills\":\"113201\",\"entry\":\"3\",\"action\":\"5\",\"contentType\":\"0\"}\n" +
" },{\n" +
" \"ett\":\"1588300304713\",\"en\":\"notification\",\"kv\":{\"ap_time\":\"1588277440794\",\"action\":\"2\",\"type\":\"3\",\"content\":\"\"}\n" +
" },{\n" +
" \"ett\":\"1588249203743\",\"en\":\"active_background\",\"kv\":{\"active_source\":\"3\"}\n" +
" },{\n" +
" \"ett\":\"1588225856101\",\"en\":\"comment\",\"kv\":{\"p_comment_id\":0,\"addtime\":\"1588263895040\",\"praise_count\":231,\"other_id\":5,\"comment_id\":5,\"reply_count\":62,\"userid\":7,\"content\":\"骸汞\"}\n" +
" },{\n" +
" \"ett\":\"1588254200122\",\"en\":\"favorites\",\"kv\":{\"course_id\":5,\"id\":0,\"add_time\":\"1588264138625\",\"userid\":0}\n" +
" },{\n" +
" \"ett\":\"1588281152824\",\"en\":\"praise\",\"kv\":{\"target_id\":4,\"id\":3,\"type\":3,\"add_time\":\"1588307696417\",\"userid\":8}\n" +
" }]\n" +
" }";
String s = new BaseFieldUDF().evaluate(line, "mid");
String ss = new BaseFieldUDF().evaluate(line, "st");
String sss = new BaseFieldUDF().evaluate(line, "et");
System.out.println(s);
System.out.println(ss);
System.out.println(sss);
}
}
Output:
13
1588319303710
[{"ett":"1588228193191","en":"ad","kv":{"activityId":"1","displayMills":"113201","entry":"3","action":"5","contentType":"0"}},{"ett":"1588300304713","en":"notification","kv":{"ap_time":"1588277440794","action":"2","type":"3","content":""}},{"ett":"1588249203743","en":"active_background","kv":{"active_source":"3"}},{"ett":"1588225856101","en":"comment","kv":{"p_comment_id":0,"addtime":"1588263895040","praise_count":231,"other_id":5,"comment_id":5,"reply_count":62,"userid":7,"content":"骸汞"}},{"ett":"1588254200122","en":"favorites","kv":{"course_id":5,"id":0,"add_time":"1588264138625","userid":0}},{"ett":"1588281152824","en":"praise","kv":{"target_id":4,"id":3,"type":3,"add_time":"1588307696417","userid":8}}]
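The UDF relies on each raw line having the form `server_time|json`. The same split can be reproduced with POSIX parameter expansion, which is handy for eyeballing raw files before loading them. A sketch with a shortened, illustrative JSON payload; note that the Java code returns an empty string whenever the line does not split into exactly two parts.

```shell
#!/bin/sh
# Sketch: split a raw log line "server_time|json" the way BaseFieldUDF does.
# The JSON payload is shortened for illustration.
line=' 1588319303710|{"cm":{"mid":"13","uid":"13"},"ap":"app","et":[]}'

server_time=${line%%|*}   # everything before the first "|"
payload=${line#*|}        # everything after the first "|"
# trim whitespace, like log[0].trim() in the UDF
server_time=$(printf '%s' "$server_time" | tr -d '[:space:]')

echo "$server_time"
echo "$payload"
```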
- udtf
package com.heaton.bigdata.udtf;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONException;
import java.util.ArrayList;
public class EventJsonUDTF extends GenericUDTF {
// Declares the names and types of the output columns: event_name and event_json
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
fieldNames.add("event_name");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldNames.add("event_json");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
fieldOIs);
}
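// One input row (the et array) produces one output row per event
@Override
public void process(Object[] objects) throws HiveException {
// The argument is the et string returned by base_analizer(line,'et'); skip NULLs
if (objects[0] == null) {
return;
}
String input = objects[0].toString();
// Blank input means there are no events, so no rows are emitted
if (StringUtils.isBlank(input)) {
return;
}
try {
// Parse the event array (ad, favorites, ...)
JSONArray ja = new JSONArray(input);
// Loop over every event
for (int i = 0; i != ja.length(); i++) {
String[] result = new String[2];
try {
// Event name, e.g. ad / favorites
result[0] = ja.getJSONObject(i).getString("en");
// The whole event as a JSON string
result[1] = ja.get(i).toString();
} catch (JSONException e) {
// Skip a malformed event and continue with the next one
continue;
}
// Emit one (event_name, event_json) row
forward(result);
}
} catch (JSONException e) {
// The whole array is malformed: log it and emit nothing for this row
e.printStackTrace();
}
}
// Called when there are no more rows to process; used for cleanup or extra output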
@Override
public void close() throws HiveException {
}
}
9.1.2.2 Registering the UDFs permanently
- Upload the UDF jar
Upload hive-function-1.0-SNAPSHOT.jar to /user/hive/jars on HDFS:
hdfs dfs -mkdir -p /user/hive/jars
hdfs dfs -put hive-function-1.0-SNAPSHOT.jar /user/hive/jars/hive-function-1.0-SNAPSHOT.jar
Create the permanent functions in Hive:
create function base_analizer as 'com.heaton.bigdata.udf.BaseFieldUDF' using jar 'hdfs://cdh01.cm:8020/user/hive/jars/hive-function-1.0-SNAPSHOT.jar';
create function flat_analizer as 'com.heaton.bigdata.udtf.EventJsonUDTF' using jar 'hdfs://cdh01.cm:8020/user/hive/jars/hive-function-1.0-SNAPSHOT.jar';
9.1.2.3 Using the UDFs in DolphinScheduler
In the DAG editor, a SQL task lets you select the corresponding UDF, but in DolphinScheduler 1.2.0 the function binding is not saved.
As a workaround, upload the JAR to HDFS through the UDF management page and register temporary functions from the script; this works just as well.
Temporary function statements:
create temporary function base_analizer as 'com.heaton.bigdata.udf.BaseFieldUDF' using jar 'hdfs://cdh01.cm:8020/dolphinscheduler/dolphinscheduler/udfs/hive-function-1.0-SNAPSHOT.jar';
create temporary function flat_analizer as 'com.heaton.bigdata.udtf.EventJsonUDTF' using jar 'hdfs://cdh01.cm:8020/dolphinscheduler/dolphinscheduler/udfs/hive-function-1.0-SNAPSHOT.jar';
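Temporary functions exist only for the Hive session that creates them, so in a DolphinScheduler shell task they must be sent in the same `hive -e` call as the query. A sketch that prepends them to `$sql`; the query is a placeholder and the script prints instead of calling Hive.

```shell
#!/bin/sh
# Sketch: prepend the temporary functions to $sql so they are defined in the
# same Hive session as the query. The query is a placeholder.
jar='hdfs://cdh01.cm:8020/dolphinscheduler/dolphinscheduler/udfs/hive-function-1.0-SNAPSHOT.jar'
udf_sql="create temporary function base_analizer as 'com.heaton.bigdata.udf.BaseFieldUDF' using jar '$jar';
create temporary function flat_analizer as 'com.heaton.bigdata.udtf.EventJsonUDTF' using jar '$jar';"

sql="select base_analizer(line,'mid') from ods.mall__event_log limit 1;"
sql="$udf_sql
$sql"

# In the real script this would be: $hive -e "$sql"
printf '%s\n' "$sql"
```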
9.1.2.4 Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=event_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in if given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
base_analizer(line,'mid') as mid_id,
base_analizer(line,'uid') as user_id,
base_analizer(line,'vc') as version_code,
base_analizer(line,'vn') as version_name,
base_analizer(line,'l') as lang,
base_analizer(line,'sr') as source,
base_analizer(line,'os') as os,
base_analizer(line,'ar') as area,
base_analizer(line,'md') as model,
base_analizer(line,'ba') as brand,
base_analizer(line,'sv') as sdk_version,
base_analizer(line,'g') as gmail,
base_analizer(line,'hw') as height_width,
base_analizer(line,'t') as app_time,
base_analizer(line,'nw') as network,
base_analizer(line,'ln') as lng,
base_analizer(line,'la') as lat,
event_name,
event_json,
base_analizer(line,'st') as server_time
from $hive_origin_table_name lateral view flat_analizer(base_analizer(line,'et')) tmp_flat as event_name,event_json
where dt='$db_date' and base_analizer(line,'et')<>'';
"
$hive -e "$sql"
9.1.3 Product click table
- Create table
drop table if exists dwd.mall__display_log;
CREATE EXTERNAL TABLE `dwd.mall__display_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`goodsid` string,
`place` string,
`extend1` string,
`category` string,
`server_time` string
) COMMENT 'Product click table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/display_log/'
tblproperties ("parquet.compression"="snappy");
- Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=display_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in if given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.goodsid') goodsid,
get_json_object(event_json,'$.kv.place') place,
get_json_object(event_json,'$.kv.extend1') extend1,
get_json_object(event_json,'$.kv.category') category,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='display';
"
$hive -e "$sql"
9.1.4 Product list table
- Create table
drop table if exists dwd.mall__loading_log;
CREATE EXTERNAL TABLE `dwd.mall__loading_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`loading_time` string,
`loading_way` string,
`extend1` string,
`extend2` string,
`type` string,
`type1` string,
`server_time` string
) COMMENT 'Product list table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/loading_log/'
tblproperties ("parquet.compression"="snappy");
- Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=loading_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in if given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.loading_time') loading_time,
get_json_object(event_json,'$.kv.loading_way') loading_way,
get_json_object(event_json,'$.kv.extend1') extend1,
get_json_object(event_json,'$.kv.extend2') extend2,
get_json_object(event_json,'$.kv.type') type,
get_json_object(event_json,'$.kv.type1') type1,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='loading';
"
$hive -e "$sql"
9.1.5 Ad table
- Create table
drop table if exists dwd.mall__ad_log;
CREATE EXTERNAL TABLE `dwd.mall__ad_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`entry` string,
`action` string,
`contentType` string,
`displayMills` string,
`itemId` string,
`activityId` string,
`server_time` string
) COMMENT 'Ad table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/ad_log/'
tblproperties ("parquet.compression"="snappy");
- Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=ad_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in if given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.entry') entry,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.contentType') contentType,
get_json_object(event_json,'$.kv.displayMills') displayMills,
get_json_object(event_json,'$.kv.itemId') itemId,
get_json_object(event_json,'$.kv.activityId') activityId,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='ad';
"
$hive -e "$sql"
9.1.6 Notification table
- Create table
drop table if exists dwd.mall__notification_log;
CREATE EXTERNAL TABLE `dwd.mall__notification_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`noti_type` string,
`ap_time` string,
`content` string,
`server_time` string
) COMMENT 'Notification table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/notification_log/'
tblproperties ("parquet.compression"="snappy");
- Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=notification_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in if given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.noti_type') noti_type,
get_json_object(event_json,'$.kv.ap_time') ap_time,
get_json_object(event_json,'$.kv.content') content,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='notification';
"
$hive -e "$sql"
9.1.7 User background activity table
- Create table
drop table if exists dwd.mall__active_background_log;
CREATE EXTERNAL TABLE `dwd.mall__active_background_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`active_source` string,
`server_time` string
) COMMENT 'User background activity table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/active_background_log/'
tblproperties ("parquet.compression"="snappy");
- Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=active_background_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in if given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.active_source') active_source,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='active_background';
"
$hive -e "$sql"
9.1.8 Comment table
- Create table
drop table if exists dwd.mall__comment_log;
CREATE EXTERNAL TABLE `dwd.mall__comment_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`comment_id` int,
`userid` int,
`p_comment_id` int,
`content` string,
`addtime` string,
`other_id` int,
`praise_count` int,
`reply_count` int,
`server_time` string
) COMMENT 'Comment table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/comment_log/'
tblproperties ("parquet.compression"="snappy");
- Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=comment_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in if given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.comment_id') comment_id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.p_comment_id') p_comment_id,
get_json_object(event_json,'$.kv.content') content,
get_json_object(event_json,'$.kv.addtime') addtime,
get_json_object(event_json,'$.kv.other_id') other_id,
get_json_object(event_json,'$.kv.praise_count') praise_count,
get_json_object(event_json,'$.kv.reply_count') reply_count,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='comment';
"
$hive -e "$sql"
9.1.9 Favorites table
- Create table
drop table if exists dwd.mall__favorites_log;
CREATE EXTERNAL TABLE `dwd.mall__favorites_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`id` int,
`course_id` int,
`userid` int,
`add_time` string,
`server_time` string
) COMMENT 'Favorites table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/favorites_log/'
tblproperties ("parquet.compression"="snappy");
- Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=favorites_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in if given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.id') id,
get_json_object(event_json,'$.kv.course_id') course_id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.add_time') add_time,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='favorites';
"
$hive -e "$sql"
9.1.10 Like table
- Create table
drop table if exists dwd.mall__praise_log;
CREATE EXTERNAL TABLE `dwd.mall__praise_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`id` string,
`userid` string,
`target_id` string,
`type` string,
`add_time` string,
`server_time` string
) COMMENT 'Like table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/praise_log/'
tblproperties ("parquet.compression"="snappy");
- Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=praise_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in if given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.id') id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.target_id') target_id,
get_json_object(event_json,'$.kv.type') type,
get_json_object(event_json,'$.kv.add_time') add_time,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='praise';
"
$hive -e "$sql"
9.1.11 Error log table
- Create table
drop table if exists dwd.mall__error_log;
CREATE EXTERNAL TABLE `dwd.mall__error_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`errorBrief` string,
`errorDetail` string,
`server_time` string
) COMMENT 'Error log table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/error_log/'
tblproperties ("parquet.compression"="snappy");
- Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=error_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in if given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.errorBrief') errorBrief,
get_json_object(event_json,'$.kv.errorDetail') errorDetail,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='error';
"
$hive -e "$sql"
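Tables 9.1.3 to 9.1.11 share one load script and differ only in the target table, the `event_name` filter and the `kv` fields. A hypothetical generator, `gen_event_sql`, that prints the INSERT for one table; it is a sketch of how the repetition could be factored out, not part of the original project.

```shell
#!/bin/sh
# Sketch: gen_event_sql is a hypothetical generator for the dwd event tables.
# $1 = table, $2 = event_name, $3 = date, remaining args = kv fields.
common_cols="mid_id,user_id,version_code,version_name,lang,source,os,area,model,brand,sdk_version,gmail,height_width,app_time,network,lng,lat"

gen_event_sql() {
  table="$1"; event="$2"; day="$3"; shift 3
  kv_cols=""
  for f in "$@"; do
    kv_cols="${kv_cols}get_json_object(event_json,'\$.kv.$f') $f,"
  done
  printf '%s\n' \
    "insert overwrite table dwd.mall__$table" \
    "PARTITION (dt='$day')" \
    "select $common_cols,$kv_cols server_time" \
    "from dwd.mall__event_log" \
    "where dt='$day' and event_name='$event';"
}

# Same column list as the error_log script in 9.1.11
gen_event_sql error_log error 2020-03-15 errorBrief errorDetail
```

The printed statement could then be passed to `$hive -e`, one call per table or concatenated into a single batch.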
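9.2 Building the DWD layer (business database)
When this layer is first built, the incremental tables need dynamic partitioning on their time column so that each row is written to the partition for its date.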
| Fact / Dimension | Time | User | Region | Product | Coupon | Activity | Code | Measure |
|---|---|---|---|---|---|---|---|---|
| Order | √ | √ | √ | √ |  |  |  | Quantity/Amount |
| Order detail | √ | √ | √ |  |  |  |  | Quantity/Amount |
| Payment | √ | √ |  |  |  |  |  | Count/Amount |
| Add to cart | √ | √ | √ |  |  |  |  | Quantity/Amount |
| Favorite | √ | √ | √ |  |  |  |  | Count |
| Review | √ | √ | √ |  |  |  |  | Count |
| Refund | √ | √ | √ |  |  |  |  | Quantity/Amount |
| Coupon claim | √ | √ | √ |  |  |  |  | Count |
9.2.1 Product (SKU) dimension table (full load)
- Create table
drop table if exists dwd.mall__dim_sku_info;
CREATE EXTERNAL TABLE `dwd.mall__dim_sku_info`(
`id` string COMMENT 'SKU ID',
`spu_id` string COMMENT 'SPU ID',
`price` double COMMENT 'Price',
`sku_name` string COMMENT 'SKU name',
`sku_desc` string COMMENT 'SKU description',
`weight` double COMMENT 'Weight',
`tm_id` string COMMENT 'Brand ID',
`tm_name` string COMMENT 'Brand name',
`category3_id` string COMMENT 'Level-3 category ID',
`category2_id` string COMMENT 'Level-2 category ID',
`category1_id` string COMMENT 'Level-1 category ID',
`category3_name` string COMMENT 'Level-3 category name',
`category2_name` string COMMENT 'Level-2 category name',
`category1_name` string COMMENT 'Level-1 category name',
`spu_name` string COMMENT 'SPU name',
`create_time` string COMMENT 'Creation time'
) COMMENT 'Product (SKU) dimension table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_sku_info/'
tblproperties ("parquet.compression"="snappy");
- Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_sku_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in if given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
sku.id,
sku.spu_id,
sku.price,
sku.sku_name,
sku.sku_desc,
sku.weight,
sku.tm_id,
ob.tm_name,
sku.category3_id,
c2.id category2_id,
c1.id category1_id,
c3.name category3_name,
c2.name category2_name,
c1.name category1_name,
spu.spu_name,
from_unixtime(cast(sku.create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time
from
(
select * from ods.mall__sku_info where dt='$db_date'
)sku
join
(
select * from ods.mall__base_trademark where dt='$db_date'
)ob on sku.tm_id=ob.tm_id
join
(
select * from ods.mall__spu_info where dt='$db_date'
)spu on spu.id = sku.spu_id
join
(
select * from ods.mall__base_category3 where dt='$db_date'
)c3 on sku.category3_id=c3.id
join
(
select * from ods.mall__base_category2 where dt='$db_date'
)c2 on c3.category2_id=c2.id
join
(
select * from ods.mall__base_category1 where dt='$db_date'
)c1 on c2.category1_id=c1.id;
"
$hive -e "$sql"
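The join chain above flattens the normalized tables into one wide row per SKU. The source tables are SKU, brand, SPU and the level-3, level-2 and level-1 categories. Below is a minimal Python simulation of that denormalization, assuming each join key is unique within a day's partition; if a key were not unique, a Hive inner join would multiply rows instead. The function `build_dim_sku` and the sample rows are hypothetical.

```python
def build_dim_sku(skus, trademarks, spus, cat3, cat2, cat1):
    """Flatten SKU + brand + SPU + three category levels into one row per SKU.

    Mirrors the chain of inner JOINs: a SKU whose brand, SPU or any category
    level cannot be found is dropped, as an inner join would drop it.
    Assumes every join key is unique (lookups are plain dicts).
    """
    tm_by_id = {t["tm_id"]: t for t in trademarks}
    spu_by_id = {s["id"]: s for s in spus}
    c3_by_id = {c["id"]: c for c in cat3}
    c2_by_id = {c["id"]: c for c in cat2}
    c1_by_id = {c["id"]: c for c in cat1}

    rows = []
    for sku in skus:
        tm = tm_by_id.get(sku["tm_id"])
        spu = spu_by_id.get(sku["spu_id"])
        c3 = c3_by_id.get(sku["category3_id"])
        c2 = c2_by_id.get(c3["category2_id"]) if c3 else None
        c1 = c1_by_id.get(c2["category1_id"]) if c2 else None
        if None in (tm, spu, c3, c2, c1):
            continue  # inner-join semantics: incomplete chains are dropped
        rows.append({
            "id": sku["id"],
            "sku_name": sku["sku_name"],
            "tm_name": tm["tm_name"],
            "spu_name": spu["spu_name"],
            "category3_name": c3["name"],
            "category2_id": c2["id"],
            "category2_name": c2["name"],
            "category1_id": c1["id"],
            "category1_name": c1["name"],
        })
    return rows


skus = [
    {"id": "1", "sku_name": "Phone X 128G", "tm_id": "t1", "spu_id": "s1", "category3_id": "c3"},
    {"id": "2", "sku_name": "Orphan SKU", "tm_id": "t9", "spu_id": "s1", "category3_id": "c3"},
]
dim_rows = build_dim_sku(
    skus,
    trademarks=[{"tm_id": "t1", "tm_name": "BrandA"}],
    spus=[{"id": "s1", "spu_name": "Phone X"}],
    cat3=[{"id": "c3", "name": "Smartphones", "category2_id": "c2"}],
    cat2=[{"id": "c2", "name": "Phones", "category1_id": "c1"}],
    cat1=[{"id": "c1", "name": "Electronics"}],
)
print(dim_rows)  # only SKU "1" survives; SKU "2" references an unknown brand
```

Because every join is an inner join, a missing brand or category row silently removes the SKU from the dimension. That is worth checking when row counts look low.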
9.2.2 Coupon dimension table (full load)
- Create the table
drop table if exists dwd.mall__dim_coupon_info;
CREATE EXTERNAL TABLE `dwd.mall__dim_coupon_info`(
`id` string COMMENT 'Coupon id',
`coupon_name` string COMMENT 'Coupon name',
`coupon_type` string COMMENT 'Coupon type: 1 cash, 2 discount, 3 amount-threshold reduction, 4 quantity-threshold discount',
`condition_amount` string COMMENT 'Spend threshold',
`condition_num` string COMMENT 'Quantity threshold',
`activity_id` string COMMENT 'Activity id',
`benefit_amount` string COMMENT 'Reduction amount',
`benefit_discount` string COMMENT 'Discount rate',
`create_time` string COMMENT 'Creation time',
`range_type` string COMMENT 'Scope type: 1 product, 2 category, 3 brand',
`spu_id` string COMMENT 'SPU id',
`tm_id` string COMMENT 'Brand id',
`category3_id` string COMMENT 'Category id',
`limit_num` string COMMENT 'Maximum number of claims',
`operate_time` string COMMENT 'Last modified time',
`expire_time` string COMMENT 'Expiry time'
) COMMENT 'Coupon dimension table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_coupon_info/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_coupon_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via ${date} if set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
range_type,
spu_id,
tm_id,
category3_id,
limit_num,
from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') operate_time,
from_unixtime(cast(expire_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') expire_time
from ods.mall__coupon_info
where dt='$db_date';
"
$hive -e "$sql"
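Every `from_unixtime(cast(x/1000 as bigint), 'yyyy-MM-dd HH:mm:ss')` in these scripts turns an epoch-millisecond value from MySQL into a formatted string. Below is a rough Python equivalent for checking values by hand. Hive formats in the local time zone of the cluster or session, so the UTC+8 default is an assumption about the cluster. `from_unixtime_ms` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the Hive servers run in UTC+8 (e.g. Asia/Shanghai).
CLUSTER_TZ = timezone(timedelta(hours=8))


def from_unixtime_ms(ms, fmt="%Y-%m-%d %H:%M:%S", tz=CLUSTER_TZ):
    """Approximate from_unixtime(cast(ms/1000 as bigint), 'yyyy-MM-dd HH:mm:ss')."""
    if ms is None:
        return None  # NULL in, NULL out
    # Drop the milliseconds; matches the bigint cast for non-negative values
    seconds = int(ms) // 1000
    return datetime.fromtimestamp(seconds, tz).strftime(fmt)


print(from_unixtime_ms(1577836800000))                     # 2020-01-01 08:00:00
print(from_unixtime_ms("1577836800999", tz=timezone.utc))  # 2020-01-01 00:00:00
```

The same instant prints as different wall-clock strings under different time zones. If the cluster's zone differs from the one the data was produced in, dates near midnight can land in a neighbouring `dt`.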
9.2.3 Activity dimension table (full load)
- Create the table
drop table if exists dwd.mall__dim_activity_info;
CREATE EXTERNAL TABLE `dwd.mall__dim_activity_info`(
`id` string COMMENT 'Id',
`activity_name` string COMMENT 'Activity name',
`activity_type` string COMMENT 'Activity type',
`condition_amount` string COMMENT 'Spend threshold',
`condition_num` string COMMENT 'Quantity threshold',
`benefit_amount` string COMMENT 'Reduction amount',
`benefit_discount` string COMMENT 'Discount rate',
`benefit_level` string COMMENT 'Benefit level',
`start_time` string COMMENT 'Start time',
`end_time` string COMMENT 'End time',
`create_time` string COMMENT 'Creation time'
) COMMENT 'Activity dimension table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_activity_info/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_activity_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via ${date} if set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
info.id,
info.activity_name,
info.activity_type,
rule.condition_amount,
rule.condition_num,
rule.benefit_amount,
rule.benefit_discount,
rule.benefit_level,
from_unixtime(cast(info.start_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') start_time,
from_unixtime(cast(info.end_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') end_time,
from_unixtime(cast(info.create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time
from
(
select * from ods.mall__activity_info where dt='$db_date'
)info
left join
(
select * from ods.mall__activity_rule where dt='$db_date'
)rule on info.id = rule.activity_id;
"
$hive -e "$sql"
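The activity rules are attached with a LEFT JOIN, which has two consequences:

- An activity with no rule still appears, with NULL rule columns.
- An activity with several rules, for example several `benefit_level` tiers, appears once per rule. So `id` is not necessarily unique in this dimension.

A small Python simulation of that behavior follows. The sample data and `dim_activity` are hypothetical.

```python
from collections import defaultdict


def dim_activity(activities, rules):
    """Simulate: activity_info info LEFT JOIN activity_rule rule ON info.id = rule.activity_id."""
    rules_by_activity = defaultdict(list)
    for rule in rules:
        rules_by_activity[rule["activity_id"]].append(rule)

    rows = []
    for info in activities:
        # No matching rule -> one row with NULL (None) rule columns, as in a LEFT JOIN
        for rule in rules_by_activity.get(info["id"]) or [None]:
            rows.append({
                "id": info["id"],
                "activity_name": info["activity_name"],
                "condition_amount": rule["condition_amount"] if rule else None,
                "benefit_level": rule["benefit_level"] if rule else None,
            })
    return rows


activities = [
    {"id": "1", "activity_name": "Spring sale"},
    {"id": "2", "activity_name": "Promo without rules"},
]
rules = [
    {"activity_id": "1", "condition_amount": "100", "benefit_level": "1"},
    {"activity_id": "1", "condition_amount": "200", "benefit_level": "2"},
]
activity_rows = dim_activity(activities, rules)
print([(r["id"], r["benefit_level"]) for r in activity_rows])
# [('1', '1'), ('1', '2'), ('2', None)]
```

Downstream joins on `id` against this table will fan out for multi-tier activities. Aggregations over it should account for that.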
9.2.4 Region dimension table (special)
- Create the table
drop table if exists dwd.mall__dim_base_province;
CREATE EXTERNAL TABLE `dwd.mall__dim_base_province`(
`id` string COMMENT 'Id',
`province_name` string COMMENT 'Province name',
`area_code` string COMMENT 'Area code',
`iso_code` string COMMENT 'ISO code',
`region_id` string COMMENT 'Region id',
`region_name` string COMMENT 'Region name'
) COMMENT 'Region dimension table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_base_province/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_base_province
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via ${date} if set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
bp.id,
bp.name,
bp.area_code,
bp.iso_code,
bp.region_id,
br.region_name
from ods.mall__base_province bp
join ods.mall__base_region br
on bp.region_id=br.id;
"
$hive -e "$sql"
9.2.5 Date dimension table (special)
- Create the table
drop table if exists dwd.mall__dim_date_info;
CREATE EXTERNAL TABLE `dwd.mall__dim_date_info`(
`date_id` string COMMENT 'Date',
`week_id` int COMMENT 'Week',
`week_day` int COMMENT 'Day of the week',
`day` int COMMENT 'Day of the month',
`month` int COMMENT 'Month',
`quarter` int COMMENT 'Quarter',
`year` int COMMENT 'Year',
`is_workday` int COMMENT 'Whether it is a workday',
`holiday_id` int COMMENT 'Whether it is a holiday'
) COMMENT 'Date dimension table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_date_info/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_date_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via ${date} if set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
date_id,
week_id,
week_day,
day,
month,
quarter,
year,
is_workday,
holiday_id
from ods.mall__date_info;
"
$hive -e "$sql"
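The date dimension is copied from `ods.mall__date_info`, whose contents are not shown here. If you need to generate such a table yourself, a sketch like the following could produce the rows. None of the column definitions below are confirmed by the source table; the sketch assumes:

- `week_id` is the ISO week number.
- `week_day` runs from 1 (Monday) to 7 (Sunday).
- `is_workday` is 1 for working days.
- You supply the holiday calendar.

```python
from datetime import date, timedelta


def date_dim_rows(start, end, holidays=frozenset()):
    """Generate one date-dimension row per day from start to end (inclusive).

    Assumptions (not confirmed by the source): week_id = ISO week number,
    week_day = 1 (Monday) .. 7 (Sunday), is_workday = 1 on non-holiday weekdays.
    """
    rows = []
    day = start
    while day <= end:
        _, iso_week, iso_weekday = day.isocalendar()
        is_holiday = day in holidays
        rows.append({
            "date_id": day.isoformat(),
            "week_id": iso_week,
            "week_day": iso_weekday,
            "day": day.day,
            "month": day.month,
            "quarter": (day.month - 1) // 3 + 1,
            "year": day.year,
            "is_workday": int(iso_weekday <= 5 and not is_holiday),
            "holiday_id": int(is_holiday),
        })
        day += timedelta(days=1)
    return rows


# 2020-01-01 (a Wednesday, treated as a holiday) through 2020-01-05 (a Sunday)
rows = date_dim_rows(date(2020, 1, 1), date(2020, 1, 5), holidays={date(2020, 1, 1)})
for r in rows:
    print(r["date_id"], r["week_day"], r["is_workday"], r["holiday_id"])
```

Working weekends (make-up workdays) are not handled. A real calendar would need an explicit list of those as well.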
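9.2.6 User dimension table (new and changed records; slowly changing dimension stored as a zipper table)
9.2.6.1 Zipper tables
A zipper table records the life cycle of every record. When a record's life cycle ends, a new record is started, with the current date as its effective start date.
If a record is still valid, its effective end date is set to a sentinel maximum (e.g. 9999-99-99). The table below shows how Zhang San's phone number changes over time:
| User ID | Name | Phone number | Start date | End date |
|---|---|---|---|---|
| 1 | Zhang San | 134XXXX5050 | 2019-01-01 | 2019-01-02 |
| 1 | Zhang San | 139XXXX3232 | 2019-01-03 | 2020-01-01 |
| 1 | Zhang San | 137XXXX7676 | 2020-01-02 | 9999-99-99 |
- Use case: data that does change, but mostly stays the same (a slowly changing dimension).
For example, user information changes, but only a small fraction of users change on any given day, so storing a full copy every day would be wasteful.
- How to query a zipper table: filtering on effective start date <= some date AND effective end date >= that date returns a full snapshot of the data as of that date.
- How a zipper table is built
- Procedure
Merge the existing zipper table with the user records that changed in MySQL that day to form a new temporary zipper table.
Overwrite the old zipper table with the temporary one.
This works around the fact that rows in an ordinary (non-ACID) Hive table cannot be updated in place.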
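The point-in-time lookup described above can be sketched in Python. `snapshot_as_of` is a hypothetical helper. In HiveQL the same filter would be `start_date <= 'd' and end_date >= 'd'`.

```python
def snapshot_as_of(zipper_rows, as_of):
    """Rows valid on `as_of`: start_date <= as_of AND end_date >= as_of.

    Plain string comparison is enough because the dates are zero-padded
    'yyyy-MM-dd' strings and the open-ended marker '9999-99-99' sorts after
    every real date.
    """
    return [r for r in zipper_rows if r["start_date"] <= as_of <= r["end_date"]]


# The phone-number history of user 1 from the table above
history = [
    {"id": "1", "phone": "134XXXX5050", "start_date": "2019-01-01", "end_date": "2019-01-02"},
    {"id": "1", "phone": "139XXXX3232", "start_date": "2019-01-03", "end_date": "2020-01-01"},
    {"id": "1", "phone": "137XXXX7676", "start_date": "2020-01-02", "end_date": "9999-99-99"},
]
print(snapshot_as_of(history, "2019-06-01")[0]["phone"])  # 139XXXX3232
print(snapshot_as_of(history, "2021-03-01")[0]["phone"])  # 137XXXX7676
```

Because consecutive versions never overlap (each one ends the day before the next starts), any date returns at most one version per user.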
9.2.6.2 User dimension table
User records may be both added and modified every day, which makes this a slowly changing dimension. The user dimension is therefore stored as a zipper table.
- Create the table
drop table if exists dwd.mall__dim_user_info_his;
CREATE EXTERNAL TABLE `dwd.mall__dim_user_info_his`(
`id` string COMMENT 'User id',
`name` string COMMENT 'Name',
`birthday` string COMMENT 'Birthday',
`gender` string COMMENT 'Gender',
`email` string COMMENT 'Email',
`user_level` string COMMENT 'User level',
`create_time` string COMMENT 'Creation time',
`operate_time` string COMMENT 'Last operation time',
`start_date` string COMMENT 'Effective start date',
`end_date` string COMMENT 'Effective end date'
) COMMENT 'User zipper table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_user_info_his/'
tblproperties ("parquet.compression"="snappy");
- Temporary table (same structure as the main table)
drop table if exists dwd.mall__dim_user_info_his_tmp;
CREATE EXTERNAL TABLE `dwd.mall__dim_user_info_his_tmp`(
`id` string COMMENT 'User id',
`name` string COMMENT 'Name',
`birthday` string COMMENT 'Birthday',
`gender` string COMMENT 'Gender',
`email` string COMMENT 'Email',
`user_level` string COMMENT 'User level',
`create_time` string COMMENT 'Creation time',
`operate_time` string COMMENT 'Last operation time',
`start_date` string COMMENT 'Effective start date',
`end_date` string COMMENT 'Effective end date'
) COMMENT 'User zipper table (temporary)'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_user_info_his_tmp/'
tblproperties ("parquet.compression"="snappy");
- Initialize the main table (run only once)
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_user_info_his
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via ${date} if set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select
id,
name,
from_unixtime(cast(birthday/1000 as bigint),'yyyy-MM-dd HH:mm:ss') birthday,
gender,
email,
user_level,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') operate_time,
'$db_date' start_date,
'9999-99-99' end_date
from ods.mall__user_info oi
where oi.dt='$db_date';
"
$hive -e "$sql"
- Compute the temporary table (run after the main table has been loaded)
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_user_info_his_tmp
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via ${date} if set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select
*
from
( -- new versions: users added or changed today, valid from today onward
select
cast(id as string) id,
name,
from_unixtime(cast(birthday/1000 as bigint),'yyyy-MM-dd HH:mm:ss') birthday,
gender,
email,
user_level,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') operate_time,
'$db_date' start_date,
'9999-99-99' end_date
from ods.mall__user_info where dt='$db_date'
union all
-- existing history: close the open version of every user who changed today
-- (zipper-table columns are already formatted strings, so no from_unixtime here)
select
uh.id,
uh.name,
uh.birthday,
uh.gender,
uh.email,
uh.user_level,
uh.create_time,
uh.operate_time,
uh.start_date,
if(ui.id is not null and uh.end_date='9999-99-99', date_add(ui.dt,-1),uh.end_date) end_date
from dwd.mall__dim_user_info_his uh left join
(
-- ids of users added or changed today
select
cast(id as string) id,
dt
from ods.mall__user_info
where dt='$db_date'
) ui on uh.id=ui.id
)his
order by his.id, start_date;
"
$hive -e "$sql"
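The temporary-table query above does two things in one pass:

- It closes the open version of every user in today's ODS partition, setting that version's end date to the previous day.
- It appends today's records as new open versions.

A Python sketch of the same merge follows. It assumes, as the section title says, that `ods.mall__user_info` holds only the new and changed users for the day; if it held a full copy, every user would get a new version daily. `merge_zipper` and the sample rows are hypothetical.

```python
from datetime import date, timedelta

OPEN_END = "9999-99-99"


def merge_zipper(history, changes, load_date):
    """Sketch of the temporary-table computation for one load date.

    history: existing zipper rows (dicts with id, start_date, end_date, ...)
    changes: today's new or changed user rows (dicts with id and attributes)
    """
    changed_ids = {c["id"] for c in changes}
    # date_add(ui.dt, -1): the day before the load date
    prev_day = (date.fromisoformat(load_date) - timedelta(days=1)).isoformat()

    merged = []
    for row in history:
        row = dict(row)  # do not mutate the caller's data
        if row["id"] in changed_ids and row["end_date"] == OPEN_END:
            row["end_date"] = prev_day  # close the currently open version
        merged.append(row)
    # UNION ALL today's rows as new open versions
    merged.extend({**c, "start_date": load_date, "end_date": OPEN_END} for c in changes)
    merged.sort(key=lambda r: (r["id"], r["start_date"]))  # order by id, start_date
    return merged


history = [
    {"id": "1", "phone": "134", "start_date": "2019-01-01", "end_date": OPEN_END},
    {"id": "2", "phone": "150", "start_date": "2019-01-01", "end_date": OPEN_END},
]
changes = [{"id": "1", "phone": "139"}, {"id": "3", "phone": "188"}]
for row in merge_zipper(history, changes, "2019-01-03"):
    print(row)
```

User 1 ends up with a closed version (ending 2019-01-02) and a new open one. User 2 is untouched, and user 3 appears for the first time.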
- Load the data (overwrite the main table with the temporary table)
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_user_info_his
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via ${date} if set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select * from dwd.mall__dim_user_info_his_tmp;
"
$hive -e "$sql"
9.2.7 Order detail fact table (transaction fact table, new records)
- Create the table
drop table if exists dwd.mall__fact_order_detail;
CREATE EXTERNAL TABLE `dwd.mall__fact_order_detail`(
`id` bigint COMMENT 'Id',
`order_id` bigint COMMENT 'Order id',
`user_id` bigint COMMENT 'User id',
`sku_id` bigint COMMENT 'SKU id',
`sku_name` string COMMENT 'SKU name',
`order_price` decimal(10,2) COMMENT 'Purchase price (SKU price at order time)',
`sku_num` string COMMENT 'Quantity purchased',
`create_time` bigint COMMENT 'Creation time',
`province_id` string COMMENT 'Province id',
`total_amount` decimal(20,2) COMMENT 'Line total (order_price * sku_num)'
) COMMENT 'Order detail fact table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_order_detail/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_order_detail
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via ${date} if set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
od.id,
od.order_id,
od.user_id,
od.sku_id,
od.sku_name,
od.order_price,
od.sku_num,
od.create_time,
oi.province_id,
od.order_price*od.sku_num
from (select * from ods.mall__order_detail where dt='$db_date' ) od
join (select * from ods.mall__order_info where dt='$db_date' ) oi
on od.order_id=oi.id;
"
$hive -e "$sql"
9.2.8 Payment fact table (transaction fact table, new records)
- Create the table
drop table if exists dwd.mall__fact_payment_info;
CREATE EXTERNAL TABLE `dwd.mall__fact_payment_info`(
`id` string COMMENT 'Id',
`out_trade_no` string COMMENT 'External business number',
`order_id` string COMMENT 'Order id',
`user_id` string COMMENT 'User id',
`alipay_trade_no` string COMMENT 'Alipay transaction number',
`payment_amount` decimal(16,2) COMMENT 'Payment amount',
`subject` string COMMENT 'Transaction subject',
`payment_type` string COMMENT 'Payment type',
`payment_time` string COMMENT 'Payment time',
`province_id` string COMMENT 'Province id'
) COMMENT 'Payment fact table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_payment_info/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_payment_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via ${date} if set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
pi.id,
pi.out_trade_no,
pi.order_id,
pi.user_id,
pi.alipay_trade_no,
pi.total_amount,
pi.subject,
pi.payment_type,
from_unixtime(cast(pi.payment_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') payment_time,
oi.province_id
from
(
select * from ods.mall__payment_info where dt='$db_date'
)pi
join
(
select id, province_id from ods.mall__order_info where dt='$db_date'
)oi
on pi.order_id = oi.id;
"
$hive -e "$sql"
9.2.9 Refund fact table (transaction fact table, new records)
- Create the table
drop table if exists dwd.mall__fact_order_refund_info;
CREATE EXTERNAL TABLE `dwd.mall__fact_order_refund_info`(
`id` string COMMENT 'Id',
`user_id` string COMMENT 'User id',
`order_id` string COMMENT 'Order id',
`sku_id` string COMMENT 'SKU id',
`refund_type` string COMMENT 'Refund type',
`refund_num` bigint COMMENT 'Refunded quantity',
`refund_amount` decimal(16,2) COMMENT 'Refund amount',
`refund_reason_type` string COMMENT 'Refund reason type',
`create_time` string COMMENT 'Refund time'
) COMMENT 'Refund fact table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_order_refund_info/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_order_refund_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via ${date} if set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time
from ods.mall__order_refund_info
where dt='$db_date';
"
$hive -e "$sql"
9.2.10 Review fact table (transaction fact table, new records)
- Create the table
drop table if exists dwd.mall__fact_comment_info;
CREATE EXTERNAL TABLE `dwd.mall__fact_comment_info`(
`id` string COMMENT 'Id',
`user_id` string COMMENT 'User id',
`sku_id` string COMMENT 'SKU id',
`spu_id` string COMMENT 'SPU id',
`order_id` string COMMENT 'Order id',
`appraise` string COMMENT 'Rating',
`create_time` string COMMENT 'Review time'
) COMMENT 'Review fact table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_comment_info/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_comment_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via ${date} if set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time
from ods.mall__comment_info
where dt='$db_date';
"
$hive -e "$sql"
9.2.11 Cart fact table (periodic snapshot fact table, full load)
- Create the table
drop table if exists dwd.mall__fact_cart_info;
CREATE EXTERNAL TABLE `dwd.mall__fact_cart_info`(
`id` string COMMENT 'Id',
`user_id` string COMMENT 'User id',
`sku_id` string COMMENT 'SKU id',
`cart_price` string COMMENT 'Price when added to the cart',
`sku_num` string COMMENT 'Quantity',
`sku_name` string COMMENT 'SKU name (redundant copy)',
`create_time` string COMMENT 'Creation time',
`operate_time` string COMMENT 'Last modified time',
`is_ordered` string COMMENT 'Ordered flag: 1 = ordered, 0 = not ordered',
`order_time` string COMMENT 'Order time'
) COMMENT 'Cart fact table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_cart_info/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_cart_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via ${date} if set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') operate_time,
is_ordered,
from_unixtime(cast(order_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') order_time
from ods.mall__cart_info
where dt='$db_date';
"
$hive -e "$sql"
9.2.12 Favorites fact table (periodic snapshot fact table, full load)
- Create the table
drop table if exists dwd.mall__fact_favor_info;
CREATE EXTERNAL TABLE `dwd.mall__fact_favor_info`(
`id` string COMMENT 'Id',
`user_id` string COMMENT 'User id',
`sku_id` string COMMENT 'SKU id',
`spu_id` string COMMENT 'SPU id',
`is_cancel` string COMMENT 'Cancelled flag',
`create_time` string COMMENT 'Time added to favorites',
`cancel_time` string COMMENT 'Cancellation time'
) COMMENT 'Favorites fact table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_favor_info/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_favor_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via ${date} if set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
id,
user_id,
sku_id,
spu_id,
is_cancel,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
from_unixtime(cast(cancel_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') cancel_time
from ods.mall__favor_info
where dt='$db_date';
"
$hive -e "$sql"
9.2.13 Coupon usage fact table (accumulating snapshot fact table, new and changed records)
- Create the table
drop table if exists dwd.mall__fact_coupon_use;
CREATE EXTERNAL TABLE `dwd.mall__fact_coupon_use`(
`id` string COMMENT 'Id',
`coupon_id` string COMMENT 'Coupon id',
`user_id` string COMMENT 'User id',
`order_id` string COMMENT 'Order id',
`coupon_status` string COMMENT 'Coupon status',
`get_time` string COMMENT 'Claim time',
`using_time` string COMMENT 'Use time (order placed)',
`used_time` string COMMENT 'Use time (paid)'
) COMMENT 'Coupon usage fact table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_coupon_use/'
tblproperties ("parquet.compression"="snappy");
The dt partition is the coupon claim date, taken from get_time.
get_time is the claim time: the record must exist as soon as the coupon is claimed. Its order time (using_time) and payment time (used_time) are then filled in on that same record as those events happen.
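This accumulating snapshot follows two update rules:

- Keep the old value unless today's record carries a non-NULL new one.
- Keep each row in the partition of its claim date.

Both can be simulated in Python. This is a sketch with hypothetical status values and sample rows, not the warehouse's actual data; `merge_coupon_use` is a hypothetical name.

```python
from collections import defaultdict

FIELDS = ("id", "coupon_id", "user_id", "order_id",
          "coupon_status", "get_time", "using_time", "used_time")


def merge_coupon_use(old_rows, new_rows):
    """FULL OUTER JOIN old and new on id; a non-NULL new value wins,
    mirroring if(new.x is null, old.x, new.x). Returns {dt: [rows]} with
    dt taken from get_time (the claim date)."""
    old_by_id = {r["id"]: r for r in old_rows}
    new_by_id = {r["id"]: r for r in new_rows}
    partitions = defaultdict(list)
    for row_id in sorted(old_by_id.keys() | new_by_id.keys()):
        old = old_by_id.get(row_id, {})
        new = new_by_id.get(row_id, {})
        row = {f: new.get(f) if new.get(f) is not None else old.get(f) for f in FIELDS}
        partitions[row["get_time"][:10]].append(row)
    return dict(partitions)


# Coupon use 1: claimed on 03-10, used in an order on 03-12. Coupon use 2: claimed on 03-12.
old = [{"id": "1", "coupon_id": "c1", "user_id": "u1", "order_id": None,
        "coupon_status": "claimed", "get_time": "2020-03-10",
        "using_time": None, "used_time": None}]
new = [
    {"id": "1", "coupon_id": "c1", "user_id": "u1", "order_id": "o9",
     "coupon_status": "in_use", "get_time": "2020-03-10",
     "using_time": "2020-03-12", "used_time": None},
    {"id": "2", "coupon_id": "c2", "user_id": "u2", "order_id": None,
     "coupon_status": "claimed", "get_time": "2020-03-12",
     "using_time": None, "used_time": None},
]
partitions = merge_coupon_use(old, new)
print(sorted(partitions))  # ['2020-03-10', '2020-03-12']
```

Coupon use 1 is updated in place in its original claim-date partition rather than copied into today's partition. That is why only the partitions touched by today's claims have to be rewritten.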
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_coupon_use
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via ${date} if set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table $hive_table_name
PARTITION (dt)
select
if(new.id is null,old.id,new.id) id,
if(new.coupon_id is null,old.coupon_id,new.coupon_id) coupon_id,
if(new.user_id is null,old.user_id,new.user_id) user_id,
if(new.order_id is null,old.order_id,new.order_id) order_id,
if(new.coupon_status is null,old.coupon_status,new.coupon_status) coupon_status,
if(new.get_time is null,old.get_time,new.get_time) get_time,
if(new.using_time is null,old.using_time,new.using_time) using_time,
if(new.used_time is null,old.used_time,new.used_time) used_time,
-- dynamic partition column: the claim date
if(new.get_time is null,old.get_time,new.get_time) dt
from
( -- existing rows in the partitions touched by today's changes (times already formatted)
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from dwd.mall__fact_coupon_use
where dt in
(
select
from_unixtime(cast(get_time/1000 as bigint),'yyyy-MM-dd')
from ods.mall__coupon_use
where dt='$db_date'
)
)old
full outer join
( -- today's new and changed rows: epoch milliseconds are converted once, here
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
from_unixtime(cast(get_time/1000 as bigint),'yyyy-MM-dd') get_time,
from_unixtime(cast(using_time/1000 as bigint),'yyyy-MM-dd') using_time,
from_unixtime(cast(used_time/1000 as bigint),'yyyy-MM-dd') used_time
from ods.mall__coupon_use
where dt='$db_date'
)new
on old.id=new.id;
"
$hive -e "$sql"
9.2.14 Order fact table (accumulating snapshot fact table, new and changed records)
- Create the table
drop table if exists dwd.mall__fact_order_info;
CREATE EXTERNAL TABLE `dwd.mall__fact_order_info`(
`id` string COMMENT 'Order id',
`order_status` string COMMENT 'Order status',
`user_id` string COMMENT 'User id',
`out_trade_no` string COMMENT 'Payment serial number',
`create_time` string COMMENT 'Creation time (unpaid status)',
`payment_time` string COMMENT 'Payment time (paid status)',
`cancel_time` string COMMENT 'Cancellation time (cancelled status)',
`finish_time` string COMMENT 'Completion time (completed status)',
`refund_time` string COMMENT 'Refund time (refunding status)',
`refund_finish_time` string COMMENT 'Refund completion time (refund completed status)',
`province_id` string COMMENT 'Province id',
`activity_id` string COMMENT 'Activity id',
`original_total_amount` string COMMENT 'Original amount',
`benefit_reduce_amount` string COMMENT 'Discount amount',
`feight_fee` string COMMENT 'Shipping fee',
`final_total_amount` decimal(10,2) COMMENT 'Order amount'
) COMMENT 'Order fact table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_order_info/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_order_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via ${date} if set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table $hive_table_name
PARTITION (dt)
select
if(new.id is null,old.id,new.id),
if(new.order_status is null,old.order_status,new.order_status),
if(new.user_id is null,old.user_id,new.user_id),
if(new.out_trade_no is null,old.out_trade_no,new.out_trade_no),
-- status 1001 = unpaid (order created); old.* time columns are already formatted strings
if(new.tms['1001'] is null,old.create_time,new.tms['1001']),
if(new.tms['1002'] is null,old.payment_time,new.tms['1002']),
if(new.tms['1003'] is null,old.cancel_time,new.tms['1003']),
if(new.tms['1004'] is null,old.finish_time,new.tms['1004']),
if(new.tms['1005'] is null,old.refund_time,new.tms['1005']),
if(new.tms['1006'] is null,old.refund_finish_time,new.tms['1006']),
if(new.province_id is null,old.province_id,new.province_id),
if(new.activity_id is null,old.activity_id,new.activity_id),
if(new.original_total_amount is null,old.original_total_amount,new.original_total_amount),
if(new.benefit_reduce_amount is null,old.benefit_reduce_amount,new.benefit_reduce_amount),
if(new.feight_fee is null,old.feight_fee,new.feight_fee),
if(new.final_total_amount is null,old.final_total_amount,new.final_total_amount),
-- dynamic partition column: the order creation date
date_format(if(new.tms['1001'] is null,old.create_time,new.tms['1001']),'yyyy-MM-dd')
from
( -- existing rows in the partitions (creation dates) of orders that changed today
select
id,
order_status,
user_id,
out_trade_no,
create_time,
payment_time,
cancel_time,
finish_time,
refund_time,
refund_finish_time,
province_id,
activity_id,
original_total_amount,
benefit_reduce_amount,
feight_fee,
final_total_amount
from dwd.mall__fact_order_info
where dt in
(
select
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd')
from ods.mall__order_info
where dt='$db_date'
)
)old
full outer join
( -- today's new and changed orders
select
info.id,
info.order_status,
info.user_id,
info.out_trade_no,
info.province_id,
act.activity_id,
log.tms,
info.original_total_amount,
info.benefit_reduce_amount,
info.feight_fee,
info.final_total_amount
from
(
select
order_id,
-- map of status code to the time that status was reached
str_to_map(concat_ws(',',collect_set(concat(order_status,'=',from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss')))),',','=')
tms
from ods.mall__order_status_log
where dt='$db_date'
group by order_id
)log
join
(
select * from ods.mall__order_info where dt='$db_date'
)info
on log.order_id=info.id
left join
(
select * from ods.mall__activity_order where dt='$db_date'
)act
on log.order_id=act.order_id
)new
on old.id=new.id;
"
$hive -e "$sql"
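The `str_to_map(concat_ws(',', collect_set(concat(order_status, '=', time))), ',', '=')` expression pivots each order's status log into a map from status code to time. The script documents 1001 as the unpaid (created) state. The column order suggests 1002 to 1006 are paid, cancelled, finished, refunding and refund finished, but that mapping is inferred. A Python sketch of the pivot follows, with hypothetical sample times. It only works because the time strings contain neither `,` nor `=`.

```python
from collections import defaultdict


def status_time_map(status_log):
    """Per order, build {order_status: time} the way
    str_to_map(concat_ws(',', collect_set(concat(status, '=', time))), ',', '=') does."""
    pairs = defaultdict(set)  # collect_set: distinct "status=time" strings per order
    for entry in status_log:
        pairs[entry["order_id"]].add(entry["order_status"] + "=" + entry["operate_time"])

    tms = {}
    for order_id, items in pairs.items():
        # concat_ws(','); sorted only to make the sketch deterministic
        joined = ",".join(sorted(items))
        # str_to_map: split into entries on ',', then into key/value on the first '='
        tms[order_id] = dict(item.split("=", 1) for item in joined.split(","))
    return tms


log = [
    {"order_id": "o1", "order_status": "1001", "operate_time": "2020-03-10 09:00:00"},
    {"order_id": "o1", "order_status": "1002", "operate_time": "2020-03-10 09:05:00"},
    {"order_id": "o2", "order_status": "1001", "operate_time": "2020-03-10 10:00:00"},
]
tms = status_time_map(log)
print(tms["o1"].get("1002"))  # 2020-03-10 09:05:00 -> becomes payment_time
print(tms["o2"].get("1002"))  # None -> payment_time falls back to the old row
```

Hive does not guarantee the order of `collect_set`. If the same status were logged twice with different times, which one survives in the map is not deterministic.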
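10 Building the DWS layer
DWS tables are no longer compressed. Compression saves disk space, but every read then spends CPU time on decompression. DWS tables are queried frequently, so this layer favors compute efficiency over storage. It mainly holds the daily aggregates for each subject.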
10.1 Daily device activity (user behavior)
- Create the table
drop table if exists dws.mall__uv_detail_daycount;
CREATE EXTERNAL TABLE `dws.mall__uv_detail_daycount`(
`mid_id` string COMMENT 'Unique device id',
`user_id` string COMMENT 'User id',
`version_code` string COMMENT 'App version code',
`version_name` string COMMENT 'App version name',
`lang` string COMMENT 'System language',
`source` string COMMENT 'Channel',
`os` string COMMENT 'Android OS version',
`area` string COMMENT 'Area',
`model` string COMMENT 'Phone model',
`brand` string COMMENT 'Phone brand',
`sdk_version` string COMMENT 'sdkVersion',
`gmail` string COMMENT 'gmail',
`height_width` string COMMENT 'Screen height and width',
`app_time` string COMMENT 'Client time when the log was produced',
`network` string COMMENT 'Network type',
`lng` string COMMENT 'Longitude',
`lat` string COMMENT 'Latitude',
`login_count` bigint COMMENT 'Active count'
) COMMENT 'Daily device activity table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/uv_detail_daycount/';
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=uv_detail_daycount
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via ${date} if set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
concat_ws('|', collect_set(user_id)) user_id,
concat_ws('|', collect_set(version_code)) version_code,
concat_ws('|', collect_set(version_name)) version_name,
concat_ws('|', collect_set(lang)) lang,
concat_ws('|', collect_set(source)) source,
concat_ws('|', collect_set(os)) os,
concat_ws('|', collect_set(area)) area,
concat_ws('|', collect_set(model)) model,
concat_ws('|', collect_set(brand)) brand,
concat_ws('|', collect_set(sdk_version)) sdk_version,
concat_ws('|', collect_set(gmail)) gmail,
concat_ws('|', collect_set(height_width)) height_width,
concat_ws('|', collect_set(app_time)) app_time,
concat_ws('|', collect_set(network)) network,
concat_ws('|', collect_set(lng)) lng,
concat_ws('|', collect_set(lat)) lat,
count(*) login_count
from dwd.mall__start_log
where dt='$db_date'
group by mid_id;
"
$hive -e "$sql"
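Grouping by `mid_id` with `concat_ws('|', collect_set(col))` keeps one row per device per day. Every distinct value seen that day is folded into a single `|`-separated string, and `count(*)` counts the launch records. Below is a Python sketch of that aggregation over a few columns, with hypothetical sample logs. Hive's `collect_set` does not guarantee ordering, while the sketch keeps first-seen order.

```python
from collections import defaultdict


def uv_detail(start_logs, columns=("user_id", "version_code", "os")):
    """One row per mid_id: distinct non-NULL values joined with '|', plus a row count.

    Mirrors concat_ws('|', collect_set(col)) ... count(*) ... group by mid_id.
    """
    groups = defaultdict(list)
    for rec in start_logs:
        groups[rec["mid_id"]].append(rec)

    result = {}
    for mid_id, recs in groups.items():
        row = {"mid_id": mid_id, "login_count": len(recs)}  # count(*)
        for col in columns:
            distinct = []
            for rec in recs:
                value = rec.get(col)
                if value is not None and value not in distinct:  # collect_set skips NULLs
                    distinct.append(value)
            row[col] = "|".join(distinct)
        result[mid_id] = row
    return result


logs = [
    {"mid_id": "m1", "user_id": "u1", "version_code": "1", "os": "8.0"},
    {"mid_id": "m1", "user_id": "u2", "version_code": "1", "os": "8.0"},
    {"mid_id": "m2", "user_id": None, "version_code": "2", "os": "9.0"},
]
daily = uv_detail(logs)
print(daily["m1"])  # {'mid_id': 'm1', 'login_count': 2, 'user_id': 'u1|u2', 'version_code': '1', 'os': '8.0'}
```

A device used by two accounts on the same day therefore shows `u1|u2` in `user_id`. Queries that need one user per device must split or filter that string.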
10.2 Daily member activity (business)
- Create the table
drop table if exists dws.mall__user_action_daycount;
CREATE EXTERNAL TABLE `dws.mall__user_action_daycount`(
user_id string comment 'User id',
login_count bigint comment 'Login count',
cart_count bigint comment 'Add-to-cart count',
cart_amount double comment 'Add-to-cart amount',
order_count bigint comment 'Order count',
order_amount decimal(16,2) comment 'Order amount',
payment_count bigint comment 'Payment count',
payment_amount decimal(16,2) comment 'Payment amount'
) COMMENT 'Daily member activity table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/user_action_daycount/';
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=user_action_daycount
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via ${date} if set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
with
tmp_login as
(
select
user_id,
count(*) login_count
from dwd.mall__start_log
where dt='$db_date'
and user_id is not null
group by user_id
),
tmp_cart as
(
select
user_id,
count(*) cart_count,
sum(cart_price*sku_num) cart_amount
from dwd.mall__fact_cart_info
where dt='$db_date'
and user_id is not null
and date_format(create_time,'yyyy-MM-dd')='$db_date'
group by user_id
),
tmp_order as
(
select
user_id,
count(*) order_count,
sum(final_total_amount) order_amount
from dwd.mall__fact_order_info
where dt='$db_date'
group by user_id
) ,
tmp_payment as
(
select
user_id,
count(*) payment_count,
sum(payment_amount) payment_amount
from dwd.mall__fact_payment_info
where dt='$db_date'
group by user_id
)
insert overwrite table $hive_table_name partition(dt='$db_date')
select
user_actions.user_id,
sum(user_actions.login_count),
sum(user_actions.cart_count),
sum(user_actions.cart_amount),
sum(user_actions.order_count),
sum(user_actions.order_amount),
sum(user_actions.payment_count),
sum(user_actions.payment_amount)
from
(
select
user_id,
login_count,
0 cart_count,
0 cart_amount,
0 order_count,
0 order_amount,
0 payment_count,
0 payment_amount
from
tmp_login
union all
select
user_id,
0 login_count,
cart_count,
cart_amount,
0 order_count,
0 order_amount,
0 payment_count,
0 payment_amount
from
tmp_cart
union all
select
user_id,
0 login_count,
0 cart_count,
0 cart_amount,
order_count,
order_amount,
0 payment_count,
0 payment_amount
from tmp_order
union all
select
user_id,
0 login_count,
0 cart_count,
0 cart_amount,
0 order_count,
0 order_amount,
payment_count,
payment_amount
from tmp_payment
) user_actions
group by user_id;
"
$hive -e "$sql"
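The UNION ALL + GROUP BY pattern above merges four partial aggregates without a chain of FULL OUTER JOINs. Every user who appears in any CTE gets exactly one output row, and a metric the user has no activity for comes out as 0 rather than NULL. Below is a Python sketch of the same idea. `combine_daily` and the sample data are hypothetical.

```python
from collections import defaultdict

METRICS = ("login_count", "cart_count", "cart_amount", "order_count",
           "order_amount", "payment_count", "payment_amount")


def combine_daily(*partials):
    """Each partial maps user_id -> {metric: value} for the metrics it knows.

    Every metric a partial does not supply counts as 0, like the constant
    0 columns in each UNION ALL branch; summing per user then collapses the
    branches into one row per user.
    """
    totals = defaultdict(lambda: dict.fromkeys(METRICS, 0))
    for partial in partials:
        for user_id, metrics in partial.items():
            for name, value in metrics.items():
                totals[user_id][name] += value
    return dict(totals)


login = {"u1": {"login_count": 3}, "u2": {"login_count": 1}}
cart = {"u1": {"cart_count": 2, "cart_amount": 59.8}}
payment = {"u3": {"payment_count": 1, "payment_amount": 100}}
daily = combine_daily(login, cart, payment)
print(daily["u3"])  # u3 never logged in that day, so its login_count is 0
```

Adding another subject later only needs one more UNION ALL branch and one more column in the others. It does not need another join condition.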
10.3 Daily SKU actions (business data)
- Create the table
drop table if exists dws.mall__sku_action_daycount;
CREATE EXTERNAL TABLE `dws.mall__sku_action_daycount`(
sku_id string comment 'sku_id',
order_count bigint comment 'Times ordered',
order_num bigint comment 'Units ordered',
order_amount decimal(16,2) comment 'Order amount',
payment_count bigint comment 'Times paid',
payment_num bigint comment 'Units paid',
payment_amount decimal(16,2) comment 'Payment amount',
refund_count bigint comment 'Times refunded',
refund_num bigint comment 'Units refunded',
refund_amount decimal(16,2) comment 'Refund amount',
cart_count bigint comment 'Times added to cart',
cart_num bigint comment 'Units added to cart',
favor_count bigint comment 'Times favorited',
appraise_good_count bigint comment 'Positive reviews',
appraise_mid_count bigint comment 'Neutral reviews',
appraise_bad_count bigint comment 'Negative reviews',
appraise_default_count bigint comment 'Default reviews'
) COMMENT 'Daily SKU action table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/sku_action_daycount/';
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=sku_action_daycount
hive_table_name=$APP2.mall__$table_name
# Use the date passed in if one was given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
with
tmp_order as
(
select
cast(sku_id as string) sku_id,
count(*) order_count,
sum(sku_num) order_num,
sum(total_amount) order_amount
from dwd.mall__fact_order_detail
where dt='$db_date'
group by sku_id
),
tmp_payment as
(
select
cast(sku_id as string) sku_id,
count(*) payment_count,
sum(sku_num) payment_num,
sum(total_amount) payment_amount
from dwd.mall__fact_order_detail
where dt='$db_date'
and order_id in
(
select
id
from dwd.mall__fact_order_info
where (dt='$db_date' or dt=date_add('$db_date',-1))
and date_format(payment_time,'yyyy-MM-dd')='$db_date'
)
group by sku_id
),
tmp_refund as
(
select
cast(sku_id as string) sku_id,
count(*) refund_count,
sum(refund_num) refund_num,
sum(refund_amount) refund_amount
from dwd.mall__fact_order_refund_info
where dt='$db_date'
group by sku_id
),
tmp_cart as
(
select
cast(sku_id as string) sku_id,
count(*) cart_count,
sum(sku_num) cart_num
from dwd.mall__fact_cart_info
where dt='$db_date'
and date_format(create_time,'yyyy-MM-dd')='$db_date'
group by sku_id
),
tmp_favor as
(
select
cast(sku_id as string) sku_id,
count(*) favor_count
from dwd.mall__fact_favor_info
where dt='$db_date'
and date_format(create_time,'yyyy-MM-dd')='$db_date'
group by sku_id
),
tmp_appraise as
(
select
cast(sku_id as string) sku_id,
sum(if(appraise='1201',1,0)) appraise_good_count,
sum(if(appraise='1202',1,0)) appraise_mid_count,
sum(if(appraise='1203',1,0)) appraise_bad_count,
sum(if(appraise='1204',1,0)) appraise_default_count
from dwd.mall__fact_comment_info
where dt='$db_date'
group by sku_id
)
insert overwrite table $hive_table_name partition(dt='$db_date')
select
sku_id,
sum(order_count),
sum(order_num),
sum(order_amount),
sum(payment_count),
sum(payment_num),
sum(payment_amount),
sum(refund_count),
sum(refund_num),
sum(refund_amount),
sum(cart_count),
sum(cart_num),
sum(favor_count),
sum(appraise_good_count),
sum(appraise_mid_count),
sum(appraise_bad_count),
sum(appraise_default_count)
from
(
select
sku_id,
order_count,
order_num,
order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 cart_num,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_order
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
payment_count,
payment_num,
payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 cart_num,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_payment
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
refund_count,
refund_num,
refund_amount,
0 cart_count,
0 cart_num,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_refund
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
cart_count,
cart_num,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_cart
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 cart_num,
favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_favor
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 cart_num,
0 favor_count,
appraise_good_count,
appraise_mid_count,
appraise_bad_count,
appraise_default_count
from tmp_appraise
)tmp
group by sku_id;
"
$hive -e "$sql"
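Every load script in this section repeats the same date-defaulting block. A minimal sketch of moving it into one function that each script could source, assuming GNU `date` and, as in the scripts above, that the scheduler passes the business date in a shell variable named `date`; `resolve_db_date` is a hypothetical name, not part of the original scripts:

```shell
#!/bin/bash
# Hypothetical helper: print the business date a load script should process.
# If the variable "date" is non-empty (e.g. set by the scheduler), use it;
# otherwise fall back to yesterday, like the inline if/else in each script.
resolve_db_date() {
  if [ -n "${date}" ]; then
    echo "${date}"
  else
    date -d "-1 day" +%F   # GNU date: yesterday as yyyy-MM-dd
  fi
}

# Usage inside a load script, replacing the inline if/else:
db_date=$(resolve_db_date)
echo "processing partition dt=${db_date}"
```

Keeping the fallback in one place means a later change, such as validating the date format, only has to be made once.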
10.4 Daily coupon statistics (business data)
- Create the table
drop table if exists dws.mall__coupon_use_daycount;
CREATE EXTERNAL TABLE `dws.mall__coupon_use_daycount`(
`coupon_id` string COMMENT 'Coupon ID',
`coupon_name` string COMMENT 'Coupon name',
`coupon_type` string COMMENT 'Coupon type: 1 cash, 2 discount, 3 amount off above a spend threshold, 4 discount above a quantity threshold',
`condition_amount` string COMMENT 'Spend threshold',
`condition_num` string COMMENT 'Quantity threshold',
`activity_id` string COMMENT 'Activity ID',
`benefit_amount` string COMMENT 'Amount off',
`benefit_discount` string COMMENT 'Discount rate',
`create_time` string COMMENT 'Creation time',
`range_type` string COMMENT 'Scope type: 1 product, 2 category, 3 brand',
`spu_id` string COMMENT 'Product (SPU) ID',
`tm_id` string COMMENT 'Brand ID',
`category3_id` string COMMENT 'Category ID',
`limit_num` string COMMENT 'Maximum number of claims',
`get_count` bigint COMMENT 'Times claimed',
`using_count` bigint COMMENT 'Times used (order placed)',
`used_count` bigint COMMENT 'Times used (paid)'
) COMMENT 'Daily coupon statistics table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/coupon_use_daycount/';
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=coupon_use_daycount
hive_table_name=$APP2.mall__$table_name
# Use the date passed in if one was given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name partition(dt='$db_date')
select
cu.coupon_id,
ci.coupon_name,
ci.coupon_type,
ci.condition_amount,
ci.condition_num,
ci.activity_id,
ci.benefit_amount,
ci.benefit_discount,
ci.create_time,
ci.range_type,
ci.spu_id,
ci.tm_id,
ci.category3_id,
ci.limit_num,
cu.get_count,
cu.using_count,
cu.used_count
from
(
select
coupon_id,
sum(if(date_format(get_time,'yyyy-MM-dd')='$db_date',1,0)) get_count,
sum(if(date_format(using_time,'yyyy-MM-dd')='$db_date',1,0)) using_count,
sum(if(date_format(used_time,'yyyy-MM-dd')='$db_date',1,0)) used_count
from dwd.mall__fact_coupon_use
where dt='$db_date'
group by coupon_id
)cu
left join
(
select
*
from dwd.mall__dim_coupon_info
where dt='$db_date'
)ci on cu.coupon_id=ci.id;
"
$hive -e "$sql"
10.5 Daily activity statistics (business data)
- Create the table
drop table if exists dws.mall__activity_info_daycount;
CREATE EXTERNAL TABLE `dws.mall__activity_info_daycount`(
`id` string COMMENT 'ID',
`activity_name` string COMMENT 'Activity name',
`activity_type` string COMMENT 'Activity type',
`start_time` string COMMENT 'Start time',
`end_time` string COMMENT 'End time',
`create_time` string COMMENT 'Creation time',
`order_count` bigint COMMENT 'Orders placed',
`payment_count` bigint COMMENT 'Payments made'
) COMMENT 'Daily activity statistics table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/activity_info_daycount/';
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=activity_info_daycount
hive_table_name=$APP2.mall__$table_name
# Use the date passed in if one was given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name partition(dt='$db_date')
select
oi.activity_id,
ai.activity_name,
ai.activity_type,
ai.start_time,
ai.end_time,
ai.create_time,
oi.order_count,
oi.payment_count
from
(
select
activity_id,
sum(if(date_format(create_time,'yyyy-MM-dd')='$db_date',1,0)) order_count,
sum(if(date_format(payment_time,'yyyy-MM-dd')='$db_date',1,0)) payment_count
from dwd.mall__fact_order_info
where (dt='$db_date' or dt=date_add('$db_date',-1))
and activity_id is not null
group by activity_id
)oi
join
(
select
*
from dwd.mall__dim_activity_info
where dt='$db_date'
)ai
on oi.activity_id=ai.id;
"
$hive -e "$sql"
10.6 Daily purchase details (business data)
- Create the table
drop table if exists dws.mall__sale_detail_daycount;
CREATE EXTERNAL TABLE `dws.mall__sale_detail_daycount`(
user_id string comment 'User ID',
sku_id string comment 'SKU ID',
user_gender string comment 'User gender',
user_age string comment 'User age',
user_level string comment 'User level',
order_price decimal(10,2) comment 'SKU price',
sku_name string comment 'SKU name',
sku_tm_id string comment 'Brand ID',
sku_category3_id string comment 'Level-3 category ID',
sku_category2_id string comment 'Level-2 category ID',
sku_category1_id string comment 'Level-1 category ID',
sku_category3_name string comment 'Level-3 category name',
sku_category2_name string comment 'Level-2 category name',
sku_category1_name string comment 'Level-1 category name',
spu_id string comment 'SPU ID',
sku_num int comment 'Units purchased',
order_count bigint comment 'Orders placed that day',
order_amount decimal(16,2) comment 'Order amount that day'
) COMMENT 'Daily purchase detail table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/sale_detail_daycount/';
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=sale_detail_daycount
hive_table_name=$APP2.mall__$table_name
# Use the date passed in if one was given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name partition(dt='$db_date')
select
op.user_id,
op.sku_id,
ui.gender,
months_between('$db_date', ui.birthday)/12 age,
ui.user_level,
si.price,
si.sku_name,
si.tm_id,
si.category3_id,
si.category2_id,
si.category1_id,
si.category3_name,
si.category2_name,
si.category1_name,
si.spu_id,
op.sku_num,
op.order_count,
op.order_amount
from
(
select
user_id,
sku_id,
sum(sku_num) sku_num,
count(*) order_count,
sum(total_amount) order_amount
from dwd.mall__fact_order_detail
where dt='$db_date'
group by user_id, sku_id
)op
join
(
select
*
from dwd.mall__dim_user_info_his
where end_date='9999-99-99'
)ui on op.user_id = ui.id
join
(
select
*
from dwd.mall__dim_sku_info
where dt='$db_date'
)si on op.sku_id = si.id;
"
$hive -e "$sql"
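11 Building the DWT layer
This layer rolls the daily DWS data up into cumulative topic tables. The tables are not partitioned and not compressed, and each one is fully overwritten every day.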
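Each DWT load below merges the current cumulative table (`old`) with one day of DWS data (`new`) through a `full outer join`, using `nvl(...,0)` so that a key missing on either side counts as zero. A minimal in-memory sketch of that merge rule in bash 4.3+ (associative arrays and `local -n`); `merge_counts` and the device IDs are made up for illustration and are not part of the warehouse:

```shell
#!/bin/bash
# merge_counts OLD NEW OUT: for every key in OLD or NEW (full outer join),
# set OUT[key] = nvl(OLD[key],0) + nvl(NEW[key],0).
# The three arguments are names of associative arrays (bash 4.3+ namerefs).
merge_counts() {
  local -n old_ref=$1 new_ref=$2 out_ref=$3
  local key
  for key in "${!old_ref[@]}" "${!new_ref[@]}"; do
    out_ref[$key]=$(( ${old_ref[$key]:-0} + ${new_ref[$key]:-0} ))
  done
}

declare -A old=([mid_1]=5 [mid_2]=2)   # cumulative counts up to yesterday
declare -A new=([mid_2]=1 [mid_3]=1)   # counts for the current day
declare -A merged=()
merge_counts old new merged

for k in $(printf '%s\n' "${!merged[@]}" | sort); do
  echo "$k ${merged[$k]}"
done
# prints: mid_1 5, mid_2 3, mid_3 1 (one per line)
```

The real queries also carry non-additive columns such as first and last active dates; those follow the `if(old... is null, ...)` rules in each script rather than a sum.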
11.1 Device topic wide table
- Create the table
drop table if exists dwt.mall__uv_topic;
CREATE EXTERNAL TABLE `dwt.mall__uv_topic`(
`mid_id` string COMMENT 'Unique device ID',
`user_id` string COMMENT 'User ID',
`version_code` string COMMENT 'App version code',
`version_name` string COMMENT 'App version name',
`lang` string COMMENT 'System language',
`source` string COMMENT 'Channel',
`os` string COMMENT 'Android OS version',
`area` string COMMENT 'Region',
`model` string COMMENT 'Phone model',
`brand` string COMMENT 'Phone brand',
`sdk_version` string COMMENT 'sdkVersion',
`gmail` string COMMENT 'gmail',
`height_width` string COMMENT 'Screen height and width',
`app_time` string COMMENT 'Time the client log was generated',
`network` string COMMENT 'Network type',
`lng` string COMMENT 'Longitude',
`lat` string COMMENT 'Latitude',
`login_date_first` string comment 'First active date',
`login_date_last` string comment 'Last active date',
`login_day_count` bigint comment 'Active count on the current day',
`login_count` bigint comment 'Cumulative active days'
) COMMENT 'Device topic wide table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwt/mall/uv_topic/';
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwt
table_name=uv_topic
hive_table_name=$APP2.mall__$table_name
# Use the date passed in if one was given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select
nvl(new.mid_id,old.mid_id),
nvl(new.user_id,old.user_id),
nvl(new.version_code,old.version_code),
nvl(new.version_name,old.version_name),
nvl(new.lang,old.lang),
nvl(new.source,old.source),
nvl(new.os,old.os),
nvl(new.area,old.area),
nvl(new.model,old.model),
nvl(new.brand,old.brand),
nvl(new.sdk_version,old.sdk_version),
nvl(new.gmail,old.gmail),
nvl(new.height_width,old.height_width),
nvl(new.app_time,old.app_time),
nvl(new.network,old.network),
nvl(new.lng,old.lng),
nvl(new.lat,old.lat),
if(old.mid_id is null,'$db_date',old.login_date_first),
if(new.mid_id is not null,'$db_date',old.login_date_last),
if(new.mid_id is not null, new.login_count,0),
nvl(old.login_count,0)+if(new.login_count>0,1,0)
from
(
select
*
from dwt.mall__uv_topic
)old
full outer join
(
select
*
from dws.mall__uv_detail_daycount
where dt='$db_date'
)new
on old.mid_id=new.mid_id;
"
$hive -e "$sql"
11.2 Member topic wide table
- Create the table
drop table if exists dwt.mall__user_topic;
CREATE EXTERNAL TABLE `dwt.mall__user_topic`(
user_id string comment 'User ID',
login_date_first string comment 'First login date',
login_date_last string comment 'Last login date',
login_count bigint comment 'Cumulative login days',
login_last_30d_count bigint comment 'Login days in the last 30 days',
order_date_first string comment 'First order date',
order_date_last string comment 'Last order date',
order_count bigint comment 'Cumulative orders',
order_amount decimal(16,2) comment 'Cumulative order amount',
order_last_30d_count bigint comment 'Orders in the last 30 days',
order_last_30d_amount decimal(16,2) comment 'Order amount in the last 30 days',
payment_date_first string comment 'First payment date',
payment_date_last string comment 'Last payment date',
payment_count bigint comment 'Cumulative payments',
payment_amount decimal(16,2) comment 'Cumulative payment amount',
payment_last_30d_count bigint comment 'Payments in the last 30 days',
payment_last_30d_amount decimal(16,2) comment 'Payment amount in the last 30 days'
) COMMENT 'Member topic wide table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwt/mall/user_topic/';
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwt
table_name=user_topic
hive_table_name=$APP2.mall__$table_name
# Use the date passed in if one was given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select
nvl(new.user_id,old.user_id),
if(old.login_date_first is null and new.login_count>0,'$db_date',old.login_date_first),
if(new.login_count>0,'$db_date',old.login_date_last),
nvl(old.login_count,0)+if(new.login_count>0,1,0),
nvl(new.login_last_30d_count,0),
if(old.order_date_first is null and new.order_count>0,'$db_date',old.order_date_first),
if(new.order_count>0,'$db_date',old.order_date_last),
nvl(old.order_count,0)+nvl(new.order_count,0),
nvl(old.order_amount,0)+nvl(new.order_amount,0),
nvl(new.order_last_30d_count,0),
nvl(new.order_last_30d_amount,0),
if(old.payment_date_first is null and new.payment_count>0,'$db_date',old.payment_date_first),
if(new.payment_count>0,'$db_date',old.payment_date_last),
nvl(old.payment_count,0)+nvl(new.payment_count,0),
nvl(old.payment_amount,0)+nvl(new.payment_amount,0),
nvl(new.payment_last_30d_count,0),
nvl(new.payment_last_30d_amount,0)
from
dwt.mall__user_topic old
full outer join
(
select
user_id,
sum(if(dt='$db_date',login_count,0)) login_count,
sum(if(dt='$db_date',order_count,0)) order_count,
sum(if(dt='$db_date',order_amount,0)) order_amount,
sum(if(dt='$db_date',payment_count,0)) payment_count,
sum(if(dt='$db_date',payment_amount,0)) payment_amount,
sum(if(login_count>0,1,0)) login_last_30d_count,
sum(order_count) order_last_30d_count,
sum(order_amount) order_last_30d_amount,
sum(payment_count) payment_last_30d_count,
sum(payment_amount) payment_last_30d_amount
from dws.mall__user_action_daycount
where dt>=date_add('$db_date',-29) and dt<='$db_date'
group by user_id
)new
on old.user_id=new.user_id;
"
$hive -e "$sql"
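The `new` subquery above scans the 30-day window of `dws.mall__user_action_daycount` once and derives both the current day's figures (`sum(if(dt='$db_date',x,0))`) and the window totals (`sum(x)`). A minimal awk sketch of this conditional aggregation on made-up `user_id dt order_count` rows, for illustration only:

```shell
#!/bin/bash
# Made-up sample rows: user_id dt order_count
rows='u1 2020-03-09 2
u1 2020-03-10 1
u2 2020-03-08 4'

db_date=2020-03-10
# In a single pass, per user: today's orders (rows where dt == db_date only)
# and the window total (all rows), like sum(if(dt=...)) and sum(...) above.
result=$(awk -v d="$db_date" '
  { today[$1] += ($2 == d ? $3 : 0); total[$1] += $3 }
  END { for (u in total) print u, today[u], total[u] }
' <<<"$rows" | sort)

echo "$result"
# prints:
# u1 1 3
# u2 0 4
```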
11.3 SKU topic wide table
- Create the table
drop table if exists dwt.mall__sku_topic;
CREATE EXTERNAL TABLE `dwt.mall__sku_topic`(
sku_id string comment 'sku_id',
spu_id string comment 'spu_id',
order_last_30d_count bigint comment 'Times ordered in the last 30 days',
order_last_30d_num bigint comment 'Units ordered in the last 30 days',
order_last_30d_amount decimal(16,2) comment 'Order amount in the last 30 days',
order_count bigint comment 'Cumulative times ordered',
order_num bigint comment 'Cumulative units ordered',
order_amount decimal(16,2) comment 'Cumulative order amount',
payment_last_30d_count bigint comment 'Times paid in the last 30 days',
payment_last_30d_num bigint comment 'Units paid in the last 30 days',
payment_last_30d_amount decimal(16,2) comment 'Payment amount in the last 30 days',
payment_count bigint comment 'Cumulative times paid',
payment_num bigint comment 'Cumulative units paid',
payment_amount decimal(16,2) comment 'Cumulative payment amount',
refund_last_30d_count bigint comment 'Refunds in the last 30 days',
refund_last_30d_num bigint comment 'Units refunded in the last 30 days',
refund_last_30d_amount decimal(10,2) comment 'Refund amount in the last 30 days',
refund_count bigint comment 'Cumulative refunds',
refund_num bigint comment 'Cumulative units refunded',
refund_amount decimal(10,2) comment 'Cumulative refund amount',
cart_last_30d_count bigint comment 'Times added to cart in the last 30 days',
cart_last_30d_num bigint comment 'Units added to cart in the last 30 days',
cart_count bigint comment 'Cumulative times added to cart',
cart_num bigint comment 'Cumulative units added to cart',
favor_last_30d_count bigint comment 'Times favorited in the last 30 days',
favor_count bigint comment 'Cumulative times favorited',
appraise_last_30d_good_count bigint comment 'Positive reviews in the last 30 days',
appraise_last_30d_mid_count bigint comment 'Neutral reviews in the last 30 days',
appraise_last_30d_bad_count bigint comment 'Negative reviews in the last 30 days',
appraise_last_30d_default_count bigint comment 'Default reviews in the last 30 days',
appraise_good_count bigint comment 'Cumulative positive reviews',
appraise_mid_count bigint comment 'Cumulative neutral reviews',
appraise_bad_count bigint comment 'Cumulative negative reviews',
appraise_default_count bigint comment 'Cumulative default reviews'
) COMMENT 'SKU topic wide table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwt/mall/sku_topic/';
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwt
table_name=sku_topic
hive_table_name=$APP2.mall__$table_name
# Use the date passed in if one was given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select
nvl(new.sku_id,old.sku_id), sku_info.spu_id,
nvl(new.order_count30,0),
nvl(new.order_num30,0),
nvl(new.order_amount30,0),
nvl(old.order_count,0) + nvl(new.order_count,0),
nvl(old.order_num,0) + nvl(new.order_num,0),
nvl(old.order_amount,0) + nvl(new.order_amount,0),
nvl(new.payment_count30,0),
nvl(new.payment_num30,0),
nvl(new.payment_amount30,0),
nvl(old.payment_count,0) + nvl(new.payment_count,0),
nvl(old.payment_num,0) + nvl(new.payment_num,0),
nvl(old.payment_amount,0) + nvl(new.payment_amount,0),
nvl(new.refund_count30,0),
nvl(new.refund_num30,0),
nvl(new.refund_amount30,0),
nvl(old.refund_count,0) + nvl(new.refund_count,0),
nvl(old.refund_num,0) + nvl(new.refund_num,0),
nvl(old.refund_amount,0) + nvl(new.refund_amount,0),
nvl(new.cart_count30,0),
nvl(new.cart_num30,0),
nvl(old.cart_count,0) + nvl(new.cart_count,0),
nvl(old.cart_num,0) + nvl(new.cart_num,0),
nvl(new.favor_count30,0),
nvl(old.favor_count,0) + nvl(new.favor_count,0),
nvl(new.appraise_good_count30,0),
nvl(new.appraise_mid_count30,0),
nvl(new.appraise_bad_count30,0),
nvl(new.appraise_default_count30,0) ,
nvl(old.appraise_good_count,0) + nvl(new.appraise_good_count,0),
nvl(old.appraise_mid_count,0) + nvl(new.appraise_mid_count,0),
nvl(old.appraise_bad_count,0) + nvl(new.appraise_bad_count,0),
nvl(old.appraise_default_count,0) + nvl(new.appraise_default_count,0)
from
(
select
sku_id,
spu_id,
order_last_30d_count,
order_last_30d_num,
order_last_30d_amount,
order_count,
order_num,
order_amount ,
payment_last_30d_count,
payment_last_30d_num,
payment_last_30d_amount,
payment_count,
payment_num,
payment_amount,
refund_last_30d_count,
refund_last_30d_num,
refund_last_30d_amount,
refund_count,
refund_num,
refund_amount,
cart_last_30d_count,
cart_last_30d_num,
cart_count,
cart_num,
favor_last_30d_count,
favor_count,
appraise_last_30d_good_count,
appraise_last_30d_mid_count,
appraise_last_30d_bad_count,
appraise_last_30d_default_count,
appraise_good_count,
appraise_mid_count,
appraise_bad_count,
appraise_default_count
from dwt.mall__sku_topic
)old
full outer join
(
select
sku_id,
sum(if(dt='$db_date',order_count,0)) order_count,
sum(if(dt='$db_date',order_num,0)) order_num,
sum(if(dt='$db_date',order_amount,0)) order_amount,
sum(if(dt='$db_date',payment_count,0)) payment_count,
sum(if(dt='$db_date',payment_num,0)) payment_num,
sum(if(dt='$db_date',payment_amount,0)) payment_amount,
sum(if(dt='$db_date',refund_count,0)) refund_count,
sum(if(dt='$db_date',refund_num,0)) refund_num,
sum(if(dt='$db_date',refund_amount,0)) refund_amount,
sum(if(dt='$db_date',cart_count,0)) cart_count,
sum(if(dt='$db_date',cart_num,0)) cart_num,
sum(if(dt='$db_date',favor_count,0)) favor_count,
sum(if(dt='$db_date',appraise_good_count,0)) appraise_good_count,
sum(if(dt='$db_date',appraise_mid_count,0)) appraise_mid_count,
sum(if(dt='$db_date',appraise_bad_count,0)) appraise_bad_count,
sum(if(dt='$db_date',appraise_default_count,0)) appraise_default_count,
sum(order_count) order_count30 ,
sum(order_num) order_num30,
sum(order_amount) order_amount30,
sum(payment_count) payment_count30,
sum(payment_num) payment_num30,
sum(payment_amount) payment_amount30,
sum(refund_count) refund_count30,
sum(refund_num) refund_num30,
sum(refund_amount) refund_amount30,
sum(cart_count) cart_count30,
sum(cart_num) cart_num30,
sum(favor_count) favor_count30,
sum(appraise_good_count) appraise_good_count30,
sum(appraise_mid_count) appraise_mid_count30,
sum(appraise_bad_count) appraise_bad_count30,
sum(appraise_default_count) appraise_default_count30
from dws.mall__sku_action_daycount
where dt>=date_add('$db_date',-29) and dt<='$db_date'
group by sku_id
)new
on new.sku_id = old.sku_id
left join
(
select * from dwd.mall__dim_sku_info where dt='$db_date'
) sku_info
on nvl(new.sku_id,old.sku_id)= sku_info.id;
"
$hive -e "$sql"
11.4 Coupon topic wide table
- Create the table
drop table if exists dwt.mall__coupon_topic;
CREATE EXTERNAL TABLE `dwt.mall__coupon_topic`(
`coupon_id` string COMMENT 'Coupon ID',
`get_day_count` bigint COMMENT 'Claims on the current day',
`using_day_count` bigint COMMENT 'Uses (order placed) on the current day',
`used_day_count` bigint COMMENT 'Uses (paid) on the current day',
`get_count` bigint COMMENT 'Cumulative claims',
`using_count` bigint COMMENT 'Cumulative uses (order placed)',
`used_count` bigint COMMENT 'Cumulative uses (paid)'
) COMMENT 'Coupon topic wide table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwt/mall/coupon_topic/';
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwt
table_name=coupon_topic
hive_table_name=$APP2.mall__$table_name
# Use the date passed in if one was given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select
nvl(new.coupon_id,old.coupon_id),
nvl(new.get_count,0),
nvl(new.using_count,0),
nvl(new.used_count,0),
nvl(old.get_count,0)+nvl(new.get_count,0),
nvl(old.using_count,0)+nvl(new.using_count,0),
nvl(old.used_count,0)+nvl(new.used_count,0)
from
(
select
*
from dwt.mall__coupon_topic
)old
full outer join
(
select
coupon_id,
get_count,
using_count,
used_count
from dws.mall__coupon_use_daycount
where dt='$db_date'
)new
on old.coupon_id=new.coupon_id;
"
$hive -e "$sql"
11.5 Activity topic wide table
- Create the table
drop table if exists dwt.mall__activity_topic;
CREATE EXTERNAL TABLE `dwt.mall__activity_topic`(
`id` string COMMENT 'Activity ID',
`activity_name` string COMMENT 'Activity name',
`order_day_count` bigint COMMENT 'Orders placed on the current day',
`payment_day_count` bigint COMMENT 'Payments made on the current day',
`order_count` bigint COMMENT 'Cumulative orders',
`payment_count` bigint COMMENT 'Cumulative payments'
) COMMENT 'Activity topic wide table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwt/mall/activity_topic/';
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwt
table_name=activity_topic
hive_table_name=$APP2.mall__$table_name
# Use the date passed in if one was given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select
nvl(new.id,old.id),
nvl(new.activity_name,old.activity_name),
nvl(new.order_count,0),
nvl(new.payment_count,0),
nvl(old.order_count,0)+nvl(new.order_count,0),
nvl(old.payment_count,0)+nvl(new.payment_count,0)
from
(
select
*
from dwt.mall__activity_topic
)old
full outer join
(
select
id,
activity_name,
order_count,
payment_count
from dws.mall__activity_info_daycount
where dt='$db_date'
)new
on old.id=new.id;
"
$hive -e "$sql"
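12 Building the ADS layer
This is the final layer, which serves the actual data requirements. Whether a table is compressed depends on how its data will be exported and on the data volume. Tables are not partitioned, and new results are written every day.
12.1 Device topic
12.1.1 Active devices (daily, weekly, monthly)
Daily active: devices active on the day
Weekly active: devices active during the week
Monthly active: devices active during the month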
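The weekly figure below bounds the week with Hive's `next_day(dt,'MO')`, which returns the first Monday strictly after `dt`; the week therefore runs from `date_add(next_day(dt,'MO'),-7)` (Monday) to `date_add(next_day(dt,'MO'),-1)` (Sunday), and `is_weekend` is `Y` only when `dt` is that Sunday. A minimal sketch reproducing the arithmetic in bash with GNU `date`, useful for checking which dates a run covers; the function names are hypothetical:

```shell
#!/bin/bash
# add_days DATE N: print DATE shifted by N days (N may be negative).
# Works in UTC epoch seconds so that DST changes cannot shift the result.
add_days() {
  date -u -d "@$(( $(date -u -d "$1" +%s) + $2 * 86400 ))" +%F
}

# next_monday DATE: first Monday strictly after DATE,
# mirroring Hive's next_day(DATE, 'MO').
next_monday() {
  local dow
  dow=$(date -u -d "$1" +%u)      # 1 = Monday ... 7 = Sunday
  add_days "$1" $(( 8 - dow ))
}

# week_range DATE: Monday and Sunday of the week containing DATE.
week_range() {
  local nm
  nm=$(next_monday "$1")
  echo "$(add_days "$nm" -7) $(add_days "$nm" -1)"
}

week_range 2020-03-10   # prints: 2020-03-09 2020-03-15
```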
- Create the table
drop table if exists ads.mall__uv_count;
CREATE EXTERNAL TABLE `ads.mall__uv_count`(
`dt` string COMMENT 'Statistics date',
`day_count` bigint COMMENT 'Active devices on the day',
`wk_count` bigint COMMENT 'Active devices in the week',
`mn_count` bigint COMMENT 'Active devices in the month',
`is_weekend` string COMMENT 'Y/N: whether dt is the last day of its week; Y marks the final figure for the week',
`is_monthend` string COMMENT 'Y/N: whether dt is the last day of its month; Y marks the final figure for the month'
) COMMENT 'Active device count table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/uv_count/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=uv_count
hive_table_name=$APP2.mall__$table_name
# Use the date passed in if one was given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date' dt,
daycount.ct,
wkcount.ct,
mncount.ct,
if(date_add(next_day('$db_date','MO'),-1)='$db_date','Y','N'),
if(last_day('$db_date')='$db_date','Y','N')
from
(
select
'$db_date' dt,
count(*) ct
from dwt.mall__uv_topic
where login_date_last='$db_date'
)daycount join
(
select
'$db_date' dt,
count(*) ct
from dwt.mall__uv_topic
where login_date_last>=date_add(next_day('$db_date','MO'),-7)
and login_date_last<=date_add(next_day('$db_date','MO'),-1)
) wkcount on daycount.dt=wkcount.dt
join
(
select
'$db_date' dt,
count(*) ct
from dwt.mall__uv_topic
where date_format(login_date_last,'yyyy-MM')=date_format('$db_date','yyyy-MM')
)mncount on daycount.dt=mncount.dt;
"
$hive -e "$sql"
12.1.2 New devices per day
- Create the table
drop table if exists ads.mall__new_mid_count;
CREATE EXTERNAL TABLE `ads.mall__new_mid_count`(
`create_date` string comment 'Date the devices were first seen',
`new_mid_count` bigint comment 'Number of new devices'
) COMMENT 'Daily new device table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/new_mid_count/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=new_mid_count
hive_table_name=$APP2.mall__$table_name
# Use the date passed in if one was given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
login_date_first,
count(*)
from dwt.mall__uv_topic
where login_date_first='$db_date'
group by login_date_first;
"
$hive -e "$sql"
12.1.3 Silent users
Silent users: devices that were launched only on the day they were installed, with that launch at least 7 days ago
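The query below encodes this definition as `login_date_first = login_date_last` (the device was active on one day only) and `login_date_last <= date_add(dt,-7)`. A minimal sketch of the same predicate in bash for spot-checking a single device, assuming GNU `date`; `is_silent` is a hypothetical name:

```shell
#!/bin/bash
# is_silent FIRST LAST DB_DATE: succeed (exit status 0) when the device was
# active on a single day (FIRST == LAST) and that day is on or before
# DB_DATE minus 7 days.
is_silent() {
  local first=$1 last=$2 db_date=$3 cutoff
  # DB_DATE - 7 days, computed in UTC epoch seconds.
  cutoff=$(date -u -d "@$(( $(date -u -d "$db_date" +%s) - 7 * 86400 ))" +%F)
  # yyyy-MM-dd strings sort chronologically, so string comparison works.
  [[ "$first" == "$last" && ! "$last" > "$cutoff" ]]
}

is_silent 2020-03-03 2020-03-03 2020-03-10 && echo "silent"       # prints: silent
is_silent 2020-03-05 2020-03-05 2020-03-10 || echo "not silent"   # prints: not silent
```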
- Create the table
drop table if exists ads.mall__silent_count;
CREATE EXTERNAL TABLE `ads.mall__silent_count`(
`dt` string COMMENT 'Statistics date',
`silent_count` bigint COMMENT 'Silent devices'
) COMMENT 'Silent user count table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/silent_count/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=silent_count
hive_table_name=$APP2.mall__$table_name
# Use the date passed in if one was given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',
count(*)
from dwt.mall__uv_topic
where login_date_first=login_date_last
and login_date_last<=date_add('$db_date',-7);
"
$hive -e "$sql"
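12.1.4 Returning users this week
Returning users this week: devices that are active this week but were not active last week, excluding devices first seen this week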
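In the query below, `left join ... where last_wk.mid_id is null` is an anti-join: it keeps the devices in `current_wk` that have no match among last week's active devices. A minimal sketch of the same set difference with `sort` and `comm` on made-up device IDs (the IDs are illustrative only):

```shell
#!/bin/bash
# Devices active this week and first seen before this week (made-up IDs).
current_wk=$'mid_1\nmid_2\nmid_4'
# Devices active at any time last week (made-up IDs).
last_wk=$'mid_2\nmid_3'

# comm needs sorted input; -23 keeps lines found only in the first input,
# the same result as: current_wk LEFT JOIN last_wk WHERE last_wk.mid_id IS NULL
back_devices=$(comm -23 <(sort <<<"$current_wk") <(sort <<<"$last_wk"))

echo "$back_devices"                # prints mid_1 and mid_4, one per line
grep -c . <<<"$back_devices"        # prints: 2
```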
- Create the table
drop table if exists ads.mall__back_count;
CREATE EXTERNAL TABLE `ads.mall__back_count`(
`wk_dt` string COMMENT 'Statistics date; the count covers the week containing it',
`wastage_count` bigint COMMENT 'Returning devices'
) COMMENT 'Weekly returning user count table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/back_count/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=back_count
hive_table_name=$APP2.mall__$table_name
# Use the date passed in if one was given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',
count(*)
from
(
select
mid_id
from dwt.mall__uv_topic
where login_date_last>=date_add(next_day('$db_date','MO'),-7)
and login_date_last<=date_add(next_day('$db_date','MO'),-1)
and login_date_first<date_add(next_day('$db_date','MO'),-7)
)current_wk
left join
(
select
mid_id
from dws.mall__uv_detail_daycount
where dt>=date_add(next_day('$db_date','MO'),-7*2)
and dt<=date_add(next_day('$db_date','MO'),-7-1)
group by mid_id
)last_wk
on current_wk.mid_id=last_wk.mid_id
where last_wk.mid_id is null;
"
$hive -e "$sql"
12.1.5 Lost users
Lost users: devices that have not been active in the last 7 days
- Create the table
drop table if exists ads.mall__wastage_count;
CREATE EXTERNAL TABLE `ads.mall__wastage_count`(
`dt` string COMMENT 'Statistics date',
`wastage_count` bigint COMMENT 'Lost devices'
) COMMENT 'Lost user count table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/wastage_count/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=wastage_count
hive_table_name=$APP2.mall__$table_name
# Use the date passed in if one was given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',
count(*)
from
(
select
mid_id
from dwt.mall__uv_topic
where login_date_last<=date_add('$db_date',-7)
group by mid_id
)t1;
"
$hive -e "$sql"
12.1.6 Retention rate
- Create the table
drop table if exists ads.mall__user_retention_day_rate;
CREATE EXTERNAL TABLE `ads.mall__user_retention_day_rate`(
`stat_date` string comment 'Statistics date',
`create_date` string comment 'Date the devices were first seen',
`retention_day` int comment 'Days retained as of the statistics date',
`retention_count` bigint comment 'Retained devices',
`new_mid_count` bigint comment 'New devices',
`retention_ratio` decimal(10,2) comment 'Retention rate (%)'
) COMMENT 'Retention rate table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/user_retention_day_rate/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=user_retention_day_rate
hive_table_name=$APP2.mall__$table_name
# Use the date passed in if one was given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date', -- statistics date
date_add('$db_date',-1), -- date the devices were first seen
1, -- days retained
sum(if(login_date_first=date_add('$db_date',-1) and
login_date_last='$db_date',1,0)), -- devices from that day active on the statistics date
sum(if(login_date_first=date_add('$db_date',-1),1,0)), -- devices first seen that day
sum(if(login_date_first=date_add('$db_date',-1) and
login_date_last='$db_date',1,0))/sum(if(login_date_first=date_add('$db_date',-1),1,0))*100
from dwt.mall__uv_topic
union all
select
'$db_date', -- statistics date
date_add('$db_date',-2), -- date the devices were first seen
2, -- days retained
sum(if(login_date_first=date_add('$db_date',-2) and
login_date_last='$db_date',1,0)), -- devices from that day active on the statistics date
sum(if(login_date_first=date_add('$db_date',-2),1,0)), -- devices first seen that day
sum(if(login_date_first=date_add('$db_date',-2) and
login_date_last='$db_date',1,0))/sum(if(login_date_first=date_add('$db_date',-2),1,0))*100
from dwt.mall__uv_topic
union all
select
'$db_date', -- statistics date
date_add('$db_date',-3), -- date the devices were first seen
3, -- days retained
sum(if(login_date_first=date_add('$db_date',-3) and
login_date_last='$db_date',1,0)), -- devices from that day active on the statistics date
sum(if(login_date_first=date_add('$db_date',-3),1,0)), -- devices first seen that day
sum(if(login_date_first=date_add('$db_date',-3) and
login_date_last='$db_date',1,0))/sum(if(login_date_first=date_add('$db_date',-3),1,0))*100
from dwt.mall__uv_topic;
"
$hive -e "$sql"
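Each UNION ALL branch applies the same rule with a different offset n: a device counts as retained for n days if its first login was db_date minus n and its last login is db_date. The following is a minimal local sketch of that arithmetic, useful for spot-checking the table against an export. It assumes a hypothetical tab-separated file of `mid_id`, `login_date_first`, `login_date_last` (one row per device, like `dwt.mall__uv_topic`) and GNU `date`. The function names `add_days` and `retention_rows` are illustrative only.

```bash
#!/bin/bash
# add_days YYYY-MM-DD N -> YYYY-MM-DD, computed in UTC so DST never shifts the day.
add_days() {
  local s
  s=$(date -u -d "$1" +%s)
  date -u -d "@$(( s + $2 * 86400 ))" +%F
}

# Hypothetical helper: reproduce the 1/2/3-day retention rows locally.
# stdin : mid_id<TAB>login_date_first<TAB>login_date_last
# $1    : statistics date (db_date)
# stdout: db_date, new_date, n, retained, new, ratio(%), tab-separated
retention_rows() {
  local db_date=$1
  awk -F'\t' -v db="$db_date" \
      -v d1="$(add_days "$db_date" -1)" \
      -v d2="$(add_days "$db_date" -2)" \
      -v d3="$(add_days "$db_date" -3)" '
    BEGIN { day[1] = d1; day[2] = d2; day[3] = d3 }
    {
      for (n = 1; n <= 3; n++) {
        if ($2 == day[n]) {
          new_cnt[n]++                 # first seen n days before db_date
          if ($3 == db) kept[n]++      # ...and still active on db_date
        }
      }
    }
    END {
      for (n = 1; n <= 3; n++) {
        # Hive yields NULL for x/0; this sketch prints 0.00 instead.
        ratio = new_cnt[n] ? kept[n] / new_cnt[n] * 100 : 0
        printf "%s\t%s\t%d\t%d\t%d\t%.2f\n", db, day[n], n, kept[n], new_cnt[n], ratio
      }
    }'
}
```

For example, `retention_rows 2020-03-10 < uv_topic.tsv` prints the three rows that the query would insert for 2020-03-10. The file name is hypothetical.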
12.1.7 Users active in each of the last three consecutive weeks
- Create table
```sql
drop table if exists ads.mall__continuity_wk_count;
CREATE EXTERNAL TABLE `ads.mall__continuity_wk_count`(
  `dt` string COMMENT 'Statistics date, usually the Sunday that ends the last week, or the current date if computed daily',
  `wk_dt` string COMMENT 'Period covered (start_end)',
  `continuity_count` bigint COMMENT 'Number of users active in all three weeks'
) COMMENT 'Users active in each of the last three consecutive weeks'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/continuity_wk_count/'
tblproperties ("parquet.compression"="snappy");
```
- Load data
```bash
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=continuity_wk_count
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via $date if provided; otherwise default to yesterday
if [ -n "${date}" ] ;then
  db_date=${date}
else
  db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
  '$db_date',
  concat(date_add(next_day('$db_date','MO'),-7*3),'_',date_add(next_day('$db_date','MO'),-1)),
  count(*)
from
(
  select
    mid_id
  from
  (
    -- the week (Monday to Sunday) containing db_date
    select mid_id
    from dws.mall__uv_detail_daycount
    where dt>=date_add(next_day('$db_date','MO'),-7)
      and dt<=date_add(next_day('$db_date','MO'),-1)
    group by mid_id
    union all
    -- one week earlier
    select mid_id
    from dws.mall__uv_detail_daycount
    where dt>=date_add(next_day('$db_date','MO'),-7*2)
      and dt<=date_add(next_day('$db_date','MO'),-7-1)
    group by mid_id
    union all
    -- two weeks earlier
    select mid_id
    from dws.mall__uv_detail_daycount
    where dt>=date_add(next_day('$db_date','MO'),-7*3)
      and dt<=date_add(next_day('$db_date','MO'),-7*2-1)
    group by mid_id
  )t1
  group by mid_id
  having count(*)=3  -- active in all three weeks
)t2
"
$hive -e "$sql"
```
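`next_day(d,'MO')` returns the first Monday later than `d`, so "week 1" is the Monday-to-Sunday week that contains `db_date`. If the job runs mid-week, that window includes days with no data yet, which is why the `dt` comment suggests using the closing Sunday. The hypothetical helpers below assume GNU `date` and print the three windows the query scans.

```bash
#!/bin/bash
# add_days YYYY-MM-DD N -> YYYY-MM-DD (UTC arithmetic, no DST surprises)
add_days() {
  local s
  s=$(date -u -d "$1" +%s)
  date -u -d "@$(( s + $2 * 86400 ))" +%F
}

# Mirrors Hive next_day(d,'MO'): the first Monday strictly after d.
next_monday() {
  local u
  u=$(date -u -d "$1" +%u)   # ISO weekday: 1 = Monday ... 7 = Sunday
  add_days "$1" $(( 8 - u ))
}

# week_window DATE K -> "start end" of the K-th week back
# (K=1 is the week containing DATE, matching the first subquery).
week_window() {
  local nm
  nm=$(next_monday "$1")
  echo "$(add_days "$nm" $(( -7 * $2 ))) $(add_days "$nm" $(( -7 * ($2 - 1) - 1 )))"
}
```

For instance, `week_window 2020-03-10 3` gives the oldest of the three windows that the query unions.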
12.1.8 Users active on three consecutive days within the last seven days
- Create table
```sql
drop table if exists ads.mall__continuity_uv_count;
CREATE EXTERNAL TABLE `ads.mall__continuity_uv_count`(
  `dt` string COMMENT 'Statistics date',
  `wk_dt` string COMMENT 'Last 7 days (start_end)',
  `continuity_count` bigint COMMENT 'Number of users active on at least 3 consecutive days'
) COMMENT 'Users active on three consecutive days within the last seven days'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/continuity_uv_count/'
tblproperties ("parquet.compression"="snappy");
```
- Load data
```bash
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=continuity_uv_count
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via $date if provided; otherwise default to yesterday
if [ -n "${date}" ] ;then
  db_date=${date}
else
  db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
  '$db_date',
  concat(date_add('$db_date',-6),'_','$db_date'),
  count(*)
from
(
  select
    mid_id
  from
  (
    select
      mid_id
    from
    (
      select
        mid_id,
        date_sub(dt,rk) date_dif  -- constant within a run of consecutive days
      from
      (
        select
          mid_id,
          dt,
          rank() over(partition by mid_id order by dt) rk
        from dws.mall__uv_detail_daycount
        where dt>=date_add('$db_date',-6)
          and dt<='$db_date'
      )t1
    )t2
    group by mid_id,date_dif
    having count(*)>=3
  )t3
  group by mid_id
)t4;
"
$hive -e "$sql"
```
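The query relies on the gaps-and-islands trick. Within one device, `date_sub(dt, rank)` stays constant across a run of consecutive days. Grouping by `(mid_id, date_dif)` and keeping groups with at least three rows therefore finds 3-day streaks. Below is a hypothetical bash/awk sketch of the same idea. It assumes GNU `date`/`sort` and input already filtered to the 7-day window, with one row per device per active day as in `dws.mall__uv_detail_daycount`. Duplicate rows are removed first.

```bash
#!/bin/bash
# Hypothetical helper: count devices with a run of >= N consecutive active days.
# stdin : mid_id<TAB>YYYY-MM-DD
# $1    : minimum run length (default 3)
# stdout: number of qualifying devices
count_streak_users() {
  local min_run=${1:-3}
  while IFS=$'\t' read -r mid dt; do
    # Convert each date to a day number so consecutive days differ by exactly 1.
    printf '%s\t%s\n' "$mid" "$(( $(date -u -d "$dt" +%s) / 86400 ))"
  done | sort -t $'\t' -u -k1,1 -k2,2n | awk -F'\t' -v min_run="$min_run" '
    $1 != prev { rk = 0; prev = $1 }        # rank restarts for every device
    { rk++; run[$1 SUBSEP ($2 - rk)]++ }    # day - rank is constant within a streak
    END {
      for (k in run)
        if (run[k] >= min_run + 0) { split(k, p, SUBSEP); hit[p[1]] = 1 }
      n = 0
      for (m in hit) n++
      print n
    }'
}
```

Passing a different minimum, such as `count_streak_users 2`, reuses the same logic for other streak lengths.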
12.2 Member topic
12.2.1 Member topic summary
- Create table
```sql
drop table if exists ads.mall__user_topic;
CREATE EXTERNAL TABLE `ads.mall__user_topic`(
  `dt` string COMMENT 'Statistics date',
  `day_users` string COMMENT 'Active members',
  `day_new_users` string COMMENT 'New members',
  `day_new_payment_users` string COMMENT 'New paying members',
  `payment_users` string COMMENT 'Total paying members',
  `users` string COMMENT 'Total members',
  `day_users2users` decimal(10,2) COMMENT 'Member activity rate',
  `payment_users2users` decimal(10,2) COMMENT 'Member payment rate',
  `day_new_users2users` decimal(10,2) COMMENT 'Member freshness (new members / active members)'
) COMMENT 'Member topic summary table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/user_topic/'
tblproperties ("parquet.compression"="snappy");
```
- Load data
```bash
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=user_topic
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via $date if provided; otherwise default to yesterday
if [ -n "${date}" ] ;then
  db_date=${date}
else
  db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
  '$db_date',
  sum(if(login_date_last='$db_date',1,0)),     -- active members
  sum(if(login_date_first='$db_date',1,0)),    -- new members
  sum(if(payment_date_first='$db_date',1,0)),  -- new paying members
  sum(if(payment_count>0,1,0)),                -- total paying members
  count(*),                                    -- total members
  sum(if(login_date_last='$db_date',1,0))/count(*),  -- activity rate
  sum(if(payment_count>0,1,0))/count(*),             -- payment rate
  sum(if(login_date_first='$db_date',1,0))/sum(if(login_date_last='$db_date',1,0))  -- freshness
from dwt.mall__user_topic
"
$hive -e "$sql"
```
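The denominators differ. The activity and payment rates divide by all members, while freshness divides new members by active members. The hypothetical sketch below recomputes the same eight values from a tab-separated export of `dwt.mall__user_topic`. Both the column order and the function name are assumptions.

```bash
#!/bin/bash
# Hypothetical helper: recompute the member-topic values from a local export.
# stdin : user_id<TAB>login_date_first<TAB>login_date_last<TAB>payment_date_first<TAB>payment_count
# $1    : statistics date (db_date)
# stdout: active new new_paying paying total activity_rate payment_rate freshness
member_topic() {
  awk -F'\t' -v db="$1" '
    {
      total++
      if ($3 == db) active++       # logged in on db_date
      if ($2 == db) fresh++        # first login on db_date
      if ($4 == db) new_pay++      # first payment on db_date
      if ($5 + 0 > 0) paying++     # has paid at least once
    }
    END {
      printf "%d\t%d\t%d\t%d\t%d\t%.2f\t%.2f\t%.2f\n", active, fresh, new_pay, paying, total,
        (total ? active / total : 0), (total ? paying / total : 0), (active ? fresh / active : 0)
    }'
}
```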
12.2.2 Funnel analysis
Compute the conversion rates along the path "view -> add to cart -> order -> payment".
Approach: count the users who performed each action, then divide each step's count by the previous step's count.
- Create table
```sql
drop table if exists ads.mall__user_action_convert_day;
CREATE EXTERNAL TABLE `ads.mall__user_action_convert_day`(
  `dt` string COMMENT 'Statistics date',
  `total_visitor_m_count` bigint COMMENT 'Total visitors',
  `cart_u_count` bigint COMMENT 'Users who added to cart',
  `visitor2cart_convert_ratio` decimal(10,2) COMMENT 'Visit-to-cart conversion rate',
  `order_u_count` bigint COMMENT 'Users who placed orders',
  `cart2order_convert_ratio` decimal(10,2) COMMENT 'Cart-to-order conversion rate',
  `payment_u_count` bigint COMMENT 'Users who paid',
  `order2payment_convert_ratio` decimal(10,2) COMMENT 'Order-to-payment conversion rate'
) COMMENT 'Funnel analysis table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/user_action_convert_day/'
tblproperties ("parquet.compression"="snappy");
```
- Load data
```bash
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=user_action_convert_day
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via $date if provided; otherwise default to yesterday
if [ -n "${date}" ] ;then
  db_date=${date}
else
  db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
  '$db_date',
  uv.day_count,
  ua.cart_count,
  cast(ua.cart_count/uv.day_count as decimal(10,2)) visitor2cart_convert_ratio,
  ua.order_count,
  cast(ua.order_count/ua.cart_count as decimal(10,2)) cart2order_convert_ratio,
  ua.payment_count,
  cast(ua.payment_count/ua.order_count as decimal(10,2)) order2payment_convert_ratio
from
(
  -- number of users who performed each action on db_date
  select
    dt,
    sum(if(cart_count>0,1,0)) cart_count,
    sum(if(order_count>0,1,0)) order_count,
    sum(if(payment_count>0,1,0)) payment_count
  from dws.mall__user_action_daycount
  where dt='$db_date'
  group by dt
)ua join ads.mall__uv_count uv on uv.dt=ua.dt;
"
$hive -e "$sql"
```
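Each ratio divides a step by the previous one. If a step is empty (for example, no carts on a given day), the next ratio divides by zero, and Hive evaluates that to NULL. The hypothetical sketch below computes the three ratios and mirrors that behaviour.

```bash
#!/bin/bash
# Hypothetical helper: funnel conversion ratios between adjacent steps.
# Args  : visitors cart_users order_users payment_users
# stdout: three ratios rounded to 2 decimals, "NULL" when a denominator is 0
funnel_ratios() {
  awk -v v="$1" -v c="$2" -v o="$3" -v p="$4" '
    function ratio(num, den) { return den + 0 > 0 ? sprintf("%.2f", num / den) : "NULL" }
    BEGIN { print ratio(c, v), ratio(o, c), ratio(p, o) }'
}
```

For example, `funnel_ratios 1000 200 50 40` prints `0.20 0.25 0.80`.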
12.3 Product topic
12.3.1 Product counts
- Create table
```sql
drop table if exists ads.mall__product_info;
CREATE EXTERNAL TABLE `ads.mall__product_info`(
  `dt` string COMMENT 'Statistics date',
  `sku_num` string COMMENT 'Number of SKUs',
  `spu_num` string COMMENT 'Number of SPUs'
) COMMENT 'Product count table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/product_info/'
tblproperties ("parquet.compression"="snappy");
```
- Load data
```bash
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=product_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via $date if provided; otherwise default to yesterday
if [ -n "${date}" ] ;then
  db_date=${date}
else
  db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
  '$db_date' dt,
  sku_num,
  spu_num
from
(
  select
    '$db_date' dt,
    count(*) sku_num   -- one row per SKU
  from dwt.mall__sku_topic
) tmp_sku_num
join
(
  select
    '$db_date' dt,
    count(*) spu_num   -- number of distinct SPUs
  from
  (
    select spu_id
    from dwt.mall__sku_topic
    group by spu_id
  ) tmp_spu_id
) tmp_spu_num
on tmp_sku_num.dt=tmp_spu_num.dt;
"
$hive -e "$sql"
```
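The two subqueries are joined only on the constant `dt`. The SKU count is the row count of `dwt.mall__sku_topic`, and the SPU count is the number of distinct `spu_id` values. The hypothetical awk sketch below computes both in one pass over a tab-separated `sku_id`, `spu_id` export.

```bash
#!/bin/bash
# Hypothetical helper: SKU count = rows, SPU count = distinct spu_id values.
# stdin : sku_id<TAB>spu_id, one row per SKU
# stdout: sku_num<TAB>spu_num
sku_spu_counts() {
  awk -F'\t' '
    { sku++; if (!($2 in seen)) { seen[$2] = 1; spu++ } }
    END { printf "%d\t%d\n", sku, spu }'
}
```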
12.3.2 Product sales ranking
- Create table
```sql
drop table if exists ads.mall__product_sale_topN;
CREATE EXTERNAL TABLE `ads.mall__product_sale_topN`(
  `dt` string COMMENT 'Statistics date',
  `sku_id` string COMMENT 'Product (SKU) ID',
  `payment_amount` decimal(16,2) COMMENT 'Sales (payment amount)'
) COMMENT 'Product sales ranking table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/product_sale_topN/'
tblproperties ("parquet.compression"="snappy");
```
- Load data
```bash
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=product_sale_topN
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via $date if provided; otherwise default to yesterday
if [ -n "${date}" ] ;then
  db_date=${date}
else
  db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
  '$db_date' dt,
  sku_id,
  payment_amount
from dws.mall__sku_action_daycount
where dt='$db_date'
order by payment_amount desc
limit 10;
"
$hive -e "$sql"
```
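In Hive, `order by` produces a total order through a single reducer, and `limit 10` then keeps the first ten rows. To spot-check the result against an export, the hypothetical `sort`/`head` pipeline below performs the same ordering locally. It assumes GNU `sort`.

```bash
#!/bin/bash
# Hypothetical helper: local Top-N by a numeric column, highest first.
# stdin : tab-separated rows
# $1    : 1-based column to rank by; $2: N (default 10)
top_n() {
  local col=$1 n=${2:-10}
  sort -t $'\t' -k"${col},${col}gr" | head -n "$n"
}
```

For example, `top_n 3 < sku_action.tsv` ranks rows by their third column. The file name is hypothetical.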
12.3.3 Product favorites ranking
- Create table
```sql
drop table if exists ads.mall__product_favor_topN;
CREATE EXTERNAL TABLE `ads.mall__product_favor_topN`(
  `dt` string COMMENT 'Statistics date',
  `sku_id` string COMMENT 'Product (SKU) ID',
  `favor_count` bigint COMMENT 'Number of favorites'
) COMMENT 'Product favorites ranking table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/product_favor_topN/'
tblproperties ("parquet.compression"="snappy");
```
- Load data
```bash
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=product_favor_topN
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via $date if provided; otherwise default to yesterday
if [ -n "${date}" ] ;then
  db_date=${date}
else
  db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
  '$db_date' dt,
  sku_id,
  favor_count
from dws.mall__sku_action_daycount
where dt='$db_date'
order by favor_count desc
limit 10;
"
$hive -e "$sql"
```
12.3.4 Product add-to-cart ranking
- Create table
```sql
drop table if exists ads.mall__product_cart_topN;
CREATE EXTERNAL TABLE `ads.mall__product_cart_topN`(
  `dt` string COMMENT 'Statistics date',
  `sku_id` string COMMENT 'Product (SKU) ID',
  `cart_num` bigint COMMENT 'Add-to-cart count'
) COMMENT 'Product add-to-cart ranking table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/product_cart_topN/'
tblproperties ("parquet.compression"="snappy");
```
- Load data
```bash
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=product_cart_topN
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via $date if provided; otherwise default to yesterday
if [ -n "${date}" ] ;then
  db_date=${date}
else
  db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
  '$db_date' dt,
  sku_id,
  cart_num
from dws.mall__sku_action_daycount
where dt='$db_date'
order by cart_num desc
limit 10;
"
$hive -e "$sql"
```
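The sales, favorites and add-to-cart scripts are identical except for the target table and the metric column. One way to avoid the copy-paste is to generate the statement from a template, as sketched here with a hypothetical `gen_topn_sql` helper. This helper is an illustration and is not part of the original scripts.

```bash
#!/bin/bash
# Hypothetical helper: build the Top-N insert for a dws.mall__sku_action_daycount metric.
# Args: target_table metric_column db_date [N, default 10]
gen_topn_sql() {
  local target=$1 metric=$2 db_date=$3 n=${4:-10}
  cat <<EOF
insert into table $target
select
  '$db_date' dt,
  sku_id,
  $metric
from dws.mall__sku_action_daycount
where dt='$db_date'
order by $metric desc
limit $n;
EOF
}
```

Inside the script above, `$hive -e "$(gen_topn_sql ads.mall__product_cart_topN cart_num "$db_date")"` would then replace the hand-written `sql` variable.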
12.3.5 Product refund-rate ranking (last 30 days)
- Create table
```sql
drop table if exists ads.mall__product_refund_topN;
CREATE EXTERNAL TABLE `ads.mall__product_refund_topN`(
  `dt` string COMMENT 'Statistics date',
  `sku_id` string COMMENT 'Product (SKU) ID',
  `refund_ratio` decimal(10,2) COMMENT 'Refund rate (%)'
) COMMENT 'Product refund-rate ranking (last 30 days)'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/product_refund_topN/'
tblproperties ("parquet.compression"="snappy");
```
- Load data
```bash
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=product_refund_topN
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via $date if provided; otherwise default to yesterday
if [ -n "${date}" ] ;then
  db_date=${date}
else
  db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
  '$db_date',
  sku_id,
  refund_last_30d_count/payment_last_30d_count*100 refund_ratio  -- percentage, NULL if no payments in 30 days
from dwt.mall__sku_topic
order by refund_ratio desc
limit 10;
"
$hive -e "$sql"
```
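This ratio is stored as a percentage (note the `*100`). The negative-review rate in 12.3.6, by contrast, is stored as a fraction. SKUs with no payments in the last 30 days divide by zero and get NULL. The hypothetical sketch below computes the same percentage from a tab-separated export and skips those SKUs before ranking. It assumes GNU `sort`.

```bash
#!/bin/bash
# Hypothetical helper: 30-day refund ratio (%) per SKU, highest first.
# stdin : sku_id<TAB>refund_last_30d_count<TAB>payment_last_30d_count
# $1    : N (default 10); SKUs with zero payments are skipped
refund_top_n() {
  local n=${1:-10}
  awk -F'\t' '$3 + 0 > 0 { printf "%s\t%.2f\n", $1, $2 / $3 * 100 }' |
    sort -t $'\t' -k2,2gr | head -n "$n"
}
```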
12.3.6 Product negative-review rate ranking
- Create table
```sql
drop table if exists ads.mall__appraise_bad_topN;
CREATE EXTERNAL TABLE `ads.mall__appraise_bad_topN`(
  `dt` string COMMENT 'Statistics date',
  `sku_id` string COMMENT 'Product (SKU) ID',
  `appraise_bad_ratio` decimal(10,2) COMMENT 'Negative-review rate'
) COMMENT 'Product negative-review rate table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/appraise_bad_topN/'
tblproperties ("parquet.compression"="snappy");
```
- Load data
```bash
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=appraise_bad_topN
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via $date if provided; otherwise default to yesterday
if [ -n "${date}" ] ;then
  db_date=${date}
else
  db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
  '$db_date' dt,
  sku_id,
  -- share of negative reviews among all reviews of the SKU that day
  appraise_bad_count/(appraise_good_count+appraise_mid_count+appraise_bad_count+appraise_default_count) appraise_bad_ratio
from dws.mall__sku_action_daycount
where dt='$db_date'
order by appraise_bad_ratio desc
limit 10;
"
$hive -e "$sql"
```
12.4 Marketing topic
12.4.1 Order statistics
- Create table
```sql
drop table if exists ads.mall__order_daycount;
CREATE EXTERNAL TABLE `ads.mall__order_daycount`(
  dt string comment 'Statistics date',
  order_count bigint comment 'Orders placed that day',
  order_amount decimal(16,2) comment 'Order amount that day',
  order_users bigint comment 'Users who placed orders that day'
) COMMENT 'Order statistics table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/order_daycount/'
tblproperties ("parquet.compression"="snappy");
```
- Load data
```bash
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=order_daycount
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via $date if provided; otherwise default to yesterday
if [ -n "${date}" ] ;then
  db_date=${date}
else
  db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
  '$db_date',
  sum(order_count),           -- orders placed
  sum(order_amount),          -- order amount
  sum(if(order_count>0,1,0))  -- users with at least one order
from dws.mall__user_action_daycount
where dt='$db_date';
"
$hive -e "$sql"
```
12.4.2 Payment statistics
- Create table
```sql
drop table if exists ads.mall__payment_daycount;
CREATE EXTERNAL TABLE `ads.mall__payment_daycount`(
  dt string comment 'Statistics date',
  order_count bigint comment 'Payments made that day',
  order_amount decimal(16,2) comment 'Payment amount that day',
  payment_user_count bigint comment 'Paying users that day',
  payment_sku_count bigint comment 'SKUs paid for that day',
  payment_avg_time double comment 'Average time from order to payment, in minutes'
) COMMENT 'Payment statistics table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/payment_daycount/'
tblproperties ("parquet.compression"="snappy");
```
- Load data
```bash
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=payment_daycount
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via $date if provided; otherwise default to yesterday
if [ -n "${date}" ] ;then
  db_date=${date}
else
  db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
  tmp_payment.dt,
  tmp_payment.payment_count,
  tmp_payment.payment_amount,
  tmp_payment.payment_user_count,
  tmp_skucount.payment_sku_count,
  tmp_time.payment_avg_time
from
(
  select
    '$db_date' dt,
    sum(payment_count) payment_count,
    sum(payment_amount) payment_amount,
    sum(if(payment_count>0,1,0)) payment_user_count
  from dws.mall__user_action_daycount
  where dt='$db_date'
)tmp_payment
join
(
  select
    '$db_date' dt,
    sum(if(payment_count>0,1,0)) payment_sku_count
  from dws.mall__sku_action_daycount
  where dt='$db_date'
)tmp_skucount on tmp_payment.dt=tmp_skucount.dt
join
(
  select
    '$db_date' dt,
    -- average seconds from order creation to payment, converted to minutes
    sum(unix_timestamp(payment_time)-unix_timestamp(create_time))/count(*)/60 payment_avg_time
  from dwd.mall__fact_order_info
  where dt='$db_date'
    and payment_time is not null
)tmp_time on tmp_payment.dt=tmp_time.dt
"
$hive -e "$sql"
```
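`payment_avg_time` is the mean of `unix_timestamp(payment_time) - unix_timestamp(create_time)` over paid orders, converted from seconds to minutes. The hypothetical sketch below performs the same computation with GNU `date` on tab-separated `create_time`, `payment_time` pairs. It parses timestamps as UTC, which is an assumption: only the differences matter, apart from DST transitions.

```bash
#!/bin/bash
# Hypothetical helper: average minutes from order creation to payment.
# stdin : create_time<TAB>payment_time ("YYYY-MM-DD HH:MM:SS")
#         rows with an empty payment_time are skipped, like "payment_time is not null"
avg_payment_minutes() {
  local total=0 cnt=0 c p
  while IFS=$'\t' read -r c p; do
    [ -z "$p" ] && continue
    total=$(( total + $(date -u -d "$p" +%s) - $(date -u -d "$c" +%s) ))
    cnt=$(( cnt + 1 ))
  done
  # Print NULL like Hive would when no paid orders exist.
  awk -v t="$total" -v n="$cnt" 'BEGIN { if (n > 0) printf "%.2f\n", t / n / 60; else print "NULL" }'
}
```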
12.4.3 Repurchase rate
- Create table
```sql
drop table if exists ads.mall__sale_tm_category1_stat_mn;
CREATE EXTERNAL TABLE `ads.mall__sale_tm_category1_stat_mn`(
  tm_id string comment 'Brand ID',
  category1_id string comment 'Level-1 category ID',
  category1_name string comment 'Level-1 category name',
  buycount bigint comment 'Number of buyers',
  buy_twice_last bigint comment 'Users who bought at least twice',
  buy_twice_last_ratio decimal(10,2) comment 'Repurchase rate (2+ purchases)',
  buy_3times_last bigint comment 'Users who bought at least three times',
  buy_3times_last_ratio decimal(10,2) comment 'Repeat repurchase rate (3+ purchases)',
  stat_mn string comment 'Statistics month',
  stat_date string comment 'Statistics date'
) COMMENT 'Repurchase rate table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/sale_tm_category1_stat_mn/'
tblproperties ("parquet.compression"="snappy");
```
- Load data
```bash
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=sale_tm_category1_stat_mn
hive_table_name=$APP2.mall__$table_name
# Use the date passed in via $date if provided; otherwise default to yesterday
if [ -n "${date}" ] ;then
  db_date=${date}
else
  db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
  mn.sku_tm_id,
  mn.sku_category1_id,
  mn.sku_category1_name,
  sum(if(mn.order_count>=1,1,0)) buycount,
  sum(if(mn.order_count>=2,1,0)) buy_twice_last,
  sum(if(mn.order_count>=2,1,0))/sum(if(mn.order_count>=1,1,0)) buy_twice_last_ratio,
  sum(if(mn.order_count>=3,1,0)) buy_3times_last,
  sum(if(mn.order_count>=3,1,0))/sum(if(mn.order_count>=1,1,0)) buy_3times_last_ratio,
  date_format('$db_date','yyyy-MM') stat_mn,
  '$db_date' stat_date
from
(
  -- orders per user, brand and level-1 category in the month of db_date
  select
    user_id,
    sd.sku_tm_id,
    sd.sku_category1_id,
    sd.sku_category1_name,
    sum(order_count) order_count
  from dws.mall__sale_detail_daycount sd
  where date_format(dt,'yyyy-MM')=date_format('$db_date','yyyy-MM')
  group by user_id, sd.sku_tm_id, sd.sku_category1_id, sd.sku_category1_name
) mn
group by mn.sku_tm_id, mn.sku_category1_id, mn.sku_category1_name;
"
$hive -e "$sql"
```
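For each category, the buyers are the users with at least one order in the month. The two ratios divide the users with at least two orders, and with at least three orders, by that buyer count. The hypothetical awk sketch below works on rows already aggregated like the `mn` subquery. To stay short, it groups by category only and ignores the brand.

```bash
#!/bin/bash
# Hypothetical helper: monthly repurchase ratios per level-1 category.
# stdin : user_id<TAB>category1_id<TAB>order_count (summed per user and category for the month)
# stdout: category buycount buy_twice buy_twice_ratio buy_3times buy_3times_ratio, sorted by category
repurchase_stats() {
  awk -F'\t' '
    $3 + 0 >= 1 { buy[$2]++ }
    $3 + 0 >= 2 { twice[$2]++ }
    $3 + 0 >= 3 { thrice[$2]++ }
    END {
      for (c in buy)
        printf "%s\t%d\t%d\t%.2f\t%d\t%.2f\n", c, buy[c], twice[c], twice[c] / buy[c], thrice[c], thrice[c] / buy[c]
    }' | sort
}
```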