Airflow 版本:2.0.2 尝试通过从 AWS Secrets Manager 重试数据来创建 Emr 集群。
我正在尝试编写气流 dag,我的任务是从此 get_secret 函数中获取数据并在 Spark_steps 中使用它
def get_secret():
secret_name = Variable.get("secret_name")
region_name = Variable.get(region_name)
# Create a Secrets Manager client
session = boto3.session.Session()
client = session.client(service_name='secretsmanager', region_name=region_name)
account_id = boto3.client('sts').get_caller_identity().get('Account')
try:
get_secret_value_response = client.get_secret_value(SecretId=secret_name)
if 'SecretString' in get_secret_value_response:
secret_str = get_secret_value_response['SecretString']
secret=json.loads(secret_str)
airflow_path=secret["airflow_path"]
return airflow_path
...我需要在 spark_steps Spark_steps 下面使用“airflow_path”返回值:
SPARK_STEPS = [
{
'Name': 'Spark-Submit Command',
"ActionOnFailure": "CONTINUE",
'HadoopJarStep': {
"Jar": "command-runner.jar",
"Args": [
'spark-submit',
'--py-files',
's3://'+airflow_path+'-pyspark/pitchbook/config.zip,s3://'+airflow_path+'-pyspark/pitchbook/jobs.zip,s3://'+airflow_path+'-pyspark/pitchbook/DDL.zip',
's3://'+airflow_path+'-pyspark/pitchbook/main.py'
],
},
},
我在网上看到我需要使用Xcom,是这样吗?,我需要先在python运算符中运行这个函数然后获取值。请提供一个例子,因为我是新手。
谢谢你的帮助。习
是的,如果你想传递动态的东西,利用 xcom 推/拉可能会更容易。
利用PythonOperator将数据推送到 xcom。
参见参考实现:
https ://github.com/apache/airflow/blob/7fed7f31c3a895c0df08228541f955efb16fbf79/airflow/providers/amazon/aws/example_dags/example_emr.py
https://github.com/apache/airflow/blob/7fed7f31c3a895c0df08228541f955efbf796/ /providers/amazon/aws/example_dags/example_emr.py#L108
https://www.startdataengineering.com/post/how-to-submit-spark-jobs-to-emr-cluster-from-airflow/
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句