Druid--高性能实时分析数据库

19 Jul 2021

Part 1 Apache Druid简介及架构

Druid概述

1.1 什么是Druid

数据分析的基础架构可以分为以下几类:

屏幕快照111

互联网技术的快速增长催生了各类大体量的数据,Hadoop很大的贡献在于帮助企业将他们那些低价值的事件流数据转化为高价值的聚合数据;

Hadoop擅长的是存储和获取大规模数据,它并不提供任何性能上的保证它能多快获取到数据。虽然Hadoop是一个高可用的系统,但在高并发负载下性能会下降;

Hadoop是一个很好的后端、批量处理和数据仓库系统。在一个需要高并发并且保证查询性能和数据可用性的并需要提供产品级别的保证的需求,Hadoop并不能满足。

Druid 是 Metamarkets 公司(一家为在线媒体或广告公司提供数据分析服务的公司)推出的一个分布式内存实时分析系统,用于解决如何在大规模数据集下进行快速的、交互式的查询和分析。

Druid 是一个开源的数据分析引擎工具,为实时和历史数据的次秒级(多于一秒)查询设计。主要应用于对数据的OLAP查询,Druid 提供低延迟(实时)的数据摄取、灵活的数据探索、快速的数据聚合。现有的 Druid 部署已支持扩展到数万亿时间和 PB 级数据。

1.2 与其他OLAP技术对比

  Druid Kylin ES Spark SQL ClickHouse
数据规模 超大 超大 中等 超大
查询效率 中等
并发度
灵活性
SQL支持

SparkSQL / Impala / ClickHouse,支持海量数据,灵活性强,但对响应时间是没有保证的。当数据量和计算复杂度增加后,响应时间会变慢,从秒级到分钟级,甚至小时级都有可能。

搜索引擎架构的系统(Elasticsearch等),在入库时将数据转换为倒排索引。牺牲了灵活性换取很好的性能,在搜索类查询上能做到亚秒级响应,但是对于扫描聚合为主的查询,随着处理数据量的增加,响应时间也会退化到分钟级。

Druid / Kylin,则在入库时对数据进行预聚合,进一步牺牲灵活性换取性能,以实现对超大数据集的秒级响应。

目前没有一个OLAP分析引擎能在数据量、灵活程度、性能(吞吐&并发)做到完美,需要基于自己的业务场景进行取舍和选型。

1.3 技术特点

Apache Druid是一个开源的、分布式、实时OLAP分析工具。Druid的核心设计结合了数据仓库、时间序列数据库和搜索系统的思想,适用于多种场景的高性能数据实时分析。Druid将这三个系统中的每个系统的关键特征合并到其接收层、存储格式、查询层和核心体系结构中。

diagram-2

时间序列数据库主要用于处理带时间标签(按照时间的顺序变化)的数据,带时间标签的数据也称为时间序列数据。

时间序列数据主要由电力行业、化工行业等各类型实时监测、检查与分析设备所采集、产生的数据,这些工业数据的典型特点是:产生频率快(每一个监测点一秒钟内可产生多条数据)、严重依赖于采集时间(每一条数据均要求对应唯一的时间)、测点多信息量大(常规的实时监测系统均有成千上万的监测点,监测点每秒钟都产生数据,每天产生几十GB的数据量)。

1、主要特点
2、集成

Druid是开源大数据技术的补充,包括Apache Kafka,Apache Hadoop,Apache Flink等,通常位于存储或处理层与最终应用之间,充当查询层或数据分析服务。

diagram-3

3、Ingestion(摄取)

Druid支持流式传输和批量摄取。Druid连接到数据源,包括:Kafka(用于流数据加载),或分布式文件系统,如HDFS(用于批处理数据加载)。

Druid在 “索引” 过程中将数据源中的原始数据转换为支持高效读取的优化格式(Segment,段)。

diagram-4

4、存储

Druid的数据存储采用列式存储格式。根据列的类型(字符串,数字等),应用不同的压缩和编码方法,根据列类型构建不同类型的索引。

Druid为字符串列构建倒排索引,以进行快速搜索和过滤。Druid可按时间对数据进行智能分区,以实现面向时间的快速查询。

Druid在摄取数据时对数据进行预聚合,节省大量存储空间。

diagram-5

5、查询方式

Druid支持JSON、SQL两种方式查询数据。

diagram-6

1.4 应用场景

Druid擅长的部分

是否需要使用Druid

体系架构

2.1 Druid进程和服务

根据线程的服务类型分为:

2.2 外部依赖

Part 2 Druid 部署

Druid官网:https://druid.apache.org/

下载 Druid 安装包、并解压缩:

cd /opt/lagou/software
wget http://apache.communilink.net/druid/0.19.0/apache-druid-0.19.0-bin.tar.gz
tar -zxvf apache-druid-0.19.0-bin.tar.gz

查看主目录:

微信图片_20201029140607

 tree -L 1 ./
 
./
├── bin                    运行相关脚本文件
├── conf                   生产环境配置文件
├── extensions             各种jar包,第三方扩展
├── hadoop-dependencies    hadoop相关依赖
├── lib                    Druid所有核心软件包
├── LICENSE                许可证
├── licenses
├── NOTICE                 对快速入门很有用帮助的文档
├── quickstart            单机测试部署用到的配置及数据
├── README                 
└── var                   启动时生成文件,数据文件在这里了

单服务器部署

单服务器部署的配置文件:

conf/druid/single-server/
├── large
├── medium
├── micro-quickstart
├── nano-quickstart
├── small
└── xlarge

单服务器参考配置:

Nano-Quickstart:1个CPU,4GB RAM
    启动命令: bin/start-nano-quickstart
    配置目录: conf/druid/single-server/nano-quickstart/*
微型快速入门:4个CPU,16GB RAM
    启动命令: bin/start-micro-quickstart
    配置目录: conf/druid/single-server/micro-quickstart/*
小型:8 CPU,64GB RAM(〜i3.2xlarge)
    启动命令: bin/start-small
    配置目录: conf/druid/single-server/small/*
中:16 CPU,128GB RAM(〜i3.4xlarge)
    启动命令: bin/start-medium
    配置目录: conf/druid/single-server/medium/*
大型:32 CPU,256GB RAM(〜i3.8xlarge)
    启动命令: bin/start-large
    配置目录: conf/druid/single-server/large/*
大型X:64 CPU,512GB RAM(〜i3.16xlarge)
    启动命令: bin/start-xlarge
    配置目录: conf/druid/single-server/xlarge/*

启动Druid:

./bin/start-nano-quickstart start

image-20201130090718795

登录http://wcentos7-1:8888/查看页面

image-20201130090819666

image-20201130092626325

使用jps可以看见启动了很多服务:

image-20201130092944074

官方建议大型系统采用集群模式部署,以实现容错和减少资源争用。

集群部署

1、部署规划

集群部署采用的分配如下:

主机 Druid服务 其他服务
linux121 Coordinator、Overlord Zookeeper、Kafka
linux122 Historical、MiddleManager Zookeeper、Kafka
linux123 Router、Broker Zookeeper、Kafka、MySQL

虚拟机每个节点2core、3G

DeepStorage:Hadoop 2.9.2

2、设置环境变量

vi /etc/profile

# 在文件中增加以下内容
export DRUID_HOME=/opt/lagou/servers/druid-0.19.0
export PATH=$PATH:$DRUID_HOME/bin

3、MySQL中创建相关数据库

使用 root 账号登录MySQL,mysql -uroot -p12345678

CREATE DATABASE druid DEFAULT CHARACTER SET utf8mb4;
CREATE USER 'druid'@'%' IDENTIFIED BY '12345678';
GRANT ALL PRIVILEGES ON druid.* TO 'druid'@'%';

-- ERROR 1819 (HY000): Your password does not satisfy the current policy requirements
set global validate_password_policy=0;

4、配置Druid参数

将hadoop配置文件core-site.xml、hdfs-site.xml、yarn-site.xml、mapred-site.xml链接到 conf/druid/cluster/_common/ 下

cd /opt/lagou/servers/druid-0.19.0/conf/druid/cluster/_common
ln -s $HADOOP_HOME/etc/hadoop/core-site.xml core-site.xml
ln -s $HADOOP_HOME/etc/hadoop/hdfs-site.xml hdfs-site.xml
ln -s $HADOOP_HOME/etc/hadoop/yarn-site.xml yarn-site.xml
ln -s $HADOOP_HOME/etc/hadoop/mapred-site.xml mapred-site.xml

将MySQL的驱动程序,链接到 $DRUID_HOME/extensions/mysql-metadata-storage/ 下

ln -s $HIVE_HOME/lib/mysql-connector-java-5.1.46.jar mysql-connector-java-5.1.46.jar

修改配置文件($DRUID_HOME/conf/druid/cluster/_common/common.runtime.properties)

# 增加"mysql-metadata-storage"
druid.extensions.loadList=["mysql-metadata-storage", "druid-hdfs-storage", "druid-kafka-indexing-service", "druid-datasketches"]

# 每台机器写自己的ip或hostname
druid.host=linux121

# 填写zk地址
druid.zk.service.host=linux121:2181,linux122:2181,linux123:2181
druid.zk.paths.base=/druid

# 注释掉前面 derby 的配置
# 增加 mysql 的配置
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://linux123:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=12345678

# 注释掉local的配置
# 增加HDFS的配置,即使用HDFS作为深度存储
druid.storage.type=hdfs
druid.storage.storageDirectory=/druid/segments

# 注释掉 indexer.logs For local disk的配置
# 增加 indexer.logs For HDFS 的配置
druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=/druid/indexing-logs

配置主节点文件(参数大小根据实际情况配置)

$DRUID_HOME/conf/druid/cluster/master/coordinator-overlord/jvm.config

-server
-Xms512m
-Xmx512m
-XX:+ExitOnOutOfMemoryError
-XX:+UseG1GC
-Duser.timezone=UTC+8
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

配置数据节点文件(参数大小根据实际情况配置)

$DRUID_HOME/conf/druid/cluster/data/historical/jvm.config

-server
-Xms512m
-Xmx512m
-XX:MaxDirectMemorySize=1g
-XX:+ExitOnOutOfMemoryError
-Duser.timezone=UTC+8
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

$DRUID_HOME/conf/druid/cluster/data/historical/runtime.properties

# 修改这一个参数
druid.processing.buffer.sizeBytes=50000000

备注: druid.processing.buffer.sizeBytes 每个查询用于聚合的堆外哈希表的大小

maxDirectMemory= druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)

如果 druid.processing.buffer.sizeBytes 太大,那么需要加大maxDirectMemory,否则 historical 服务无法启动

$DRUID_HOME/conf/druid/cluster/data/middleManager/jvm.config

-server
-Xms128m
-Xmx128m
-XX:+ExitOnOutOfMemoryError
-Duser.timezone=UTC+8
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

配置查询节点文件(参数大小根据实际情况配置)

$DRUID_HOME/conf/druid/cluster/query/broker/jvm.config

-server
-Xms512m
-Xmx512m
-XX:MaxDirectMemorySize=512m
-XX:+ExitOnOutOfMemoryError
-Duser.timezone=UTC+8
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

$DRUID_HOME/conf/druid/cluster/query/broker/runtime.properties

# 修改这一个参数
druid.processing.buffer.sizeBytes=50000000

备注:

druid.processing.buffer.sizeBytes 每个查询用于聚合的堆外哈希表的大小

maxDirectMemory = druid.processing.buffer.sizeBytes*(druid.processing.numMergeBuffers + druid.processing.numThreads + 1)

如果 druid.processing.buffer.sizeBytes 太大,那么需要加大maxDirectMemory,否则 broker 服务无法启动

$DRUID_HOME/conf/druid/cluster/query/router/jvm.config

-server
-Xms128m
-Xmx128m
-XX:+UseG1GC
-XX:MaxDirectMemorySize=128m
-XX:+ExitOnOutOfMemoryError
-Duser.timezone=UTC+8
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

小结:各服务 JVM 内存分配设置如下:

5、分发并启动服务

向linux122、linux123分发安装包

scp -r druid-0.19.0/ linux122:$PWD
scp -r druid-0.19.0/ linux123:$PWD

备注:

先启动zk的服务:

# linux121
zk.sh start

在主节点(linux121)上启动服务:

nohup start-cluster-master-no-zk-server &

使用jps检查可看见1个名为Main的后台进程,如下图所示:

image-20201130220911015

在数据节点(linux122)上启动服务:

nohup start-cluster-data-server &

使用jps检查可看见2个名为Main的后台进程,如下图所示:

image-20201130221932749

在查询节点(linux123)上启动服务:

nohup start-cluster-query-server &

使用jps检查可看见2个名为Main的后台进程,如下图所示:

image-20201130222043893

关闭服务:

# 在各个节点运行
cd /opt/lagou/servers/druid-0.19.0/bin
./service --down

6、查看界面

使用浏览器查看:http://linux123:8888/

image-20201130222319207

image-20201130222415122

Part 3 Druid入门案例

从Kafka中加载流式数据

1.1 数据及需求说明

Druid典型应用架构

image-20201204101129933

不在Druid中处理复杂的数据转换清洗工作

假设有以下网络流量数据:

数据为json格式,通过Kafka传输

每行数据包含:时间戳(ts)、维度列、指标列

维度列:srcip、srcport、dstip、dstport、protocol

指标列:packets、bytes、cost

需要计算的指标:

数据汇总的粒度:分钟

测试数据:

{"ts":"2020-10-01T00:01:35Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666, "dstPort":8888, "protocol": "tcp", "packets":1, "bytes":1000, "cost": 0.1}
{"ts":"2020-10-01T00:01:36Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666, "dstPort":8888, "protocol": "tcp", "packets":2, "bytes":2000, "cost": 0.1}
{"ts":"2020-10-01T00:01:37Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666, "dstPort":8888, "protocol": "tcp", "packets":3, "bytes":3000, "cost": 0.1}
{"ts":"2020-10-01T00:01:38Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666, "dstPort":8888, "protocol": "tcp", "packets":4, "bytes":4000, "cost": 0.1}

{"ts":"2020-10-01T00:02:08Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666, "dstPort":8888, "protocol": "udp", "packets":5, "bytes":5000, "cost": 0.2}
{"ts":"2020-10-01T00:02:09Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666, "dstPort":8888, "protocol": "udp", "packets":6, "bytes":6000, "cost": 0.2}
{"ts":"2020-10-01T00:02:10Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666, "dstPort":8888, "protocol": "udp", "packets":7, "bytes":7000, "cost": 0.2}
{"ts":"2020-10-01T00:02:11Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666, "dstPort":8888, "protocol": "udp", "packets":8, "bytes":8000, "cost": 0.2}
{"ts":"2020-10-01T00:02:12Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666, "dstPort":8888, "protocol": "udp", "packets":9, "bytes":9000, "cost": 0.2}

最后执行查询:

select * from tab;

-- 有两个返回值,以下仅为示意
{"ts":"2020-10-01T00:01","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666, "dstPort":8888, "protocol": "tcp", "packets":5, "bytes":1000, "cost": 0.4, "count":4}

{"ts":"2020-10-01T00:02","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666, "dstPort":8888, "protocol": "udp", "packets":9, "bytes":5000, "cost": 1.0, "count":5}

-- 其他查询
select dstPort, min(packets), max(bytes), sum(count), min(count)
  from tab
group by dstPort

1.2 创建Topic发送消息

启动Kafka集群,并创建一个名为 “lagoudruid1” 的 Topic :

-- 创建topic
kafka-topics.sh --create --zookeeper linux121:2181,linux122:2181/kafka1.0 --replication-factor 2 --partitions 6 --topic lagoudruid1

-- 启动生产者
kafka-console-producer.sh --broker-list linux121:9092,linux122:9092 --topic lagoudruid1

备注:–zookeeper linux121:2181,linux122:2181/kafka1.0 => kafka1.0为namespace,请注意自己是否添加了

1.3 摄取数据

浏览器访问 linux123:8888,点击控制台中的 Load data

1、Start

选择 Apache Kafka ,点击 Connect data

image-20201204144509239

2、Connect

image-20201204144651407

3、Parse data

image-20201204145027082

4、Parse time

image-20201204145144046

5、Tranform
6、Filter
7、configure Schema

image-20201204145851009

8、Partition

image-20201204150202481

9、Tune

image-20201204152814679

10、Publish

image-20201204152950237

11、Edit spec

image-20201204153241447

1.4 数据查询

image-20201204154505258

image-20201204154716630

-- 查看全部的数据
select * 
from "lagoutab1"

-- 其他查询
select dstPort, min(sum_packets), max(min_bytes), sum("count"), min("count")
  from "lagoutab1"
group by dstPort
-- count字段加引号,表示是一个列名(本质是进行转义,否则认为count是一个函数,将报错)

image-20201204154936537

备注:维度相同的数据进行了Rollup

1.5 数据摄取规范

{
    "type":"kafka",
    "spec":{
        "ioConfig":Object{...},
        "tuningConfig":Object{...},
        "dataSchema":Object{...}
    }
}

dataSchema的定义

Druid摄入数据规范的核心是dataSchema,dataSchema定义了如何解析输入的数据,并将数据存储到Druid中。

        "dataSchema":{
            "dataSource":"lagoutab1",
            "granularitySpec":{
                "type":"uniform",
                "queryGranularity":"MINUTE",
                "segmentGranularity":"DAY",
                "rollup":true
            },
            "timestampSpec":{
                "column":"ts",
                "format":"iso"
            },
            "dimensionsSpec":Object{...},
            "metricsSpec":Array[6]
        }

备注:如果没有定义Rollup,在摄取数据时维度和度量之间没有区别

ioConfig的定义:

输入数据的数据源在ioConfig中指定,每个任务类型都有自己的ioConfig,这里从 kafka 获取数据,配置如下:

        "ioConfig":{
            "type":"kafka",
            "consumerProperties":{
                "bootstrap.servers":"linux121:9092,linux122:9092"
            },
            "topic":"lagoudruid1",
            "inputFormat":{
                "type":"json"
            },
            "useEarliestOffset":true,
            "appendToExisting":true
        }

tuningConfig的定义

tuningConfig规范根据摄取任务类型而有所不同。

        "tuningConfig":{
            "type":"kafka"
        }

从HDFS中加载数据

hdfs dfs -cat /data/druidlog.dat

-- druidlog.dat
{"ts":"2020-10-01T00:01:35Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666, "dstPort":8888, "protocol": "tcp", "packets":1, "bytes":1000, "cost": 0.1}
{"ts":"2020-10-01T00:01:36Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666, "dstPort":8888, "protocol": "tcp", "packets":2, "bytes":2000, "cost": 0.1}
{"ts":"2020-10-01T00:01:37Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666, "dstPort":8888, "protocol": "tcp", "packets":3, "bytes":3000, "cost": 0.1}
{"ts":"2020-10-01T00:01:38Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666, "dstPort":8888, "protocol": "tcp", "packets":4, "bytes":4000, "cost": 0.1}
{"ts":"2020-10-01T00:02:08Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666, "dstPort":8888, "protocol": "udp", "packets":5, "bytes":5000, "cost": 0.2}
{"ts":"2020-10-01T00:02:09Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666, "dstPort":8888, "protocol": "udp", "packets":6, "bytes":6000, "cost": 0.2}
{"ts":"2020-10-01T00:02:10Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666, "dstPort":8888, "protocol": "udp", "packets":7, "bytes":7000, "cost": 0.2}
{"ts":"2020-10-01T00:02:11Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666, "dstPort":8888, "protocol": "udp", "packets":8, "bytes":8000, "cost": 0.2}
{"ts":"2020-10-01T00:02:12Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666, "dstPort":8888, "protocol": "udp", "packets":9, "bytes":9000, "cost": 0.2}

定义数据摄取规范

数据查询

select * from "lagoutab2"

select protocol, count(*) rowcoun, sum(bytes) as bytes, sum(packets) as packets, max(cost) as maxcost
  from "lagoutab2"
group by protocol

Part 4 Druid 架构与原理

基础架构

image-20201207095908048

Druid 总体包含以下 6 类节点:

1、Coordinator node:主要负责历史节点的数据负载均衡,以及通过规则管理数据的生命周期。协调节点告诉历史节点加载新数据、卸载过期数据、复制数据、 和为了负载均衡移动数据。

Coordinator 是周期性运行的(由 druid.coordinator.period 配置指定,默认执行间隔为 60s);
Coordinator 需要维护和 ZooKeeper 的连接,以获取集群的信息。Segment 和 Rule 的信息保存在元数据库中,
所以也需要维护与元数据库的连接。

2、Overlord node:进程监视 MiddleManager 进程,并且是Druid 数据摄入的主节点。负责将提取任务分配给 MiddleManagers 并协调 Segement 发布,包括接受、拆解、分配 Task,以及创建 Task 相关的锁,并返回 Task 的状态。

3、Historical node:加载生成好的数据文件,以供数据查询。Historical node是整个集群查询性能的核心所在,Historical 会承担绝大部分的 segment 查询。

Historical 进程从 Deep Storage 中下载 Segment,并响应有关这些 Segment 的查询请求(这些请求来自Broker 进程);
Historical 进程不处理写入请求;
Historical 进程采用了无共享架构设计,它知道如何去加载和删除 Segment,以及如何基于 Segment 来响应查询。
即便底层的深度存储无法正常工作,Historical 进程还是能针对其已同步的 Segments,正常提供查询服务。

4、MiddleManager node:及时摄入实时数据,生成 Segment 数据文件。

MiddleManager 进程是执行提交任务的工作节点。MiddleManagers 将任务转发给在不同 JVM 中运行的 Peon 进程。
MiddleManager、Peon、Task 的对应关系是,每个 Peon 进程一次只能运行一个Task 任务,但一个
MiddleManager 却可以管理多个 Peon 进程。

5、Broker node:接收客户端查询请求,并将这些查询转发给 Historicals 和 MiddleManagers。当 Brokers 从这些子查询中收到结果时,它们会合并这些结果并将它们返回给调用者。

Broker节点负责转发Client查询请求的;
Broker通过zookeeper能够知道哪个Segment在哪些节点上,将查询转发给相应节点;
所有节点返回数据后,Broker会将所有节点的数据进行合并,然后返回给Client;

6、Router node(可选的): 负责将请求路由到Broker、Coordinators、Overlords。

Router 进程可以在 Brokers、Overlords 和 Coordinators 进程之上,提供一层统一的 API网关。
Router 进程是可选的,如果集群数据规模已经达到了 TB级别,需要考虑启用(druid.router.managementProxy.enabled=true)。
一旦集群规模达到一定的数量级,那么发生故障的概率就会变得不容忽视,而 Router 支持将请求只发送给健康的节点,
避免请求失败。同时,查询的响应时间和资源消耗,也会随着数据量的增长而变高,而 Router 支持设置查询的优先级和负载均衡策略,避免了大查询造成的队列堆积或查询热点等问题。

Druid 的进程可以被任意部署,为了理解与部署组织方便。这些进程分为了三类:

Druid 还包含 3 类外部依赖:

Druid使用deep storage来做数据的备份,也作为在Druid进程之间在后台传输数据的一种方式。
当响应查询时,Historical首先从本地磁盘读取预取的段,这也意味着需要在deep storage和加载的数据的Historical中拥有足够的磁盘空间。
表名 作用
druid_dataSource 存储DataSources,以便 Kafka Index Service查找
druid_pendingSegments 存储 pending 的 Segments
druid_segments 存储每个 Segments 的metadata 信息
druid_rules 关于 Segment 的 load / drop 规则
druid_config 存放运行时的配置信息
druid_tasks 为Indexing Service 保存 Task 信息
druid_tasklogs 为Indexing Service 保存 Task 日志
druid_tasklocks 为Indexing Service 保存 Task 锁
druid_supervisors 为Indexing Service保存 SuperVisor 信息
druid_audit 记录配置、Coordinator规则的变化
Coordinator 节点的 Leader 选举
Historical 节点发布 Segment 的协议
Coordinator 和 Historical 之间 load / drop Segment 的协议
Overlord 节点的 Leader 选举
Overlord 和 MiddleManager 之间的 Task 管理

架构演进

Apache Druid 初始版本架构图 ~ 0.6.0(2012~2013)

20200803095928790

0.7.0 ~ 0.12.0(2013~2018)

Apache Druid 旧架构图——数据流转

20200803100042382

Apache Druid 旧架构图——集群管理

20200803100310371

0.13.0 ~ 当前版本(2018~now)

20200803100401479

Lambda 架构

从大的架构上看,Druid是一个Lambda架构。

Lambda架构是由 Storm 的作者 Nathan Marz 提出的一个实时大数据处理框架。Lambda 架构设计是为了在处理大规模数据时,同时发挥流处理和批处理的优势:

从而达到平衡延迟、吞吐量和容错性的目的,为了满足下游的即席查询,批处理和流处理的结果会进行合并。

Lambda 架构包含三层,Batch Layer、Speed Layer 和 Serving Layer:

流式数据的链路为:

Row data → Kafka → Streaming processor (Optional, 实时ETL) → Kafka(Optional) → Druid → Application / User

批处理数据的链路为:

Raw data → Kafka(Optional) → HDFS → ETL process(Optional) → Druid → Application / User

数据存储

druid-timeline

数据分区:

Segment内部存储结构

druid-column-types

MiddleManager节点接受到 ingestion 的任务之后,开始创建Segment:

Segment创建完成之后,Segment文件就是不可更改的,被写入到深度存储(目的是为了防止MiddleManager节点宕机后,Segment的丢失)。 然后Segment会加载到Historical节点,Historical节点可以直接加载到内存中。

同时,metadata store 也会记录下这个新创建的Segment的信息,如结构,尺寸,深度存储的位置等等。Coordinator节点需要这些元数据来协调数据的查找。

索引服务

索引服务架构图如下图所示:

u=4028210120,804552479&fm=15&gp=0

索引服务由三部分组件组成:

索引服务架构与 Yarn 的架构类似:

Task类型有很多,包括:

索引及压缩机制

Druid的查询时延低性能好的主要是因为采用了五个技术点:

1、数据预聚合

Roll-up聚合前:

time AppKey area value
2020-10-05 10:00:00 areakey1 Beijing 1
2020-10-05 10:30:00 areakey1 Beijing 1
2020-10-05 11:00:00 areakey1 Beijing 1
2020-10-05 11:00:00 areakey2 Shanghai 2

Roll-up聚合后:

time AppKey area value
2020-10-05 areakey1 Beijing 3
2020-10-05 areakey2 Shanghai 2

2、位图索引

Druid在摄入的数据示例:

Time AppKey Area Value
2020-10-01 10:00:00 appkey1 深圳 1
2020-10-01 11:00:00 appkey2 北京 1
2020-10-01 12:00:00 appkey2 深圳 1
2020-10-01 13:00:00 appkey2 北京 1
… … … … … … … …
2020-10-01 23:00:00 appkey3 北京 1

按天聚合后的数据如下:

Time AppKey Area Value
2020-10-01 appkey1 北京 10
2020-10-01 appkey1 深圳 20
2020-10-01 appkey2 北京 30
2020-10-01 appkey2 深圳 40
2020-10-01 appkey3 北京 50
2020-10-01 appkey3 深圳 60

Druid通过建立位图索引,实现快速数据查找。

Bitmap 索引主要为了加速查询时有条件过滤的场景。Druid 在生成索引文件的时候,对每个列的每个取值生成对应的 Bitmap 集合。如下图所示:

Key            
appkey1 1 1 0 0 0 0
appkey2 0 0 1 1 0 0
appkey3 0 0 0 0 1 1
深圳 0 1 0 1 0 1
北京 1 0 1 0 1 0

image-20201207160345157

以SQL查询为例:

1)boolean条件查询

select sum(value) 
from tab1
where time='2020-10-01' 
  and appkey in ('appkey1', 'appkey2') 
  and area='北京'

执行过程分析:

2)group by 查询

select area, sum(value) 
  from tab1
 where time='2020-10-01' 
   and appkey in ('appkey1', 'appkey2')
group by area

该查询与上面的查询不同之处在于将符合条件的列

Part 5 Druid实战案例

需求分析

1.1 场景分析

1.2 数据描述

{"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","products":[{"productId":"102163","productName":"贝合xxx+粉","price":18.7,"productNum":3,"categoryid":"10360","catname1":"厨卫清洁、纸制用品","catname2":"生活日用","catname3":"浴室用品"},{"productId":"100349","productName":"COxxx0C","price":877.8,"productNum":1,"categoryid":"10302","catname1":"母婴、玩具乐器","catname2":"西洋弦乐器","catname3":"吉他"}]}

ts:交易时间

orderId:订单编号

userId:用户id

orderStatusId:订单状态id

orderStatus:订单状态

0-11:未支付,已支付,发货中,已发货,发货失败,已退款,已关单,订单过期,订单已失效,产品已失效,代付拒绝,支付中

payModeId:支付方式id

payMode:支付方式

0-6:微信,支付宝,信用卡,银联,货到付款,现金,其他

payment:支付金额

products:购买商品

备注:一个订单可能包含多个商品,这里是一个嵌套结构

productId:商品id

productName:商品名称

price:单价

productNum:购买数量

categoryid:商品分类id

catname1:商品一级分类名称

catname2:商品二级分类名称

catname3:商品三级分类名称

以上的嵌套的json数据格式,Druid不好处理,需要对数据进行预处理,将数据拉平,处理后的数据格式:

{"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","product":{"productId":"102163","productName":"贝合xxx+粉","price":18.7,"productNum":3,"categoryid":"10360","catname1":"厨卫清洁、纸制用品","catname2":"生活日用","catname3":"浴室用品"}}

{"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","product":{"productId":"100349","productName":"COxxx0C","price":877.8,"productNum":1,"categoryid":"10302","catname1":"母婴、玩具乐器","catname2":"西洋弦乐器","catname3":"吉他"}}

项目实战

2.1 kafka生产者

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import scala.io.BufferedSource

object KafkaProducerForDruid {
  def main(args: Array[String]): Unit = {
    // 定义 kafka 参数
    val brokers = "linux121:9092,linux122:9092,linux123:9092"
    val topic = "lagoudruid2"
    val prop = new Properties()

    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

    // KafkaProducer
    val producer = new KafkaProducer[String, String](prop)

    val source: BufferedSource = scala.io.Source.fromFile("data/lagou_orders1.json")
    val iter: Iterator[String] = source.getLines()

    iter.foreach{line =>
      val msg = new ProducerRecord[String, String](topic, line)
      producer.send(msg)
      Thread.sleep(10)
    }

    producer.close()
    source.close()
  }
}

2.2 启动服务

启动 ZooKeeper、Kafka、Druid、HDFS服务

-- 创建topic
kafka-topics.sh --create --zookeeper linux121:2181,linux122:2181/kafka1.0 --replication-factor 1 --partitions 3 --topic lagoudruid2

2.3 定义数据摄取规范

备注:

数据摄取规范:

{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "lagoudruid2",
      "timestampSpec": {
        "column": "ts",
        "format": "millis",
        "missingValue": null
      },
      "dimensionsSpec": {
        "dimensions": [],
        "dimensionExclusions": [
          "ts"
        ]
      },
      "metricsSpec": [],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "queryGranularity": {
          "type": "none"
        },
        "rollup": false,
        "intervals": null
      },
      "transformSpec": {
        "filter": null,
        "transforms": []
      }
    },
    "ioConfig": {
      "topic": "lagoudruid2",
      "inputFormat": {
        "type": "json",
        "flattenSpec": {
          "useFieldDiscovery": true,
          "fields": [
            {
              "type": "path",
              "name": "productId",
              "expr": "$.product.productId"
            },
            {
              "type": "path",
              "name": "productName",
              "expr": "$.product.productName"
            },
            {
              "type": "path",
              "name": "price",
              "expr": "$.product.price"
            },
            {
              "type": "path",
              "name": "productNum",
              "expr": "$.product.productNum"
            },
            {
              "type": "path",
              "name": "categoryid",
              "expr": "$.product.categoryid"
            },
            {
              "type": "path",
              "name": "catname1",
              "expr": "$.product.catname1"
            },
            {
              "type": "path",
              "name": "catname2",
              "expr": "$.product.catname2"
            },
            {
              "type": "path",
              "name": "catname3",
              "expr": "$.product.catname3"
            }
          ]
        },
        "featureSpec": {}
      },
      "replicas": 1,
      "taskCount": 1,
      "taskDuration": "PT3600S",
      "consumerProperties": {
        "bootstrap.servers": "linux121:9092,linux122:9092,linux123:9092"
      },
      "pollTimeout": 100,
      "startDelay": "PT5S",
      "period": "PT30S",
      "useEarliestOffset": true,
      "completionTimeout": "PT1800S",
      "lateMessageRejectionPeriod": null,
      "earlyMessageRejectionPeriod": null,
      "lateMessageRejectionStartDateTime": null,
      "stream": "lagoudruid2",
      "useEarliestSequenceNumber": true
    },
    "tuningConfig": {
      "type": "kafka",
      "maxRowsInMemory": 1000000,
      "maxBytesInMemory": 0,
      "maxRowsPerSegment": 5000000,
      "maxTotalRows": null,
      "intermediatePersistPeriod": "PT10M",
      "basePersistDirectory": "/opt/lagou/servers/druid-0.19.0/var/tmp/druid-realtime-persist6578707047959911548",
      "maxPendingPersists": 0,
      "indexSpec": {
        "bitmap": {
          "type": "roaring",
          "compressRunOnSerialization": true
        },
        "dimensionCompression": "lz4",
        "metricCompression": "lz4",
        "longEncoding": "longs",
        "segmentLoader": null
      },
      "indexSpecForIntermediatePersists": {
        "bitmap": {
          "type": "roaring",
          "compressRunOnSerialization": true
        },
        "dimensionCompression": "lz4",
        "metricCompression": "lz4",
        "longEncoding": "longs",
        "segmentLoader": null
      },
      "buildV9Directly": true,
      "reportParseExceptions": false,
      "handoffConditionTimeout": 0,
      "resetOffsetAutomatically": false,
      "segmentWriteOutMediumFactory": null,
      "workerThreads": null,
      "chatThreads": null,
      "chatRetries": 8,
      "httpTimeout": "PT10S",
      "shutdownTimeout": "PT80S",
      "offsetFetchPeriod": "PT30S",
      "intermediateHandoffPeriod": "P2147483647D",
      "logParseExceptions": false,
      "maxParseExceptions": 2147483647,
      "maxSavedParseExceptions": 0,
      "skipSequenceNumberAvailabilityCheck": false,
      "repartitionTransitionDuration": "PT120S"
    }
  },
  "dataSchema": {
    "dataSource": "lagoudruid2",
    "timestampSpec": {
      "column": "ts",
      "format": "millis",
      "missingValue": null
    },
    "dimensionsSpec": {
      "dimensions": [],
      "dimensionExclusions": [
        "ts"
      ]
    },
    "metricsSpec": [],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": {
        "type": "none"
      },
      "rollup": false,
      "intervals": null
    },
    "transformSpec": {
      "filter": null,
      "transforms": []
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 1000000,
    "maxBytesInMemory": 0,
    "maxRowsPerSegment": 5000000,
    "maxTotalRows": null,
    "intermediatePersistPeriod": "PT10M",
    "basePersistDirectory": "/opt/lagou/servers/druid-0.19.0/var/tmp/druid-realtime-persist6578707047959911548",
    "maxPendingPersists": 0,
    "indexSpec": {
      "bitmap": {
        "type": "roaring",
        "compressRunOnSerialization": true
      },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs",
      "segmentLoader": null
    },
    "indexSpecForIntermediatePersists": {
      "bitmap": {
        "type": "roaring",
        "compressRunOnSerialization": true
      },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs",
      "segmentLoader": null
    },
    "buildV9Directly": true,
    "reportParseExceptions": false,
    "handoffConditionTimeout": 0,
    "resetOffsetAutomatically": false,
    "segmentWriteOutMediumFactory": null,
    "workerThreads": null,
    "chatThreads": null,
    "chatRetries": 8,
    "httpTimeout": "PT10S",
    "shutdownTimeout": "PT80S",
    "offsetFetchPeriod": "PT30S",
    "intermediateHandoffPeriod": "P2147483647D",
    "logParseExceptions": false,
    "maxParseExceptions": 2147483647,
    "maxSavedParseExceptions": 0,
    "skipSequenceNumberAvailabilityCheck": false,
    "repartitionTransitionDuration": "PT120S"
  },
  "ioConfig": {
    "topic": "lagoudruid2",
    "inputFormat": {
      "type": "json",
      "flattenSpec": {
        "useFieldDiscovery": true,
        "fields": [
          {
            "type": "path",
            "name": "productId",
            "expr": "$.product.productId"
          },
          {
            "type": "path",
            "name": "productName",
            "expr": "$.product.productName"
          },
          {
            "type": "path",
            "name": "price",
            "expr": "$.product.price"
          },
          {
            "type": "path",
            "name": "productNum",
            "expr": "$.product.productNum"
          },
          {
            "type": "path",
            "name": "categoryid",
            "expr": "$.product.categoryid"
          },
          {
            "type": "path",
            "name": "catname1",
            "expr": "$.product.catname1"
          },
          {
            "type": "path",
            "name": "catname2",
            "expr": "$.product.catname2"
          },
          {
            "type": "path",
            "name": "catname3",
            "expr": "$.product.catname3"
          }
        ]
      },
      "featureSpec": {}
    },
    "replicas": 1,
    "taskCount": 1,
    "taskDuration": "PT3600S",
    "consumerProperties": {
      "bootstrap.servers": "linux121:9092,linux122:9092,linux123:9092"
    },
    "pollTimeout": 100,
    "startDelay": "PT5S",
    "period": "PT30S",
    "useEarliestOffset": true,
    "completionTimeout": "PT1800S",
    "lateMessageRejectionPeriod": null,
    "earlyMessageRejectionPeriod": null,
    "lateMessageRejectionStartDateTime": null,
    "stream": "lagoudruid2",
    "useEarliestSequenceNumber": true
  },
  "context": null,
  "suspended": false
}

2.4 查询计算

-- 查记录总数
select count(*) as recordcount 
  from lagoudruid2

-- 查订单总数
select count(distinct orderId) as orderscount
  from lagoudruid2

-- 查有多少用户数
select count(distinct userId) as usercount
  from lagoudruid2
  
-- 统计各种订单状态的订单数
select orderStatus, count(*)
  from (
      select orderId, orderStatus
        from lagoudruid2
      group by orderId, orderStatus
  )
group by orderStatus

-- 统计各种支付方式的订单数
select payMode, count(1)
  from (
      select orderId, payMode
        from lagoudruid2
      group by orderId, payMode
  )
group by payMode

-- 订单金额最大的前10名
select orderId, payment, count(1) as productcount, sum(productNum) as products
  from lagoudruid2
group by orderId, payment
order by payment desc limit 10

-- 计算每秒订单总金额
select timesec, round(sum(payment)/10000, 2)
  from (
      select date_trunc('second', __time) as timesec, orderId, payment
        from lagoudruid2
      group by date_trunc('second', __time), orderId, payment
  )
group by timesec 

SQL参考:https://druid.apache.org/docs/0.19.0/querying/sql.html#data-types

2.5 Druid案例小结