-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Port Pulse, DelayFlow and Valve streams-contrib stages #3421
Port Pulse, DelayFlow and Valve streams-contrib stages #3421
Conversation
@marcpiechura any thoughts on these changes? |
@alex-bogomaz sorry totally forgot to review your PR, I’m on vacation until next weekend, maybe @Horusiath will find some time to review it. |
@alex-bogomaz haven’t forgot your PR but I’m still quite busy, will review it in the next days |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Locks good overall, only a few remarks regarding naming, code style and internal optimization you can't know.
flip.AwaitResult().Should().BeTrue(); | ||
|
||
probe.ExpectNext().Should().Be(1); | ||
probe.ExpectNext().Should().Be(2); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just as note, you can simplify those checks by using probe.ExpectNext(1, 2)
/// </summary> | ||
/// <param name="elem">element</param> | ||
/// <returns></returns> | ||
TimeSpan NextDelay(T elem); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have agreed on using full names instead of shortcuts from the jvm, i.e. element
instead of elem
, it's not 100% rule especially vor local variables etc. but we should do it in the API, so please change.
_delay = delay; | ||
} | ||
|
||
public TimeSpan NextDelay(T elem) => _delay; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use /// <inheritdoc/>
_delay = _initialDelay; | ||
} | ||
|
||
public TimeSpan NextDelay(T elem) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use /// <inheritdoc/>
throw new ArgumentException("Increase step must be positive", nameof(increaseStep)); | ||
|
||
if (maxDelay <= initialDelay) | ||
throw new ArgumentException("Max delay must be bigger than initial delay"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add nameof
as above
public Inlet<T> In = new Inlet<T>("Sample-in"); | ||
public Outlet<T> Out = new Outlet<T>("Sample-out"); | ||
public Inlet<T> In { get; } | ||
public Outlet<T> Out { get; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch 👍
src/core/Akka.Streams/Dsl/Valve.cs
Outdated
{ | ||
var promise = new TaskCompletionSource<bool>(); | ||
_flipCallback(Tuple.Create(flipToMode, promise)); | ||
return promise.Task; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
either use promise
or completion
as name in both cases, I would prefer completion
;-)
_mode = mode; | ||
|
||
In = new Inlet<T>("valve.in"); | ||
Out = new Outlet<T>("valve.out"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could be inlined with the property definition if you want to save two lines ;)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I followed "Retry.cs" code-style here - inlets and outlets are initialized in constructor, not in property declaration
src/core/Akka.Streams/Dsl/Valve.cs
Outdated
if (IsOpen) | ||
Pull(_valve.In); | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comment about IOutHandler
and IInHandler
public FixedDelay(TimeSpan delay) | ||
{ | ||
_delay = delay; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you want you can use the ney C# syntax for single line constructors / methods
public FixedDelay(TimeSpan delay) => _delay = delay;
Sorry for the delay @alex-bogomaz , left you some remarks but good work overall 👍 |
@marcpiechura - I'm finished with comments - could you please take a look when you have time |
throw new ArgumentException("Increase step must be positive", nameof(increaseStep)); | ||
|
||
if (maxDelay <= initialDelay) | ||
throw new Exception($"{nameof(maxDelay)} must be bigger than {nameof(initialDelay)}"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That‘s not exactly what I meant ;-)
throw new ArgumentException("Max delay must be bigger than initial delay", nameof(maxDelay));
Thanks for your contribution @alex-bogomaz 👌 |
* Port Pulse, DelayFlow, Valve * use Tuple instead of ValueTuple * minor code cleanup * replace LambdaHandler with IInHandler and IOutHandler * cache FixedDelay strategy * IDelayStrategy - approve API changes * Modified ArgumentException message in DelayFlow Take from akkadotnet@56c284b
* Port Pulse, DelayFlow, Valve * use Tuple instead of ValueTuple * minor code cleanup * replace LambdaHandler with IInHandler and IOutHandler * cache FixedDelay strategy * IDelayStrategy - approve API changes * Modified ArgumentException message in DelayFlow Take from akkadotnet@56c284b
Hello - this PR contains port of 3 following "Streams-Contrib" stages:
please review. This is for issue #3234