Partkey unblocked #3034
Conversation
Codecov Report
@@               Coverage Diff                @@
##           feature/partkey     #3034      +/- ##
===================================================
+ Coverage          43.95%      43.98%     +0.03%
===================================================
  Files                394         394
  Lines              87242       87309        +67
===================================================
+ Hits               38350       38406        +56
- Misses             42804       42812         +8
- Partials            6088        6091         +3
Continue to review full report at Codecov.
This looks much simpler than the pattern we had before. Could you apply it to Record as well and delete dirty/Flush?
One complication is that we're also planning to call this from the REST API. Especially when installing a key, I think we need a way to verify that the write completed. Is there a good way to do that with the write thread?
I think things are updated now. See the changes in the tests: a few of them expected writes to be applied immediately, which is now only guaranteed after .Flush().
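To make the implication for callers concrete, here is a minimal, hypothetical sketch of the REST-side usage being discussed. Nothing here is the PR's actual API; keyWriter, Insert, and installParticipationKey are illustrative names. The point is only that the handler follows the write with Flush() and reports success once Flush returns without error.

// Hypothetical sketch: not the PR's actual types or handler.
type keyWriter interface {
	Insert(key []byte) error // queues the write; applied asynchronously
	Flush() error            // blocks until queued writes are applied
}

func installParticipationKey(w keyWriter, key []byte) error {
	if err := w.Insert(key); err != nil {
		return err
	}
	// The insert is not guaranteed durable until Flush returns; Flush also
	// surfaces any error from the asynchronous write thread.
	return w.Flush()
}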
@@ -691,42 +767,14 @@ func (db *participationDB) Flush() error {
	db.mutex.Lock()
	defer db.mutex.Unlock()
Is the mutex still required? It seems like this would cause a deadlock when the writeThread attempts to Lock it.
The flushDone condition variable temporarily releases its lock (&db.mutex) inside .Wait(); that's the receiving end of db.flushDone.Broadcast() in writeThread().
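For readers following the thread, here is a self-contained sketch of the Flush side of that handshake. It is illustrative only and not the PR's code: flushDone, mutex, asyncErr, and partDBWriteRecord come from the discussion above, while writeQueue and flushesPending are placeholder names.

package sketch // illustrative only, not the PR's code

import "sync"

type partDBWriteRecord struct {
	flush bool
	// ... fields describing the actual write elided ...
}

type participationDB struct {
	mutex          sync.Mutex
	flushDone      *sync.Cond             // built with sync.NewCond(&db.mutex)
	writeQueue     chan partDBWriteRecord // assumed buffered
	flushesPending int
	asyncErr       error
}

// Flush enqueues a flush marker and blocks until writeThread has drained the
// queue. flushDone.Wait releases db.mutex while blocked, which is what lets
// writeThread lock the same mutex and Broadcast without deadlocking.
func (db *participationDB) Flush() error {
	db.mutex.Lock()
	defer db.mutex.Unlock()

	db.flushesPending++
	// Holding the mutex from here until Wait closes the window between
	// sending the flush event and starting to wait (assumes the buffered
	// queue has room for the marker).
	db.writeQueue <- partDBWriteRecord{flush: true}

	for db.flushesPending > 0 {
		db.flushDone.Wait()
	}
	return db.asyncErr
}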
Ahh, that's pretty cool. I found a couple of cases that I thought were race conditions, but they all seem to be covered by the pattern (multiple calls to flush, and the window between sending the flush event and calling wait).
One potential issue I see...
If an error is generated with multiple Flush() calls on the queue, only the first Flush returns the error. This might happen if we use Flush in the REST API and are inserting keys back-to-back.
I think we can solve that by counting the number of pending flushes and only clearing asyncErr when the pending count changes from 0 -> 1.
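One way that suggestion could look, reusing the illustrative names from the earlier sketch (again, not the PR's code): clear asyncErr only on the 0 -> 1 transition of the pending-flush count, so every Flush waiting on the same drain observes the same error.

func (db *participationDB) Flush() error {
	db.mutex.Lock()
	defer db.mutex.Unlock()

	if db.flushesPending == 0 {
		// First waiter of a new batch: forget the error already reported
		// to the previous batch of Flush calls.
		db.asyncErr = nil
	}
	db.flushesPending++
	db.writeQueue <- partDBWriteRecord{flush: true}

	for db.flushesPending > 0 {
		db.flushDone.Wait()
	}
	// Every Flush that waited on this drain sees the same asyncErr.
	return db.asyncErr
}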
Still looking good
for {
	var wr partDBWriteRecord
	var chanOk bool
	if needsFlush {
I think this deserves a comment. It looks like the code could be simplified, but there's a lot going on and all of it is important.
// The flush command locks the mutex, then sends a flush event to unblock the queue read.
// When everything is flushed, the default case is blocked (on the mutex) until Flush calls flushDone.Wait.
// Finally, flushDone.Broadcast notifies all calls to Flush that the queue is now empty.
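Continuing the illustrative sketch from earlier in the thread (placeholder code, not the PR's; needsFlush appears in the diff above, while flushesSeen, flushesPending, and writeQueue are assumed names), the write-thread side of that comment might look roughly like this:

// writeThread is the consuming side of the earlier sketch. It blocks on the
// queue until a flush marker arrives, then switches to non-blocking reads so
// it can detect the drained queue and wake the waiting Flush calls.
func (db *participationDB) writeThread() {
	needsFlush := false // a flush marker was seen and not yet acknowledged
	flushesSeen := 0    // flush markers consumed since the last Broadcast

	for {
		var wr partDBWriteRecord
		var chanOk bool

		if needsFlush {
			// Keep applying queued records; the default case only fires
			// once the queue is empty.
			select {
			case wr, chanOk = <-db.writeQueue:
			default:
				db.mutex.Lock()
				// Subtract only the markers actually consumed, so a Flush
				// that raced in after the queue looked empty keeps waiting.
				db.flushesPending -= flushesSeen
				flushesSeen = 0
				db.mutex.Unlock()
				db.flushDone.Broadcast()
				needsFlush = false
				continue
			}
		} else {
			// No flush pending: block until the next record arrives.
			wr, chanOk = <-db.writeQueue
		}
		if !chanOk {
			return // queue closed: shut the thread down
		}
		if wr.flush {
			needsFlush = true
			flushesSeen++
			continue
		}

		// Apply the write here; a real implementation would record any
		// error in db.asyncErr (under db.mutex) for Flush to report.
	}
}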
Added a few comments around the Flush / writeThread interaction.
LGTM
Summary
For #2878.
Test Plan