Implement PartitionHub #3287

Merged: 7 commits, Jan 23, 2018
53 changes: 52 additions & 1 deletion docs/articles/streams/stream-dynamic.md
@@ -103,4 +103,55 @@ We now wrap the `Sink` and `Source` in a Flow using `Flow.FromSinkAndSource`. Th

The resulting `Flow` now has a type of `Flow<string, string, UniqueKillSwitch>` representing a publish-subscribe channel which can be used any number of times to attach new producers or consumers. In addition, it materializes to a `UniqueKillSwitch` (see [UniqueKillSwitch](xref:streams-dynamic-handling#uniquekillswitch)) that can be used to deregister a single user externally:

[!code-csharp[HubsDocTests.cs](../../examples/DocsExamples/Streams/HubsDocTests.cs?name=pub-sub-4)]


### Using the PartitionHub

**This is a [may change](../utilities/may-change.md) feature**

A `PartitionHub` can be used to route elements from a common producer to a dynamic set of consumers.
The consumer for each element is selected by a function; each element is routed to exactly one consumer.

The rate of the producer will be automatically adapted to the slowest consumer. In this case, the hub is a `Sink`
to which the single producer must be attached first. Consumers can only be attached once the `Sink` has
been materialized (i.e. the producer has been started). One example of using the `PartitionHub`:

[!code-csharp[HubsDocTests.cs](../../examples/DocsExamples/Streams/HubsDocTests.cs?name=partition-hub)]

The `partitioner` function takes two parameters: the first is the number of active consumers and the second
is the stream element. The function should return the index of the selected consumer for the given element,
i.e. an `int` greater than or equal to 0 and less than the number of consumers.

The resulting `Source` can be materialized any number of times, each materialization effectively attaching
a new consumer. If there are no consumers attached to this hub, it will not drop any elements but instead
backpressure the upstream producer until consumers arrive. This behavior can be tweaked with combinators,
for example by applying `.Buffer` with a drop strategy, or by attaching a consumer that drops all messages. If there
are no other consumers, this ensures that the producer is kept drained (dropping all elements), and once a new
consumer arrives and messages are routed to it, the dropping consumer adaptively slows down, ensuring no more messages
are dropped.
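As a sketch of the tweaks described above, assuming `fromProducer` is the `Source` obtained by materializing the hub and `Materializer` is in scope (both as in the examples below):

```csharp
using System.Threading.Tasks;
using Akka;
using Akka.Streams;
using Akka.Streams.Dsl;

// Sketch: soften backpressure from a consumer-less hub by buffering up to
// 1000 elements and dropping the oldest element when the buffer is full.
Source<string, NotUsed> relaxed =
    fromProducer.Buffer(1000, OverflowStrategy.DropHead);

// Alternatively, attach a consumer that discards every element, so the
// producer is kept drained until real consumers show up.
Task drained = fromProducer.RunWith(Sink.Ignore<string>(), Materializer);
```

`OverflowStrategy.DropHead` drops the oldest buffered element; `DropTail`, `DropBuffer` and `DropNew` are other drop strategies with the same effect of keeping the producer running.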

It is possible to define how many initial consumers are required before the hub starts emitting any messages
to the attached consumers. Until enough consumers have been attached, messages are buffered, and when the
buffer is full the upstream producer is backpressured. No messages are dropped.

The above example illustrates a stateless partition function. For more advanced stateful routing, `PartitionHub.StatefulSink` can be used. Here is an example of a stateful round-robin function:

[!code-csharp[HubsDocTests.cs](../../examples/DocsExamples/Streams/HubsDocTests.cs?name=partition-hub-stateful)]

Note that it is a factory of a function, so that it can hold stateful variables that are
unique for each materialization.


The function takes two parameters: the first is information about active consumers, including an array of
consumer identifiers, and the second is the stream element. The function should return the selected consumer
identifier for the given element. The function will never be called when there are no active consumers, i.e.
there is always at least one element in the array of identifiers.

Another interesting type of routing is to prefer routing to the fastest consumers. The `IConsumerInfo`
has an accessor `QueueSize` that returns the approximate number of buffered elements for a consumer.
A larger value than for other consumers can indicate that the consumer is slow.
Note that this is a moving target, since the elements are consumed concurrently. Here is an example of
a hub that routes to the consumer with the fewest buffered elements:

[!code-csharp[HubsDocTests.cs](../../examples/DocsExamples/Streams/HubsDocTests.cs?name=partition-hub-fastest)]
28 changes: 28 additions & 0 deletions docs/articles/utilities/may-change.md
@@ -0,0 +1,28 @@
# Modules marked "May Change"

To be able to introduce new modules and APIs without freezing them the moment they
are released, we have introduced the term **may change**.

Concretely **may change** means that an API or module is in early access mode and that it:

* is not guaranteed to be binary compatible in minor releases
* may have its API change in breaking ways in minor releases
* may be entirely dropped from Akka in a minor release

Complete modules can be marked as **may change**; this will be noted in their module description and in the docs.

Individual public APIs can be annotated with `Akka.Annotations.ApiMayChange` to signal that they come with fewer
guarantees than the rest of the module they live in.
Please use such methods and classes with care; however, if you see such APIs, that is the best point in time to try them
out and provide feedback (e.g. via the akka-user mailing list, GitHub issues or Gitter) before they are frozen as
fully stable API.
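As a minimal sketch of what such an annotation looks like on a public API (the class and method names here are hypothetical, not part of Akka.NET):

```csharp
using Akka.Annotations;

public static class ExperimentalOperators
{
    // Marked "may change": this API may break or disappear in a minor release.
    [ApiMayChange]
    public static int ExperimentalPartition(int consumerCount, int element)
    {
        return element % consumerCount;
    }
}
```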

Best effort migration guides may be provided, but this is decided on a case-by-case basis for **may change** modules.

The purpose of this is to be able to release features early,
make them easily available, and improve them based on feedback, or even discover
that the module or API wasn't useful.

These are the current complete modules marked as **may change**:

101 changes: 101 additions & 0 deletions docs/examples/DocsExamples/Streams/HubsDocTests.cs
@@ -1,5 +1,6 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka;
using Akka.Streams;
@@ -113,5 +114,105 @@ public void Hubs_must_demonstrate_combination()
killSwitch.Shutdown();
#endregion
}

[Fact]
public void Hubs_must_demonstrate_creating_a_dynamic_partition_hub()
{
#region partition-hub

// A simple producer that publishes a new "message-" every second
Source<string, NotUsed> producer = Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), "message")
.MapMaterializedValue(_ => NotUsed.Instance)
.ZipWith(Source.From(Enumerable.Range(1, 100)), (msg, i) => $"{msg}-{i}");

// Attach a PartitionHub Sink to the producer. This will materialize to a
// corresponding Source.
// (We need to use ToMaterialized and Keep.Right since by default the
// materialized value to the left is used)
IRunnableGraph<Source<string, NotUsed>> runnableGraph =
producer.ToMaterialized(PartitionHub.Sink<string>(
(size, element) => Math.Abs(element.GetHashCode()) % size,
startAfterNrOfConsumers: 2, bufferSize: 256), Keep.Right);

// By running/materializing the producer, we get back a Source, which
// gives us access to the elements published by the producer.
Source<string, NotUsed> fromProducer = runnableGraph.Run(Materializer);

// Print out messages from the producer in two independent consumers
fromProducer.RunForeach(msg => Console.WriteLine("Consumer1: " + msg), Materializer);
fromProducer.RunForeach(msg => Console.WriteLine("Consumer2: " + msg), Materializer);

#endregion
}

[Fact]
public void Hubs_must_demonstrate_creating_a_dynamic_stateful_partition_hub()
{
#region partition-hub-stateful

// A simple producer that publishes a new "message-" every second
Source<string, NotUsed> producer = Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), "message")
.MapMaterializedValue(_ => NotUsed.Instance)
.ZipWith(Source.From(Enumerable.Range(1, 100)), (msg, i) => $"{msg}-{i}");

// A new instance of the partitioner function and its state is created
// for each materialization of the PartitionHub.
Func<PartitionHub.IConsumerInfo, string, long> RoundRobin()
{
var i = -1L;
return (info, element) =>
{
i++;
return info.ConsumerByIndex((int) (i % info.Size));
};
}

// Attach a PartitionHub Sink to the producer. This will materialize to a
// corresponding Source.
// (We need to use ToMaterialized and Keep.Right since by default the
// materialized value to the left is used)
IRunnableGraph<Source<string, NotUsed>> runnableGraph =
producer.ToMaterialized(PartitionHub.StatefulSink(RoundRobin,
startAfterNrOfConsumers: 2, bufferSize: 256), Keep.Right);

// By running/materializing the producer, we get back a Source, which
// gives us access to the elements published by the producer.
Source<string, NotUsed> fromProducer = runnableGraph.Run(Materializer);

// Print out messages from the producer in two independent consumers
fromProducer.RunForeach(msg => Console.WriteLine("Consumer1: " + msg), Materializer);
fromProducer.RunForeach(msg => Console.WriteLine("Consumer2: " + msg), Materializer);

#endregion
}

[Fact]
public void Hubs_must_demonstrate_creating_a_dynamic_partition_hub_routing_to_fastest_consumer()
{
#region partition-hub-fastest

// A simple producer that publishes the numbers 0-99
Source<int, NotUsed> producer = Source.From(Enumerable.Range(0, 100));

// Attach a PartitionHub Sink to the producer. This will materialize to a
// corresponding Source.
// (We need to use ToMaterialized and Keep.Right since by default the
// materialized value to the left is used)
IRunnableGraph<Source<int, NotUsed>> runnableGraph =
producer.ToMaterialized(PartitionHub.StatefulSink<int>(
// Route each element to the consumer with the fewest buffered elements.
// Note: Min(info.QueueSize) would return the smallest queue size itself,
// not the id of the consumer that has it.
() => ((info, element) => info.ConsumerIds.OrderBy(id => info.QueueSize(id)).First()),
startAfterNrOfConsumers: 2, bufferSize: 256), Keep.Right);

// By running/materializing the producer, we get back a Source, which
// gives us access to the elements published by the producer.
Source<int, NotUsed> fromProducer = runnableGraph.Run(Materializer);

// Print out messages from the producer in two independent consumers
fromProducer.RunForeach(msg => Console.WriteLine("Consumer1: " + msg), Materializer);
fromProducer.Throttle(10, TimeSpan.FromMilliseconds(100), 10, ThrottleMode.Shaping)
.RunForeach(msg => Console.WriteLine("Consumer2: " + msg), Materializer);

#endregion
}
}
}
15 changes: 15 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt
@@ -1428,6 +1428,21 @@ namespace Akka.Streams.Dsl
public Akka.Streams.Outlet<T> Out(int id) { }
public override string ToString() { }
}
public class static PartitionHub
{
[Akka.Annotations.ApiMayChangeAttribute()]
public static Akka.Streams.Dsl.Sink<T, Akka.Streams.Dsl.Source<T, Akka.NotUsed>> Sink<T>(System.Func<int, T, int> partitioner, int startAfterNrOfConsumers, int bufferSize = 256) { }
[Akka.Annotations.ApiMayChangeAttribute()]
public static Akka.Streams.Dsl.Sink<T, Akka.Streams.Dsl.Source<T, Akka.NotUsed>> StatefulSink<T>(System.Func<System.Func<Akka.Streams.Dsl.PartitionHub.IConsumerInfo, T, long>> partitioner, int startAfterNrOfConsumers, int bufferSize = 256) { }
[Akka.Annotations.ApiMayChangeAttribute()]
public interface IConsumerInfo
{
System.Collections.Immutable.ImmutableArray<long> ConsumerIds { get; }
int Size { get; }
long ConsumerByIndex(int index);
int QueueSize(long consumerId);
}
}
public sealed class PartitionOutOfBoundsException : System.Exception
{
public PartitionOutOfBoundsException(string message) { }