Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed Reentrancy behavior #917

Merged
merged 1 commit into from
Apr 28, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 7 additions & 31 deletions src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Dispatch;
using Akka.Event;
using Akka.TestKit;
using Xunit;

Expand All @@ -29,8 +27,6 @@ public AsyncActor()
}
});
}

ILoggingAdapter Log = Context.GetLogger();
}

public class SuspendActor : ReceiveActor
Expand All @@ -42,7 +38,7 @@ public SuspendActor()
{
state = 1;
});
Receive<string>(AsyncBehavior.Suspend, async _ =>
Receive<string>(async _ =>
{
Self.Tell("change");
await Task.Delay(TimeSpan.FromSeconds(1));
Expand All @@ -51,21 +47,7 @@ public SuspendActor()
});
}
}
public class ReentrantActor : ReceiveActor
{
public ReentrantActor()
{
var state = 0;
Receive<string>(s => s == "change", _ => state = 1);
Receive<string>(AsyncBehavior.Reentrant, async _ =>
{
Self.Tell("change");
await Task.Delay(TimeSpan.FromSeconds(1));
//we expect that state should have changed due to an incoming message
Sender.Tell(state);
});
}
}

public class AsyncAwaitActor : ReceiveActor
{
public AsyncAwaitActor()
Expand All @@ -89,7 +71,7 @@ protected override void OnReceive(object message)
{
if (message is string)
{
RunTask(AsyncBehavior.Suspend, async () =>
RunTask(async () =>
{
var sender = Sender;
var self = Self;
Expand Down Expand Up @@ -132,7 +114,7 @@ protected override void OnReceive(object message)
{
if (message is string)
{
RunTask(AsyncBehavior.Suspend, async () =>
RunTask(async () =>
{
var sender = Sender;
var self = Self;
Expand Down Expand Up @@ -210,7 +192,7 @@ public AsyncTplActor()
Receive<string>(m =>
{
//this is also safe, all tasks complete in the actor context
RunTask(AsyncBehavior.Suspend, () =>
RunTask(() =>
{
Task.Delay(TimeSpan.FromSeconds(1))
.ContinueWith(t => { Sender.Tell("done"); });
Expand All @@ -228,7 +210,7 @@ public AsyncTplExceptionActor(IActorRef callback)
_callback = callback;
Receive<string>(m =>
{
RunTask(AsyncBehavior.Suspend, () =>
RunTask(() =>
{
Task.Delay(TimeSpan.FromSeconds(1))
.ContinueWith(t => { throw new Exception("foo"); });
Expand Down Expand Up @@ -320,13 +302,7 @@ public void Actors_should_be_able_to_supervise_exception_ContinueWith()
asker.Tell("start");
ExpectMsg("done", TimeSpan.FromSeconds(5));
}
[Fact]
public async Task Actors_should_be_able_to_reenter()
{
var asker = Sys.ActorOf(Props.Create(() => new ReentrantActor()));
var res = await asker.Ask<int>("start",TimeSpan.FromSeconds(5));
res.ShouldBe(1);
}


[Fact]
public async Task Actors_should_be_able_to_suspend_reentrancy()
Expand Down
22 changes: 0 additions & 22 deletions src/core/Akka/Actor/ActorCell.DefaultMessages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,6 @@ public void SystemInvoke(Envelope envelope)
else if (m is Recreate) FaultRecreate((m as Recreate).Cause);
else if (m is Suspend) FaultSuspend();
else if (m is Resume) FaultResume((m as Resume).CausedByFailure);
else if (m is SuspendReentrancy) HandleSuspendReentrancy();
else if (m is ResumeReentrancy) HandleResumeReentrancy();
else if (m is Terminate) Terminate();
else if (m is Supervise)
{
Expand All @@ -193,16 +191,6 @@ public void SystemInvoke(Envelope envelope)
}
}

private void HandleSuspendReentrancy()
{
Mailbox.Suspend(MailboxSuspendStatus.AwaitingTask);
}

private void HandleResumeReentrancy()
{
Mailbox.Resume(MailboxSuspendStatus.AwaitingTask);
}

private void HandleCompleteTask(CompleteTask task)
{
CurrentMessage = task.State.Message;
Expand Down Expand Up @@ -357,16 +345,6 @@ public void Suspend()
SendSystemMessage(Dispatch.SysMsg.Suspend.Instance);
}

public void SuspendReentrancy()
{
SendSystemMessage(Dispatch.SysMsg.SuspendReentrancy.Instance);
}

public void ResumeReentrancy()
{
SendSystemMessage(Dispatch.SysMsg.ResumeReentrancy.Instance);
}

private void SendSystemMessage(ISystemMessage systemMessage)
{
try
Expand Down
12 changes: 1 addition & 11 deletions src/core/Akka/Actor/ReceiveActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,7 @@ protected void Receive<T>(Func<T,Task> handler)
_matchHandlerBuilders.Peek().Match<T>( m =>
{
Func<Task> wrap = () => handler(m);
ActorTaskScheduler.RunTask(AsyncBehavior.Suspend, wrap);
});
}

protected void Receive<T>(AsyncBehavior behavior, Func<T, Task> handler)
{
EnsureMayConfigureMessageHandlers();
_matchHandlerBuilders.Peek().Match<T>(m =>
{
Func<Task> wrap = () => handler(m);
ActorTaskScheduler.RunTask(behavior, wrap);
ActorTaskScheduler.RunTask(wrap);
});
}

Expand Down
8 changes: 4 additions & 4 deletions src/core/Akka/Actor/UntypedActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ protected sealed override bool Receive(object message)
return true;
}

protected void RunTask(AsyncBehavior behavior, Action action)
protected void RunTask(Action action)
{
ActorTaskScheduler.RunTask(behavior,action);
ActorTaskScheduler.RunTask(action);
}

protected void RunTask(AsyncBehavior behavior, Func<Task> action)
protected void RunTask(Func<Task> action)
{
ActorTaskScheduler.RunTask(behavior,action);
ActorTaskScheduler.RunTask(action);
}

/// <summary>
Expand Down
31 changes: 10 additions & 21 deletions src/core/Akka/Dispatch/ActorTaskScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,6 @@

namespace Akka.Dispatch
{
public enum AsyncBehavior
{
Reentrant,
Suspend
}

public class AmbientState
{
public IActorRef Self { get; set; }
Expand Down Expand Up @@ -85,8 +79,8 @@ protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQu
//Is the current cell and the current state the same?
if (cell != null &&
s != null &&
cell.Self == s.Self &&
cell.Sender == s.Sender &&
Equals(cell.Self, s.Self) &&
Equals(cell.Sender, s.Sender) &&
cell.CurrentMessage == s.Message)
{
var res = TryExecuteTask(task);
Expand All @@ -96,24 +90,22 @@ protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQu
return false;
}

public static void RunTask(AsyncBehavior behavior, Action action)
public static void RunTask(Action action)
{
RunTask(behavior, () =>
RunTask(() =>
{
action();
return Task.FromResult(0);
});
}

public static void RunTask(AsyncBehavior behavior, Func<Task> action)
public static void RunTask(Func<Task> action)
{
var context = ActorCell.Current;
var mailbox = context.Mailbox;

//if reentrancy is not allowed, suspend user message processing
if (behavior == AsyncBehavior.Suspend)
{
context.SuspendReentrancy();
}
//suspend the mailbox
mailbox.Suspend(MailboxSuspendStatus.AwaitingTask);

SetCurrentState(context.Self, context.Sender, null);

Expand All @@ -134,11 +126,8 @@ await action()
Faulted,
TaskContinuationOptions.None);

//if reentrancy was suspended, make sure we re-enable message processing again
if (behavior == AsyncBehavior.Suspend)
{
context.ResumeReentrancy();
}
//if mailbox was suspended, make sure we re-enable message processing again
mailbox.Resume(MailboxSuspendStatus.AwaitingTask);
},
Outer,
CancellationToken.None,
Expand Down
32 changes: 0 additions & 32 deletions src/core/Akka/Dispatch/SysMsg/ISystemMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -360,38 +360,6 @@ public override string ToString()
}
}

/// <summary>
/// Class SuspendReentrancy.
/// </summary>
public sealed class SuspendReentrancy : ISystemMessage
{
private SuspendReentrancy() { }
private static readonly SuspendReentrancy _instance = new SuspendReentrancy();
public static SuspendReentrancy Instance
{
get
{
return _instance;
}
}
}

/// <summary>
/// Class ResumeReentrancy.
/// </summary>
public sealed class ResumeReentrancy : ISystemMessage
{
private ResumeReentrancy() { }
private static readonly ResumeReentrancy _instance = new ResumeReentrancy();
public static ResumeReentrancy Instance
{
get
{
return _instance;
}
}
}

/// <summary>
/// Class Stop.
/// </summary>
Expand Down