@@ -1654,8 +1654,14 @@ def _set_outputs(self, point, taskdef, outputs, flow_nums, flow_wait):
1654
1654
1655
1655
# Try to spawn children of the outputs.
1656
1656
for msg in good :
1657
+ if msg == TASK_OUTPUT_EXPIRED :
1658
+ # not caused by task messages
1659
+ self ._expire_task (itask )
1660
+ self .spawn_on_output (itask , expired ) ## TODO CONTINUE FROM pr 5412
1661
+ else :
1662
+ self .task_events_mgr .process_message (itask , logging .INFO , msg )
1663
+ # TODO remove this - just log the actual spawning events
1657
1664
LOG .info (f"[{ itask } ] Forced spawning on { msg } " )
1658
- self .task_events_mgr .process_message (itask , logging .INFO , msg )
1659
1665
1660
1666
def _set_prereqs (self , point , taskdef , prereqs , flow_nums , flow_wait ):
1661
1667
"""Set given prerequisites of a target task.
@@ -1897,18 +1903,19 @@ def _set_expired_task(self, itask):
1897
1903
itask .get_point_as_seconds () +
1898
1904
itask .get_offset_as_seconds (itask .tdef .expiration_offset ))
1899
1905
if time () > itask .expire_time :
1900
- msg = 'Task expired (skipping job).'
1901
- LOG .warning (f"[{ itask } ] { msg } " )
1902
- self .task_events_mgr .setup_event_handlers (itask , "expired" , msg )
1903
- # TODO succeeded and expired states are useless due to immediate
1904
- # removal under all circumstances (unhandled failed is still used).
1905
- if itask .state_reset (TASK_STATUS_EXPIRED , is_held = False ):
1906
- self .data_store_mgr .delta_task_state (itask )
1907
- self .data_store_mgr .delta_task_held (itask )
1908
- self .remove (itask , 'expired' )
1906
+ self ._expire_task (itask )
1909
1907
return True
1910
1908
return False
1911
1909
1910
+ def _expire_task (self , itask ):
1911
+ msg = 'Task expired (skipping job).'
1912
+ LOG .warning (f"[{ itask } ] { msg } " )
1913
+ self .task_events_mgr .setup_event_handlers (itask , "expired" , msg )
1914
+ if itask .state_reset (TASK_STATUS_EXPIRED , is_held = False ):
1915
+ self .data_store_mgr .delta_task_state (itask )
1916
+ self .data_store_mgr .delta_task_held (itask )
1917
+ self .remove (itask , 'expired' )
1918
+
1912
1919
def task_succeeded (self , id_ ):
1913
1920
"""Return True if task with id_ is in the succeeded state."""
1914
1921
return any (
0 commit comments