Skip to content

Commit ad9149f

Browse files
nwmacrichard-cox
andauthored
Kube Terminal: Appears to hang if it takes a while to start up (#506)
* Fix issue where chart download link was not absolute - Expected url, got chart file name - We now check for this, if url is not absoulte assume filename and append to repo url * Fix issue with Schemas on Helm Hub * Update following changes in #503 * Fix issue with missing schema on upgrade for local chart * Fix failing unit test * Ensure kube terminal does not hang if it takes a while to start up * Fix timeout issue Co-authored-by: Richard Cox <[email protected]>
1 parent 9742e5d commit ad9149f

File tree

2 files changed

+62
-25
lines changed

2 files changed

+62
-25
lines changed

src/jetstream/plugins/kubernetes/terminal/helpers.go

+15-4
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@ import (
2323
)
2424

2525
const (
26-
helmEndpointType = "helm"
27-
helmRepoEndpointType = "repo"
26+
helmEndpointType = "helm"
27+
helmRepoEndpointType = "repo"
28+
startingProgressMessage = "Waiting for Kubernetes Terminal to start up ..."
2829
)
2930

3031
// PodCreationData stores the clients and names used to create pod and secret
@@ -87,6 +88,8 @@ func (k *KubeTerminal) createPod(c echo.Context, kubeConfig, kubeVersion string,
8788
Type: "Opaque",
8889
}
8990

91+
sendProgressMessage(ws, startingProgressMessage)
92+
9093
setResourcMetadata(&secretSpec.ObjectMeta, sessionID)
9194

9295
secretSpec.Data = make(map[string][]byte)
@@ -98,6 +101,7 @@ func (k *KubeTerminal) createPod(c echo.Context, kubeConfig, kubeVersion string,
98101
secretSpec.Data["helm-setup"] = []byte(helmSetup)
99102
}
100103

104+
sendProgressMessage(ws, startingProgressMessage)
101105
_, err = secretClient.Create(secretSpec)
102106
if err != nil {
103107
log.Warnf("Kubernetes Terminal: Unable to create Secret: %+v", err)
@@ -154,6 +158,8 @@ func (k *KubeTerminal) createPod(c echo.Context, kubeConfig, kubeVersion string,
154158
}
155159
podSpec.Spec.Volumes = volumesSpec
156160

161+
sendProgressMessage(ws, startingProgressMessage)
162+
157163
// Create a new pod
158164
pod, err := podClient.Create(podSpec)
159165
if err != nil {
@@ -165,12 +171,12 @@ func (k *KubeTerminal) createPod(c echo.Context, kubeConfig, kubeVersion string,
165171
result.PodClient = podClient
166172
result.PodName = podName
167173

168-
sendProgressMessage(ws, "Waiting for Kubernetes Terminal to start up ...")
169-
170174
// Wait for the pod to be running
171175
timeout := 60
172176
statusOptions := metav1.GetOptions{}
173177
for {
178+
// This ensures we keep the web socket alive while the container is creating
179+
sendProgressMessage(ws, startingProgressMessage)
174180
status, err := podClient.Get(pod.Name, statusOptions)
175181
if err == nil && status.Status.Phase == "Running" {
176182
break
@@ -201,6 +207,11 @@ func setResourcMetadata(metadata *metav1.ObjectMeta, sessionID string) {
201207

202208
// Cleanup the pod and secret
203209
func (k *KubeTerminal) cleanupPodAndSecret(podData *PodCreationData) error {
210+
if podData == nil {
211+
// Already been cleaned up
212+
return nil
213+
}
214+
204215
if len(podData.PodName) > 0 {
205216
//captureBashHistory(podData)
206217
podData.PodClient.Delete(podData.PodName, nil)

src/jetstream/plugins/kubernetes/terminal/start.go

+47-21
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,15 @@ type terminalSize struct {
4141
const (
4242
// Time allowed to write a message to the peer.
4343
writeWait = 10 * time.Second
44+
45+
// Time allowed to read the next pong message from the peer.
46+
pongWait = 60 * time.Second
47+
48+
// Send pings to peer with this period. Must be less than pongWait.
49+
pingPeriod = (pongWait * 9) / 10
50+
51+
// Time to wait before force close on connection.
52+
closeGracePeriod = 10 * time.Second
4453
)
4554

4655
// Start handles web-socket request to launch a Kubernetes Terminal
@@ -60,7 +69,7 @@ func (k *KubeTerminal) Start(c echo.Context) error {
6069
if !ok {
6170
return errors.New("Could not get token")
6271
}
63-
72+
6473
// This is the kube config for the kubernetes endpoint that we want configured in the Terminal
6574
kubeConfig, err := k.Kube.GetKubeConfigForEndpoint(cnsiRecord.APIEndpoint.String(), tokenRecord, "")
6675
if err != nil {
@@ -79,7 +88,10 @@ func (k *KubeTerminal) Start(c echo.Context) error {
7988
defer ws.Close()
8089
defer pingTicker.Stop()
8190

82-
// We are now in web socket land - we don't want any middleware to change the HTTP response
91+
// At this point we aer using web sockets, so we can not return errors to the client as the connection
92+
// has been upgraded to a web socket
93+
94+
// We are now in web socket land - we don't want any middleware to change the HTTP response
8395
c.Set("Stratos-WebSocket", "true")
8496

8597
// Send a message to say that we are creating the pod
@@ -95,8 +107,8 @@ func (k *KubeTerminal) Start(c echo.Context) error {
95107
k.cleanupPodAndSecret(podData)
96108

97109
// Send error message
98-
sendProgressMessage(ws, "!" + err.Error())
99-
return err
110+
sendProgressMessage(ws, "!"+err.Error())
111+
return nil
100112
}
101113

102114
// API Endpoint to SSH/exec into a container
@@ -131,36 +143,40 @@ func (k *KubeTerminal) Start(c echo.Context) error {
131143

132144
stdoutDone := make(chan bool)
133145
go pumpStdout(ws, wsConn, stdoutDone)
146+
go ping(ws, stdoutDone)
134147

135148
// If the downstream connection is closed, close the other web socket as well
136-
ws.SetCloseHandler(func (code int, text string) error {
149+
ws.SetCloseHandler(func(code int, text string) error {
137150
wsConn.Close()
151+
// Cleanup
152+
k.cleanupPodAndSecret(podData)
153+
podData = nil
138154
return nil
139155
})
140156

157+
// Wait a while when reading - can take some time for the container to launch
158+
ws.SetReadDeadline(time.Now().Add(pongWait))
159+
ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(pongWait)); return nil })
160+
141161
// Read the input from the web socket and pipe it to the SSH client
142162
for {
143163
_, r, err := ws.ReadMessage()
144164
if err != nil {
145-
// Check to see if this was because the web socket was closed cleanly
146-
closed := false
147-
select {
148-
case msg := <-stdoutDone:
149-
closed = msg
150-
}
151-
if !closed {
152-
log.Errorf("Kubernetes terminal: error reading message from web socket: %+v", err)
153-
}
154-
log.Debug("Kube Terminal cleaning up ....")
165+
// Error reading - so clean up
155166
k.cleanupPodAndSecret(podData)
167+
podData = nil
168+
169+
ws.SetWriteDeadline(time.Now().Add(writeWait))
170+
ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
171+
time.Sleep(closeGracePeriod)
172+
ws.Close()
156173

157174
// No point returning an error - we've already upgraded to web sockets, so we can't use the HTTP response now
158175
return nil
159176
}
160177

161178
res := KeyCode{}
162179
json.Unmarshal(r, &res)
163-
164180
if res.Cols == 0 {
165181
slice := make([]byte, 1)
166182
slice[0] = 0
@@ -177,11 +193,6 @@ func (k *KubeTerminal) Start(c echo.Context) error {
177193
wsConn.WriteMessage(websocket.TextMessage, slice)
178194
}
179195
}
180-
181-
// Cleanup
182-
log.Error("Kubernetes Terminal is cleaning up")
183-
184-
return k.cleanupPodAndSecret(podData)
185196
}
186197

187198
func pumpStdout(ws *websocket.Conn, source *websocket.Conn, done chan bool) {
@@ -202,3 +213,18 @@ func pumpStdout(ws *websocket.Conn, source *websocket.Conn, done chan bool) {
202213
}
203214
}
204215
}
216+
217+
func ping(ws *websocket.Conn, done chan bool) {
218+
ticker := time.NewTicker(pingPeriod)
219+
defer ticker.Stop()
220+
for {
221+
select {
222+
case <-ticker.C:
223+
if err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait)); err != nil {
224+
log.Errorf("Web socket ping error: %+v", err)
225+
}
226+
case <-done:
227+
return
228+
}
229+
}
230+
}

0 commit comments

Comments
 (0)