From f3c4317b5471605846e8490ec5eaf6068e61b131 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Thu, 3 Oct 2024 17:55:22 +0300 Subject: [PATCH 1/6] Fix http output websocket disconnects --- internal/impl/io/output_http_server.go | 145 ++++++++++++++++--------- 1 file changed, 96 insertions(+), 49 deletions(-) diff --git a/internal/impl/io/output_http_server.go b/internal/impl/io/output_http_server.go index 8639bea51..fe0761bdf 100644 --- a/internal/impl/io/output_http_server.go +++ b/internal/impl/io/output_http_server.go @@ -389,57 +389,104 @@ func (h *httpServerOutput) streamHandler(w http.ResponseWriter, r *http.Request) } func (h *httpServerOutput) wsHandler(w http.ResponseWriter, r *http.Request) { - var err error - defer func() { - if err != nil { - http.Error(w, "Bad request", http.StatusBadRequest) - h.log.Warn("Websocket request failed: %v\n", err) - return - } - }() - - upgrader := websocket.Upgrader{} - - var ws *websocket.Conn - if ws, err = upgrader.Upgrade(w, r, nil); err != nil { - return - } - defer ws.Close() - - ctx, done := h.shutSig.SoftStopCtx(r.Context()) - defer done() - - for !h.shutSig.IsSoftStopSignalled() { - var ts message.Transaction - var open bool - - select { - case ts, open = <-h.transactions: - if !open { - go h.TriggerCloseNow() - return - } - case <-r.Context().Done(): - return - case <-h.shutSig.SoftStopChan(): - return - } - - var werr error - for _, msg := range message.GetAllBytes(ts.Payload) { - if werr = ws.WriteMessage(websocket.BinaryMessage, msg); werr != nil { - break - } - h.mWSBatchSent.Incr(1) - h.mWSSent.Incr(int64(batch.MessageCollapsedCount(ts.Payload))) - } - if werr != nil { - h.mWSError.Incr(1) - } - _ = ts.Ack(ctx, werr) - } + var err error + defer func() { + if err != nil { + http.Error(w, "Bad request", http.StatusBadRequest) + h.log.Warn("WebSocket request failed: %v", err) + return + } + }() + + upgrader := websocket.Upgrader{} + + // Upgrade the HTTP connection to a WebSocket connection + ws, err := upgrader.Upgrade(w, r, nil) + if err != nil { + h.log.Warn("WebSocket upgrade failed: %v", err) + return + } + defer ws.Close() + + // Set up ping/pong handlers and deadlines + const ( + writeWait = 10 * time.Second + pongWait = 60 * time.Second + pingPeriod = (pongWait * 9) / 10 + ) + + ws.SetReadLimit(512) + ws.SetReadDeadline(time.Now().Add(pongWait)) + ws.SetPongHandler(func(string) error { + ws.SetReadDeadline(time.Now().Add(pongWait)) + return nil + }) + + // Start a goroutine to read messages (to process control frames) + done := make(chan struct{}) + go func() { + defer close(done) + for { + _, _, err := ws.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + h.log.Warn("WebSocket read error: %v", err) + } + break + } + } + }() + + // Start ticker to send ping messages to the client periodically + ticker := time.NewTicker(pingPeriod) + defer ticker.Stop() + + ctx, doneCtx := h.shutSig.SoftStopCtx(r.Context()) + defer doneCtx() + + for !h.shutSig.IsSoftStopSignalled() { + select { + case ts, open := <-h.transactions: + if !open { + // If the transactions channel is closed, trigger server shutdown + go h.TriggerCloseNow() + return + } + // Write messages to the client + var writeErr error + for _, msg := range message.GetAllBytes(ts.Payload) { + ws.SetWriteDeadline(time.Now().Add(writeWait)) + if writeErr = ws.WriteMessage(websocket.BinaryMessage, msg); writeErr != nil { + break + } + h.mWSBatchSent.Incr(1) + h.mWSSent.Incr(int64(batch.MessageCollapsedCount(ts.Payload))) + } + if writeErr != nil { + h.mWSError.Incr(1) + _ = ts.Ack(ctx, writeErr) + return // Exit the loop on write error + } + _ = ts.Ack(ctx, nil) + case <-ticker.C: + // Send a ping message to the client + ws.SetWriteDeadline(time.Now().Add(writeWait)) + if err := ws.WriteMessage(websocket.PingMessage, nil); err != nil { + h.log.Warn("WebSocket ping error: %v", err) + return + } + case <-done: + // The read goroutine has exited, indicating the client has disconnected + h.log.Debug("WebSocket client disconnected") + return + case <-ctx.Done(): + // The context has been canceled (e.g., server is shutting down) + return + } + } } + func (h *httpServerOutput) Consume(ts <-chan message.Transaction) error { if h.transactions != nil { return component.ErrAlreadyStarted From ecf8622c2b8219fa13979e520ee7b09cdd8efdd5 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Thu, 3 Oct 2024 20:47:47 +0300 Subject: [PATCH 2/6] Update output_http_server.go --- internal/impl/io/output_http_server.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/internal/impl/io/output_http_server.go b/internal/impl/io/output_http_server.go index fe0761bdf..467ab3451 100644 --- a/internal/impl/io/output_http_server.go +++ b/internal/impl/io/output_http_server.go @@ -416,10 +416,12 @@ func (h *httpServerOutput) wsHandler(w http.ResponseWriter, r *http.Request) { ) ws.SetReadLimit(512) - ws.SetReadDeadline(time.Now().Add(pongWait)) + if err := ws.SetReadDeadline(time.Now().Add(pongWait)); err != nil { + h.log.Warn("Failed to set read deadline: %v", err) + return + } ws.SetPongHandler(func(string) error { - ws.SetReadDeadline(time.Now().Add(pongWait)) - return nil + return ws.SetReadDeadline(time.Now().Add(pongWait)) }) // Start a goroutine to read messages (to process control frames) @@ -455,7 +457,10 @@ func (h *httpServerOutput) wsHandler(w http.ResponseWriter, r *http.Request) { // Write messages to the client var writeErr error for _, msg := range message.GetAllBytes(ts.Payload) { - ws.SetWriteDeadline(time.Now().Add(writeWait)) + if err := ws.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { + writeErr = err + break + } if writeErr = ws.WriteMessage(websocket.BinaryMessage, msg); writeErr != nil { break } @@ -470,7 +475,10 @@ func (h *httpServerOutput) wsHandler(w http.ResponseWriter, r *http.Request) { _ = ts.Ack(ctx, nil) case <-ticker.C: // Send a ping message to the client - ws.SetWriteDeadline(time.Now().Add(writeWait)) + if err := ws.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { + h.log.Warn("Failed to set write deadline for ping: %v", err) + return + } if err := ws.WriteMessage(websocket.PingMessage, nil); err != nil { h.log.Warn("WebSocket ping error: %v", err) return From d784b23e409a5fc02abb3992d60fc3c609c0f6e3 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Mon, 4 Nov 2024 20:48:10 +0300 Subject: [PATCH 3/6] Fix linter --- internal/impl/io/output_http_server.go | 222 +++++++++++++------------ 1 file changed, 118 insertions(+), 104 deletions(-) diff --git a/internal/impl/io/output_http_server.go b/internal/impl/io/output_http_server.go index 467ab3451..16601b803 100644 --- a/internal/impl/io/output_http_server.go +++ b/internal/impl/io/output_http_server.go @@ -42,6 +42,9 @@ const ( hsoFieldCORS = "cors" hsoFieldCORSEnabled = "enabled" hsoFieldCORSAllowedOrigins = "allowed_origins" + hsoFieldWriteWait = "write_wait" + hsoFieldPongWait = "pong_wait" + hsoFieldPingPeriod = "ping_period" ) type hsoConfig struct { @@ -54,6 +57,9 @@ type hsoConfig struct { CertFile string KeyFile string CORS httpserver.CORSConfig + WriteWait time.Duration + PongWait time.Duration + PingPeriod time.Duration } func hsoConfigFromParsed(pConf *service.ParsedConfig) (conf hsoConfig, err error) { @@ -95,6 +101,15 @@ func hsoConfigFromParsed(pConf *service.ParsedConfig) (conf hsoConfig, err error if conf.CORS, err = corsConfigFromParsed(pConf.Namespace(hsoFieldCORS)); err != nil { return } + if conf.WriteWait, err = pConf.FieldDuration(hsoFieldWriteWait); err != nil { + return + } + if conf.PongWait, err = pConf.FieldDuration(hsoFieldPongWait); err != nil { + return + } + if conf.PingPeriod, err = pConf.FieldDuration(hsoFieldPingPeriod); err != nil { + return + } return } @@ -145,6 +160,18 @@ Please note, messages are considered delivered as soon as the data is written to Advanced(). Default(""), service.NewInternalField(corsSpec), + service.NewDurationField(hsoFieldWriteWait). + Description("The time allowed to write a message to the websocket."). + Default("10s"). + Advanced(), + service.NewDurationField(hsoFieldPongWait). + Description("The time allowed to read the next pong message from the client."). + Default("60s"). + Advanced(), + service.NewDurationField(hsoFieldPingPeriod). + Description("Send pings to client with this period. Must be less than pong wait."). + Default("54s"). + Advanced(), ) } @@ -389,111 +416,98 @@ func (h *httpServerOutput) streamHandler(w http.ResponseWriter, r *http.Request) } func (h *httpServerOutput) wsHandler(w http.ResponseWriter, r *http.Request) { - var err error - defer func() { - if err != nil { - http.Error(w, "Bad request", http.StatusBadRequest) - h.log.Warn("WebSocket request failed: %v", err) - return - } - }() - - upgrader := websocket.Upgrader{} - - // Upgrade the HTTP connection to a WebSocket connection - ws, err := upgrader.Upgrade(w, r, nil) - if err != nil { - h.log.Warn("WebSocket upgrade failed: %v", err) - return - } - defer ws.Close() - - // Set up ping/pong handlers and deadlines - const ( - writeWait = 10 * time.Second - pongWait = 60 * time.Second - pingPeriod = (pongWait * 9) / 10 - ) - - ws.SetReadLimit(512) - if err := ws.SetReadDeadline(time.Now().Add(pongWait)); err != nil { - h.log.Warn("Failed to set read deadline: %v", err) - return - } - ws.SetPongHandler(func(string) error { - return ws.SetReadDeadline(time.Now().Add(pongWait)) - }) - - // Start a goroutine to read messages (to process control frames) - done := make(chan struct{}) - go func() { - defer close(done) - for { - _, _, err := ws.ReadMessage() - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { - h.log.Warn("WebSocket read error: %v", err) - } - break - } - } - }() - - // Start ticker to send ping messages to the client periodically - ticker := time.NewTicker(pingPeriod) - defer ticker.Stop() - - ctx, doneCtx := h.shutSig.SoftStopCtx(r.Context()) - defer doneCtx() - - for !h.shutSig.IsSoftStopSignalled() { - select { - case ts, open := <-h.transactions: - if !open { - // If the transactions channel is closed, trigger server shutdown - go h.TriggerCloseNow() - return - } - // Write messages to the client - var writeErr error - for _, msg := range message.GetAllBytes(ts.Payload) { - if err := ws.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { - writeErr = err - break - } - if writeErr = ws.WriteMessage(websocket.BinaryMessage, msg); writeErr != nil { - break - } - h.mWSBatchSent.Incr(1) - h.mWSSent.Incr(int64(batch.MessageCollapsedCount(ts.Payload))) - } - if writeErr != nil { - h.mWSError.Incr(1) - _ = ts.Ack(ctx, writeErr) - return // Exit the loop on write error - } - _ = ts.Ack(ctx, nil) - case <-ticker.C: - // Send a ping message to the client - if err := ws.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { - h.log.Warn("Failed to set write deadline for ping: %v", err) - return - } - if err := ws.WriteMessage(websocket.PingMessage, nil); err != nil { - h.log.Warn("WebSocket ping error: %v", err) - return - } - case <-done: - // The read goroutine has exited, indicating the client has disconnected - h.log.Debug("WebSocket client disconnected") - return - case <-ctx.Done(): - // The context has been canceled (e.g., server is shutting down) - return - } - } -} + var err error + defer func() { + if err != nil { + http.Error(w, "Bad request", http.StatusBadRequest) + h.log.Warn("WebSocket request failed: %v", err) + return + } + }() + + upgrader := websocket.Upgrader{} + + // Upgrade the HTTP connection to a WebSocket connection + ws, err := upgrader.Upgrade(w, r, nil) + if err != nil { + h.log.Warn("WebSocket upgrade failed: %v", err) + return + } + defer ws.Close() + + ws.SetReadLimit(512) + if err := ws.SetReadDeadline(time.Now().Add(h.conf.PongWait)); err != nil { + h.log.Warn("Failed to set read deadline: %v", err) + return + } + + ws.SetPongHandler(func(string) error { + return ws.SetReadDeadline(time.Now().Add(h.conf.PongWait)) + }) + + // Start a goroutine to read messages (to process control frames) + done := make(chan struct{}) + go func() { + defer close(done) + for { + _, _, err := ws.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + h.log.Warn("WebSocket read error: %v", err) + } + break + } + } + }() + + // Start ticker to send ping messages to the client periodically + ticker := time.NewTicker(h.conf.PingPeriod) + defer ticker.Stop() + ctx, doneCtx := h.shutSig.SoftStopCtx(r.Context()) + defer doneCtx() + + for !h.shutSig.IsSoftStopSignalled() { + select { + case ts, open := <-h.transactions: + if !open { + // If the transactions channel is closed, trigger server shutdown + go h.TriggerCloseNow() + return + } + // Write messages to the client + var writeErr error + for _, msg := range message.GetAllBytes(ts.Payload) { + ws.SetWriteDeadline(time.Now().Add(h.conf.WriteWait)) + if writeErr = ws.WriteMessage(websocket.BinaryMessage, msg); writeErr != nil { + break + } + h.mWSBatchSent.Incr(1) + h.mWSSent.Incr(int64(batch.MessageCollapsedCount(ts.Payload))) + } + if writeErr != nil { + h.mWSError.Incr(1) + _ = ts.Ack(ctx, writeErr) + return // Exit the loop on write error + } + _ = ts.Ack(ctx, nil) + case <-ticker.C: + // Send a ping message to the client + ws.SetWriteDeadline(time.Now().Add(h.conf.WriteWait)) + if err := ws.WriteMessage(websocket.PingMessage, nil); err != nil { + h.log.Warn("WebSocket ping error: %v", err) + return + } + case <-done: + // The read goroutine has exited, indicating the client has disconnected + h.log.Debug("WebSocket client disconnected") + return + case <-ctx.Done(): + // The context has been canceled (e.g., server is shutting down) + return + } + } +} func (h *httpServerOutput) Consume(ts <-chan message.Transaction) error { if h.transactions != nil { From 58c869d4795382824c60309f84ab5486ea35dd14 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Mon, 4 Nov 2024 21:02:22 +0300 Subject: [PATCH 4/6] Fix docs and linter --- internal/impl/io/output_http_server.go | 4 ++- .../docs/components/outputs/http_server.md | 27 +++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/internal/impl/io/output_http_server.go b/internal/impl/io/output_http_server.go index 16601b803..df1295467 100644 --- a/internal/impl/io/output_http_server.go +++ b/internal/impl/io/output_http_server.go @@ -478,7 +478,8 @@ func (h *httpServerOutput) wsHandler(w http.ResponseWriter, r *http.Request) { // Write messages to the client var writeErr error for _, msg := range message.GetAllBytes(ts.Payload) { - ws.SetWriteDeadline(time.Now().Add(h.conf.WriteWait)) + //nolint:errcheck // this function does not actually return an error + _ = ws.SetWriteDeadline(time.Now().Add(h.conf.WriteWait)) if writeErr = ws.WriteMessage(websocket.BinaryMessage, msg); writeErr != nil { break } @@ -493,6 +494,7 @@ func (h *httpServerOutput) wsHandler(w http.ResponseWriter, r *http.Request) { _ = ts.Ack(ctx, nil) case <-ticker.C: // Send a ping message to the client + // nolint:errcheck // this function does not actually return an error ws.SetWriteDeadline(time.Now().Add(h.conf.WriteWait)) if err := ws.WriteMessage(websocket.PingMessage, nil); err != nil { h.log.Warn("WebSocket ping error: %v", err) diff --git a/website/docs/components/outputs/http_server.md b/website/docs/components/outputs/http_server.md index 1150c1704..24840bbcb 100644 --- a/website/docs/components/outputs/http_server.md +++ b/website/docs/components/outputs/http_server.md @@ -58,6 +58,9 @@ output: cors: enabled: false allowed_origins: [] + write_wait: 10s + pong_wait: 60s + ping_period: 54s ``` @@ -172,4 +175,28 @@ An explicit list of origins that are allowed for CORS requests. Type: `array` Default: `[]` +### `write_wait` + +The time allowed to write a message to the websocket. + + +Type: `string` +Default: `"10s"` + +### `pong_wait` + +The time allowed to read the next pong message from the client. + + +Type: `string` +Default: `"60s"` + +### `ping_period` + +Send pings to client with this period. Must be less than pong wait. + + +Type: `string` +Default: `"54s"` + From 8408362615ca5bef11b69234474a0d84c1c3a738 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Thu, 7 Nov 2024 23:12:39 +0300 Subject: [PATCH 5/6] Update internal/impl/io/output_http_server.go Co-authored-by: Jem Davies <131159520+jem-davies@users.noreply.github.com> --- internal/impl/io/output_http_server.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/impl/io/output_http_server.go b/internal/impl/io/output_http_server.go index df1295467..5ab0a8772 100644 --- a/internal/impl/io/output_http_server.go +++ b/internal/impl/io/output_http_server.go @@ -478,7 +478,6 @@ func (h *httpServerOutput) wsHandler(w http.ResponseWriter, r *http.Request) { // Write messages to the client var writeErr error for _, msg := range message.GetAllBytes(ts.Payload) { - //nolint:errcheck // this function does not actually return an error _ = ws.SetWriteDeadline(time.Now().Add(h.conf.WriteWait)) if writeErr = ws.WriteMessage(websocket.BinaryMessage, msg); writeErr != nil { break From 822414a6f4bb5016783b0e32d5d5ba4223800d22 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Thu, 7 Nov 2024 23:12:45 +0300 Subject: [PATCH 6/6] Update internal/impl/io/output_http_server.go Co-authored-by: Jem Davies <131159520+jem-davies@users.noreply.github.com> --- internal/impl/io/output_http_server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/impl/io/output_http_server.go b/internal/impl/io/output_http_server.go index 5ab0a8772..f4e65b61d 100644 --- a/internal/impl/io/output_http_server.go +++ b/internal/impl/io/output_http_server.go @@ -493,7 +493,7 @@ func (h *httpServerOutput) wsHandler(w http.ResponseWriter, r *http.Request) { _ = ts.Ack(ctx, nil) case <-ticker.C: // Send a ping message to the client - // nolint:errcheck // this function does not actually return an error + //nolint:errcheck // this function does not actually return an error ws.SetWriteDeadline(time.Now().Add(h.conf.WriteWait)) if err := ws.WriteMessage(websocket.PingMessage, nil); err != nil { h.log.Warn("WebSocket ping error: %v", err)