Building a Robust Distributed Scheduling Cluster with Airflow 2.2.3 + Celery + MySQL 8

2022-10-27

In earlier posts we covered Airflow's basic architecture and how to deploy Airflow in containers. Today we'll look at how to build a robust distributed scheduling cluster with Airflow and Celery.

1 Cluster environment

As before, we install the Airflow cluster on Ubuntu 20.04.3 LTS, this time on three identically configured servers for testing. In the previous article[1] we already installed all of the Airflow components on the Bigdata1 server (follow the link if you haven't read it yet), so now we only need to install the worker component on the other two nodes.

Component    Bigdata1(A)    Bigdata2(B)    Bigdata3(C)
Webserver    ✓
Scheduler    ✓
Worker       ✓              ✓              ✓

In the docker-compose.yml from the previous article, the deployment file and the data directories were not separated, which makes later maintenance awkward. So we can stop the services and split the database and data directories away from the deployment files:

  • Deployment files: docker-compose.yaml / .env, stored under /apps/airflow
  • MySQL and its configuration files: under /data/mysql
  • Airflow data directories: under /data/airflow

Splitting things up like this makes unified management much easier later on.
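
A minimal relocation sketch for Bigdata1, assuming the old stack lived in a single directory (here ~/airflow-docker, a hypothetical path) with local mysql/, dags/ and plugins/ sub-directories:

docker-compose down                                        # stop the stack; data stays on disk
mkdir -p /apps/airflow /data/mysql /data/airflow
mv ~/airflow-docker/docker-compose.yaml ~/airflow-docker/.env /apps/airflow/
mv ~/airflow-docker/mysql/* /data/mysql/
mv ~/airflow-docker/dags ~/airflow-docker/plugins /data/airflow/
# then point the volume paths in docker-compose.yaml at the new locations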

2 Deploying the worker service

Preparation

mkdir -pv /data/airflow/{dags,plugins}
mkdir -pv /apps/airflow
mkdir -pv /logs/airflow
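
Since the containers run as ${AIRFLOW_UID}:0 (see the compose file below), it can save permission trouble to hand the persistence directories to that user up front — a hedged sketch, assuming you deploy as the current non-root user:

chown -R "$(id -u):0" /data/airflow /logs/airflow   # match the user:group the containers will run as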

The worker's deployment file:

---
version: '3'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: mysql+mysqldb://airflow:aaaa@$${MYSQL_HOST}:3306/airflow # change to your MySQL account and password
    AIRFLOW__CELERY__RESULT_BACKEND: db+mysql://airflow:aaaa@$${MYSQL_HOST}:3306/airflow # change to your MySQL account and password
    AIRFLOW__CELERY__BROKER_URL: redis://:xxxx@$${REDIS_HOST}:7480/0 # change to your Redis password
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - /data/airflow/dags:/opt/airflow/dags
    - /logs/airflow:/opt/airflow/logs
    - /data/airflow/plugins:/opt/airflow/plugins
    - /data/airflow/airflow.cfg:/opt/airflow/airflow.cfg
  user: "${AIRFLOW_UID:-50000}:0"

services:
  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    hostname: bigdata-20-194 # set the container hostname here so you can tell which worker it is in flower
    depends_on:
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024)))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
    user: "0:0"
    volumes:
      - .:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow
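
The compose file above reads several variables from an .env file sitting next to it in /apps/airflow. A minimal sketch with hypothetical values; note that MYSQL_HOST and REDIS_HOST are only substituted by docker-compose if the connection strings reference them with a single $ — with the escaped $$ form shown above the literal text is passed into the container, so in that case hard-code the hosts instead:

AIRFLOW_UID=1000                          # UID of the regular deployment user
AIRFLOW_IMAGE_NAME=apache/airflow:2.2.3
MYSQL_HOST=192.168.0.10                   # hypothetical address of the MySQL node (Bigdata1)
REDIS_HOST=192.168.0.10                   # hypothetical address of the Redis node
_PIP_ADDITIONAL_REQUIREMENTS=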

Run the initialization check to verify that the environment meets the requirements:

cd /apps/airflow/
echo -e "AIRFLOW_UID=$(id -u)" > .env  # note: AIRFLOW_UID must be the UID of a regular user, and that user must have permission to create the persistence directories
docker-compose up airflow-init

If the database already exists, the initialization check does not affect it. Next, start the airflow-worker service:

docker-compose up -d

Next, install the airflow-worker service on the bigdata3 node in the same way. Once deployment is finished, you can check the status of the broker and its workers in flower.
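
A quick sanity check from any worker node — a hedged sketch that reuses the same celery inspect ping the compose healthcheck runs (service name airflow-worker as in the compose file above; run without -d it pings every worker registered on the broker):

docker-compose ps                                   # the airflow-worker container should report healthy
docker-compose exec airflow-worker \
  celery --app airflow.executors.celery_executor.app inspect ping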

3 Persisting the configuration file

In most cases, when running an Airflow cluster with multiple worker nodes, we need to persist Airflow's configuration file and synchronize it to all nodes. To do this, modify the volumes of x-airflow-common in docker-compose.yaml so that airflow.cfg is mounted into the container (as already shown above). You can copy the configuration file out of a running container and then edit it.
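
One way to get that initial copy — a hedged sketch, assuming a container from the original all-in-one deployment on Bigdata1 is still running (the service name airflow-webserver is an assumption). Note that with the worker compose file above, /data/airflow/airflow.cfg must exist on the host before the container starts, otherwise Docker creates a directory in its place:

docker cp $(docker-compose ps -q airflow-webserver):/opt/airflow/airflow.cfg /data/airflow/airflow.cfg
chown "$(id -u):0" /data/airflow/airflow.cfg        # keep ownership consistent with the other mounted paths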

When first setting this up, we need to write the values of some of the docker-compose environment variables into the airflow.cfg file, for example the following settings:

[core]
dags_folder=/opt/airflow/dags
hostname_callable=socket.getfqdn
default_timezone=Asia/Shanghai  # change the time zone
executor=CeleryExecutor
sql_alchemy_conn=mysql+mysqldb://airflow:aaaa@$${MYSQL_HOST}:3306/airflow
sql_engine_encoding=utf-8
sql_alchemy_pool_enabled=True
sql_alchemy_pool_size=5
sql_alchemy_max_overflow=10
sql_alchemy_pool_recycle=1800
sql_alchemy_pool_pre_ping=True
sql_alchemy_schema=
parallelism=32
max_active_tasks_per_dag=16
dags_are_paused_at_creation=True
max_active_runs_per_dag=16
load_examples=True
load_default_connections=True
plugins_folder=/opt/airflow/plugins
execute_tasks_new_python_interpreter=False
fernet_key=
donot_pickle=True
dagbag_import_timeout=30.0
dagbag_import_error_tracebacks=True
dagbag_import_error_traceback_depth=2
dag_file_processor_timeout=50
task_runner=StandardTaskRunner
default_impersonation=
security=
unit_test_mode=False
enable_xcom_pickling=False
killed_task_cleanup_time=60
dag_run_conf_overrides_params=True
dag_discovery_safe_mode=True
default_task_retries=0
default_task_weight_rule=downstream
min_serialized_dag_update_interval=30
min_serialized_dag_fetch_interval=10
max_num_rendered_ti_fields_per_task=30
check_slas=True
xcom_backend=airflow.models.xcom.BaseXCom
lazy_load_plugins=True
lazy_discover_providers=True
max_db_retries=3
hide_sensitive_var_conn_fields=True
sensitive_var_conn_names=
default_pool_task_slot_count=128
[logging]
base_log_folder=/opt/airflow/logs
remote_logging=False
remote_log_conn_id=
google_key_path=
remote_base_log_folder=
encrypt_s3_logs=False
logging_level=INFO
fab_logging_level=WARNING
logging_config_class=
colored_console_log=True
colored_log_format=[%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class=airflow.utils.log.colored_log.CustomTTYColoredFormatter
log_format=[%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format=%%(asctime)s %%(levelname)s - %%(message)s
task_log_prefix_template=
log_filename_template={{ti.dag_id}}/{{ti.task_id}}/{{ts}}/{{try_number}}.log
log_processor_filename_template={{filename}}.log
dag_processor_manager_log_location=/opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
task_log_reader=task
extra_logger_names=
worker_log_server_port=8793
[metrics]
statsd_on=False
statsd_host=localhost
statsd_port=8125
statsd_prefix=airflow
statsd_allow_list=
stat_name_handler=
statsd_datadog_enabled=False
statsd_datadog_tags=
[secrets]
backend=
backend_kwargs=
[cli]
api_client=airflow.api.client.local_client
endpoint_url=http://localhost:8080
[debug]
fail_fast=False
[api]
enable_experimental_api=False
auth_backend=airflow.api.auth.backend.deny_all
maximum_page_limit=100
fallback_page_limit=100
google_oauth2_audience=
google_key_path=
access_control_allow_headers=
access_control_allow_methods=
access_control_allow_origins=
[lineage]
backend=
[atlas]
sasl_enabled=False
host=
port=21000
username=
password=
[operators]
default_owner=airflow
default_cpus=1
default_ram=512
default_disk=512
default_gpus=0
default_queue=default
allow_illegal_arguments=False
[hive]
default_hive_mapred_queue=
[webserver]
base_url=https://devopsman.cn/airflow  # customize the airflow domain
default_ui_timezone=Asia/Shanghai  # set the default UI time zone
web_server_host=0.0.0.0
web_server_port=8080
web_server_ssl_cert=
web_server_ssl_key=
web_server_master_timeout=120
web_server_worker_timeout=120
worker_refresh_batch_size=1
worker_refresh_interval=6000
reload_on_plugin_change=False
secret_key=emEfndkf3QWZ5zVLE1kVMg==
workers=4
worker_class=sync
access_logfile=-
error_logfile=-
access_logformat=
expose_config=False
expose_hostname=True
expose_stacktrace=True
dag_default_view=tree
dag_orientation=LR
log_fetch_timeout_sec=5
log_fetch_delay_sec=2
log_auto_tailing_offset=30
log_animation_speed=1000
hide_paused_dags_by_default=False
page_size=100
navbar_color=#fff
default_dag_run_display_number=25
enable_proxy_fix=False
proxy_fix_x_for=1
proxy_fix_x_proto=1
proxy_fix_x_host=1
proxy_fix_x_port=1
proxy_fix_x_prefix=1
cookie_secure=False
cookie_samesite=Lax
default_wrap=False
x_frame_enabled=True
show_recent_stats_for_completed_runs=True
update_fab_perms=True
session_lifetime_minutes=43200
auto_refresh_interval=3
[email]
email_backend=airflow.utils.email.send_email_smtp
email_conn_id=smtp_default
default_email_on_retry=True
default_email_on_failure=True
[smtp]  # email configuration
smtp_host=localhost
smtp_starttls=True
smtp_ssl=False
smtp_port=25
smtp_mail_from=airflow@example.com
smtp_timeout=30
smtp_retry_limit=5
[sentry]
sentry_on=false
sentry_dsn=
[celery_kubernetes_executor]
kubernetes_queue=kubernetes
[celery]
celery_app_name=airflow.executors.celery_executor
worker_concurrency=16
worker_umask=0o077
broker_url=redis://:xxxx@$${REDIS_HOST}:7480/0
result_backend=db+mysql://airflow:aaaa@$${MYSQL_HOST}:3306/airflow
flower_host=0.0.0.0
flower_url_prefix=
flower_port=5555
flower_basic_auth=
sync_parallelism=0
celery_config_options=airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
ssl_active=False
ssl_key=
ssl_cert=
ssl_cacert=
pool=prefork
operation_timeout=1.0
task_track_started=True
task_adoption_timeout=600
task_publish_max_retries=3
worker_precheck=False
[celery_broker_transport_options]
[dask]
cluster_address=127.0.0.1:8786
tls_ca=
tls_cert=
tls_key=
[scheduler]
job_heartbeat_sec=5
scheduler_heartbeat_sec=5
num_runs=-1
scheduler_idle_sleep_time=1
min_file_process_interval=30
dag_dir_list_interval=300
print_stats_interval=30
pool_metrics_interval=5.0
scheduler_health_check_threshold=30
orphaned_tasks_check_interval=300.0
child_process_log_directory=/opt/airflow/logs/scheduler
scheduler_zombie_task_threshold=300
catchup_by_default=True
max_tis_per_query=512
use_row_level_locking=True
max_dagruns_to_create_per_loop=10
max_dagruns_per_loop_to_schedule=20
schedule_after_task_execution=True
parsing_processes=2
file_parsing_sort_mode=modified_time
use_job_schedule=True
allow_trigger_in_future=False
dependency_detector=airflow.serialization.serialized_objects.DependencyDetector
trigger_timeout_check_interval=15
[triggerer]
default_capacity=1000
[kerberos]
ccache=/tmp/airflow_krb5_ccache
principal=airflow
reinit_frequency=3600
kinit_path=kinit
keytab=airflow.keytab
forwardable=True
include_ip=True
[github_enterprise]
api_rev=v3
[elasticsearch]
host=
log_id_template={dag_id}-{task_id}-{execution_date}-{try_number}
end_of_log_mark=end_of_log
frontend=
write_stdout=False
json_format=False
json_fields=asctime,filename,lineno,levelname,message
host_field=host
offset_field=offset
[elasticsearch_configs]
use_ssl=False
verify_certs=True
[kubernetes]
pod_template_file=
worker_container_repository=
worker_container_tag=
namespace=default
delete_worker_pods=True
delete_worker_pods_on_failure=False
worker_pods_creation_batch_size=1
multi_namespace_mode=False
in_cluster=True
kube_client_request_args=
delete_option_kwargs=
enable_tcp_keepalive=True
tcp_keep_idle=120
tcp_keep_intvl=30
tcp_keep_cnt=6
verify_ssl=True
worker_pods_pending_timeout=300
worker_pods_pending_timeout_check_interval=120
worker_pods_queued_check_interval=60
worker_pods_pending_timeout_batch_size=100
[smart_sensor]
use_smart_sensor=False
shard_code_upper_limit=10000
shards=5
sensors_enabled=NamedHivePartitionSensor

After making the changes, restart the services:

docker-compose restart
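
To confirm the containers actually picked up the mounted configuration, a hedged check (airflow config get-value is available in Airflow 2.x; service name as above):

docker-compose exec airflow-worker airflow config get-value core default_timezone   # should print Asia/Shanghai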

4 Data synchronization

Because Airflow now runs workers on three nodes, a configuration change on any node must be synchronized to the others, and the DAGs and plugins directories need to be synchronized in real time as well: if the scheduler dispatches a task to a node where the corresponding DAG file cannot be found, the task fails with an error. So we use lsyncd for real-time data synchronization:

apt-get install lsyncd -y

Configure public-key SSH access between the nodes:

ssh-keygen -t rsa -C "airflow-sync" -f ~/.ssh/airflow-sync -b 4096  # generate a key pair named airflow-sync
for ip in 100 200;do ssh-copy-id -i ~/.ssh/airflow-sync.pub ${USERNAME}@192.168.0.$ip -p 12022;done

After that we can access the other nodes with the private key.
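
A quick connectivity test with the key and the custom port (username and addresses as in the loop above):

ssh -p 12022 -i ~/.ssh/airflow-sync ${USERNAME}@192.168.0.100 hostname   # should print the remote hostname without a password prompt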

Edit the sync configuration file (e.g. /etc/lsyncd.conf, which the systemd unit below points at); for more on lsyncd configuration parameters, see the official documentation[2]:

settings {
    logfile = "/var/log/lsyncd.log",        -- log file
    statusFile = "/var/log/lsyncd.status",  -- sync status information
    pidfile = "/var/run/lsyncd.pid",
    statusInterval = 1,
    nodaemon = false,                       -- run as a daemon
    inotifyMode = "CloseWrite",
    maxProcesses = 1,
    maxDelays = 1,
}
sync {
    default.rsync,
    source = "/data/airflow",
    target = "192.168.0.100:/data/airflow",

    rsync = {
        binary = "/usr/bin/rsync",
        compress = false,
        archive = true,
        owner = true,
        perms = true,
        -- delete = true,
        whole_file = false,
        rsh = "/usr/bin/ssh -p 12022 -l suoper -o StrictHostKeyChecking=no -i /home/username/.ssh/airflow-rsync"
    },
}
sync {
    default.rsync,
    source = "/data/airflow",
    target = "192.168.0.200:/data/airflow",

    rsync = {
        binary = "/usr/bin/rsync",
        compress = false,
        archive = true,
        owner = true,
        perms = true,
        -- delete = true,
        whole_file = false,
        rsh = "/usr/bin/ssh -p 12022 -l suoper -o StrictHostKeyChecking=no -i /home/username/.ssh/airflow-rsync"
    },
}

The meaning of each of these parameters is documented on the official site. Here the ssh command is defined via rsync's rsh option, which covers setups that use a private key, a custom port and similar hardening. Of course, you can also configure passwordless access and then use default.rsync or default.rsyncssh instead.

Set up lsyncd as a managed systemd service:

cat <<EOF > /etc/systemd/system/lsyncd.service
[Unit]
Description=lsyncd
ConditionFileIsExecutable=/usr/bin/lsyncd

After=network-online.target
Wants=network-online.target

[Service]
StartLimitBurst=10
ExecStart=/usr/bin/lsyncd /etc/lsyncd.conf
Restart=on-failure
RestartSec=120
EnvironmentFile=-/etc/sysconfig/aliyun
KillMode=process
[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl enable --now lsyncd.service  # start the service and enable it at boot

This takes care of synchronizing the data (dags, plugins, airflow.cfg). Later, in a CI/CD scenario, you can simply upload DAG files to the Bigdata1 node and the other two nodes will sync automatically. If anything goes wrong, you can debug by inspecting the logs:

lsyncd -log all /etc/lsyncd.conf
tail -f /var/log/lsyncd.log
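
To verify the whole chain, a hedged end-to-end test run on Bigdata1 (sync_test.py is a throwaway file; IP, port and key as configured above):

touch /data/airflow/dags/sync_test.py
sleep 2                                   # give lsyncd a moment to pick up the inotify event
ssh -p 12022 -i ~/.ssh/airflow-sync ${USERNAME}@192.168.0.100 ls /data/airflow/dags/sync_test.py
rm /data/airflow/dags/sync_test.py        # clean up the test file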

5 Reverse proxy[3]

If you need to put Airflow behind a reverse proxy, e.g. at https://lab.mycompany.com/myorg/airflow/, you can do it with the following configuration.

Configure base_url in airflow.cfg:

base_url=http://my_host/myorg/airflow
enable_proxy_fix=True

The nginx configuration:

server {
    listen 80;
    server_name lab.mycompany.com;

    location /myorg/airflow/ {
        proxy_pass http://localhost:8080;
        proxy_set_header Host $http_host;
        proxy_redirect off;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}
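
Once nginx is reloaded, a hedged way to check the proxied path (hostname as in server_name above; the Airflow webserver exposes a /health endpoint):

curl -I http://lab.mycompany.com/myorg/airflow/health   # expect HTTP 200 from the Airflow webserver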

At this point the installation of the distributed Airflow scheduling cluster is basically complete.

If you've read this far, you're probably already using Airflow or interested in it; here is a learning resource for Airflow as well:

https://livebook.manning.com/book/data-pipelines-with-apache-airflow/chapter-12/1

References

[1] Airflow 2.2.3 + MySQL 8.0.27: https://mp.weixin.qq.com/s/VncpyXcTtlvnDkFrsAZ5lQ

[2] lsyncd config file: https://lsyncd.github.io/lsyncd/manual/config/file/

[3] airflow-behind-proxy: https://airflow.apache.org/docs/apache-airflow/stable/howto/run-behind-proxy.html
