Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move reconnect logic to main thread #51

Merged
merged 5 commits into from
Dec 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 50 additions & 47 deletions src/main/java/io/seqera/tower/agent/Agent.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.micronaut.rxjava2.http.client.websockets.RxWebSocketClient;
import io.micronaut.scheduling.TaskScheduler;
import io.micronaut.websocket.exceptions.WebSocketClientException;
import io.seqera.tower.agent.exceptions.RecoverableException;
import io.seqera.tower.agent.exceptions.UnrecoverableException;
import io.seqera.tower.agent.exchange.CommandRequest;
import io.seqera.tower.agent.exchange.CommandResponse;
import io.seqera.tower.agent.exchange.HeartbeatMessage;
Expand All @@ -38,6 +40,7 @@
import java.lang.module.ModuleDescriptor;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
Expand Down Expand Up @@ -98,19 +101,30 @@ public static void main(String[] args) throws Exception {
public void run() {
try {
validateParameters();
checkTower();
connectTower();
sendPeriodicHeartbeat();
} catch (Exception e) {
infiniteLoop();
} catch (UnrecoverableException e) {
logger.error(e.getMessage());
System.exit(1);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
System.exit(1);
}
}

private void connectTowerDelay() {
TaskScheduler scheduler = ctx.getBean(TaskScheduler.class);
Duration delay = Duration.ofSeconds(2);
scheduler.schedule(delay, this::connectTower);
private void infiniteLoop() throws InterruptedException, IOException {
while (true) {
try {
if (agentClient == null || !agentClient.isOpen()) {
checkTower();
connectTower();
}
} catch (RecoverableException e) {
logger.error(e.getMessage());
}

Thread.sleep(2000);
}
}

/**
Expand All @@ -121,32 +135,28 @@ private void connectTower() {
try {
final URI uri = new URI(url + "/agent/" + agentKey + "/connect");
if (!uri.getScheme().equals("https")) {
logger.error("You are trying to connect to an insecure server: {}", url);
System.exit(1);
throw new UnrecoverableException(String.format("You are trying to connect to an insecure server: %s", url));
}

final MutableHttpRequest<?> req = HttpRequest.GET(uri).bearerAuth(token);
final RxWebSocketClient webSocketClient = ctx.getBean(RxWebSocketClient.class);
agentClient = webSocketClient.connect(AgentClientSocket.class, req)
.timeout(5, TimeUnit.SECONDS)
.blockingFirst();
agentClient.setConnectCallback(this::connectTowerDelay);
agentClient.setCommandRequestCallback(this::execCommand);
sendInfoMessage();
} catch (URISyntaxException e) {
logger.error("Invalid URI: {}/agent/{}/connect - {}", url, agentKey, e.getMessage());
System.exit(1);
throw new UnrecoverableException(String.format("Invalid URI: %s/agent/%s/connect - %s", url, agentKey, e.getMessage()));
} catch (WebSocketClientException e) {
logger.error("Connection error - {}", e.getMessage());
System.exit(1);
throw new RecoverableException(String.format("Connection error - %s", e.getMessage()));
} catch (UnknownHostException e) {
throw new RecoverableException("Unknown host exception - Check that it's a valid DNS domain.");
} catch (Exception e) {
if (e.getCause() instanceof TimeoutException) {
logger.error("Connection timeout [trying to reconnect in {} seconds]", heartbeatDelay);
} else {
logger.error("Unknown problem");
e.printStackTrace();
throw new RecoverableException(String.format("Connection timeout -- %s", e.getCause().getMessage()));
}
System.exit(1);

throw new RecoverableException(String.format("Unknown problem - %s", e.getMessage()), e);
}
}

Expand All @@ -159,6 +169,7 @@ private void execCommand(CommandRequest message) {
CommandResponse response;

try {
logger.trace("REQUEST: {}", message.getCommand());
Process process = new ProcessBuilder()
.command("sh", "-c", message.getCommand())
.redirectErrorStream(true)
Expand Down Expand Up @@ -199,12 +210,10 @@ private void execCommand(CommandRequest message) {
private void sendPeriodicHeartbeat() {
TaskScheduler scheduler = ctx.getBean(TaskScheduler.class);
scheduler.scheduleWithFixedDelay(heartbeatDelay, heartbeatDelay, () -> {
if (agentClient.isOpen()) {
if (agentClient != null && agentClient.isOpen()) {
logger.info("Sending heartbeat");
logger.trace("websocket session '{}'", agentClient.getId());
agentClient.send(new HeartbeatMessage());
} else {
logger.info("Trying to reconnect");
connectTower();
}
});
}
Expand All @@ -226,8 +235,7 @@ private void validateParameters() throws IOException {
// Fetch username
validatedUserName = System.getenv().getOrDefault("USER", System.getProperty("user.name"));
if (validatedUserName == null || validatedUserName.isEmpty() || validatedUserName.isBlank() || validatedUserName.equals("?")) {
logger.error("Impossible to detect current Unix username. Try setting USER environment variable.");
System.exit(1);
throw new UnrecoverableException("Impossible to detect current Unix username. Try setting USER environment variable.");
}

// Set default workDir
Expand All @@ -237,15 +245,13 @@ private void validateParameters() throws IOException {
try {
workDir = Paths.get(defaultPath);
} catch (InvalidPathException e) {
logger.error("Impossible to define a default work directory. Please provide one using '--work-dir'.");
System.exit(1);
throw new UnrecoverableException("Impossible to define a default work directory. Please provide one using '--work-dir'.");
}
}

// Validate workDir exists
if (!Files.exists(workDir)) {
logger.error("The work directory '{}' do not exists. Create it or provide a different one using '--work-dir'.", workDir);
System.exit(1);
throw new UnrecoverableException(String.format("The work directory '%s' do not exists. Create it or provide a different one using '--work-dir'.", workDir));
}
validatedWorkDir = workDir.toAbsolutePath().normalize().toString();

Expand All @@ -261,38 +267,35 @@ private void validateParameters() throws IOException {
* Do some health checks to the Tower API endpoint to verify that it is available and
* compatible with this Agent.
*/
private void checkTower() {
private void checkTower() throws IOException {
final RxHttpClient httpClient = ctx.getBean(RxHttpClient.class);
ServiceInfoResponse infoResponse = null;
try {
final URI uri = new URI(url + "/service-info");
final MutableHttpRequest<?> req = HttpRequest.GET(uri).bearerAuth(token);

ServiceInfoResponse infoResponse = httpClient.retrieve(req, ServiceInfoResponse.class).blockingFirst();
if (infoResponse.getServiceInfo() != null && infoResponse.getServiceInfo().getApiVersion() != null) {
final ModuleDescriptor.Version systemApiVersion = ModuleDescriptor.Version.parse(infoResponse.getServiceInfo().getApiVersion());
final ModuleDescriptor.Version requiredApiVersion = ModuleDescriptor.Version.parse(getVersionApi());

if (systemApiVersion.compareTo(requiredApiVersion) < 0) {
logger.error("Tower at '{}' is running API version {} and the agent needs a minimum of {}", url, systemApiVersion, requiredApiVersion);
System.exit(1);
}
}
infoResponse = httpClient.retrieve(req, ServiceInfoResponse.class).blockingFirst();
} catch (Exception e) {
if (url.contains("/api")) {
logger.error("Tower API endpoint '{}' it is not available", url);
} else {
logger.error("Tower API endpoint '{}' it is not available (did you mean '{}/api'?)", url, url);
throw new RecoverableException(String.format("Tower API endpoint '%s' it is not available", url));
}
throw new RecoverableException(String.format("Tower API endpoint '%s' it is not available (did you mean '%s/api'?)", url, url));
}

if (infoResponse != null && infoResponse.getServiceInfo() != null && infoResponse.getServiceInfo().getApiVersion() != null) {
final ModuleDescriptor.Version systemApiVersion = ModuleDescriptor.Version.parse(infoResponse.getServiceInfo().getApiVersion());
final ModuleDescriptor.Version requiredApiVersion = ModuleDescriptor.Version.parse(getVersionApi());

if (systemApiVersion.compareTo(requiredApiVersion) < 0) {
throw new UnrecoverableException(String.format("Tower at '%s' is running API version %s and the agent needs a minimum of %s", url, systemApiVersion, requiredApiVersion));
}
System.exit(1);
}

try {
final URI uri = new URI(url + "/user");
final MutableHttpRequest<?> req = HttpRequest.GET(uri).bearerAuth(token);
httpClient.retrieve(req).blockingFirst();
} catch (Exception e) {
logger.error("Invalid TOWER_ACCESS_TOKEN, check that the given token has access at '{}'.", url);
System.exit(1);
throw new UnrecoverableException(String.format("Invalid TOWER_ACCESS_TOKEN, check that the given token has access at '%s'.", url));
}
}

Expand Down
17 changes: 4 additions & 13 deletions src/main/java/io/seqera/tower/agent/AgentClientSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ abstract class AgentClientSocket implements AutoCloseable {
private WebSocketSession session;
private Instant openingTime;

// Callback to reconnect the agent
private Runnable connectCallback;

// Callback to manage a command request
private Consumer<CommandRequest> commandRequestCallback;

Expand Down Expand Up @@ -79,7 +76,6 @@ void onClose(CloseReason reason) {

if (reason.getCode() == 4001) {
logger.info("Closing to reauthenticate the session");
return;
} else {
logger.info("Closed for unknown reason after");
if (openingTime != null) {
Expand All @@ -88,11 +84,6 @@ void onClose(CloseReason reason) {
logger.info("Session duration {}", duration);
}
}

if (connectCallback != null) {
logger.info("Reconnecting in 2 seconds");
connectCallback.run();
}
}

abstract void send(AgentMessage message);
Expand All @@ -103,13 +94,13 @@ public boolean isOpen() {
return session.isOpen();
}

public void setConnectCallback(Runnable connectCallback) {
this.connectCallback = connectCallback;
}

public void setCommandRequestCallback(Consumer<CommandRequest> callback) {
this.commandRequestCallback = callback;
}

public String getId() {
return session.getId();
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2021, Seqera Labs.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* This Source Code Form is "Incompatible With Secondary Licenses", as
* defined by the Mozilla Public License, v. 2.0.
*/

package io.seqera.tower.agent.exceptions;

/**
* A recoverable exception is an exception that Tower Agent will log as
* an error, but it will keep running and retrying to connect.
*/
public class RecoverableException extends RuntimeException {

public RecoverableException() {
}

public RecoverableException(String message) {
super(message);
}

public RecoverableException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2021, Seqera Labs.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* This Source Code Form is "Incompatible With Secondary Licenses", as
* defined by the Mozilla Public License, v. 2.0.
*/

package io.seqera.tower.agent.exceptions;

/**
* An unrecoverable exception is an exception that Tower Agent will log as
* an error and cause it to exit with an exit code error.
*/
public class UnrecoverableException extends RuntimeException {

public UnrecoverableException() {
}

public UnrecoverableException(String message) {
super(message);
}

public UnrecoverableException(String message, Throwable cause) {
super(message, cause);
}
}