Skip to content

Commit

Permalink
Merge pull request #857 from syncromatics/853-fix-racy-tests
Browse files Browse the repository at this point in the history
853 Fixing racy tests
  • Loading branch information
Aaronontheweb committed Apr 16, 2015
2 parents c283a88 + 6c77299 commit f488215
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 51 deletions.
2 changes: 1 addition & 1 deletion src/core/Akka.Tests.Shared.Internals/AkkaSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ protected void Intercept(Action actionThatThrows)
{
try
{
actionThatThrows();
actionThatThrows();
}
catch(Exception)
{
Expand Down
9 changes: 5 additions & 4 deletions src/core/Akka.Tests/Actor/InboxSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ namespace Akka.Tests.Actor
{
public class InboxSpec : AkkaSpec
{
private Inbox _inbox;
private readonly Inbox _inbox;

public InboxSpec()
: base("akka.actor.inbox.inbox-size=1000") //Default is 1000 but just to make sure these tests don't fail we set it
{
Expand Down Expand Up @@ -105,7 +106,7 @@ public void Inbox_have_maximum_queue_size()
o.ShouldBe(0);
}

//The inbox should be empty now, so receiving should result in a timeout
//The inbox should be empty now, so receiving should result in a timeout
Intercept<TimeoutException>(() =>
{
var received = _inbox.Receive(TimeSpan.FromSeconds(1));
Expand All @@ -118,15 +119,15 @@ public void Inbox_have_maximum_queue_size()
}
}


[Fact]
public void Inbox_have_a_default_and_custom_timeouts()
{
Within(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(6), () =>
Within(TimeSpan.FromSeconds(4), TimeSpan.FromSeconds(6), () =>
{
Intercept<TimeoutException>(() => _inbox.Receive());
return true;
});

Within(TimeSpan.FromSeconds(1), () =>
{
Intercept<TimeoutException>(() => _inbox.Receive(TimeSpan.FromMilliseconds(100)));
Expand Down
36 changes: 21 additions & 15 deletions src/core/Akka.Tests/Routing/ResizerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -159,34 +159,37 @@ public void DefaultResizer_must_grow_as_needed_under_pressure()

(RouteeSize(router)).ShouldBe(resizer.LowerBound);

Action<int, TimeSpan> loop = (loops, span) =>
Action<int, TimeSpan, int?> loop = (loops, span, expectedBound) =>
{
for (var i = 0; i < loops; i++)
{
router.Tell(span, TestActor);
if (expectedBound.HasValue && RouteeSize(router) >= expectedBound.Value)
{
return;
}

//sending too quickly will result in skipped resize due to many resizeInProgress conflicts
Thread.Sleep(TimeSpan.FromMilliseconds(20));
}
Within(
TimeSpan.FromMilliseconds((span.TotalMilliseconds * loops) / resizer.LowerBound) + TimeSpan.FromSeconds(2),
() =>
{
for (var i = 0; i < loops; i++) ExpectMsg("done");
return true;
});
};


var max = TimeSpan.FromMilliseconds((span.TotalMilliseconds*loops)/resizer.LowerBound) +
TimeSpan.FromSeconds(2);

Within(max, () =>
{
for (var i = 0; i < loops; i++) ExpectMsg("done");
return true;
});
};

// 2 more should go through without triggering more
loop(2, TimeSpan.FromMilliseconds(200));
loop(2, TimeSpan.FromMilliseconds(200), null);
RouteeSize(router).ShouldBe(resizer.LowerBound);


// a whole bunch should max it out
loop(50, TimeSpan.FromMilliseconds(500));
loop(100, TimeSpan.FromMilliseconds(500), resizer.UpperBound);
RouteeSize(router).ShouldBe(resizer.UpperBound);

}

class BackoffActor : UntypedActor
Expand All @@ -213,9 +216,12 @@ public void DefaultResizer_must_backoff()
var router = Sys.ActorOf(Props.Create<BackoffActor>().WithRouter(new RoundRobinPool(0, resizer)));

// put some pressure on the router
for (var i = 0; i < 25; i++)
for (var i = 0; i < 50; i++)
{
router.Tell(150);
if (RouteeSize(router) > 2)
break;

Thread.Sleep(20);
}

Expand Down
61 changes: 35 additions & 26 deletions src/core/Akka.Tests/Routing/TailChoppingSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,15 @@ namespace Akka.Tests.Routing
{
public class TailChoppingSpec : AkkaSpec
{
private TestActor testActor;

private ActorSystem actorSystem;

class TailChopTestActor : UntypedActor
{
private int timesResponded;
private int _timesResponded;

private int sleepTime;
private readonly int _sleepTime;

public TailChopTestActor(int sleepTime)
{
this.sleepTime = sleepTime;
_sleepTime = sleepTime;
}

protected override void OnReceive(object message)
Expand All @@ -43,36 +39,39 @@ protected override void OnReceive(object message)
Context.Stop(Self);
break;
case "times":
Sender.Tell(timesResponded);
Sender.Tell(_timesResponded);
break;
default:
Thread.Sleep(sleepTime);
Thread.Sleep(_sleepTime);
Sender.Tell("ack");
timesResponded++;
_timesResponded++;
break;
}
}
}

public class BroadcastTarget : UntypedActor
{
private AtomicCounter _counter;
private TestLatch _latch;
private readonly AtomicCounter _counter;
private readonly TestLatch _latch;

public BroadcastTarget(TestLatch latch, AtomicCounter counter)
{
_latch = latch;
_counter = counter;
}

protected override void OnReceive(object message)
{
if (message is string)
var messageString = message as string;
if (messageString != null)
{
var s = (string)message;
if (s == "end")
if (messageString == "end")
{
_latch.CountDown();
}
}

if (message is int)
{
var i = (int)message;
Expand All @@ -93,17 +92,17 @@ public Func<Func<IActorRef, int>, bool> OneOfShouldEqual(int what, IEnumerable<I
{
return func =>
{
var results = actors.Select(x => func(x));
return (results.Any(x => x == what));
var results = actors.Select(func);
return results.Any(x => x == what);
};
}

public Func<Func<IActorRef, int>, bool> AllShouldEqual(int what, IEnumerable<IActorRef> actors)
{
return func =>
{
var results = actors.Select(x => func(x));
return (results.All(x => x == what));
var results = actors.Select(func);
return results.All(x => x == what);
};
}

Expand All @@ -118,7 +117,7 @@ public void Tail_chopping_router_must_deliver_a_broadcast_message_using_tell()
var actor2 = Sys.ActorOf(Props.Create(() => new BroadcastTarget(doneLatch, counter2)), "Actor2");

var routedActor = Sys.ActorOf(Props.Create<TestActor>()
.WithRouter(new TailChoppingGroup(new string[] { actor1.Path.ToString(), actor2.Path.ToString() }, TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(100))
.WithRouter(new TailChoppingGroup(new[] { actor1.Path.ToString(), actor2.Path.ToString() }, TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(100))
));

routedActor.Tell(new Broadcast(1));
Expand All @@ -138,7 +137,7 @@ public void Tail_chopping_router_must_return_response_from_second_actor_after_in

var probe = CreateTestProbe();
var routedActor = Sys.ActorOf(Props.Create<TestActor>()
.WithRouter(new TailChoppingGroup(new string[] { actor1.Path.ToString(), actor2.Path.ToString() }, TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(50))
.WithRouter(new TailChoppingGroup(new[] { actor1.Path.ToString(), actor2.Path.ToString() }, TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(50))
));

probe.Send(routedActor, "");
Expand All @@ -153,19 +152,29 @@ public void Tail_chopping_router_must_return_response_from_second_actor_after_in
[Fact]
public void Tail_chopping_router_must_throw_exception_if_no_result_will_arrive_within_the_given_time()
{
var actor1 = Sys.ActorOf(Props.Create(() => new TailChopTestActor(500)), "Actor5");
var actor2 = Sys.ActorOf(Props.Create(() => new TailChopTestActor(500)), "Actor6");
var actor1 = Sys.ActorOf(Props.Create(() => new TailChopTestActor(700)), "Actor5");
var actor2 = Sys.ActorOf(Props.Create(() => new TailChopTestActor(700)), "Actor6");

var probe = CreateTestProbe();

var routedActor = Sys.ActorOf(Props.Create<TestActor>()
.WithRouter(new TailChoppingGroup(new string[] { actor1.Path.ToString(), actor2.Path.ToString() }, TimeSpan.FromMilliseconds(300), TimeSpan.FromMilliseconds(50))
));
.WithRouter(new TailChoppingGroup(
new[]
{
actor1.Path.ToString(),
actor2.Path.ToString()
},
TimeSpan.FromMilliseconds(300),
TimeSpan.FromMilliseconds(50))
));

probe.Send(routedActor, "");
probe.ExpectMsg<Status.Failure>();

Thread.Sleep(700);

var actorList = new List<IActorRef> { actor1, actor2 };
Assert.True(AllShouldEqual(1, actorList)((x => (int)x.Ask("times").Result)));
Assert.True(AllShouldEqual(1, actorList)(x => (int) x.Ask("times").Result));

routedActor.Tell(new Broadcast("stop"));
}
Expand Down
15 changes: 10 additions & 5 deletions src/core/Akka/Actor/Inbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -349,13 +349,18 @@ private object AwaitResult(Task<object> task, TimeSpan timeout)
{
if (task.Wait(timeout))
{
var received = task.Result as Status.Failure;
if (received != null && received.Cause is TimeoutException)
{
var reason = string.Format("Inbox {0} received a status failure response message: {1}", Receiver.Path, received.Cause.Message);
throw new TimeoutException(reason, received.Cause);
}

return task.Result;
}
else
{
var fmt = string.Format("Inbox {0} didn't received a response message in specified timeout {1}", Receiver.Path, timeout);
throw new TimeoutException(fmt);
}

var fmt = string.Format("Inbox {0} didn't received a response message in specified timeout {1}", Receiver.Path, timeout);
throw new TimeoutException(fmt);
}
}
}
Expand Down

0 comments on commit f488215

Please sign in to comment.