From 7e16834ef0ff8551cdd3530eacb1016d40cb1cb8 Mon Sep 17 00:00:00 2001 From: moliver Date: Wed, 22 Jul 2015 01:39:08 -0400 Subject: [PATCH] Add CircuitBreaker Initial based on akka 2.0.5 - Add passing tests from akka 2.0.5 - Add CircuitBreakerState.cs with Open, HalfOpen, and Closed states - Add AtomicState which extends AtomicLong (mirrors functionality of State trait) - Add OpenCircuitException - Add CircuitBreaker documentation - Remove Scheduler from current implementation - Remove inappropriate xmldoc on illegalstateexception - Update flow on AtomicState::CallThrough method to be clearer and more closely match original - Add clarifying comments on "Fire and forget" NotifyTransitionListeners --- src/Akka.sln | 6 + src/core/Akka.TestKit/Akka.TestKit.csproj | 1 + src/core/Akka.TestKit/TestBreaker.cs | 30 ++ src/core/Akka.Tests/Akka.Tests.csproj | 1 + .../Akka.Tests/Pattern/CircuitBreakerSpec.cs | 351 ++++++++++++++++++ src/core/Akka/Akka.csproj | 4 + src/core/Akka/Pattern/CircuitBreaker.cs | 261 +++++++++++++ src/core/Akka/Pattern/CircuitBreakerState.cs | 226 +++++++++++ .../Akka/Pattern/IllegalStateException.cs | 4 + src/core/Akka/Pattern/OpenCircuitException.cs | 36 ++ src/core/Akka/Util/Internal/AtomicState.cs | 198 ++++++++++ 11 files changed, 1118 insertions(+) create mode 100644 src/core/Akka.TestKit/TestBreaker.cs create mode 100644 src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs create mode 100644 src/core/Akka/Pattern/CircuitBreaker.cs create mode 100644 src/core/Akka/Pattern/CircuitBreakerState.cs create mode 100644 src/core/Akka/Pattern/OpenCircuitException.cs create mode 100644 src/core/Akka/Util/Internal/AtomicState.cs diff --git a/src/Akka.sln b/src/Akka.sln index 8e8d1546a30..240dd07079f 100644 --- a/src/Akka.sln +++ b/src/Akka.sln @@ -2,6 +2,8 @@ Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 14 VisualStudioVersion = 14.0.22823.1 +# Visual Studio 2013 +VisualStudioVersion = 12.0.31101.0 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Examples", "Examples", "{69279534-1DBA-4115-BF8B-03F77FC8125E}" EndProject @@ -211,6 +213,10 @@ EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.MultiNodeTests", "core\Akka.MultiNodeTests\Akka.MultiNodeTests.csproj", "{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PersistenceBenchmark", "benchmark\PersistenceBenchmark\PersistenceBenchmark.csproj", "{39E6F51F-FA1E-4C62-B8F8-19065DE6D55D}" +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{A59BAE84-70E2-46A0-9E26-7413C103E2D7}" + ProjectSection(SolutionItems) = preProject + WebEssentials-Settings.json = WebEssentials-Settings.json + EndProjectSection EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution diff --git a/src/core/Akka.TestKit/Akka.TestKit.csproj b/src/core/Akka.TestKit/Akka.TestKit.csproj index edc98f4d41f..21e25311a7b 100644 --- a/src/core/Akka.TestKit/Akka.TestKit.csproj +++ b/src/core/Akka.TestKit/Akka.TestKit.csproj @@ -111,6 +111,7 @@ + diff --git a/src/core/Akka.TestKit/TestBreaker.cs b/src/core/Akka.TestKit/TestBreaker.cs new file mode 100644 index 00000000000..ac188f853e4 --- /dev/null +++ b/src/core/Akka.TestKit/TestBreaker.cs @@ -0,0 +1,30 @@ +using System; +using System.Linq; +using System.Runtime.Serialization; +using System.Threading; +using System.Threading.Tasks; +using Akka.Pattern; + +namespace Akka.TestKit +{ + public class TestBreaker + { + public CountdownEvent HalfOpenLatch { get; private set; } + public CountdownEvent OpenLatch { get; private set; } + public CountdownEvent ClosedLatch { get; private set; } + public CircuitBreaker Instance { get; private set; } + + public TestBreaker( CircuitBreaker instance ) + { + HalfOpenLatch = new CountdownEvent( 1 ); + OpenLatch = new CountdownEvent( 1 ); + ClosedLatch = new CountdownEvent( 1 ); + Instance = instance; + Instance.OnClose( ( ) => { if ( !ClosedLatch.IsSet ) ClosedLatch.Signal( ); } ) + .OnHalfOpen( ( ) => { if ( !HalfOpenLatch.IsSet ) HalfOpenLatch.Signal( ); } ) + .OnOpen( ( ) => { if ( !OpenLatch.IsSet ) OpenLatch.Signal( ); } ); + } + + + } +} \ No newline at end of file diff --git a/src/core/Akka.Tests/Akka.Tests.csproj b/src/core/Akka.Tests/Akka.Tests.csproj index 16e21f4965a..b9519bf9740 100644 --- a/src/core/Akka.Tests/Akka.Tests.csproj +++ b/src/core/Akka.Tests/Akka.Tests.csproj @@ -150,6 +150,7 @@ + diff --git a/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs b/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs new file mode 100644 index 00000000000..c4d5a56b692 --- /dev/null +++ b/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs @@ -0,0 +1,351 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2015 Typesafe Inc. +// Copyright (C) 2013-2015 Akka.NET project +// +//----------------------------------------------------------------------- + +using System; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Runtime.Serialization; +using System.Threading; +using System.Threading.Tasks; +using Akka.Pattern; +using Akka.TestKit; +using Xunit; + +namespace Akka.Tests.Pattern +{ + public class ASynchronousCircuitBreakerThatIsClosed : CircuitBreakerSpecBase + { + [Fact(DisplayName = "A synchronous circuit breaker that is closed should allow call through")] + public void Should_Allow_Call_Through( ) + { + var breaker = LongCallTimeoutCb( ); + var result = breaker.Instance.WithSyncCircuitBreaker( ( ) => "Test" ); + + Assert.Equal( "Test", result ); + } + + [Fact( DisplayName = "A synchronous circuit breaker that is closed should increment failure count when call fails" )] + public void Should_Increment_FailureCount_When_Call_Fails( ) + { + var breaker = LongCallTimeoutCb( ); + + Assert.Equal( breaker.Instance.CurrentFailureCount, 0 ); + Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); + Assert.True( CheckLatch( breaker.OpenLatch ) ); + Assert.Equal( 1, breaker.Instance.CurrentFailureCount ); + } + + [Fact( DisplayName = "A synchronous circuit breaker that is closed should reset failure count when call succeeds" )] + public void Should_Reset_FailureCount_When_Call_Succeeds( ) + { + var breaker = MultiFailureCb( ); + + Assert.Equal( breaker.Instance.CurrentFailureCount, 0 ); + Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); + Assert.Equal( breaker.Instance.CurrentFailureCount, 1 ); + + breaker.Instance.WithSyncCircuitBreaker( ( ) => "Test" ); + + Assert.Equal( 0, breaker.Instance.CurrentFailureCount ); + } + + [Fact(DisplayName = "A synchronous circuit breaker that is closed should increment failure count when call times out")] + public void Should_Increment_FailureCount_When_Call_Times_Out( ) + { + var breaker = ShortCallTimeoutCb( ); + + breaker.Instance.WithSyncCircuitBreaker( ( ) => Thread.Sleep( 500 ) ); + + Assert.True( CheckLatch( breaker.OpenLatch ) ); + Assert.Equal( 1, breaker.Instance.CurrentFailureCount ); + } + } + + public class ASynchronousCircuitBreakerThatIsHalfOpen : CircuitBreakerSpecBase + { + [Fact(DisplayName = "A synchronous circuit breaker that is half open should pass call and transition to close on success")] + public void Should_Pass_Call_And_Transition_To_Close_On_Success( ) + { + var breaker = ShortResetTimeoutCb( ); + InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ); + Assert.True( CheckLatch( breaker.HalfOpenLatch ) ); + + var result = breaker.Instance.WithSyncCircuitBreaker( ( ) => SayTest( ) ); + + Assert.True( CheckLatch( breaker.ClosedLatch ) ); + Assert.Equal( SayTest( ), result ); + } + + [Fact(DisplayName = "A synchronous circuit breaker that is half open should pass call and transition to open on exception")] + public void Should_Pass_Call_And_Transition_To_Open_On_Exception( ) + { + var breaker = ShortResetTimeoutCb( ); + + + Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); + Assert.True( CheckLatch( breaker.HalfOpenLatch ) ); + + Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); + Assert.True( CheckLatch( breaker.OpenLatch ) ); + } + } + + public class ASynchronousCircuitBreakerThatIsOpen : CircuitBreakerSpecBase + { + [Fact(DisplayName = "A synchronous circuit breaker that is open should throw exceptions before reset timeout")] + public void Should_Throw_Exceptions_Before_Reset_Timeout( ) + { + var breaker = LongResetTimeoutCb( ); + + Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); + Assert.True( CheckLatch( breaker.OpenLatch ) ); + Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); + } + + [Fact(DisplayName = "A synchronous circuit breaker that is open should transition to half open when reset times out")] + public void Should_Transition_To_Half_Open_When_Reset_Times_Out( ) + { + var breaker = ShortResetTimeoutCb( ); + + Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); + Assert.True( CheckLatch( breaker.HalfOpenLatch ) ); + } + } + + public class AnAsynchronousCircuitBreakerThatIsClosed : CircuitBreakerSpecBase + { + [Fact(DisplayName = "An asynchronous circuit breaker that is closed should allow call through")] + public void Should_Allow_Call_Through( ) + { + var breaker = LongCallTimeoutCb( ); + var result = breaker.Instance.WithCircuitBreaker( Task.Run( ( ) => SayTest( ) ) ); + + Assert.Equal( SayTest( ), result.Result ); + } + + [Fact(DisplayName = "An asynchronous circuit breaker that is closed should increment failure count when call fails")] + public void Should_Increment_Failure_Count_When_Call_Fails( ) + { + var breaker = LongCallTimeoutCb( ); + + Assert.Equal( breaker.Instance.CurrentFailureCount, 0 ); + Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithCircuitBreaker( Task.Run( ( ) => ThrowException( ) ) ).Wait( AwaitTimeout ) ) ); + Assert.True( CheckLatch( breaker.OpenLatch ) ); + Assert.Equal( 1, breaker.Instance.CurrentFailureCount ); + } + + [Fact(DisplayName = "An asynchronous circuit breaker that is closed should reset failure count when call succeeds after failure")] + public void Should_Reset_Failure_Count_When_Call_Succeeds_After_Failure( ) + { + var breaker = MultiFailureCb( ); + + Assert.Equal( breaker.Instance.CurrentFailureCount, 0 ); + + var whenall = Task.WhenAll( + breaker.Instance.WithCircuitBreaker(Task.Factory.StartNew(ThrowException)) + , breaker.Instance.WithCircuitBreaker(Task.Factory.StartNew(ThrowException)) + , breaker.Instance.WithCircuitBreaker(Task.Factory.StartNew(ThrowException)) + , breaker.Instance.WithCircuitBreaker(Task.Factory.StartNew(ThrowException))); + + Assert.True( InterceptExceptionType( ( ) => whenall.Wait( AwaitTimeout ) ) ); + + Assert.Equal( breaker.Instance.CurrentFailureCount, 4 ); + + var result = breaker.Instance.WithCircuitBreaker( Task.Run( ( ) => SayTest( ) ) ).Result; + + Assert.Equal( SayTest( ), result ); + Assert.Equal( 0, breaker.Instance.CurrentFailureCount ); + } + + [Fact(DisplayName = "An asynchronous circuit breaker that is closed should increment failure count when call times out")] + public void Should_Increment_Failure_Count_When_Call_Times_Out( ) + { + var breaker = ShortCallTimeoutCb( ); + + breaker.Instance.WithCircuitBreaker( Task.Factory.StartNew( ( ) => + { + Thread.Sleep( 500 ); + return SayTest( ); + } ) ); + + Assert.True( CheckLatch( breaker.OpenLatch ) ); + Assert.Equal( 1, breaker.Instance.CurrentFailureCount ); + } + } + + public class AnAsynchronousCircuitBreakerThatIsHalfOpen : CircuitBreakerSpecBase + { + [Fact(DisplayName = "An asynchronous circuit breaker that is half open should pass call and transition to close on success")] + public void Should_Pass_Call_And_Transition_To_Close_On_Success( ) + { + var breaker = ShortResetTimeoutCb( ); + InterceptExceptionType( ( ) => breaker.Instance.WithCircuitBreaker( Task.Factory.StartNew( ThrowException ) ) ); + Assert.True( CheckLatch( breaker.HalfOpenLatch ) ); + + var result = breaker.Instance.WithCircuitBreaker( Task.Factory.StartNew( ( ) => SayTest( ) ) ); + + Assert.True( CheckLatch( breaker.ClosedLatch ) ); + Assert.Equal( SayTest( ), result.Result ); + } + + [Fact(DisplayName = "An asynchronous circuit breaker that is half open should pass call and transition to open on exception")] + public void Should_Pass_Call_And_Transition_To_Open_On_Exception( ) + { + var breaker = ShortResetTimeoutCb( ); + + + Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithCircuitBreaker( Task.Factory.StartNew( ThrowException ) ).Wait( ) ) ); + Assert.True( CheckLatch( breaker.HalfOpenLatch ) ); + + Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithCircuitBreaker( Task.Factory.StartNew( ThrowException ) ).Wait( ) ) ); + Assert.True( CheckLatch( breaker.OpenLatch ) ); + } + + [Fact(DisplayName = "An asynchronous circuit breaker that is half open should pass call and transition to open on async failure")] + public void Should_Pass_Call_And_Transition_To_Open_On_Async_Failure( ) + { + var breaker = ShortResetTimeoutCb( ); + + breaker.Instance.WithCircuitBreaker( Task.Factory.StartNew( ThrowException ) ); + Assert.True( CheckLatch( breaker.HalfOpenLatch ) ); + + breaker.Instance.WithCircuitBreaker( Task.Factory.StartNew( ThrowException ) ); + Assert.True( CheckLatch( breaker.OpenLatch ) ); + } + } + + public class AnAsynchronousCircuitBreakerThatIsOpen : CircuitBreakerSpecBase + { + [Fact(DisplayName = "An asynchronous circuit breaker that is open should throw exceptions when called before reset timeout")] + public void Should_Throw_Exceptions_When_Called_Before_Reset_Timeout( ) + { + var breaker = LongResetTimeoutCb( ); + + Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithCircuitBreaker( Task.Factory.StartNew( ThrowException ) ).Wait( ) ) ); + Assert.True( CheckLatch( breaker.OpenLatch ) ); + Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithCircuitBreaker( Task.Factory.StartNew( ThrowException ) ).Wait( ) ) ); + } + + [Fact(DisplayName = "An asynchronous circuit breaker that is open should transition to half open when reset timeout")] + public void Should_Transition_To_Half_Open_When_Reset_Timeout( ) + { + var breaker = ShortResetTimeoutCb( ); + + Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithCircuitBreaker( Task.Factory.StartNew( ThrowException ) ).Wait( ) ) ); + Assert.True( CheckLatch( breaker.HalfOpenLatch ) ); + } + } + + public class CircuitBreakerSpecBase : AkkaSpec + { + private readonly TimeSpan _awaitTimeout = TimeSpan.FromSeconds(2); + public TimeSpan AwaitTimeout { get { return _awaitTimeout; } } + + public bool CheckLatch( CountdownEvent latch ) + { + return latch.Wait( AwaitTimeout ); + } + + public Task Delay( TimeSpan toDelay, CancellationToken? token ) + { + return token.HasValue ? Task.Delay( toDelay, token.Value ) : Task.Delay( toDelay ); + } + + public void ThrowException( ) + { + throw new TestException( "Test Exception" ); + } + + public string SayTest( ) + { + return "Test"; + } + + [SuppressMessage( "Microsoft.Design", "CA1004:GenericMethodsShouldProvideTypeParameter" )] + public bool InterceptExceptionType( Action action ) where T : Exception + { + try + { + action.Invoke( ); + return false; + } + catch ( Exception ex ) + { + var aggregate = ex as AggregateException; + if ( aggregate != null ) + { + + // ReSharper disable once UnusedVariable + foreach ( var temp in aggregate.InnerExceptions.Select( innerException => innerException as T ).Where( temp => temp == null ) ) + { + throw; + } + } + else + { + var temp = ex as T; + + if ( temp == null ) + { + throw; + } + } + + } + return true; + } + + public TestBreaker ShortCallTimeoutCb( ) + { + return new TestBreaker( new CircuitBreaker( 1, TimeSpan.FromMilliseconds( 50 ), TimeSpan.FromMilliseconds( 500 ) ) ); + } + + public TestBreaker ShortResetTimeoutCb( ) + { + return new TestBreaker( new CircuitBreaker( 1, TimeSpan.FromMilliseconds( 1000 ), TimeSpan.FromMilliseconds( 50 ) ) ); + } + + public TestBreaker LongCallTimeoutCb( ) + { + return new TestBreaker( new CircuitBreaker( 1, TimeSpan.FromMilliseconds( 5000 ), TimeSpan.FromMilliseconds( 500 ) ) ); + } + + public TestBreaker LongResetTimeoutCb( ) + { + return new TestBreaker( new CircuitBreaker( 1, TimeSpan.FromMilliseconds( 100 ), TimeSpan.FromMilliseconds( 5000 ) ) ); + } + + public TestBreaker MultiFailureCb( ) + { + return new TestBreaker( new CircuitBreaker( 5, TimeSpan.FromMilliseconds( 200 ), TimeSpan.FromMilliseconds( 500 ) ) ); + } + } + + + internal class TestException : ApplicationException + { + public TestException( ) + { + } + + public TestException( string message ) + : base( message ) + { + } + + public TestException( string message, Exception innerException ) + : base( message, innerException ) + { + } + + protected TestException( SerializationInfo info, StreamingContext context ) + : base( info, context ) + { + } + } + +} \ No newline at end of file diff --git a/src/core/Akka/Akka.csproj b/src/core/Akka/Akka.csproj index 908f24e3b17..30969346257 100644 --- a/src/core/Akka/Akka.csproj +++ b/src/core/Akka/Akka.csproj @@ -248,6 +248,9 @@ + + + @@ -266,6 +269,7 @@ + diff --git a/src/core/Akka/Pattern/CircuitBreaker.cs b/src/core/Akka/Pattern/CircuitBreaker.cs new file mode 100644 index 00000000000..15b88af307b --- /dev/null +++ b/src/core/Akka/Pattern/CircuitBreaker.cs @@ -0,0 +1,261 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2015 Typesafe Inc. +// Copyright (C) 2013-2015 Akka.NET project +// +//----------------------------------------------------------------------- + +using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using Akka.Util.Internal; + +namespace Akka.Pattern +{ + /// + /// Provides circuit breaker functionality to provide stability when working with + /// "dangerous" operations, e.g. calls to remote systems + /// + /// + /// + /// Transitions through three states: + /// + /// + /// In *Closed* state, + /// calls pass through until the maxFailures count is reached. + /// This causes the circuit breaker to open. Both exceptions and calls exceeding + /// callTimeout are considered failures. + /// + /// + /// In *Open* state, + /// calls fail-fast with an exception. After resetTimeout, + /// circuit breaker transitions to half-open state. + /// + /// + /// In *Half-Open* state, + /// the first call will be allowed through, if it succeeds + /// the circuit breaker will reset to closed state. If it fails, the circuit + /// breaker will re-open to open state. All calls beyond the first that execute + /// while the first is running will fail-fast with an exception. + /// + /// + /// + public class CircuitBreaker + { + /// + /// The current state of the breaker -- Closed, Half-Open, or Closed -- *access only via helper methods* + /// + private AtomicState _currentState; + + /// + /// Helper method for access to the underlying state via Interlocked + /// + /// Previous state on transition + /// Next state on transition + /// Whether the previous state matched correctly + private bool SwapState( AtomicState oldState, AtomicState newState ) + { + return Interlocked.CompareExchange( ref _currentState, newState, oldState ) == oldState; + } + + /// + /// Helper method for access to the underlying state via Interlocked + /// + private AtomicState CurrentState + { + get + { + Interlocked.MemoryBarrier( ); + return _currentState; + } + } + + public int MaxFailures { get; private set; } + + public TimeSpan CallTimeout { get; private set; } + public TimeSpan ResetTimeout { get; private set; } + + //akka.io implementation is to use nested static classes and access parent member variables + //.Net static nested classes do not have access to parent member variables -- so we configure the states here and + //swap them above + private AtomicState Closed { get; set; } + private AtomicState Open { get; set; } + private AtomicState HalfOpen { get; set; } + + /// + /// Create a new CircuitBreaker + /// + /// Maximum number of failures before opening the circuit + /// of time after which to consider a call a failure + /// of time after which to attempt to close the circuit + /// + public static CircuitBreaker Create( int maxFailures, TimeSpan callTimeout, TimeSpan resetTimeout ) + { + return new CircuitBreaker( maxFailures, callTimeout, resetTimeout ); + } + + /// + /// Create a new CircuitBreaker + /// + /// Maximum number of failures before opening the circuit + /// of time after which to consider a call a failure + /// of time after which to attempt to close the circuit + /// + public CircuitBreaker( int maxFailures, TimeSpan callTimeout, TimeSpan resetTimeout ) + { + MaxFailures = maxFailures; + CallTimeout = callTimeout; + ResetTimeout = resetTimeout; + Closed = new Closed( this ); + Open = new Open( this ); + HalfOpen = new HalfOpen( this ); + _currentState = Closed; + //_failures = new AtomicInteger(); + } + + /// + /// Retrieves current failure count. + /// + public long CurrentFailureCount + { + get { return Closed.Current; } + } + + /// + /// Wraps invocation of asynchronous calls that need to be protected + /// + /// + /// Call needing protected + /// containing the call result + public async Task WithCircuitBreaker( Task body ) + { + return await CurrentState.Invoke( body ); + } + + /// + /// Wraps invocation of asynchronous calls that need to be protected + /// + /// Call needing protected + /// + public async Task WithCircuitBreaker( Task body ) + { + await CurrentState.Invoke( body ); + } + + /// + /// The failure will be recorded farther down. + /// + /// + public void WithSyncCircuitBreaker( Action body ) + { + var cbTask = WithCircuitBreaker( Task.Factory.StartNew( body ) ); + if ( !cbTask.Wait( CallTimeout ) ) + { + //throw new TimeoutException( string.Format( "Execution did not complete within the time alotted {0} ms", CallTimeout.TotalMilliseconds ) ); + } + if ( cbTask.Exception != null ) + { + throw cbTask.Exception; + } + } + + /// + /// Wraps invocations of asynchronous calls that need to be protected + /// If this does not complete within the time allotted, it should return default() + /// + /// + /// Await.result( + /// withCircuitBreaker(try Future.successful(body) catch { case NonFatal(t) ⇒ Future.failed(t) }), + /// callTimeout) + /// + /// + /// + /// + /// + /// or default() + public T WithSyncCircuitBreaker( Func body ) + { + var cbTask = WithCircuitBreaker( Task.Factory.StartNew( body ) ); + return cbTask.Wait( CallTimeout ) ? cbTask.Result : default(T); + } + + /// + /// Adds a callback to execute when circuit breaker opens + /// + /// Handler to be invoked on state change + /// CircuitBreaker for fluent usage + public CircuitBreaker OnOpen( Action callback ) + { + Open.AddListener( callback ); + return this; + } + + /// + /// Adds a callback to execute when circuit breaker transitions to half-open + /// + /// Handler to be invoked on state change + /// CircuitBreaker for fluent usage + public CircuitBreaker OnHalfOpen( Action callback ) + { + HalfOpen.AddListener( callback ); + return this; + } + + /// + /// Adds a callback to execute when circuit breaker state closes + /// + /// Handler to be invoked on state change + /// CircuitBreaker for fluent usage + public CircuitBreaker OnClose( Action callback ) + { + Closed.AddListener( callback ); + return this; + } + + /// + /// Implements consistent transition between states. Throws IllegalStateException if an invalid transition is attempted. + /// + /// State being transitioning from + /// State being transitioned to + private void Transition( AtomicState fromState, AtomicState toState ) + { + if ( SwapState( fromState, toState ) ) + { + Debug.WriteLine( "Successful transition from {0} to {1}", fromState, toState ); + toState.Enter( ); + } + else + { + throw new IllegalStateException( string.Format( "Illegal transition attempted from {0} to {1}", fromState, toState ) ); + } + } + + /// + /// Trips breaker to an open state. This is valid from Closed or Half-Open states + /// + /// State we're coming from (Closed or Half-Open) + internal void TripBreaker( AtomicState fromState ) + { + Transition( fromState, Open ); + } + + /// + /// Resets breaker to a closed state. This is valid from an Half-Open state only. + /// + internal void ResetBreaker( ) + { + Transition( HalfOpen, Closed ); + } + + /// + /// Attempts to reset breaker by transitioning to a half-open state. This is valid from an Open state only. + /// + internal void AttemptReset( ) + { + Transition( Open, HalfOpen ); + } + + //private readonly Task timeoutTask = Task.FromResult(new TimeoutException("Circuit Breaker Timed out.")); + } +} diff --git a/src/core/Akka/Pattern/CircuitBreakerState.cs b/src/core/Akka/Pattern/CircuitBreakerState.cs new file mode 100644 index 00000000000..833d72b7b21 --- /dev/null +++ b/src/core/Akka/Pattern/CircuitBreakerState.cs @@ -0,0 +1,226 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2015 Typesafe Inc. +// Copyright (C) 2013-2015 Akka.NET project +// +//----------------------------------------------------------------------- + +using System.Globalization; +using System.Threading.Tasks; +using Akka.Util; +using Akka.Util.Internal; + +namespace Akka.Pattern +{ + /// + /// Concrete implementation of Open state + /// + internal class Open : AtomicState + { + private readonly CircuitBreaker _breaker; + + public Open( CircuitBreaker breaker ) + : base( breaker.CallTimeout, 0 ) + { + _breaker = breaker; + } + + /// + /// Fail-fast on any invocation + /// + /// + /// Implementation of the call that needs protected + /// containing result of protected call + public override Task Invoke( Task body ) + { + throw new OpenCircuitException( ); + } + + /// + /// Implementation of invoke, which simply attempts the call + /// + /// Implementation of the call that needs protected + /// containing result of protected call + public override Task Invoke( Task body ) + { + throw new OpenCircuitException( ); + } + + /// + /// No-op for open, calls are never executed so cannot succeed or fail + /// + protected override void CallFails( ) + { + //throw new NotImplementedException(); + } + + /// + /// No-op for open, calls are never executed so cannot succeed or fail + /// + protected override void CallSucceeds( ) + { + //throw new NotImplementedException(); + } + + /// + /// On entering this state, schedule an attempted reset and store the entry time to + /// calculate remaining time before attempted reset. + /// + protected override void EnterInternal( ) + { + Task.Delay( _breaker.ResetTimeout ).ContinueWith( task => _breaker.AttemptReset( ) ); + } + } + + /// + /// Concrete implementation of half-open state + /// + internal class HalfOpen : AtomicState + { + private readonly CircuitBreaker _breaker; + private readonly AtomicBoolean _lock; + + public HalfOpen( CircuitBreaker breaker ) + : base( breaker.CallTimeout, 0 ) + { + _breaker = breaker; + _lock = new AtomicBoolean(); + } + + /// + /// Allows a single call through, during which all other callers fail-fast. If the call fails, the breaker reopens. + /// If the call succeeds, the breaker closes. + /// + /// + /// Implementation of the call that needs protected + /// containing result of protected call + public override async Task Invoke( Task body ) + { + if ( !_lock.CompareAndSet( true, false) ) + { + throw new OpenCircuitException( ); + } + return await CallThrough( body ); + } + + /// + /// Allows a single call through, during which all other callers fail-fast. If the call fails, the breaker reopens. + /// If the call succeeds, the breaker closes. + /// + /// Implementation of the call that needs protected + /// containing result of protected call + public override async Task Invoke( Task body ) + { + if ( !_lock.CompareAndSet( true, false ) ) + { + throw new OpenCircuitException( ); + } + await CallThrough( body ); + } + + /// + /// Reopen breaker on failed call. + /// + protected override void CallFails( ) + { + _breaker.TripBreaker( this ); + } + + /// + /// Reset breaker on successful call. + /// + protected override void CallSucceeds( ) + { + _breaker.ResetBreaker( ); + } + + /// + /// On entry, guard should be reset for that first call to get in + /// + protected override void EnterInternal( ) + { + _lock.Value = true ; + } + + /// + /// Override for more descriptive toString + /// + /// + public override string ToString( ) + { + return string.Format( CultureInfo.InvariantCulture, "Half-Open currently testing call for success = {0}", ( _lock == true ) ); + } + } + + /// + /// Concrete implementation of Closed state + /// + internal class Closed : AtomicState + { + private readonly CircuitBreaker _breaker; + + public Closed( CircuitBreaker breaker ) + : base( breaker.CallTimeout, 0 ) + { + _breaker = breaker; + } + + /// + /// Implementation of invoke, which simply attempts the call + /// + /// + /// Implementation of the call that needs protected + /// containing result of protected call + public override async Task Invoke( Task body ) + { + return await CallThrough( body ); + } + + /// + /// Implementation of invoke, which simply attempts the call + /// + /// Implementation of the call that needs protected + /// containing result of protected call + public override async Task Invoke( Task body ) + { + await CallThrough( body ); + } + + /// + /// On failed call, the failure count is incremented. The count is checked against the configured maxFailures, and + /// the breaker is tripped if we have reached maxFailures. + /// + protected override void CallFails( ) + { + if ( IncrementAndGet( ) == _breaker.MaxFailures ) + { + _breaker.TripBreaker( this ); + } + } + + /// + /// On successful call, the failure count is reset to 0 + /// + protected override void CallSucceeds( ) + { + Reset(); + } + + /// + /// On entry of this state, failure count is reset. + /// + protected override void EnterInternal( ) + { + Reset(); + } + + /// + /// Override for more descriptive toString + /// + /// + public override string ToString( ) + { + return string.Format( "Closed with failure count = {0}", Current ); + } + } +} \ No newline at end of file diff --git a/src/core/Akka/Pattern/IllegalStateException.cs b/src/core/Akka/Pattern/IllegalStateException.cs index b27a95958c1..f1a4c684170 100644 --- a/src/core/Akka/Pattern/IllegalStateException.cs +++ b/src/core/Akka/Pattern/IllegalStateException.cs @@ -10,6 +10,10 @@ namespace Akka.Pattern { + /// + /// Signals that a method has been invoked at an illegal or + /// inappropriate time. + /// public class IllegalStateException : AkkaException { diff --git a/src/core/Akka/Pattern/OpenCircuitException.cs b/src/core/Akka/Pattern/OpenCircuitException.cs new file mode 100644 index 00000000000..8cc7d68fd0a --- /dev/null +++ b/src/core/Akka/Pattern/OpenCircuitException.cs @@ -0,0 +1,36 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2015 Typesafe Inc. +// Copyright (C) 2013-2015 Akka.NET project +// +//----------------------------------------------------------------------- + +using System; +using System.Runtime.Serialization; +using Akka.Actor; + +namespace Akka.Pattern +{ + /// + /// Exception throws when CircuitBreaker is open + /// + public class OpenCircuitException : AkkaException + { + public OpenCircuitException( ) : base( "Circuit Breaker is open; calls are failing fast" ) { } + + public OpenCircuitException( string message ) + : base( message ) + { + } + + public OpenCircuitException( string message, Exception innerException ) + : base( message, innerException ) + { + } + + protected OpenCircuitException( SerializationInfo info, StreamingContext context ) + : base( info, context ) + { + } + } +} \ No newline at end of file diff --git a/src/core/Akka/Util/Internal/AtomicState.cs b/src/core/Akka/Util/Internal/AtomicState.cs new file mode 100644 index 00000000000..3b9f01d9ddc --- /dev/null +++ b/src/core/Akka/Util/Internal/AtomicState.cs @@ -0,0 +1,198 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2015 Typesafe Inc. +// Copyright (C) 2013-2015 Akka.NET project +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Concurrent; +using System.Runtime.ExceptionServices; +using System.Threading.Tasks; + +namespace Akka.Util.Internal +{ + /// + /// Internal state abstraction + /// + internal abstract class AtomicState : AtomicCounterLong, IAtomicState + { + private readonly ConcurrentQueue _listeners; + private readonly TimeSpan _callTimeout; + + protected AtomicState( TimeSpan callTimeout, long startingCount ) + : base( startingCount ) + { + _listeners = new ConcurrentQueue( ); + _callTimeout = callTimeout; + } + + /// + /// Add a listener function which is invoked on state entry + /// + /// listener implementation + public void AddListener( Action listener ) + { + _listeners.Enqueue( listener ); + } + + /// + /// Test for whether listeners exist + /// + public bool HasListeners + { + get { return !_listeners.IsEmpty; } + } + + /// + /// Notifies the listeners of the transition event via a + /// + protected async Task NotifyTransitionListeners( ) + { + if ( !HasListeners ) return; + await Task + .Factory + .StartNew + ( + ( ) => + { + foreach ( var listener in _listeners ) + { + listener.Invoke( ); + } + } + ); + } + + /// + /// Shared implementation of call across all states. Thrown exception or execution of the call beyond the allowed + /// call timeout is counted as a failed call, otherwise a successful call + /// + /// NOTE: In .Net there is no way to cancel an uncancellable task. We are merely cancelling the wait and marking this + /// as a failure. + /// + /// see http://blogs.msdn.com/b/pfxteam/archive/2011/11/10/10235834.aspx + /// + /// + /// Implementation of the call + /// result of the call + public async Task CallThrough( Task task ) + { + var deadline = DateTime.UtcNow.Add( _callTimeout ); + ExceptionDispatchInfo capturedException = null; + T result = default(T); + try + { + result = await task; + } + catch ( Exception ex ) + { + capturedException = ExceptionDispatchInfo.Capture( ex ); + } + + bool throwException = capturedException != null; + if ( throwException || DateTime.UtcNow.CompareTo( deadline ) >= 0 ) + { + CallFails( ); + if ( throwException ) + capturedException.Throw( ); + } + else + { + CallSucceeds( ); + } + return result; + } + + /// + /// Shared implementation of call across all states. Thrown exception or execution of the call beyond the allowed + /// call timeout is counted as a failed call, otherwise a successful call + /// + /// NOTE: In .Net there is no way to cancel an uncancellable task. We are merely cancelling the wait and marking this + /// as a failure. + /// + /// see http://blogs.msdn.com/b/pfxteam/archive/2011/11/10/10235834.aspx + /// + /// Implementation of the call + /// + public async Task CallThrough( Task task ) + { + var deadline = DateTime.UtcNow.Add( _callTimeout ); + ExceptionDispatchInfo capturedException = null; + + try + { + await task; + } + catch ( Exception ex ) + { + capturedException = ExceptionDispatchInfo.Capture( ex ); + } + + bool throwException = capturedException != null; + if (throwException || DateTime.UtcNow.CompareTo(deadline) >= 0) + { + CallFails(); + if (throwException) capturedException.Throw(); + } + else + { + CallSucceeds(); + } + + + } + + /// + /// Abstract entry point for all states + /// + /// + /// Implementation of the call that needs protected + /// containing result of protected call + public abstract Task Invoke( Task body ); + + /// + /// Abstract entry point for all states + /// + /// Implementation of the call that needs protected + /// containing result of protected call + public abstract Task Invoke( Task body ); + + /// + /// Invoked when call fails + /// + protected abstract void CallFails( ); + + /// + /// Invoked when call succeeds + /// + protected abstract void CallSucceeds( ); + + /// + /// Invoked on the transitioned-to state during transition. Notifies listeners after invoking subclass template method _enter + /// + protected abstract void EnterInternal( ); + + /// + /// Enter the state. NotifyTransitionListeners is not awaited -- its "fire and forget". + /// It is up to the user to handle any errors that occur in this state. + /// + public void Enter( ) + { + EnterInternal( ); + NotifyTransitionListeners( ); + } + + } + + /// + /// This interface represents the parts of the internal circuit breaker state; the behavior stack, watched by, watching and termination queue + /// + public interface IAtomicState + { + void AddListener( Action listener ); + bool HasListeners { get; } + Task Invoke( Task body ); + void Enter( ); + } +}