Skip to content

Commit

Permalink
Complete ClusterDaemon and port AutoDown and AutoDownSpec
Browse files Browse the repository at this point in the history
  • Loading branch information
smalldave committed Sep 9, 2014
1 parent 0a9a5d2 commit 01f9ba3
Show file tree
Hide file tree
Showing 6 changed files with 552 additions and 33 deletions.
1 change: 1 addition & 0 deletions src/core/Akka.Cluster.Tests/Akka.Cluster.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
<Otherwise />
</Choose>
<ItemGroup>
<Compile Include="AutoDownSpec.cs" />
<Compile Include="ClusterConfigSpec.cs" />
<Compile Include="ClusterDomainEventPublisherSpec.cs" />
<Compile Include="ClusterDomainEventSpec.cs" />
Expand Down
165 changes: 165 additions & 0 deletions src/core/Akka.Cluster.Tests/AutoDownSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
using System;
using Akka.Actor;
using Akka.TestKit;
using Xunit;

namespace Akka.Cluster.Tests
{
public class AutoDownSpec : AkkaSpec
{
sealed class DownCalled
{
readonly Address _address;

public DownCalled(Address address)
{
_address = address;
}

public override bool Equals(object obj)
{
var other = obj as DownCalled;
if (other == null) return false;
return _address.Equals(other._address);
}

public override int GetHashCode()
{
return _address.GetHashCode();
}
}

readonly static Member MemberA = TestMember.Create(new Address("akka.tcp", "sys", "a", 2552), MemberStatus.Up);
readonly static Member MemberB = TestMember.Create(new Address("akka.tcp", "sys", "b", 2552), MemberStatus.Up);
readonly static Member MemberC = TestMember.Create(new Address("akka.tcp", "sys", "c", 2552), MemberStatus.Up);

class AutoDownTestActor : AutoDownBase
{
readonly ActorRef _probe;

public AutoDownTestActor(TimeSpan autoDownUnreachableAfter, ActorRef probe): base(autoDownUnreachableAfter)
{
_probe = probe;
}

public override Address SelfAddress
{
get { return MemberA.Address; }
}

public override Scheduler Scheduler
{
get { return Context.System.Scheduler; }
}

public override void Down(Address node)
{
if (_leader)
{
_probe.Tell(new DownCalled(node));
}
else
{
_probe.Tell("down must only be done by leader");
}
}
}

private ActorRef AutoDownActor(TimeSpan autoDownUnreachableAfter)
{
return
Sys.ActorOf(new Props(typeof(AutoDownTestActor),
new object[] { autoDownUnreachableAfter, this.TestActor }));
}

[Fact]
public void AutoDownMustDownUnreachableWhenLeader()
{
var a = AutoDownActor(TimeSpan.Zero);
a.Tell(new ClusterEvent.LeaderChanged(MemberA.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberB));
ExpectMsg(new DownCalled(MemberB.Address));
}

[Fact]
public void AutoDownMustNotDownUnreachableWhenNotLeader()
{
var a = AutoDownActor(TimeSpan.Zero);
a.Tell(new ClusterEvent.LeaderChanged(MemberB.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberC));
ExpectNoMsg(TimeSpan.FromSeconds(1));
}

[Fact]
public void AutoDownMustDownUnreachableWhenBecomingLeader()
{
var a = AutoDownActor(TimeSpan.Zero);
a.Tell(new ClusterEvent.LeaderChanged(MemberB.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberC));
a.Tell(new ClusterEvent.LeaderChanged(MemberA.Address));
ExpectMsg(new DownCalled(MemberC.Address));
}

[Fact]
public void AutoDownMustDownUnreachableAfterSpecifiedDuration()
{
var a = AutoDownActor(TimeSpan.FromSeconds(2));
a.Tell(new ClusterEvent.LeaderChanged(MemberA.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberB));
ExpectNoMsg(TimeSpan.FromSeconds(1));
ExpectMsg(new DownCalled(MemberB.Address));
}

[Fact]
public void AutoDownMustDownUnreachableWhenBecomingLeaderInbetweenDetectionAndSpecifiedDuration()
{
var a = AutoDownActor(TimeSpan.FromSeconds(2));
a.Tell(new ClusterEvent.LeaderChanged(MemberB.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberC));
a.Tell(new ClusterEvent.LeaderChanged(MemberA.Address));
ExpectNoMsg(TimeSpan.FromSeconds(1));
ExpectMsg(new DownCalled(MemberC.Address));
}

[Fact]
public void AutoDownMustNotDownUnreachableWhenLoosingLeadershipInbetweenDetectionAndSpecfiedDuration()
{
var a = AutoDownActor(TimeSpan.FromSeconds(2));
a.Tell(new ClusterEvent.LeaderChanged(MemberA.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberC));
a.Tell(new ClusterEvent.LeaderChanged(MemberB.Address));
ExpectNoMsg(TimeSpan.FromSeconds(3));
}

[Fact]
public void AutoDownMustNotDownWhenUnreachableBecomeReachableInbetweenDetectionAndSpecifiedDuration()
{
var a = AutoDownActor(TimeSpan.FromSeconds(2));
a.Tell(new ClusterEvent.LeaderChanged(MemberA.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberB));
a.Tell(new ClusterEvent.ReachableMember(MemberB));
ExpectNoMsg(TimeSpan.FromSeconds(3));
}


[Fact]
public void AutoDownMustNotDownUnreachableIsRemovedInbetweenDetectionAndSpecifiedDuration()
{
var a = AutoDownActor(TimeSpan.FromSeconds(2));
a.Tell(new ClusterEvent.LeaderChanged(MemberA.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberB));
a.Tell(new ClusterEvent.MemberRemoved(MemberB.Copy(MemberStatus.Removed), MemberStatus.Exiting));
ExpectNoMsg(TimeSpan.FromSeconds(3));
}

[Fact]
public void AutoDownMustNotDownWhenUnreachableIsAlreadyDown()
{
var a = AutoDownActor(TimeSpan.Zero);
a.Tell(new ClusterEvent.LeaderChanged(MemberA.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberB.Copy(MemberStatus.Down)));
ExpectNoMsg(TimeSpan.FromSeconds(1));
}

}
}
193 changes: 192 additions & 1 deletion src/core/Akka.Cluster/AutoDown.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,204 @@
using System;
using System.Collections.Immutable;
using System.Threading;
using Akka.Actor;
using Akka.Event;
using Microsoft.VisualBasic.Logging;

namespace Akka.Cluster
{
public class AutoDown
/// <summary>
/// INTERNAL API
///
/// An unreachable member will be downed by this actor if it remains unreachable
/// for the specified duration and this actor is running on the leader node in the
/// cluster.
///
/// The implementation is split into two classes AutoDown and AutoDownBase to be
/// able to unit test the logic without running cluster.
/// </summary>
public class AutoDown : AutoDownBase
{
public static Props Props(TimeSpan autoDownUnreachableAfter)
{
return new Props(typeof(AutoDown), new object[]{autoDownUnreachableAfter});
}

public sealed class UnreachableTimeout
{
readonly UniqueAddress _node;
public UniqueAddress Node { get { return _node; } }

public UnreachableTimeout(UniqueAddress node)
{
_node = node;
}

//TODO: Equals etc
}

readonly Cluster _cluster;

public AutoDown(TimeSpan autoDownUnreachableAfter) : base(autoDownUnreachableAfter)
{
_cluster = Cluster.Get(Context.System);
}

public override Address SelfAddress
{
get { return _cluster.SelfAddress; }
}

public override Scheduler Scheduler
{
get { return _cluster.Scheduler; }
}

protected override void PreStart()
{
_cluster.Subscribe(Self,new []{ typeof(ClusterEvent.IClusterDomainEvent)});
base.PreStart();
}

protected override void PostStop()
{
_cluster.Unsubscribe(Self);
base.PostStop();
}

public override void Down(Address node)
{
if(!_leader) throw new InvalidOperationException("Must be leader to down node");
_cluster.LogInfo("Leader is auto-downing unreachable node [{0}]", node);
_cluster.Down(node);
}

}

public abstract class AutoDownBase : UntypedActor
{
readonly ImmutableHashSet<MemberStatus> _skipMemberStatus =
Gossip.ConvergenceSkipUnreachableWithMemberStatus;

ImmutableDictionary<UniqueAddress, CancellationTokenSource> _scheduledUnreachable =
ImmutableDictionary.Create<UniqueAddress, CancellationTokenSource>();
ImmutableHashSet<UniqueAddress> _pendingUnreachable = ImmutableHashSet.Create<UniqueAddress>();
protected bool _leader = false;

readonly TimeSpan _autoDownUnreachableAfter;

protected AutoDownBase(TimeSpan autoDownUnreachableAfter)
{
_autoDownUnreachableAfter = autoDownUnreachableAfter;
}

protected override void PostStop()
{
foreach (var tokenSource in _scheduledUnreachable.Values) tokenSource.Cancel();
}

public abstract Address SelfAddress { get; }

public abstract Scheduler Scheduler { get; }

public abstract void Down(Address node);

protected override void OnReceive(object message)
{
var state = message as ClusterEvent.CurrentClusterState;
if (state != null)
{
_leader = state.Leader != null && state.Leader.Equals(SelfAddress);
foreach (var m in state.Unreachable) UnreachableMember(m);
return;
}

var unreachableMember = message as ClusterEvent.UnreachableMember;
if (unreachableMember != null)
{
UnreachableMember(unreachableMember.Member);
return;
}

var reachableMember = message as ClusterEvent.ReachableMember;
if (reachableMember != null)
{
Remove(reachableMember.Member.UniqueAddress);
return;
}
var memberRemoved = message as ClusterEvent.MemberRemoved;
if (memberRemoved != null)
{
Remove(memberRemoved.Member.UniqueAddress);
return;
}

var leaderChanged = message as ClusterEvent.LeaderChanged;
if (leaderChanged != null)
{
_leader = leaderChanged.Leader != null && leaderChanged.Leader.Equals(SelfAddress);
if (_leader)
{
foreach(var node in _pendingUnreachable) Down(node.Address);
_pendingUnreachable = ImmutableHashSet.Create<UniqueAddress>();
}
return;
}

var unreachableTimeout = message as AutoDown.UnreachableTimeout;
if (unreachableTimeout != null)
{
if (_scheduledUnreachable.ContainsKey(unreachableTimeout.Node))
{
_scheduledUnreachable = _scheduledUnreachable.Remove(unreachableTimeout.Node);
DownOrAddPending(unreachableTimeout.Node);
}
return;
}
}

private void UnreachableMember(Member m)
{
if(!_skipMemberStatus.Contains(m.Status) && !_scheduledUnreachable.ContainsKey(m.UniqueAddress))
ScheduleUnreachable(m.UniqueAddress);
}

private void ScheduleUnreachable(UniqueAddress node)
{
if (_autoDownUnreachableAfter == TimeSpan.Zero)
{
DownOrAddPending(node);
}
else
{
var cancellationSource = new CancellationTokenSource();
Scheduler.ScheduleOnce(_autoDownUnreachableAfter, Self, new AutoDown.UnreachableTimeout(node), cancellationSource.Token);
_scheduledUnreachable = _scheduledUnreachable.Add(node, cancellationSource);
}
}

private void DownOrAddPending(UniqueAddress node)
{
if (_leader)
{
Down(node.Address);
}
else
{
// it's supposed to be downed by another node, current leader, but if that crash
// a new leader must pick up these
_pendingUnreachable = _pendingUnreachable.Add(node);
}
}

private void Remove(UniqueAddress node)
{
CancellationTokenSource source;
if(_scheduledUnreachable.TryGetValue(node, out source)) source.Cancel();
_scheduledUnreachable = _scheduledUnreachable.Remove(node);
_pendingUnreachable = _pendingUnreachable.Remove(node);
}

public LoggingAdapter Log { get; private set; }
}
}
Loading

0 comments on commit 01f9ba3

Please sign in to comment.