Flume--数据采集工具

07 Feb 2021

Flume由Cloudera公司开发,是一个分布式、高可靠、高可用的海量日志采集、聚合、传输的系统

Flume支持在日志系统中定制各类数据发送方,用于采集数据;
Flume提供对数据进行简单处理,并写到各种数据接收方的能力。
简单的说,Flume是实时采集日志的数据采集引擎

概述

Flume的特点

适用场景:适用于日志文件实时采集。

Flume体系结构

Flume拓扑结构

Flume内部原理

总体数据流向:Souce => Channel => Sink Channel: 处理器、拦截器、选择器  

具体过程:

  1. Source接收事件,交给其Channel处理器处理事件
  2. 处理器通过拦截器Interceptor,对事件一些处理,比如压缩解码,正则拦截,时间戳拦截,分类等
  3. 经过拦截器处理过的事件再传给Channel选择器,将事件写入相应的Channel。
    Channel Selector有两种:
    • Replicating Channel Selector(默认),会将source过来的Event发往所有 Channel(比较常用的场景是,用多个Channel实现冗余副本,保证可用性)
    • Multiplexing Channel Selector,根据配置分发event。此selector会根据 event中某个header对应的value来将event发往不同的channel
  4. 最后由Sink处理器处理各个Channel的事件

安装部署

1. 下载软件 apache-flume

下载软件 apache-flume-1.9.0-bin.tar.gz,并上传到 centos7-3 上的 /opt/lagou/software 目录下

2. 解压 apache-flume

解压 apache-flume-1.9.0-bin.tar.gz 到 /opt/lagou/servers/ 目录下;并重命名 为 flume-1.9.0

3. 在 /etc/profile 中增加环境变量

vi /etc/profile  
  
export FLUME_HOME=/opt/lagou/servers/flume-1.9.0  
export PATH=$PATH:$FLUME_HOME/bin  
  
  
source /etc/profile  

4. 配置java_home

将 $FLUME_HOME/conf 下的 flume-env.sh.template 改名为 flume-env.sh,并添加 JAVA_HOME的配置

cd $FLUME_HOME/conf  
mv flume-env.sh.template flume-env.sh  
vi flume-env.sh  
export JAVA_HOME=/opt/lagou/servers/jdk1.8.0_231  

基础应用

前置知识

入门案例

flume帮助文档
http://flume.apache.org/FlumeUserGuide.html

业务需求:监听本机 8888 端口,Flume将监听的数据实时显示在控制台

需求分析:

监控日志文件信息到HDFS

业务需求:监控本地日志文件,收集内容实时上传到HDFS
需求分析:

tail -f 等同于--follow=descriptor,根据文件描述符进行追踪,当文件改名或被删除,追踪停止  
  
tail -F  
等同于--follow=name --retry,根据文件名进行追踪,并保持重试,即该文件被删除 或改名后,如果再次创建相同的文件名,会继续追踪  

监控目录采集信息到HDFS

业务需求:监控指定目录,收集信息实时上传到HDFS
需求分析:

监控日志文件采集数据到HDFS、本地文件系统

业务需求:监控日志文件,收集信息上传到HDFS 和 本地文件系统
需求分析:

taildir Source。Flume 1.7.0加入的新Source,相当于 spooldir source + exec source。可以监控多个目录,并且使用正则表达式匹配该目录中的文件名进行实时 收集。实时监控一批文件,并记录每个文件最新消费位置,agent进程重启后不会有 数据丢失的问题。

目前不适用于Windows系统;其不会对于跟踪的文件有任何处理,不会重命名也不 会删除,不会做任何修改。不支持读取二进制文件,支持一行一行的读取文本文件。

高级特性

拦截器 Interceptor

Flume支持在运行时对event进行修改或丢弃,通过拦截器来实现;
Flume里面的拦截器是实现了org.apache.flume.interceptor.Interceptor 接口的类;
拦截器可以根据配置 修改 甚至 丢弃 event;
Flume也支持链式的拦截器执行方式,在配置文件里面配置多个拦截器就可以了;
拦截器的顺序取决于它们配置的顺序,Event 按照顺序经过每一个拦截器;

选择器 selector

source可以向多个channel同时写数据,所以也就产生了以何种方式向多个channel写的问题:

逻辑处理器Sink Processor

可以把多个sink分成一个组, Sink组逻辑处理器可以对这同一个组里的几个sink进行负载均衡 或者 其中一个sink发生故障后将输出Event的任务转移到其他的sink上。

N个sink将Event输出到对应的N个目的地的,通过 Sink组逻辑处理器 可以把这N个sink配置成负载均衡或者故障转移的工作方式:

事务机制与可靠性

一提到事务,首先就想到的是关系型数据库中的事务,事务一个典型的特征就是将一批操作做成原子性的,要么都成功,要么都失败。

在Flume中一共有两个事务:

从 Source 到 Channel 过程中,数据在 Flume 中会被封装成 Event 对象,也就是一批 Event ,把这批 Event 放到一个事务中,把这个事务也就是这批event一次性的放入Channel 中。同理,Take事务的时候,也是把这一批event组成的事务统一拿出来到sink放到HDFS上。

高可用案例

案例:实现Agent的故障转移

1. 配置环境

在centos7-1、centos7-2上部署Flume、修改环境变量

# 在centos7-3上执行 /opt/lagou/servers  
scp -r flume-1.9.0/ centos7-1:$PWD   
  
scp -r flume-1.9.0/ centos7-2:$PWD  
  
cd /etc  
scp profile centos7-1:$PWD  
scp profile centos7-2:$PWD  
  
# 在centos7-1、centos7-2上分别执行 source /etc/profile  

2. centos-3配置文件

flume-taildir-avro.conf

# agent name  
a1.sources = r1  
a1.channels = c1  
a1.sinks = k1 k2  
  
# source  
a1.sources.r1.type = TAILDIR  
a1.sources.r1.positionFile = /root/flume_log/taildir_position.json  
a1.sources.r1.filegroups = f1  
a1.sources.r1.filegroups.f1 = /tmp/root/.*log  
a1.sources.r1.fileHeader = true  
  
  
# interceptor  
a1.sources.r1.interceptors = i1 i2   
a1.sources.r1.interceptors.i1.type = static   
a1.sources.r1.interceptors.i1.key = Type   
a1.sources.r1.interceptors.i1.value = LOGIN  
# 在event header添加了时间戳   
a1.sources.r1.interceptors.i2.type = timestamp  
  
# channel  
a1.channels.c1.type = memory  
a1.channels.c1.capacity = 10000  
a1.channels.c1.transactionCapacity = 500  
  
# sink group  
a1.sinkgroups = g1  
a1.sinkgroups.g1.sinks = k1 k2  
  
  
# set sink1  
a1.sinks.k1.type = avro  
a1.sinks.k1.hostname = centos7-1  
a1.sinks.k1.port = 9999  
  
# set sink2  
a1.sinks.k2.type = avro  
a1.sinks.k2.hostname = centos7-2  
a1.sinks.k2.port = 9999  
  
# set failover  
a1.sinkgroups.g1.processor.type = failover  
a1.sinkgroups.g1.processor.priority.k1 = 100  
a1.sinkgroups.g1.processor.priority.k2 = 60  
a1.sinkgroups.g1.processor.maxpenalty = 10000  
  
a1.sources.r1.channels = c1  
a1.sinks.k1.channel = c1  
a1.sinks.k2.channel = c1  

3. centos-1配置文件

flume-avro-hdfs.conf

# set Agent name  
a2.sources = r1  
a2.channels = c1  
a2.sinks = k1  
  
# Source  
a2.sources.r1.type = avro  
a2.sources.r1.bind = centos7-1  
a2.sources.r1.port = 9999  
  
# interceptor  
a2.sources.r1.interceptors = i1  
a2.sources.r1.interceptors.i1.type = static  
a2.sources.r1.interceptors.i1.key = Collector  
a2.sources.r1.interceptors.i1.value = centos7-1  
  
# set channel  
a2.channels.c1.type = memory  
a2.channels.c1.capacity = 10000  
a2.channels.c1.transactionCapacity = 500  
  
# HDFS Sink  
a2.sinks.k1.type=hdfs  
a2.sinks.k1.hdfs.path=hdfs://centos7-1:9000/flume/failover/  
a2.sinks.k1.hdfs.fileType=DataStream  
a2.sinks.k1.hdfs.writeFormat=TEXT  
a2.sinks.k1.hdfs.rollInterval=60  
a2.sinks.k1.hdfs.filePrefix=%Y-%m-%d  
a2.sinks.k1.hdfs.minBlockReplicas=1  
a2.sinks.k1.hdfs.rollSize=0  
a2.sinks.k1.hdfs.rollCount=0  
a2.sinks.k1.hdfs.idleTimeout=0  
  
a2.sources.r1.channels = c1  
a2.sinks.k1.channel=c1  

4. centos-2配置文件

flume-avro-hdfs.conf

# set Agent name  
a3.sources = r1  
a3.channels = c1  
a3.sinks = k1  
  
# Source  
a3.sources.r1.type = avro  
a3.sources.r1.bind = centos7-2  
a3.sources.r1.port = 9999  
  
# interceptor  
a3.sources.r1.interceptors = i1  
a3.sources.r1.interceptors.i1.type = static  
a3.sources.r1.interceptors.i1.key = Collector  
a3.sources.r1.interceptors.i1.value = centos7-2  
  
# set channel  
a3.channels.c1.type = memory  
a3.channels.c1.capacity = 10000  
a3.channels.c1.transactionCapacity = 500  
  
# HDFS Sink  
a3.sinks.k1.type=hdfs  
a3.sinks.k1.hdfs.path=hdfs://centos7-1:9000/flume/failover/  
a3.sinks.k1.hdfs.fileType=DataStream  
a3.sinks.k1.hdfs.writeFormat=TEXT  
a3.sinks.k1.hdfs.rollInterval=60  
a3.sinks.k1.hdfs.filePrefix=%Y-%m-%d  
a3.sinks.k1.hdfs.minBlockReplicas=1  
a3.sinks.k1.hdfs.rollSize=0  
a3.sinks.k1.hdfs.rollCount=0  
a3.sinks.k1.hdfs.idleTimeout=0  
  
a3.sources.r1.channels = c1  
a3.sinks.k1.channel=c1  
  

5. 按顺序启动flume

分别在centos7-1、centos7-2、centos7-3上启动对应服务(先启动下游的agent)

# centos7-1  
flume-ng agent --name a2 --conf-file ~/conf/flume-avro-hdfs.conf  
  
# centos7-2  
flume-ng agent --name a3 --conf-file ~/conf/flume-avro-hdfs.conf  
  
# centos7-3  
flume-ng agent --name a1 --conf-file ~/conf/flume-taildir-avro2.conf  

6. 先hive.log中写入数据,检查HDFS目录

7. 杀掉一个Agent,看看另外Agent是否能启动