Python: Reversed upstream/downstream relationships when generating multiple tasks in Airflow


The original code related to this question can be found here.

I'm confused about how both the bitshift operators and the set_upstream/set_downstream methods work within the task loop I've defined in my DAG. When the main execution loop of the DAG is configured as follows:

```python
for uid in dash_workers.get_id_creds():
    clear_tables.set_downstream(id_worker(uid))
```

or

```python
for uid in dash_workers.get_id_creds():
    clear_tables >> id_worker(uid)
```

the graph looks like this (the alphanumeric sequences are the user IDs, which also define the task IDs):

[Screenshot: Tree View of the DAG]
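For reference, Airflow documents `a >> b` as equivalent to `a.set_downstream(b)` and `a << b` as equivalent to `a.set_upstream(b)`; in both cases the arrow points at the task that runs later. The toy model below is a minimal sketch of that mapping. The `Task` class is hypothetical (it is not Airflow's `BaseOperator`) and only tracks dependency sets, so it runs without Airflow installed.

```python
class Task:
    """Hypothetical stand-in for an Airflow operator; tracks dependencies only."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()    # ids of tasks that must finish before this one
        self.downstream = set()  # ids of tasks that wait for this one

    def set_downstream(self, other):
        # `other` runs after `self`
        self.downstream.add(other.task_id)
        other.upstream.add(self.task_id)

    def set_upstream(self, other):
        # `other` runs before `self`
        other.set_downstream(self)

    def __rshift__(self, other):
        # a >> b is shorthand for a.set_downstream(b); returning b allows chaining
        self.set_downstream(other)
        return other

    def __lshift__(self, other):
        # a << b is shorthand for a.set_upstream(b)
        self.set_upstream(other)
        return other


# Two equivalent ways of saying "clear_tables runs before the worker"
a, w1 = Task("clear_tables"), Task("worker_1")
a >> w1

b, w2 = Task("clear_tables"), Task("worker_2")
b.set_downstream(w2)

print(a.downstream, w1.upstream)  # {'worker_1'} {'clear_tables'}
print(b.downstream, w2.upstream)  # {'worker_2'} {'clear_tables'}
```

Under this model, the first pair of snippets (`set_downstream` and `>>`) both record clear_tables as upstream of every worker, and the second pair (`set_upstream` and the reversed `>>`) both record it as downstream.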

When I configure the main execution loop of the DAG like this:

```python
for uid in dash_workers.get_id_creds():
    clear_tables.set_upstream(id_worker(uid))
```

or

```python
for uid in dash_workers.get_id_creds():
    id_worker(uid) >> clear_tables
```

the graph looks like this:

[Screenshot: Tree View of the DAG]

The second graph is what I want, and what I would have expected the first two snippets of code to produce based on my reading of the docs. If I want clear_tables to execute first, before triggering the batch of data-parsing tasks for the different user IDs, shouldn't I indicate this as clear_tables >> id_worker(uid)?
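One way to check that expectation independently of any UI is to compute a valid run order from each set of edges. This sketch uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+) rather than Airflow's scheduler, and the worker IDs are made up for illustration.

```python
from graphlib import TopologicalSorter


def execution_order(edges):
    """Return one valid run order for a list of (upstream, downstream) edges."""
    graph = {}
    for upstream, downstream in edges:
        # TopologicalSorter maps each node to the set of nodes it depends on
        graph.setdefault(downstream, set()).add(upstream)
        graph.setdefault(upstream, set())
    return list(TopologicalSorter(graph).static_order())


worker_ids = ["user_a", "user_b", "user_c"]  # hypothetical user IDs

# clear_tables >> id_worker(uid): clear_tables is upstream of every worker
forward = [("clear_tables", uid) for uid in worker_ids]
# id_worker(uid) >> clear_tables: every worker is upstream of clear_tables
backward = [(uid, "clear_tables") for uid in worker_ids]

print(execution_order(forward)[0])    # clear_tables
print(execution_order(backward)[-1])  # clear_tables
```

Under this model, clear_tables >> id_worker(uid) is the wiring that runs clear_tables first, which matches the reading of the docs above.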

EDIT: Here's the complete code, which has been updated since I posted my last few questions, for reference:

```python
from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

# Make dash_workers importable from the directory given in the environment
script_path = os.getenv('dash_preproc_path')
if script_path:
    sys.path.insert(0, script_path)
    import dash_workers
else:
    print('Define a dash_preproc_path value in your environment variables')
    sys.exit(1)

env = os.environ

default_args = {
    'start_date': datetime.now(),
}

dag = DAG(
    dag_id='dash_preproc',
    default_args=default_args)

clear_tables = PythonOperator(
    task_id='clear_tables',
    python_callable=dash_workers.clear_db,
    dag=dag)


def id_worker(uid):
    # One preprocessing task per user ID; the uid doubles as the task_id
    return PythonOperator(
        task_id=uid,
        python_callable=dash_workers.main_preprocess,
        op_args=[uid],
        dag=dag)


for uid in dash_workers.get_id_creds():
    preproc_task = id_worker(uid)
    # Equivalent to clear_tables.set_upstream(preproc_task)
    clear_tables << preproc_task
```

After implementing @ladislavindra's suggestion, I still have to use the same reversed form of the bitshift operator to get the correct dependency graph.

UPDATE: @ashberlin-taylor's answer explains what's going on here. I assumed the Graph View and the Tree View were doing the same thing, but they're not. Here's what id_worker(uid) >> clear_tables looks like in the Graph View:

[Screenshot: Graph View of the DAG]

I don't want the final step in my data pre-prep routine to be deleting the data tables!

Answer:

The Tree View in Airflow is "backwards" to how you (and I!) first thought about it. In your first screenshot it is showing that "clear_tables" must run before the "aaag5608078m2" task can run, and the DAG status depends upon each of the id worker tasks. So instead of a task order, it's a tree of the status chain, if that makes sense at all.

(This might seem strange at first, but it's because a DAG can branch out and branch back in.)
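A rough way to see why the Tree View looks "backwards" is to build such a tree by hand. The sketch below is an approximation, not Airflow's rendering code: it puts the tasks that nothing else waits on at the top level (the ones the DAG's final status hinges on) and nests each task's upstream tasks beneath it. A task that is upstream of several branches, like clear_tables, is therefore repeated under each one. The function name and worker IDs are hypothetical.

```python
def tree_view(edges):
    """Approximate a status-chain tree from (upstream, downstream) edges."""
    upstream = {}
    downstream = {}
    for up, down in edges:
        upstream.setdefault(down, []).append(up)
        downstream.setdefault(up, []).append(down)
        upstream.setdefault(up, [])
        downstream.setdefault(down, [])

    lines = []

    def walk(task, depth):
        lines.append("    " * depth + task)
        for parent in upstream[task]:
            # A shared upstream task is shown once per branch that depends on it
            walk(parent, depth + 1)

    # Top-level nodes are tasks with no downstream tasks
    for task in downstream:
        if not downstream[task]:
            walk(task, 0)
    return lines


# Edges produced by clear_tables >> id_worker(uid)
edges = [("clear_tables", uid) for uid in ["user_a", "user_b"]]
print("\n".join(tree_view(edges)))
# user_a
#     clear_tables
# user_b
#     clear_tables
```

So even though clear_tables runs first with this wiring, it appears nested under every worker, which is easy to misread as "clear_tables runs last".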

You might have better luck looking at the Graph View of your DAG. That one has arrows and shows the execution order in a more intuitive way. (I do still find the Tree View useful; it's just less clear to start with.)

