Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Complete ClusterDaemon and port AutoDown and AutoDownSpec #2

Merged
merged 1 commit into from
Sep 9, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/core/Akka.Cluster.Tests/Akka.Cluster.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
<Otherwise />
</Choose>
<ItemGroup>
<Compile Include="AutoDownSpec.cs" />
<Compile Include="ClusterConfigSpec.cs" />
<Compile Include="ClusterDomainEventPublisherSpec.cs" />
<Compile Include="ClusterDomainEventSpec.cs" />
Expand Down
165 changes: 165 additions & 0 deletions src/core/Akka.Cluster.Tests/AutoDownSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
using System;
using Akka.Actor;
using Akka.TestKit;
using Xunit;

namespace Akka.Cluster.Tests
{
public class AutoDownSpec : AkkaSpec
{
sealed class DownCalled
{
readonly Address _address;

public DownCalled(Address address)
{
_address = address;
}

public override bool Equals(object obj)
{
var other = obj as DownCalled;
if (other == null) return false;
return _address.Equals(other._address);
}

public override int GetHashCode()
{
return _address.GetHashCode();
}
}

readonly static Member MemberA = TestMember.Create(new Address("akka.tcp", "sys", "a", 2552), MemberStatus.Up);
readonly static Member MemberB = TestMember.Create(new Address("akka.tcp", "sys", "b", 2552), MemberStatus.Up);
readonly static Member MemberC = TestMember.Create(new Address("akka.tcp", "sys", "c", 2552), MemberStatus.Up);

class AutoDownTestActor : AutoDownBase
{
readonly ActorRef _probe;

public AutoDownTestActor(TimeSpan autoDownUnreachableAfter, ActorRef probe): base(autoDownUnreachableAfter)
{
_probe = probe;
}

public override Address SelfAddress
{
get { return MemberA.Address; }
}

public override Scheduler Scheduler
{
get { return Context.System.Scheduler; }
}

public override void Down(Address node)
{
if (_leader)
{
_probe.Tell(new DownCalled(node));
}
else
{
_probe.Tell("down must only be done by leader");
}
}
}

private ActorRef AutoDownActor(TimeSpan autoDownUnreachableAfter)
{
return
Sys.ActorOf(new Props(typeof(AutoDownTestActor),
new object[] { autoDownUnreachableAfter, this.TestActor }));
}

[Fact]
public void AutoDownMustDownUnreachableWhenLeader()
{
var a = AutoDownActor(TimeSpan.Zero);
a.Tell(new ClusterEvent.LeaderChanged(MemberA.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberB));
ExpectMsg(new DownCalled(MemberB.Address));
}

[Fact]
public void AutoDownMustNotDownUnreachableWhenNotLeader()
{
var a = AutoDownActor(TimeSpan.Zero);
a.Tell(new ClusterEvent.LeaderChanged(MemberB.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberC));
ExpectNoMsg(TimeSpan.FromSeconds(1));
}

[Fact]
public void AutoDownMustDownUnreachableWhenBecomingLeader()
{
var a = AutoDownActor(TimeSpan.Zero);
a.Tell(new ClusterEvent.LeaderChanged(MemberB.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberC));
a.Tell(new ClusterEvent.LeaderChanged(MemberA.Address));
ExpectMsg(new DownCalled(MemberC.Address));
}

[Fact]
public void AutoDownMustDownUnreachableAfterSpecifiedDuration()
{
var a = AutoDownActor(TimeSpan.FromSeconds(2));
a.Tell(new ClusterEvent.LeaderChanged(MemberA.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberB));
ExpectNoMsg(TimeSpan.FromSeconds(1));
ExpectMsg(new DownCalled(MemberB.Address));
}

[Fact]
public void AutoDownMustDownUnreachableWhenBecomingLeaderInbetweenDetectionAndSpecifiedDuration()
{
var a = AutoDownActor(TimeSpan.FromSeconds(2));
a.Tell(new ClusterEvent.LeaderChanged(MemberB.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberC));
a.Tell(new ClusterEvent.LeaderChanged(MemberA.Address));
ExpectNoMsg(TimeSpan.FromSeconds(1));
ExpectMsg(new DownCalled(MemberC.Address));
}

[Fact]
public void AutoDownMustNotDownUnreachableWhenLoosingLeadershipInbetweenDetectionAndSpecfiedDuration()
{
var a = AutoDownActor(TimeSpan.FromSeconds(2));
a.Tell(new ClusterEvent.LeaderChanged(MemberA.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberC));
a.Tell(new ClusterEvent.LeaderChanged(MemberB.Address));
ExpectNoMsg(TimeSpan.FromSeconds(3));
}

[Fact]
public void AutoDownMustNotDownWhenUnreachableBecomeReachableInbetweenDetectionAndSpecifiedDuration()
{
var a = AutoDownActor(TimeSpan.FromSeconds(2));
a.Tell(new ClusterEvent.LeaderChanged(MemberA.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberB));
a.Tell(new ClusterEvent.ReachableMember(MemberB));
ExpectNoMsg(TimeSpan.FromSeconds(3));
}


[Fact]
public void AutoDownMustNotDownUnreachableIsRemovedInbetweenDetectionAndSpecifiedDuration()
{
var a = AutoDownActor(TimeSpan.FromSeconds(2));
a.Tell(new ClusterEvent.LeaderChanged(MemberA.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberB));
a.Tell(new ClusterEvent.MemberRemoved(MemberB.Copy(MemberStatus.Removed), MemberStatus.Exiting));
ExpectNoMsg(TimeSpan.FromSeconds(3));
}

[Fact]
public void AutoDownMustNotDownWhenUnreachableIsAlreadyDown()
{
var a = AutoDownActor(TimeSpan.Zero);
a.Tell(new ClusterEvent.LeaderChanged(MemberA.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberB.Copy(MemberStatus.Down)));
ExpectNoMsg(TimeSpan.FromSeconds(1));
}

}
}
193 changes: 192 additions & 1 deletion src/core/Akka.Cluster/AutoDown.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,204 @@
using System;
using System.Collections.Immutable;
using System.Threading;
using Akka.Actor;
using Akka.Event;
using Microsoft.VisualBasic.Logging;

namespace Akka.Cluster
{
public class AutoDown
/// <summary>
/// INTERNAL API
///
/// An unreachable member will be downed by this actor if it remains unreachable
/// for the specified duration and this actor is running on the leader node in the
/// cluster.
///
/// The implementation is split into two classes AutoDown and AutoDownBase to be
/// able to unit test the logic without running cluster.
/// </summary>
public class AutoDown : AutoDownBase
{
public static Props Props(TimeSpan autoDownUnreachableAfter)
{
return new Props(typeof(AutoDown), new object[]{autoDownUnreachableAfter});
}

public sealed class UnreachableTimeout
{
readonly UniqueAddress _node;
public UniqueAddress Node { get { return _node; } }

public UnreachableTimeout(UniqueAddress node)
{
_node = node;
}

//TODO: Equals etc
}

readonly Cluster _cluster;

public AutoDown(TimeSpan autoDownUnreachableAfter) : base(autoDownUnreachableAfter)
{
_cluster = Cluster.Get(Context.System);
}

public override Address SelfAddress
{
get { return _cluster.SelfAddress; }
}

public override Scheduler Scheduler
{
get { return _cluster.Scheduler; }
}

protected override void PreStart()
{
_cluster.Subscribe(Self,new []{ typeof(ClusterEvent.IClusterDomainEvent)});
base.PreStart();
}

protected override void PostStop()
{
_cluster.Unsubscribe(Self);
base.PostStop();
}

public override void Down(Address node)
{
if(!_leader) throw new InvalidOperationException("Must be leader to down node");
_cluster.LogInfo("Leader is auto-downing unreachable node [{0}]", node);
_cluster.Down(node);
}

}

public abstract class AutoDownBase : UntypedActor
{
readonly ImmutableHashSet<MemberStatus> _skipMemberStatus =
Gossip.ConvergenceSkipUnreachableWithMemberStatus;

ImmutableDictionary<UniqueAddress, CancellationTokenSource> _scheduledUnreachable =
ImmutableDictionary.Create<UniqueAddress, CancellationTokenSource>();
ImmutableHashSet<UniqueAddress> _pendingUnreachable = ImmutableHashSet.Create<UniqueAddress>();
protected bool _leader = false;

readonly TimeSpan _autoDownUnreachableAfter;

protected AutoDownBase(TimeSpan autoDownUnreachableAfter)
{
_autoDownUnreachableAfter = autoDownUnreachableAfter;
}

protected override void PostStop()
{
foreach (var tokenSource in _scheduledUnreachable.Values) tokenSource.Cancel();
}

public abstract Address SelfAddress { get; }

public abstract Scheduler Scheduler { get; }

public abstract void Down(Address node);

protected override void OnReceive(object message)
{
var state = message as ClusterEvent.CurrentClusterState;
if (state != null)
{
_leader = state.Leader != null && state.Leader.Equals(SelfAddress);
foreach (var m in state.Unreachable) UnreachableMember(m);
return;
}

var unreachableMember = message as ClusterEvent.UnreachableMember;
if (unreachableMember != null)
{
UnreachableMember(unreachableMember.Member);
return;
}

var reachableMember = message as ClusterEvent.ReachableMember;
if (reachableMember != null)
{
Remove(reachableMember.Member.UniqueAddress);
return;
}
var memberRemoved = message as ClusterEvent.MemberRemoved;
if (memberRemoved != null)
{
Remove(memberRemoved.Member.UniqueAddress);
return;
}

var leaderChanged = message as ClusterEvent.LeaderChanged;
if (leaderChanged != null)
{
_leader = leaderChanged.Leader != null && leaderChanged.Leader.Equals(SelfAddress);
if (_leader)
{
foreach(var node in _pendingUnreachable) Down(node.Address);
_pendingUnreachable = ImmutableHashSet.Create<UniqueAddress>();
}
return;
}

var unreachableTimeout = message as AutoDown.UnreachableTimeout;
if (unreachableTimeout != null)
{
if (_scheduledUnreachable.ContainsKey(unreachableTimeout.Node))
{
_scheduledUnreachable = _scheduledUnreachable.Remove(unreachableTimeout.Node);
DownOrAddPending(unreachableTimeout.Node);
}
return;
}
}

private void UnreachableMember(Member m)
{
if(!_skipMemberStatus.Contains(m.Status) && !_scheduledUnreachable.ContainsKey(m.UniqueAddress))
ScheduleUnreachable(m.UniqueAddress);
}

private void ScheduleUnreachable(UniqueAddress node)
{
if (_autoDownUnreachableAfter == TimeSpan.Zero)
{
DownOrAddPending(node);
}
else
{
var cancellationSource = new CancellationTokenSource();
Scheduler.ScheduleOnce(_autoDownUnreachableAfter, Self, new AutoDown.UnreachableTimeout(node), cancellationSource.Token);
_scheduledUnreachable = _scheduledUnreachable.Add(node, cancellationSource);
}
}

private void DownOrAddPending(UniqueAddress node)
{
if (_leader)
{
Down(node.Address);
}
else
{
// it's supposed to be downed by another node, current leader, but if that crash
// a new leader must pick up these
_pendingUnreachable = _pendingUnreachable.Add(node);
}
}

private void Remove(UniqueAddress node)
{
CancellationTokenSource source;
if(_scheduledUnreachable.TryGetValue(node, out source)) source.Cancel();
_scheduledUnreachable = _scheduledUnreachable.Remove(node);
_pendingUnreachable = _pendingUnreachable.Remove(node);
}

public LoggingAdapter Log { get; private set; }
}
}
Loading