如何解析气流模板中的json字符串

攻击

是否可以在气流模板中解析JSON字符串?

我有一个HttpSensor,它通过REST API监视作业,但作业ID在已xcom_push标记为的上游任务的响应中True

我想做类似下面的事情,但是,这段代码给出了错误 jinja2.exceptions.UndefinedError: 'json' is undefined

t1 = SimpleHttpOperator(
    http_conn_id="s1",
    task_id="job",
    endpoint="some_url",
    method='POST',
    data=json.dumps({ "foo": "bar" }),
    xcom_push=True,
    dag=dag,
)

t2 = HttpSensor(
    http_conn_id="s1",
    task_id="finish_job",
    endpoint="job/{{ json.loads(ti.xcom_pull(\"job\")).jobId }}",
    response_check=lambda response: True if response.json().state == "complete" else False,
    poke_interval=5,
    dag=dag
)

t2.set_upstream(t1)
黄Huang

您可以使用参数user_defined_filters自定义的Jinja过滤器添加到DAG,以解析json。

将在您的jinja模板中公开的过滤器字典。例如,传递dict(hello=lambda name: 'Hello %s' % name)给此参数可让您进入{{ 'world' | hello }}与此DAG相关的所有Jinja模板。

dag = DAG(
    ...
    user_defined_filters={'fromjson': lambda s: json.loads(s)},
)

t1 = SimpleHttpOperator(
    task_id='job',
    xcom_push=True,
    ...
)

t2 = HttpSensor(
    endpoint='job/{{ (ti.xcom_pull("job") | fromjson)["jobId"] }}',
    ...
)

但是,返回之前编写自己的自定义JsonHttpOperator 插件(或向添加一个标志SimpleHttpOperator)来解析JSON可能会更干净一些,以便您可以直接在模板中进行引用{{ti.xcom_pull("job")["jobId"]

class JsonHttpOperator(SimpleHttpOperator):

    def execute(self, context):
        text = super(JsonHttpOperator, self).execute(context)
        return json.loads(text)

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章