Skip to content

Commit

Permalink
Invalidate hung tasks on restart
Browse files Browse the repository at this point in the history
  • Loading branch information
akariv committed Feb 14, 2018
1 parent f13f116 commit ccf920c
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
1 change: 1 addition & 0 deletions datapackage_pipelines/celery_tasks/celery_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
ps = status.get(spec.pipeline_id)
ex = ps.get_last_execution()
if ex is not None and not ex.finish_time:
ex.invalidate()
ex.finish_execution(False, {}, ['Cancelled'])

kw = dict(CELERYBEAT_SCHEDULE=CELERY_SCHEDULE)
Expand Down
14 changes: 10 additions & 4 deletions datapackage_pipelines/celery_tasks/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,13 @@ def update_pipelines(action, completed_pipeline_id, completed_trigger):
spec.validation_errors,
spec.cache_hash)

if action == 'update':
if action == 'init':
psle = ps.get_last_execution()
if psle is not None and not psle.finish_time:
psle.invalidate()
psle.finish_execution(False, {}, ['Cancelled'])

elif action == 'update':
if spec.pipeline_id not in status_all_pipeline_ids:
dm.update(spec)
logging.info("NEW Pipeline: %s", spec)
Expand All @@ -108,19 +114,19 @@ def update_pipelines(action, completed_pipeline_id, completed_trigger):
spec.pipeline_id, len(spec.validation_errors),
os.path.basename(completed_pipeline_id),
completed_trigger)
ps.save()
else:
continue

elif action == 'scheduled':
if completed_pipeline_id != spec.pipeline_id:
continue

psle = ps.get_last_execution()
last_successful = psle.success is True if psle is not None else False
last_successful = ps.state() == 'SUCCEEDED'
if ps.runnable() and \
(ps.dirty() or
(completed_trigger == 'scheduled') or
(action == 'init' and not last_successful)):
not last_successful):
queued = queue_pipeline(ps, spec,
'dirty-task-%s' % action
if completed_trigger is None
Expand Down

0 comments on commit ccf920c

Please sign in to comment.