diff --git a/docs/articles/streams/stream-dynamic.md b/docs/articles/streams/stream-dynamic.md index 93c0ebd0061..47f92c33d2c 100644 --- a/docs/articles/streams/stream-dynamic.md +++ b/docs/articles/streams/stream-dynamic.md @@ -103,4 +103,55 @@ We now wrap the `Sink` and `Source` in a Flow using `Flow.FromSinkAndSource`. Th The resulting `Flow` now has a type of `Flow` representing a publish-subscribe channel which can be used any number of times to attach new producers or consumers. In addition, it materializes to a `UniqueKillSwitch` (see [UniqueKillSwitch](xref:streams-dynamic-handling#uniquekillswitch)) that can be used to deregister a single user externally: -[!code-csharp[HubsDocTests.cs](../../examples/DocsExamples/Streams/HubsDocTests.cs?name=pub-sub-4)] \ No newline at end of file +[!code-csharp[HubsDocTests.cs](../../examples/DocsExamples/Streams/HubsDocTests.cs?name=pub-sub-4)] + + +### Using the PartitionHub + +**This is a [may change](../utilities/may-change.md) feature** + +A `PartitionHub` can be used to route elements from a common producer to a dynamic set of consumers. +The selection of consumer is done with a function. Each element can be routed to only one consumer. + +The rate of the producer will be automatically adapted to the slowest consumer. In this case, the hub is a `Sink` +to which the single producer must be attached first. Consumers can only be attached once the `Sink` has +been materialized (i.e. the producer has been started). One example of using the `PartitionHub`: + +[!code-csharp[HubsDocTests.cs](../../examples/DocsExamples/Streams/HubsDocTests.cs?name=partition-hub)] + +The `partitioner` function takes two parameters; the first is the number of active consumers and the second +is the stream element. The function should return the index of the selected consumer for the given element, +i.e. `int` greater than or equal to 0 and less than number of consumers. + +The resulting `Source` can be materialized any number of times, each materialization effectively attaching +a new consumer. If there are no consumers attached to this hub then it will not drop any elements but instead +backpressure the upstream producer until consumers arrive. This behavior can be tweaked by using the combinators +`.Buffer` for example with a drop strategy, or just attaching a consumer that drops all messages. If there +are no other consumers, this will ensure that the producer is kept drained (dropping all elements) and once a new +consumer arrives and messages are routed to the new consumer it will adaptively slow down, ensuring no more messages +are dropped. + +It is possible to define how many initial consumers that are required before it starts emitting any messages +to the attached consumers. While not enough consumers have been attached messages are buffered and when the +buffer is full the upstream producer is backpressured. No messages are dropped. + +The above example illustrate a stateless partition function. For more advanced stateful routing the `StatefulSink` can be used. Here is an example of a stateful round-robin function: + +[!code-csharp[HubsDocTests.cs](../../examples/DocsExamples/Streams/HubsDocTests.cs?name=partition-hub-stateful)] + +Note that it is a factory of a function to to be able to hold stateful variables that are +unique for each materialization. + + +The function takes two parameters; the first is information about active consumers, including an array of +consumer identifiers and the second is the stream element. The function should return the selected consumer +identifier for the given element. The function will never be called when there are no active consumers, i.e. +there is always at least one element in the array of identifiers. + +Another interesting type of routing is to prefer routing to the fastest consumers. The `IConsumerInfo` +has an accessor `QueueSize` that is approximate number of buffered elements for a consumer. +Larger value than other consumers could be an indication of that the consumer is slow. +Note that this is a moving target since the elements are consumed concurrently. Here is an example of +a hub that routes to the consumer with least buffered elements: + +[!code-csharp[HubsDocTests.cs](../../examples/DocsExamples/Streams/HubsDocTests.cs?name=partition-hub-fastest)] \ No newline at end of file diff --git a/docs/articles/utilities/may-change.md b/docs/articles/utilities/may-change.md new file mode 100644 index 00000000000..447aacfd298 --- /dev/null +++ b/docs/articles/utilities/may-change.md @@ -0,0 +1,28 @@ +# Modules marked "May Change" + +To be able to introduce new modules and APIs without freezing them the moment they +are released we have introduced +the term **may change**. + +Concretely **may change** means that an API or module is in early access mode and that it: + + * is not guaranteed to be binary compatible in minor releases + * may have its API change in breaking ways in minor releases + * may be entirely dropped from Akka in a minor release + +Complete modules can be marked as **may change**, this will can be found in their module description and in the docs. + +Individual public APIs can be annotated with `Akka.Annotations.ApiMayChange` to signal that it has less +guarantees than the rest of the module it lives in. +Please use such methods and classes with care, however if you see such APIs that is the best point in time to try them +out and provide feedback (e.g. using the akka-user mailing list, GitHub issues or Gitter) before they are frozen as +fully stable API. + +Best effort migration guides may be provided, but this is decided on a case-by-case basis for **may change** modules. + +The purpose of this is to be able to release features early and +make them easily available and improve based on feedback, or even discover +that the module or API wasn't useful. + +These are the current complete modules marked as **may change**: + diff --git a/docs/examples/DocsExamples/Streams/HubsDocTests.cs b/docs/examples/DocsExamples/Streams/HubsDocTests.cs index d2cec66c265..58795c5ee0c 100644 --- a/docs/examples/DocsExamples/Streams/HubsDocTests.cs +++ b/docs/examples/DocsExamples/Streams/HubsDocTests.cs @@ -1,5 +1,6 @@ using System; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Akka; using Akka.Streams; @@ -113,5 +114,105 @@ public void Hubs_must_demonstrate_combination() killSwitch.Shutdown(); #endregion } + + [Fact] + public void Hubs_must_demonstrate_creating_a_dynamic_partition_hub() + { + #region partition-hub + + // A simple producer that publishes a new "message-" every second + Source producer = Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), "message") + .MapMaterializedValue(_ => NotUsed.Instance) + .ZipWith(Source.From(Enumerable.Range(1, 100)), (msg, i) => $"{msg}-{i}"); + + // Attach a PartitionHub Sink to the producer. This will materialize to a + // corresponding Source. + // (We need to use toMat and Keep.right since by default the materialized + // value to the left is used) + IRunnableGraph> runnableGraph = + producer.ToMaterialized(PartitionHub.Sink( + (size, element) => Math.Abs(element.GetHashCode()) % size, + startAfterNrOfConsumers: 2, bufferSize: 256), Keep.Right); + + // By running/materializing the producer, we get back a Source, which + // gives us access to the elements published by the producer. + Source fromProducer = runnableGraph.Run(Materializer); + + // Print out messages from the producer in two independent consumers + fromProducer.RunForeach(msg => Console.WriteLine("Consumer1: " + msg), Materializer); + fromProducer.RunForeach(msg => Console.WriteLine("Consumer2: " + msg), Materializer); + + #endregion + } + + [Fact] + public void Hubs_must_demonstrate_creating_a_dynamic_steful_partition_hub() + { + #region partition-hub-stateful + + // A simple producer that publishes a new "message-" every second + Source producer = Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), "message") + .MapMaterializedValue(_ => NotUsed.Instance) + .ZipWith(Source.From(Enumerable.Range(1, 100)), (msg, i) => $"{msg}-{i}"); + + // New instance of the partitioner function and its state is created + // for each materialization of the PartitionHub. + Func RoundRobbin() + { + var i = -1L; + return (info, element) => + { + i++; + return info.ConsumerByIndex((int) (i % info.Size)); + }; + } + + // Attach a PartitionHub Sink to the producer. This will materialize to a + // corresponding Source. + // (We need to use toMat and Keep.right since by default the materialized + // value to the left is used) + IRunnableGraph> runnableGraph = + producer.ToMaterialized(PartitionHub.StatefulSink(RoundRobbin, + startAfterNrOfConsumers: 2, bufferSize: 256), Keep.Right); + + // By running/materializing the producer, we get back a Source, which + // gives us access to the elements published by the producer. + Source fromProducer = runnableGraph.Run(Materializer); + + // Print out messages from the producer in two independent consumers + fromProducer.RunForeach(msg => Console.WriteLine("Consumer1: " + msg), Materializer); + fromProducer.RunForeach(msg => Console.WriteLine("Consumer2: " + msg), Materializer); + + #endregion + } + + [Fact] + public void Hubs_must_demonstrate_creating_a_dynamic_partition_hub_routing_to_fastest_consumer() + { + #region partition-hub-fastest + + // A simple producer that publishes a new "message-" every second + Source producer = Source.From(Enumerable.Range(0, 100)); + + // Attach a PartitionHub Sink to the producer. This will materialize to a + // corresponding Source. + // (We need to use toMat and Keep.right since by default the materialized + // value to the left is used) + IRunnableGraph> runnableGraph = + producer.ToMaterialized(PartitionHub.StatefulSink( + () => ((info, element) => info.ConsumerIds.Min(info.QueueSize)), + startAfterNrOfConsumers: 2, bufferSize: 256), Keep.Right); + + // By running/materializing the producer, we get back a Source, which + // gives us access to the elements published by the producer. + Source fromProducer = runnableGraph.Run(Materializer); + + // Print out messages from the producer in two independent consumers + fromProducer.RunForeach(msg => Console.WriteLine("Consumer1: " + msg), Materializer); + fromProducer.Throttle(10, TimeSpan.FromMilliseconds(100), 10, ThrottleMode.Shaping) + .RunForeach(msg => Console.WriteLine("Consumer2: " + msg), Materializer); + + #endregion + } } } diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt index d46e956fc9d..7eaa8b30743 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt @@ -1428,6 +1428,21 @@ namespace Akka.Streams.Dsl public Akka.Streams.Outlet Out(int id) { } public override string ToString() { } } + public class static PartitionHub + { + [Akka.Annotations.ApiMayChangeAttribute()] + public static Akka.Streams.Dsl.Sink> Sink(System.Func partitioner, int startAfterNrOfConsumers, int bufferSize = 256) { } + [Akka.Annotations.ApiMayChangeAttribute()] + public static Akka.Streams.Dsl.Sink> StatefulSink(System.Func> partitioner, int startAfterNrOfConsumers, int bufferSize = 256) { } + [Akka.Annotations.ApiMayChangeAttribute()] + public interface IConsumerInfo + { + System.Collections.Immutable.ImmutableArray ConsumerIds { get; } + int Size { get; } + long ConsumerByIndex(int index); + int QueueSize(long consumerId); + } + } public sealed class PartitionOutOfBoundsException : System.Exception { public PartitionOutOfBoundsException(string message) { } diff --git a/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs b/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs index 119971b21b6..6c2e9204b68 100644 --- a/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs @@ -166,7 +166,8 @@ public void MergeHub_must_work_with_long_streams() { this.AssertAllStagesStopped(() => { - var t = MergeHub.Source(16).Take(20000).ToMaterialized(Sink.Seq(), Keep.Both).Run(Materializer); + var t = MergeHub.Source(16).Take(20000).ToMaterialized(Sink.Seq(), Keep.Both) + .Run(Materializer); var sink = t.Item1; var result = t.Item2; @@ -182,7 +183,8 @@ public void MergeHub_must_work_with_long_streams_when_buffer_size_is_1() { this.AssertAllStagesStopped(() => { - var t = MergeHub.Source(1).Take(20000).ToMaterialized(Sink.Seq(), Keep.Both).Run(Materializer); + var t = MergeHub.Source(1).Take(20000).ToMaterialized(Sink.Seq(), Keep.Both) + .Run(Materializer); var sink = t.Item1; var result = t.Item2; @@ -219,7 +221,8 @@ public void MergeHub_must_work_with_long_streams_if_one_of_the_producers_is_slow { this.AssertAllStagesStopped(() => { - var t = MergeHub.Source(16).Take(2000).ToMaterialized(Sink.Seq(), Keep.Both).Run(Materializer); + var t = MergeHub.Source(16).Take(2000).ToMaterialized(Sink.Seq(), Keep.Both) + .Run(Materializer); var sink = t.Item1; var result = t.Item2; @@ -569,5 +572,341 @@ public void BroadcastHub_must_properly_signal_error_to_consumers_arriving_after_ task.Invoking(t => t.Wait(TimeSpan.FromSeconds(3))).ShouldThrow(); }, Materializer); } + + [Fact] + public void PartitionHub_must_work_in_the_happy_case_with_one_stream() + { + this.AssertAllStagesStopped(() => + { + var items = Enumerable.Range(1, 10).ToList(); + var source = Source.From(items) + .RunWith(PartitionHub.Sink((size, e) => 0, 0, 8), Materializer); + var result = source.RunWith(Sink.Seq(), Materializer).AwaitResult(); + result.ShouldAllBeEquivalentTo(items); + }, Materializer); + } + + [Fact] + public void PartitionHub_must_work_in_the_happy_case_with_two_streams() + { + this.AssertAllStagesStopped(() => + { + var source = Source.From(Enumerable.Range(0, 10)) + .RunWith(PartitionHub.Sink((size, e) => e % size, 2, 8), Materializer); + var result1 = source.RunWith(Sink.Seq(), Materializer); + // it should not start publishing until startAfterNrOfConsumers = 2 + Thread.Sleep(50); + var result2 = source.RunWith(Sink.Seq(), Materializer); + result1.AwaitResult().ShouldAllBeEquivalentTo(new[] { 0, 2, 4, 6, 8 }); + result2.AwaitResult().ShouldAllBeEquivalentTo(new[] { 1, 3, 5, 7, 9 }); + }, Materializer); + } + + [Fact] + public void PartitionHub_must_be_able_to_use_as_rount_robin_router() + { + this.AssertAllStagesStopped(() => + { + var source = Source.From(Enumerable.Range(0, 10)) + .RunWith(PartitionHub.StatefulSink(() => + { + var n = 0L; + return ((info, e) => + { + n++; + return info.ConsumerByIndex((int)n % info.Size); + }); + }, 2, 8), Materializer); + var result1 = source.RunWith(Sink.Seq(), Materializer); + var result2 = source.RunWith(Sink.Seq(), Materializer); + result1.AwaitResult().ShouldAllBeEquivalentTo(new[] { 1, 3, 5, 7, 9 }); + result2.AwaitResult().ShouldAllBeEquivalentTo(new[] { 0, 2, 4, 6, 8 }); + }, Materializer); + } + + [Fact] + public void PartitionHub_must_be_able_to_use_as__sticky_session_rount_robin_router() + { + this.AssertAllStagesStopped(() => + { + var source = Source.From(new[] { "usr-1", "usr-2", "usr-1", "usr-3" }) + .RunWith(PartitionHub.StatefulSink(() => + { + var session = new Dictionary(); + var n = 0L; + return ((info, e) => + { + if (session.TryGetValue(e, out var i) && info.ConsumerIds.Contains(i)) + return i; + n++; + var id = info.ConsumerByIndex((int)n % info.Size); + session[e] = id; + return id; + }); + }, 2, 8), Materializer); + var result1 = source.RunWith(Sink.Seq(), Materializer); + var result2 = source.RunWith(Sink.Seq(), Materializer); + result1.AwaitResult().ShouldAllBeEquivalentTo(new[] { "usr-2" }); + result2.AwaitResult().ShouldAllBeEquivalentTo(new[] { "usr-1", "usr-1", "usr-3" }); + }, Materializer); + } + + [Fact] + public void PartitionHub_must_be_able_to_use_as_fastest_consumer_router() + { + this.AssertAllStagesStopped(() => + { + var items = Enumerable.Range(0, 999).ToList(); + var source = Source.From(items) + .RunWith( + PartitionHub.StatefulSink(() => ((info, i) => info.ConsumerIds.Min(info.QueueSize)), 2, 4), + Materializer); + var result1 = source.RunWith(Sink.Seq(), Materializer); + var result2 = source.Throttle(10, TimeSpan.FromMilliseconds(100), 10, ThrottleMode.Shaping) + .RunWith(Sink.Seq(), Materializer); + + result1.AwaitResult().Count.ShouldBeGreaterThan(result2.AwaitResult().Count); + }, Materializer); + } + + [Fact] + public void PartitionHub_must_route_evenly() + { + this.AssertAllStagesStopped(() => + { + var t = this.SourceProbe() + .ToMaterialized(PartitionHub.Sink((size, e) => e % size, 2, 8), Keep.Both) + .Run(Materializer); + + var testSource = t.Item1; + var hub = t.Item2; + var probe0 = hub.RunWith(this.SinkProbe(), Materializer); + var probe1 = hub.RunWith(this.SinkProbe(), Materializer); + + probe0.Request(3); + probe1.Request(10); + testSource.SendNext(0); + probe0.ExpectNext(0); + testSource.SendNext(1); + probe1.ExpectNext(1); + + testSource.SendNext(2); + testSource.SendNext(3); + testSource.SendNext(4); + probe0.ExpectNext(2); + probe1.ExpectNext(3); + probe0.ExpectNext(4); + + // probe1 has not requested more + testSource.SendNext(5); + testSource.SendNext(6); + testSource.SendNext(7); + probe1.ExpectNext(5); + probe1.ExpectNext(7); + probe0.ExpectNoMsg(TimeSpan.FromMilliseconds(50)); + probe0.Request(10); + probe0.ExpectNext(6); + + testSource.SendComplete(); + probe0.ExpectComplete(); + probe1.ExpectComplete(); + + }, Materializer); + } + + [Fact] + public void PartitionHub_must_route_unevenly() + { + this.AssertAllStagesStopped(() => + { + var t = this.SourceProbe() + .ToMaterialized(PartitionHub.Sink((size, e) => (e % 3) % 2, 2, 8), Keep.Both) + .Run(Materializer); + + var testSource = t.Item1; + var hub = t.Item2; + var probe0 = hub.RunWith(this.SinkProbe(), Materializer); + var probe1 = hub.RunWith(this.SinkProbe(), Materializer); + + // (_ % 3) % 2 + // 0 => 0 + // 1 => 1 + // 2 => 0 + // 3 => 0 + // 4 => 1 + + probe0.Request(10); + probe1.Request(10); + testSource.SendNext(0); + probe0.ExpectNext(0); + testSource.SendNext(1); + probe1.ExpectNext(1); + testSource.SendNext(2); + probe0.ExpectNext(2); + testSource.SendNext(3); + probe0.ExpectNext(3); + testSource.SendNext(4); + probe1.ExpectNext(4); + + testSource.SendComplete(); + probe0.ExpectComplete(); + probe1.ExpectComplete(); + + }, Materializer); + } + + [Fact] + public void PartitionHub_must_backpressure() + { + this.AssertAllStagesStopped(() => + { + var t = this.SourceProbe() + .ToMaterialized(PartitionHub.Sink((size, e) => 0, 2, 4), Keep.Both) + .Run(Materializer); + + var testSource = t.Item1; + var hub = t.Item2; + var probe0 = hub.RunWith(this.SinkProbe(), Materializer); + var probe1 = hub.RunWith(this.SinkProbe(), Materializer); + + probe0.Request(10); + probe1.Request(10); + testSource.SendNext(0); + probe0.ExpectNext(0); + testSource.SendNext(1); + probe0.ExpectNext(1); + testSource.SendNext(2); + probe0.ExpectNext(2); + testSource.SendNext(3); + probe0.ExpectNext(3); + testSource.SendNext(4); + probe0.ExpectNext(4); + + testSource.SendComplete(); + probe0.ExpectComplete(); + probe1.ExpectComplete(); + + }, Materializer); + } + + [Fact] + public void PartitionHub_must_ensure_that_from_two_different_speed_consumers_the_slower_controls_the_rate() + { + this.AssertAllStagesStopped(() => + { + var t = Source.Maybe().ConcatMaterialized(Source.From(Enumerable.Range(1, 19)), Keep.Left) + .ToMaterialized(PartitionHub.Sink((size, e) => e % size, 2, 1), Keep.Both) + .Run(Materializer); + var firstElement = t.Item1; + var source = t.Item2; + + var f1 = source.Throttle(1, TimeSpan.FromMilliseconds(10), 1, ThrottleMode.Shaping) + .RunWith(Sink.Seq(), Materializer); + + // Second cannot be overwhelmed since the first one throttles the overall rate, and second allows a higher rate + var f2 = source.Throttle(10, TimeSpan.FromMilliseconds(10), 8, ThrottleMode.Enforcing) + .RunWith(Sink.Seq(), Materializer); + + // Ensure subscription of Sinks. This is racy but there is no event we can hook into here. + Thread.Sleep(100); + // on the jvm Some 0 is used, unfortunately haven't we used Option for the Maybe source + // and therefore firstElement.SetResult(0) will complete the source without pushing an element + // since 0 is the default value for int and if you set the result to default(T) it will ignore + // the element and complete the source. We should probably fix this in the feature. + firstElement.SetResult(50); + + var expectationF1 = Enumerable.Range(1, 18).Where(v => v % 2 == 0).ToList(); + expectationF1.Insert(0, 50); + + f1.AwaitResult().ShouldAllBeEquivalentTo(expectationF1); + f2.AwaitResult().ShouldAllBeEquivalentTo(Enumerable.Range(1, 19).Where(v => v % 2 != 0)); + }, Materializer); + } + + [Fact] + public void PartitionHub_must_properly_signal_error_to_consumer() + { + this.AssertAllStagesStopped(() => + { + var upstream = this.CreatePublisherProbe(); + var source = Source.FromPublisher(upstream) + .RunWith(PartitionHub.Sink((s, e) => e % s, 2, 8), Materializer); + + var downstream1 = this.CreateSubscriberProbe(); + source.RunWith(Sink.FromSubscriber(downstream1), Materializer); + var downstream2 = this.CreateSubscriberProbe(); + source.RunWith(Sink.FromSubscriber(downstream2), Materializer); + + downstream1.Request(4); + downstream2.Request(8); + + Enumerable.Range(0, 16).ForEach(i => upstream.SendNext(i)); + + downstream1.ExpectNext(0, 2, 4, 6); + downstream2.ExpectNext(1, 3, 5, 7, 9, 11, 13, 15); + + downstream1.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + downstream2.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + + var failure = new TestException("Failed"); + upstream.SendError(failure); + + downstream1.ExpectError().Should().Be(failure); + downstream2.ExpectError().Should().Be(failure); + }, Materializer); + } + + [Fact] + public void PartitionHub_must_properly_signal_completion_to_consumers_arriving_after_producer_finished() + { + this.AssertAllStagesStopped(() => + { + var source = Source.Empty().RunWith(PartitionHub.Sink((s, e) => e % s, 0), Materializer); + // Wait enough so the Hub gets the completion. This is racy, but this is fine because both + // cases should work in the end + Thread.Sleep(50); + + source.RunWith(Sink.Seq(), Materializer).AwaitResult().Should().BeEmpty(); + }, Materializer); + } + + [Fact] + public void PartitionHub_must_remeber_completion_for_materialisations_after_completion() + { + var t = this.SourceProbe().ToMaterialized(PartitionHub.Sink((s, e) => 0, 0), Keep.Both) + .Run(Materializer); + var sourceProbe = t.Item1; + var source = t.Item2; + var sinkProbe = source.RunWith(this.SinkProbe(), Materializer); + + sourceProbe.SendComplete(); + + sinkProbe.Request(1); + sinkProbe.ExpectComplete(); + + // Materialize a second time. There was a race here, where we managed to enqueue our Source registration just + // immediately before the Hub shut down. + var sink2Probe = source.RunWith(this.SinkProbe(), Materializer); + + sink2Probe.Request(1); + sink2Probe.ExpectComplete(); + } + + [Fact] + public void PartitionHub_must_properly_signal_error_to_consumer_arriving_after_producer_finished() + { + this.AssertAllStagesStopped(() => + { + var failure = new TestException("Fail!"); + var source = Source.Failed(failure).RunWith(PartitionHub.Sink((s, e) => 0, 0), Materializer); + // Wait enough so the Hub gets the completion. This is racy, but this is fine because both + // cases should work in the end + Thread.Sleep(50); + + Action a = () => source.RunWith(Sink.Seq(), Materializer).AwaitResult(); + a.ShouldThrow().WithMessage("Fail!"); + }, Materializer); + } } + } diff --git a/src/core/Akka.Streams/Dsl/Hub.cs b/src/core/Akka.Streams/Dsl/Hub.cs index faf4e9c3726..8b525d86434 100644 --- a/src/core/Akka.Streams/Dsl/Hub.cs +++ b/src/core/Akka.Streams/Dsl/Hub.cs @@ -11,6 +11,7 @@ using System.Collections.Immutable; using System.Linq; using System.Threading.Tasks; +using Akka.Annotations; using Akka.Streams.Stage; using Akka.Util; using Akka.Util.Internal; @@ -77,7 +78,7 @@ public sealed class ProducerFailed : Exception /// The exception that is the cause of the current exception. public ProducerFailed(string message, Exception cause) : base(message, cause) { - + } } } @@ -117,7 +118,7 @@ public Register(long id, Action demandCallback) } public long Id { get; } - + public Action DemandCallback { get; } } @@ -229,7 +230,7 @@ private bool OnEvent(IEvent e) } public override void OnPull() => TryProcessNext(true); - + private void TryProcessNext(bool firstAttempt) { while (true) @@ -359,14 +360,14 @@ public override void PreStart() public override void PostStop() { // Unlike in the case of preStart, we don't care about the Hub no longer looking at the queue. - if(!_logic.IsShuttingDown) + if (!_logic.IsShuttingDown) _logic.Enqueue(new Deregister(_id)); } public override void OnPush() { _logic.Enqueue(new Element(_id, Grab(_stage.In))); - if(_demand > 0) + if (_demand > 0) PullWithDemand(); } @@ -390,7 +391,7 @@ private void OnDemand(long moreDemand) else { _demand += moreDemand; - if(!HasBeenPulled(_stage.In)) + if (!HasBeenPulled(_stage.In)) PullWithDemand(); } } @@ -434,7 +435,7 @@ public MergeHub(int perProducerBufferSize) throw new ArgumentException("Buffer size must be positive", nameof(perProducerBufferSize)); _perProducerBufferSize = perProducerBufferSize; - DemandThreshold = perProducerBufferSize/2 + perProducerBufferSize%2; + DemandThreshold = perProducerBufferSize / 2 + perProducerBufferSize % 2; Shape = new SourceShape(Out); } @@ -480,7 +481,7 @@ public class BroadcastHub /// Creates a that receives elements from its upstream producer and broadcasts them to a dynamic set /// of consumers. After the returned by this method is materialized, it returns a as materialized /// value. This can be materialized arbitrary many times and each materialization will receive the - /// broadcast elements form the original . + /// broadcast elements from the original . /// /// Every new materialization of the results in a new, independent hub, which materializes to its own /// for consuming the of that materialization. @@ -525,7 +526,6 @@ public static Sink> Sink(int bufferSize) /// /// INTERNAL API /// - /// TBD internal class BroadcastHub : GraphStageWithMaterializedValue, Source> { #region internal classes @@ -538,7 +538,7 @@ private sealed class RegistrationPending : IHubEvent private RegistrationPending() { - + } } @@ -601,7 +601,7 @@ public Consumer(long id, Action callback) } - private sealed class Completed + private sealed class Completed { public static Completed Instance { get; } = new Completed(); @@ -680,7 +680,7 @@ private sealed class HubLogic : InGraphStageLogic private readonly TaskCompletionSource> _callbackCompletion = new TaskCompletionSource>(); - + private readonly Open _noRegistrationState; internal readonly AtomicReference State; @@ -714,7 +714,7 @@ public HubLogic(BroadcastHub stage) : base(stage.Shape) _noRegistrationState = new Open(_callbackCompletion.Task, ImmutableList.Empty); State = new AtomicReference(_noRegistrationState); _queue = new object[stage._bufferSize]; - _consumerWheel = Enumerable.Repeat(0, stage._bufferSize*2) + _consumerWheel = Enumerable.Repeat(0, stage._bufferSize * 2) .Select(_ => ImmutableList.Empty) .ToArray(); @@ -738,7 +738,7 @@ public override void OnUpstreamFinish() public override void OnPush() { Publish(Grab(_stage.In)); - if(!IsFull) + if (!IsFull) Pull(_stage.In); } @@ -746,14 +746,15 @@ private void OnEvent(IHubEvent hubEvent) { if (hubEvent == RegistrationPending.Instance) { - var open = (Open) State.GetAndSet(_noRegistrationState); - open.Registrations.ForEach(c => + var open = (Open)State.GetAndSet(_noRegistrationState); + foreach (var c in open.Registrations) { var startFrom = _head; _activeConsumer++; AddConsumer(c, startFrom); c.Callback(new Initialize(startFrom)); - }); + } + return; } @@ -763,7 +764,7 @@ private void OnEvent(IHubEvent hubEvent) FindAndRemoveConsumer(unregister.Id, unregister.PreviousOffset); if (_activeConsumer == 0) { - if(IsClosed(_stage.In)) + if (IsClosed(_stage.In)) CompleteStage(); else if (_head != unregister.FinalOffset) { @@ -788,7 +789,7 @@ private void OnEvent(IHubEvent hubEvent) if (hubEvent is Advanced advance) { - var newOffset = advance.PreviousOffset + _stage.DemandThreshold; + var newOffset = advance.PreviousOffset + _stage._demandThreshold; // Move the consumer from its last known offset to its new one. Check if we are unblocked. var c = FindAndRemoveConsumer(advance.Id, advance.PreviousOffset); AddConsumer(c, newOffset); @@ -797,7 +798,7 @@ private void OnEvent(IHubEvent hubEvent) } // only NeedWakeup left - var wakeup = (NeedWakeup) hubEvent; + var wakeup = (NeedWakeup)hubEvent; // Move the consumer from its last known offset to its new one. Check if we are unblocked. var consumer = FindAndRemoveConsumer(wakeup.Id, wakeup.PreviousOffset); AddConsumer(consumer, wakeup.CurrentOffset); @@ -818,8 +819,8 @@ public override void OnUpstreamFailure(Exception e) var failMessage = new HubCompleted(e); // Notify pending consumers and set tombstone - var open = (Open) State.GetAndSet(new Closed(e)); - open.Registrations.ForEach(c=>c.Callback(failMessage)); + var open = (Open)State.GetAndSet(new Closed(e)); + open.Registrations.ForEach(c => c.Callback(failMessage)); // Notify registered consumers _consumerWheel.SelectMany(x => x).ForEach(c => c.Callback(failMessage)); @@ -862,7 +863,7 @@ private void CheckUnblock(int offsetOfConsumerRemoved) { if (UnblockIfPossible(offsetOfConsumerRemoved)) { - if(IsClosed(_stage.In)) + if (IsClosed(_stage.In)) Complete(); else if (!HasBeenPulled(_stage.In)) Pull(_stage.In); @@ -899,7 +900,7 @@ private void AddConsumer(Consumer consumer, int offset) /// which is offset modulo (bufferSize + 1). /// /// TBD - private void WakeupIndex(int index) + private void WakeupIndex(int index) => _consumerWheel[index].ForEach(c => c.Callback(Wakeup.Instance)); private void Complete() @@ -978,7 +979,7 @@ public Logic(HubSourceLogic stage, long id) : base(stage.Shape) { _stage = stage; _id = id; - _untilNextAdvanceSignal = stage._hub.DemandThreshold; + _untilNextAdvanceSignal = stage._hub._demandThreshold; SetHandler(stage.Out, this); } @@ -1012,7 +1013,7 @@ void OnHubReady(Result> result) var state = _stage._hubLogic.State.Value; if (state is Closed closed) { - if(closed.Failure != null) + if (closed.Failure != null) FailStage(closed.Failure); else CompleteStage(); @@ -1047,7 +1048,7 @@ public override void OnPull() { _hubCallback(new NeedWakeup(_id, _previousPublishedOffset, _offset)); _previousPublishedOffset = _offset; - _untilNextAdvanceSignal = _stage._hub.DemandThreshold; + _untilNextAdvanceSignal = _stage._hub._demandThreshold; } else if (element == Completed.Instance) CompleteStage(); @@ -1058,9 +1059,9 @@ public override void OnPull() _untilNextAdvanceSignal--; if (_untilNextAdvanceSignal == 0) { - _untilNextAdvanceSignal = _stage._hub.DemandThreshold; + _untilNextAdvanceSignal = _stage._hub._demandThreshold; var previousOffset = _previousPublishedOffset; - _previousPublishedOffset += _stage._hub.DemandThreshold; + _previousPublishedOffset += _stage._hub._demandThreshold; _hubCallback(new Advanced(_id, previousOffset)); } } @@ -1069,12 +1070,12 @@ public override void OnPull() public override void PostStop() => _hubCallback?.Invoke(new UnRegister(_id, _previousPublishedOffset, _offset)); - + private void OnCommand(IConsumerEvent e) { if (e is HubCompleted completed) { - if(completed.Failure != null) + if (completed.Failure != null) FailStage(completed.Failure); else CompleteStage(); @@ -1117,6 +1118,7 @@ protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) private readonly int _bufferSize; private readonly int _mask; private readonly int _wheelMask; + private readonly int _demandThreshold; /// /// TBD @@ -1137,17 +1139,14 @@ public BroadcastHub(int bufferSize) _bufferSize = bufferSize; _mask = _bufferSize - 1; - _wheelMask = bufferSize*2 - 1; - DemandThreshold = bufferSize / 2 + bufferSize % 2; + _wheelMask = bufferSize * 2 - 1; + + // Half of buffer size, rounded up + _demandThreshold = bufferSize / 2 + bufferSize % 2; Shape = new SinkShape(In); } - /// - /// Half of buffer size, rounded up - /// - private int DemandThreshold { get; } - private Inlet In { get; } = new Inlet("BroadcastHub.in"); /// @@ -1169,4 +1168,707 @@ public override ILogicAndMaterializedValue> CreateLogicAndMat return new LogicAndMaterializedValue>(logic, Source.FromGraph(source)); } } + + /// + /// A is a special streaming hub that is able to route streamed elements to a dynamic set of consumers. + /// It consists of two parts, a and a . The e elements from a producer to the + /// actually live consumers it has.The selection of consumer is done with a function. Each element can be routed to + /// only one consumer.Once the producer has been materialized, the it feeds into returns a + /// materialized value which is the corresponding . This can be materialized an arbitrary number + /// of times, where each of the new materializations will receive their elements from the original . + /// + public static class PartitionHub + { + private const int DefaultBufferSize = 256; + + /// + /// Creates a that receives elements from its upstream producer and routes them to a dynamic set + /// of consumers.After the returned by this method is materialized, it returns a + /// as materialized value. + /// This can be materialized an arbitrary number of times and each materialization will receive the + /// elements from the original . + /// + /// Every new materialization of the results in a new, independent hub, which materializes to its own + /// for consuming the of that materialization. + /// + /// If the original is failed, then the failure is immediately propagated to all of its materialized + /// s (possibly jumping over already buffered elements). If the original is completed, then + /// all corresponding s are completed.Both failure and normal completion is "remembered" and later + /// materializations of the will see the same (failure or completion) state. s that are + /// cancelled are simply removed from the dynamic set of consumers. + /// + /// This should be used when there is a need to keep mutable state in the partition function, + /// e.g. for implemening round-robin or sticky session kind of routing. If state is not needed the can + /// be more convenient to use. + /// + /// + /// Function that decides where to route an element.It is a factory of a function to + /// to be able to hold stateful variables that are unique for each materialization.The function + /// takes two parameters; the first is information about active consumers, including an array of consumer + /// identifiers and the second is the stream element.The function should return the selected consumer + /// identifier for the given element.The function will never be called when there are no active consumers, + /// i.e.there is always at least one element in the array of identifiers. + /// + /// + /// Elements are buffered until this number of consumers have been connected. + /// This is only used initially when the stage is starting up, i.e.it is not honored when consumers have been removed (canceled). + /// + /// Total number of elements that can be buffered. If this buffer is full, the producer is backpressured. + [ApiMayChange] + public static Sink> StatefulSink(Func> partitioner, + int startAfterNrOfConsumers, int bufferSize = DefaultBufferSize) + { + return Dsl.Sink.FromGraph(new PartitionHub(partitioner, startAfterNrOfConsumers, bufferSize)); + } + + /// + /// Creates a that receives elements from its upstream producer and routes them to a dynamic set + /// of consumers.After the returned by this method is materialized, it returns a + /// as materialized value. + /// This can be materialized an arbitrary number of times and each materialization will receive the + /// elements from the original . + /// + /// Every new materialization of the results in a new, independent hub, which materializes to its own + /// for consuming the of that materialization. + /// + /// If the original is failed, then the failure is immediately propagated to all of its materialized + /// s (possibly jumping over already buffered elements). If the original is completed, then + /// all corresponding s are completed.Both failure and normal completion is "remembered" and later + /// materializations of the will see the same (failure or completion) state. s that are + /// cancelled are simply removed from the dynamic set of consumers. + /// + /// This should be used when the routing function is stateless, e.g. based on a hashed value of the + /// elements. Otherwise the can be used to implement more advanced routing logic. + /// + /// + /// Function that decides where to route an element. The function takes two parameters; + /// the first is the number of active consumers and the second is the stream element. The function should + /// return the index of the selected consumer for the given element, i.e. int greater than or equal to 0 + /// and less than number of consumers. E.g. `(size, elem) => math.abs(elem.hashCode) % size`. + /// + /// + /// Elements are buffered until this number of consumers have been connected. + /// This is only used initially when the stage is starting up, i.e.it is not honored when consumers have been removed (canceled). + /// + /// Total number of elements that can be buffered. If this buffer is full, the producer is backpressured. + [ApiMayChange] + public static Sink> Sink(Func partitioner, + int startAfterNrOfConsumers, int bufferSize = DefaultBufferSize) + { + return StatefulSink(() => ((info, element) => info.ConsumerByIndex(partitioner(info.Size, element))), + startAfterNrOfConsumers, bufferSize); + } + + /// + /// DO NOT INHERIT + /// + [ApiMayChange] + public interface IConsumerInfo + { + /// + /// Sequence of all identifiers of current consumers. + /// + /// Use this method only if you need to enumerate consumer existing ids. + /// When selecting a specific consumerId by its index, prefer using the dedicated method instead, + /// which is optimised for this use case. + /// + ImmutableArray ConsumerIds { get; } + + /// + /// Obtain consumer identifier by index + /// + long ConsumerByIndex(int index); + + /// + /// Approximate number of buffered elements for a consumer. + /// Larger value than other consumers could be an indication of that the consumer is slow. + /// + /// Note that this is a moving target since the elements are consumed concurrently. + /// + int QueueSize(long consumerId); + + /// + /// Number of attached consumers. + /// + int Size { get; } + } + } + + /// + /// INTERNAL API + /// + internal class PartitionHub : GraphStageWithMaterializedValue, Source> + { + #region queue implementation + + private interface IPartitionQueue + { + void Init(long id); + int TotalSize { get; } + int Size(long id); + bool IsEmpty(long id); + bool NonEmpty(long id); + void Offer(long id, object element); + object Poll(long id); + void Remove(long id); + } + + private sealed class ConsumerQueue + { + public static ConsumerQueue Empty { get; } = new ConsumerQueue(ImmutableQueue.Empty, 0); + + private readonly ImmutableQueue _queue; + + public ConsumerQueue(ImmutableQueue queue, int size) + { + _queue = queue; + Size = size; + } + + public ConsumerQueue Enqueue(object element) => new ConsumerQueue(_queue.Enqueue(element), Size + 1); + + public bool IsEmpty => Size == 0; + + public object Head => _queue.First(); + + public ConsumerQueue Tail => new ConsumerQueue(_queue.Dequeue(), Size - 1); + + public int Size { get; } + } + + private sealed class PartitionQueue : IPartitionQueue + { + private readonly AtomicCounter _totalSize = new AtomicCounter(); + private readonly ConcurrentDictionary _queues = new ConcurrentDictionary(); + + public void Init(long id) => _queues.TryAdd(id, ConsumerQueue.Empty); + + public int TotalSize => _totalSize.Current; + + public int Size(long id) + { + if (_queues.TryGetValue(id, out var queue)) + return queue.Size; + + throw new ArgumentException($"Invalid stream identifier: {id}", nameof(id)); + } + + public bool IsEmpty(long id) + { + if (_queues.TryGetValue(id, out var queue)) + return queue.IsEmpty; + + throw new ArgumentException($"Invalid stream identifier: {id}", nameof(id)); + } + + public bool NonEmpty(long id) => !IsEmpty(id); + + public void Offer(long id, object element) + { + if (_queues.TryGetValue(id, out var queue)) + { + if (_queues.TryUpdate(id, queue.Enqueue(element), queue)) + _totalSize.IncrementAndGet(); + else + Offer(id, element); + } + else + throw new ArgumentException($"Invalid stream identifier: {id}", nameof(id)); + } + + public object Poll(long id) + { + var success = _queues.TryGetValue(id, out var queue); + if (!success || queue.IsEmpty) + return null; + + if (_queues.TryUpdate(id, queue.Tail, queue)) + { + _totalSize.Decrement(); + return queue.Head; + } + + return Poll(id); + } + + public void Remove(long id) + { + if (_queues.TryRemove(id, out var queue)) + _totalSize.AddAndGet(-queue.Size); + } + } + + #endregion + + #region internal classes + + private interface IConsumerEvent { } + + private sealed class Wakeup : IConsumerEvent + { + public static Wakeup Instance { get; } = new Wakeup(); + + private Wakeup() { } + } + + private sealed class Initialize : IConsumerEvent + { + public static Initialize Instance { get; } = new Initialize(); + + private Initialize() { } + } + + private sealed class HubCompleted : IConsumerEvent + { + public Exception Failure { get; } + + public HubCompleted(Exception failure) + { + Failure = failure; + } + } + + + private interface IHubEvent { } + + private sealed class RegistrationPending : IHubEvent + { + public static RegistrationPending Instance { get; } = new RegistrationPending(); + + private RegistrationPending() { } + } + + private sealed class UnRegister : IHubEvent + { + public long Id { get; } + + public UnRegister(long id) + { + Id = id; + } + } + + private sealed class NeedWakeup : IHubEvent + { + public Consumer Consumer { get; } + + public NeedWakeup(Consumer consumer) + { + Consumer = consumer; + } + + } + + private sealed class Consumer : IHubEvent + { + public long Id { get; } + public Action Callback { get; } + + public Consumer(long id, Action callback) + { + Id = id; + Callback = callback; + } + } + + private sealed class TryPull : IHubEvent + { + public static TryPull Instance { get; } = new TryPull(); + + private TryPull() { } + } + + private sealed class Completed + { + public static Completed Instance { get; } = new Completed(); + + private Completed() { } + } + + + private interface IHubState { } + + private sealed class Open : IHubState + { + public Task> CallbackTask { get; } + public ImmutableList Registrations { get; } + + public Open(Task> callbackTask, ImmutableList registrations) + { + CallbackTask = callbackTask; + Registrations = registrations; + } + } + + private sealed class Closed : IHubState + { + public Exception Failure { get; } + + public Closed(Exception failure) + { + Failure = failure; + } + } + + #endregion + + private sealed class PartitionSinkLogic : InGraphStageLogic + { + private sealed class ConsumerInfo : PartitionHub.IConsumerInfo + { + private readonly PartitionSinkLogic _partitionSinkLogic; + + public ConsumerInfo(PartitionSinkLogic partitionSinkLogic, ImmutableList consumers) + { + _partitionSinkLogic = partitionSinkLogic; + Consumers = consumers; + ConsumerIds = Consumers.Select(c => c.Id).ToImmutableArray(); + Size = consumers.Count; + } + + public ImmutableArray ConsumerIds { get; } + + public long ConsumerByIndex(int index) => Consumers[index].Id; + + public int QueueSize(long consumerId) => _partitionSinkLogic._queue.Size(consumerId); + + public int Size { get; } + + public ImmutableList Consumers { get; } + } + + private readonly PartitionHub _hub; + private readonly int _demandThreshold; + private readonly Func _materializedPartitioner; + private readonly TaskCompletionSource> _callbackCompletion = new TaskCompletionSource>(); + private readonly IHubState _noRegistrationsState; + private bool _initialized; + private readonly IPartitionQueue _queue = new PartitionQueue(); + private readonly List _pending = new List(); + private ConsumerInfo _consumerInfo; + private readonly Dictionary _needWakeup = new Dictionary(); + private long _callbackCount; + + public PartitionSinkLogic(PartitionHub hub) : base(hub.Shape) + { + _hub = hub; + // Half of buffer size, rounded up + _demandThreshold = hub._bufferSize / 2 + hub._bufferSize % 2; + _materializedPartitioner = hub._partitioner(); + _noRegistrationsState = new Open(_callbackCompletion.Task, ImmutableList.Empty); + _consumerInfo = new ConsumerInfo(this, ImmutableList.Empty); + + State = new AtomicReference(_noRegistrationsState); + + SetHandler(hub.In, this); + } + + public override void PreStart() + { + SetKeepGoing(true); + _callbackCompletion.SetResult(GetAsyncCallback(OnEvent)); + + if (_hub._startAfterNrOfConsumers == 0) + Pull(_hub.In); + } + + public override void OnPush() + { + Publish(Grab(_hub.In)); + if (!IsFull) Pull(_hub.In); + } + + private bool IsFull => _queue.TotalSize + _pending.Count >= _hub._bufferSize; + + public AtomicReference State { get; } + + private void Publish(T element) + { + if (!_initialized || _consumerInfo.Consumers.Count == 0) + { + // will be published when first consumers are registered + _pending.Add(element); + } + else + { + var id = _materializedPartitioner(_consumerInfo, element); + _queue.Offer(id, element); + Wakeup(id); + } + } + + private void Wakeup(long id) + { + if (_needWakeup.TryGetValue(id, out var consumer)) + { + _needWakeup.Remove(consumer.Id); + consumer.Callback(PartitionHub.Wakeup.Instance); + } + } + + public override void OnUpstreamFinish() + { + if (_consumerInfo.Consumers.Count == 0) + CompleteStage(); + else + { + foreach (var consumer in _consumerInfo.Consumers) + Complete(consumer.Id); + } + } + + private void Complete(long id) + { + _queue.Offer(id, Completed.Instance); + Wakeup(id); + } + + private void TryPull() + { + if (_initialized && !IsClosed(_hub.In) && !HasBeenPulled(_hub.In) && !IsFull) + Pull(_hub.In); + } + + private void OnEvent(IHubEvent e) + { + _callbackCount++; + + if (e is NeedWakeup n) + { + // Also check if the consumer is now unblocked since we published an element since it went asleep. + if (_queue.NonEmpty(n.Consumer.Id)) + n.Consumer.Callback(PartitionHub.Wakeup.Instance); + else + { + _needWakeup[n.Consumer.Id] = n.Consumer; + TryPull(); + } + } + else if (e is TryPull) + TryPull(); + else if (e is RegistrationPending) + { + var o = (Open)State.GetAndSet(_noRegistrationsState); + foreach (var consumer in o.Registrations) + { + var newConsumers = _consumerInfo.Consumers.Add(consumer).Sort((c1, c2) => c1.Id.CompareTo(c2.Id)); + _consumerInfo = new ConsumerInfo(this, newConsumers); + _queue.Init(consumer.Id); + if (newConsumers.Count >= _hub._startAfterNrOfConsumers) + _initialized = true; + + consumer.Callback(Initialize.Instance); + + if (_initialized && _pending.Count != 0) + { + foreach (var p in _pending) + Publish(p); + + _pending.Clear(); + } + + TryPull(); + } + } + else if (e is UnRegister u) + { + var newConsumers = _consumerInfo.Consumers.RemoveAll(c => c.Id == u.Id); + _consumerInfo = new ConsumerInfo(this, newConsumers); + _queue.Remove(u.Id); + if (newConsumers.IsEmpty) + { + if (IsClosed(_hub.In)) + CompleteStage(); + } + else + TryPull(); + } + } + + public override void OnUpstreamFailure(Exception e) + { + var failMessage = new HubCompleted(e); + + // Notify pending consumers and set tombstone + var o = (Open)State.GetAndSet(new Closed(e)); + foreach (var consumer in o.Registrations) + consumer.Callback(failMessage); + + // Notify registered consumers + foreach (var consumer in _consumerInfo.Consumers) + consumer.Callback(failMessage); + + FailStage(e); + } + + public override void PostStop() + { + // Notify pending consumers and set tombstone + + var s = State.Value; + if (s is Open o) + { + if (State.CompareAndSet(o, new Closed(null))) + { + var completeMessage = new HubCompleted(null); + foreach (var consumer in o.Registrations) + consumer.Callback(completeMessage); + } + else + PostStop(); + } + // Already closed, ignore + } + + // Consumer API + public object Poll(long id, Action hubCallback) + { + // try pull via async callback when half full + // this is racy with other threads doing poll but doesn't matter + if (_queue.TotalSize == _demandThreshold) + hubCallback(PartitionHub.TryPull.Instance); + + return _queue.Poll(id); + } + } + + private sealed class PartitionSource : GraphStage> + { + private sealed class Logic : OutGraphStageLogic + { + private readonly PartitionSource _source; + private readonly long _id; + private readonly Consumer _consumer; + private long _callbackCount; + private Action _hubCallback; + + public Logic(PartitionSource source) : base(source.Shape) + { + _source = source; + _id = source._counter.IncrementAndGet(); + var callback = GetAsyncCallback(OnCommand); + _consumer = new Consumer(_id, callback); + + SetHandler(source._out, this); + } + + public override void PreStart() + { + void OnHubReady(Task> t) + { + if (t.IsCanceled || t.IsFaulted) + FailStage(t.Exception); + else + { + _hubCallback = t.Result; + _hubCallback(RegistrationPending.Instance); + if (IsAvailable(_source._out)) + OnPull(); + } + } + + void Register() + { + var s = _source._logic.State.Value; + if (s is Closed c) + { + if (c.Failure != null) + FailStage(c.Failure); + else + CompleteStage(); + return; + } + + var o = (Open)s; + var newRegistrations = o.Registrations.Add(_consumer); + if (_source._logic.State.CompareAndSet(o, new Open(o.CallbackTask, newRegistrations))) + { + var callback = GetAsyncCallback>>(OnHubReady); + o.CallbackTask.ContinueWith(callback); + } + else Register(); + } + + Register(); + } + + public override void OnPull() + { + if (_hubCallback == null) return; + + var element = _source._logic.Poll(_id, _hubCallback); + if (element == null) + _hubCallback(new NeedWakeup(_consumer)); + else if (element is Completed) + CompleteStage(); + else + Push(_source._out, (T)element); + } + + public override void PostStop() => _hubCallback?.Invoke(new UnRegister(_id)); + + private void OnCommand(IConsumerEvent command) + { + _callbackCount++; + switch (command) + { + case HubCompleted c when c.Failure != null: + FailStage(c.Failure); + break; + case HubCompleted _: + CompleteStage(); + break; + case Wakeup _: + if (IsAvailable(_source._out)) + OnPull(); + break; + case Initialize _: + if (IsAvailable(_source._out) && _hubCallback != null) + OnPull(); + break; + } + } + } + + private readonly AtomicCounterLong _counter; + private readonly PartitionSinkLogic _logic; + private readonly Outlet _out = new Outlet("PartitionHub.out"); + + public PartitionSource(AtomicCounterLong counter, PartitionSinkLogic logic) + { + _counter = counter; + _logic = logic; + Shape = new SourceShape(_out); + } + + public override SourceShape Shape { get; } + + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this); + } + + private readonly Func> _partitioner; + private readonly int _startAfterNrOfConsumers; + private readonly int _bufferSize; + + public PartitionHub(Func> partitioner, int startAfterNrOfConsumers, int bufferSize) + { + _partitioner = partitioner; + _startAfterNrOfConsumers = startAfterNrOfConsumers; + _bufferSize = bufferSize; + Shape = new SinkShape(In); + } + + public Inlet In { get; } = new Inlet("PartitionHub.in"); + + public override SinkShape Shape { get; } + + public override ILogicAndMaterializedValue> CreateLogicAndMaterializedValue(Attributes inheritedAttributes) + { + var idCounter = new AtomicCounterLong(); + var logic = new PartitionSinkLogic(this); + var source = new PartitionSource(idCounter, logic); + + return new LogicAndMaterializedValue>(logic, Source.FromGraph(source)); + } + } }