Skip to content

Commit

Permalink
Ask interface should be clean (#3220)
Browse files Browse the repository at this point in the history
* Tests should be precise - in temrs of what to expect

* Ask interface refined #3220

* ClusterRouter unit test fix #3220

* Ask deadlock test added #3220

* Handle deadlock by removing the SynchronizationContext #3220

* Fixing ScatterGather router test #3220

* Ask interface refined #3220

AskSpecs consolidated
Api change approval - internal CastTask removed

* Fixing header #3220
  • Loading branch information
maxcherednik authored and Aaronontheweb committed Feb 1, 2018
1 parent c9ab046 commit fe9efe9
Show file tree
Hide file tree
Showing 21 changed files with 250 additions and 220 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -388,13 +388,13 @@ public static Task<T> AskEx<T>(this ICanTell self, Func<IActorRef, object> messa
return self.AskEx<T>(messageFactory, null, cancellationToken);
}

public static Task<T> AskEx<T>(this ICanTell self, Func<IActorRef, object> messageFactory, TimeSpan? timeout, CancellationToken cancellationToken)
public static async Task<T> AskEx<T>(this ICanTell self, Func<IActorRef, object> messageFactory, TimeSpan? timeout, CancellationToken cancellationToken)
{
IActorRefProvider provider = ResolveProvider(self);
if (provider == null)
throw new ArgumentException("Unable to resolve the target Provider", nameof(self));

return AskEx(self, messageFactory, provider, timeout, cancellationToken).CastTask<object, T>();
return (T)await AskEx(self, messageFactory, provider, timeout, cancellationToken);
}
internal static IActorRefProvider ResolveProvider(ICanTell self)
{
Expand All @@ -410,49 +410,60 @@ internal static IActorRefProvider ResolveProvider(ICanTell self)
return null;
}

private static Task<object> AskEx(ICanTell self, Func<IActorRef, object> messageFactory, IActorRefProvider provider, TimeSpan? timeout, CancellationToken cancellationToken)
private static async Task<object> AskEx(ICanTell self, Func<IActorRef, object> messageFactory, IActorRefProvider provider, TimeSpan? timeout, CancellationToken cancellationToken)
{
var result = new TaskCompletionSource<object>();

CancellationTokenSource timeoutCancellation = null;
timeout = timeout ?? provider.Settings.AskTimeout;
List<CancellationTokenRegistration> ctrList = new List<CancellationTokenRegistration>(2);
var ctrList = new List<CancellationTokenRegistration>(2);

if (timeout != System.Threading.Timeout.InfiniteTimeSpan && timeout.Value > default(TimeSpan))
if (timeout != Timeout.InfiniteTimeSpan && timeout.Value > default(TimeSpan))
{
timeoutCancellation = new CancellationTokenSource();
ctrList.Add(timeoutCancellation.Token.Register(() => result.TrySetCanceled()));

ctrList.Add(timeoutCancellation.Token.Register(() =>
{
result.TrySetException(new AskTimeoutException($"Timeout after {timeout} seconds"));
}));

timeoutCancellation.CancelAfter(timeout.Value);
}

if (cancellationToken.CanBeCanceled)
{
ctrList.Add(cancellationToken.Register(() => result.TrySetCanceled()));
}

//create a new tempcontainer path
ActorPath path = provider.TempPath();
//callback to unregister from tempcontainer
Action unregister =
() =>
{
// cancelling timeout (if any) in order to prevent memory leaks
// (a reference to 'result' variable in CancellationToken's callback)
if (timeoutCancellation != null)
{
timeoutCancellation.Cancel();
timeoutCancellation.Dispose();
}
for (var i = 0; i < ctrList.Count; i++)
{
ctrList[i].Dispose();
}
provider.UnregisterTempActor(path);
};

var future = new FutureActorRef(result, unregister, path);
var future = new FutureActorRef(result, () => { }, path);
//The future actor needs to be registered in the temp container
provider.RegisterTempActor(future, path);

self.Tell(messageFactory(future), future);
return result.Task;

try
{
return await result.Task;
}
finally
{
//callback to unregister from tempcontainer

provider.UnregisterTempActor(path);

for (var i = 0; i < ctrList.Count; i++)
{
ctrList[i].Dispose();
}

if (timeoutCancellation != null)
{
timeoutCancellation.Dispose();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4892,7 +4892,6 @@ namespace Akka.Util.Internal
[Akka.Annotations.InternalApiAttribute()]
public class static TaskExtensions
{
public static System.Threading.Tasks.Task<TResult> CastTask<TTask, TResult>(this System.Threading.Tasks.Task<TTask> task) { }
public static System.Threading.Tasks.Task WithCancellation(this System.Threading.Tasks.Task task, System.Threading.CancellationToken cancellationToken) { }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,7 @@ public async Task Should_Ask_Clustered_Group_Router_and_with_no_routees_and_time
var router = Sys.ActorOf(Props.Empty.WithRouter(FromConfig.Instance), "router3");
Assert.IsType<RoutedActorRef>(router);

try
{
var result = await router.Ask<string>("foo");
}
catch (Exception ex)
{
Assert.IsType<TaskCanceledException>(ex);
}
await Assert.ThrowsAsync<AskTimeoutException>(async () => await router.Ask<int>("foo"));
}
}
}
108 changes: 85 additions & 23 deletions src/core/Akka.Tests/Actor/AskSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,17 @@
using Akka.Actor;
using System;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx;

namespace Akka.Tests.Actor
{

public class AskSpec : AkkaSpec
{
public AskSpec()
: base(@"akka.actor.ask-timeout = 3000ms")
{ }

public class SomeActor : UntypedActor
{
protected override void OnReceive(object message)
Expand All @@ -24,6 +29,7 @@ protected override void OnReceive(object message)
{
Thread.Sleep(5000);
}

if (message.Equals("answer"))
{
Sender.Tell("answer");
Expand All @@ -39,9 +45,9 @@ public WaitActor(IActorRef replyActor, IActorRef testActor)
_testActor = testActor;
}

private IActorRef _replyActor;
private readonly IActorRef _replyActor;

private IActorRef _testActor;
private readonly IActorRef _testActor;

protected override void OnReceive(object message)
{
Expand All @@ -66,52 +72,108 @@ protected override void OnReceive(object message)
}

[Fact]
public void Can_Ask_actor()
public async Task Can_Ask_actor()
{
var actor = Sys.ActorOf<SomeActor>();
actor.Ask<string>("answer").Result.ShouldBe("answer");
var res = await actor.Ask<string>("answer");
res.ShouldBe("answer");
}

[Fact]
public void Can_Ask_actor_with_timeout()
public async Task Can_Ask_actor_with_timeout()
{
var actor = Sys.ActorOf<SomeActor>();
actor.Ask<string>("answer",TimeSpan.FromSeconds(10)).Result.ShouldBe("answer");
var res = await actor.Ask<string>("answer", TimeSpan.FromSeconds(10));
res.ShouldBe("answer");
}

[Fact]
public void Can_get_timeout_when_asking_actor()
public async Task Can_get_timeout_when_asking_actor()
{
var actor = Sys.ActorOf<SomeActor>();
Assert.Throws<AggregateException>(() => { actor.Ask<string>("timeout", TimeSpan.FromSeconds(3)).Wait(); });
await Assert.ThrowsAsync<AskTimeoutException>(async () => await actor.Ask<string>("timeout", TimeSpan.FromSeconds(3)));
}

[Fact]
public void Can_cancel_when_asking_actor()
{
public async Task Can_cancel_when_asking_actor()
{
var actor = Sys.ActorOf<SomeActor>();
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
Assert.Throws<AggregateException>(() => { actor.Ask<string>("timeout", Timeout.InfiniteTimeSpan, cts.Token).Wait(); });
Assert.True(cts.IsCancellationRequested);
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3)))
{
await Assert.ThrowsAsync<TaskCanceledException>(async () => await actor.Ask<string>("timeout", Timeout.InfiniteTimeSpan, cts.Token));
}
}

[Fact]
public void Cancelled_ask_with_null_timeout_should_remove_temp_actor()
public async Task Ask_should_honor_config_specified_timeout()
{
var actor = Sys.ActorOf<SomeActor>();
var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
Assert.Throws<AggregateException>(() => { actor.Ask<string>("cancel", cts.Token).Wait(); });
Assert.True(cts.IsCancellationRequested);
try
{
await actor.Ask<string>("timeout");
Assert.True(false, "the ask should have timed out with default timeout");
}
catch (AskTimeoutException e)
{
Assert.Equal("Timeout after 00:00:03 seconds", e.Message);
}
}

[Fact]
public async Task Cancelled_ask_with_null_timeout_should_remove_temp_actor()
{
var actor = Sys.ActorOf<SomeActor>();

using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100)))
{
await Assert.ThrowsAsync<TaskCanceledException>(async () => await actor.Ask<string>("cancel", cts.Token));
}

Are_Temp_Actors_Removed(actor);
}

[Fact]
public void Cancelled_ask_with_timeout_should_remove_temp_actor()
public async Task Cancelled_ask_with_timeout_should_remove_temp_actor()
{
var actor = Sys.ActorOf<SomeActor>();
var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
Assert.Throws<AggregateException>(() => { actor.Ask<string>("cancel", TimeSpan.FromSeconds(30), cts.Token).Wait(); });
Assert.True(cts.IsCancellationRequested);
using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100)))
{
await Assert.ThrowsAsync<TaskCanceledException>(async () => await actor.Ask<string>("cancel", TimeSpan.FromSeconds(30), cts.Token));
}

Are_Temp_Actors_Removed(actor);
}

[Fact]
public async Task AskTimeout_with_default_timeout_should_remove_temp_actor()
{
var actor = Sys.ActorOf<SomeActor>();

await Assert.ThrowsAsync<AskTimeoutException>(async () => await actor.Ask<string>("timeout"));

Are_Temp_Actors_Removed(actor);
}

[Fact]
public async Task ShouldFailWhenAskExpectsWrongType()
{
var actor = Sys.ActorOf<SomeActor>();

// expect int, but in fact string
await Assert.ThrowsAsync<InvalidCastException>(async () => await actor.Ask<int>("answer"));
}

[Fact]
public void AskDoesNotDeadlockWhenWaitForResultInGuiApplication()
{
AsyncContext.Run(() =>
{
var actor = Sys.ActorOf<SomeActor>();
var res = actor.Ask<string>("answer").Result; // blocking on purpose
res.ShouldBe("answer");
});
}

private void Are_Temp_Actors_Removed(IActorRef actor)
{
var actorCell = actor as ActorRefWithCell;
Expand All @@ -126,7 +188,7 @@ private void Are_Temp_Actors_Removed(IActorRef actor)
container.ForEachChild(x => childCounter++);
Assert.True(childCounter == 0, "Temp actors not all removed.");
});

}

/// <summary>
Expand Down
82 changes: 0 additions & 82 deletions src/core/Akka.Tests/Actor/AskTimeoutSpec.cs

This file was deleted.

Loading

0 comments on commit fe9efe9

Please sign in to comment.