Flink--流(批)式处理框架

06 Jun 2021

Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。

Flink 是一个开源的流处理框架,它具有以下特点

Flink主要应用于流式数据分析场景

Flink 专注于无限流处理,有限流处理是无限流处理的一种特殊情况

流处理引擎的技术选型

市面上的流处理引擎不止Flink一种,其他的比如Storm、SparkStreaming、Trident等,实际应用 时如何进行选型,给大家一些建议参考

Flink初体验

相关依赖

<properties>  
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  
    <maven.compiler.source>1.8</maven.compiler.source>  
    <maven.compiler.target>1.8</maven.compiler.target>  
</properties>  
  
<dependencies>  
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-java</artifactId>  
        <version>1.11.1</version>  
    </dependency>  
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-streaming-java_2.12</artifactId>  
        <version>1.11.1</version>  
    </dependency>  
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-clients_2.12</artifactId>  
        <version>1.11.1</version>  
    </dependency>  
  
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-scala_2.12</artifactId>  
        <version>1.11.1</version>  
    </dependency>  
  
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-streaming-scala_2.12</artifactId>  
        <version>1.11.1</version>  
        <!--<scope>provided</scope>-->  
    </dependency>  
  
  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-hadoop-compatibility_2.11</artifactId>  
        <version>1.11.1</version>  
    </dependency>  
    <dependency>  
        <groupId>org.apache.hadoop</groupId>  
        <artifactId>hadoop-common</artifactId>  
        <version>2.8.5</version>  
    </dependency>  
    <dependency>  
        <groupId>org.apache.hadoop</groupId>  
        <artifactId>hadoop-hdfs</artifactId>  
        <version>2.8.5</version>  
    </dependency>  
    <dependency>  
        <groupId>org.apache.hadoop</groupId>  
        <artifactId>hadoop-client</artifactId>  
        <version>2.8.5</version>  
    </dependency>  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-connector-kafka_2.11</artifactId>  
        <version>1.11.1</version>  
    </dependency>  
    <!--<dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-connector-kafka-0.11_2.12</artifactId>  
        <version>1.11.1</version>  
    </dependency>-->  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-connector-redis_2.11</artifactId>  
        <version>1.1.5</version>  
    </dependency>  
  
    <dependency>  
        <groupId>mysql</groupId>  
        <artifactId>mysql-connector-java</artifactId>  
        <version>8.0.21</version>  
    </dependency>  
  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-table</artifactId>  
        <version>1.11.1</version>  
        <type>pom</type>  
        <scope>provided</scope>  
    </dependency>  
  
    <!-- Either... -->  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>  
        <version>1.11.1</version>  
        <scope>provided</scope>  
    </dependency>  
    <!-- or... -->  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-table-api-scala-bridge_2.12</artifactId>  
        <version>1.11.1</version>  
        <scope>provided</scope>  
    </dependency>  
  
  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-table-planner-blink_2.12</artifactId>  
        <version>1.11.1</version>  
        <scope>provided</scope>  
    </dependency>  
  
  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-cep_2.12</artifactId>  
        <version>1.11.1</version>  
    </dependency>  
  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-json</artifactId>  
        <version>1.11.1</version>  
    </dependency>  
  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-csv</artifactId>  
        <version>1.11.1</version>  
    </dependency>  
  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-orc_2.12</artifactId>  
        <version>1.11.1</version>  
    </dependency>  
  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-hbase_2.12</artifactId>  
        <version>1.10.2</version>  
    </dependency>  
  
  
    <!-- <dependency>  
         <groupId>org.apache.flink</groupId>  
         <artifactId>flink-jdbc_2.12</artifactId>  
         <version>1.10.2</version>  
     </dependency>-->  
  
    <dependency>  
        <groupId>org.postgresql</groupId>  
        <artifactId>postgresql</artifactId>  
        <version>42.2.16</version>  
    </dependency>  
  
    <dependency>  
        <groupId>com.github.housepower</groupId>  
        <artifactId>clickhouse-native-jdbc</artifactId>  
        <version>1.6-stable</version>  
    </dependency>  
  
    <dependency>  
        <groupId>org.apache.kudu</groupId>  
        <artifactId>kudu-client</artifactId>  
        <version>1.5.0</version>  
    </dependency>  
  
  
</dependencies>  
<build>  
    <plugins>  
        <!-- 打jar插件 -->  
        <plugin>  
            <groupId>org.apache.maven.plugins</groupId>  
            <artifactId>maven-shade-plugin</artifactId>  
            <version>2.4.3</version>  
            <executions>  
                <execution>  
                    <phase>package</phase>  
                    <goals>  
                        <goal>shade</goal>  
                    </goals>  
                    <configuration>  
                        <filters>  
                            <filter>  
                                <artifact>*:*</artifact>  
                                <excludes>  
                                    <exclude>META-INF/*.SF</exclude>  
                                    <exclude>META-INF/*.DSA</exclude>  
                                    <exclude>META-INF/*.RSA</exclude>  
                                </excludes>  
                            </filter>  
                        </filters>  
                    </configuration>  
                </execution>  
            </executions>  
        </plugin>  
  
    </plugins>  
</build>  

wordCount批处理

wordCount流处理

Flink体系结构

Flink的重要角色

Flink是非常经典的Master/Slave结构实现,JobManager是Master,TaskManager是Slave。

Flink运行架构

Flink安装和部署

Flink支持多种安装模式

环境准备工作

StandAlone模式部署

Step1、Flink安装包上传到centos7-2 对应目录并解压
Step2、修改 flink/conf/flink-conf.yaml 文件

jobmanager.rpc.address: centos7-2  
# 一般cpu有多少核就写多少  
taskmanager.numberOfTaskSlots: 2  

Step3、修改conf/master

vim masters   
  
centos7-2:8081  

Step4、修改conf/workers

vim workers   
  
centos7-1  
centos7-2  
centos7-3  

Step5、把修改好配置的flink包分发到centos7-1, centos7-3上

Step6、standalone模式启动

bin目录下执行./start-cluster.sh  

Step7、jps进程查看核实

3857 TaskManagerRunner  
3411 StandaloneSessionClusterEntrypoint  

Step8、查看Flink的web页面 ip:8081/#/overview

Step9、集群模式下运行example测试

./flink run -c WordCount ../examples/streaming/WordCount.jar  
  
# -c 指定程序入口  

也可以在web端直接提交jar包

Yarn模式部署

Flink常用API

DataStream

DataSet

DataSet批处理的算子和流处理基本一致

Table API和SQL_API

不论是批处理的DataFrame, 还是流处理的DataSet, 都可以转化成Table, 采用统计的处理方式. Table API and SQL目前尚未完全完善,还在积极的开发中,所以并不是所有的算子操作都可以通 过其实现。

Flink认为Batch是Streaming的一个特例,因此Flink底层引擎是一个流式引擎,在上面实现了流处 理和批处理。而Window就是从Streaming到Batch的桥梁。
通俗讲,Window是用来对一个无限的流设置一个有限的集合,从而在有界的数据集上进行操作的 一种机制。流上的集合由Window来划定范围,比如“计算过去10分钟”或者“最后50个元素的和”。
Window可以由时间(Time Window)(比如每30s)或者数据(Count Window)(如每100个 元素)驱动。DataStream API提供了Time和Count的Window。

Flink要操作窗口,先得将StreamSource 转成WindowedStream

翻滚窗口 (Tumbling Window, 无重叠)

将数据依据固定的窗口长度对数据进行切分
特点:时间对齐,窗口长度固定,没有重叠

滑动窗口 (Sliding Window, 有重叠)

滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成
特点:窗口长度固定,可以有重叠

会话窗口 (Session Window, 活动间隙)

由一系列事件组合一个指定时间长度的timeout间隙组成,类似于web应用的session,也就是一段 时间没有接收到新数据就会生成新的窗口。
session窗口分配器通过session活动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不 会有重叠和固定的开始时间和结束时间的情况
session窗口在一个固定的时间周期内不再收到元素,即非活动间隔产生,那么这个窗口就会关闭。

一个session窗口通过一个session间隔来配置,这个session间隔定义了非活跃周期的长度,当这个非
活跃周期产生,那么当前的session将关闭并且后续的元素将被分配到新的session窗口中去。

特点
会话窗口不重叠,没有固定的开始和结束时间
与翻滚窗口和滑动窗口相反, 当会话窗口在一段时间内没有接收到元素时会关闭会话窗口。
后续的元素将会被分配给新的会话窗口

场景
计算每个用户在活跃期间总共购买的商品数量,如果用户30秒没有活动则视为会话断开

全局窗口GlobalWindow

window实现

所有windowStream都是通过KeyedStream 如下方法创建的

public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {  
	return new WindowedStream<>(this, assigner);  
}  

WindowAssigner 是一个抽象类, 用来如何根据时间和key把数据放到不同的窗口

Time

在Flink的流式处理中,会涉及到时间的不同概念

在Flink的流式处理中,绝大部分的业务都会使用EventTime,一般只在EventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime。
如果要使用EventTime,那么需要引入EventTime的时间属性,引入方式如下所示:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //设置使用事件时间  

Watermark

水印(watermark)就是一个时间戳,Flink可以给数据流添加水印, 可以理解为:收到一条消息后,额外给这个消息添加了一个等待时间,这就是添加水印。

Flink的State

用来保存计算结果或缓存数据。

状态类型

Flink根据是否需要保存中间结果,把计算分为有状态计算和无状态计算

状态描述

State 既然是暴露给用户的,那么就需要有一些属性需要指定:state 名称、val serializer、state type info。在对应的statebackend中,会去调用对应的create方法获取到stateDescriptor中的值。Flink通 过 StateDescriptor 来定义一个状态。这是一个抽象类,内部定义了状态名称、类型、序列化器等基 础信息。与上面的状态对应,从 StateDescriptor 派生了 ValueStateDescriptor ,
ListStateDescriptor 等descriptor

广播状态

正常收到的数据只会在一个subtask,在一条线程上处理, 而广播状态会发送到所有subtask,所有线程上, 可以用来做模式匹配

状态存储

并行度的设置

一个Flink程序由多个Operator组成(source、transformation和 sink)。 一个Operator由多个并行的Task(线程)来执行, 一个Operator的并行Task(线程)数目就被称为该Operator(任务)的并行度(Parallel)

并行度可以有如下几种指定方式

Operator Level(算子级别)

一个算子、数据源和sink的并行度可以通过调用 setParallelism()方法来指定

actions.filter(new FilterFunction<UserAction>() {  
            @Override  
            public boolean filter(UserAction value) throws Exception {  
                return false;  
            }  
        }).setParallelism(4);  

Execution Environment Level(Env级别)

执行环境(任务)的默认并行度可以通过调用setParallelism()方法指定。为了以并行度3来执行所有的算 子、数据源和data sink, 可以通过如下的方式设置执行环境的并行度:
执行环境的并行度可以通过显式设置算子的并行度而被重写

StreamExecutionEnvironment env =  
StreamExecutionEnvironment.getExecutionEnvironment();  
env.setParallelism(3);  

Client Level(客户端级别,推荐使用)

并行度可以在客户端将job提交到Flink时设定。 对于CLI客户端,可以通过-p参数指定并行度

./bin/flink run -p 10 WordCount-java.jar  

System Level(系统默认级别,尽量不使用)

在系统级可以通过设置flink-conf.yaml文件中的parallelism.default属性来指定所有执行环境的默认并行度

注意事项

1.并行度的优先级:算子级别 > env级别 > Client级别 > 系统默认级别 (越靠前具体的代码并行度的优先 级越高)
2.如果source不可以被并行执行,即使指定了并行度为多个,也不会生效
3.尽可能的规避算子的并行度的设置,因为并行度的改变会造成task的重新划分,带来shuffle问题
4.推荐使用任务提交的时候动态的指定并行度
5.slot是静态的概念,是指taskmanager具有的并发执行能力; parallelism是动态的概念,是指程序运行 时实际使用的并发能力

CEP 即Complex Event Processing - 复杂事件处理,Flink CEP 是在 Flink 中实现的复杂时间处理(CEP) 库。处理事件的规则,被叫做“模式”(Pattern),Flink CEP 提供了 Pattern API,用于对输入流数据进行 复杂事件规则定义,用来提取符合规则的事件序列。

Pattern API 大致分为三种:个体模式,组合模式,模式组。

CEP 在互联网各个行业都有应用,例如金融、物流、电商、智能交通、物联网行业等行业

相关概念

Pattern API

处理事件的规则,被叫作模式(Pattern)。
Flink CEP提供了Pattern API用于对输入流数据进行复杂事件规则定义,用来提取符合规则的事件序 列。
模式大致分为三类

非确定有限自动机

FlinkCEP在运行时会将用户的逻辑转化成这样的一个NFA Graph (nfa对象)
所以有限状态机的工作过程,就是从开始状态,根据不同的输入,自动进行状态转换的过程。

上图中的状态机的功能,是检测二进制数是否含有偶数个 0。从图上可以看出,输入只有 1 和 0 两种。 从 S1 状态开始,只有输入 0 才会转换到 S2 状态,同样 S2 状态下只有输入 0 才会转换到 S1。所以, 二进制数输入完毕,如果满足最终状态,也就是最后停在 S1 状态,那么输入的二进制数就含有偶数个 0。

案例

FlinkSQL

Flink 本身是批流统一的处理框架,所以 Table API 和 SQL,就是批流统一的上层处理 API。
Table API 是一套内嵌在 Java 和 Scala 语言中的查询 API,它允许我们以非常直观的方式, 组合来自一些关系运算符的查询(比如 select、filter 和 join)。而对于 Flink SQL,就是直接可 以在代码中写 SQL,来实现一些查询(Query)操作。Flink 的 SQL 支持,基于实现了 SQL 标准的 Apache Calcite(Apache 开源 SQL 解析工具)。
无论输入是批输入还是流式输入,在这两套 API 中,指定的查询都具有相同的语义,得到相同的结果

用到的依赖

<!--        flinktable的基础依赖 -->  
        <dependency>  
            <groupId>org.apache.flink</groupId>  
            <artifactId>flink-table</artifactId>  
            <version>1.11.1</version>  
            <type>pom</type>  
            <scope>provided</scope>  
        </dependency>  
  
<!--        桥接器,主要负责 table API 和 DataStream/DataSet API 的连接支持,  
            按照语言分 java 和 scala。-->  
        <!-- Either... -->  
        <dependency>  
            <groupId>org.apache.flink</groupId>  
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>  
            <version>1.11.1</version>  
            <scope>provided</scope>  
        </dependency>  
        <!-- or... -->  
        <dependency>  
            <groupId>org.apache.flink</groupId>  
            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>  
            <version>1.11.1</version>  
            <scope>provided</scope>  
        </dependency>  
  
<!--        计划器,是 table API 最主要的部分,提供了运行时环境和生 成程序执行计划的 planner;  
            如果是生产环境,lib 目录下默认已 经有了 planner,就只需要有 bridge 就可以了-->  
        <dependency>  
            <groupId>org.apache.flink</groupId>  
            <artifactId>flink-table-planner-blink_2.12</artifactId>  
            <version>1.11.1</version>  
            <scope>provided</scope>  
        </dependency>  

一般流程

import org.apache.flink.api.java.tuple.Tuple2;  
import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.datastream.DataStreamSource;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.api.functions.source.SourceFunction;  
import org.apache.flink.table.api.EnvironmentSettings;  
import org.apache.flink.table.api.Table;  
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;  
import org.apache.flink.types.Row;  
  
import static org.apache.flink.table.api.Expressions.$;  
  
public class TableSQLDemo1 {  
    public static void main(String[] args) throws Exception {  
  
//        1、Flink执行环境env  
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  
        // BlinkPlanner: 计划器,是 table API 最主要的部分,提供了运行时环境和生 成程序执行计划的 planner;  
        // 基于 blink 版本的流处理环境(Blink-Streaming-Query)或者,基于 blink 版本的批处理环境(Blink- Batch-Query):  
        EnvironmentSettings settings = EnvironmentSettings.newInstance()  
                .useBlinkPlanner()  
                .inStreamingMode()  
//                .inBatchMode()  
                .build();  
  
        // 2、用env,做出Table环境tEnv  
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);  
  
        // 3、获取流式数据源  
        DataStreamSource<Tuple2<String, Integer>> data = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {  
            @Override  
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {  
                int num = 0;  
                while (true) {  
                    ctx.collect(new Tuple2<>("April" + num, num++));  
                    Thread.sleep(1000);  
                }  
            }  
  
            @Override  
            public void cancel() {  
  
            }  
        });  
  
        // 4、将流式数据源做成Table  
        // table方式  
        Table myTable = tEnv.fromDataStream(data, $("name"), $("num"));  
        // sql 方式  
        tEnv.createTemporaryView("nameTable", data, $("name"), $("num"));  
  
        // 5、对Table中的数据做查询  
//        使用table api  
//        Table selectResult = myTable.select($("name"), $("num"));  
        // 过滤数据  
//        Table selectResult = myTable.select($("name"), $("num")).filter($("num").mod(2).isEqual(0));  
  
        // 使用sql语句  
//        Table selectResult = tEnv.sqlQuery("select * from nameTable");  
        Table selectResult = tEnv.sqlQuery("select * from nameTable where mod(num,2)=0");  
  
        // 6、将Table转成数据流:  
        DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tEnv.toRetractStream(selectResult, Row.class);  
  
        tuple2DataStream.print();  
  
        env.execute();  
    }  
}  

外部链接

查询数据

官网:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html

作业提交

Flink的jar文件并不是Flink集群的可执行文件,需要经过转换之后提交给集群

转换过程:
1、在Flink Client中,通过反射启动jar中的main函数,生成Flink StreamGraph和JobGraph。将 JobGraph提交给Flink集群。

2、Flink集群收到JobGraph后,将JobGraph翻译成ExecutionGraph,然后开始调度执行,启动成功之 后开始消费数据

Flink的核心执行流程就是,把用户的一系列API调用,转化为StreamGraph —> JobGraph —> ExecutionGraph —> 物理执行拓扑(Task DAG)

PipelineExecutor

是Flink Client生成JobGraph之后,将作业提交给集群运行的重要环节

Session模式:AbstractSessionClusterExecutor Per-Job模式:AbstractJobClusterExecutor IDE调试:LocalExecutor

Session模式

作业提交通过: yarn-session.sh脚本 在启动脚本的时候检查是否已经存在已经启动好的Flink-Session模式的集群, 然后在PipelineExecutor中,通过Dispatcher提供的Rest接口提交Flink JobGraph Dispatcher为每一个作业提供一个JobMaser,进入到作业执行阶段

Per-Job模式

一个作业一个集群,作业之间相互隔离。

在PipelineExecutor执行作业提交的时候,可以创建集群并将JobGraph以及所有需要的文件一起提交给 Yarn集群,在Yarn集群的容器中启动Flink Master(JobManager进程),进行初始化后,从文件系统 中获取JobGraph,交给Dispatcher,之后和Session流程相同。

流程图

详细流程图