在Apache Airflow DAG中使用AWS SES发送失败电子邮件

扎克

每当DAG中的任务无法运行或重试运行时,我都试图让Airflow使用AWS SES向我发送电子邮件。我也在使用我的AWS SES凭证,而不是我的一般AWS凭证。

我当前的airflow.cfg

[email]
email_backend = airflow.utils.email.send_email_smtp


[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com 
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 25
smtp_mail_from = [email protected]

我的DAG中当前旨在故意失败并重试的任务:

testfaildag_library_install_jar_jdbc = PythonOperator(
    task_id='library_install_jar',
    retries=3,
    retry_delay=timedelta(seconds=15),
    python_callable=add_library_to_cluster,
    params={'_task_id': 'cluster_create', '_cluster_name': CLUSTER_NAME, '_library_path':s3000://fakepath.jar},
    dag=dag,
    email_on_failure=True,
    email_on_retry=True,
    email=’[email protected]’,
    provide_context=True
)

当任务重试设定的次数并最终失败时,一切都会按设计进行,除非没有发送电子邮件。我也检查了上述任务中的日志,并且从未提及过smtp。

我在这里看过类似的问题,但是那里唯一的解决方案对我不起作用。此外,Airflow的文档(例如此处的示例)似乎也不适合我。

SES是否可以与Airflow的email_on_failure和email_on_retry函数一起使用?

什么我目前想这样做的使用是on_failure_callback要调用的函数由AWS提供的python脚本在这里对失败发送一封电子邮件,但不是在这一点上最好的路线。

谢谢您,谢谢您的帮助。

扎克

-使用有效的SES更新6/8

这是我写的有关如何使其全部工作的文章。该答案的底部有一个小摘要。

几个要点:

  1. 我们决定不使用Amazon SES,而是使用sendmail。现在,SES已启动并正在运行。
  2. 是气流工作者为email_on_failureemail_on_retry提供服务您可以journalctl –u airflow-worker –f在Dag运行期间进行监视。在生产服务器上,airflow.cfg使用新的smtp设置进行更改后,您无需重新启动airflow-worker,它应该会自动提取。无需担心搞乱当前正在运行的Dags。

这是有关如何使用sendmail的技术文章:

由于我们在本地主机上从ses更改为sendmail,因此必须在中更改smtp设置airflow.cfg

新的配置是:

[email]
email_backend = airflow.utils.email.send_email_smtp


[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = localhost
smtp_starttls = False
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
#smtp_user = not used
#smtp_password = not used
smtp_port = 25
smtp_mail_from =  [email protected]

这适用于生产和局部气流实例。

如果它们的配置不像上面的我的配置可能会收到一些常见的错误:

  • socket.error: [Errno 111] Connection refused-您必须将smtp_host更改airflow.cfglocalhost
  • smtplib.SMTPException: STARTTLS extension not supported by server.-您必须改变smtp_starttlsairflow.cfgFalse

在我的本地测试中,我试图简单地强制气流显示尝试发送电子邮件时发生的情况的日志–我创建了一个伪造的dag,如下所示:

# Airflow imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

# General imports
from datetime import datetime,timedelta

def throwerror():
    raise ValueError("Failure")

SPARK_V_2_2_1 = '3.5.x-scala2.11'

args = {
    'owner': ‘me’,
    'email': ['me@myjob'],
    'depends_on_past': False,
    'start_date': datetime(2018, 5,24),
    'end_date':datetime(2018,6,28)
}

dag = DAG(
    dag_id='testemaildag',
    default_args=args,
    catchup=False,
    schedule_interval="* 18 * * *"
    )

t1 = DummyOperator(
    task_id='extract_data',
    dag=dag
)

t2 = PythonOperator(
    task_id='fail_task',
    dag=dag,
    python_callable=throwerror
)

t2.set_upstream(t1)

如果您执行journalctl -u airflow-worker -f,您会看到工作人员说它已向DAG中的电子邮件发送了有关失败的警报电子邮件,但我们仍未收到该电子邮件。然后,我们决定通过执行以下操作来查看sendmail的邮件日志cat /var/log/maillog我们看到了这样的日志:

Jun  5 14:10:25 production-server-ip-range postfix/smtpd[port]: connect from localhost[127.0.0.1]
Jun  5 14:10:25 production-server-ip-range postfix/smtpd[port]: ID: client=localhost[127.0.0.1]
Jun  5 14:10:25 production-server-ip-range postfix/cleanup[port]: ID: message-id=<randomMessageID@production-server-ip-range-ec2-instance>
Jun  5 14:10:25 production-server-ip-range postfix/smtpd[port]: disconnect from localhost[127.0.0.1]
Jun  5 14:10:25 production-server-ip-range postfix/qmgr[port]: MESSAGEID: from=<[email protected]>, size=1297, nrcpt=1 (queue active)
Jun  5 14:10:55 production-server-ip-range postfix/smtp[port]: connect to aspmx.l.google.com[smtp-ip-range]:25: Connection timed out
Jun  5 14:11:25 production-server-ip-range postfix/smtp[port]: connect to alt1.aspmx.l.google.com[smtp-ip-range]:25: Connection timed out

因此,这可能是最大的“噢”时刻。在这里,我们可以看到smtp服务中实际发生的情况。我们使用telnet确认无法连接到gmail的目标IP范围。

我们确定电子邮件正在尝试发送,但是sendmail服务无法成功连接到ip范围。

我们决定允许AWS上端口25上的所有出站流量(因为我们的气流生产环境是ec2实例),现在它可以成功运行。现在,我们可以接收有关失败和重试的电子邮件(提示:email_on_failure并且email_on_retry默认情况下,如TrueDAG API参考中所述-如果您不想这样做,则无需将其放入args中,但是明确声明状态仍然是一种好习惯正确或错误)。

SES现在可以使用。这是气流配置:

[email]
email_backend = airflow.utils.email.send_email_smtp


[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com 
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 587
smtp_mail_from = [email protected] (Verified SES email)

谢谢!

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

1m后脚本损坏,状态失败,使用AWS SES发送电子邮件时PHP-Apache

在DAG中使用boto3时,Apache airflow无法找到AWS凭证

如何在 Airflow 中使用 EmailOperator 发送多封电子邮件

Airflow DAG:任何任务失败时的自定义电子邮件

节点AWS-SDK SES电子邮件发送验证失败

使用Apache Commons电子邮件库以Java发送电子邮件

JavaScript-使用AWS SES发送电子邮件

气流-使用AWS SES发送电子邮件

Apache Airflow仅向列表中的第一人发送sla miss电子邮件

AWS SES SDK发送带有附件的电子邮件

AWS SES发送电子邮件不起作用

AWS MWAA(托管 Apache Airflow);以编程方式启用 DAG

Apache camel aws-ses-无法动态设置电子邮件地址

无法使用Jenkins和SES发送电子邮件

Apache Airflow忽略失败的任务

为什么Apache airflow使用以下命令失败:“ airflow initdb”?

Apache Airflow DAG无法导入本地模块

如何为Apache Airflow DAG定义超时?

具有单个任务的 Apache Airflow DAG

Airflow:如何仅在不是连续失败时发送电子邮件警报?

我可以使用域名下的任何电子邮件通过AWS SES发送电子邮件吗?

使用Gmail在Opencart中电子邮件发送失败

如何使用org.apache.velocity.VelocityContext发送图像的电子邮件

Apache Airflow-即使关键任务失败,DAG也会注册为成功

无法在AWS SES上发送邮件,未验证电子邮件

即使经过验证,AWS SES 电子邮件也始终发送至垃圾邮件

使用 aws ses sdk(Java) 从亚太地区(孟买)发送电子邮件

无法使用Amazon AWS,SES和PostFix发送电子邮件

如何使用Courier将文件附加到随AWS SES发送的电子邮件中?