Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the base FlightProducer for getStream API #17446

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

rishabhmaurya
Copy link
Contributor

@rishabhmaurya rishabhmaurya commented Feb 24, 2025

Description

With recently added Arrow SPIs #16691 and flight server support #16962, this is a follow up change which introduces -

  • implementation of getStream() and getFlightInfo() APIs. It also adds support for default StreamManager, FlightProducer, FlightStreamReader which these APIs make use of.
  • adds way for consumers of StreamManagerPlugin to get instance of default StreamManager.
  • adds ProxyStreamProvider which acts as forward proxy for FlightStream. This is useful when stream is not present locally and needs to be fetched from a different node in the cluster.

Related Issues

Resolves #17065

Check List

  • Functionality includes testing.
    - [ ] API changes companion pull request created, if applicable.
    - [ ] Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions github-actions bot added enhancement Enhancement or improvement to existing feature or request Search:Performance labels Feb 24, 2025
@rishabhmaurya rishabhmaurya self-assigned this Feb 24, 2025
Copy link
Contributor

✅ Gradle check result for 7738286: SUCCESS

Copy link

codecov bot commented Feb 25, 2025

Codecov Report

Attention: Patch coverage is 57.04225% with 122 lines in your changes missing coverage. Please review.

Project coverage is 72.42%. Comparing base (e7ac072) to head (7738286).
Report is 3 commits behind head on main.

Files with missing lines Patch % Lines
...ensearch/arrow/flight/impl/BaseFlightProducer.java 47.12% 42 Missing and 4 partials ⚠️
...nsearch/arrow/flight/impl/FlightStreamManager.java 45.45% 29 Missing and 1 partial ⚠️
server/src/main/java/org/opensearch/node/Node.java 5.55% 16 Missing and 1 partial ⚠️
...ensearch/arrow/flight/impl/FlightStreamTicket.java 70.45% 4 Missing and 9 partials ⚠️
...ch/arrow/flight/bootstrap/FlightClientManager.java 75.00% 5 Missing and 3 partials ⚠️
...ch/arrow/flight/impl/BaseBackpressureStrategy.java 70.00% 1 Missing and 2 partials ⚠️
...h/arrow/flight/impl/FlightStreamTicketFactory.java 66.66% 2 Missing ⚠️
...va/org/opensearch/plugins/StreamManagerPlugin.java 0.00% 2 Missing ⚠️
...nsearch/arrow/flight/impl/ProxyStreamProducer.java 95.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main   #17446      +/-   ##
============================================
+ Coverage     72.41%   72.42%   +0.01%     
- Complexity    65687    65718      +31     
============================================
  Files          5303     5311       +8     
  Lines        304793   305049     +256     
  Branches      44202    44239      +37     
============================================
+ Hits         220702   220926     +224     
- Misses        66025    66034       +9     
- Partials      18066    18089      +23     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

public class ArrowFlightServerIT extends OpenSearchIntegTestCase {

private FlightClientManager flightClientManager;

@BeforeClass
public static void setupFeatureFlags() {
FeatureFlagSetter.set(FeatureFlags.ARROW_STREAMS_SETTING.getKey());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for OpenSearchIntegTestCase you can override featureFlagSettings() to provide the flags as node level settings that won't mess with system properties like FeatureFlagSetter:
example:

    @Override
    protected Settings featureFlagSettings() {
        return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags. ARROW_STREAMS_SETTING, Boolean.TRUE).build();
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, let me try it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like this way doesn't work for integ tests, also it seems like FeatureFlagSetter is purpose built for it.

infoBuilder = FlightInfo.builder(
streamProducerHolder.get().getRoot().getSchema(),
descriptor,
Collections.singletonList(endpoint)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious what your thoughts on providing additional endpoints via the StreamProducer to consolidate to a single ticket at a coordinator? In a poc I've been working on with DataFusion I am using this to specify partitions (each shard ticket) from FlightInfo and only hand a single ticket to the engine and then it calls getStream for each partition. I could see this also helping with join case where we get a ticket per index or further consolidating to a single ticket to pass back to a client.

Copy link
Contributor Author

@rishabhmaurya rishabhmaurya Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mch2 taking a look

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This requirement looks valid. So far, StreamTicket represents a unit of work, thus each ticket is associated with single StreamProducer. However, this unit of work could be partitioned, I would like to solve it by creating abstraction at StreamTicket itself -
we can create something like PartitionedStreamTicket, which will internally contain list of tickets.
Its StreamProducer implementation in java can have a parallel execution of streams when getStream() is called on it (you may not use it though in data fusion).
For getFlightInfo() it can just return List of endpoints, as your POC code.

Let me know what do you think? created - #17493

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds reasonable, but you'd need a worker/coordinator to create that single ticket after receiving each partition's (data node) tickets post query phase. At least in the data node case you wouldn't know those tickets beforehand.

);
return;
}
StreamProducer<VectorSchemaRoot, BufferAllocator> proxyProvider = new ProxyStreamProducer(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

am curious when this would happen given getStream is typically invoked after a getFlightInfo call that vends the endpoint? Or is the intent to skip that step and allow hitting any node with a ticket and stream a reply? Would that not add latency & load on an additional data node thats vending through the proxy that we could otherwise avoid?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mch2 for communication within cluster nodes, i think you're right as we should not make use of ProxyStreamProducer. This will be useful when clients directly want to connect via load balancer and do not have access to the nodes directly, so any node can behave as a coordinator and then it should know where to route.
For cases, where data nodes are directly accessible, this intelligence should be in the client to call getFlightInfo and send the request directly to the node holding stream.

Copy link
Contributor

github-actions bot commented Mar 8, 2025

❌ Gradle check result for c711cf6: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Copy link
Contributor

github-actions bot commented Mar 8, 2025

❌ Gradle check result for 2532e0f: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Copy link
Contributor

github-actions bot commented Mar 8, 2025

❌ Gradle check result for 44423f1: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@@ -88,6 +88,7 @@ internalClusterTest {
systemProperty 'io.netty.noUnsafe', 'false'
systemProperty 'io.netty.tryUnsafe', 'true'
systemProperty 'io.netty.tryReflectionSetAccessible', 'true'
jvmArgs += ["--add-opens", "java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"]
Copy link
Collaborator

@reta reta Mar 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should go to jvm.options as well:

21-:--add-opens java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED

@@ -99,7 +100,19 @@ public FlightClientManager(
* @return An OpenSearchFlightClient instance for the specified node
*/
public Optional<FlightClient> getFlightClient(String nodeId) {
return Optional.ofNullable(flightClients.get(nodeId));
ClientHolder clientHolder = flightClients.getOrDefault(nodeId, null);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You will always get null when key is not in the map:

Suggested change
ClientHolder clientHolder = flightClients.getOrDefault(nodeId, null);
ClientHolder clientHolder = flightClients.get(nodeId);

@@ -99,7 +100,19 @@ public FlightClientManager(
* @return An OpenSearchFlightClient instance for the specified node
*/
public Optional<FlightClient> getFlightClient(String nodeId) {
return Optional.ofNullable(flightClients.get(nodeId));
ClientHolder clientHolder = flightClients.getOrDefault(nodeId, null);
return clientHolder == null ? Optional.empty() : Optional.ofNullable(clientHolder.flightClient);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the flightClient should never be null, right?

Suggested change
return clientHolder == null ? Optional.empty() : Optional.ofNullable(clientHolder.flightClient);
return clientHolder == null ? Optional.empty() : Optional.of(clientHolder.flightClient);

* @param nodeId The ID of the node for which to retrieve the location
* @return The Location of the Flight client for the specified node
*/
public Location getFlightClientLocation(String nodeId) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, use Optional here as well:

Suggested change
public Location getFlightClientLocation(String nodeId) {
public Optional<Location> getFlightClientLocation(String nodeId) {

* @return The Location of the Flight client for the specified node
*/
public Location getFlightClientLocation(String nodeId) {
ClientHolder clientHolder = flightClients.getOrDefault(nodeId, null);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ClientHolder clientHolder = flightClients.getOrDefault(nodeId, null);
ClientHolder clientHolder = flightClients.get(nodeId);

future.completeExceptionally(new IllegalStateException("No Flight info received for node: [" + nodeId + "]"));
}
}
client.execute(NodesFlightInfoAction.INSTANCE, request, new ActionListener<>() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we removed try from here?

@@ -184,13 +200,16 @@ private DiscoveryNode getNodeFromClusterState(String nodeId) {
*/
@Override
public void close() throws Exception {
for (FlightClient flightClient : flightClients.values()) {
flightClient.close();
closed = true;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (closed) {
    return;
}

@@ -229,7 +248,7 @@ private Set<String> getCurrentClusterNodes() {
}

@VisibleForTesting
Map<String, FlightClient> getFlightClients() {
Map<String, ClientHolder> getFlightClients() {
Copy link
Collaborator

@reta reta Mar 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You already have getClients that does the same?

@@ -165,6 +166,7 @@ protected void doClose() {
}

private void initializeStreamManager(FlightClientManager clientManager) {
streamManager = null;
streamManager = new FlightStreamManager(() -> allocator);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change type of the field: StreamManager streamManager -> FlightStreamManager streamManager, no need to typecast all the time since the streamManager is created here

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement Enhancement or improvement to existing feature or request Search:Performance skip-changelog
Projects
Status: In Progress
Status: 🏗 In progress
Development

Successfully merging this pull request may close these issues.

[Feature Request] Base Flight producer for FlightServer
3 participants