From 2d1a12e8459e9942eeff1c9ff348f68c63390e35 Mon Sep 17 00:00:00 2001 From: Jordi Deu-Pons Date: Wed, 7 Dec 2022 10:21:17 +0100 Subject: [PATCH 1/5] Move reconnect logic to the main thread --- .../java/io/seqera/tower/agent/Agent.java | 23 ++++++++----------- .../seqera/tower/agent/AgentClientSocket.java | 17 ++++---------- 2 files changed, 14 insertions(+), 26 deletions(-) diff --git a/src/main/java/io/seqera/tower/agent/Agent.java b/src/main/java/io/seqera/tower/agent/Agent.java index b756b76..bd4fd68 100644 --- a/src/main/java/io/seqera/tower/agent/Agent.java +++ b/src/main/java/io/seqera/tower/agent/Agent.java @@ -98,21 +98,21 @@ public static void main(String[] args) throws Exception { public void run() { try { validateParameters(); - checkTower(); - connectTower(); sendPeriodicHeartbeat(); + while (true) { + if (agentClient == null || !agentClient.isOpen()) { + checkTower(); + connectTower(); + } + Thread.sleep(2000); + } + } catch (Exception e) { logger.error(e.getMessage()); System.exit(1); } } - private void connectTowerDelay() { - TaskScheduler scheduler = ctx.getBean(TaskScheduler.class); - Duration delay = Duration.ofSeconds(2); - scheduler.schedule(delay, this::connectTower); - } - /** * Connect the agent to Tower using websockets */ @@ -130,7 +130,6 @@ private void connectTower() { agentClient = webSocketClient.connect(AgentClientSocket.class, req) .timeout(5, TimeUnit.SECONDS) .blockingFirst(); - agentClient.setConnectCallback(this::connectTowerDelay); agentClient.setCommandRequestCallback(this::execCommand); sendInfoMessage(); } catch (URISyntaxException e) { @@ -199,12 +198,10 @@ private void execCommand(CommandRequest message) { private void sendPeriodicHeartbeat() { TaskScheduler scheduler = ctx.getBean(TaskScheduler.class); scheduler.scheduleWithFixedDelay(heartbeatDelay, heartbeatDelay, () -> { - if (agentClient.isOpen()) { + if (agentClient != null && agentClient.isOpen()) { logger.info("Sending heartbeat"); + logger.trace("websocket session '{}'", agentClient.getId()); agentClient.send(new HeartbeatMessage()); - } else { - logger.info("Trying to reconnect"); - connectTower(); } }); } diff --git a/src/main/java/io/seqera/tower/agent/AgentClientSocket.java b/src/main/java/io/seqera/tower/agent/AgentClientSocket.java index 7ecbad0..d3d10f8 100644 --- a/src/main/java/io/seqera/tower/agent/AgentClientSocket.java +++ b/src/main/java/io/seqera/tower/agent/AgentClientSocket.java @@ -39,9 +39,6 @@ abstract class AgentClientSocket implements AutoCloseable { private WebSocketSession session; private Instant openingTime; - // Callback to reconnect the agent - private Runnable connectCallback; - // Callback to manage a command request private Consumer commandRequestCallback; @@ -79,7 +76,6 @@ void onClose(CloseReason reason) { if (reason.getCode() == 4001) { logger.info("Closing to reauthenticate the session"); - return; } else { logger.info("Closed for unknown reason after"); if (openingTime != null) { @@ -88,11 +84,6 @@ void onClose(CloseReason reason) { logger.info("Session duration {}", duration); } } - - if (connectCallback != null) { - logger.info("Reconnecting in 2 seconds"); - connectCallback.run(); - } } abstract void send(AgentMessage message); @@ -103,13 +94,13 @@ public boolean isOpen() { return session.isOpen(); } - public void setConnectCallback(Runnable connectCallback) { - this.connectCallback = connectCallback; - } - public void setCommandRequestCallback(Consumer callback) { this.commandRequestCallback = callback; } + public String getId() { + return session.getId(); + } + } From 053566c84d4599953a0547bcc1246ed5da1a74f8 Mon Sep 17 00:00:00 2001 From: Jordi Deu-Pons Date: Wed, 7 Dec 2022 15:07:38 +0100 Subject: [PATCH 2/5] Retry on most exceptions --- src/main/java/io/seqera/tower/agent/Agent.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/seqera/tower/agent/Agent.java b/src/main/java/io/seqera/tower/agent/Agent.java index bd4fd68..f5f5b55 100644 --- a/src/main/java/io/seqera/tower/agent/Agent.java +++ b/src/main/java/io/seqera/tower/agent/Agent.java @@ -38,6 +38,7 @@ import java.lang.module.ModuleDescriptor; import java.net.URI; import java.net.URISyntaxException; +import java.net.UnknownHostException; import java.nio.file.Files; import java.nio.file.InvalidPathException; import java.nio.file.Path; @@ -106,7 +107,6 @@ public void run() { } Thread.sleep(2000); } - } catch (Exception e) { logger.error(e.getMessage()); System.exit(1); @@ -137,15 +137,14 @@ private void connectTower() { System.exit(1); } catch (WebSocketClientException e) { logger.error("Connection error - {}", e.getMessage()); - System.exit(1); + } catch (UnknownHostException e) { + logger.error("Unknown host exception - Check that it's a valid DNS domain."); } catch (Exception e) { if (e.getCause() instanceof TimeoutException) { - logger.error("Connection timeout [trying to reconnect in {} seconds]", heartbeatDelay); + logger.error("Connection timeout"); } else { - logger.error("Unknown problem"); - e.printStackTrace(); + logger.error("Unknown problem - {}", e.getMessage()); } - System.exit(1); } } @@ -280,7 +279,7 @@ private void checkTower() { } else { logger.error("Tower API endpoint '{}' it is not available (did you mean '{}/api'?)", url, url); } - System.exit(1); + return; } try { @@ -289,7 +288,6 @@ private void checkTower() { httpClient.retrieve(req).blockingFirst(); } catch (Exception e) { logger.error("Invalid TOWER_ACCESS_TOKEN, check that the given token has access at '{}'.", url); - System.exit(1); } } From ca07f2b1923f2900ae4e2f729362e12072639859 Mon Sep 17 00:00:00 2001 From: Jordi Deu-Pons Date: Fri, 9 Dec 2022 11:35:27 +0100 Subject: [PATCH 3/5] Add recoverable and unrecoverable exceptions --- .../java/io/seqera/tower/agent/Agent.java | 82 ++++++++++--------- .../exceptions/RecoverableException.java | 19 +++++ .../exceptions/UnrecoverableException.java | 19 +++++ 3 files changed, 83 insertions(+), 37 deletions(-) create mode 100644 src/main/java/io/seqera/tower/agent/exceptions/RecoverableException.java create mode 100644 src/main/java/io/seqera/tower/agent/exceptions/UnrecoverableException.java diff --git a/src/main/java/io/seqera/tower/agent/Agent.java b/src/main/java/io/seqera/tower/agent/Agent.java index f5f5b55..eb9d19c 100644 --- a/src/main/java/io/seqera/tower/agent/Agent.java +++ b/src/main/java/io/seqera/tower/agent/Agent.java @@ -20,6 +20,8 @@ import io.micronaut.rxjava2.http.client.websockets.RxWebSocketClient; import io.micronaut.scheduling.TaskScheduler; import io.micronaut.websocket.exceptions.WebSocketClientException; +import io.seqera.tower.agent.exceptions.RecoverableException; +import io.seqera.tower.agent.exceptions.UnrecoverableException; import io.seqera.tower.agent.exchange.CommandRequest; import io.seqera.tower.agent.exchange.CommandResponse; import io.seqera.tower.agent.exchange.HeartbeatMessage; @@ -100,16 +102,28 @@ public void run() { try { validateParameters(); sendPeriodicHeartbeat(); - while (true) { + infiniteLoop(); + } catch (Throwable e) { + logger.error(e.getMessage()); + if (logger.isTraceEnabled()) { + e.printStackTrace(); + } + System.exit(1); + } + } + + private void infiniteLoop() throws InterruptedException, IOException { + while (true) { + try { if (agentClient == null || !agentClient.isOpen()) { checkTower(); connectTower(); } - Thread.sleep(2000); + } catch (RecoverableException e) { + logger.error(e.getMessage()); } - } catch (Exception e) { - logger.error(e.getMessage()); - System.exit(1); + + Thread.sleep(2000); } } @@ -121,8 +135,7 @@ private void connectTower() { try { final URI uri = new URI(url + "/agent/" + agentKey + "/connect"); if (!uri.getScheme().equals("https")) { - logger.error("You are trying to connect to an insecure server: {}", url); - System.exit(1); + throw new UnrecoverableException(String.format("You are trying to connect to an insecure server: %s", url)); } final MutableHttpRequest req = HttpRequest.GET(uri).bearerAuth(token); @@ -133,18 +146,17 @@ private void connectTower() { agentClient.setCommandRequestCallback(this::execCommand); sendInfoMessage(); } catch (URISyntaxException e) { - logger.error("Invalid URI: {}/agent/{}/connect - {}", url, agentKey, e.getMessage()); - System.exit(1); + throw new UnrecoverableException(String.format("Invalid URI: %s/agent/%s/connect - %s", url, agentKey, e.getMessage())); } catch (WebSocketClientException e) { - logger.error("Connection error - {}", e.getMessage()); + throw new RecoverableException(String.format("Connection error - %s", e.getMessage())); } catch (UnknownHostException e) { - logger.error("Unknown host exception - Check that it's a valid DNS domain."); + throw new RecoverableException("Unknown host exception - Check that it's a valid DNS domain."); } catch (Exception e) { if (e.getCause() instanceof TimeoutException) { - logger.error("Connection timeout"); - } else { - logger.error("Unknown problem - {}", e.getMessage()); + throw new RecoverableException(String.format("Connection timeout -- %s", e.getCause().getMessage())); } + + throw new RecoverableException(String.format("Unknown problem - %s", e.getMessage()), e); } } @@ -157,6 +169,7 @@ private void execCommand(CommandRequest message) { CommandResponse response; try { + logger.trace("REQUEST: {}", message.getCommand()); Process process = new ProcessBuilder() .command("sh", "-c", message.getCommand()) .redirectErrorStream(true) @@ -222,8 +235,7 @@ private void validateParameters() throws IOException { // Fetch username validatedUserName = System.getenv().getOrDefault("USER", System.getProperty("user.name")); if (validatedUserName == null || validatedUserName.isEmpty() || validatedUserName.isBlank() || validatedUserName.equals("?")) { - logger.error("Impossible to detect current Unix username. Try setting USER environment variable."); - System.exit(1); + throw new UnrecoverableException("Impossible to detect current Unix username. Try setting USER environment variable."); } // Set default workDir @@ -233,15 +245,13 @@ private void validateParameters() throws IOException { try { workDir = Paths.get(defaultPath); } catch (InvalidPathException e) { - logger.error("Impossible to define a default work directory. Please provide one using '--work-dir'."); - System.exit(1); + throw new UnrecoverableException("Impossible to define a default work directory. Please provide one using '--work-dir'."); } } // Validate workDir exists if (!Files.exists(workDir)) { - logger.error("The work directory '{}' do not exists. Create it or provide a different one using '--work-dir'.", workDir); - System.exit(1); + throw new UnrecoverableException(String.format("The work directory '%s' do not exists. Create it or provide a different one using '--work-dir'.", workDir)); } validatedWorkDir = workDir.toAbsolutePath().normalize().toString(); @@ -257,29 +267,27 @@ private void validateParameters() throws IOException { * Do some health checks to the Tower API endpoint to verify that it is available and * compatible with this Agent. */ - private void checkTower() { + private void checkTower() throws IOException { final RxHttpClient httpClient = ctx.getBean(RxHttpClient.class); + ServiceInfoResponse infoResponse = null; try { final URI uri = new URI(url + "/service-info"); final MutableHttpRequest req = HttpRequest.GET(uri).bearerAuth(token); - - ServiceInfoResponse infoResponse = httpClient.retrieve(req, ServiceInfoResponse.class).blockingFirst(); - if (infoResponse.getServiceInfo() != null && infoResponse.getServiceInfo().getApiVersion() != null) { - final ModuleDescriptor.Version systemApiVersion = ModuleDescriptor.Version.parse(infoResponse.getServiceInfo().getApiVersion()); - final ModuleDescriptor.Version requiredApiVersion = ModuleDescriptor.Version.parse(getVersionApi()); - - if (systemApiVersion.compareTo(requiredApiVersion) < 0) { - logger.error("Tower at '{}' is running API version {} and the agent needs a minimum of {}", url, systemApiVersion, requiredApiVersion); - System.exit(1); - } - } + infoResponse = httpClient.retrieve(req, ServiceInfoResponse.class).blockingFirst(); } catch (Exception e) { if (url.contains("/api")) { - logger.error("Tower API endpoint '{}' it is not available", url); - } else { - logger.error("Tower API endpoint '{}' it is not available (did you mean '{}/api'?)", url, url); + throw new RecoverableException(String.format("Tower API endpoint '%s' it is not available", url)); + } + throw new RecoverableException(String.format("Tower API endpoint '%s' it is not available (did you mean '%s/api'?)", url, url)); + } + + if (infoResponse != null && infoResponse.getServiceInfo() != null && infoResponse.getServiceInfo().getApiVersion() != null) { + final ModuleDescriptor.Version systemApiVersion = ModuleDescriptor.Version.parse(infoResponse.getServiceInfo().getApiVersion()); + final ModuleDescriptor.Version requiredApiVersion = ModuleDescriptor.Version.parse(getVersionApi()); + + if (systemApiVersion.compareTo(requiredApiVersion) < 0) { + throw new UnrecoverableException(String.format("Tower at '%s' is running API version %s and the agent needs a minimum of %s", url, systemApiVersion, requiredApiVersion)); } - return; } try { @@ -287,7 +295,7 @@ private void checkTower() { final MutableHttpRequest req = HttpRequest.GET(uri).bearerAuth(token); httpClient.retrieve(req).blockingFirst(); } catch (Exception e) { - logger.error("Invalid TOWER_ACCESS_TOKEN, check that the given token has access at '{}'.", url); + throw new UnrecoverableException(String.format("Invalid TOWER_ACCESS_TOKEN, check that the given token has access at '%s'.", url)); } } diff --git a/src/main/java/io/seqera/tower/agent/exceptions/RecoverableException.java b/src/main/java/io/seqera/tower/agent/exceptions/RecoverableException.java new file mode 100644 index 0000000..9f5e29f --- /dev/null +++ b/src/main/java/io/seqera/tower/agent/exceptions/RecoverableException.java @@ -0,0 +1,19 @@ +package io.seqera.tower.agent.exceptions; + +/** + * A recoverable exception is an exception that Tower Agent will log as + * an error, but it will keep running and retrying to connect. + */ +public class RecoverableException extends RuntimeException { + + public RecoverableException() { + } + + public RecoverableException(String message) { + super(message); + } + + public RecoverableException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/io/seqera/tower/agent/exceptions/UnrecoverableException.java b/src/main/java/io/seqera/tower/agent/exceptions/UnrecoverableException.java new file mode 100644 index 0000000..4883faa --- /dev/null +++ b/src/main/java/io/seqera/tower/agent/exceptions/UnrecoverableException.java @@ -0,0 +1,19 @@ +package io.seqera.tower.agent.exceptions; + +/** + * An unrecoverable exception is an exception that Tower Agent will log as + * an error and cause it to exit with an exit code error. + */ +public class UnrecoverableException extends RuntimeException { + + public UnrecoverableException() { + } + + public UnrecoverableException(String message) { + super(message); + } + + public UnrecoverableException(String message, Throwable cause) { + super(message, cause); + } +} From e2e57f6682ce000d401242d019d3aa2ad34e6b0c Mon Sep 17 00:00:00 2001 From: Jordi Deu-Pons Date: Fri, 9 Dec 2022 11:38:37 +0100 Subject: [PATCH 4/5] Add missing license headers --- .../tower/agent/exceptions/RecoverableException.java | 11 +++++++++++ .../agent/exceptions/UnrecoverableException.java | 11 +++++++++++ 2 files changed, 22 insertions(+) diff --git a/src/main/java/io/seqera/tower/agent/exceptions/RecoverableException.java b/src/main/java/io/seqera/tower/agent/exceptions/RecoverableException.java index 9f5e29f..791d7aa 100644 --- a/src/main/java/io/seqera/tower/agent/exceptions/RecoverableException.java +++ b/src/main/java/io/seqera/tower/agent/exceptions/RecoverableException.java @@ -1,3 +1,14 @@ +/* + * Copyright (c) 2021, Seqera Labs. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This Source Code Form is "Incompatible With Secondary Licenses", as + * defined by the Mozilla Public License, v. 2.0. + */ + package io.seqera.tower.agent.exceptions; /** diff --git a/src/main/java/io/seqera/tower/agent/exceptions/UnrecoverableException.java b/src/main/java/io/seqera/tower/agent/exceptions/UnrecoverableException.java index 4883faa..20f7dfd 100644 --- a/src/main/java/io/seqera/tower/agent/exceptions/UnrecoverableException.java +++ b/src/main/java/io/seqera/tower/agent/exceptions/UnrecoverableException.java @@ -1,3 +1,14 @@ +/* + * Copyright (c) 2021, Seqera Labs. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This Source Code Form is "Incompatible With Secondary Licenses", as + * defined by the Mozilla Public License, v. 2.0. + */ + package io.seqera.tower.agent.exceptions; /** From c51dcc4023234095a1c2d9060c4b362e69a4ae4f Mon Sep 17 00:00:00 2001 From: Jordi Deu-Pons Date: Fri, 9 Dec 2022 16:31:17 +0100 Subject: [PATCH 5/5] Improve logs on unknown exceptions --- src/main/java/io/seqera/tower/agent/Agent.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/seqera/tower/agent/Agent.java b/src/main/java/io/seqera/tower/agent/Agent.java index eb9d19c..52ef596 100644 --- a/src/main/java/io/seqera/tower/agent/Agent.java +++ b/src/main/java/io/seqera/tower/agent/Agent.java @@ -103,11 +103,11 @@ public void run() { validateParameters(); sendPeriodicHeartbeat(); infiniteLoop(); - } catch (Throwable e) { + } catch (UnrecoverableException e) { logger.error(e.getMessage()); - if (logger.isTraceEnabled()) { - e.printStackTrace(); - } + System.exit(1); + } catch (Throwable e) { + logger.error(e.getMessage(), e); System.exit(1); } }