-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Indexer-Grpc-V2] Add FileStoreUploader. #15724
Conversation
⏱️ 11m total CI duration on this PR
|
pub(crate) async fn start(&mut self, data_manager: Arc<DataManager>) -> Result<()> { | ||
let (version, batch_metadata) = self.recover().await?; | ||
|
||
let mut file_store_operator = FileStoreOperatorV2::new( | ||
MAX_SIZE_PER_FILE, | ||
NUM_TXNS_PER_FOLDER, | ||
version, | ||
batch_metadata, | ||
) | ||
.await; | ||
tokio_scoped::scope(|s| { | ||
let (tx, mut rx) = channel(5); | ||
s.spawn(async move { | ||
while let Some((transactions, batch_metadata, end_batch)) = rx.recv().await { | ||
self.do_upload(transactions, batch_metadata, end_batch) | ||
.await | ||
.unwrap(); | ||
} | ||
}); | ||
s.spawn(async move { | ||
loop { | ||
let transactions = data_manager | ||
.get_transactions_from_cache( | ||
file_store_operator.version(), | ||
MAX_SIZE_PER_FILE, | ||
/*update_file_store_version=*/ true, | ||
) | ||
.await; | ||
let len = transactions.len(); | ||
for transaction in transactions { | ||
file_store_operator | ||
.buffer_and_maybe_dump_transactions_to_file(transaction, tx.clone()) | ||
.await | ||
.unwrap(); | ||
} | ||
if len == 0 { | ||
tokio::time::sleep(Duration::from_millis(100)).await; | ||
} | ||
} | ||
}); | ||
}); | ||
|
||
Ok(()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The start()
method spawns infinite tasks within tokio_scoped::scope
but returns immediately, which causes the tasks to be terminated prematurely. Since these tasks need to run continuously, they should either:
- Use
tokio::spawn
instead of scoped tasks, or - Have the method wait for a shutdown signal before returning
This will ensure the background processing continues running as intended rather than being cleaned up when the scope exits.
Spotted by Graphite Reviewer
Is this helpful? React 👍 or 👎 to let us know.
1da097d
to
edf179e
Compare
302d973
to
d9eb878
Compare
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
}) | ||
} | ||
|
||
async fn recover(&self) -> Result<(u64, BatchMetadata)> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: add some comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
.await; | ||
} | ||
|
||
if update_batch_metadata { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: write in the other way and flatten the nested logic.
if !update_batch_metadata {
// no update can be performed.
return Ok(()):
}
// the rest.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
batch_metadata, | ||
) | ||
.await; | ||
tokio_scoped::scope(|s| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it looks like tokio_scoped is no longer maintained; maybe async_scoped?
d9eb878
to
beb5cd8
Compare
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
✅ Forge suite
|
✅ Forge suite
|
Description
How Has This Been Tested?
Key Areas to Review
Type of Change
Which Components or Systems Does This Change Impact?
Checklist