From 66b1061e29ddbe1f62aa2f2f93adc8af4b692a5b Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Tue, 11 Feb 2025 12:16:27 +0000 Subject: [PATCH 1/2] check if hard shutdown has started before sending to channels in fallback output Signed-off-by: Jem Davies --- internal/impl/pure/output_fallback.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/impl/pure/output_fallback.go b/internal/impl/pure/output_fallback.go index 932745c62..1fd911bf0 100644 --- a/internal/impl/pure/output_fallback.go +++ b/internal/impl/pure/output_fallback.go @@ -227,6 +227,10 @@ func (t *fallbackBroker) loop() { return tran.Ack(ctx, err) } + if t.shutSig.IsHardStopSignalled() { + return errors.New("fallback: hard shutdown signalled, cannot send to closed channels") + } + select { case t.outputTSChans[i] <- message.NewTransactionFunc(nextBatchFromErr(err), ackFn): case <-ctx.Done(): From 315a1de2d293e6adc93b8117e848a1ee38786b61 Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Tue, 11 Feb 2025 13:00:32 +0000 Subject: [PATCH 2/2] add more hardstop handling to fallback output Signed-off-by: Jem Davies --- internal/impl/pure/output_fallback.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/impl/pure/output_fallback.go b/internal/impl/pure/output_fallback.go index 1fd911bf0..0ed1be66d 100644 --- a/internal/impl/pure/output_fallback.go +++ b/internal/impl/pure/output_fallback.go @@ -165,7 +165,7 @@ func (t *fallbackBroker) loop() { t.shutSig.TriggerHasStopped() }() - for { + for !t.shutSig.IsHardStopSignalled() { var open bool var tran message.Transaction @@ -228,10 +228,12 @@ func (t *fallbackBroker) loop() { } if t.shutSig.IsHardStopSignalled() { - return errors.New("fallback: hard shutdown signalled, cannot send to closed channels") + return nil } select { + case <-t.shutSig.HardStopChan(): + return nil case t.outputTSChans[i] <- message.NewTransactionFunc(nextBatchFromErr(err), ackFn): case <-ctx.Done(): return ctx.Err()