diff --git a/internal/impl/pure/output_fallback.go b/internal/impl/pure/output_fallback.go index 932745c62..0ed1be66d 100644 --- a/internal/impl/pure/output_fallback.go +++ b/internal/impl/pure/output_fallback.go @@ -165,7 +165,7 @@ func (t *fallbackBroker) loop() { t.shutSig.TriggerHasStopped() }() - for { + for !t.shutSig.IsHardStopSignalled() { var open bool var tran message.Transaction @@ -227,7 +227,13 @@ func (t *fallbackBroker) loop() { return tran.Ack(ctx, err) } + if t.shutSig.IsHardStopSignalled() { + return nil + } + select { + case <-t.shutSig.HardStopChan(): + return nil case t.outputTSChans[i] <- message.NewTransactionFunc(nextBatchFromErr(err), ackFn): case <-ctx.Done(): return ctx.Err()