-
Notifications
You must be signed in to change notification settings - Fork 95
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix "zombie" http output websocket connections #125
Fix "zombie" http output websocket connections #125
Conversation
const ( | ||
writeWait = 10 * time.Second | ||
pongWait = 60 * time.Second | ||
pingPeriod = (pongWait * 9) / 10 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Those who would read it may find it strange, so here is why:
By setting pingPeriod
to 90% of pongWait
(i.e., (pongWait * 9) / 10
), we ensure that:
-
The server sends a ping before the
pongWait
deadline:- If
pongWait
is 60 seconds,pingPeriod
becomes 54 seconds. - This means the server sends a ping every 54 seconds.
- If
-
There's a buffer period for the client to respond:
- The client has 6 seconds (the remaining 10% of
pongWait
) to respond with a pong before the read deadline (pongWait
) expires.
- The client has 6 seconds (the remaining 10% of
-
Avoiding premature timeouts:
- If the
pingPeriod
were equal topongWait
, the read deadline might expire before the client has a chance to respond to the ping. - By setting
pingPeriod
slightly less thanpongWait
, we avoid this race condition.
- If the
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This all makes sense! A small nit would be we should consider adding these as configuration variables with the advanced()
flag. Think these are sensible defaults in any-case.
pingPeriod = (pongWait * 9) / 10 | ||
) | ||
|
||
ws.SetReadLimit(512) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On the question, on why we need both ping/pong and Read goroutine:
- Necessity of Ping/Pong with a Read Loop:
- Idle Clients: The read loop alone cannot detect if an idle client has disconnected unexpectedly.
- Timely Detection: Ping/pong can detect unresponsive clients within a predictable timeframe (
pongWait
), regardless of message traffic. - Protocol Compliance: Utilizing ping/pong aligns with WebSocket protocol standards and best practices.
^ fixed linter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is great! Thanks so much for the contribution and fix 🍱😄 Just a couple of questions/suggestions.
Mostly concerned that the ReadMessage
in that go-routine could cause a race-condition when sending a Ping
to the same socket we write the message bytes to.
Otherwise, run make fmt && make lint
on your local to sort out these linting issues 🙂
if err := ws.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { | ||
h.log.Warn("Failed to set write deadline for ping: %v", err) | ||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This setWriteDeadline
method on the connection doesn't ever return an error (despite that confusing function signature)
if err := ws.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { | |
h.log.Warn("Failed to set write deadline for ping: %v", err) | |
return | |
} | |
ws.SetWriteDeadline(time.Now().Add(writeWait)) |
if err := ws.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { | ||
writeErr = err | ||
break | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as below comment:
if err := ws.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { | |
writeErr = err | |
break | |
} | |
ws.SetWriteDeadline(time.Now().Add(writeWait)); |
if err := ws.SetReadDeadline(time.Now().Add(pongWait)); err != nil { | ||
h.log.Warn("Failed to set read deadline: %v", err) | ||
return | ||
} | ||
ws.SetPongHandler(func(string) error { | ||
return ws.SetReadDeadline(time.Now().Add(pongWait)) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it necessary to set the read deadline before creating and inside the PongHandler
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, both calls are necessary:
- Initial
SetReadDeadline
sets the first timeout for the connection to prevent indefinite blocking PongHandler
'sSetReadDeadline
extends the timeout each time a pong is received, keeping the connection alive only while the client is responsive
Think of it like resetting a timer - you need both the initial timer setup and the mechanism to reset it when activity is detected.
go func() { | ||
defer close(done) | ||
for { | ||
_, _, err := ws.ReadMessage() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not 100% clued up on websockets so excuse the ignorance but will doing a ws.ReadMessage()
clear all bytes in the buffer after reading? Else, does a WriteMessage
fan-out to all consumers on the connection?
I'm concerned about a race condition where the payload we send using the below WriteMessage
ends up being consumed in this goroutine instead of to the client where intending to send this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no risk of a race condition like the one you're describing.
In the context of WebSockets, the connection between the client and the server is full-duplex, meaning both ends can send and receive messages independently of each other. This feature allows for simultaneous communication without interference.
-
Separate Channels: The
ws.ReadMessage()
method reads messages that the client has sent to the server. Thews.WriteMessage()
method sends messages from the server to the client. They operate on separate channels within the WebSocket protocol and don't interfere with each other. -
No Buffer Overlap: When you call
ws.WriteMessage()
, it doesn't add messages to a buffer thatws.ReadMessage()
would read from. Similarly,ws.ReadMessage()
doesn't consume messages that the server has sent—only those sent by the client. -
Required Read Loop: Implementing a read loop as done, is recommended in WebSocket servers. It ensures that control frames (like pings and pongs) are properly handled, which is essential for maintaining the connection. Not reading from the WebSocket can lead to the connection being closed due to unhandled control frames or buffer overflows.
const ( | ||
writeWait = 10 * time.Second | ||
pongWait = 60 * time.Second | ||
pingPeriod = (pongWait * 9) / 10 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This all makes sense! A small nit would be we should consider adding these as configuration variables with the advanced()
flag. Think these are sensible defaults in any-case.
@jem-davies @gregfurman sorry for delay! Tried to address all the comments! |
@gregfurman @jem-davies just a gentle bump in case if you missed the last message :) |
cca1170
to
dc98f1c
Compare
Co-authored-by: Jem Davies <[email protected]>
Co-authored-by: Jem Davies <[email protected]>
Applied suggestions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
am happy to merge if @gregfurman is -- thanks a lot !
-- need to change the version set in a few places now, will raise a ticket to do that instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thanks @buger for addressing the comments and @jem-davies for the extra 👀
* Fix http output websocket disconnects * Update output_http_server.go Co-authored-by: Jem Davies <[email protected]>
#6697) ### **User description** PR for https://tyktech.atlassian.net/browse/TT-13508 Changes: * Bento updated to this commit: warpstreamlabs/bento@755c497 * `removeStream` uses the correct key to remove an inactive stream, not combining it with the API ID. Related PR: warpstreamlabs/bento#125 ___ ### **PR Type** Bug fix, Enhancement ___ ### **Description** - Fixed a bug in the `removeStream` function by using the correct key for removing inactive streams, ensuring that the API ID is not combined with the stream ID. - Updated the `google.golang.org/grpc` dependency to version `v1.67.0`. - Updated the `github.com/warpstreamlabs/bento` dependency to a specific commit for improved performance. - Updated several other dependencies to their latest versions to ensure compatibility and leverage new features. ___ ### **Changes walkthrough** 📝 <table><thead><tr><th></th><th align="left">Relevant files</th></tr></thead><tbody><tr><td><strong>Bug fix</strong></td><td><table> <tr> <td> <details> <summary><strong>manager.go</strong><dd><code>Correct stream removal logic in `removeStream` function</code> </dd></summary> <hr> ee/middleware/streams/manager.go <li>Fixed the <code>removeStream</code> function to use the correct key for stream <br>removal.<br> <li> Removed the combination of API ID with stream ID for stream <br>identification.<br> </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6697/files#diff-3e372b3346d8d296e6953152c89202a634d7654f10549676af9aea8628e13dfb">+2/-4</a> </td> </tr> </table></td></tr><tr><td><strong>Dependencies</strong></td><td><table> <tr> <td> <details> <summary><strong>go.mod</strong><dd><code>Update dependencies to latest versions</code> </dd></summary> <hr> go.mod <li>Updated <code>google.golang.org/grpc</code> to version <code>v1.67.0</code>.<br> <li> Updated <code>github.com/warpstreamlabs/bento</code> to a specific commit.<br> <li> Updated various dependencies to newer versions.<br> </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6697/files#diff-33ef32bf6c23acb95f5902d7097b7a1d5128ca061167ec0716715b0b9eeaa5f6">+65/-45</a> </td> </tr> </table></td></tr><tr><td><strong>Additional files (token-limit)</strong></td><td><table> <tr> <td> <details> <summary><strong>go.sum</strong><dd><code>...</code> </dd></summary> <hr> go.sum ... </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6697/files#diff-3295df7234525439d778f1b282d146a4f1ff6b415248aaac074e8042d9f42d63">+1054/-103</a></td> </tr> </table></td></tr></tr></tbody></table> ___ > 💡 **PR-Agent usage**: Comment `/help "your question"` on any pull request to receive relevant information
* Fix http output websocket disconnects * Update output_http_server.go Co-authored-by: Jem Davies <[email protected]>
#6697) ### **User description** PR for https://tyktech.atlassian.net/browse/TT-13508 Changes: * Bento updated to this commit: warpstreamlabs/bento@755c497 * `removeStream` uses the correct key to remove an inactive stream, not combining it with the API ID. Related PR: warpstreamlabs/bento#125 ___ ### **PR Type** Bug fix, Enhancement ___ ### **Description** - Fixed a bug in the `removeStream` function by using the correct key for removing inactive streams, ensuring that the API ID is not combined with the stream ID. - Updated the `google.golang.org/grpc` dependency to version `v1.67.0`. - Updated the `github.com/warpstreamlabs/bento` dependency to a specific commit for improved performance. - Updated several other dependencies to their latest versions to ensure compatibility and leverage new features. ___ ### **Changes walkthrough** 📝 <table><thead><tr><th></th><th align="left">Relevant files</th></tr></thead><tbody><tr><td><strong>Bug fix</strong></td><td><table> <tr> <td> <details> <summary><strong>manager.go</strong><dd><code>Correct stream removal logic in `removeStream` function</code> </dd></summary> <hr> ee/middleware/streams/manager.go <li>Fixed the <code>removeStream</code> function to use the correct key for stream <br>removal.<br> <li> Removed the combination of API ID with stream ID for stream <br>identification.<br> </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6697/files#diff-3e372b3346d8d296e6953152c89202a634d7654f10549676af9aea8628e13dfb">+2/-4</a> </td> </tr> </table></td></tr><tr><td><strong>Dependencies</strong></td><td><table> <tr> <td> <details> <summary><strong>go.mod</strong><dd><code>Update dependencies to latest versions</code> </dd></summary> <hr> go.mod <li>Updated <code>google.golang.org/grpc</code> to version <code>v1.67.0</code>.<br> <li> Updated <code>github.com/warpstreamlabs/bento</code> to a specific commit.<br> <li> Updated various dependencies to newer versions.<br> </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6697/files#diff-33ef32bf6c23acb95f5902d7097b7a1d5128ca061167ec0716715b0b9eeaa5f6">+65/-45</a> </td> </tr> </table></td></tr><tr><td><strong>Additional files (token-limit)</strong></td><td><table> <tr> <td> <details> <summary><strong>go.sum</strong><dd><code>...</code> </dd></summary> <hr> go.sum ... </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6697/files#diff-3295df7234525439d778f1b282d146a4f1ff6b415248aaac074e8042d9f42d63">+1054/-103</a></td> </tr> </table></td></tr></tr></tbody></table> ___ > 💡 **PR-Agent usage**: Comment `/help "your question"` on any pull request to receive relevant information
#6697) ### **User description** PR for https://tyktech.atlassian.net/browse/TT-13508 Changes: * Bento updated to this commit: warpstreamlabs/bento@755c497 * `removeStream` uses the correct key to remove an inactive stream, not combining it with the API ID. Related PR: warpstreamlabs/bento#125 ___ ### **PR Type** Bug fix, Enhancement ___ ### **Description** - Fixed a bug in the `removeStream` function by using the correct key for removing inactive streams, ensuring that the API ID is not combined with the stream ID. - Updated the `google.golang.org/grpc` dependency to version `v1.67.0`. - Updated the `github.com/warpstreamlabs/bento` dependency to a specific commit for improved performance. - Updated several other dependencies to their latest versions to ensure compatibility and leverage new features. ___ ### **Changes walkthrough** 📝 <table><thead><tr><th></th><th align="left">Relevant files</th></tr></thead><tbody><tr><td><strong>Bug fix</strong></td><td><table> <tr> <td> <details> <summary><strong>manager.go</strong><dd><code>Correct stream removal logic in `removeStream` function</code> </dd></summary> <hr> ee/middleware/streams/manager.go <li>Fixed the <code>removeStream</code> function to use the correct key for stream <br>removal.<br> <li> Removed the combination of API ID with stream ID for stream <br>identification.<br> </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6697/files#diff-3e372b3346d8d296e6953152c89202a634d7654f10549676af9aea8628e13dfb">+2/-4</a> </td> </tr> </table></td></tr><tr><td><strong>Dependencies</strong></td><td><table> <tr> <td> <details> <summary><strong>go.mod</strong><dd><code>Update dependencies to latest versions</code> </dd></summary> <hr> go.mod <li>Updated <code>google.golang.org/grpc</code> to version <code>v1.67.0</code>.<br> <li> Updated <code>github.com/warpstreamlabs/bento</code> to a specific commit.<br> <li> Updated various dependencies to newer versions.<br> </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6697/files#diff-33ef32bf6c23acb95f5902d7097b7a1d5128ca061167ec0716715b0b9eeaa5f6">+65/-45</a> </td> </tr> </table></td></tr><tr><td><strong>Additional files (token-limit)</strong></td><td><table> <tr> <td> <details> <summary><strong>go.sum</strong><dd><code>...</code> </dd></summary> <hr> go.sum ... </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6697/files#diff-3295df7234525439d778f1b282d146a4f1ff6b415248aaac074e8042d9f42d63">+1054/-103</a></td> </tr> </table></td></tr></tr></tbody></table> ___ > 💡 **PR-Agent usage**: Comment `/help "your question"` on any pull request to receive relevant information
TykTechnologies#6697) ### **User description** PR for https://tyktech.atlassian.net/browse/TT-13508 Changes: * Bento updated to this commit: warpstreamlabs/bento@755c497 * `removeStream` uses the correct key to remove an inactive stream, not combining it with the API ID. Related PR: warpstreamlabs/bento#125 ___ ### **PR Type** Bug fix, Enhancement ___ ### **Description** - Fixed a bug in the `removeStream` function by using the correct key for removing inactive streams, ensuring that the API ID is not combined with the stream ID. - Updated the `google.golang.org/grpc` dependency to version `v1.67.0`. - Updated the `github.com/warpstreamlabs/bento` dependency to a specific commit for improved performance. - Updated several other dependencies to their latest versions to ensure compatibility and leverage new features. ___ ### **Changes walkthrough** 📝 <table><thead><tr><th></th><th align="left">Relevant files</th></tr></thead><tbody><tr><td><strong>Bug fix</strong></td><td><table> <tr> <td> <details> <summary><strong>manager.go</strong><dd><code>Correct stream removal logic in `removeStream` function</code> </dd></summary> <hr> ee/middleware/streams/manager.go <li>Fixed the <code>removeStream</code> function to use the correct key for stream <br>removal.<br> <li> Removed the combination of API ID with stream ID for stream <br>identification.<br> </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6697/files#diff-3e372b3346d8d296e6953152c89202a634d7654f10549676af9aea8628e13dfb">+2/-4</a> </td> </tr> </table></td></tr><tr><td><strong>Dependencies</strong></td><td><table> <tr> <td> <details> <summary><strong>go.mod</strong><dd><code>Update dependencies to latest versions</code> </dd></summary> <hr> go.mod <li>Updated <code>google.golang.org/grpc</code> to version <code>v1.67.0</code>.<br> <li> Updated <code>github.com/warpstreamlabs/bento</code> to a specific commit.<br> <li> Updated various dependencies to newer versions.<br> </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6697/files#diff-33ef32bf6c23acb95f5902d7097b7a1d5128ca061167ec0716715b0b9eeaa5f6">+65/-45</a> </td> </tr> </table></td></tr><tr><td><strong>Additional files (token-limit)</strong></td><td><table> <tr> <td> <details> <summary><strong>go.sum</strong><dd><code>...</code> </dd></summary> <hr> go.sum ... </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6697/files#diff-3295df7234525439d778f1b282d146a4f1ff6b415248aaac074e8042d9f42d63">+1054/-103</a></td> </tr> </table></td></tr></tr></tbody></table> ___ > 💡 **PR-Agent usage**: Comment `/help "your question"` on any pull request to receive relevant information
TykTechnologies#6697) ### **User description** PR for https://tyktech.atlassian.net/browse/TT-13508 Changes: * Bento updated to this commit: warpstreamlabs/bento@755c497 * `removeStream` uses the correct key to remove an inactive stream, not combining it with the API ID. Related PR: warpstreamlabs/bento#125 ___ ### **PR Type** Bug fix, Enhancement ___ ### **Description** - Fixed a bug in the `removeStream` function by using the correct key for removing inactive streams, ensuring that the API ID is not combined with the stream ID. - Updated the `google.golang.org/grpc` dependency to version `v1.67.0`. - Updated the `github.com/warpstreamlabs/bento` dependency to a specific commit for improved performance. - Updated several other dependencies to their latest versions to ensure compatibility and leverage new features. ___ ### **Changes walkthrough** 📝 <table><thead><tr><th></th><th align="left">Relevant files</th></tr></thead><tbody><tr><td><strong>Bug fix</strong></td><td><table> <tr> <td> <details> <summary><strong>manager.go</strong><dd><code>Correct stream removal logic in `removeStream` function</code> </dd></summary> <hr> ee/middleware/streams/manager.go <li>Fixed the <code>removeStream</code> function to use the correct key for stream <br>removal.<br> <li> Removed the combination of API ID with stream ID for stream <br>identification.<br> </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6697/files#diff-3e372b3346d8d296e6953152c89202a634d7654f10549676af9aea8628e13dfb">+2/-4</a> </td> </tr> </table></td></tr><tr><td><strong>Dependencies</strong></td><td><table> <tr> <td> <details> <summary><strong>go.mod</strong><dd><code>Update dependencies to latest versions</code> </dd></summary> <hr> go.mod <li>Updated <code>google.golang.org/grpc</code> to version <code>v1.67.0</code>.<br> <li> Updated <code>github.com/warpstreamlabs/bento</code> to a specific commit.<br> <li> Updated various dependencies to newer versions.<br> </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6697/files#diff-33ef32bf6c23acb95f5902d7097b7a1d5128ca061167ec0716715b0b9eeaa5f6">+65/-45</a> </td> </tr> </table></td></tr><tr><td><strong>Additional files (token-limit)</strong></td><td><table> <tr> <td> <details> <summary><strong>go.sum</strong><dd><code>...</code> </dd></summary> <hr> go.sum ... </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6697/files#diff-3295df7234525439d778f1b282d146a4f1ff6b415248aaac074e8042d9f42d63">+1054/-103</a></td> </tr> </table></td></tr></tr></tbody></table> ___ > 💡 **PR-Agent usage**: Comment `/help "your question"` on any pull request to receive relevant information
At the moment http_server websocket implementation just writes to webscocket, and do not have a way to detect client disconnect proactively. More over, if client disconnects, and it fails write to it, it does not stop the handler, and continues to be inside the endless for loop, untill stream is stopped
for !h.shutSig.IsSoftStopSignalled() {
.It cause issues when you have multiple clients connected/disconnected, because essentially it fanout using round robin to them, but if you have closed "zombie" connections, they will also consume events.
It fixes the issue by using websocket native mechanisms like "ping/pong" methods. Also it adds ephemeral "Read", just to immidiately detect the client shutdown.