Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify server startup and dagger usage #1601

Merged
merged 2 commits into from
Nov 24, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Simplfiy server startup and dagger usage
This is a first step towards making the http server for grpc
replacable with Jetty.

Partial #1270
niloc132 committed Nov 23, 2021
commit c3ca14a730dd60418446d9c4248392227eac53af
Original file line number Diff line number Diff line change
@@ -6,7 +6,6 @@
import io.deephaven.db.v2.utils.ProcessMemoryTracker;
import io.deephaven.db.v2.utils.UpdatePerformanceTracker;
import io.deephaven.grpc_api.appmode.ApplicationInjector;
import io.deephaven.grpc_api.appmode.ApplicationServiceGrpcImpl;
import io.deephaven.grpc_api.console.ConsoleServiceGrpcImpl;
import io.deephaven.grpc_api.log.LogInit;
import io.deephaven.grpc_api.session.SessionService;
@@ -15,6 +14,7 @@
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.grpc_api.uri.UriResolversInstance;
import io.deephaven.util.annotations.VisibleForTesting;
import io.deephaven.util.process.ProcessEnvironment;
import io.deephaven.util.process.ShutdownManager;
import io.grpc.Server;
@@ -24,43 +24,15 @@
import java.util.concurrent.TimeUnit;

public class DeephavenApiServer {

private static final Logger log = LoggerFactory.getLogger(DeephavenApiServer.class);

public static void start(DeephavenApiServer server, SessionService sessionService)
throws IOException, ClassNotFoundException, InterruptedException {
// Stop accepting new gRPC requests.
ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.FIRST,
server.server::shutdown);

// Close outstanding sessions to give any gRPCs closure.
ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.MIDDLE,
sessionService::onShutdown);

// Finally wait for gRPC to exit now.
ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.LAST, () -> {
try {
if (!server.server.awaitTermination(10, TimeUnit.SECONDS)) {
log.error().append(
"The gRPC server did not terminate in a reasonable amount of time. Invoking shutdownNow().")
.endl();
server.server.shutdownNow();
}
} catch (final InterruptedException ignored) {
}
});

server.start();
server.blockUntilShutdown();
}

private final Server server;
private final LiveTableMonitor ltm;
private final LogInit logInit;
private final ConsoleServiceGrpcImpl consoleService;
private final ApplicationInjector applicationInjector;
private final ApplicationServiceGrpcImpl applicationService;
private final UriResolvers uriResolvers;
private final SessionService sessionService;

@Inject
public DeephavenApiServer(
@@ -69,22 +41,44 @@ public DeephavenApiServer(
final LogInit logInit,
final ConsoleServiceGrpcImpl consoleService,
final ApplicationInjector applicationInjector,
final ApplicationServiceGrpcImpl applicationService,
final UriResolvers uriResolvers) {
final UriResolvers uriResolvers,
final SessionService sessionService) {
this.server = server;
this.ltm = ltm;
this.logInit = logInit;
this.consoleService = consoleService;
this.applicationInjector = applicationInjector;
this.applicationService = applicationService;
this.uriResolvers = uriResolvers;
this.sessionService = sessionService;
}

@VisibleForTesting
public Server server() {
return server;
}

public void start() throws IOException, ClassNotFoundException {
public void start() throws IOException, ClassNotFoundException, InterruptedException {
// Stop accepting new gRPC requests.
ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.FIRST,
server::shutdown);

// Close outstanding sessions to give any gRPCs closure.
ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.MIDDLE,
sessionService::onShutdown);

// Finally wait for gRPC to exit now.
ProcessEnvironment.getGlobalShutdownManager().registerTask(ShutdownManager.OrderingCategory.LAST, () -> {
try {
if (!server.awaitTermination(10, TimeUnit.SECONDS)) {
log.error().append(
"The gRPC server did not terminate in a reasonable amount of time. Invoking shutdownNow().")
.endl();
server.shutdownNow();
}
} catch (final InterruptedException ignored) {
}
});

log.info().append("Configuring logging...").endl();
logInit.run();

@@ -103,26 +97,22 @@ public void start() throws IOException, ClassNotFoundException {
UpdatePerformanceTracker.start();
ProcessMemoryTracker.start();

for (UriResolver resolver : uriResolvers.resolvers()) {
log.debug().append("Found table resolver ").append(resolver.getClass().toString()).endl();
}
UriResolversInstance.init(uriResolvers);

// inject applications before we start the gRPC server
applicationInjector.run();

{
for (UriResolver resolver : uriResolvers.resolvers()) {
log.info().append("Found table resolver ").append(resolver.getClass().toString()).endl();
}
UriResolversInstance.init(uriResolvers);
}

log.info().append("Starting server...").endl();
server.start();
server.awaitTermination();
}

void startForUnitTests() throws IOException {
log.info().append("Starting server...").endl();
server.start();
}

private void blockUntilShutdown() throws InterruptedException {
server.awaitTermination();
}
}
Original file line number Diff line number Diff line change
@@ -2,19 +2,11 @@

import dagger.BindsInstance;
import dagger.Component;
import io.deephaven.grpc_api.appmode.AppMode;
import io.deephaven.grpc_api.healthcheck.HealthCheckModule;
import io.deephaven.grpc_api.session.SessionService;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.process.ProcessEnvironment;
import io.deephaven.util.process.ShutdownManager;

import javax.inject.Named;
import javax.inject.Singleton;
import java.io.IOException;
import java.io.PrintStream;
import java.util.concurrent.TimeUnit;

@Singleton
@Component(modules = {
@@ -24,16 +16,12 @@
})
public interface DeephavenApiServerComponent {

@Singleton
DeephavenApiServer getServer();

@Singleton
SessionService getSessionService();

@Component.Builder
interface Builder {
@BindsInstance
Builder withPort(@Named("grpc.port") int port);
Builder withPort(@Named("http.port") int port);

@BindsInstance
Builder withSchedulerPoolSize(@Named("scheduler.poolSize") int numThreads);
@@ -47,25 +35,6 @@ interface Builder {
@BindsInstance
Builder withErr(@Named("err") PrintStream err);

@BindsInstance
Builder withAppMode(AppMode appMode);

DeephavenApiServerComponent build();
}

static void startMain(PrintStream out, PrintStream err)
throws IOException, InterruptedException, ClassNotFoundException {
final DeephavenApiServerComponent injector = DaggerDeephavenApiServerComponent
.builder()
.withPort(8080)
.withSchedulerPoolSize(4)
.withSessionTokenExpireTmMs(300000) // defaults to 5 min
.withOut(out)
.withErr(err)
.withAppMode(AppMode.currentMode())
.build();
final DeephavenApiServer server = injector.getServer();
final SessionService sessionService = injector.getSessionService();
DeephavenApiServer.start(server, sessionService);
}
}
Original file line number Diff line number Diff line change
@@ -2,19 +2,11 @@

import dagger.BindsInstance;
import dagger.Component;
import io.deephaven.grpc_api.appmode.AppMode;
import io.deephaven.grpc_api.session.SessionService;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.process.ProcessEnvironment;
import io.deephaven.util.process.ShutdownManager;
import io.grpc.ManagedChannelBuilder;

import javax.inject.Named;
import javax.inject.Singleton;
import java.io.IOException;
import java.io.PrintStream;
import java.util.concurrent.TimeUnit;

@Singleton
@Component(modules = {
@@ -23,12 +15,8 @@
})
public interface DeephavenApiServerInProcessComponent {

@Singleton
DeephavenApiServer getServer();

@Singleton
SessionService getSessionService();

ManagedChannelBuilder<?> channelBuilder();

@Component.Builder
@@ -46,9 +34,6 @@ interface Builder {
@BindsInstance
Builder withErr(@Named("err") PrintStream err);

@BindsInstance
Builder withAppMode(AppMode appMode);

DeephavenApiServerInProcessComponent build();
}
}
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@
import dagger.multibindings.ElementsIntoSet;
import io.deephaven.db.tables.live.LiveTableMonitor;
import io.deephaven.db.v2.sources.chunk.util.pools.MultiChunkPool;
import io.deephaven.grpc_api.appmode.AppMode;
import io.deephaven.grpc_api.appmode.AppModeModule;
import io.deephaven.grpc_api.arrow.ArrowModule;
import io.deephaven.grpc_api.auth.AuthContextModule;
@@ -83,6 +84,12 @@ static Set<ServerInterceptor> primeInterceptors() {
return Collections.emptySet();
}

@Provides
@Singleton
public static AppMode provideAppMode() {
return AppMode.currentMode();
}

@Provides
@Singleton
public static Scheduler provideScheduler(final @Named("scheduler.poolSize") int poolSize) {
16 changes: 15 additions & 1 deletion grpc-api/src/main/java/io/deephaven/grpc_api/runner/Main.java
Original file line number Diff line number Diff line change
@@ -44,6 +44,20 @@ public static void main(String[] args) throws IOException, InterruptedException,
ProcessEnvironment.basicInteractiveProcessInitialization(config, Main.class.getName(), log);
Thread.setDefaultUncaughtExceptionHandler(processEnvironment.getFatalErrorReporter());

DeephavenApiServerComponent.startMain(PrintStreamGlobals.getOut(), PrintStreamGlobals.getErr());
// defaults to 5 minutes
int httpSessionExpireMs = config.getIntegerWithDefault("http.session.durationMs", 300000);
int httpPort = config.getIntegerWithDefault("http.port", 8080);
int schedulerPoolSize = config.getIntegerWithDefault("scheduler.poolSize", 4);

DaggerDeephavenApiServerComponent
.builder()
.withPort(httpPort)
.withSchedulerPoolSize(schedulerPoolSize)
.withSessionTokenExpireTmMs(httpSessionExpireMs)
.withOut(PrintStreamGlobals.getOut())
.withErr(PrintStreamGlobals.getErr())
.build()
.getServer()
.start();
}
}
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@
public class ServerBuilderModule {

@Provides
static ServerBuilder<?> serverBuilder(final @Named("grpc.port") int port) {
static ServerBuilder<?> serverBuilder(final @Named("http.port") int port) {
return ServerBuilder.forPort(port);
}
}
Original file line number Diff line number Diff line change
@@ -44,7 +44,6 @@ public void setUp() throws Exception {
.withSessionTokenExpireTmMs(sessionTokenExpireTmMs())
.withOut(System.out)
.withErr(System.err)
.withAppMode(AppMode.API_ONLY)
.build();

server = serverComponent.getServer();