Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify server startup and dagger usage #1601

Merged
merged 2 commits into from
Nov 24, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -6,7 +6,6 @@
import io.deephaven.db.v2.utils.ProcessMemoryTracker;
import io.deephaven.db.v2.utils.UpdatePerformanceTracker;
import io.deephaven.grpc_api.appmode.ApplicationInjector;
import io.deephaven.grpc_api.appmode.ApplicationServiceGrpcImpl;
import io.deephaven.grpc_api.console.ConsoleServiceGrpcImpl;
import io.deephaven.grpc_api.log.LogInit;
import io.deephaven.grpc_api.session.SessionService;
@@ -15,6 +14,7 @@
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.grpc_api.uri.UriResolversInstance;
import io.deephaven.util.annotations.VisibleForTesting;
import io.deephaven.util.process.ProcessEnvironment;
import io.deephaven.util.process.ShutdownManager;
import io.grpc.Server;
@@ -23,44 +23,20 @@
import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
* Entrypoint for the Deephaven gRPC server, starting the various engine and script components, running any specified
* application, and enabling the gRPC endpoints to be accessed by consumers.
*/
public class DeephavenApiServer {

private static final Logger log = LoggerFactory.getLogger(DeephavenApiServer.class);

public static void start(DeephavenApiServer server, SessionService sessionService)
throws IOException, ClassNotFoundException, InterruptedException {
// Stop accepting new gRPC requests.
ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.FIRST,
server.server::shutdown);

// Close outstanding sessions to give any gRPCs closure.
ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.MIDDLE,
sessionService::onShutdown);

// Finally wait for gRPC to exit now.
ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.LAST, () -> {
try {
if (!server.server.awaitTermination(10, TimeUnit.SECONDS)) {
log.error().append(
"The gRPC server did not terminate in a reasonable amount of time. Invoking shutdownNow().")
.endl();
server.server.shutdownNow();
}
} catch (final InterruptedException ignored) {
}
});

server.start();
server.blockUntilShutdown();
}

private final Server server;
private final LiveTableMonitor ltm;
private final LogInit logInit;
private final ConsoleServiceGrpcImpl consoleService;
private final ApplicationInjector applicationInjector;
private final ApplicationServiceGrpcImpl applicationService;
private final UriResolvers uriResolvers;
private final SessionService sessionService;

@Inject
public DeephavenApiServer(
@@ -69,22 +45,54 @@ public DeephavenApiServer(
final LogInit logInit,
final ConsoleServiceGrpcImpl consoleService,
final ApplicationInjector applicationInjector,
final ApplicationServiceGrpcImpl applicationService,
final UriResolvers uriResolvers) {
final UriResolvers uriResolvers,
final SessionService sessionService) {
this.server = server;
this.ltm = ltm;
this.logInit = logInit;
this.consoleService = consoleService;
this.applicationInjector = applicationInjector;
this.applicationService = applicationService;
this.uriResolvers = uriResolvers;
this.sessionService = sessionService;
}

@VisibleForTesting
public Server server() {
return server;
}

public void start() throws IOException, ClassNotFoundException {
/**
* Starts the various server components, and blocks until the gRPC server has shut down. That shutdown is mediated
* by the ShutdownManager, and will call the gRPC server to shut it down when the process is itself shutting down.
* Only once that is complete will this method return.
*
* @throws IOException thrown in event of an error with logging, finding and running an application, and starting
* the gRPC service.
* @throws ClassNotFoundException thrown if a class can't be found while finding and running an application.
* @throws InterruptedException thrown if this thread is interrupted while blocking for the server to halt.
*/
public void run() throws IOException, ClassNotFoundException, InterruptedException {
// Stop accepting new gRPC requests.
ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.FIRST,
server::shutdown);

// Close outstanding sessions to give any gRPCs closure.
ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.MIDDLE,
sessionService::onShutdown);

// Finally wait for gRPC to exit now.
ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.LAST, () -> {
try {
if (!server.awaitTermination(10, TimeUnit.SECONDS)) {
log.error().append(
"The gRPC server did not terminate in a reasonable amount of time. Invoking shutdownNow().")
.endl();
server.shutdownNow();
}
} catch (final InterruptedException ignored) {
}
});

log.info().append("Configuring logging...").endl();
logInit.run();

@@ -103,26 +111,22 @@ public void start() throws IOException, ClassNotFoundException {
UpdatePerformanceTracker.start();
ProcessMemoryTracker.start();

for (UriResolver resolver : uriResolvers.resolvers()) {
log.debug().append("Found table resolver ").append(resolver.getClass().toString()).endl();
}
UriResolversInstance.init(uriResolvers);

// inject applications before we start the gRPC server
applicationInjector.run();

{
for (UriResolver resolver : uriResolvers.resolvers()) {
log.info().append("Found table resolver ").append(resolver.getClass().toString()).endl();
}
UriResolversInstance.init(uriResolvers);
}

log.info().append("Starting server...").endl();
server.start();
server.awaitTermination();
}

void startForUnitTests() throws IOException {
log.info().append("Starting server...").endl();
server.start();
}

private void blockUntilShutdown() throws InterruptedException {
server.awaitTermination();
}
}
Original file line number Diff line number Diff line change
@@ -2,19 +2,11 @@

import dagger.BindsInstance;
import dagger.Component;
import io.deephaven.grpc_api.appmode.AppMode;
import io.deephaven.grpc_api.healthcheck.HealthCheckModule;
import io.deephaven.grpc_api.session.SessionService;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.process.ProcessEnvironment;
import io.deephaven.util.process.ShutdownManager;

import javax.inject.Named;
import javax.inject.Singleton;
import java.io.IOException;
import java.io.PrintStream;
import java.util.concurrent.TimeUnit;

@Singleton
@Component(modules = {
@@ -24,16 +16,12 @@
})
public interface DeephavenApiServerComponent {

@Singleton
DeephavenApiServer getServer();

@Singleton
SessionService getSessionService();

@Component.Builder
interface Builder {
@BindsInstance
Builder withPort(@Named("grpc.port") int port);
Builder withPort(@Named("http.port") int port);

@BindsInstance
Builder withSchedulerPoolSize(@Named("scheduler.poolSize") int numThreads);
@@ -47,25 +35,6 @@ interface Builder {
@BindsInstance
Builder withErr(@Named("err") PrintStream err);

@BindsInstance
Builder withAppMode(AppMode appMode);

DeephavenApiServerComponent build();
}

static void startMain(PrintStream out, PrintStream err)
throws IOException, InterruptedException, ClassNotFoundException {
final DeephavenApiServerComponent injector = DaggerDeephavenApiServerComponent
.builder()
.withPort(8080)
.withSchedulerPoolSize(4)
.withSessionTokenExpireTmMs(300000) // defaults to 5 min
.withOut(out)
.withErr(err)
.withAppMode(AppMode.currentMode())
.build();
final DeephavenApiServer server = injector.getServer();
final SessionService sessionService = injector.getSessionService();
DeephavenApiServer.start(server, sessionService);
}
}
Original file line number Diff line number Diff line change
@@ -2,19 +2,11 @@

import dagger.BindsInstance;
import dagger.Component;
import io.deephaven.grpc_api.appmode.AppMode;
import io.deephaven.grpc_api.session.SessionService;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.process.ProcessEnvironment;
import io.deephaven.util.process.ShutdownManager;
import io.grpc.ManagedChannelBuilder;

import javax.inject.Named;
import javax.inject.Singleton;
import java.io.IOException;
import java.io.PrintStream;
import java.util.concurrent.TimeUnit;

@Singleton
@Component(modules = {
@@ -23,12 +15,8 @@
})
public interface DeephavenApiServerInProcessComponent {

@Singleton
DeephavenApiServer getServer();

@Singleton
SessionService getSessionService();

ManagedChannelBuilder<?> channelBuilder();

@Component.Builder
@@ -46,9 +34,6 @@ interface Builder {
@BindsInstance
Builder withErr(@Named("err") PrintStream err);

@BindsInstance
Builder withAppMode(AppMode appMode);

DeephavenApiServerInProcessComponent build();
}
}
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@
import dagger.multibindings.ElementsIntoSet;
import io.deephaven.db.tables.live.LiveTableMonitor;
import io.deephaven.db.v2.sources.chunk.util.pools.MultiChunkPool;
import io.deephaven.grpc_api.appmode.AppMode;
import io.deephaven.grpc_api.appmode.AppModeModule;
import io.deephaven.grpc_api.arrow.ArrowModule;
import io.deephaven.grpc_api.auth.AuthContextModule;
@@ -83,6 +84,12 @@ static Set<ServerInterceptor> primeInterceptors() {
return Collections.emptySet();
}

@Provides
@Singleton
public static AppMode provideAppMode() {
return AppMode.currentMode();
}

@Provides
@Singleton
public static Scheduler provideScheduler(final @Named("scheduler.poolSize") int poolSize) {
16 changes: 15 additions & 1 deletion grpc-api/src/main/java/io/deephaven/grpc_api/runner/Main.java
Original file line number Diff line number Diff line change
@@ -44,6 +44,20 @@ public static void main(String[] args) throws IOException, InterruptedException,
ProcessEnvironment.basicInteractiveProcessInitialization(config, Main.class.getName(), log);
Thread.setDefaultUncaughtExceptionHandler(processEnvironment.getFatalErrorReporter());

DeephavenApiServerComponent.startMain(PrintStreamGlobals.getOut(), PrintStreamGlobals.getErr());
// defaults to 5 minutes
int httpSessionExpireMs = config.getIntegerWithDefault("http.session.durationMs", 300000);
int httpPort = config.getIntegerWithDefault("http.port", 8080);
int schedulerPoolSize = config.getIntegerWithDefault("scheduler.poolSize", 4);

DaggerDeephavenApiServerComponent
.builder()
.withPort(httpPort)
.withSchedulerPoolSize(schedulerPoolSize)
.withSessionTokenExpireTmMs(httpSessionExpireMs)
.withOut(PrintStreamGlobals.getOut())
.withErr(PrintStreamGlobals.getErr())
.build()
.getServer()
.run();
}
}
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@
public class ServerBuilderModule {

@Provides
static ServerBuilder<?> serverBuilder(final @Named("grpc.port") int port) {
static ServerBuilder<?> serverBuilder(final @Named("http.port") int port) {
return ServerBuilder.forPort(port);
}
}
Original file line number Diff line number Diff line change
@@ -44,7 +44,6 @@ public void setUp() throws Exception {
.withSessionTokenExpireTmMs(sessionTokenExpireTmMs())
.withOut(System.out)
.withErr(System.err)
.withAppMode(AppMode.API_ONLY)
.build();

server = serverComponent.getServer();