Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port Pulse, DelayFlow and Valve streams-contrib stages #3421

Merged
49 changes: 47 additions & 2 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1068,11 +1068,22 @@ namespace Akka.Streams.Dsl
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
public Akka.Streams.Inlet<TIn> In(int id) { }
}
public class DelayFlow<T> : Akka.Streams.Implementation.Fusing.SimpleLinearGraphStage<T>
{
public DelayFlow(System.Func<Akka.Streams.Dsl.IDelayStrategy<T>> strategySupplier) { }
public DelayFlow(System.TimeSpan fixedDelay) { }
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
}
public class static FileIO
{
public static Akka.Streams.Dsl.Source<Akka.IO.ByteString, System.Threading.Tasks.Task<Akka.Streams.IO.IOResult>> FromFile(System.IO.FileInfo f, int chunkSize = 8192, long startPosition = 0) { }
public static Akka.Streams.Dsl.Sink<Akka.IO.ByteString, System.Threading.Tasks.Task<Akka.Streams.IO.IOResult>> ToFile(System.IO.FileInfo f, System.Nullable<System.IO.FileMode> fileMode = null, long startPosition = 0) { }
}
public class FixedDelay<T> : Akka.Streams.Dsl.IDelayStrategy<T>
{
public FixedDelay(System.TimeSpan delay) { }
public System.TimeSpan NextDelay(T element) { }
}
public class static Flow
{
public static Akka.Streams.Dsl.Flow<T, T, Akka.NotUsed> Create<T>() { }
Expand Down Expand Up @@ -1293,6 +1304,10 @@ namespace Akka.Streams.Dsl
public Akka.Streams.Inlet<TIn> In { get; }
}
}
public interface IDelayStrategy<T>
{
System.TimeSpan NextDelay(T element);
}
public interface IFlow<TOut, out TMat>
{
Akka.Streams.Dsl.IFlow<TOut, TMat2> MapMaterializedValue<TMat2>(System.Func<TMat, TMat2> mapFunc);
Expand Down Expand Up @@ -1324,6 +1339,11 @@ namespace Akka.Streams.Dsl
{
T Create(System.Func<TIn, TOut> unzipper);
}
public interface IValveSwitch
{
System.Threading.Tasks.Task<bool> Flip(Akka.Streams.Dsl.SwitchMode mode);
System.Threading.Tasks.Task<Akka.Streams.Dsl.SwitchMode> GetMode();
}
public class static JsonFraming
{
public static Akka.Streams.Dsl.Flow<Akka.IO.ByteString, Akka.IO.ByteString, Akka.NotUsed> ObjectScanner(int maximumObjectLength) { }
Expand All @@ -1345,6 +1365,12 @@ namespace Akka.Streams.Dsl
public override Akka.Streams.FlowShape<T, T> Shape { get; }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<Akka.Streams.Util.Option<T>>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
}
public class LinearIncreasingDelay<T> : Akka.Streams.Dsl.IDelayStrategy<T>
{
[Akka.Annotations.ApiMayChangeAttribute()]
public LinearIncreasingDelay(System.TimeSpan increaseStep, System.Func<T, bool> needsIncrease, System.TimeSpan initialDelay, System.TimeSpan maxDelay) { }
public System.TimeSpan NextDelay(T element) { }
}
public sealed class Merge<T> : Akka.Streams.Dsl.Merge<T, T>
{
public Merge(int inputPorts, bool eagerComplete = False) { }
Expand Down Expand Up @@ -1469,6 +1495,11 @@ namespace Akka.Streams.Dsl
public override Akka.Streams.FanOutShape<TIn, TOut0, TOut1> Shape { get; }
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
}
public class Pulse<T> : Akka.Streams.Implementation.Fusing.SimpleLinearGraphStage<T>
{
public Pulse(System.TimeSpan interval, bool initiallyOpen = False) { }
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
}
public class static RestartFlow
{
public static Akka.Streams.Dsl.Flow<TIn, TOut, Akka.NotUsed> WithBackoff<TIn, TOut, TMat>(System.Func<Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> flowFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor) { }
Expand Down Expand Up @@ -1529,10 +1560,10 @@ namespace Akka.Streams.Dsl
}
public class Sample<T> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<T, T>>
{
public Akka.Streams.Inlet<T> In;
public Akka.Streams.Outlet<T> Out;
public Sample(int nth) { }
public Sample(System.Func<int> next) { }
public Akka.Streams.Inlet<T> In { get; }
public Akka.Streams.Outlet<T> Out { get; }
public override Akka.Streams.FlowShape<T, T> Shape { get; }
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
public static Akka.Streams.Dsl.Sample<T> Random(int maxStep = 1000) { }
Expand Down Expand Up @@ -1821,6 +1852,11 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.SubFlow<T3, TMat, TClosed> ZipWith<T1, T2, T3, TMat, TClosed>(this Akka.Streams.Dsl.SubFlow<T1, TMat, TClosed> flow, Akka.Streams.IGraph<Akka.Streams.SourceShape<T2>, TMat> other, System.Func<T1, T2, T3> combine) { }
public static Akka.Streams.Dsl.SubFlow<System.Tuple<TOut1, long>, TMat, TClosed> ZipWithIndex<TOut1, TMat, TClosed>(this Akka.Streams.Dsl.SubFlow<TOut1, TMat, TClosed> flow) { }
}
public enum SwitchMode
{
Open = 0,
Close = 1,
}
public class Tcp : Akka.Actor.ExtensionIdProvider<Akka.Streams.Dsl.TcpExt>
{
public Tcp() { }
Expand Down Expand Up @@ -1989,6 +2025,15 @@ namespace Akka.Streams.Dsl
protected UnzipWithCreator() { }
public virtual Akka.Streams.Dsl.UnzipWith<TIn, TOut0, TOut1, TOut2, TOut3, TOut4, TOut5, TOut6> Create(System.Func<TIn, System.Tuple<TOut0, TOut1, TOut2, TOut3, TOut4, TOut5, TOut6>> unzipper) { }
}
public class Valve<T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.FlowShape<T, T>, System.Threading.Tasks.Task<Akka.Streams.Dsl.IValveSwitch>>
{
public Valve() { }
public Valve(Akka.Streams.Dsl.SwitchMode mode) { }
public Akka.Streams.Inlet<T> In { get; }
public Akka.Streams.Outlet<T> Out { get; }
public override Akka.Streams.FlowShape<T, T> Shape { get; }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<Akka.Streams.Dsl.IValveSwitch>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
}
public sealed class Zip<T1, T2> : Akka.Streams.Dsl.ZipWith<T1, T2, System.Tuple<T1, T2>>
{
public Zip() { }
Expand Down
111 changes: 111 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/DelayFlowSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
//-----------------------------------------------------------------------
// <copyright file="DelayFlowSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2018 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using FluentAssertions;
using Xunit;

namespace Akka.Streams.Tests.Dsl
{
public class DelayFlowSpec : Akka.TestKit.Xunit2.TestKit
{
[Fact]
public void DelayFlow_should_work_with_empty_source()
{
Source.Empty<int>()
.Via(new DelayFlow<int>(TimeSpan.Zero))
.RunWith(this.SinkProbe<int>(), Sys.Materializer())
.Request(1)
.ExpectComplete();
}

[Fact]
public void DelayFlow_should_work_with_fixed_delay()
{
var fixedDelay = TimeSpan.FromSeconds(1);
var elems = Enumerable.Range(1, 10);

var probe = Source.From(elems)
.Select(_ => DateTime.Now.Ticks)
.Via(new DelayFlow<long>(fixedDelay))
.Select(start => DateTime.Now.Ticks - start)
.RunWith(this.SinkProbe<long>(), Sys.Materializer());

foreach (var e in elems)
{
var next = probe
.Request(1)
.ExpectNext(fixedDelay + Dilated(fixedDelay));

next.Should().BeGreaterOrEqualTo(fixedDelay.Ticks);
}

probe.ExpectComplete();
}

[Fact]
public void DelayFlow_should_work_without_delay()
{
var elems = new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };

Source.From(elems)
.Via(new DelayFlow<int>(TimeSpan.Zero))
.RunWith(this.SinkProbe<int>(), Sys.Materializer())
.Request(elems.Length)
.ExpectNextN(elems)
.ExpectComplete();
}

[Fact]
public void DelayFlow_should_work_with_linear_increasing_delay()
{
var elems = Enumerable.Range(1, 10);
var step = TimeSpan.FromSeconds(1);
var initial = TimeSpan.FromSeconds(1);
var max = TimeSpan.FromSeconds(5);

bool incWhile(Tuple<int, long> i)
{
return i.Item1 < 7;
}

var probe = Source.From(elems)
.Select(e => Tuple.Create(e, DateTime.Now.Ticks))
.Via(new DelayFlow<Tuple<int, long>>(
() => new LinearIncreasingDelay<Tuple<int, long>>(step, incWhile, initial, max))
)
.Select(pair => DateTime.Now.Ticks - pair.Item2)
.RunWith(this.SinkProbe<long>(), Sys.Materializer());

foreach (var e in elems)
{
if (incWhile(Tuple.Create(e, 1L)))
{
var afterIncrease = initial + TimeSpan.FromTicks(step.Ticks * e);
var delay = afterIncrease < max ? afterIncrease : max;
var next = probe
.Request(1)
.ExpectNext(delay + Dilated(delay));

next.Should().BeGreaterOrEqualTo(delay.Ticks);
}
else
{
var next = probe
.Request(1)
.ExpectNext(initial + Dilated(initial));

next.Should().BeGreaterOrEqualTo(initial.Ticks);
}
}
probe.ExpectComplete();
}
}
}
90 changes: 90 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/PulseSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
//-----------------------------------------------------------------------
// <copyright file="PulseSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2018 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.Streams.TestKit.Tests;
using FluentAssertions;
using Xunit;

namespace Akka.Streams.Tests.Dsl
{
public class PulseSpec : Akka.TestKit.Xunit2.TestKit
{
private readonly TimeSpan _pulseInterval = TimeSpan.FromMilliseconds(20);

[Fact]
public void Pulse_should_signal_demand_once_every_interval()
{
var t = this.SourceProbe<int>()
.Via(new Pulse<int>(Dilated(_pulseInterval)))
.ToMaterialized(Sink.Seq<int>(), Keep.Both)
.Run(Sys.Materializer());

var probe = t.Item1;
var task = t.Item2;

probe.SendNext(1);
probe.ExpectNoMsg(_pulseInterval);
probe.SendNext(2);
probe.ExpectNoMsg(_pulseInterval);
probe.SendComplete();

task.AwaitResult().ShouldBeEquivalentTo(new[] { 1, 2 }, o => o.WithStrictOrdering());
}

[Fact]
public void Pulse_should_keep_backpressure_if_there_is_no_demand_from_downstream()
{
var elements = Enumerable.Range(1, 10);
var probe = Source.From(elements)
.Via(new Pulse<int>(Dilated(_pulseInterval)))
.RunWith(this.SinkProbe<int>(), Sys.Materializer());

probe.EnsureSubscription();
// lets waste some time without a demand and let pulse run its timer
probe.ExpectNoMsg(TimeSpan.FromTicks(_pulseInterval.Ticks * 10));

probe.Request(elements.Count());
foreach (var e in elements)
probe.ExpectNext(e);
}

[Fact]
public void Initially_opened_Pulse_should_emit_the_first_available_element()
{
var task = Source.Repeat(1)
.Via(new Pulse<int>(Dilated(_pulseInterval), initiallyOpen: true))
.InitialTimeout(Dilated(TimeSpan.FromMilliseconds(2)))
.RunWith(Sink.First<int>(), Sys.Materializer());

task.AwaitResult().Should().Be(1);
}

[Fact]
public void Initially_opened_Pulse_should_signal_demand_once_every_interval()
{
var t = this.SourceProbe<int>()
.Via(new Pulse<int>(Dilated(_pulseInterval), initiallyOpen: true))
.ToMaterialized(Sink.Seq<int>(), Keep.Both)
.Run(Sys.Materializer());

var probe = t.Item1;
var task = t.Item2;

probe.SendNext(1);
probe.ExpectNoMsg(_pulseInterval);
probe.SendNext(2);
probe.ExpectNoMsg(_pulseInterval);
probe.SendComplete();

task.AwaitResult().ShouldBeEquivalentTo(new[] { 1, 2 }, o => o.WithStrictOrdering());
}
}
}
Loading