Kudu--分布式数据存储引擎

07 Jul 2021

概述

背景

在 KUDU 之前,大数据主要以两种方式存储:

但是这样做有很多缺点:

数据模型

KUDU 的数据模型与传统的关系型数据库类似,一个 KUDU 集群由多个表组成,每个表由多个字段组成,一个表必须指定一个由若干个(>=1)字段组成的主键

从用户角度来看,
Kudu是一种存储结构化数据表的存储系统。 在一个Kudu集群中可以定义任意数量的table,每个table都需要预先定义好schema。 每个table的列数是确定的,每一列都需要有名字和类型,每个表中可以把其中一列或多列定义为主键。

这么看来,Kudu更像关系型数据库,而不是像HBase、Cassandra和MongoDB这些NoSQL数据库。不过Kudu目前 还不能像关系型数据一样支持二级索引。 Kudu使用确定的列类型,字段是强类型的,而不是类似于NoSQL 的“everything is byte”。这可以带来两点好处:

用户可以使用 Insert,Update和Delete API对表进行写操作。不论使用哪种API,都必须指定主键。但批量的删除和 更新操作需要依赖更高层次的组件(比如Impala、Spark)。Kudu目前还不支持多行事务。 而在读操作方面,Kudu 只提供了Scan操作来获取数据。用户可以通过指定过滤条件来获取自己想要读取的数据,但目前只提供了两种类型 的过滤条件:主键范围和列值与常数的比较。由于Kudu在硬盘中的数据采用列式存储,所以只扫描需要的列将极大 地提高读取性能。

Kudu的架构

与HDFS和HBase相似,Kudu使用单个的Master节点,用来管理集群的元数据,并且使用任意数量的Tablet Server节点用来存储实际数据。可以部署多个Master节点来提高容错性。

Master

Kudu的master节点负责整个集群的元数据管理和服务协调。它承担着以下功能:

Table

在数据存储方面,Kudu选择完全由自己实现,而没有借助于已有的开源方案。tablet存储主要想要实现的目标为:

安装和运行

环境说明

安装ntp服务

每个节点执行:

yum -y install ntp  

注释掉以下四行:

#server 0.centos.pool.ntp.org iburst  
#server 1.centos.pool.ntp.org iburst  
#server 2.centos.pool.ntp.org iburst  
#server 3.centos.pool.ntp.org iburst  

修改hdp-2 192.168.81.130节点上的配置文件

vi /etc/ntp.conf  

加入如下内容

# 给192.168.81.0网段,子网掩码为255.255.255.0的局域网机的机器有同步时间的权限  
restrict 192.168.81.0 mask 255.255.255.0 notrap nomodify   
# prefer代表优先使用此ip做同步  
server 192.168.81.130 prefer  
 # 当所有服务器都不能使用时,使用本机作为同步服务器  
server 127.127.1.0  
fudge  127.127.1.0 stratum 10  

修改192.168.81.129和192.16881.131节点上的配置文件

vi /etc/ntp.conf  

加入以下内容

server  192.168.81.130 prefer  
server  127.127.1.0  
fudge   127.127.1.0 stratum 10  

启动NTP服务

service ntpd start  
chkconfig ntpd on  

检验
检查ntp服务是否成功输入:ntpstat
输出如下则启动成功并且同步已完成

synchronised to local net at stratum 11  
time correct to within 11 ms  
polling server every 64 s  

/etc/init.d/ntpd start 各个节点检查启动成功,否则启动kudu相关服务会报错

时钟同步,kudu对时间要求很精准

ntpdate -u ntp.api.bz  

配置Yum的Repository

在使用 yum来安装kudu的时候,由于kudu不是yum的常规组建,直接安装会找不到kudu,所以第一步需要将
kudu的repo文件下载并放置到合适的位置。

  1. 下载kudu的repo文件 下载repo文件:
    wget http://archive.cloudera.com/kudu/redhat/7/x86_64/kudu/cloudera-kudu.repo  
    
  2. 将下载的repo文件放置到/etc/yum.repos.d/目录下
     sudo mv cloudera-kudu.repo mv /etc/yum.repos.d/  
    

安装kudu

安装,在每个节点上执行

yum install kudu kudu-master kudu-tserver kudu-client0 kudu-client-devel -y  

配置并启动kudu
hdp-2: master hdp-1,hdp-2,hdp-3 slaver 安装完成,在/etc/kudu/conf目录下有两个文件:master.gflagfile和tserver.gflagfile
使用192.168.81.130作为kudu-master,192.168.81.129、192.168.20.130和192.168.81.131作为kudu- tserver节点

所以192.168.81.130节点需要修改master.gflagfile和tserver.gflagfile文件,而192.168.81.129和 192.168.20.131只需要修改tserver.gflagfile文件

修改kudu-master启动配置 hdp-2节点

vi /etc/default/kudu-master  

修改以下内容,主要是修改ip:

export FLAGS_rpc_bind_addresses=192.168.81.130:7051  

修改每个节点的kudu-tserver启动配置

vi /etc/default/kudu-tserver  

修改以下内容,主要是修改ip为当前节点ip

export FLAGS_rpc_bind_addresses=192.168.81.130:7050  

master.gflagfile的配置修改

--fromenv=rpc_bind_addresses  
--fromenv=log_dir  
--fs_wal_dir=/var/lib/kudu/master --fs_data_dirs=/var/lib/kudu/master   
-unlock_unsafe_flags=true   
-allow_unsafe_replication_factor=true   
-default_num_replicas=1 # 此参数可以调整备份数量,默认为3  

tserver.gflagfile 的配置修改

 # Do not modify these two lines. If you wish to change these variables,  
# modify them in /etc/default/kudu-tserver.  
--fromenv=rpc_bind_addresses  
--fromenv=log_dir  
--fs_wal_dir=/var/lib/kudu/tserver  
--fs_data_dirs=/var/lib/kudu/tserver  
--tserver_master_addrs=hdp-2:7051  
-unlock_unsafe_flags=true  
-allow_unsafe_replication_factor=true  
-default_num_replicas=1  
--tserver_master_addrs=192.168.81.130:7051  
# 此参数指定master  

注意,这里的–tserver_master_addrs指明了集群中master的地址,指向同一个master的tserver形成了一 个kudu集群

创建master.gflagfile和tserver.gflagfile文件中指定的目录,并将所有者更改为kudu,执行如下命令:

mkdir -p /var/lib/kudu/master /var/lib/kudu/tserver  
chown -R kudu:kudu /var/lib/kudu/  

修改 /etc/security/limits.d/20-nproc.conf 文件,解除kudu用户的线程限制,注意:20可能不同,根据自 己的来修改

vi /etc/security/limits.d/20-nproc.conf  
  
# 加入以下两行内容  
kudu       soft    nproc     unlimited  
impala     soft    nproc     unlimited  

启动kudu
master节点(hdp-2 192.168.81.130)执行:

service kudu-master start  
service kudu-tserver start  

192.168.81.129和192.168.81.131执行:

service kudu-tserver start  

kudu-master启动失败
查看/var/log/kudu/err 发现时间问题,解决方案,重启ntpd

service ntpd restart   

然后重启 kudu-master

service kudu-master restart  

Web界面 http://master:8050

KuDu常用API

CRUD demo

用到的依赖

<dependency>  
    <groupId>org.apache.kudu</groupId>  
    <artifactId>kudu-client</artifactId>  
    <version>1.4.0</version>  
</dependency>  
import org.apache.kudu.ColumnSchema;  
import org.apache.kudu.Schema;  
import org.apache.kudu.Type;  
import org.apache.kudu.client.*;  
  
import java.util.ArrayList;  
  
public class KuduDemo {  
    public static void main(String[] args) {  
        String masterAddress = "centos7-1";  
        KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(masterAddress);  
        KuduClient client = kuduClientBuilder.build();  
  
  
//        createTable(client);  
  
        scanData(client);  
  
    }  
  
    public static void deleteData(KuduClient client) {  
        try {  
            KuduTable kuduTable = client.openTable("student");  
            KuduSession kuduSession = client.newSession();  
            kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);  
  
            Delete delete = kuduTable.newDelete();  
            PartialRow row = delete.getRow();  
            row.addInt("id", 1);  
  
            kuduSession.flush();  
            kuduSession.apply(delete);  
            kuduSession.close();  
  
        } catch (KuduException e) {  
            e.printStackTrace();  
        }  
    }  
  
    public static void updateData(KuduClient client) {  
        try{  
            KuduTable kuduTable = client.openTable("student");  
  
            KuduSession kuduSession = client.newSession();  
            kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);  
            Update update = kuduTable.newUpdate();  
            update.getRow().addInt("id", 1);  
            update.getRow().addString("name", "Angie");  
            kuduSession.flush();  
            kuduSession.apply(update);  
            kuduSession.close();  
  
        } catch (KuduException e) {  
            e.printStackTrace();  
        }  
    }  
  
    public static void scanData(KuduClient client) {  
        // kudu查询数据用scanner 思路:  
        // 1、获取client  
        // 2、获取Scanner  
        // 3、从Scanner中循环遍历数据  
  
        try {  
            KuduTable kuduTable = client.openTable("student");  
            KuduScanner scanner = client.newScannerBuilder(kuduTable).build();  
  
            while (scanner.hasMoreRows()) {  
                for (RowResult result : scanner.nextRows()) {  
                    int id = result.getInt("id");  
                    String name = result.getString("name");  
                    System.out.println("id: "+id + ", name: "+name);  
                }  
            }  
        } catch (KuduException e) {  
            e.printStackTrace();  
        }  
    }  
  
    public static void insertData(KuduClient client) {  
        // 1、获取客户端  
        // 2、打开一张表  
        // 3、创建会话  
        // 4、设置刷新模式  
        // 5、获取插入实例  
        // 6、声明带插入数据  
        // 7、刷入数据  
        // 8、应用插入实例  
        // 9、关闭会话  
  
        try{  
            KuduTable kuduTable = client.openTable("student");  
  
            KuduSession kuduSession = client.newSession();  
            kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);  
            Insert insert = kuduTable.newInsert();  
            insert.getRow().addInt("id", 1);  
            insert.getRow().addString("name", "April");  
            kuduSession.flush();  
            kuduSession.apply(insert);  
            kuduSession.close();  
  
        } catch (KuduException e) {  
            e.printStackTrace();  
        } finally {  
            try {  
                client.close();  
            } catch (KuduException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
  
    public static void deleteTable(KuduClient client) {  
        try {  
            client.deleteTable("student");  
        } catch (KuduException e) {  
            e.printStackTrace();  
        } finally {  
            try {  
                client.close();  
            } catch (KuduException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
  
    public static void createTable(KuduClient client) {  
        // (1)必须指定表连接到的master节点主机名  
        // (2)必须定义schema  
        // (3)必须指定副本数量、分区策略和数量  
  
  
        String tableName = "userTable";  
  
        ArrayList<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();  
        ColumnSchema id = new ColumnSchema.ColumnSchemaBuilder("id", Type.STRING).key(true).build();  
        ColumnSchema name = new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).key(false).build();  
        ColumnSchema age = new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).key(false).build();  
        columnSchemas.add(id);  
        columnSchemas.add(name);  
        columnSchemas.add(age);  
        Schema schema = new Schema(columnSchemas);  
  
        CreateTableOptions options = new CreateTableOptions();  
        options.setNumReplicas(1);  
        ArrayList<String> colrule = new ArrayList<String>();  
        colrule.add("id");  
        options.addHashPartitions(colrule, 3);  
  
        try {  
            client.createTable(tableName, schema, options);  
        } catch (KuduException e) {  
            e.printStackTrace();  
        } finally {  
            try {  
                client.close();  
            } catch (KuduException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
}  
  

Flink下沉到KuDu