每当DAG中的任务无法运行或重试运行时,我都试图让Airflow使用AWS SES向我发送电子邮件。我也在使用我的AWS SES凭证,而不是我的一般AWS凭证。
我当前的airflow.cfg
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 25
smtp_mail_from = [email protected]
我的DAG中当前旨在故意失败并重试的任务:
testfaildag_library_install_jar_jdbc = PythonOperator(
task_id='library_install_jar',
retries=3,
retry_delay=timedelta(seconds=15),
python_callable=add_library_to_cluster,
params={'_task_id': 'cluster_create', '_cluster_name': CLUSTER_NAME, '_library_path':s3000://fakepath.jar},
dag=dag,
email_on_failure=True,
email_on_retry=True,
email=’[email protected]’,
provide_context=True
)
当任务重试设定的次数并最终失败时,一切都会按设计进行,除非没有发送电子邮件。我也检查了上述任务中的日志,并且从未提及过smtp。
我在这里看过类似的问题,但是那里唯一的解决方案对我不起作用。此外,Airflow的文档(例如此处的示例)似乎也不适合我。
SES是否可以与Airflow的email_on_failure和email_on_retry函数一起使用?
什么我目前想这样做的使用是on_failure_callback
要调用的函数由AWS提供的python脚本在这里对失败发送一封电子邮件,但不是在这一点上最好的路线。
谢谢您,谢谢您的帮助。
-使用有效的SES更新6/8
这是我写的有关如何使其全部工作的文章。该答案的底部有一个小摘要。
几个要点:
email_on_failure
和email_on_retry
提供服务。您可以journalctl –u airflow-worker –f
在Dag运行期间进行监视。在生产服务器上,airflow.cfg
使用新的smtp设置进行更改后,您无需重新启动airflow-worker,它应该会自动提取。无需担心搞乱当前正在运行的Dags。这是有关如何使用sendmail的技术文章:
由于我们在本地主机上从ses更改为sendmail,因此必须在中更改smtp设置airflow.cfg
。
新的配置是:
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = localhost
smtp_starttls = False
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
#smtp_user = not used
#smtp_password = not used
smtp_port = 25
smtp_mail_from = [email protected]
这适用于生产和局部气流实例。
如果它们的配置不像上面的我的配置可能会收到一些常见的错误:
socket.error: [Errno 111] Connection refused
-您必须将smtp_host
行更改airflow.cfg
为localhost
smtplib.SMTPException: STARTTLS extension not supported by server.
-您必须改变smtp_starttls
在airflow.cfg
以False
在我的本地测试中,我试图简单地强制气流显示尝试发送电子邮件时发生的情况的日志–我创建了一个伪造的dag,如下所示:
# Airflow imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
# General imports
from datetime import datetime,timedelta
def throwerror():
raise ValueError("Failure")
SPARK_V_2_2_1 = '3.5.x-scala2.11'
args = {
'owner': ‘me’,
'email': ['me@myjob'],
'depends_on_past': False,
'start_date': datetime(2018, 5,24),
'end_date':datetime(2018,6,28)
}
dag = DAG(
dag_id='testemaildag',
default_args=args,
catchup=False,
schedule_interval="* 18 * * *"
)
t1 = DummyOperator(
task_id='extract_data',
dag=dag
)
t2 = PythonOperator(
task_id='fail_task',
dag=dag,
python_callable=throwerror
)
t2.set_upstream(t1)
如果您执行journalctl -u airflow-worker -f
,您会看到工作人员说它已向DAG中的电子邮件发送了有关失败的警报电子邮件,但我们仍未收到该电子邮件。然后,我们决定通过执行以下操作来查看sendmail的邮件日志cat /var/log/maillog
。我们看到了这样的日志:
Jun 5 14:10:25 production-server-ip-range postfix/smtpd[port]: connect from localhost[127.0.0.1]
Jun 5 14:10:25 production-server-ip-range postfix/smtpd[port]: ID: client=localhost[127.0.0.1]
Jun 5 14:10:25 production-server-ip-range postfix/cleanup[port]: ID: message-id=<randomMessageID@production-server-ip-range-ec2-instance>
Jun 5 14:10:25 production-server-ip-range postfix/smtpd[port]: disconnect from localhost[127.0.0.1]
Jun 5 14:10:25 production-server-ip-range postfix/qmgr[port]: MESSAGEID: from=<[email protected]>, size=1297, nrcpt=1 (queue active)
Jun 5 14:10:55 production-server-ip-range postfix/smtp[port]: connect to aspmx.l.google.com[smtp-ip-range]:25: Connection timed out
Jun 5 14:11:25 production-server-ip-range postfix/smtp[port]: connect to alt1.aspmx.l.google.com[smtp-ip-range]:25: Connection timed out
因此,这可能是最大的“噢”时刻。在这里,我们可以看到smtp服务中实际发生的情况。我们使用telnet确认无法连接到gmail的目标IP范围。
我们确定电子邮件正在尝试发送,但是sendmail服务无法成功连接到ip范围。
我们决定允许AWS上端口25上的所有出站流量(因为我们的气流生产环境是ec2实例),现在它可以成功运行。现在,我们可以接收有关失败和重试的电子邮件(提示:email_on_failure
并且email_on_retry
默认情况下,如True
DAG API参考中所述-如果您不想这样做,则无需将其放入args中,但是明确声明状态仍然是一种好习惯正确或错误)。
SES现在可以使用。这是气流配置:
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 587
smtp_mail_from = [email protected] (Verified SES email)
谢谢!
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句