邗江区网站建设套餐,哪个网上购物网站好,怎么制作商城小程序,网站访问加速器#x1f3e1; 个人主页#xff1a;IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 #x1f6a9; 私聊博主#xff1a;加入大数据技术讨论群聊#xff0c;获取更多大数据资料。 #x1f514; 博主个人B栈地址#xff1a;豹哥教你大数据的个人空间-豹… 个人主页IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 私聊博主加入大数据技术讨论群聊获取更多大数据资料。 博主个人B栈地址豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频 目录
1. BashOperator 调度Shell命令案例
2. BashOperator 调度Shell脚本案例 Airflow中最重要的还是各种Operator其允许生成特定类型的任务这个任务在实例化时称为DAG中的任务节点所有的Operator均派生自BaseOparator,并且继承了许多属性和方法。关于BaseOperator的参数可以参照
http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator
BaseOperator中常用参数如下
task_id(str) 唯一task_id标记owner(str):任务的所有者建议使用linux用户名email(str or list[str]):出问题时发送报警Email的地址可以填写多个用逗号隔开。email_on_retry(bool):当任务重试时是否发送电子邮件email_on_failure(bool):当任务执行失败时是否发送电子邮件retries(int):在任务失败之前应该重试的次数retry_delay(datetime.timedelta):重试间隔必须是timedelta对象start_date(datetime.datetime):DAG开始执行时间这个参数必须是datetime对象不可以使用字符串。end_date(datetime.datetime)DAG运行结束时间任务启动后一般都会一直执行下去一般不设置此参数。depends_on_past(bool,默认False):是否依赖于过去如果为True,那么必须之前的DAG调度成功了现在的DAG调度才能执行。dag(airflow.models.DAG):指定的dag。execution_timeout(datetime.timedelta)执行此任务实例允许的最长时间超过最长时间则任务失败。trigger_rule(str):定义依赖的触发规则包括选项如下{ all_success | all_failed | all_done | one_success | one_failed | none_failed | none_failed_or_skipped | none_skipped | dummy(无条件执行)} default is all_success。
BashOperator主要执行bash脚本或命令BashOperator参数如下
bash_command(str):要执行的命令或脚本(脚本必须是.sh结尾)
1. BashOperator 调度Shell命令案例
from datetime import datetime, timedeltafrom airflow import DAG
from airflow.operators.bash import BashOperatordefault_args {owner:zhangsan,start_date:datetime(2021, 9, 23),email:kettle_test1163.com, #pwd:kettle123456retries: 1, # 失败重试次数retry_delay: timedelta(minutes5) # 失败重试间隔
}dag DAG(dag_id execute_shell_cmd,default_argsdefault_args,schedule_intervaltimedelta(minutes1)
)t1BashOperator(task_idprint_date,bash_commanddate,dag dag
)t2BashOperator(task_idprint_helloworld,bash_commandecho hello world!,dagdag
)t3BashOperator(task_idtempplated,bash_command{% for i in range(5) %}echo {{ ds }}echo {{ params.name}}echo {{ params.age}}{% endfor %},params{name:wangwu,age:10},dagdag
)t1 t2 t3
注意在t3中使用了Jinja模板“{% %}”内部是for标签用于循环操作但是必须以{% endfor %}结束。“{{}}”内部是变量其中ds是执行日期是airflow的宏变量params.name和params.age是自定义变量。
在default_args中的email是指当DAG执行失败时发送邮件到指定邮箱想要使用airflow发送邮件需要在$AIRFLOW_HOME/airflow.cfg中配置如下内容
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host smtp.163.com
smtp_starttls True
smtp_ssl False
# Example: smtp_user airflow
smtp_user kettle_test2
# Example: smtp_password airflow
smtp_password VIOFSYMFDIKKIUEA
smtp_port 25
smtp_mail_from kettle_test2163.com
smtp_timeout 30
smtp_retry_limit 5
此外配置163邮箱时需要开启“POP3/SMTP/IMAP服务”服务设置如下 2. BashOperator 调度Shell脚本案例
准备如下两个shell脚本,将以下两个脚本放在$AIRFLOW_HOME/dags目录下BashOperator默认执行脚本时默认从/tmp/airflow**临时目录查找对应脚本由于临时目录名称不定这里建议执行脚本时在“bash_command”中写上绝对路径。如果要写相对路径可以将脚本放在/tmp目录下在“bash_command”中执行命令写上“sh ../xxx.sh”也可以。
first_shell.sh
#!/bin/bashdt$1echo execute first shell echo ---- first : time is ${dt}
second_shell.sh
#!/bin/bashdt$1echo execute second shell echo ---- second : time is ${dt}
编写airflow python 配置
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperatordefault_args {owner:zhangsan,start_date:datetime(2021, 9, 23),retries: 1, # 失败重试次数retry_delay: timedelta(minutes5) # 失败重试间隔
}dag DAG(dag_id execute_shell_sh,default_argsdefault_args,schedule_intervaltimedelta(minutes1)
)firstBashOperator(task_idfirst,#脚本路径建议写绝对路径bash_commandsh /root/airflow/dags/first_shell.sh %s%datetime.now().strftime(%Y-%m-%d),dag dag
)secondBashOperator(task_idsecond,#脚本路径建议写绝对路径bash_commandsh /root/airflow/dags/second_shell.sh %s%datetime.now().strftime(%Y-%m-%d),dagdag
)first second
执行结果 特别注意在“bash_command”中写执行脚本时一定要在脚本后跟上空格有没有参数都要跟上空格否则会找不到对应的脚本。如下