diff --git a/src/Akka.sln b/src/Akka.sln
index 8e8d1546a30..240dd07079f 100644
--- a/src/Akka.sln
+++ b/src/Akka.sln
@@ -2,6 +2,8 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 14
VisualStudioVersion = 14.0.22823.1
+# Visual Studio 2013
+VisualStudioVersion = 12.0.31101.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Examples", "Examples", "{69279534-1DBA-4115-BF8B-03F77FC8125E}"
EndProject
@@ -211,6 +213,10 @@ EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.MultiNodeTests", "core\Akka.MultiNodeTests\Akka.MultiNodeTests.csproj", "{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PersistenceBenchmark", "benchmark\PersistenceBenchmark\PersistenceBenchmark.csproj", "{39E6F51F-FA1E-4C62-B8F8-19065DE6D55D}"
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{A59BAE84-70E2-46A0-9E26-7413C103E2D7}"
+ ProjectSection(SolutionItems) = preProject
+ WebEssentials-Settings.json = WebEssentials-Settings.json
+ EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
diff --git a/src/core/Akka.TestKit/Akka.TestKit.csproj b/src/core/Akka.TestKit/Akka.TestKit.csproj
index edc98f4d41f..21e25311a7b 100644
--- a/src/core/Akka.TestKit/Akka.TestKit.csproj
+++ b/src/core/Akka.TestKit/Akka.TestKit.csproj
@@ -111,6 +111,7 @@
+
diff --git a/src/core/Akka.TestKit/TestBreaker.cs b/src/core/Akka.TestKit/TestBreaker.cs
new file mode 100644
index 00000000000..ac188f853e4
--- /dev/null
+++ b/src/core/Akka.TestKit/TestBreaker.cs
@@ -0,0 +1,30 @@
+using System;
+using System.Linq;
+using System.Runtime.Serialization;
+using System.Threading;
+using System.Threading.Tasks;
+using Akka.Pattern;
+
+namespace Akka.TestKit
+{
+ public class TestBreaker
+ {
+ public CountdownEvent HalfOpenLatch { get; private set; }
+ public CountdownEvent OpenLatch { get; private set; }
+ public CountdownEvent ClosedLatch { get; private set; }
+ public CircuitBreaker Instance { get; private set; }
+
+ public TestBreaker( CircuitBreaker instance )
+ {
+ HalfOpenLatch = new CountdownEvent( 1 );
+ OpenLatch = new CountdownEvent( 1 );
+ ClosedLatch = new CountdownEvent( 1 );
+ Instance = instance;
+ Instance.OnClose( ( ) => { if ( !ClosedLatch.IsSet ) ClosedLatch.Signal( ); } )
+ .OnHalfOpen( ( ) => { if ( !HalfOpenLatch.IsSet ) HalfOpenLatch.Signal( ); } )
+ .OnOpen( ( ) => { if ( !OpenLatch.IsSet ) OpenLatch.Signal( ); } );
+ }
+
+
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Tests/Akka.Tests.csproj b/src/core/Akka.Tests/Akka.Tests.csproj
index 16e21f4965a..b9519bf9740 100644
--- a/src/core/Akka.Tests/Akka.Tests.csproj
+++ b/src/core/Akka.Tests/Akka.Tests.csproj
@@ -150,6 +150,7 @@
+
diff --git a/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs b/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs
new file mode 100644
index 00000000000..c4d5a56b692
--- /dev/null
+++ b/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs
@@ -0,0 +1,351 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2015 Typesafe Inc.
+// Copyright (C) 2013-2015 Akka.NET project
+//
+//-----------------------------------------------------------------------
+
+using System;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq;
+using System.Runtime.Serialization;
+using System.Threading;
+using System.Threading.Tasks;
+using Akka.Pattern;
+using Akka.TestKit;
+using Xunit;
+
+namespace Akka.Tests.Pattern
+{
+ public class ASynchronousCircuitBreakerThatIsClosed : CircuitBreakerSpecBase
+ {
+ [Fact(DisplayName = "A synchronous circuit breaker that is closed should allow call through")]
+ public void Should_Allow_Call_Through( )
+ {
+ var breaker = LongCallTimeoutCb( );
+ var result = breaker.Instance.WithSyncCircuitBreaker( ( ) => "Test" );
+
+ Assert.Equal( "Test", result );
+ }
+
+ [Fact( DisplayName = "A synchronous circuit breaker that is closed should increment failure count when call fails" )]
+ public void Should_Increment_FailureCount_When_Call_Fails( )
+ {
+ var breaker = LongCallTimeoutCb( );
+
+ Assert.Equal( breaker.Instance.CurrentFailureCount, 0 );
+ Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) );
+ Assert.True( CheckLatch( breaker.OpenLatch ) );
+ Assert.Equal( 1, breaker.Instance.CurrentFailureCount );
+ }
+
+ [Fact( DisplayName = "A synchronous circuit breaker that is closed should reset failure count when call succeeds" )]
+ public void Should_Reset_FailureCount_When_Call_Succeeds( )
+ {
+ var breaker = MultiFailureCb( );
+
+ Assert.Equal( breaker.Instance.CurrentFailureCount, 0 );
+ Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) );
+ Assert.Equal( breaker.Instance.CurrentFailureCount, 1 );
+
+ breaker.Instance.WithSyncCircuitBreaker( ( ) => "Test" );
+
+ Assert.Equal( 0, breaker.Instance.CurrentFailureCount );
+ }
+
+ [Fact(DisplayName = "A synchronous circuit breaker that is closed should increment failure count when call times out")]
+ public void Should_Increment_FailureCount_When_Call_Times_Out( )
+ {
+ var breaker = ShortCallTimeoutCb( );
+
+ breaker.Instance.WithSyncCircuitBreaker( ( ) => Thread.Sleep( 500 ) );
+
+ Assert.True( CheckLatch( breaker.OpenLatch ) );
+ Assert.Equal( 1, breaker.Instance.CurrentFailureCount );
+ }
+ }
+
+ public class ASynchronousCircuitBreakerThatIsHalfOpen : CircuitBreakerSpecBase
+ {
+ [Fact(DisplayName = "A synchronous circuit breaker that is half open should pass call and transition to close on success")]
+ public void Should_Pass_Call_And_Transition_To_Close_On_Success( )
+ {
+ var breaker = ShortResetTimeoutCb( );
+ InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) );
+ Assert.True( CheckLatch( breaker.HalfOpenLatch ) );
+
+ var result = breaker.Instance.WithSyncCircuitBreaker( ( ) => SayTest( ) );
+
+ Assert.True( CheckLatch( breaker.ClosedLatch ) );
+ Assert.Equal( SayTest( ), result );
+ }
+
+ [Fact(DisplayName = "A synchronous circuit breaker that is half open should pass call and transition to open on exception")]
+ public void Should_Pass_Call_And_Transition_To_Open_On_Exception( )
+ {
+ var breaker = ShortResetTimeoutCb( );
+
+
+ Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) );
+ Assert.True( CheckLatch( breaker.HalfOpenLatch ) );
+
+ Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) );
+ Assert.True( CheckLatch( breaker.OpenLatch ) );
+ }
+ }
+
+ public class ASynchronousCircuitBreakerThatIsOpen : CircuitBreakerSpecBase
+ {
+ [Fact(DisplayName = "A synchronous circuit breaker that is open should throw exceptions before reset timeout")]
+ public void Should_Throw_Exceptions_Before_Reset_Timeout( )
+ {
+ var breaker = LongResetTimeoutCb( );
+
+ Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) );
+ Assert.True( CheckLatch( breaker.OpenLatch ) );
+ Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) );
+ }
+
+ [Fact(DisplayName = "A synchronous circuit breaker that is open should transition to half open when reset times out")]
+ public void Should_Transition_To_Half_Open_When_Reset_Times_Out( )
+ {
+ var breaker = ShortResetTimeoutCb( );
+
+ Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) );
+ Assert.True( CheckLatch( breaker.HalfOpenLatch ) );
+ }
+ }
+
+ public class AnAsynchronousCircuitBreakerThatIsClosed : CircuitBreakerSpecBase
+ {
+ [Fact(DisplayName = "An asynchronous circuit breaker that is closed should allow call through")]
+ public void Should_Allow_Call_Through( )
+ {
+ var breaker = LongCallTimeoutCb( );
+ var result = breaker.Instance.WithCircuitBreaker( Task.Run( ( ) => SayTest( ) ) );
+
+ Assert.Equal( SayTest( ), result.Result );
+ }
+
+ [Fact(DisplayName = "An asynchronous circuit breaker that is closed should increment failure count when call fails")]
+ public void Should_Increment_Failure_Count_When_Call_Fails( )
+ {
+ var breaker = LongCallTimeoutCb( );
+
+ Assert.Equal( breaker.Instance.CurrentFailureCount, 0 );
+ Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithCircuitBreaker( Task.Run( ( ) => ThrowException( ) ) ).Wait( AwaitTimeout ) ) );
+ Assert.True( CheckLatch( breaker.OpenLatch ) );
+ Assert.Equal( 1, breaker.Instance.CurrentFailureCount );
+ }
+
+ [Fact(DisplayName = "An asynchronous circuit breaker that is closed should reset failure count when call succeeds after failure")]
+ public void Should_Reset_Failure_Count_When_Call_Succeeds_After_Failure( )
+ {
+ var breaker = MultiFailureCb( );
+
+ Assert.Equal( breaker.Instance.CurrentFailureCount, 0 );
+
+ var whenall = Task.WhenAll(
+ breaker.Instance.WithCircuitBreaker(Task.Factory.StartNew(ThrowException))
+ , breaker.Instance.WithCircuitBreaker(Task.Factory.StartNew(ThrowException))
+ , breaker.Instance.WithCircuitBreaker(Task.Factory.StartNew(ThrowException))
+ , breaker.Instance.WithCircuitBreaker(Task.Factory.StartNew(ThrowException)));
+
+ Assert.True( InterceptExceptionType( ( ) => whenall.Wait( AwaitTimeout ) ) );
+
+ Assert.Equal( breaker.Instance.CurrentFailureCount, 4 );
+
+ var result = breaker.Instance.WithCircuitBreaker( Task.Run( ( ) => SayTest( ) ) ).Result;
+
+ Assert.Equal( SayTest( ), result );
+ Assert.Equal( 0, breaker.Instance.CurrentFailureCount );
+ }
+
+ [Fact(DisplayName = "An asynchronous circuit breaker that is closed should increment failure count when call times out")]
+ public void Should_Increment_Failure_Count_When_Call_Times_Out( )
+ {
+ var breaker = ShortCallTimeoutCb( );
+
+ breaker.Instance.WithCircuitBreaker( Task.Factory.StartNew( ( ) =>
+ {
+ Thread.Sleep( 500 );
+ return SayTest( );
+ } ) );
+
+ Assert.True( CheckLatch( breaker.OpenLatch ) );
+ Assert.Equal( 1, breaker.Instance.CurrentFailureCount );
+ }
+ }
+
+ public class AnAsynchronousCircuitBreakerThatIsHalfOpen : CircuitBreakerSpecBase
+ {
+ [Fact(DisplayName = "An asynchronous circuit breaker that is half open should pass call and transition to close on success")]
+ public void Should_Pass_Call_And_Transition_To_Close_On_Success( )
+ {
+ var breaker = ShortResetTimeoutCb( );
+ InterceptExceptionType( ( ) => breaker.Instance.WithCircuitBreaker( Task.Factory.StartNew( ThrowException ) ) );
+ Assert.True( CheckLatch( breaker.HalfOpenLatch ) );
+
+ var result = breaker.Instance.WithCircuitBreaker( Task.Factory.StartNew( ( ) => SayTest( ) ) );
+
+ Assert.True( CheckLatch( breaker.ClosedLatch ) );
+ Assert.Equal( SayTest( ), result.Result );
+ }
+
+ [Fact(DisplayName = "An asynchronous circuit breaker that is half open should pass call and transition to open on exception")]
+ public void Should_Pass_Call_And_Transition_To_Open_On_Exception( )
+ {
+ var breaker = ShortResetTimeoutCb( );
+
+
+ Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithCircuitBreaker( Task.Factory.StartNew( ThrowException ) ).Wait( ) ) );
+ Assert.True( CheckLatch( breaker.HalfOpenLatch ) );
+
+ Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithCircuitBreaker( Task.Factory.StartNew( ThrowException ) ).Wait( ) ) );
+ Assert.True( CheckLatch( breaker.OpenLatch ) );
+ }
+
+ [Fact(DisplayName = "An asynchronous circuit breaker that is half open should pass call and transition to open on async failure")]
+ public void Should_Pass_Call_And_Transition_To_Open_On_Async_Failure( )
+ {
+ var breaker = ShortResetTimeoutCb( );
+
+ breaker.Instance.WithCircuitBreaker( Task.Factory.StartNew( ThrowException ) );
+ Assert.True( CheckLatch( breaker.HalfOpenLatch ) );
+
+ breaker.Instance.WithCircuitBreaker( Task.Factory.StartNew( ThrowException ) );
+ Assert.True( CheckLatch( breaker.OpenLatch ) );
+ }
+ }
+
+ public class AnAsynchronousCircuitBreakerThatIsOpen : CircuitBreakerSpecBase
+ {
+ [Fact(DisplayName = "An asynchronous circuit breaker that is open should throw exceptions when called before reset timeout")]
+ public void Should_Throw_Exceptions_When_Called_Before_Reset_Timeout( )
+ {
+ var breaker = LongResetTimeoutCb( );
+
+ Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithCircuitBreaker( Task.Factory.StartNew( ThrowException ) ).Wait( ) ) );
+ Assert.True( CheckLatch( breaker.OpenLatch ) );
+ Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithCircuitBreaker( Task.Factory.StartNew( ThrowException ) ).Wait( ) ) );
+ }
+
+ [Fact(DisplayName = "An asynchronous circuit breaker that is open should transition to half open when reset timeout")]
+ public void Should_Transition_To_Half_Open_When_Reset_Timeout( )
+ {
+ var breaker = ShortResetTimeoutCb( );
+
+ Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithCircuitBreaker( Task.Factory.StartNew( ThrowException ) ).Wait( ) ) );
+ Assert.True( CheckLatch( breaker.HalfOpenLatch ) );
+ }
+ }
+
+ public class CircuitBreakerSpecBase : AkkaSpec
+ {
+ private readonly TimeSpan _awaitTimeout = TimeSpan.FromSeconds(2);
+ public TimeSpan AwaitTimeout { get { return _awaitTimeout; } }
+
+ public bool CheckLatch( CountdownEvent latch )
+ {
+ return latch.Wait( AwaitTimeout );
+ }
+
+ public Task Delay( TimeSpan toDelay, CancellationToken? token )
+ {
+ return token.HasValue ? Task.Delay( toDelay, token.Value ) : Task.Delay( toDelay );
+ }
+
+ public void ThrowException( )
+ {
+ throw new TestException( "Test Exception" );
+ }
+
+ public string SayTest( )
+ {
+ return "Test";
+ }
+
+ [SuppressMessage( "Microsoft.Design", "CA1004:GenericMethodsShouldProvideTypeParameter" )]
+ public bool InterceptExceptionType( Action action ) where T : Exception
+ {
+ try
+ {
+ action.Invoke( );
+ return false;
+ }
+ catch ( Exception ex )
+ {
+ var aggregate = ex as AggregateException;
+ if ( aggregate != null )
+ {
+
+ // ReSharper disable once UnusedVariable
+ foreach ( var temp in aggregate.InnerExceptions.Select( innerException => innerException as T ).Where( temp => temp == null ) )
+ {
+ throw;
+ }
+ }
+ else
+ {
+ var temp = ex as T;
+
+ if ( temp == null )
+ {
+ throw;
+ }
+ }
+
+ }
+ return true;
+ }
+
+ public TestBreaker ShortCallTimeoutCb( )
+ {
+ return new TestBreaker( new CircuitBreaker( 1, TimeSpan.FromMilliseconds( 50 ), TimeSpan.FromMilliseconds( 500 ) ) );
+ }
+
+ public TestBreaker ShortResetTimeoutCb( )
+ {
+ return new TestBreaker( new CircuitBreaker( 1, TimeSpan.FromMilliseconds( 1000 ), TimeSpan.FromMilliseconds( 50 ) ) );
+ }
+
+ public TestBreaker LongCallTimeoutCb( )
+ {
+ return new TestBreaker( new CircuitBreaker( 1, TimeSpan.FromMilliseconds( 5000 ), TimeSpan.FromMilliseconds( 500 ) ) );
+ }
+
+ public TestBreaker LongResetTimeoutCb( )
+ {
+ return new TestBreaker( new CircuitBreaker( 1, TimeSpan.FromMilliseconds( 100 ), TimeSpan.FromMilliseconds( 5000 ) ) );
+ }
+
+ public TestBreaker MultiFailureCb( )
+ {
+ return new TestBreaker( new CircuitBreaker( 5, TimeSpan.FromMilliseconds( 200 ), TimeSpan.FromMilliseconds( 500 ) ) );
+ }
+ }
+
+
+ internal class TestException : ApplicationException
+ {
+ public TestException( )
+ {
+ }
+
+ public TestException( string message )
+ : base( message )
+ {
+ }
+
+ public TestException( string message, Exception innerException )
+ : base( message, innerException )
+ {
+ }
+
+ protected TestException( SerializationInfo info, StreamingContext context )
+ : base( info, context )
+ {
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/src/core/Akka/Akka.csproj b/src/core/Akka/Akka.csproj
index 908f24e3b17..30969346257 100644
--- a/src/core/Akka/Akka.csproj
+++ b/src/core/Akka/Akka.csproj
@@ -248,6 +248,9 @@
+
+
+
@@ -266,6 +269,7 @@
+
diff --git a/src/core/Akka/Pattern/CircuitBreaker.cs b/src/core/Akka/Pattern/CircuitBreaker.cs
new file mode 100644
index 00000000000..15b88af307b
--- /dev/null
+++ b/src/core/Akka/Pattern/CircuitBreaker.cs
@@ -0,0 +1,261 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2015 Typesafe Inc.
+// Copyright (C) 2013-2015 Akka.NET project
+//
+//-----------------------------------------------------------------------
+
+using System;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+using Akka.Util.Internal;
+
+namespace Akka.Pattern
+{
+ ///
+ /// Provides circuit breaker functionality to provide stability when working with
+ /// "dangerous" operations, e.g. calls to remote systems
+ ///
+ ///
+ ///
+ /// Transitions through three states:
+ ///
+ ///-
+ /// In *Closed* state,
+ /// calls pass through until the maxFailures count is reached.
+ /// This causes the circuit breaker to open. Both exceptions and calls exceeding
+ /// callTimeout are considered failures.
+ ///
+ ///-
+ /// In *Open* state,
+ /// calls fail-fast with an exception. After resetTimeout,
+ /// circuit breaker transitions to half-open state.
+ ///
+ ///-
+ /// In *Half-Open* state,
+ /// the first call will be allowed through, if it succeeds
+ /// the circuit breaker will reset to closed state. If it fails, the circuit
+ /// breaker will re-open to open state. All calls beyond the first that execute
+ /// while the first is running will fail-fast with an exception.
+ ///
+ ///
+ ///
+ public class CircuitBreaker
+ {
+ ///
+ /// The current state of the breaker -- Closed, Half-Open, or Closed -- *access only via helper methods*
+ ///
+ private AtomicState _currentState;
+
+ ///
+ /// Helper method for access to the underlying state via Interlocked
+ ///
+ /// Previous state on transition
+ /// Next state on transition
+ /// Whether the previous state matched correctly
+ private bool SwapState( AtomicState oldState, AtomicState newState )
+ {
+ return Interlocked.CompareExchange( ref _currentState, newState, oldState ) == oldState;
+ }
+
+ ///
+ /// Helper method for access to the underlying state via Interlocked
+ ///
+ private AtomicState CurrentState
+ {
+ get
+ {
+ Interlocked.MemoryBarrier( );
+ return _currentState;
+ }
+ }
+
+ public int MaxFailures { get; private set; }
+
+ public TimeSpan CallTimeout { get; private set; }
+ public TimeSpan ResetTimeout { get; private set; }
+
+ //akka.io implementation is to use nested static classes and access parent member variables
+ //.Net static nested classes do not have access to parent member variables -- so we configure the states here and
+ //swap them above
+ private AtomicState Closed { get; set; }
+ private AtomicState Open { get; set; }
+ private AtomicState HalfOpen { get; set; }
+
+ ///
+ /// Create a new CircuitBreaker
+ ///
+ /// Maximum number of failures before opening the circuit
+ /// of time after which to consider a call a failure
+ /// of time after which to attempt to close the circuit
+ ///
+ public static CircuitBreaker Create( int maxFailures, TimeSpan callTimeout, TimeSpan resetTimeout )
+ {
+ return new CircuitBreaker( maxFailures, callTimeout, resetTimeout );
+ }
+
+ ///
+ /// Create a new CircuitBreaker
+ ///
+ /// Maximum number of failures before opening the circuit
+ /// of time after which to consider a call a failure
+ /// of time after which to attempt to close the circuit
+ ///
+ public CircuitBreaker( int maxFailures, TimeSpan callTimeout, TimeSpan resetTimeout )
+ {
+ MaxFailures = maxFailures;
+ CallTimeout = callTimeout;
+ ResetTimeout = resetTimeout;
+ Closed = new Closed( this );
+ Open = new Open( this );
+ HalfOpen = new HalfOpen( this );
+ _currentState = Closed;
+ //_failures = new AtomicInteger();
+ }
+
+ ///
+ /// Retrieves current failure count.
+ ///
+ public long CurrentFailureCount
+ {
+ get { return Closed.Current; }
+ }
+
+ ///
+ /// Wraps invocation of asynchronous calls that need to be protected
+ ///
+ ///
+ /// Call needing protected
+ /// containing the call result
+ public async Task WithCircuitBreaker( Task body )
+ {
+ return await CurrentState.Invoke( body );
+ }
+
+ ///
+ /// Wraps invocation of asynchronous calls that need to be protected
+ ///
+ /// Call needing protected
+ ///
+ public async Task WithCircuitBreaker( Task body )
+ {
+ await CurrentState.Invoke( body );
+ }
+
+ ///
+ /// The failure will be recorded farther down.
+ ///
+ ///
+ public void WithSyncCircuitBreaker( Action body )
+ {
+ var cbTask = WithCircuitBreaker( Task.Factory.StartNew( body ) );
+ if ( !cbTask.Wait( CallTimeout ) )
+ {
+ //throw new TimeoutException( string.Format( "Execution did not complete within the time alotted {0} ms", CallTimeout.TotalMilliseconds ) );
+ }
+ if ( cbTask.Exception != null )
+ {
+ throw cbTask.Exception;
+ }
+ }
+
+ ///
+ /// Wraps invocations of asynchronous calls that need to be protected
+ /// If this does not complete within the time allotted, it should return default()
+ ///
+ ///
+ /// Await.result(
+ /// withCircuitBreaker(try Future.successful(body) catch { case NonFatal(t) ⇒ Future.failed(t) }),
+ /// callTimeout)
+ ///
+ ///
+ ///
+ ///
+ ///
+ /// or default()
+ public T WithSyncCircuitBreaker( Func body )
+ {
+ var cbTask = WithCircuitBreaker( Task.Factory.StartNew( body ) );
+ return cbTask.Wait( CallTimeout ) ? cbTask.Result : default(T);
+ }
+
+ ///
+ /// Adds a callback to execute when circuit breaker opens
+ ///
+ /// Handler to be invoked on state change
+ /// CircuitBreaker for fluent usage
+ public CircuitBreaker OnOpen( Action callback )
+ {
+ Open.AddListener( callback );
+ return this;
+ }
+
+ ///
+ /// Adds a callback to execute when circuit breaker transitions to half-open
+ ///
+ /// Handler to be invoked on state change
+ /// CircuitBreaker for fluent usage
+ public CircuitBreaker OnHalfOpen( Action callback )
+ {
+ HalfOpen.AddListener( callback );
+ return this;
+ }
+
+ ///
+ /// Adds a callback to execute when circuit breaker state closes
+ ///
+ /// Handler to be invoked on state change
+ /// CircuitBreaker for fluent usage
+ public CircuitBreaker OnClose( Action callback )
+ {
+ Closed.AddListener( callback );
+ return this;
+ }
+
+ ///
+ /// Implements consistent transition between states. Throws IllegalStateException if an invalid transition is attempted.
+ ///
+ /// State being transitioning from
+ /// State being transitioned to
+ private void Transition( AtomicState fromState, AtomicState toState )
+ {
+ if ( SwapState( fromState, toState ) )
+ {
+ Debug.WriteLine( "Successful transition from {0} to {1}", fromState, toState );
+ toState.Enter( );
+ }
+ else
+ {
+ throw new IllegalStateException( string.Format( "Illegal transition attempted from {0} to {1}", fromState, toState ) );
+ }
+ }
+
+ ///
+ /// Trips breaker to an open state. This is valid from Closed or Half-Open states
+ ///
+ /// State we're coming from (Closed or Half-Open)
+ internal void TripBreaker( AtomicState fromState )
+ {
+ Transition( fromState, Open );
+ }
+
+ ///
+ /// Resets breaker to a closed state. This is valid from an Half-Open state only.
+ ///
+ internal void ResetBreaker( )
+ {
+ Transition( HalfOpen, Closed );
+ }
+
+ ///
+ /// Attempts to reset breaker by transitioning to a half-open state. This is valid from an Open state only.
+ ///
+ internal void AttemptReset( )
+ {
+ Transition( Open, HalfOpen );
+ }
+
+ //private readonly Task timeoutTask = Task.FromResult(new TimeoutException("Circuit Breaker Timed out."));
+ }
+}
diff --git a/src/core/Akka/Pattern/CircuitBreakerState.cs b/src/core/Akka/Pattern/CircuitBreakerState.cs
new file mode 100644
index 00000000000..833d72b7b21
--- /dev/null
+++ b/src/core/Akka/Pattern/CircuitBreakerState.cs
@@ -0,0 +1,226 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2015 Typesafe Inc.
+// Copyright (C) 2013-2015 Akka.NET project
+//
+//-----------------------------------------------------------------------
+
+using System.Globalization;
+using System.Threading.Tasks;
+using Akka.Util;
+using Akka.Util.Internal;
+
+namespace Akka.Pattern
+{
+ ///
+ /// Concrete implementation of Open state
+ ///
+ internal class Open : AtomicState
+ {
+ private readonly CircuitBreaker _breaker;
+
+ public Open( CircuitBreaker breaker )
+ : base( breaker.CallTimeout, 0 )
+ {
+ _breaker = breaker;
+ }
+
+ ///
+ /// Fail-fast on any invocation
+ ///
+ ///
+ /// Implementation of the call that needs protected
+ /// containing result of protected call
+ public override Task Invoke( Task body )
+ {
+ throw new OpenCircuitException( );
+ }
+
+ ///
+ /// Implementation of invoke, which simply attempts the call
+ ///
+ /// Implementation of the call that needs protected
+ /// containing result of protected call
+ public override Task Invoke( Task body )
+ {
+ throw new OpenCircuitException( );
+ }
+
+ ///
+ /// No-op for open, calls are never executed so cannot succeed or fail
+ ///
+ protected override void CallFails( )
+ {
+ //throw new NotImplementedException();
+ }
+
+ ///
+ /// No-op for open, calls are never executed so cannot succeed or fail
+ ///
+ protected override void CallSucceeds( )
+ {
+ //throw new NotImplementedException();
+ }
+
+ ///
+ /// On entering this state, schedule an attempted reset and store the entry time to
+ /// calculate remaining time before attempted reset.
+ ///
+ protected override void EnterInternal( )
+ {
+ Task.Delay( _breaker.ResetTimeout ).ContinueWith( task => _breaker.AttemptReset( ) );
+ }
+ }
+
+ ///
+ /// Concrete implementation of half-open state
+ ///
+ internal class HalfOpen : AtomicState
+ {
+ private readonly CircuitBreaker _breaker;
+ private readonly AtomicBoolean _lock;
+
+ public HalfOpen( CircuitBreaker breaker )
+ : base( breaker.CallTimeout, 0 )
+ {
+ _breaker = breaker;
+ _lock = new AtomicBoolean();
+ }
+
+ ///
+ /// Allows a single call through, during which all other callers fail-fast. If the call fails, the breaker reopens.
+ /// If the call succeeds, the breaker closes.
+ ///
+ ///
+ /// Implementation of the call that needs protected
+ /// containing result of protected call
+ public override async Task Invoke( Task body )
+ {
+ if ( !_lock.CompareAndSet( true, false) )
+ {
+ throw new OpenCircuitException( );
+ }
+ return await CallThrough( body );
+ }
+
+ ///
+ /// Allows a single call through, during which all other callers fail-fast. If the call fails, the breaker reopens.
+ /// If the call succeeds, the breaker closes.
+ ///
+ /// Implementation of the call that needs protected
+ /// containing result of protected call
+ public override async Task Invoke( Task body )
+ {
+ if ( !_lock.CompareAndSet( true, false ) )
+ {
+ throw new OpenCircuitException( );
+ }
+ await CallThrough( body );
+ }
+
+ ///
+ /// Reopen breaker on failed call.
+ ///
+ protected override void CallFails( )
+ {
+ _breaker.TripBreaker( this );
+ }
+
+ ///
+ /// Reset breaker on successful call.
+ ///
+ protected override void CallSucceeds( )
+ {
+ _breaker.ResetBreaker( );
+ }
+
+ ///
+ /// On entry, guard should be reset for that first call to get in
+ ///
+ protected override void EnterInternal( )
+ {
+ _lock.Value = true ;
+ }
+
+ ///
+ /// Override for more descriptive toString
+ ///
+ ///
+ public override string ToString( )
+ {
+ return string.Format( CultureInfo.InvariantCulture, "Half-Open currently testing call for success = {0}", ( _lock == true ) );
+ }
+ }
+
+ ///
+ /// Concrete implementation of Closed state
+ ///
+ internal class Closed : AtomicState
+ {
+ private readonly CircuitBreaker _breaker;
+
+ public Closed( CircuitBreaker breaker )
+ : base( breaker.CallTimeout, 0 )
+ {
+ _breaker = breaker;
+ }
+
+ ///
+ /// Implementation of invoke, which simply attempts the call
+ ///
+ ///
+ /// Implementation of the call that needs protected
+ /// containing result of protected call
+ public override async Task Invoke( Task body )
+ {
+ return await CallThrough( body );
+ }
+
+ ///
+ /// Implementation of invoke, which simply attempts the call
+ ///
+ /// Implementation of the call that needs protected
+ /// containing result of protected call
+ public override async Task Invoke( Task body )
+ {
+ await CallThrough( body );
+ }
+
+ ///
+ /// On failed call, the failure count is incremented. The count is checked against the configured maxFailures, and
+ /// the breaker is tripped if we have reached maxFailures.
+ ///
+ protected override void CallFails( )
+ {
+ if ( IncrementAndGet( ) == _breaker.MaxFailures )
+ {
+ _breaker.TripBreaker( this );
+ }
+ }
+
+ ///
+ /// On successful call, the failure count is reset to 0
+ ///
+ protected override void CallSucceeds( )
+ {
+ Reset();
+ }
+
+ ///
+ /// On entry of this state, failure count is reset.
+ ///
+ protected override void EnterInternal( )
+ {
+ Reset();
+ }
+
+ ///
+ /// Override for more descriptive toString
+ ///
+ ///
+ public override string ToString( )
+ {
+ return string.Format( "Closed with failure count = {0}", Current );
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka/Pattern/IllegalStateException.cs b/src/core/Akka/Pattern/IllegalStateException.cs
index b27a95958c1..f1a4c684170 100644
--- a/src/core/Akka/Pattern/IllegalStateException.cs
+++ b/src/core/Akka/Pattern/IllegalStateException.cs
@@ -10,6 +10,10 @@
namespace Akka.Pattern
{
+ ///
+ /// Signals that a method has been invoked at an illegal or
+ /// inappropriate time.
+ ///
public class IllegalStateException : AkkaException
{
diff --git a/src/core/Akka/Pattern/OpenCircuitException.cs b/src/core/Akka/Pattern/OpenCircuitException.cs
new file mode 100644
index 00000000000..8cc7d68fd0a
--- /dev/null
+++ b/src/core/Akka/Pattern/OpenCircuitException.cs
@@ -0,0 +1,36 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2015 Typesafe Inc.
+// Copyright (C) 2013-2015 Akka.NET project
+//
+//-----------------------------------------------------------------------
+
+using System;
+using System.Runtime.Serialization;
+using Akka.Actor;
+
+namespace Akka.Pattern
+{
+ ///
+ /// Exception throws when CircuitBreaker is open
+ ///
+ public class OpenCircuitException : AkkaException
+ {
+ public OpenCircuitException( ) : base( "Circuit Breaker is open; calls are failing fast" ) { }
+
+ public OpenCircuitException( string message )
+ : base( message )
+ {
+ }
+
+ public OpenCircuitException( string message, Exception innerException )
+ : base( message, innerException )
+ {
+ }
+
+ protected OpenCircuitException( SerializationInfo info, StreamingContext context )
+ : base( info, context )
+ {
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka/Util/Internal/AtomicState.cs b/src/core/Akka/Util/Internal/AtomicState.cs
new file mode 100644
index 00000000000..3b9f01d9ddc
--- /dev/null
+++ b/src/core/Akka/Util/Internal/AtomicState.cs
@@ -0,0 +1,198 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2015 Typesafe Inc.
+// Copyright (C) 2013-2015 Akka.NET project
+//
+//-----------------------------------------------------------------------
+
+using System;
+using System.Collections.Concurrent;
+using System.Runtime.ExceptionServices;
+using System.Threading.Tasks;
+
+namespace Akka.Util.Internal
+{
+ ///
+ /// Internal state abstraction
+ ///
+ internal abstract class AtomicState : AtomicCounterLong, IAtomicState
+ {
+ private readonly ConcurrentQueue _listeners;
+ private readonly TimeSpan _callTimeout;
+
+ protected AtomicState( TimeSpan callTimeout, long startingCount )
+ : base( startingCount )
+ {
+ _listeners = new ConcurrentQueue( );
+ _callTimeout = callTimeout;
+ }
+
+ ///
+ /// Add a listener function which is invoked on state entry
+ ///
+ /// listener implementation
+ public void AddListener( Action listener )
+ {
+ _listeners.Enqueue( listener );
+ }
+
+ ///
+ /// Test for whether listeners exist
+ ///
+ public bool HasListeners
+ {
+ get { return !_listeners.IsEmpty; }
+ }
+
+ ///
+ /// Notifies the listeners of the transition event via a
+ ///
+ protected async Task NotifyTransitionListeners( )
+ {
+ if ( !HasListeners ) return;
+ await Task
+ .Factory
+ .StartNew
+ (
+ ( ) =>
+ {
+ foreach ( var listener in _listeners )
+ {
+ listener.Invoke( );
+ }
+ }
+ );
+ }
+
+ ///
+ /// Shared implementation of call across all states. Thrown exception or execution of the call beyond the allowed
+ /// call timeout is counted as a failed call, otherwise a successful call
+ ///
+ /// NOTE: In .Net there is no way to cancel an uncancellable task. We are merely cancelling the wait and marking this
+ /// as a failure.
+ ///
+ /// see http://blogs.msdn.com/b/pfxteam/archive/2011/11/10/10235834.aspx
+ ///
+ ///
+ /// Implementation of the call
+ /// result of the call
+ public async Task CallThrough( Task task )
+ {
+ var deadline = DateTime.UtcNow.Add( _callTimeout );
+ ExceptionDispatchInfo capturedException = null;
+ T result = default(T);
+ try
+ {
+ result = await task;
+ }
+ catch ( Exception ex )
+ {
+ capturedException = ExceptionDispatchInfo.Capture( ex );
+ }
+
+ bool throwException = capturedException != null;
+ if ( throwException || DateTime.UtcNow.CompareTo( deadline ) >= 0 )
+ {
+ CallFails( );
+ if ( throwException )
+ capturedException.Throw( );
+ }
+ else
+ {
+ CallSucceeds( );
+ }
+ return result;
+ }
+
+ ///
+ /// Shared implementation of call across all states. Thrown exception or execution of the call beyond the allowed
+ /// call timeout is counted as a failed call, otherwise a successful call
+ ///
+ /// NOTE: In .Net there is no way to cancel an uncancellable task. We are merely cancelling the wait and marking this
+ /// as a failure.
+ ///
+ /// see http://blogs.msdn.com/b/pfxteam/archive/2011/11/10/10235834.aspx
+ ///
+ /// Implementation of the call
+ ///
+ public async Task CallThrough( Task task )
+ {
+ var deadline = DateTime.UtcNow.Add( _callTimeout );
+ ExceptionDispatchInfo capturedException = null;
+
+ try
+ {
+ await task;
+ }
+ catch ( Exception ex )
+ {
+ capturedException = ExceptionDispatchInfo.Capture( ex );
+ }
+
+ bool throwException = capturedException != null;
+ if (throwException || DateTime.UtcNow.CompareTo(deadline) >= 0)
+ {
+ CallFails();
+ if (throwException) capturedException.Throw();
+ }
+ else
+ {
+ CallSucceeds();
+ }
+
+
+ }
+
+ ///
+ /// Abstract entry point for all states
+ ///
+ ///
+ /// Implementation of the call that needs protected
+ /// containing result of protected call
+ public abstract Task Invoke( Task body );
+
+ ///
+ /// Abstract entry point for all states
+ ///
+ /// Implementation of the call that needs protected
+ /// containing result of protected call
+ public abstract Task Invoke( Task body );
+
+ ///
+ /// Invoked when call fails
+ ///
+ protected abstract void CallFails( );
+
+ ///
+ /// Invoked when call succeeds
+ ///
+ protected abstract void CallSucceeds( );
+
+ ///
+ /// Invoked on the transitioned-to state during transition. Notifies listeners after invoking subclass template method _enter
+ ///
+ protected abstract void EnterInternal( );
+
+ ///
+ /// Enter the state. NotifyTransitionListeners is not awaited -- its "fire and forget".
+ /// It is up to the user to handle any errors that occur in this state.
+ ///
+ public void Enter( )
+ {
+ EnterInternal( );
+ NotifyTransitionListeners( );
+ }
+
+ }
+
+ ///
+ /// This interface represents the parts of the internal circuit breaker state; the behavior stack, watched by, watching and termination queue
+ ///
+ public interface IAtomicState
+ {
+ void AddListener( Action listener );
+ bool HasListeners { get; }
+ Task Invoke( Task body );
+ void Enter( );
+ }
+}