实时数仓项目搭建及理论

23 Oct 2021

项目背景与需求

项目背景

随着互联网的发展,数据的时效性对企业的精细化运营越来越重要, 商场如战场,在每天产生的海量数据中,如何 能实时有效的挖掘出有价值的信息, 对企业的决策运营策略调整有很大帮助。此外,随着 5G 技术的成熟、广泛应 用, 对于互联网、物联网等数据时效性要求非常高的行业,企业就更需要一套完整成熟的实时数据体系来提高自身 的行业竞争力。 随着数据时效性在企业运营中的重要性日益凸现,例如:
实时推荐;
精准营销;
广告投放效果;
实时物流

实时数仓产生的背景

数据的实时处理能力成为企业提升竞争力的一大因素,最初阶段企业主要采用来一个需求,编写一个实时计算任务的方式来处理实时数据,随着需求的增多,计算任务也相应增多,并且不同任务的开发人员不同,导致开发风格差异化,该阶段的实时数据处理缺乏统一的规划,代码风格差异化严重,在维护成本和开发效率上有很大障碍。
为避免上述问题,人们参照数据仓库的概念和模型来重新规划和设计实时数据处理,在此基础上产生了实时数据仓库(实时数仓)

离线转实时

需求分析

日志数据:启动日志、点击日志(广告点击日志)
业务数据:用户下单、提交订单、支付、退款等核心交易数据的分析
广告流量实时统计: 生成动态黑名单
恶意刷单:一旦发现恶意刷单时进行实时告警
基于动态黑名单进行点击行为过滤, 计算每隔5分钟统计最近1小时内各广告的点击量, 计算每天各省的热门广告, 计算每天各广告最近1小时内的点击趋势…

技术选型

数据采集:Flume、Canal
数据存储:MySQL、Kafka、HBase、Redis
数据计算:Flink
OLAP: ClickHouse、Druid 框架、软件尽量不要选择最新的版本,选择半年前左右稳定的版本。

Canal同步业务数据

环境准备

Hadoop、HBASE、Flink、ClickHouse、MySQL、Canal、Kafka

初识Canal

配置MySQL的binlog

Canal 安装

下载地址: https://github.com/alibaba/canal/releases

把下载的Canal.deployer-1.1.4.tar.gz拷贝到linux,解压缩(路径可自行调整)

 [root@linux123 ~]# mkdir /opt/servers/canal  
[root@linux123 mysql]# tar -zxf canal.deployer-1.1.4.tar.gz  -C /opt/servers/canal  

修改Canal配置 这个文件是canal的基本通用配置,主要关心一下端口号,不改的话默认就是11111
修改内容如下:

# 配置zookeeper地址  
canal.zkServers =centos7-1:2181,centos7-3:2181   
# tcp, kafka, RocketMQ  
canal.serverMode = kafka  

# 配置kafka地址  
canal.mq.servers =centos7-1:9092,centos7-3:9092  

修改conf/example/instance.properties
这个文件是针对要追踪的MySQL的实例配置
修改内容如下:

# 配置MySQL数据库所在的主机 canal.instance.master.address = linux123:3306 # username/password,配置数据库用户和密码 canal.instance.dbUsername =canal canal.instance.dbPassword =canal  
# mq config,对应Kafka主题:  
canal.mq.topic=test  

启动Canal

sh bin/startup.sh  
# 关闭Canal  
sh bin/stop.sh  

Kafka客户端测试

# 1. 启动zookeeper

# 2. 启动kafka
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties  

# 3. 创建topic
kafka-topics.sh --zookeeper localhost:2181/mykafka --create --topic test --partitions 2 --replication-factor 1  

# 4. 消费topic  
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test  

mysql操作

use dwshow;  

DROP TABLE IF EXISTS `ad_show`;  
CREATE TABLE `ad_show` (  
  `dt` varchar(10) NOT NULL COMMENT '日期',
  `cnt` bigint(20) DEFAULT NULL COMMENT '总点击数据',
  `u_cnt` bigint(20) DEFAULT NULL COMMENT '不同用户点击数',
  `device_cnt` bigint(20) DEFAULT NULL COMMENT '不同设备点击数',
  `ad_action` tinyint(4) NOT NULL COMMENT '用户行为;0 曝光;1 曝光后点击;2 购买',
  `hour` tinyint(4) NOT NULL COMMENT '小时',
  PRIMARY KEY (`dt`,`hour`,`ad_action`)  
) ENGINE=InnoDB DEFAULT CHARSET=utf8;  

INSERT INTO `ad_show` VALUES ('2020-07-21', '7855924', '5522275', '5512895', '0', '0');  
UPDATE `ad_show` set cnt=2048 where dt='2020-07-21';  
delete from `ad_show` where dt='2020-07-21';  

ODS层处理

大数据数据仓库的架构

ODS(Operational Data Store)数据

在数据仓库建模中,未经任何加工处理的原始业务层数据,我们称之为ODS(Operational Data Store)数据。在互联 网企业中,常见的ODS数据有业务日志数据(Log)和业务DB数据(DB)两类

系统实时监控&可视化

普罗米修斯Prometheus

数据质量

数据质量概述

流程图描述了一般的实时数据计算流程,接收日志或者MQ到kafka,用Flink进行处理和计算(指标),将最终计算结 果(指标)存储在redis中,最后查询出redis中的数据给大屏、看板等展示。

但是在整个过程中,不得不思考一下,最后计算出来的存储在redis中指标数据是不是正确的呢?怎么能给用户或者 老板一个信服的理由呢?

比如说:离线的同事说离线昨天的数据订单是1w,实时昨天的数据却是2w,存在这么大的误差,到底是实时计算出 问题了,还是离线出问题了呢?

对于上图中加工的实时宽表数据,可以进行持久化,进行存储。
这样,实时数据也有明细数据,就可以和离线数据进行比对了,到底是日志丢失还是消息没有发送或者计算的业务逻辑有问题,就能够一目了然。

Flink双流Join

  1. Join大体分类只有两种:Window Join和Interval Join。Window Join又可以根据Window的类型细分出3种: Tumbling Window Join、Sliding Window Join、Session Widnow Join。
  2. Windows类型的join都是利用window的机制,先将数据缓存在Window State中,当窗口触发计算时,执行join 操作;
  3. interval join也是利用state存储数据再处理,区别在于state中的数据有失效机制,依靠数据触发数据清理;
  4. 目前Stream join的结果是数据的笛卡尔积;
  5. 日常使用中的一些问题,数据延迟、window序列化相关。

基于时间的双流join

import org.apache.flink.streaming.api.TimeCharacteristic  
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction  
import org.apache.flink.streaming.api.scala._  
import org.apache.flink.streaming.api.windowing.time.Time  
import org.apache.flink.util.Collector  

object TimeJoin {  
  case class UserClickLog(userId: String, eventTime: String, eventType: String, pageId:  
  String)  
  case class UserBrowseLog(userId: String, eventTime: String, productId: String, productPrice: String)  
  def main(args: Array[String]): Unit = {  
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment  
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)  
    env.setParallelism(1)  

    val clickStream = env.fromElements(  
      UserClickLog("user_1", "1500", "click", "page_1"),  
      UserClickLog("user_1", "2000", "click", "page_1")  
    )  
      .assignAscendingTimestamps(_.eventTime.toLong*1000)  
      .keyBy(_.userId)  

    val browseStream = env.fromElements(  
      UserBrowseLog("user_1", "1000", "product_1", "10"),  
      UserBrowseLog("user_1", "1500", "product_1", "10"),  
      UserBrowseLog("user_1", "1501", "product_1", "10"),  
      UserBrowseLog("user_1", "1502", "product_1", "10")  
    )  
      .assignAscendingTimestamps(_.eventTime.toLong*1000)  
      .keyBy(_.userId)  

    clickStream.intervalJoin(browseStream)  
      .between(Time.minutes(-10),Time.seconds(0))  
      .process(new IntervalJoinFunc)  
      .print()  

    env.execute()  
  }  

  class IntervalJoinFunc extends ProcessJoinFunction[UserClickLog, UserBrowseLog, String] {  
    override def processElement(left: UserClickLog, right: UserBrowseLog, ctx: ProcessJoinFunction[UserClickLog, UserBrowseLog, String]#Context, out: Collector[String]): Unit = {  
      val str = s"${left} ==> ${right}"  
      out.collect(str)  
    }  
  }  

}  

基于window的双流join

import org.apache.flink.streaming.api.TimeCharacteristic  
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}  
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows  
import org.apache.flink.streaming.api.windowing.time.Time  

/**  
 * 根据时间窗口去join,跟intervalJoin相比,更加以窗口window为中心
 */  
object WindowJoin {  
  case class UserClickLog(userId: String, eventTime: String, eventType: String, pageId:  
  String)  
  case class UserBrowseLog(userId: String, eventTime: String, productId: String, productPrice: String)  
  def main(args: Array[String]): Unit = {  
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment  
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)  
    env.setParallelism(1)  

    val input1Stream: DataStream[(Int, Long)] = env.fromElements((1, 1999L), (1,  
      2000L),(1, 2001L)).assignAscendingTimestamps(_._2)  
    val input2Stream: DataStream[(Int, Long)] = env.fromElements((1, 1000L),(1, 1001L), (1, 1002L), (1, 1500L),(1,  
      3999L),(1, 4000L)).assignAscendingTimestamps(_._2)  

    input1Stream.join(input2Stream)  
        .where(k => k._1)   //left key  
        .equalTo(k=>k._1)   //right key  
        .window(TumblingEventTimeWindows.of(Time.seconds(2)))  //window  
        .apply{(e1, e2) => e1 + "...." + e2}  //  
        .print()  

    /**  
     * (1,1999)....(1,1000)  
     * (1,1999)....(1,1001)  
     * (1,1999)....(1,1002)  
     * (1,1999)....(1,1500)  
     * (1,2000)....(1,3999)  
     * (1,2001)....(1,3999)  
     */  
    // .window(TumblingEventTimeWindows.of(Time.seconds(2)))  
    //  滚动窗口,默认开始偏移量是0,所以从0~1999是一个窗口,2000~3999 是一个窗口
    env.execute()  
  }  

}  

connect CoProcessFunction