Skip to content

Commit 1c32832

Browse files
authored
feat: DH-18888 Support Connector Plugin (#396)
1 parent 5311a69 commit 1c32832

File tree

9 files changed

+159
-53
lines changed

9 files changed

+159
-53
lines changed

pom.xml

+15-15
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@
7878
<artifactId>maven-compiler-plugin</artifactId>
7979
<version>3.13.0</version>
8080
<configuration>
81-
<source>21</source>
82-
<target>21</target>
81+
<source>17</source>
82+
<target>17</target>
8383
</configuration>
8484
</plugin>
8585
<plugin>
@@ -146,7 +146,7 @@
146146
<plugin>
147147
<groupId>org.apache.maven.plugins</groupId>
148148
<artifactId>maven-javadoc-plugin</artifactId>
149-
<version>3.10.1</version>
149+
<version>3.11.2</version>
150150
<configuration>
151151
<excludePackageNames>*.connect,*.controller,*.generator,*.jfr,*.metric,*.run,*.util</excludePackageNames>
152152
<show>public</show>
@@ -167,7 +167,7 @@
167167
'core.autocrlf' to true -->
168168
<groupId>com.diffplug.spotless</groupId>
169169
<artifactId>spotless-maven-plugin</artifactId>
170-
<version>2.43.0</version>
170+
<version>2.44.2</version>
171171
<configuration>
172172
<java>
173173
<includes>
@@ -193,7 +193,7 @@
193193
<plugin>
194194
<groupId>org.apache.maven.plugins</groupId>
195195
<artifactId>maven-surefire-plugin</artifactId>
196-
<version>3.5.1</version>
196+
<version>3.5.2</version>
197197
<configuration>
198198
<excludes>
199199
<exclude>/io/deephaven/benchmark/tests/**/*Test</exclude>
@@ -203,14 +203,14 @@
203203
<dependency>
204204
<groupId>org.junit.jupiter</groupId>
205205
<artifactId>junit-jupiter-engine</artifactId>
206-
<version>5.9.2</version>
206+
<version>5.9.3</version>
207207
</dependency>
208208
</dependencies>
209209
</plugin>
210210
<plugin>
211211
<groupId>org.apache.maven.plugins</groupId>
212212
<artifactId>maven-failsafe-plugin</artifactId>
213-
<version>3.5.1</version>
213+
<version>3.5.2</version>
214214
<configuration>
215215
<forkCount>0</forkCount>
216216
<includes>
@@ -221,7 +221,7 @@
221221
<dependency>
222222
<groupId>org.junit.jupiter</groupId>
223223
<artifactId>junit-jupiter-engine</artifactId>
224-
<version>5.9.2</version>
224+
<version>5.9.3</version>
225225
</dependency>
226226
</dependencies>
227227
<executions>
@@ -247,39 +247,39 @@
247247
<dependency>
248248
<groupId>io.netty</groupId>
249249
<artifactId>netty-all</artifactId>
250-
<version>4.1.100.Final</version>
250+
<version>4.1.114.Final</version>
251251
</dependency>
252252
<!-- Added because of conflicts -->
253253
<dependency>
254254
<groupId>io.grpc</groupId>
255255
<artifactId>grpc-all</artifactId>
256-
<version>1.58.0</version>
256+
<version>1.65.1</version>
257257
</dependency>
258258
<!-- Added because of conflicts -->
259259
<dependency>
260260
<groupId>com.google.protobuf</groupId>
261261
<artifactId>protobuf-java</artifactId>
262-
<version>3.25.3</version>
262+
<version>3.25.4</version>
263263
</dependency>
264264
<dependency>
265265
<groupId>io.confluent</groupId>
266266
<artifactId>kafka-avro-serializer</artifactId>
267-
<version>7.7.1</version>
267+
<version>7.9.0</version>
268268
</dependency>
269269
<dependency>
270270
<groupId>io.confluent</groupId>
271271
<artifactId>kafka-protobuf-serializer</artifactId>
272-
<version>7.7.1</version>
272+
<version>7.9.0</version>
273273
</dependency>
274274
<dependency>
275275
<groupId>io.deephaven</groupId>
276276
<artifactId>deephaven-java-client-barrage-dagger</artifactId>
277-
<version>0.36.1</version>
277+
<version>0.37.6</version>
278278
</dependency>
279279
<dependency>
280280
<groupId>io.deephaven</groupId>
281281
<artifactId>deephaven-log-to-slf4j</artifactId>
282-
<version>0.36.1</version>
282+
<version>0.37.6</version>
283283
</dependency>
284284
<dependency>
285285
<groupId>org.junit.platform</groupId>

src/main/java/io/deephaven/benchmark/api/BenchQuery.java

+20-5
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
1-
/* Copyright (c) 2022-2023 Deephaven Data Labs and Patent Pending */
1+
/* Copyright (c) 2022-2025 Deephaven Data Labs and Patent Pending */
22
package io.deephaven.benchmark.api;
33

44
import java.io.Closeable;
55
import java.util.LinkedHashMap;
66
import java.util.Map;
7+
import java.util.Properties;
78
import java.util.concurrent.Future;
89
import java.util.function.Consumer;
910
import java.util.function.Function;
10-
import io.deephaven.benchmark.connect.BarrageConnector;
11+
import io.deephaven.benchmark.connect.Connector;
12+
import io.deephaven.benchmark.connect.ConnectorFactory;
1113
import io.deephaven.benchmark.connect.ResultTable;
1214
import io.deephaven.benchmark.metric.Metrics;
1315
import io.deephaven.benchmark.util.Timer;
@@ -23,7 +25,8 @@ final public class BenchQuery implements Closeable {
2325
final QueryLog queryLog;
2426
final Map<String, Consumer<ResultTable>> snapshotFetchers = new LinkedHashMap<>();
2527
final Map<String, Function<ResultTable, Boolean>> tickingFetchers = new LinkedHashMap<>();
26-
private BarrageConnector session = null;
28+
final Properties props = new Properties();
29+
private Connector session = null;
2730

2831
BenchQuery(Bench bench, String logic, QueryLog queryLog) {
2932
this.bench = bench;
@@ -57,6 +60,16 @@ public BenchQuery fetchDuring(String table, Function<ResultTable, Boolean> table
5760
return this;
5861
}
5962

63+
/**
64+
* Add properties to be passed to the <code>Connector</code> used in the query
65+
*/
66+
public BenchQuery withProperty(String name, String value) {
67+
if (value != null && !value.isBlank()) {
68+
props.setProperty(name, value);
69+
}
70+
return this;
71+
}
72+
6073
/**
6174
* Execute the query logic through a session
6275
*/
@@ -96,8 +109,10 @@ public void close() {
96109
// Add function defs in separate query so if there are errors in the "logic" part, the line numbers match up
97110
private void executeBarrageQuery(String logic) {
98111
if (session == null) {
99-
String deephavenServer = bench.property("deephaven.addr", "localhost:10000");
100-
session = new BarrageConnector(deephavenServer);
112+
var connectorClass = bench.property("connector.class", "io.deephaven.benchmark.connect.BarrageConnector");
113+
var localProps = Bench.profile.getProperties();
114+
localProps.putAll(props);
115+
session = ConnectorFactory.create(connectorClass, localProps);
101116
}
102117
String snippetsLogic = Bench.profile.replaceProperties(Snippets.getFunctions(logic));
103118
if (!snippetsLogic.isBlank()) {

src/main/java/io/deephaven/benchmark/api/Profile.java

+11-6
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright (c) 2022-2024 Deephaven Data Labs and Patent Pending */
1+
/* Copyright (c) 2022-2025 Deephaven Data Labs and Patent Pending */
22
package io.deephaven.benchmark.api;
33

44
import java.io.InputStream;
@@ -13,19 +13,20 @@
1313
import io.deephaven.benchmark.util.Log;
1414

1515
/**
16-
* Represents properties for a the Benchmark API .profile file in addition to allowing retrieval of System and
17-
* environment properties.
16+
* Represents properties for the Benchmark API profile file in addition to allowing retrieval of System and environment
17+
* properties. The default property file (e.g. default.properties) for the profile can be overridden with
18+
* <code>System.setProperty("benchmark.profile.default", "my-properties-path")</code>.
1819
*/
1920
class Profile {
2021
final private URL url;
2122
final private Properties props;
2223

2324
/**
24-
* Initialize the profile from the supplied benchmark.profile property. That that property is absent, use the
25-
* default properties file.
25+
* Initialize the profile from the supplied "benchmark.profile" property. If that property is absent, use the
26+
* "benchmark.profile.default" property. If that property is absent, use the "default.properties" resource file.
2627
*/
2728
Profile() {
28-
this(System.getProperty("benchmark.profile", "default.properties"));
29+
this(System.getProperty("benchmark.profile", getProfileDefaultFile()));
2930
}
3031

3132
/**
@@ -218,4 +219,8 @@ private URL findProfileAsResource(Class<?> parentClass, String uri) {
218219
}
219220
}
220221

222+
static String getProfileDefaultFile() {
223+
return System.getProperty("benchmark.profile.default", "default.properties");
224+
}
225+
221226
}

src/main/java/io/deephaven/benchmark/connect/BarrageConnector.java

+12-10
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright (c) 2022-2023 Deephaven Data Labs and Patent Pending */
1+
/* Copyright (c) 2022-2025 Deephaven Data Labs and Patent Pending */
22
package io.deephaven.benchmark.connect;
33

44
import java.util.*;
@@ -30,7 +30,7 @@
3030
* The typical workflow will be initialize connection, execute query, fetch results, close. Note: This class is meant to
3131
* be used through the Bench api rather than directly.
3232
*/
33-
public class BarrageConnector implements AutoCloseable {
33+
class BarrageConnector implements Connector {
3434
static {
3535
System.setProperty("thread.initialization", ""); // Remove server side initializers (e.g. DebuggingInitializer)
3636
}
@@ -51,15 +51,17 @@ public class BarrageConnector implements AutoCloseable {
5151
*
5252
* @param hostPort a host and port string for connecting to a Deephaven worker (ex. localhost:10000)
5353
*/
54-
public BarrageConnector(String hostPort) {
55-
String[] split;
54+
BarrageConnector(Properties props) {
55+
var hostPort = props.getProperty("deephaven.addr", "localhost:10000");
56+
var userPass = props.getProperty("deephaven.auth", "");
57+
var host = hostPort.replaceAll(":.*", "");
58+
var port = hostPort.replaceAll(".*:", "");
59+
if (host.isEmpty() || port.isEmpty())
60+
throw new RuntimeException("Missing Connector host or port");
61+
if (!userPass.isBlank())
62+
System.out.println("Ignoring supplied User and Pass");
5663
try {
57-
split = hostPort.split(":");
58-
} catch (Exception ex) {
59-
throw new RuntimeException("Failed to parse connection hostPort: " + hostPort, ex);
60-
}
61-
try {
62-
this.channel = getManagedChannel(split[0], Integer.parseInt(split[1]));
64+
this.channel = getManagedChannel(host, Integer.parseInt(port));
6365
this.session = getSession(channel);
6466
this.console = session.session().console("python").get();
6567
} catch (Exception ex) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/* Copyright (c) 2022-2025 Deephaven Data Labs and Patent Pending */
2+
package io.deephaven.benchmark.connect;
3+
4+
import java.util.Set;
5+
import java.util.concurrent.Future;
6+
import java.util.function.Consumer;
7+
import java.util.function.Function;
8+
import io.deephaven.benchmark.metric.Metrics;
9+
10+
/**
11+
* An Object that provides a connection to a service under test, execution of statements, and retrieval of results
12+
* through snapshots or live data. What this all means is up to the implementer, but it's execution is supported by
13+
* Benchmark API in {@link io.deephaven.benchmark.api.BenchQuery}.
14+
*/
15+
public interface Connector extends AutoCloseable {
16+
17+
/**
18+
* Execute a query or statement against the connected service
19+
*
20+
* @param query the statement to be executed
21+
*/
22+
public void executeQuery(String query);
23+
24+
/**
25+
* Get a list a variable or table names used in the query. This is optional and may return an empty set.
26+
*
27+
* @return a set of variable names or empty set
28+
*/
29+
public Set<String> getUsedVariableNames();
30+
31+
/**
32+
* Fetch data as a snapshot of the given table and return a result table according to the given consumer. This fetch
33+
* is made after the execution of the query.
34+
*
35+
* @param table the table to fetch data from
36+
* @param tableHandler a consumer to supplied to receive the fetched data
37+
* @return a future that waits until the fetch is complete
38+
*/
39+
public Future<Metrics> fetchSnapshotData(String table, Consumer<ResultTable> tableHandler);
40+
41+
/**
42+
* Fetch data for the given table as it is updated while the query is running. The given function is called at a
43+
* refresh rate determined by the <code>Connector</code> implementation.
44+
*
45+
* @param table the table to fetch data from
46+
* @param tableHandler a function to call on a cycle
47+
* @return a future that waits until the fetch initiates
48+
*/
49+
public Future<Metrics> fetchTickingData(String table, Function<ResultTable, Boolean> tableHandler);
50+
51+
/**
52+
* Close the connector and clean up resources
53+
*/
54+
public void close();
55+
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/* Copyright (c) 2022-2025 Deephaven Data Labs and Patent Pending */
2+
package io.deephaven.benchmark.connect;
3+
4+
import java.util.Properties;
5+
6+
/**
7+
* Instantiate the <code>Connector</code> for the given full-qualified class name with the given properties.
8+
*/
9+
public class ConnectorFactory {
10+
static public Connector create(String connectorClassName, Properties props) {
11+
try {
12+
var myClass = Class.forName(connectorClassName);
13+
var constructor = myClass.getDeclaredConstructor(Properties.class);
14+
return (Connector) constructor.newInstance(props);
15+
} catch (Exception ex) {
16+
throw new RuntimeException("Failed to instantiate Connector: " + connectorClassName, ex);
17+
}
18+
}
19+
20+
}

src/main/java/io/deephaven/benchmark/util/Numbers.java

+19-16
Original file line numberDiff line numberDiff line change
@@ -88,18 +88,21 @@ static public String formatBytesToGigs(Object val) {
8888
* @return the negated number or null if <code>val</code> was null
8989
*/
9090
static public Number negate(Object val) {
91-
if (val instanceof Number && ((Number) val).doubleValue() == 0.0)
91+
if (val == null || (val instanceof Number && ((Number) val).doubleValue() == 0.0))
9292
return (Number) val;
93-
return switch (val) {
94-
case Integer v -> -v.intValue();
95-
case Float v -> -v.floatValue();
96-
case Long v -> -v.longValue();
97-
case Double v -> -v.doubleValue();
98-
case Short v -> (short) (-v.shortValue());
99-
case Byte v -> (byte) (-v.byteValue());
100-
case null -> null;
101-
default -> throw new RuntimeException("Unsupported Type: " + val.getClass().getSimpleName());
102-
};
93+
if (val instanceof Integer)
94+
return -((Integer) val).intValue();
95+
if (val instanceof Float)
96+
return -((Float) val).floatValue();
97+
if (val instanceof Long)
98+
return -((Long) val).longValue();
99+
if (val instanceof Double)
100+
return -((Double) val).doubleValue();
101+
if (val instanceof Short)
102+
return (short) -((Short) val).shortValue();
103+
if (val instanceof Byte)
104+
return (byte) -((Byte) val).byteValue();
105+
throw new RuntimeException("Unsupported Type: " + val.getClass().getSimpleName());
103106
}
104107

105108
/**
@@ -111,11 +114,11 @@ static public Number negate(Object val) {
111114
* @return true if an even number, otherwise false
112115
*/
113116
static public boolean isEven(Object val) {
114-
return switch (val) {
115-
case Number v -> v.longValue() % 2 == 0;
116-
case null -> false;
117-
default -> throw new RuntimeException("Unsupported Type: " + val.getClass().getSimpleName());
118-
};
117+
if (val == null)
118+
return false;
119+
if (val instanceof Number)
120+
return ((Number) val).longValue() % 2 == 0;
121+
throw new RuntimeException("Unsupported Type: " + val.getClass().getSimpleName());
119122
}
120123

121124
/**

src/main/resources/io/deephaven/benchmark/run/profile/default.properties

+6
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
11

2+
# The fully-qualified class name of the connector used in the tests
3+
connector.class=
4+
5+
# Description of the authentication to use (e.g. user:pass)
6+
deephaven.auth=
7+
28
# Deephaven engine address (same one the UI uses)
39
deephaven.addr=localhost:10000
410

src/test/java/io/deephaven/benchmark/util/NumbersTest.java

-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
package io.deephaven.benchmark.util;
33

44
import static org.junit.jupiter.api.Assertions.*;
5-
import java.math.BigInteger;
65
import org.junit.jupiter.api.*;
76

87
public class NumbersTest {

0 commit comments

Comments
 (0)