Estou tentando executar uma amostra do DAG do Airflow. O status do DAG é bem-sucedido, mas as tarefas não estão em execução. Alguém pode me ajudar a entender por quê?
Eu tentei o seguinte:
Aqui está o meu código:
import datetime as dt
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
print('test1')
def greet():
print('Writing in file')
print('testing the dag')
with open('/Users/abc/Documents/airflow_workspace/greet.txt', 'a+', encoding='utf8') as f:
now = dt.datetime.now()
t = now.strftime("%Y-%m-%d %H:%M")
f.write(str(t) + '\n')
return 'Greeted'
def respond():
return 'Greet Responded Again'
default_args = {
'owner': 'airflow',
'start_date': dt.datetime(2019, 3, 12, 10, 00, 00),
'concurrency': 1,
'retries': 0
}
with DAG('my_simple_dag',
default_args=default_args,
schedule_interval='*/10 * * * *',
) as dag:
opr_hello = BashOperator(task_id='say_Hi',
bash_command='echo "Hi!!"')
opr_greet = PythonOperator(task_id='greet',
python_callable=greet)
opr_sleep = BashOperator(task_id='sleep_me',
bash_command='sleep 5')
opr_respond = PythonOperator(task_id='respond',
python_callable=respond)
print('test2')
opr_hello >> opr_greet >> opr_sleep >> opr_respond
Altere o nome dag_id
de my_different_dag
para diferente; na verdade, ele criará um novo DAG.
Então, supondo que você configurou o fluxo de ar corretamente, ele deve funcionar agora.
Este artigo é coletado da Internet.
Se houver alguma infração, entre em [email protected] Delete.
deixe-me dizer algumas palavras