Airflow任务调度系统

24 Mar 2021

Airflow 是 Airbnb 开源的一个用 Python 编写的调度工具。 Airflow将一个工作流制定为一组任务的有向无环图(DAG),并指派到一组计算节 点上,根据相互之间的依赖关系,有序执行。Airflow 有以下优势:

简介

架构体系

重要概念

Airflow安装部署

安装条件

Python环境准备

# 卸载 mariadb  
rpm -qa | grep mariadb   
mariadb-libs-5.5.65-1.el7.x86_64   
mariadb-5.5.65-1.el7.x86_64   
mariadb-devel-5.5.65-1.el7.x86_64  
  
yum remove mariadb  
yum remove mariadb-devel  
yum remove mariadb-libs  
  
# 安装依赖  
rpm -ivh mysql57-community-release-el7-11.noarch.rpm  
yum -y install mysql-community-server  
  
yum install readline readline-devel -y  
yum install gcc -y  
yum install zlib* -y  
yum install openssl openssl-devel -y  
yum install sqlite-devel -y  
yum install python-devel mysql-devel -y  
  
# 提前到python官网下载好包  
cd /opt/software  
tar -zxvf Python-3.6.6.tgz  
  
# 安装 python3 运行环境  
cd Python-3.6.6/  
# configure文件是一个可执行的脚本文件。如果配置了--prefix,安装后的所有资源文件都会放在目录中  
./configure --prefix=/usr/local/python3.6  
make && make install  
/usr/local/python3.6/bin/pip3 install virtualenv  
  
# 启动 python3 环境  
cd /usr/local/python3.6/bin/  
./virtualenv env  
. env/bin/activate  
  
# 检查 python 版本  
python -V  

安装Airflow

# 设置目录(配置文件)  
# 添加到配置文件/etc/profile。未设置是缺省值为 ~/airflow  
export AIRFLOW_HOME=/opt/servers/airflow  
  
# 使用国内源下载  
pip3 install apache-airflow==1.10.11 -i https://pypi.douban.com/simple  

创建MySQL用户并授权

-- 创建数据库  
create database airflowcentos72;  
  
-- 创建用户airflow,设置所有ip均可以访问  
create user 'airflow'@'%' identified by '12345678';  
create user 'airflow'@'localhost' identified by '12345678';  
  
-- 用户授权,为新建的airflow用户授予Airflow库的所有权限  
grant all on airflowcentos72.* to 'airflow'@'%';  
grant all on *.* to 'airflow'@'localhost';  
SET GLOBAL explicit_defaults_for_timestamp = 1;   
flush privileges;  

修改Airflow DB配置

# python3 环境中执行  
pip install mysqlclient==1.4.6   
pip install SQLAlchemy==1.3.15  
airflow initdb  

修改 $AIRFLOW_HOME/airflow.cfg:

 # 约 75 行  
sql_alchemy_conn = mysql://airflow:12345678@centos7-2:3306/airflowcentos72  
  
# 重新执行  
airflow initdb  

安装密码模块

pip install apache-airflow[password]  

修改 airflow.cfg 配置文件(第一行修改,第二行增加):

# 约 281 行  
[webserver]  
  
# 约 353行  
authenticate = True  
auth_backend = airflow.contrib.auth.backends.password_auth  

添加密码文件, 只需python脚本

import airflow  
from airflow import models, settings  
from airflow.contrib.auth.backends.password_auth import PasswordUser  
  
user = PasswordUser(models.User())  
user.username = 'airflow'  
user.email = 'airflow@lagou.com'  
user.password = 'airflow123'  
  
session = settings.Session()  
session.add(user)  
session.commit()  
session.close()  
exit()  

启动服务

# 备注:要先进入python3的运行环境   
  
cd /usr/local/python3.6/bin/   
./virtualenv env  
. env/bin/activate  
  
# 退出虚拟环境命令   
deactivate  
  
# 启动scheduler调度器:   
airflow scheduler -D  
# 服务页面启动:  
airflow webserver -D  

安装完成,可以使用浏览器登录 http://centos7-2:8080;输入用户名、口令:airflow / airflow123

停止airflow webserver

# 关闭 airflow webserver 对应的服务  
ps -ef | grep 'airflow-webserver' | grep -v 'grep' | awk '{print $2}' | xargs -i kill -9 {}  
  
# 关闭 airflow scheduler 对应的服务  
ps -ef | grep 'airflow' | grep 'scheduler' | awk '{print $2}' | xargs -i kill -9 {}  
  
# 删除对应的pid文件  
cd $AIRFLOW_HOME  
rm -rf *.pid  

禁用自带的DAG任务

  1. 停止服务
  2. 修改文件 $AIRFLOW_HOME/airflow.cfg:
# 修改文件第 136 行  
load_examples = False  
  
  
# 重新设置db   
airflow resetdb -y  
  1. 重新设置账户、口令:
import airflow  
from airflow import models, settings  
from airflow.contrib.auth.backends.password_auth import PasswordUser  
  
user = PasswordUser(models.User())  
user.username = 'airflow'  
user.email = 'airflow@lagou.com'  
user.password = 'airflow123'  
  
session = settings.Session()  
session.add(user)  
session.commit()  
session.close()  
exit()  
  1. 重启服务

crontab简介

Linux 系统则是由 cron (crond) 这个系统服务来控制的。Linux 系统上面原本就有非常多的计划性工作,因此这个系统服务是默认启动的。
Linux 系统也提供了Linux用户控制计划任务的命令:crontab 命令。

任务集成部署

Airflow核心概念

入门案例

放置在 $AIRFLOW_HOME/dags 目录下

from datetime import datetime, timedelta  
from airflow import DAG  
from airflow.utils import dates  
from airflow.utils.helpers import chain  
from airflow.operators.bash_operator import BashOperator  
from airflow.operators.python_operator import PythonOperator  
  
  
def default_options():  
    default_args = {  
        'owner': 'airflow',  
        'start_date': dates.days_ago(1),  
        'retries': 1,  
        'retry_delay': timedelta(seconds=5)  
    }  
    return default_args  
  
# 定义DAG  
def task1(dag):  
    t = "pwd"  
    # operator支持多种类型,这里使用 BashOperator  
    task = BashOperator(  
        task_id='MyTask1',  
        bash_command=t,  
        dag=dag  
    )  
    return task  
  
  
def hello_world():  
    current_time = str(datetime.today())  
    print('hello world at {}'.format(current_time))  
  
  
def task2(dag):  
    # Python Operator  
    task = PythonOperator(  
        task_id='MyTask2',  
        python_callable=hello_world,  
        dag=dag  
    )  
    return task  
  
  
def task3(dag):  
    t = "date"  
    task = BashOperator(  
        task_id='MyTask3',  
        bash_command=t,  
        dag=dag  
    )  
    return task  
  
  
with DAG(  
        'HelloWorldDag',  
        default_args=default_options(),  
        schedule_interval="*/2 * * * *"  
) as d:  
    task1 = task1(d)  
    task2 = task2(d)  
    task3 = task3(d)  
    chain(task1, task2, task3)