Hive是基于Hadoop的一个数据仓库工具,可以将 结构化的数据文件 映射为一张表 (类似于RDBMS中的表),并提供类SQL查询功能;Hive是由Facebook开源,用于解 决海量结构化日志的数据统计。
- Hive本质是:将 SQL 转换为 MapReduce 的任务进行运算
- 底层由HDFS来提供数据存储
- 可以将Hive理解为一个:将 SQL 转换为 MapReduce 任务的工具
数据仓库(Data Warehouse)是一个面向主题的、集成的、相对稳定的、反映历史变 化的数据集合,主要用于管理决策。(数据仓库之父比尔·恩门,1991年提出)。
- 数据仓库的目的:构建面向分析的、集成的数据集合;为企业提供决策支持
- 数据仓库本身不产生数据,数据来源于外部
- 存储了大量数据,对这些数据的分析和处理不可避免的用到Hive
Hive的优缺点
Hive的优点
学习成本低。Hive提供了类似SQL的查询语言,开发人员能快速上手; 处理海量数据。底层执行的是MapReduce 任务; 系统可以水平扩展。底层基于Hadoop; 功能可以扩展。Hive允许用户自定义函数; 良好的容错性。某个节点发生故障,HQL仍然可以正常完成; 统一的元数据管理。元数据包括:有哪些表、表有什么字段、字段是什么类型
Hive的缺点
HQL表达能力有限; 迭代计算无法表达; Hive的执行效率不高(基于MR的执行引擎); Hive自动生成的MapReduce作业,某些情况下不够智能; Hive的调优困难;
Hive架构
-
用户接口 CLI(Common Line Interface):Hive的命令行,用于接收HQL,并返 回结果; JDBC/ODBC:是指Hive的java实现,与传统数据库JDBC类似; WebUI:是指可通过浏览器访问Hive;
-
Thrift Server, Hive可选组件,是一个软件框架服务,允许客户端使用包括Java、C++、Ruby和其他很多种语言,通过 编程的方式远程访问Hive;
-
元数据管理(MetaStore) Hive将元数据存储在关系数据库中(如mysql、 derby)。Hive的元数据包括:数据库名、表名及类型、字段名称及数据类型、数 据所在位置等;
-
驱动程序(Driver)
解析器 (SQLParser) :使用第三方工具(antlr)将HQL字符串转换成抽象语 法树(AST);对AST进行语法分析,比如字段是否存在、SQL语义是否有 误、表是否存在;
编译器 (Compiler) :将抽象语法树编译生成逻辑执行计划;
优化器 (Optimizer) :对逻辑执行计划进行优化,减少不必要的列、使用分 区等;
执行器 (Executr) :把逻辑执行计划转换成可以运行的物理计划;
Hive安装与配置
安装前提:3台虚拟机,安装了Hadoop
安装软件:Hive(2.3.7) + MySQL (5.7.26)
备注:Hive的元数据默认存储在自带的 derby 数据库中,生产中多采用MySQL
derby:java语言开发占用资源少,单进程,单用户。仅仅适用于个人的测试。
MySQL安装
-
1、环境准备(删除有冲突的依赖包、安装必须的依赖包) centos7.6自带的 MariaDB(MariaDB是MySQL的一个分支)和其他现存的mysql,与要安装的MySQL有冲突,需要删除。
# 查询是否安装了 mariadb rpm -aq | grep mariadb # 删除mariadb。-e 删除指定的套件;--nodeps 不验证套件的相互关联性 rpm -e --nodeps mariadb-libs
- 2、安装依赖
yum install perl -y yum install net-tools -y
- 3、安装MySQL
# 解压缩 tar xvf mysql-5.7.26-1.el7.x86_64.rpm-bundle.tar # 依次运行以下命令 rpm -ivh mysql-community-common-5.7.26-1.el7.x86_64.rpm rpm -ivh mysql-community-libs-5.7.26-1.el7.x86_64.rpm rpm -ivh mysql-community-client-5.7.26-1.el7.x86_64.rpm rpm -ivh mysql-community-server-5.7.26-1.el7.x86_64.rpm
- 4、启动数据库
systemctl start mysqld
- 5、查找root密码
grep password /var/log/mysqld.log
-
mysql没有临时密码问题
1 . Stop MySQL:
systemctl stop mysqld
2 . Set the mySQL environment option
systemctl set-environment MYSQLD_OPTS="--skip-grant-tables"
3 . Start MySQL using the options you just set
systemctl start mysqld
4 . Login as root without a password
mysql -u root
5 . set password
update mysql.user set authentication_string=password(‘newpassword’) where user='root' and Host = 'localhost'; # or SET PASSWORD = PASSWORD('newpasswd'); flush privileges;
6 . Stop MySQL
systemctl stop mysqld
7 . Unset the MySQL environment option so it starts normally next time
systemctl unset-environment MYSQLD_OPTS
8 . Start MySQL normally:
systemctl start mysqld
-
- 6、修改 root 口令
# 进入MySQL,使用前面查询到的口令 mysql -u root -p # 设置口令强度;将root口令设置为12345678;刷新 set global validate_password_policy=0; set password for 'root'@'localhost' =password('12345678'); flush privileges;
- 7、创建 hive 用户(optional)
-- 创建用户设置口令、授权、刷新 CREATE USER 'hive'@'%' IDENTIFIED BY '12345678'; GRANT ALL ON *.* TO 'hive'@'%'; FLUSH PRIVILEGES;
Hive安装
Hive官网: http://hive.apache.org
下载网址: http://archive.apache.org/dist/hive/
文档网址:https://cwiki.apache.org/confluence/display/Hive/LanguageManual
安装前提:3台虚拟机,安装了Hadoop
安装软件:Hive(2.3.7) + MySQL (5.7.26)
# hive安装包
apache-hive-2.3.7-bin.tar.gz
# MySQL安装包 mysql-5.7.26-1.el7.x86_64.rpm-bundle.tar
# MySQL的JDBC驱动程序
mysql-connector-java-5.1.46.jar
# 整体的安装步骤:
1、安装MySQL
2、安装配置Hive
3、Hive添加常用配置
- 1、下载Hive软件,并解压缩
cd /opt/lagou/software tar zxvf apache-hive-2.3.7-bin.tar.gz -C ../servers/ cd ../servers mv apache-hive-2.3.7-bin hive-2.3.7
- 2、修改环境变量
# 在 /etc/profile 文件中增加环境变量 export HIVE_HOME=/opt/lagou/servers/hive-2.3.7 export PATH=$PATH:$HIVE_HOME/bin # 执行并生效 source /etc/profile
- 3、修改 Hive 配置
cd $HIVE_HOME/conf vi hive-site.xml
增加以下内容:
<?xml version="1.0" encoding="UTF-8" standalone="no"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <!-- hive元数据的存储位置 --> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql://centos7-3:3306/hivemetadata?createDatabaseIfNotExist=true&useSSL=false</value> <description>JDBC connect string for a JDBC metastore</description> </property> <!-- 指定驱动程序 --> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>com.mysql.jdbc.Driver</value> <description>Driver class name for a JDBC metastore</description> </property> <!-- 连接数据库的用户名 --> <property> <name>javax.jdo.option.ConnectionUserName</name> <value>hive</value> <description>username to use against metastore database</description> </property> <!-- 连接数据库的口令 --> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>12345678</value> <description>password to use against metastore database</description> </property> <!-- 数据默认的存储位置(HDFS) --> <property> <name>hive.metastore.warehouse.dir</name> <value>/user/hive/warehouse</value> <description>location of default database for the warehouse</description> </property> <!-- 在命令行中,显示当前操作的数据库 --> <property> <name>hive.cli.print.current.db</name> <value>true</value> <description>Whether to include the current database in the Hive prompt.</description> </property> <!-- 在命令行中,显示数据的表头 --> <property> <name>hive.cli.print.header</name> <value>true</value> </property> <!-- 操作小规模数据时,使用本地模式,提高效率 --> <property> <name>hive.exec.mode.local.auto</name> <value>true</value> <description>Let Hive determine whether to run in local mode automatically</description> </property> </configuration>
备注:
- 注意jdbc的连接串,如果没有 useSSL=false 会有大量警告
- 在xml文件中 & 表示 &
-
4、拷贝 MySQL JDBC 驱动程序 将 mysql-connector-java-5.1.46.jar 拷贝到 $HIVE_HOME/lib
-
5、初始化元数据库 执行成功,会多一个 hivemetadata的database
schematool -dbType mysql -initSchema
-
6、启动Hive,执行命令 启动hive服务之前,请先启动hdfs、yarn的服务
$ hive hive> show functions;
-
- Hive的日志文件 Hive的log默认存放在 /tmp/root 目录下(root为当前用户名);这个位置可以修改。但不建议改
vi $HIVE_HOME/conf/hive-log4j2.properties # 添加以下内容: property.hive.log.dir = /opt/lagou/servers/hive-2.3.7/logs
Hive基础知识
参数配置方式
参数配置的三种方式:
1、用户自定义配置文件(hive-site.xml)
2、启动hive时指定参数(-hiveconf)
3、hive命令行指定参数(set)
配置信息的优先级:
set > -hiveconf > hive-site.xml > hive-default.xml
-
用户自定义配置文件(hive-site.xml)
- 启动hive时指定参数(-hiveconf)
hive -hiveconf hive.exec.mode.local.auto=false;
- hive命令行指定参数(set)
# 查看配置 set hive.exec.mode.local.auto; hive.exec.mode.local.auto=false # 修改配置 set hive.exec.mode.local.auto = true;
Hive命令
hive -help
-
hive -e “select * from users” -e:不进入hive交互窗口,执行sql语句
-
hive -f hqlfile1.sql -f:执行脚本中sql语句
# 创建文件hqlfile1.sql,内容:select * from users # 执行文件中的SQL语句 hive -f hqlfile1.sql # 执行文件中的SQL语句,将结果写入文件 hive -f hqlfile1.sql >> result1.log
-
exit; quit; 退出Hive命令行
-
在命令行执行 shell 命令 / dfs 命令
hive> ! ls; hive> ! clear; hive> dfs -ls / ;
数据类型与文件格式
- 常用数据类型
- 数据类型的隐式转化,小类型可以转化成大类型,反之就不行
- number->string : okay
- string->number : negative
- boolean 类型不能和number相互转化
- 数据类型的显示转换
使用cast函数进行强制类型转换;如果强制类型转换失败,返回NULL
hive> select cast('1111s' as int); OK NULL hive> select cast('1111' as int); OK 1111
-
int
-
string
-
double
-
float
-
timestamp
-
boolean
- 数据类型的隐式转化,小类型可以转化成大类型,反之就不行
-
集合数据类型
-
array 有序的相同数据类型的集合
-
map key-value对。key必须是基 本数据类型,value不限
-
struct 不同类型字段的集合。类似 于C语言的结构体
-
union 不同类型的元素存储在同一
字段的不同行中, 基本不使用 -
示例代码
hive> select array(1,2,3); OK [1,2,3] -- 使用 [] 访问数组元素 hive> select arr[0] from (select array(1,2,3) arr) tmp; hive> select map('a', 1, 'b', 2, 'c', 3); OK {"a":1,"b":2,"c":3} -- 使用 [] 访问map元素 hive> select mymap["a"] from (select map('a', 1, 'b', 2, 'c', 3) as mymap) tmp; -- 使用 [] 访问map元素。 key 不存在返回 NULL hive> select mymap["x"] from (select map('a', 1, 'b', 2, 'c', 3) as mymap) tmp; NULL hive> select struct('username1', 7, 1288.68); OK {"col1":"username1","col2":7,"col3":1288.68} -- 给 struct 中的字段命名 hive> select named_struct("name", "username1", "id", 7, "salary", 12880.68); OK {"name":"username1","id":7,"salary":12880.68} -- 使用 列名.字段名 访问具体信息 hive> select userinfo.id > from (select named_struct("name", "username1", "id", 7, "salary", 12880.68) userinfo) tmp;
-
-
文本文件数据编码 Hive表中的数据在存储在文件系统上,Hive定义了默认的存储格式,Hive默认使用几个很少出现在字段值中的控制字符。也支持用户自定义文件存储格式, 来表示替换默认分隔符的字符。
^A / ^B / ^C 都是特殊的控制字符,使用 more 、 cat 命令是看不见的;可以使用
cat -A file.dat
-
\n 用于分隔行。每一行是一条记录,使用换行符分割数据
-
^A (Ctrl + v) + (Ctrl + a) => ^A
用于分隔字段。在CREATE TABLE语句中使用八进制编码 \001表示
-
^B (Ctrl + v) + (Ctrl + b) => ^B
用于分隔 ARRAY、MAP、STRUCT 中的元素。在CREATE TABLE语句中使用八进制编码\002表示
-
^C (Ctrl + v) + (Ctrl + c) => ^C
Map中 key、value之间的分隔符。在CREATE TABLE语句 中使用八进制编码\003表示
-
- 读时模式
- 写时模式:
在传统数据库中,在加载时发现数据不符合表的定义,则拒绝加载数据。数据在写入数据库时对照表模式进行检查,这种模式称为”写时模式”(schema on write)。
写时模式 -> 写数据检查 -> RDBMS;
- 读时模式:
Hive中数据加载过程采用”读时模式” (schema on read),加载数据时不进行数据格 式的校验,读取数据时如果不合法则显示NULL。这种模式的优点是加载数据迅速。
读时模式 -> 读时检查数据 -> Hive;好处:加载数据快;问题:数据显示NULL
- 写时模式:
DDL命令
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
数据库操作
Hive的数据库名、表名均不区分大小写; 名字不能使用数字开头; 不能使用关键字,尽量不使用特殊符号;
- 创建数据库语法
CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name [COMMENT database_comment] [LOCATION hdfs_path] [MANAGEDLOCATION hdfs_path] [WITH DBPROPERTIES (property_name=property_value, ...)];
-- 创建数据库,在HDFS上存储路径为 /user/hive/warehouse/*.db hive (default)> create database mydb; hive (default)> dfs -ls /user/hive/warehouse; -- 避免数据库已经存在时报错,使用 if not exists 进行判断【标准写法】 hive (default)> create database if not exists mydb; -- 创建数据库。添加备注,指定数据库在存放位置 hive (default)> create database if not exists mydb2 comment 'this is mydb2' location '/user/hive/mydb2.db';
- 查看数据库
-- 查看所有数据库 show database; -- 查看数据库信息 desc database mydb2; desc database extended mydb2; describe database extended mydb2;
-
使用数据库 use mydb;
- 删除数据库
-- 删除一个空数据库 drop database databasename; -- 如果数据库不为空,使用 cascade 强制删除 drop database databasename cascade;
建表语法
create [external] table [IF NOT EXISTS] table_name
[(colName colType [comment 'comment'], ...)]
[comment table_comment]
[partition by (colName colType [comment col_comment], ...)]
[clustered BY (colName, colName, ...)
[sorted by (col_name [ASC|DESC], ...)] into num_buckets buckets]
[row format row_format]
[stored as file_format]
[LOCATION hdfs_path]
[TBLPROPERTIES (property_name=property_value, ...)]
[AS select_statement];
CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS]
[db_name.]table_name
LIKE existing_table_or_view_name
[LOCATION hdfs_path];
-
CREATE TABLE 按给定名称创建表,如果表已经存在则抛出异常。可使用if not exists 规避。
-
EXTERNAL 创建外部表,否则创建的是内部表(管理表)。 删除内部表时,数据和表的定义同时被删除; 删除外部表时,仅仅删除了表的定义,数据保留; 在生产环境中,多使用外部表;
-
comment 表的注释
-
partition by 对表中数据进行分区,指定表的分区字段
-
clustered by 创建分桶表,指定分桶字段
-
sorted by 对桶中的一个或多个列排序,较少使用
-
ROW FORMAT DELIMITED 存储子句
ROW FORMAT DELIMITED [FIELDS TERMINATED BY char] [COLLECTION ITEMS TERMINATED BY char] [MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char] | SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value, property_name=property_value, ...)]
建表时可指定 SerDe 。如果没有指定 ROW FORMAT 或者 ROW FORMAT DELIMITED,将会使用默认的 SerDe。建表时还需要为表指定列,在指定列的同 时也会指定自定义的 SerDe。Hive通过 SerDe 确定表的具体的列的数据。SerDe是 Serialize/Deserilize 的简称, hive使用Serde进行行对象的序列与反 序列化。
-
stored as SEQUENCEFILE|TEXTFILE|RCFILE 如果文件数据是纯文本,可以使 用 STORED AS TEXTFILE(缺省);如果数据需要压缩,使用 STORED AS SEQUENCEFILE(二进制序列文件)。
-
LOCATION 表在HDFS上的存放位置
-
TBLPROPERTIES 定义表的属性
-
AS 后面可以接查询语句,表示根据后面的查询结果创建表, 复制表和数据, 表的结构可能有变化,比如partitions没了
-
LIKE like 表名,允许用户复制现有的表结构,但是不复制数据
内部表 & 外部表
在创建表的时候,可指定表的类型。表有两种类型,分别是内部表(管理表)、外部表。
- 默认情况下,创建内部表。如果要创建外部表,需要使用关键字 external
- 在删除内部表时,表的定义(元数据) 和 数据 同时被删除
- 在删除外部表时,仅删除表的定义,数据被保留
-
在生产环境中,多使用外部表
- 内部表
-- 创建内部表 create table t1( id int, name string, hobby array<string>, addr map<string, string> ) row format delimited fields terminated by ";" collection items terminated by "," map keys terminated by ":";
- 外部表
-- 创建外部表external create external table t2( id int, name string, hobby array<string>, addr map<string, string> ) row format delimited fields terminated by ";" collection items terminated by "," map keys terminated by ":";
- 内部表与外部表的转换
-- 内部表转外部表 alter table t1 set tblproperties("EXTERNAL"="TRUE"); -- 外部表转内部表。EXTERNAL 大写,false 不区分大小 alter table t1 set tblproperties('EXTERNAL'='FALSE'); -- 查询表信息,是否转换成功 desc formatted t1;
分区表
Hive在执行查询时,一般会扫描整个表的数据。由于表的数据量大,全表扫描消耗时间长、效率低。
而有时候,查询只需要扫描表中的一部分数据即可,Hive引入了分区表的概念,将表 的数据存储在不同的子目录中,每一个子目录对应一个分区。只查询部分分区数据 时,可避免全表扫描,提高查询效率。
在实际中,通常根据时间、地区等信息进行分区。
- 创建表
-- 创建表 create table if not exists t3( id int, name string, hobby array<string>, addr map<string, string> ) partitioned by (dt string) row format delimited fields terminated by ";" collection items terminated by "," map keys terminated by ":";
- 加载数据
-- 加载数据 load data local inpath '/root/data/t1.dat' into table t3 partition(dt="2021-01-02"); load data local inpath '/root/data/t1.dat' into table t3 partition(dt="2021-01-03"); -- * 分区字段不是表中已经存在的数据,可以将分区字段看成伪列
- 查看分区
-- 查看分区 show partitions t3;
- 分区操作
-- 增加一个分区,不加载数据 alter table t3 add partition(dt="2021-01-04"); -- 增加多个分区,不加载数据 alter table t3 add partition(dt="2021-01-05") partition(dt="2021-01-06") partition(dt="2021-01-07"); -- 修改分区的hdfs路径 alter table t3 partition(dt="2021-01-02") set location '/user/hive/warehouse/mydb1.db/t3/dt=2021-01-07'; -- 可以删除一个或多个分区,用逗号隔开 alter table t3 drop partition(dt="2021-01-05"); alter table t3 drop partition(dt="2021-01-06"), partition(dt="2021-01-07"); -- * 删除的时候要用 "," 隔开 添加的时候不用
分桶表
当单个的分区或者表的数据量过大,分区不能更细粒度的划分数据,就需要使用分桶技术将数据划分成更细的粒度。将数据按照指定的字段进行分成多个桶中去,即将数据按照字段进行划分,数据按照字段划分到多个文件当中去。分桶的原理:
- MR中:key.hashCode % reductTask
- Hive中:分桶字段.hashCode % 分桶个数
-
网上有资料说要使用分区表需要设置 hive.enforce.bucketing=true,那是Hive 1.x 以前的版本;Hive 2.x 中,删除了该参数,始终可以分桶;
- 创建分桶表
-- 创建分桶表 create table course( id int, name string, score int ) clustered by (id) into 3 buckets row format delimited fields terminated by "\t";
-
导入数据 分桶表的导入时需要计算具体分到哪个桶,所以不能直接加载文件,需要借助一个结构相同的普通表
insert ... select ... 给桶表加载数据
- 创建普通表 create table course_common( id int, name string, score int ) row format delimited fields terminated by "\t"; -- 普通表加载数据 load data local inpath '/root/data/course.dat' into table course_common; select * from course_common; -- 通过 insert ... select ... 给桶表加载数据 insert into table course select * from course_common; select * from course; -- 观察分桶数据。数据按照:(分区字段.hashCode) % (分桶数) 进行分区 desc formatted course; dfs -ls /user/hive/warehouse/mydb1.db/course; dfs -cat /user/hive/warehouse/mydb1.db/course/000000_0; dfs -cat /user/hive/warehouse/mydb1.db/course/000001_0; dfs -cat /user/hive/warehouse/mydb1.db/course/000002_0;
修改表alter
-- 修改表名。rename
alter table course rename to course1;
-- 修改列名。change column
alter table course1 change column id uid int;
-- 修改字段类型。change column
alter table course1 change column score score string;
alter table course1 change column score score int;
-- The following columns have types incompatible with the existing columns in their respective positions
-- 修改字段数据类型时,要满足数据类型转换的要求。如int可以转为string,但是 string不能转为int
-- 增加字段。add columns()
alter table course1 add columns(sex string);
alter table course1 add columns(hobby array<String>, salary double);
-- 删除字段:replace columns
-- 这里仅仅只是在元数据中删除了字段,并没有改动hdfs上的数据文件
alter table course1 replace columns(id int, name string, score int);
-
rename to
-
change column
-
add columns
-
replace columns
删除表drop
-- 删除表
drop table course1;
数据操作
数据导入
LOAD DATA [LOCAL] INPATH 'filepath'
[OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=val1,
partcol2=val2 ...)]
- LOCAL
- LOAD DATA LOCAL … 从本地文件系统加载数据到Hive表中。本地文件会拷 贝到Hive表指定的位置
- LOAD DATA … 从HDFS加载数据到Hive表中。HDFS文件移动到Hive表指定的位置
-
INPATH 加载数据的路径
-
OVERWRITE 覆盖表中已有数据;否则表示追加数据
-
PARTITION 将数据加载到指定的分区
- 示例代码
-- 创建表 CREATE TABLE tabA ( id int ,name string ,area string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; -- 加载本地文件到hive(tabA) load data local inpath '/root/data/sourceA.txt' into table tabA; select * from tabA; -- 检查本地文件还在 -- 加载hdfs文件到hive(tabA) load data inpath '/user/hive/tabB' into table tabA; -- 检查HDFS文件,已经被转移 -- 加载数据覆盖表中已有数据 load data local inpath '/root/data/sourceA.txt' overwrite into table tabA; select * from tabA; -- 创建表时加载数据 CREATE TABLE tabB ( id int, name string, area string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' Location '/user/hive/tabB'; -- 文件不会被移动,直接使用指定的路径,直接加载对应的文件
插入数据
-- 创建分区表
create table tabC (
id int,
name string,
area string
)
partitioned by (month string)
row format delimited fields terminated by ",";
-- 插入数据
insert into table tabC partition(month="202101") values (4, "zhangsan", "BJ"), (5, "lisi", "TJ"), (6, "wangwu", "TW");
-- 插入查询的结果数据
insert into table tabC partition(month="202102") select id, name, area from tabC;
-- 多表(多分区)插入模式
from tabC
insert overwrite table tabC partition(month="202102")
select id, name, area where month="202101"
insert overwrite table tabC partition(month="202103")
select id, name, area where month="202102";
-- 创建表并插入数据(as select)
create table if not exists tabD as select * from tabC;
-- 新建的表没有partitions,原来的partitions字段会被转化成column
数据导出
-- 将查询结果导出到本地
insert overwrite local directory "/root/data/tabC"
select * from tabC;
-- 将查询结果格式化输出到本地
insert overwrite local directory "/root/data/ftabC"
row format delimited fields terminated by "\t"
select * from tabC;
-- 将查询结果导出到HDFS
insert overwrite directory "/user/hive/tmp/tabC"
row format delimited fields terminated by "\t"
select * from tabC;
-- dfs 命令导出数据到本地。本质是执行数据文件的拷贝
dfs -get /user/hive/warehouse/mydb1.db/tabc /root/data/gtabC;
-- hive 命令导出数据到本地。执行查询将查询结果重定向到文件
hive -e "select * from mydb1.tabC" > my.log
-- export 导出数据到HDFS。使用export导出数据时,不仅有数还有表的元数据信息
export table tabC to "/user/hive/tmp/myTabC";
-- export 导出的数据,可以使用 import 命令导入到 Hive 表中
-- 使用 like tname创建的表结构与原表一致。create ... as select ... 结构 可能不一致
create table tabE like tabC;
import table tabE from '/user/hive/tmp/myTabC';
-- 截断表,清空数据。(注意:仅能操作内部表)
truncate table tabE;
-- 以下语句报错,外部表不能执行 truncate 操作
alter table tabE set tblproperties("EXTERNAL"="TRUE");
DQL命令
select语法
DQL – Data Query Language 数据查询语言
SELECT [ALL | DISTINCT] select_expr, select_expr, ...
FROM table_reference
[WHERE where_condition]
[GROUP BY col_list]
[ORDER BY col_list]
[CLUSTER BY col_list | [DISTRIBUTE BY col_list] [SORT BY
col_list]]
[LIMIT [offset,] rows]
注意事项:
- SQL语句对大小写不敏感
- SQL语句可以写一行(简单SQL)也可以写多行(复杂SQL)
- 各子句一般要分行
-
使用缩进格式,提高SQL语句的可读性(重要)
- 基本查询
-- 计算,查处系统变量及函数 select 8*222; select current_date; -- 起别名,不能使用关键字,需要时使用 `` 括起来 select 8*222 num; select current_date `date`; -- 全表查询 select * from emp; -- 使用聚合函数 -- 计算行数使用* select count(*) from emp; -- 也可以用来查询某个值不为null的个数 select count(comm) from emp; select sum(sal) from emp; select avg(sal),max(sal),min(sal) from emp; -- 使用limit子句限制返回的行数 select * from emp limit 3; -- 第一个参数是页码,从0开始,不写就是0 select * from emp limit 1, 3;
-
where WHERE子句紧随FROM子句,使用WHERE子句,过滤不满足条件的数据;
where 子句中不能使用列的别名;-
比较运算符
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF -
逻辑运算符 and、or、not
- 示例代码
/** null 参与运算就过为null,除了null<=>null 的结果是true */ -- null select null=null; -- null select null==null; -- true -> null<=>null -> null is null select null<=>null; select sal, comm, sal+comm from emp; -- 使用 is null 判空 select * from emp where comm is null; select * from emp where comm is not null; -- 使用 in select * from emp where sal in (1300, 3000); -- 使用 between ... and ... select * from emp where sal between 1300 and 3000; -- 使用 like select * from emp where ename like "A%"; -- 使用 rlike。正则表达式,名字以A或S开头 select * from emp where ename rlike "^(A|S).*";
-
group by GROUP BY语句通常与聚组函数一起使用,按照一个或多个列对数据进行分组,对每个组进行聚合操作。
-- 计算emp表每个部门的平均工资 select deptno, avg(sal) avg_sal from emp group by deptno; -- 计算emp每个部门中每个岗位的最高薪水 select deptno, job, max(sal) max_sal from emp group by deptno, job; -- 求每个部门的平均薪水大于2000的部门 -- having select deptno, avg(sal) avg_sal from emp group by deptno having avg_sal > 2000;
-
表连接 Hive支持通常的SQL JOIN语句。默认情况下,仅支持等值连接,不支持非等值连接。
JOIN 语句中经常会使用表的别名。使用别名可以简化SQL语句的编写,使用表名前缀可以提高SQL的解析效率。连接查询操作分为两大类:内连接和外连接,而外连接可进一步细分为三种类型:
- 内连接: [inner] join
- 外连接 (outer join)
- 左外连接。 left [outer] join,左表的数据全部显示
- 右外连接。 right [outer] join,右表的数据全部显示
- 全外连接。 full [outer] join,两张表的数据都显示
-- 内连接 select * from u1 join u2 on u1.id=u2.id; -- 左外连接 select * from u1 left join u2 on u1.id=u2.id; -- 右外连接 select * from u1 right join u2 on u1.id=u2.id; -- 全外连接 select * from u1 full join u2 on u1.id=u2.id; -- 多表连接 select * from techer t left join course c on t.t_id = c.t_id left join score s on s.c_id = c.c_id left join student stu on s.s_id = stu.s_id; /** Hive总是按照从左到右的顺序执行,Hive会对每对 JOIN 连接对象启动一个 MapReduce 任务。 上面的例子中会首先启动一个 MapReduce job 对表 t 和表 c 进行连接操作;然后再 启动一个 MapReduce job 将第一个 MapReduce job 的输出和表 s 进行连接操作; 然后再继续直到全部操作; */
- 笛卡尔积
满足以下条件将会产生笛卡尔集:
- 没有连接条件
- 连接条件无效
- 所有表中的所有行互相连接
如果表A、B分别有M、N条数据,其笛卡尔积的结果将有 M*N 条数据;缺省条件下 hive不支持笛卡尔积运算;
** 危险操作,表一条数n,表二条数m,查询结果等于n*m,默认关闭
-- 关闭触发笛卡尔积的警告 set hive.strict.checks.cartesian.product=false; select * from u1, u2; select count(*) from u1, u2;
-
-
排序子句
- 全局排序(order by)
order by 子句出现在select语句的结尾;
order by子句对最终的结果进行排序; 默认使用升序(ASC);可以使用DESC,跟在字段名之后表示降序; ORDER BY执行全局排序,只有一个reduce;-- ORDER BY执行全局排序,只有一个reduce; -- 普通排序 select * from emp order by sal; -- 降序 select * from emp order by sal desc; -- 按别名排序 select empno, ename, job, mgr, sal + nvl(comm, 0) salcomm, deptno from emp order by deptno desc, salcomm desc; -- 排序字段要出现在select子句中。以下语句无法执行(因为select子句中缺少deptno) select empno, ename, job, mgr, sal + nvl(comm, 0) salcomm from emp order by deptno desc, salcomm desc;
-
MR内部排序(sort by) 对于大规模数据而言order by效率低; 在很多业务场景,我们并不需要全局有序的数据,此时可以使用sort by;
sort by为每个reduce产生一个排序文件,在reduce内部进行排序,得到局部有序的 结果;-- 设置reduce个数 set mapreduce.job.reduces=2; -- 按照工资降序查看员工信息 select * from emp sort by sal; select * from emp sort by sal desc; -- 将查询结果导入到文件中(按照工资降序)。生成两个输出文件,每个文件内部数据按 工资降序排列 insert overwrite local directory "/root/data/output/empSal" row format delimited fields terminated by "\t" select * from emp sort by sal desc;
- 分区排序(distribute by)
- distribute by -> MR partitioner
- distribute by 将特定的行发送到特定的reducer中,便于后继的聚合 与 排序操作;
- distribute by 类似于MR中的分区操作,可以结合sort by操作,使分区数据有序;
- distribute by 要写在sort by之前;
-- 启动2个reducer task;先按 deptno 分区,在分区内按 sal 排序 set mapreduce.job.reduces=2; -- 将结果输出到文件,观察输出结果 insert overwrite local directory "/root/data/output/disEmp" select * from emp distribute by deptno sort by sal desc; -- 分区倾斜,全都在一个文件中 -- 将数据分到3个区中,每个分区都有数据 set mapreduce.job.reduces=3; insert overwrite local directory "/root/data/output/disEmp" row format delimited fields terminated by "\t" select * from emp distribute by deptno sort by sal desc;
-
Cluster By 当distribute by 与 sort by是同一个字段时,可使用cluster by简化语法; cluster by 只能是升序,不能指定排序规则;
很少用到 -- 语法上是等价的 select * from emp distribute by deptno sort by deptno; select * from emp cluster by deptno;
- 全局排序(order by)
order by 子句出现在select语句的结尾;
函数
系统内置函数
- show functions;
-- 查看系统自带函数 -- 显示自带函数的用法 desc function upper; desc function extended upper;
- 日期函数
select current_date; -- 2021-01-31 -- 建议使用current_timestamp,有没有括号都可以 select current_timestamp; -- 2021-01-31 14:54:15.003 -- 时间戳转日期 select from_unixtime(1612075958); -- 2021-01-31 14:52:38 select from_unixtime(1612075958, "yyyyMMdd"); -- 20210131 select from_unixtime(1612075958, "yyyy-MM-dd HH:mm"); -- 2021-01-31 14:52 -- 日期转时间戳 select unix_timestamp(); -- 1612075958 -- 需要写到秒 select unix_timestamp("2021-01-31 14:52:38"); -- 计算时间差 Returns the number of days from startdate to enddate select datediff('2019-10-10', '2020-01-31'); -- -113 select datediff('2019-10-10', current_date); -- -479 -- 查询当月第几天 select dayofmonth(current_date); -- 31 -- 计算月末: select last_day(current_date); -- 31 -- 当月第1天: select date_sub(current_date, dayofmonth(current_date)-1); -- 2021-01-01 -- 下个月第1天: select add_months(date_sub(current_date, dayofmonth(current_date)-1), 1); -- 字符串转时间(字符串必须为:yyyy-MM-dd格式) select to_date("2010-09-01"); -- 2010-09-01 select to_date("2010-09-01 12:12:00"); -- 2010-09-01 -- 日期、时间戳、字符串类型格式化输出标准时间格式 select date_format(current_timestamp, "yyyy-MM-dd HH:mm:ss"); -- 2021-01-31 15:10:27 select date_format(current_date, "yyyy-MM-dd HH:mm:ss"); -- 2021-01-31 00:00:00 select date_format("2020-01-10", "yyyy-MM-dd HH:mm:ss"); -- 2020-01-10 00:00:00
- 字符串函数
-- 转小写。lower select lower("WWW.LAGOU>COM"); -- www.lagou>com -- 转大写。upper select upper("www.lagou>com"); -- WWW.LAGOU>COM -- 求字符串长度。length select length("123456"); -- 6 -- 字符串拼接。 concat / || select empno || "\t" || ename idname from emp; select concat(empno, "\t", ename) idname from emp; -- 指定分隔符。concat_ws(separator, [string | array(string)]+) select concat_ws("\t", cast(empno as string), ename) idname from emp; select concat_ws(" ", array("my", "name", "is", "april")); -- my name is april -- 求子串。substr select substr("www.lagou.com", 5); -- lagou.com select substr("www.lagou.com", 5, 5); -- lagou select substr("www.lagou.com", -5); -- u.com -- 字符串切分。split,注意 '.' 要转义 select split("www.lagou.com", "\\.");
- 数学函数
-- 四舍五入。round select round(3.141596); -- 3 select round(3.141596, 2); -- 3.14 select round(314.1596, -1); -- 310 -- 向上取整。ceil select ceil(3.141596); -- 4 -- 向下取整。floor select floor(3.141596); -- 3 -- 其他数学函数包括:绝对值、平方、开方、对数运算、三角运算等
- 条件函数
-- if (boolean testCondition, T valueTrue, T valueFalseOrNull) -- 将emp表的员工工资等级分类:0-1500、1500-3000、3000以上 select empno, ename, deptno, sal, if (sal <= 1500, 1, if (sal < 3000, 2, 3)) level from emp; -- CASE WHEN a THEN b [WHEN c THEN d]* [ELSE e] END -- 复杂条件用 case when 更直观 select empno, ename, deptno, sal, case when sal <= 1500 then 1 when sal < 3000 then 2 else 3 end level from emp; -- 以下语句等价 select empno, ename, deptno, sal, case deptno when 10 then "accounting" when 20 then "sell" when 30 then "dev" else "unknow" end deptName from emp; select empno, ename, deptno, sal, case when deptno = 10 then "accounting" when deptno = 20 then "sell" when deptno = 30 then "dev" else "unknow" end deptName from emp; -- COALESCE(T v1, T v2, ...)。返回参数中的第一个非空值;如果所有值都为 NULL,那么返回NULL select coalesce(null, null, 2); -- 2 select coalesce(null, 2, 3); -- 2 select coalesce(null, null, null); -- null -- isnull(a) isnotnull(a) select isnull(null), isnotnull(null); -- true false -- nvl(T value, T default_value) select nvl(null, 10); -- 10 select nvl(20, 10); -- 20 -- nullif(x, y) 相等为空,否则为a -- Returns NULL if a=b; otherwise returns a (as of Hive 2.3.0). select nullif("a", "a"), nullif("a", "b"); -- NULL a
-
UDTF函数 – UDTF : User Defined Table-Generating Functions
– 用户定义表生成函数,一行输入,多行输出。- explode
-- explode,炸裂函数 -- 就是将一行中复杂的 array 或者 map 结构拆分成多行 select explode(array('A', 'B', 'C')) as name; select explode(map('a', 2, 'b', 3, 'd', 20));
- lateral view
-- UDTF's are not supported outside the SELECT clause, nor nested in expressions -- SELECT pageid, explode(adid_list) AS myCol... is not supported -- SELECT explode(explode(adid_list)) AS myCol... is not supported -- lateral(横向的,横向的匹配) view 常与 表生成函数explode结合使用 /** Lateral view is used in conjunction with user-defined table generating functions such as explode(). As mentioned in Built-in Table-Generating Functions, a UDTF generates zero or more output rows for each input row. A lateral view first applies the UDTF to each row of base table and then joins resulting output rows to the input rows to form a virtual table having the supplied table alias. */ -- lateral view 语法: /** lateralView: LATERAL VIEW udtf(expression) tableAlias AS columnAlias (',' columnAlias)* fromClause: FROM baseTable (lateralView)* */ with t1 as ( select 'OK' cola, split('www.lagou.com', '\\.') colb ) select cola, colc from t1 lateral view explode(colb) t2 as colc; -- 等价于上面的语句, with tableAlias (select *** ) 将查询结果作为临时表 select cola, colc from ( select 'OK' cola, split('www.lagou.com', '\\.') colb ) t1 lateral view explode(colb) t2 as colc;
-
collect_list,collect_set 可以把用explode炸裂的数据,重新合并
— 原数据 id1 id2 flag a b 2 a b 1 a b 3 c d 6 c d 8 c d 8 — 期待效果 a b 2|1|3 c d 6|8 -- 第一步 将元素聚拢 select id1, id2, collect_set(flag) from rowline2 group by id1, id2; -- 第二步 将元素连接在一起, collect_set去重合并 select id1, id2, concat_ws("|", collect_set(cast(flag as string))) from rowline2 group by id1, id2; -- collect_list简单合并 select id1, id2, concat_ws("|", collect_list(cast(flag as string))) from rowline2 group by id1, id2;
- 表中有两个及以上键值是数组,双重炸开并合并相同序号的值
create table user_info( user_id string, link_man string, link_mobile string ); insert into table user_info values ("id001","April;Angie;Bella", "9001;9002;9003"), ("id002","April;Angie;Leo", "9001;9002;9004"), ("id003","Leo;Lina", "9004;9006"); select * from user_info; -- user_id|link_man |link_mobile | -- -------+-----------------+--------------+ -- id001 |April;Angie;Bella|9001;9002;9003| -- id002 |April;Angie;Leo |9001;9002;9004| -- id003 |Leo;Lina |9004;9006 | -- 两个键值都是数组形式,双重行转列,合并同意位置的值 select t1.user_id, t1.tag, t1.man, t2.mobile from (select user_id, man, ROW_NUMBER() over(partition by user_id) tag from user_info lateral view explode(split(link_man, '\;')) t1 as man ) t1, (select user_id, mobile, ROW_NUMBER() over(partition by user_id) tag from user_info lateral view explode(split(link_mobile, '\;')) t1 as mobile ) t2 where t1.user_id=t2.user_id and t1.tag=t2.tag; -- user_id|tag|man |mobile| -- -------+---+-----+------+ -- id001 | 1|Bella|9003 | -- id001 | 2|Angie|9002 | -- id001 | 3|April|9001 | -- id002 | 1|Leo |9004 | -- id002 | 2|Angie|9002 | -- id002 | 3|April|9001 | -- id003 | 1|Lina |9006 | -- id003 | 2|Leo |9004 |
窗口函数
- explode
窗口函数又名开窗函数,属于分析函数的一种。用于解决复杂报表统计需求的功能强大的函数,很多场景都需要用到。窗口函数用于计算基于组的某种聚合值,它和聚合函数的不同之处是:对于每个组返回多行,而聚合函数对于每个组只返回一行。
窗口函数指定了分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变化而变化。
-
over 关键字 表示和聚合函数配合使用,在一个窗口内允许聚合函数
-- 使用窗口函数,查询员工姓名、薪水、薪水总和 select ename, deptno, sal, sum(sal) over() from emp; -- 薪资占比 select ename, deptno, sal, sum(sal) over() sal_sum, round(sal / sum(sal) over()*100, 1) || "%" rational from emp;
-
partition by子句 分组内部聚合, 在over窗口中进行分区,对某一列进行分区统计,窗口的大小就是分区的大小
-- partition by子句 -- 分组内部聚合 select ename, deptno, sal, sum(sal) over(partition by deptno) from emp;
-
order by 子句 order by 子句对输入的数据进行排序
-- order by 子句 -- 增加了order by子句;sum:从分组的第一行到当前行求和 select ename, deptno, sal, sum(sal) over(partition by deptno order by sal) from emp;
- Window子句
如果要对窗口的结果做更细粒度的划分,使用window子句,有如上的几个选项:- unbounded preceding。组内第一行数据
- n preceding。组内当前行的前n行数据
- current row。当前行数据
- n following。组内当前行的后n行数据
- unbounded following。组内最后一行数据
- rows between … and …
-- 等价。组内,第一行到当前行的和 select ename, deptno, sal, sum(sal) over(partition by deptno order by sal) from emp; select ename, deptno, sal, sum(sal) over(partition by deptno order by sal rows between unbounded preceding and current row) from emp; -- 组内,第一行到最后一行的和 select ename, deptno, sal, sum(sal) over(partition by deptno order by sal rows between unbounded preceding and unbounded following) from emp; -- 组内,前一行 + 当前行 +后一行 select ename, deptno, sal, sum(sal) over(partition by deptno order by sal rows between 1 preceding and 1 following) from emp;
-
排名函数 都是从1开始,生成数据项在分组中的排名
-- 排名函数 row_number / rank / dense_rank 100 1 1 1 100 2 1 1 100 3 1 1 99 4 4 2 98 5 5 3 98 6 5 3 97 7 7 4
-
row_number 排名顺序增加不会重复;如1、2、3、4、… …
-
RANK 排名相等会在名次中留下空位;如1、2、2、4、5、… …
-
DENSE_RANK 排名相等会在名次中不会留下空位 ;如1、2、2、3、4、… …
-
示例代码
-- 按照班级,使用3种方式对成绩进行排名 select *, row_number() over(partition by cname order by score desc) `row_number`, rank() over(partition by cname order by score desc) `rank`, dense_rank() over(partition by cname order by score desc) `dense_rank` from t2; /** t2.cname t2.sname t2.score row_number rank dense_rank class2 s21 100 1 1 1 class2 s27 99 2 2 2 class2 s24 99 3 2 2 class2 s25 98 4 4 3 class2 s22 98 5 4 3 class2 s28 97 6 6 4 class2 s26 96 7 7 5 class1 s01 100 1 1 1 class1 s05 100 2 1 1 class1 s03 100 3 1 1 class1 s07 99 4 4 2 class1 s02 98 5 5 3 class1 s09 98 6 5 3 class1 s04 97 7 7 4 */ -- 求每个班级前3名的学员--前3名的定义是什么--假设使用dense_rank with tmp as ( select *, dense_rank() over(partition by cname order by score desc) `rank` from t2 ) select * from tmp where rank <=3;
-
-
序列函数
-
lag 返回当前数据行的上一行数据
-
lead 返回当前数据行的下一行数据
-
first_value 取分组内排序后,截止到当前行,第一个值
-
last_value 分组内排序后,截止到当前行,最后一个值
-
ntile 将分组的数据按照顺序切分成n片,返回当前切片值
-
示例代码
-- lag。返回当前数据组的上一行数据 -- lead。返回当前数据组的上一行数据 select cid, ctime, pv, lag(pv) over(partition by cid order by ctime) lag_pv, lead(pv) over(partition by cid order by ctime) lead_pv from userpv; select cid, ctime, pv, lag(pv, 2) over(partition by cid order by ctime) lag_pv, lead(pv, 3) over(partition by cid order by ctime) lead_pv from userpv; -- first_value / last_value select cid, ctime, pv, first_value(pv) over(partition by cid order by ctime) first_pv, last_value(pv) over(partition by cid order by ctime) last_pv from userpv; -- first_value 为组内第一个值 -- last_value 为当前值 select cid, ctime, pv, first_value(pv) over(partition by cid order by ctime) first_pv, last_value(pv) over(partition by cid order by ctime rows between unbounded preceding and unbounded following) last_pv from userpv; -- first_value 为组内第一个值 -- last_value 为组内最后一个值 -- ntile。按照cid进行分组,每组数据分成2份 select cid, ctime, pv, ntile(2) over(partition by cid order by ctime) tile from userpv;
-
HQL面试题
- 连续7天登录的用户
-- 连续值的求解,面试中常见的问题。这也是同一类,基本都可按照以下思路进行 -- 1、使用 row_number 在组内给数据编号(rownum) -- 2、某个值 - rownum = gid,得到结果可以作为后面分组计算的依据 -- 3、根据求得的gid,作为分组条件,求最终结果 -- 1、使用 row_number 在组内给数据编号(rownum) select uid, dt, row_number() over(partition by uid order by dt) number from ulogin where status = 1; -- 2、某个值 - rownum = gid,得到结果可以作为后面分组计算的依据 select uid, dt, date_sub(dt, row_number() over(partition by uid order by dt)) gid from ulogin where status = 1; -- 3、根据求得的gid,作为分组条件,求最终结果 with tmp as ( select uid, dt, date_sub(dt, row_number() over(partition by uid order by dt)) gid from ulogin where status = 1 ) select uid, count(*) continue_days from tmp group by uid, gid having continue_days >= 7;
- TopN
-- 编写sql语句实现每班前三名,分数一样并列,同时求出前三名按名次排序的分差 -- 求解思路: -- 1、上排名函数,分数一样并列,所以用dense_rank -- 2、将上一行数据下移,相减即得到分数差 -- 3、处理 NULL select class, score, dense_rank() over(partition by class order by score desc) rank from stu; select class, score, dense_rank() over(partition by class order by score desc) rank, score - (lag(score) over(partition by class order by score desc)) lagscore from stu; select class, score, dense_rank() over(partition by class order by score desc) rank, nvl(score - (lag(score) over(partition by class order by score desc)), 0) lagscore from stu;
-
行列转化
- case when then end
…— 原数据 1 java 1 hadoop 1 hive 1 hbase 2 java 2 hive 2 spark 2 flink 3 java 3 hadoop 3 hive 3 kafka /** -- 编写sql,得到结果如下(1表示选修,0表示未选修) id java hadoop hive hbase spark flink kafka 1 1 1 1 1 0 0 0 2 1 0 1 0 1 1 0 3 1 1 1 0 0 0 1 */ select id, sum(case course when "java" then 1 else 0 end) java, sum(case course when "hadoop" then 1 else 0 end) hadoop, sum(case course when "hive" then 1 else 0 end) hive, sum(case course when "hbase" then 1 else 0 end) hbase, sum(case course when "spark" then 1 else 0 end) spark, sum(case course when "flink" then 1 else 0 end) flink, sum(case course when "kafka" then 1 else 0 end) kafka from rowline1 group by id;
- collect
— 原数据 id1 id2 flag a b 2 a b 1 a b 3 c d 6 c d 8 c d 8 — 期待效果 a b 2|1|3 c d 6|8 -- 第一步 将元素聚拢 select id1, id2, collect_set(flag) from rowline2 group by id1, id2; -- 第二步 将元素连接在一起, collect_set去重合并 select id1, id2, concat_ws("|", collect_set(cast(flag as string))) from rowline2 group by id1, id2; -- collect_list简单合并 select id1, id2, concat_ws("|", collect_list(cast(flag as string))) from rowline2 group by id1, id2;
- case when then end
自定义函数
用户自定义函数分为以下三类:
- UDF(User Defined Function)。用户自定义函数,一进一出
- UDAF(User Defined Aggregation Function)。用户自定义聚集函数,多进一 出;类似于:count/max/min
-
UDTF(User Defined Table-Generating Functions)。用户自定义表生成函 数,一进多出;类似于:explode
- UDF开发
- 继承org.apache.hadoop.hive.ql.exec.UDF
- 需要实现evaluate函数;evaluate函数支持重载
- UDF必须要有返回类型,可以返回null,但是返回类型不能为void
-
UDF开发步骤
- 创建maven java 工程,添加依赖
<!-- pom.xml 文件 --> <dependencies> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>2.3.7</version> </dependency> </dependencies>
- 开发java类继承UDF,实现evaluate 方法
package com.lagou.UDF; import org.apache.hadoop.hive.ql.exec.UDF; import org.apache.hadoop.io.Text; public class MyNvl extends UDF { /** * nvl(ename, "OK"): ename==null or ename=="" or ename==" " => 返 回第二个参数 * @param t1 * @param t2 * @return */ public Text evaluate(final Text t1, final Text t2) { if (t1 == null || t1.toString().trim().length() == 0) { return t2; } return t1; } }
-
将项目打包上传服务器
- 添加开发的jar包
add jar /home/hadoop/hiveudf.jar;
- 设置函数与自定义函数关联
create temporary function mynvl as "com.lagou.UDF.MyNvl";
- 使用自定义函数
-- 基本功能还有 select mynvl(comm, 0) from mydb.emp; -- 测试扩充的功能 select mynvl("", "OK"); select mynvl(" ", "OK");
- 可创建永久函数
-- 上传文件到hdfs dfs -mkdir -p /user/hadoop/jar; dfs -put /root/data/HiveUDFDemo.jar /user/hadoop/jar/; -- 引用hdfs上面的包创建永久函数 create function mynvl as 'com.lagou.UDF.MyNvl' using jar 'hdfs:/user/hadoop/jar/HiveUDFDemo.jar'; -- 改函数会被加载到当前的使用的数据库中,使用的时候需要加域名 select default.mynvl("", "OK");
- 删除函数
drop function default.mynvl;
- 创建maven java 工程,添加依赖
DML命令
数据操纵语言DML(Data Manipulation Language),DML主要有三种形式:插入(INSERT)、删除(DELETE)、更新(UPDATE)。
事务(transaction)是一组单元化操作,这些操作要么都执行,要么都不执行,是一个不可分割的工作单元。 事务具有的四个要素:原子性(Atomicity)、一致性(Consistency)、隔离性 (Isolation)、持久性(Durability),这四个基本要素通常称为ACID特性。
- 原子性。一个事务是一个不可再分割的工作单位,事务中的所有操作要么都发 生,要么都不发生。
- 一致性。事务的一致性是指事务的执行不能破坏数据库数据的完整性和一致性, 一个事务在执行之前和执行之后,数据库都必须处于一致性状态。
- 隔离性。在并发环境中,并发的事务是相互隔离的,一个事务的执行不能被其他 事务干扰。即不同的事务并发操纵相同的数据时,每个事务都有各自完整的数据 空间,即一个事务内部的操作及使用的数据对其他并发事务是隔离的,并发执行 的各个事务之间不能互相干扰。
- 持久性。事务一旦提交,它对数据库中数据的改变就应该是永久性的。
Hive 事务
Hive从0.14版本开始支持事务 和 行级更新,但缺省是不支持的,需要一些附加的配 置。要想支持行级insert、update、delete,需要配置Hive支持事务。
- Hive事务的限制
- Hive提供行级别的ACID语义
- BEGIN、COMMIT、ROLLBACK 暂时不支持,所有操作自动提交
- 目前只支持 ORC 的文件格式
- 默认事务是关闭的,需要设置开启
- 要是使用事务特性,表必须是分桶的
- 只能使用内部表
- 如果一个表用于ACID写入(INSERT、UPDATE、DELETE),必须在表中设置表 属性 : “transactional=true”
- 必须使用事务管理器 org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
- 目前支持快照级别的隔离。就是当一次数据查询时,会提供一个数据一致性的快照
- LOAD DATA语句目前在事务表中暂时不支持
- HDFS是不支持文件的修改
当有数据追加到文件,HDFS不对读数据的用户提供 一致性的。为了在HDFS上支持数据的更新:
- 表和分区的数据都被存在基本文件中(base files)
- 新的记录和更新,删除都存在增量文件中(delta files)
- 一个事务操作创建一系列的增量文件
- 在读取的时候,将基础文件和修改,删除合并,最后返回给查询
Hive 事务操作示例
-- 这些参数也可以设置在hive-site.xml中
SET hive.support.concurrency = true;
-- Hive 0.x and 1.x only
SET hive.enforce.bucketing = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
-- 创建表用于更新。满足条件:内部表、ORC格式、分桶、设置表属性
create table zxz_data(
name string,
nid int,
phone string,
ntime date)
clustered by(nid) into 5 buckets
stored as orc
tblproperties('transactional'='true');
-- 创建临时表,用于向分桶表插入数据
create table temp1(
name string,
nid int,
phone string,
ntime date)
row format delimited
fields terminated by ",";
-- 向临时表加载数据;向事务表中加载数据
load data local inpath '/root/data/zxz_data.txt' overwrite into table temp1;
insert into table zxz_data select * from temp1;
-- 检查数据和文件
select * from zxz_data;
dfs -ls /user/hive/warehouse/mydb1.db/zxz_data;
-- DML 操作
delete from zxz_data where nid = 3;
dfs -ls /user/hive/warehouse/mydb1.db/zxz_data;
-- insert
insert into zxz_data values ("name3", 3, "010-83596208", current_date);-- 不支持中间使用函数
-- FAILED: SemanticException [Error 10293]: Unable to create temp file for insert values Expression of type TOK_FUNCTION not supported in insert/values
-- 可执行
insert into zxz_data values ("name3", 3, "010-83596208", "2020-06-01");
insert into zxz_data select "name3", 3, "010-83596208", current_date;
select * from zxz_data;
dfs -ls /user/hive/warehouse/mydb1.db/zxz_data;
insert into zxz_data values
("name6", 6, "010-83596208", "2020-06-02"),
("name7", 7, "010-83596208", "2020-06-03"),
("name8", 9, "010-83596208", "2020-06-05"),
("name9", 8, "010-83596208", "2020-06-06");
dfs -ls /user/hive/warehouse/mydb1.db/zxz_data;
update zxz_data set name=concat(name, "00") where nid>3;
dfs -ls /user/hive/warehouse/mydb1.db/zxz_data;
-- 分桶字段不能修改,下面的语句不能执行
-- Updating values of bucketing columns is not supported
update zxz_data set nid = nid + 1;
元数据管理与存储
Metastore
在Hive的具体使用中,首先面临的问题便是如何定义表结构信息,跟结构化的数据映 射成功。所谓的映射指的是一种对应关系。在Hive中需要描述清楚表跟文件之间的映射关系、列和字段之间的关系等等信息。这些描述映射关系的数据的称之为Hive的元数据。该数据十分重要,因为只有通过查询它才可以确定用户编写sql和最终操作文 件之间的关系。
Metadata即元数据。元数据包含用Hive创建的database、table、表的字段等元信 息。元数据存储在关系型数据库中。如hive内置的Derby、第三方如MySQL等。
Metastore即元数据服务,是Hive用来管理库表元数据的一个服务。有了它上层的 服务不用再跟裸的文件数据打交道,而是可以基于结构化的库表信息构建计算框架。
通过metastore服务将Hive的元数据暴露出去,而不是需要通过对Hive元数据库 mysql的访问才能拿到Hive的元数据信息;metastore服务实际上就是一种thrift服 务,通过它用户可以获取到Hive元数据,并且通过thrift获取元数据的方式,屏蔽了 数据库访问需要驱动,url,用户名,密码等细节。
-
内嵌模式
内嵌模式使用的是内嵌的Derby数据库来存储元数据,也不需要额外起Metastore服 务。数据库和Metastore服务都嵌入在主Hive Server进程中。这个是默认的,配置 简单,但是一次只能一个客户端连接,适用于用来实验,不适用于生产环境。优点:配置简单,解压hive安装包 bin/hive 启动即可使用;
缺点:不同路径启动hive,每一个hive拥有一套自己的元数据,无法共享。- metastore内嵌模式配置
1、下载软件解压缩
2、设置环境变量,并使之生效
3、初始化数据库。
schematool -dbType derby -initSchema
4、进入hive命令行
5、再打开一个hive命令行,发现无法进入
- metastore内嵌模式配置
1、下载软件解压缩
-
本地模式
本地模式采用外部数据库来存储元数据,目前支持的数据库有:MySQL、
Postgres、Oracle、MS SQL Server。教学中实际采用的是MySQL。本地模式不需要单独起metastore服务,用的是跟Hive在同一个进程里的metastore 服务。也就是说当启动一个hive 服务时,其内部会启动一个metastore服务。Hive根 据 hive.metastore.uris 参数值来判断,如果为空,则为本地模式。
缺点:每启动一次hive服务,都内置启动了一个metastore;在hive-site.xml中暴露 的数据库的连接信息;
优点:配置较简单,本地模式下hive的配置中指定mysql的相关信息即可。
-
远程模式
远程模式下,需要单独起metastore服务,然后每个客户端都在配置文件里配置连接 到该metastore服务。远程模式的metastore服务和hive运行在不同的进程里。在生 产环境中,建议用远程模式来配置Hive Metastore。在这种模式下,其他依赖hive的软件都可以通过Metastore访问Hive。此时需要配置 hive.metastore.uris 参数来指定 metastore 服务运行的机器ip和端口,并且需要单 独手动启动metastore服务。metastore服务可以配置多个节点上,避免单节点故障 导致整个集群的hive client不可用。同时hive client配置多个metastore地址,会自 动选择可用节点。
-
metastore远程模式配置
-
配置规划
- 1、将 centos7-3 的 hive 安装文件拷贝到 centos7-1、centos7-2
scp -r hive-2.3.7/ root@centos7-1:$PWD
-
- 在centos7-1、centos7-3上分别启动 metastore 服务
# 查询9083端口(metastore服务占用的端口) lsof -i:9083 # 如果被占用 kill -9 pid # 安装lsof yum install -y lsof # 后台启动 metastore 服务 nohup hive --service metastore &
-
3、修改 centos7-2 上hive-site.xml 删除配置文件中:MySQL的配置、连接数据库 的用户名、口令等信息;增加连接metastore的配置:
<!-- hive metastore 服务地址 --> <property> <name>hive.metastore.uris</name> <value>thrift://centos7-1:9083,thrift://centos7-3:9083</value> </property>
-
4、启动hive 此时client端无需实例化hive的metastore,启动速度会加快。
# 分别在centos7-1、centos7-3上执行以下命令,查看连接情况 lsof -i:9083
- 5、高可用测试 关闭已连接的metastore服务,发现hive连到另一个节点的服务 上,仍然能够正常使用。
-
-
HiveServer2
HiveServer2是一个服务端接口,使远程客户端可以执行对Hive的查询并返回结果。 目前基于Thrift RPC的实现是HiveServer的改进版本,并支持多客户端并发和身份验 证,启动hiveServer2服务后,就可以使用jdbc、odbc、thrift 的方式连接。
HiveServer2(HS2)是一种允许客户端对Hive执行查询的服务。HiveServer2是 HiveServer1的后续 版本。HS2支持多客户端并发和身份验证,旨在为JDBC、ODBC 等开放API客户端提供更好的支持。
-
Thrift Thrift是一种接口描述语言和二进制通讯协议,它被用来定义和创建跨语言的服务。 它被当作一个远程过程调用(RPC)框架来使用,是由Facebook为“大规模跨语言服 务开发”而开发的。
- HiveServer2作用
- 为Hive提供了一种允许客户端远程访问的服务
- 基于thrift协议,支持跨平台,跨编程语言对Hive访问
- 允许远程访问Hive
-
HiveServer2配置
-
配置规划
-
配置步骤
-
- 修改集群上的 core-site.xml 增加以下内容
<!-- HiveServer2 连不上10000;hadoop为安装用户 --> <!-- root用户可以代理所有主机上的所有用户 --> <property> <name>hadoop.proxyuser.root.hosts</name> <value>*</value> </property> <property> <name>hadoop.proxyuser.root.groups</name> <value>*</value> </property> <property> <name>hadoop.proxyuser.hadoop.hosts</name> <value>*</value> </property> <property> <name>hadoop.proxyuser.hadoop.groups</name> <value>*</value> </property>
-
- 修改 集群上的 hdfs-site.xml 添加以下内容
<!-- HiveServer2 连不上10000;启用 webhdfs 服务 --> <property> <name>dfs.webhdfs.enabled</name> <value>true</value> </property>
-
- 同步配置文件,然后重启 hdfs
-
- 启动centos7-3上的 HiveServer2 服务
# 启动 hiveserver2 服务 nohup hiveserver2 & # 检查 hiveserver2 端口 lsof -i:10000 # 从2.0开始,HiveServer2提供了WebUI # 还可以使用浏览器检查hiveserver2的启动情况。http://centos7-3:10002/
-
- 启动 centos7-2 节点上的 beeline
Beeline是从 Hive 0.11版本引入的,是 Hive 新的命令行客户端工具。
Hive客户端工具后续将使用Beeline 替代 Hive 命令行工具 ,并且后续版本也会废弃 掉 Hive 客户端工具。
!connect jdbc:[hive2://centos7-3:10000](hive2://centos7-3:10000) use mydb; show tables; select * from emp; create table tabtest1 (c1 int, c2 string); !connect jdbc:mysql://centos7-3:3306 !help !quit
-
用户未授权的错误 根据报错的用户名,写对应的用户名就好
21/02/01 17:11:45 [main]: WARN jdbc.HiveConnection: Failed to connect to centos7-3:10000 Error: Could not open client transport with JDBC Uri: jdbc:hive2://centos7-3:10000: Failed to open new session: java.lang.RuntimeException: org.apache.hadoop.security.AccessControlException: Permission denied: user=hadoop, access=EXECUTE, inode="/tmp":root:supergroup:drwx------ at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:350) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:311) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:238) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:189) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:539) at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkTraverse(FSDirectory.java:1702) at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkTraverse(FSDirectory.java:1720) at org.apache.hadoop.hdfs.server.namenode.FSDirectory.resolvePath(FSDirectory.java:641) at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getFileInfo(FSDirStatAndListingOp.java:110) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:2949) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1124) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(ClientNamenodeProtocolServerSideTranslatorPB.java:873) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:503) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989) at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:871) at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:817) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2606) (state=08S01,code=0) beeline> !connect jdbc:hive2://centos7-3:10000 Connecting to jdbc:hive2://centos7-3:10000 Enter username for jdbc:hive2://centos7-3:10000: root Enter password for jdbc:hive2://centos7-3:10000: Connected to: Apache Hive (version 2.3.7) Driver: Hive JDBC (version 2.3.7)
- 启动 centos7-2 节点上的 beeline
Beeline是从 Hive 0.11版本引入的,是 Hive 新的命令行客户端工具。
-
-
HCatalog
HCatalog 提供了一个统一的元数据服务,允许不同的工具如 Pig、MapReduce 等通 过 HCatalog 直接访问存储在 HDFS 上的底层文件。HCatalog是用来访问Metastore 的Hive子项目,它的存在给了整个Hadoop生态环境一个统一的定义。
HCatalog 使用了 Hive 的元数据存储,这样就使得像 MapReduce 这样的第三方应 用可以直接从 Hive 的数据仓库中读写数据。同时,HCatalog 还支持用户在 MapReduce 程序中只读取需要的表分区和字段,而不需要读取整个表,即提供一种 逻辑上的视图来读取数据,而不仅仅是从物理文件的维度。
HCatalog 提供了一个称为 hcat 的命令行工具。这个工具和 Hive 的命令行工具类 似,两者最大的不同就是 hcat 只接受不会产生 MapReduce 任务的命令。
-
示例代码 hcatalog 可以使用 DDL指令,主要用来建表,查询表结构
# 进入 hcat 所在目录。$HIVE_HOME/hcatalog/bin cd $HIVE_HOME/hcatalog/bin # 执行命令,创建表 ./hcat -e "create table default.test1(id string, name string, age int)" # 长命令可写入文件,使用 -f 选项执行 ./hcat -f createtable.txt # 查看元数据 ./hcat -e "use mydb; show tables" # 查看表结构 ./hcat -e "desc mydb.emp" # 删除表 ./hcat -e "drop table default.test1"
数据存储格式
Hive支持的存储数的格式主要有:TEXTFILE(默认格式) 、SEQUENCEFILE、RCFILE、ORCFILE、PARQUET。
- textfile为默认格式,建表时没有指定文件格式,则使用TEXTFILE,导入数据时 会直接把数据文件拷贝到hdfs上不进行处理;
-
sequencefile,rcfile,orcfile格式的表不能直接从本地文件导入数据,数据要先导入到textfile格式的表中, 然后再从表中用insert导入sequencefile、rcfile、 orcfile表中。
- 行存储与列存储
行式存储下一张表的数据都是放在一起的,但列式存储下数据被分开保存了。- 行式存储:
- 优点:数据被保存在一起,insert和update更加容易
- 缺点:选择(selection)时即使只涉及某几列,所有数据也都会被读取
- 列式存储:
- 优点:查询时只有涉及到的列会被读取,效率高
- 缺点:选中的列要重新组装,insert/update比较麻烦
- TEXTFILE、SEQUENCEFILE 的存储格式是基于行存储的;
- ORC和PARQUET 是基于列式存储的。
- 行式存储:
-
TextFile Hive默认的数据存储格式,数据不做压缩,磁盘开销大,数据解析开销大。 可结合 Gzip、Bzip2使用(系统自动检查,执行查询时自动解压),但使用这种方式,hive不 会对数据进行切分,从而无法对数据进行并行操作。
create table if not exists uaction_text( userid string, itemid string, behaviortype int, geohash string, itemcategory string, time string) row format delimited fields terminated by ',' stored as textfile; load data local inpath '/home/hadoop/data/useraction.dat' overwrite into table uaction_text;
-
SEQUENCEFILE SequenceFile是Hadoop API提供的一种二进制文件格式,其具有使用方便、可分 割、可压缩的特点。 SequenceFile支持三种压缩选择:none,record,block。 Record压缩率低,一般建议使用BLOCK压缩。
-
RCFile
RCFile全称Record Columnar File,列式记录文件,是一种类似于SequenceFile的键 值对数据文件。RCFile结合列存储和行存储的优缺点,是基于行列混合存储的 RCFile。RCFile遵循的“先水平划分,再垂直划分”的设计理念。先将数据按行水平划分为行 组,这样一行的数据就可以保证存储在同一个集群节点;然后在对行进行垂直划分。
- 一张表可以包含多个HDFS block
- 在每个block中,RCFile以行组为单位存储其中的数据
- row group又由三个部分组成
- 用于在block中分隔两个row group的16字节的标志区
- 存储row group元数据信息的header
- 实际数据区,表中的实际数据以列为单位进行存储
- ORCFile
ORC File,它的全名是Optimized Row Columnar (ORC) file,其实就是对RCFile做 了一些优化,在hive 0.11中引入的存储格式。这种文件格式可以提供一种高效的方 法来存储Hive数据。它的设计目标是来克服Hive其他格式的缺陷。运用ORC File可以 提高Hive的读、写以及处理数据的性能。ORC文件结构由三部分组成:- 文件脚注(file footer):包含了文件中 stripe 的列表,每个stripe行数,以及每个 列的数据类型。还包括每个列的最大、最小值、行计数、求和等信息
- postscript:压缩参数和压缩大小相关信息
- 条带(stripe):ORC文件存储数据的地方。在默认情况下,一个stripe的大小为
250MB- Index Data:一个轻量级的index,默认是每隔1W行做一个索引。包括该条 带的一些统计信息,以及数据在stripe中的位置索引信息
- Rows Data:存放实际的数据。先取部分行,然后对这些行按列进行存储。 对每个列进行了编码,分成多个stream来存储
- Stripe Footer:存放stripe的元数据信息
ORC在每个文件中提供了3个级别的索引:文件级、条带级、行组级。借助ORC提供 的索引信息能加快数据查找和读取效率,规避大部分不满足条件的查询条件的文件和 数据块。使用ORC可以避免磁盘和网络IO的浪费,提升程序效率,提升整个集群的 工作负载。
- 创建代码
create table if not exists uaction_orc( userid string, itemid string, behaviortype int, geohash string, itemcategory string, time string) stored as orc; insert overwrite table uaction_orc select * from uaction_text;
-
Parquet
Apache Parquet是Hadoop生态圈中一种新型列式存储格式,它可以兼容Hadoop生 态圈中大多数计算框架(Mapreduce、Spark等),被多种查询引擎支持(Hive、 Impala、Drill等),与语言和平台无关的。Parquet文件是以二进制方式存储的,不能直接读取的,文件中包括实际数据和元数 据,Parquet格式文件是自解析的。
- Row group
- 写入数据时的最大缓存单元
- MR任务的最小并发单元
- 一般大小在50MB-1GB之间
- Column chunk
- 存储当前Row group内的某一列数据
- 最小的IO并发单元
- Page
- 压缩、读数据的最小单元
- 获得单条数据时最小的读取数据单元
- 大小一般在8KB-1MB之间,越大压缩效率越高
- Footer
- 数据Schema信息
- 每个Row group的元信息:偏移量、大小
- 每个Column chunk的元信息:每个列的编码格式、首页偏移量、首索引页偏移 量、个数、大小等信息
- 创建代码
create table if not exists uaction_parquet( userid string, itemid string, behaviortype int, geohash string, itemcategory string, time string) stored as parquet; insert overwrite table uaction_parquet select * from uaction_text;
- Row group
-
文件存储格式对比测试
- 适当减小文件的数据量
# 检查文件行数 wc -l uaction.dat # head -n 1000000 uaction.dat > uaction1.dat tail -n 1000000 uaction.dat > uaction2.dat
运行上面的建表及数据导入语句,加载数据
-
文件压缩比 ORC > Parquet > text
hive (mydb)> dfs -ls /user/hive/warehouse/mydb.db/ua*; 13517070 /user/hive/warehouse/mydb.db/uaction_orc/000000_1000 34867539 /user/hive/warehouse/mydb.db/uaction_parquet/000000_1000 90019734 /user/hive/warehouse/mydb.db/uaction_text/useraction.dat
-
执行查询 orc 与 parquet类似 > txt
SELECT COUNT(*) FROM uaction_text; SELECT COUNT(*) FROM uaction_orc; SELECT COUNT(*) FROM uaction_parquet; -- text : 14.446 -- orc: 0.15 -- parquet : 0.146
-
结论 在生产环境中,Hive表的数据格式使用最多的有三种:TextFile、ORCFile、 Parquet。
- TextFile文件更多的是作为跳板来使用(即方便将数据转为其他格式)
- 有update、delete和事务性操作的需求,通常选择ORCFile
- 没有事务性要求,希望支持Impala、Spark,建议选择Parquet
- 适当减小文件的数据量
Hive调优策略
Hive作为大数据领域常用的数据仓库组件,在设计和开发阶段需要注意效率。
影响Hive效率的不仅仅是数据量过大;数据倾斜、数据冗余、job或I/O过多、 MapReduce分配不合理等因素都对Hive的效率有影响。
对Hive的调优既包含对HiveQL语句本身的优化,也包含Hive配置项和MR方面的调整。
架构优化
-
执行引擎
Hive支持多种执行引擎,分别是 MapReduce、Tez、Spark、Flink。可以通过hive- site.xml文件中的hive.execution.engine属性控制。Tez是一个构建于YARN之上的支持复杂的DAG(有向无环图)任务的数据处理框 架。由Hontonworks开源,将MapReduce的过程拆分成若干个子过程,同时可以把 多个mapreduce任务组合成一个较大的DAG任务,减少了MapReduce之间的文件存 储,同时合理组合其子过程从而大幅提升MR作业的性能。
-
优化器 与关系型数据库类似,Hive会在真正执行计算之前,生成和优化逻辑执行计划与物理 执行计划。Hive有两种优化器:Vectorize(向量化优化器) 与 Cost-Based Optimization (CBO 成本优化器)。
-
矢量化查询执行 矢量化查询(要求执行引擎为Tez)执行通过一次批量执行1024行而不是每行一行来提 高扫描,聚合,过滤器和连接等操作的性能,这个功能一显着缩短查询执行时间。
set hive.vectorized.execution.enabled = true; - - 默认 false set hive.vectorized.execution.reduce.enabled = true; - - 默认 false
备注:要使用矢量化查询执行,必须用ORC格式存储数据
-
成本优化器 Hive的CBO是基于apache Calcite的,Hive的CBO通过查询成本(有analyze收集的统 计信息)会生成有效率的执行计划,最终会减少执行的时间和资源的利用,使用CBO 的配置如下:
SET hive.cbo.enable=true; --从 v0.14.0默认 true SET hive.compute.query.using.stats=true; -- 默认false SET hive.stats.fetch.column.stats=true; -- 默认false SET hive.stats.fetch.partition.stats=true; -- 默认true
定期执行表(analyze)的分析,分析后的数据放在元数据库中。
-
-
分区表 对于一张比较大的表,将其设计成分区表可以提升查询的性能,对于一个特定分区的查询,只会加载对应分区路径的文件数据,所以执行速度会比较快。
分区字段的选择是影响查询性能的重要因素,尽量避免层级较深的分区,这样会造成太多的子文件夹。一些常见的分区字段可以是:
- 日期或时间。如year、month、day或者hour,当表中存在时间或者日期字段时
- 地理位置。如国家、省份、城市等
- 业务逻辑。如部门、销售区域、客户等等
-
分桶表 类比MySQL的索引
与分区表类似,分桶表的组织方式是将HDFS上的文件分割成多个文件。
分桶可以加快数据采样,也可以提升join的性能(join的字段是分桶字段),因为分桶可 以确保某个key对应的数据在一个特定的桶内(文件),巧妙地选择分桶字段可以大幅 度提升join的性能。
通常情况下,分桶字段可以选择经常用在过滤操作或者join操作的字段。 -
文件格式 在HiveQL的create table语句中,可以使用
stored as …
指定表的存储格式。 Hive表支持的存储格式有TextFile、SequenceFile、RCFile、ORC、Parquet等。存储格式一般需要根据业务进行选择,生产环境中绝大多数表都采用TextFile、 ORC、Parquet存储格式之一。
TextFile是最简单的存储格式,它是纯文本记录,也是Hive的默认格式。其磁盘开销 大,查询效率低,更多的是作为跳板来使用。RCFile、ORC、Parquet等格式的表都 不能由文件直接导入数据,必须由TextFile来做中转。
Parquet和ORC都是Apache旗下的开源列式存储格式。列式存储比起传统的行式存 储更适合批量OLAP查询,并且也支持更好的压缩和编码。选择Parquet的原因主要 是它支持Impala查询引擎,并且对update、delete和事务性操作需求很低。
-
数据压缩
压缩技术可以减少map与reduce之间的数据传输,从而可以提升查询性能,关于压 缩的配置可以在hive的命令行中或者hive-site.xml文件中进行配置。SET hive.exec.compress.intermediate=true
关于压缩的编码器可以通过mapred-site.xml, hive-site.xml进行配置,也可以通过命 令行进行配置,如:
-- 中间结果压缩 SET hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec ; -- 输出结果压缩 SET hive.exec.compress.output=true; SET mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodc
参数优化
Hive 参数说明的官方文档:
https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties
-
本地模式 当Hive处理的数据量较小时,启动分布式去处理数据会有点浪费,因为可能启动的时 间比数据处理的时间还要长。Hive支持将作业动态地转为本地模式,需要使用下面的 配置:
SET hive.exec.mode.local.auto=true; -- 默认 false SET hive.exec.mode.local.auto.inputbytes.max=50000000; SET hive.exec.mode.local.auto.input.files.max=5; -- 默认 4
一个作业只要满足下面的条件,会启用本地模式
- 输入文件的大小小于 hive.exec.mode.local.auto.inputbytes.max 配置的大小
- map任务的数量小于 hive.exec.mode.local.auto.input.files.max 配置的大小
- reduce任务的数量是1或者0
- 严格模式
所谓严格模式,就是强制不允许用户执行3种有风险的HiveQL语句,一旦执行会直接 失败。这3种语句是:
- 查询分区表时不限定分区列的语句;
- 两表join产生了笛卡尔积的语句;
- 用order by来排序,但没有指定limit的语句。
要开启严格模式,需要将参数 hive.mapred.mode 设为
strict
(缺省值)。
该参数可以不在参数文件中定义,在执行SQL之前设置(set hive.mapred.mode=nostrict
) -
JVM重用 默认情况下,Hadoop会为为一个map或者reduce启动一个JVM,这样可以并行执行map和reduce。
当map或者reduce是那种仅运行几秒钟的轻量级作业时,JVM启动进程所耗费的时 间会比作业执行的时间还要长。Hadoop可以重用JVM,通过共享JVM以串行而非并 行的方式运行map或者reduce。
JVM的重用适用于同一个作业的map和reduce,对于不同作业的task不能够共享 JVM。如果要开启JVM重用,需要配置一个作业最大task数量,默认值为1,如果设置 为-1,则表示不限制:
# 代表同一个MR job中顺序执行的5个task重复使用一个JVM,减少启动和关闭的开销 SET mapreduce.job.jvm.numtasks=5;
这个功能的缺点是,开启JVM重用将一直占用使用到的task插槽,以便进行重用,直 到任务完成后才能释放。如果某个“不平衡的”job中有某几个reduce task执行的时间 要比其他Reduce task消耗的时间多的多的话,那么保留的插槽就会一直空闲着却无 法被其他的job使用,直到所有的task都结束了才会释放。
-
并行执行 Hive的查询通常会被转换成一系列的stage,这些stage之间并不是一直相互依赖的, 可以并行执行这些stage,通过下面的方式进行配置:
SET hive.exec.parallel=true; -- 默认false SET hive.exec.parallel.thread.number=16; -- 默认8
并行执行可以增加集群资源的利用率,如果集群的资源使用率已经很高了,那么并行执行的效果不会很明显。
-
推测执行 在分布式集群环境下,因为程序Bug、负载不均衡、资源分布不均等原因,会造成同 一个作业的多个任务之间运行速度不一致,有些任务的运行速度可能明显慢于其他任 务(比如一个作业的某个任务进度只有50%,而其他所有任务已经运行完毕),则这 些任务会拖慢作业的整体执行进度。
为了避免这种情况发生,Hadoop采用了推测执行机制,它根据一定的规则推测出 “拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理 同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。
set mapreduce.map.speculative=true set mapreduce.reduce.speculative=true set hive.mapred.reduce.tasks.speculative.execution=true
- 合并小文件
- 在map执行前合并小文件,减少map数
# 缺省参数 set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
- 在Map-Reduce的任务结束时合并小文件
# 在 map-only 任务结束时合并小文件,默认true SET hive.merge.mapfiles = true; # 在 map-reduce 任务结束时合并小文件,默认false SET hive.merge.mapredfiles = true; # 合并文件的大小,默认256M SET hive.merge.size.per.task = 268435456; # 当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge SET hive.merge.smallfiles.avgsize = 16777216;
-
Fetch模式 Fetch模式是指Hive中对某些情况的查询可以不必使用MapReduce计算。select col1, col2 from tab;
可以简单地读取表对应的存储目录下的文件,然后输出查询结果到控制台。在开启 fetch模式之后,在全局查找、字段查找、limit查找等都不启动 MapReduce。
# Default Value: minimal in Hive 0.10.0 through 0.13.1, more in Hive 0.14.0 and later hive.fetch.task.conversion=more
SQL优化
-
列裁剪和分区裁剪 列裁剪是在查询时只读取需要的列;分区裁剪就是只读取需要的分区。
简单的说:select 中不要有多余的列,坚决避免 select * from tab;
查询分区表,不读多余的数据;select uid, event_type, record_data from calendar_record_log where pt_date >= 20190201 and pt_date <= 20190224 and status = 0;
-
sort by 代替 order by HiveQL中的order by与其他关系数据库SQL中的功能一样,是将结果按某字段全局 排序,这会导致所有map端数据都进入一个reducer中,在数据量大时可能会长时间 计算不完。
如果使用sort by,那么还是会视情况启动多个reducer进行排序,并且保证每个 reducer内局部有序。为了控制map端数据分配到reducer的key,往往还要配合 distribute by 一同使用。如果不加 distribute by 的话,map端数据就会随机分配到 reducer。
group by … order by … | V distribute by … sort by ….
-
group by 代替 count(distinct) 当要统计某一列的去重数时,如果数据量很大,count(distinct) 会非常慢。原因与 order by类似,count(distinct)逻辑只会有很少的reducer来处理。此时可以用 group by 来改写:
-- 原始SQL select count(distinct uid) from tab; -- 优化后的SQL select count(1) from (select uid from tab group by uid) tmp;
这样写会启动两个MR job(单纯distinct只会启动一个),所以要确保数据量大到启 动job的overhead远小于计算耗时,才考虑这种方法。当数据集很小或者key的倾斜 比较明显时,group by还可能会比distinct慢。
-
group by 配置调整
-
map端预聚合 group by时,如果先起一个combiner在map端做部分预聚合,可以有效减少shuffle 数据量。
-- 默认为true set hive.map.aggr = true
Map端进行聚合操作的条目数
set hive.groupby.mapaggr.checkinterval = 100000
通过 hive.groupby.mapaggr.checkinterval 参数也可以设置map端预聚合的行数 阈值,超过该值就会分拆job,默认值10W。
-
倾斜均衡配置项 group by时如果某些key对应的数据量过大,就会发生数据倾斜。Hive自带了一个均 衡数据倾斜的配置项 hive.groupby.skewindata ,默认值false。
其实现方法是在group by时启动两个MR job。第一个job会将map端数据随机输入 reducer,每个reducer做部分聚合,相同的key就会分布在不同的reducer中。第二 个job再将前面预处理过的数据按key聚合并输出结果,这样就起到了均衡的效果。
但是,配置项毕竟是死的,单纯靠它有时不能根本上解决问题,建议了解数据倾斜的细节,并优化查询语句。
-
-
join 优化
-
join 基础优化
- common join
普通连接,在SQL中不特殊指定连接方式使用的都是这种普通连接。
- 缺点:性能较差(要将数据分区,有shuffle)
- 优点:操作简单,普适性强
-
map join map端连接,与普通连接的区别是这个连接中不会有reduce阶段存在,连接在map端完成
- 适用场景:大表与小表连接,小表数据量应该能够完全加载到内存,否则不适用
- 优点:在大小表连接时性能提升明显
备注:Hive 0.6 的时候默认认为写在select 后面的是大表,前面的是小表, 或者使 用 /+mapjoin(map_table) / select a., b. from a join b on a.id = b.id【要求小表在前,大表之后】
hive 0.7 的时候这个计算是自动化的,它首先会自动判断哪个是小表,哪个是大表, 这个参数由(hive.auto.convert.join=true)来控制,然后控制小表的大小由 (hive.smalltable.filesize=25000000)参数控制(默认是25M),当小表超过这个 大小,hive 会默认转化成common join。Hive 0.8.1,hive.smalltable.filesize => hive.mapjoin.smalltable.filesize
缺点:使用范围较小,只针对大小表且小表能完全加载到内存中的情况。
-
bucket map join 分桶连接:Hive 建表的时候支持hash 分区通过指定clustered by (col_name,xxx ) into number_buckets buckets 关键字.当连接的两个表的join key 就是bucket column 的时候,就可以通过设置hive.optimize.bucketmapjoin= true 来执行优化。
原理:通过两个表分桶在执行连接时会将小表的每个分桶映射成hash表,每个task 节点都需要这个小表的所有hash表,但是在执行时只需要加载该task所持有大表分 桶对应的小表部分的hash表就可以,所以对内存的要求是能够加载小表中最大的 hash块即可。
备注:小表与大表的分桶数量需要是倍数关系,这个是因为分桶策略决定的,分桶时会根据分桶字段对桶数取余后决定哪个桶的,所以要保证成倍数关系。
优点:比map join对内存的要求降低,能在逐行对比时减少数据计算量(不用比对小表全量)
缺点:只适用于分桶表
- common join
普通连接,在SQL中不特殊指定连接方式使用的都是这种普通连接。
-
利用map join特性 map join特别适合大小表join的情况。Hive会将build table和probe table在map端直接完成join过程,消灭了reduce,效率很高。
select a.event_type, b.upload_time from calendar_event_code a inner join ( select event_type, upload_time from calendar_record_log where pt_date = 20190225 ) b on a.event_type = b.event_type;
map join的配置项是 hive.auto.convert.join ,默认值true。
当build table大小小于hive.mapjoin.smalltable.filesize 会启用map join,默 认值25000000(约25MB)。还有 hive.mapjoin.cache.numrows ,表示缓存 build table的多少行数据到内存,默认值25000。 -
分桶表map join map join对分桶表还有特别的优化。由于分桶表是基于一列进行hash存储的,因此 非常适合抽样(按桶或按块抽样)。它对应的配置项是
hive.optimize.bucketmapjoin 。 -
倾斜均衡配置项 这个配置与 group by 的倾斜均衡配置项异曲同工,通过 hive.optimize.skewjoin 来配置,默认false。
如果开启了,在join过程中Hive会将计数超过阈值 hive.skewjoin.key (默认 100000)的倾斜key对应的行临时写进文件中,然后再启动另一个job做map join生 成结果。通过 hive.skewjoin.mapjoin.map.tasks 参数还可以控制第二个job的 mapper数量,默认10000。
-
处理空值或无意义值 日志类数据中往往会有一些项没有记录到,其值为null,或者空字符串、-1等。如果 缺失的项很多,在做join时这些空值就会非常集中,拖累进度【备注:这个字段是连 接字段】。
若不需要空值数据,就提前写 where 语句过滤掉。需要保留的话,将空值key用随机 方式打散,例如将用户ID为null的记录随机改为负值:
select a.uid, a.event_type, b.nickname, b.age from ( select (case when uid is null then cast(rand()*-10240 as int) else uid end) as uid, event_type from calendar_record_log where pt_date >= 20190201 ) a left outer join ( select uid,nickname,age from user_info where status = 4 ) b on a.uid = b.uid;
-
单独处理倾斜key 如果倾斜的 key 有实际的意义,一般来讲倾斜的key都很少,此时可以将它们单独抽 取出来,对应的行单独存入临时表中,然后打上一个较小的随机数前缀(比如 0~9),最后再进行聚合。
不要一个Select语句中,写太多的Join。一定要了解业务,了解数据。(A0-A9) 分成多条语句,分步执行;(A0-A4; A5-A9);先执行大表与小表的关联;
-
- 调整 Map 数
通常情况下,作业会通过输入数据的目录产生一个或者多个map任务。主要因素包括:
- 输入文件总数
- 输入文件大小
- HDFS文件块大小
map越多越好吗。当然不是,合适的才是最好的。
如果一个任务有很多小文件(« 128M),每个小文件也会被当做一个数据块,用一个 Map Task 来完成。
一个 Map Task 启动和初始化时间 » 处理时间,会造成资源浪费,而且系统中同时可用的map数是有限的。
对于小文件采用的策略是合并。每个map处理接近128M的文件块,会有其他问题吗。也不一定。
有一个125M的文件,一般情况下会用一个Map Task完成。假设这个文件字段很少, 但记录数却非常多。如果Map处理的逻辑比较复杂,用一个map任务去做,性能也不好。对于复杂文件采用的策略是增加 Map 数。
computeSliteSize(max(minSize, min(maxSize, blocksize))) = blocksize minSize : mapred.min.split.size (默认值1) maxSize : mapred.max.split.size (默认值256M) 调整maxSize最大值。让maxSize最大值低于blocksize就可以增加map的个数。 建议用set的方式,针对SQL语句进行调整。
- 调整 Reduce 数
reducer数量的确定方法比mapper简单得多。使用参数 mapred.reduce.tasks 可以 直接设定reducer数量。如果未设置该参数,Hive会进行自行推测,逻辑如下:
- 参数 用来设定每个reducer能够处理的最大数据量,默认值256M
- 参数 用来设定每个job的最大reducer数量,默认值 999(1.2版本之前)或1009(1.2版本之后)
- 得出reducer数:
reducer_num = MIN(total_input_size / reducers.bytes.per.reducer, reducers.max)
即: min(输入总数据量 / 256M, 1009)
reducer数量与输出文件的数量相关。如果reducer数太多,会产生大量小文件,对 HDFS造成压力。如果reducer数太少,每个reducer要处理很多数据,容易拖慢运行 时间或者造成OOM。