如何从 python 函数中检索数据并在 emr 运算符中使用它

西12

Airflow 版本:2.0.2 尝试通过从 AWS Secrets Manager 重试数据来创建 Emr 集群。

我正在尝试编写气流 dag,我的任务是从此 get_secret 函数中获取数据并在 Spark_steps 中使用它

def get_secret():
    secret_name =  Variable.get("secret_name")
    region_name = Variable.get(region_name)
    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(service_name='secretsmanager', region_name=region_name)
    account_id = boto3.client('sts').get_caller_identity().get('Account')
    
    try:
        get_secret_value_response = client.get_secret_value(SecretId=secret_name)
        if 'SecretString' in get_secret_value_response:
            secret_str = get_secret_value_response['SecretString']
            secret=json.loads(secret_str)            
            airflow_path=secret["airflow_path"]
            return  airflow_path 

...我需要在 spark_steps Spark_steps 下面使用“airflow_path”返回值:

SPARK_STEPS = [

    {
        'Name': 'Spark-Submit Command',
        "ActionOnFailure": "CONTINUE",
        'HadoopJarStep': {
            "Jar": "command-runner.jar",
            "Args": [
                'spark-submit',
                '--py-files',
                's3://'+airflow_path+'-pyspark/pitchbook/config.zip,s3://'+airflow_path+'-pyspark/pitchbook/jobs.zip,s3://'+airflow_path+'-pyspark/pitchbook/DDL.zip',
                's3://'+airflow_path+'-pyspark/pitchbook/main.py'
                    ],
                        },
    },

我在网上看到我需要使用Xcom,是这样吗?,我需要先在python运算符中运行这个函数然后获取值。请提供一个例子,因为我是新手。

谢谢你的帮助。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

如何在其他运算符中使用 Python 运算符中声明的变量?

如何在python中使用或运算符从csv中的列中选择多个值

如何使用运算符在Python中编写函数?

如何在 Python Selenium 中使用 css 或运算符

Python:如何在all()中使用运算符?

如何在python中使用逻辑运算符

如何在气流中创建自定义运算符并在通过Cloud Composer运行的气流模板中使用它们(在Google Cloud Platform中)

在python中使用相同的运算符添加lambda函数

在python lambda函数中使用OR运算符

int()对象如何在Python2中使用不带__eq __()方法的“ ==”运算符?

在Python 3.7中使用walrus运算符

如何使用运算符/内置函数检查 Python 列表中是否存在多个元素?

如何在子集函数中使用运算符删除R中的记录?

如何在 JavaScript 中使用三元运算符更改函数中的 if else 语句?

如何在 C++ 中的类中使用运算符重载函数?

如何从异步存储中检索数据并在组件中使用它?

如何在数据库查询中的Laravel中使用按位运算符

Python让我在自定义类中重写运算符。但是我可以重新定义运算符如何使用基本数据类型吗?

在Python中使用除法运算符时,如何获取十进制值?

逻辑`和`运算符如何在Python中使用整数?

如何在Python中使用Walrus运算符时检查是否按下Enter键?

与整数相比,PHP如何在Python中使用逻辑运算符?

如何在 Python 正则表达式中使用 OR 运算符?

您如何在python 3.x中使用或运算符?

如何使用Airflow在Python函数内部触发运算符?

从Airflow(使用airflow Livy运算符)向Livy(在EMR中)提交Spark Job

C ++-重载结构取消引用运算符,并在unique_ptr中使用它

使用Oracle SQL Developer时如何在表数据中使用IN运算符?

在Python中,in运算符如何实现工作?它是否使用迭代器的next()方法?