|
| 1 | +DO $$ |
| 2 | +BEGIN |
| 3 | + IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'lantern' AND table_name = 'tasks') THEN |
| 4 | + ALTER TABLE lantern.tasks ADD COLUMN pg_cron_jobid bigint DEFAULT NULL; |
| 5 | + END IF; |
| 6 | +END $$; |
| 7 | +CREATE OR REPLACE FUNCTION _lantern_internal.async_task_finalizer_trigger() RETURNS TRIGGER AS $$ |
| 8 | +DECLARE |
| 9 | + res RECORD; |
| 10 | +BEGIN |
| 11 | + -- if NEW.status is one of "starting", "running", "sending, "connecting", return |
| 12 | + IF NEW.status IN ('starting', 'running', 'sending', 'connecting') THEN |
| 13 | + RETURN NEW; |
| 14 | + END IF; |
| 15 | + |
| 16 | + IF NEW.status NOT IN ('succeeded', 'failed') THEN |
| 17 | + RAISE WARNING 'Lantern Async tasks: Unexpected status %', NEW.status; |
| 18 | + END IF; |
| 19 | + |
| 20 | + -- Update pg_cron_jobid on lantern.tasks table before the job is unscheduled |
| 21 | + -- This is necessary because under some circumstances jobs continue changing status even after they no longer |
| 22 | + -- appear in cron.job. The easiest way to trigger this case is to schedule a multi-statement job |
| 23 | + -- where the second statement causes a failure, e.g. async_task('select 1; select haha;') |
| 24 | + UPDATE lantern.tasks t SET |
| 25 | + pg_cron_jobid = c.jobid |
| 26 | + FROM cron.job c |
| 27 | + LEFT JOIN cron.job_run_details run |
| 28 | + ON c.jobid = run.jobid |
| 29 | + WHERE |
| 30 | + t.pg_cron_job_name = c.jobname AND |
| 31 | + c.jobid = NEW.jobid |
| 32 | + -- using returning as a trick to run the unschedule function as a side effect |
| 33 | + -- Note: have to unschedule by jobid because of pg_cron#320 https://github.com/citusdata/pg_cron/issues/320 |
| 34 | + -- Note2: unscheduling happens here since the update below may run multiple times for the same async task |
| 35 | + -- and unscheduling same job multiple times is not allowed |
| 36 | + -- At least experimentally so far, this update runs once per async task |
| 37 | + RETURNING cron.unschedule(NEW.jobid) INTO res; |
| 38 | + |
| 39 | + -- Get the job name from the jobid |
| 40 | + -- Call the job finalizer if corresponding job exists BOTH in lantern async tasks AND |
| 41 | + -- active cron jobs |
| 42 | + UPDATE lantern.tasks t SET |
| 43 | + (duration, status, error_message) = (run.end_time - t.started_at, NEW.status, |
| 44 | + CASE WHEN NEW.status = 'failed' THEN return_message ELSE NULL END) |
| 45 | + FROM cron.job_run_details run |
| 46 | + WHERE |
| 47 | + t.pg_cron_jobid = NEW.jobid |
| 48 | + AND t.pg_cron_jobid = run.jobid; |
| 49 | + |
| 50 | + RETURN NEW; |
| 51 | + |
| 52 | +EXCEPTION |
| 53 | + WHEN OTHERS THEN |
| 54 | + RAISE WARNING 'Lantern Async tasks: Unknown job failure in % % %', NEW, SQLERRM, SQLSTATE; |
| 55 | + PERFORM cron.unschedule(NEW.jobid); |
| 56 | + RETURN NEW; |
| 57 | +END |
| 58 | +$$ LANGUAGE plpgsql; |
| 59 | + |
0 commit comments