-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PriorityMailbox #364
PriorityMailbox #364
Conversation
Fixes Perf issues in main message pipeline Added DefaultMailbox
@rogeralsing took a look at the PriorityQueue implementation, the mailbox changes, and the specs for the PriorityMailbox. My thoughts:
Looks solid overall! |
Also Improved the spec to send more messages of different priorities. |
@rogeralsing ah, forgot you can just call suspend / resume on the mailbox directly. |
|
||
//drain envelopes from ConcurrentQueue into priority queue | ||
Envelope tmp; | ||
while (_queue.TryDequeue(out tmp)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dequeue all messages from concurrent queue into prio queue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a possibility that under a heavy load this section of code could live-lock? i.e. never leave the while loop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it could.
However, one would have to take into consideration that a normal unbounded mailbox will yield out of memmory exception and kill the entire system in the same scenario given enough time.
So unbounded queues are not safe for extreme load.
(thus the introduction of akka streams)
So I guess we have three ways to deal with this:
- We leave it as is and just accept that unbounded queues can be harmful.
- We make the loop exit after x times, this will make the priority of messages less exact,
but it can never be more than best effort anyway.
e.g. send a low prio message and wait one hour and send a high prio message, the low prio message will ofcourse arrive first no matter what approach we take. - We make it blocking. (which still only gives best effort, se comment above)
I'm open to any of the approaches
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would make the priority queue implementation thread-safe and have it block and write directly to that - we'll be taking a bit of a perf hit since it won't have the native optimizations that ConcurrentQueue has but live-locking this code might not be that hard to do (a for...loop might be enough.) I'd err on the side of predictability in this case.
Introduced a blocking wrapper base class for message queues that use non thead safe data structures
/// <summary> | ||
/// Base class message queue that uses a priority generator for messages | ||
/// </summary> | ||
public class UnboundedPriorityMessageQueue : BlockingMessageQueue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The priority queue is now used in a blocking message queue base class.
👍 cool, looks good - let me get #351 out today and then we'll merge this in and include it as part of a v0.7 release. |
It's safe to pull this into the code base now. All of the Akka v0.6.4 stuff is out and has an open PR going into master from my personal master branch, so any changes you make to dev at this point are fine. |
Mailbox<TSys,TUser>
which can consume different kinds of message queuesUnboundedPriorityMailbox
base class that can be used to create custom priority mailboxesMessageQueue
and related semanticsRelated to #299 and #301