@@ -38,19 +38,33 @@ class Process(dj.Part):
3838 """
3939
4040 @classmethod
41- def get_workers_progress (cls ):
41+ def get_workers_progress (cls , worker_name = None , process_name = None ):
4242 """
4343 Return the operation progress for all registered workers (jobs status for each AutoPopulate process)
44- :return: pandas DataFrame of workers jobs status
44+
45+ Args:
46+ worker_name (str): name of the worker (optionally restrict by worker_name)
47+ process_name (str): name of the process (optionally restrict by process_name)
48+
49+ Returns:
50+ pandas DataFrame of workers jobs status
4551 """
52+ restriction = {}
53+ if worker_name :
54+ restriction ["worker_name" ] = worker_name
55+ if process_name :
56+ restriction ["process" ] = process_name
57+
4658 workflow_status = (
47- (cls .Process & "key_source_sql is not NULL" )
48- .proj (
49- "process" ,
50- "key_source_sql" ,
51- table_name = "full_table_name" ,
52- total = "NULL" ,
53- incomplete = "NULL" ,
59+ (
60+ (cls .Process & "key_source_sql is not NULL" ).proj (
61+ "process" ,
62+ "key_source_sql" ,
63+ table_name = "full_table_name" ,
64+ total = "NULL" ,
65+ incomplete = "NULL" ,
66+ )
67+ & restriction
5468 )
5569 .fetch (format = "frame" )
5670 .reset_index ()
@@ -96,7 +110,7 @@ def get_workers_progress(cls):
96110 (
97111 workflow_status .loc [r_idx , "total" ],
98112 workflow_status .loc [r_idx , "incomplete" ],
99- ) = cls ._get_key_source_count (r .key_source_sql , r .table_name )
113+ ) = cls .get_key_source_count (r .key_source_sql , r .table_name )
100114
101115 # merge key_source and jobs status
102116 workflow_status .set_index ("table_name" , inplace = True )
@@ -124,7 +138,10 @@ def get_workers_progress(cls):
124138 return workflow_status
125139
126140 @classmethod
127- def _get_key_source_count (cls , key_source_sql , target_full_table_name ):
141+ def get_incomplete_key_source_sql (cls , key_source_sql , target_full_table_name ):
142+ """
143+ From `key_source_sql`, build a SQL statement to find incomplete key_source entries in the target table
144+ """
128145 def _rename_attributes (table , props ):
129146 return (
130147 table .proj (
@@ -169,6 +186,14 @@ def _remove_enclosed_parentheses(input_string):
169186 key_source_sql
170187 + f"{ AND_or_WHERE } (({ ks_attrs_sql } ) not in (SELECT { ks_attrs_sql } FROM { target .full_table_name } ))"
171188 )
189+ return incomplete_sql
190+
191+ @classmethod
192+ def get_key_source_count (cls , key_source_sql , target_full_table_name ):
193+ """
194+ From `key_source_sql`, count the total and incomplete key_source entries in the target table
195+ """
196+ incomplete_sql = cls .get_incomplete_key_source_sql (key_source_sql , target_full_table_name )
172197 try :
173198 total = len (dj .conn ().query (key_source_sql ).fetchall ())
174199 incomplete = len (dj .conn ().query (incomplete_sql ).fetchall ())
0 commit comments