transfer() method in ParallelSink class does not return a CompletableFuture for Kafka and other streaming data sources #4330

Closed
anoojkrishnan24 opened this issue Jul 4, 2024 · 3 comments · Fixed by #4333
Labels: bug (Something isn't working), triage (all new issues awaiting classification)


@anoojkrishnan24

Bug Report

The transfer() method in the ParallelSink class blocks when the Kafka extension or another streaming extension is used.

Describe the Bug

When initiating Kafka or other queued data transfers, the data transfer runs smoothly after successful negotiation, but the transfer() method in the ParallelSink class never returns its CompletableFuture. If a new data transfer is initiated in this state, the data plane is too busy and the second transfer is never started.

There seems to be no issue when two non-Kafka (non-queued) data transfers are initiated simultaneously, i.e. when the second transfer is initiated before the first one completes.

Expected Behavior

When the first data transfer is a Kafka or other queued transfer, initiating a second data transfer should still let the second transfer proceed normally.

Observed Behavior

Kafka and other queued data transfers block and never return a CompletableFuture, so only one queued transfer can run at a time.

Steps to Reproduce

  1. Initiate a transfer of type Kafka.
  2. Initiate any data transfer (it never starts).

A unit test has also been written at https://github.com/anoojkrishnan24/Connector/blob/main/core/data-plane/data-plane-util/src/test/java/org/eclipse/edc/connector/dataplane/util/sink/QueuedParallelSinkTest.java

This unit test shows that execution does not proceed past the call to transfer().

Context Information

  • Tried on the main branch and on versions up to v0.7.1
  • OS: [Mac, Windows, Docker]

Detailed Description

The test is ignored because the transfer() call blocks.


Possible Implementation

The KafkaDataSource creates multiple parts, as shown below:

[Screenshot: KafkaDataSource code that creates the parts]

Meanwhile, the transfer() method in the ParallelSink class waits on this never-ending sequence of parts, so it remains stuck waiting for more parts:

[Screenshot: ParallelSink.transfer() code]
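To illustrate why this blocks, here is a minimal, self-contained Java sketch, assuming that transfer() collects all parts from the source before it builds its future. `BlockingSinkSketch`, `Part`, `endlessSource()` and `returnsWithin()` are hypothetical names for illustration, not EDC's actual ParallelSink or KafkaDataSource API; the endless source simply sleeps between parts to imitate a Kafka consumer waiting for new records.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class BlockingSinkSketch {

    // Hypothetical stand-in for one part produced by a data source.
    record Part(int id) {}

    // Hypothetical streaming source: like a Kafka topic, it never runs out of parts.
    static Stream<Part> endlessSource() {
        AtomicInteger counter = new AtomicInteger();
        return Stream.generate(() -> {
            try {
                Thread.sleep(10); // imitate waiting for the next record to arrive
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return new Part(counter.getAndIncrement());
        });
    }

    // The problematic shape: every part is collected *before* the future is created,
    // so with an endless source this method never returns to its caller.
    static CompletableFuture<Integer> transfer(Stream<Part> parts) {
        List<Part> all = parts.collect(Collectors.toList());
        return CompletableFuture.completedFuture(all.size());
    }

    // Runs `call` on a separate daemon thread and reports whether it finished in time.
    static boolean returnsWithin(Runnable call, long millis) {
        Thread caller = new Thread(call);
        caller.setDaemon(true); // a stuck caller must not keep the JVM alive
        caller.start();
        try {
            caller.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !caller.isAlive();
    }

    public static void main(String[] args) {
        boolean finite = returnsWithin(() -> transfer(Stream.of(new Part(0), new Part(1))), 1_000);
        boolean endless = returnsWithin(() -> transfer(endlessSource()), 500);
        System.out.println("finite source returned: " + finite);   // true
        System.out.println("endless source returned: " + endless); // false
    }
}
```

This mirrors the report: a finite transfer hands its future back, while a streaming transfer keeps the calling thread busy indefinitely, so nothing else can be started on it.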

Therefore, the transfer() method in ParallelSink either has to be overridden for KafkaDataSource or modified to handle an unbounded number of parts.

@github-actions github-actions bot added the triage all new issues awaiting classification label Jul 4, 2024

github-actions bot commented Jul 4, 2024

Thanks for your contribution 🔥 We will take a look asap 🚀

@ndr-brt
Member

ndr-brt commented Jul 4, 2024

Hey, thanks for reporting. Indeed, the collection at line 61 in ParallelSink is blocking the process. This is a problem not only for streaming transfers but also for long-running multi-file transfers.

I think the solution is pretty straightforward, and there is no need to override transfer(): as its signature indicates by returning a future, it should not block the current thread. That can be achieved by executing it asynchronously.
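A minimal sketch of that direction, assuming the same collect-everything shape as in the report: instead of doing the work on the calling thread, transfer() hands it to an executor via `CompletableFuture.supplyAsync` and returns the future immediately, so the caller can start a second transfer while a streaming one is still running. `AsyncSinkSketch`, `Part`, `endlessSource()` and `withDaemonThreads()` are hypothetical names, and this is not necessarily how the fix in #4333 was implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class AsyncSinkSketch {

    // Hypothetical stand-in for one part produced by a data source.
    record Part(int id) {}

    private final ExecutorService executor;

    AsyncSinkSketch(ExecutorService executor) {
        this.executor = executor;
    }

    // Daemon worker threads, so an endless transfer does not keep the JVM alive.
    static AsyncSinkSketch withDaemonThreads() {
        return new AsyncSinkSketch(Executors.newCachedThreadPool(task -> {
            Thread thread = new Thread(task);
            thread.setDaemon(true);
            return thread;
        }));
    }

    // Hypothetical streaming source that never runs out of parts.
    static Stream<Part> endlessSource() {
        AtomicInteger counter = new AtomicInteger();
        return Stream.generate(() -> {
            try {
                Thread.sleep(10); // imitate waiting for the next record to arrive
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return new Part(counter.getAndIncrement());
        });
    }

    // Returns immediately: the parts are consumed on the executor, and the future
    // completes (with the part count) only once the source is exhausted.
    CompletableFuture<Integer> transfer(Stream<Part> parts) {
        return CompletableFuture.supplyAsync(
                () -> parts.collect(Collectors.toList()).size(), executor);
    }

    public static void main(String[] args) {
        AsyncSinkSketch sink = withDaemonThreads();

        // The streaming transfer no longer blocks the caller...
        CompletableFuture<Integer> streaming = sink.transfer(endlessSource());
        // ...so a second transfer can be started and completes independently.
        int secondParts = sink.transfer(Stream.of(new Part(0), new Part(1)))
                .orTimeout(1, TimeUnit.SECONDS)
                .join();

        System.out.println("second transfer parts: " + secondParts);          // 2
        System.out.println("streaming transfer done: " + streaming.isDone()); // false
    }
}
```

Each long-running stream still occupies one executor thread until it ends, so with a bounded pool the number of concurrent streaming transfers is capped by the pool size; a cached pool is used here only to keep the sketch short.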

@ndr-brt ndr-brt self-assigned this Jul 4, 2024
@ndr-brt ndr-brt added bug Something isn't working and removed triage all new issues awaiting classification labels Jul 4, 2024
@github-actions github-actions bot added the triage all new issues awaiting classification label Jul 4, 2024
@anoojkrishnan24
Author

anoojkrishnan24 commented Jul 4, 2024

Apologies, I accidentally closed it while trying to comment. I haven't tried a multi-file transfer.
