python - Reversed upstream/downstream relationships when generating multiple tasks in Airflow
The original code related to this question can be found here.
I'm confused about how both the bitshift operators and the set_upstream/set_downstream methods are working within a task loop I've defined in my DAG. When the main execution loop of the DAG is configured as follows:
for uid in dash_workers.get_id_creds():
    clear_tables.set_downstream(id_worker(uid))
or
for uid in dash_workers.get_id_creds():
    clear_tables >> id_worker(uid)
the graph looks like this (the alphanumeric sequences are the user IDs, which also define the task IDs):
When I configure the main execution loop of the DAG like this:
for uid in dash_workers.get_id_creds():
    clear_tables.set_upstream(id_worker(uid))
or
for uid in dash_workers.get_id_creds():
    id_worker(uid) >> clear_tables
the graph looks like this:
The second graph is what I want, and what I would have expected the first two snippets of code to produce based on my reading of the docs. If I want clear_tables to execute first, before triggering the batch of data-parsing tasks for the different user IDs, shouldn't I indicate this as clear_tables >> id_worker(uid)?
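For reference, the two spellings in these snippets are meant to be interchangeable: in Airflow, `a >> b` is shorthand for `a.set_downstream(b)` and `a << b` for `a.set_upstream(b)`, where "downstream" means "runs after". Below is a minimal toy model of those semantics, assuming nothing beyond that equivalence; the `Task` class and the user IDs are hypothetical and this is not Airflow's own code.

```python
class Task:
    """Hypothetical stand-in for an Airflow operator (toy model only)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()    # ids of tasks that must finish before this one
        self.downstream = set()  # ids of tasks that run after this one

    def set_downstream(self, other):
        # self runs first, other runs after it
        self.downstream.add(other.task_id)
        other.upstream.add(self.task_id)

    def set_upstream(self, other):
        # other runs first, self runs after it
        other.set_downstream(self)

    def __rshift__(self, other):
        # a >> b means a.set_downstream(b); returning b allows a >> b >> c
        self.set_downstream(other)
        return other

    def __lshift__(self, other):
        # a << b means a.set_upstream(b)
        self.set_upstream(other)
        return other


clear_tables = Task("clear_tables")
workers = [Task(uid) for uid in ["user_a", "user_b"]]  # hypothetical user ids
for worker in workers:
    clear_tables >> worker  # same effect as clear_tables.set_downstream(worker)

print(sorted(clear_tables.downstream))
print(sorted(workers[0].upstream))
```

In this model, `clear_tables >> worker` records clear_tables as upstream of every worker, i.e. clear_tables runs first.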
EDIT: Here's the complete code for reference, which has been updated since I posted the last few questions:
from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

script_path = os.getenv('dash_preproc_path')
if script_path:
    sys.path.insert(0, script_path)
    import dash_workers
else:
    print('Define the dash_preproc_path value in your environment variables')
    sys.exit(1)

env = os.environ

default_args = {
    # Airflow's docs advise against a dynamic start_date such as datetime.now()
    'start_date': datetime.now(),
}

dag = DAG(
    dag_id='dash_preproc',
    default_args=default_args)

clear_tables = PythonOperator(
    task_id='clear_tables',
    python_callable=dash_workers.clear_db,
    dag=dag)


def id_worker(uid):
    # One preprocessing task per user id; the uid doubles as the task_id
    return PythonOperator(
        task_id=uid,
        python_callable=dash_workers.main_preprocess,
        op_args=[uid],
        dag=dag)


for uid in dash_workers.get_id_creds():
    preproc_task = id_worker(uid)
    # Same as clear_tables.set_upstream(preproc_task): preproc_task runs BEFORE clear_tables
    clear_tables << preproc_task
After implementing @ladislavindra's suggestion, I still have to use the reversed bitshift operator to get the correct dependency graph.
UPDATE: @ashberlin-taylor's answer explains what's going on here. I assumed the Graph View and the Tree View were doing the same thing, but they're not. Here's what id_worker(uid) >> clear_tables looks like in the Graph View:
I don't want the final step in my data pre-prep routine to delete all the data tables!
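One way to check which direction puts clear_tables first is to compute an execution order from the dependencies. The sketch below uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+) as a stand-in for the scheduler, which is an assumption for illustration rather than how Airflow actually schedules tasks; the user IDs are placeholders.

```python
from graphlib import TopologicalSorter


def run_order(deps):
    """Return one valid execution order.

    deps maps each task id to the set of task ids that must finish before it
    (its upstream tasks), which is the mapping TopologicalSorter expects.
    """
    return list(TopologicalSorter(deps).static_order())


workers = ["user_a", "user_b", "user_c"]  # placeholder user ids

# clear_tables >> worker: every worker has clear_tables upstream of it
forward = {w: {"clear_tables"} for w in workers}

# worker >> clear_tables: clear_tables has every worker upstream of it
reverse = {"clear_tables": set(workers)}

print(run_order(forward))  # clear_tables comes first
print(run_order(reverse))  # clear_tables comes last
```

Only the direction of the edges changes between the two mappings, and that alone decides whether the tables are cleared before or after the workers run.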
The Tree View in Airflow is "backwards" from how you (and I!) first thought about it. In your first screenshot, it is showing that "clear_tables" must run before the "aaag5608078m2" task can run, and the DAG's status depends upon each of the ID worker tasks. So instead of a task order, it's a tree of the status chain, if that makes any sense at all.
(This might seem strange at first, but it's because a DAG can branch out and branch back in.)
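The answer's description of the Tree View can be approximated in a few lines: top-level rows are the tasks nothing else depends on, and each task's upstream tasks are nested beneath it. This is a hypothetical sketch of that layout based only on the answer's description, not Airflow's rendering code; the user IDs are placeholders.

```python
def tree_view(upstream):
    """Render a dependency map as a status tree.

    upstream maps each task id to the list of task ids that run before it.
    Top-level rows are tasks with no downstream task (the DAG's final tasks);
    each task's upstream tasks are nested one level beneath it.
    """
    has_downstream = {u for ups in upstream.values() for u in ups}
    tasks = set(upstream) | has_downstream
    lines = []

    def walk(task, depth):
        lines.append("    " * depth + task)
        for parent in sorted(upstream.get(task, [])):
            walk(parent, depth + 1)

    for top in sorted(tasks - has_downstream):
        walk(top, 0)
    return "\n".join(lines)


# clear_tables >> each worker (the intended order); user ids are placeholders
print(tree_view({"user_a": ["clear_tables"], "user_b": ["clear_tables"]}))
```

With clear_tables upstream of every worker, the output lists each worker at the top level with clear_tables nested under it. Read as a task order, that looks like clear_tables runs after the workers, even though the nesting actually means "depends on".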
You might have better luck looking at the Graph View of the DAG. That one has arrows and shows the execution order in a more intuitive way. (Though I do find the Tree View useful, it's just less clear to start with.)