Skip to content

Commit 23fa141

Browse files
Disconnect sessions when library times out in SOLE_LIBRARY mode (#537)
If FixEngine uses SOLE_LIBRARY mode, Framer will disconnect FIX session connections if library times out
1 parent d05ced1 commit 23fa141

File tree

2 files changed

+71
-30
lines changed

2 files changed

+71
-30
lines changed

artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java

+45-28
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ class Framer implements Agent, EngineEndPointHandler, ProtocolHandler
114114
"Received Heartbeat (msg=%s) from library %s at %sms, sent at %sns");
115115
private final CharFormatter acquiringSessionFormatter = new CharFormatter(
116116
"Acquiring session %s from library %s");
117+
private final CharFormatter disconnectingSessionFormatter = new CharFormatter(
118+
"Disconnecting session %s from library %s");
117119
private final CharFormatter releasingSessionFormatter = new CharFormatter(
118120
"Releasing session %s with connectionId %s from library %s");
119121
private final CharFormatter connectingFormatter = new CharFormatter(
@@ -493,7 +495,7 @@ private void onLibraryDisconnect(final LiveLibraryInfo library)
493495
DebugLogger.log(LIBRARY_MANAGEMENT, timingOutFormatter.clear().with(library.libraryId()));
494496
}
495497

496-
tryAcquireLibrarySessions(library);
498+
tryAcquireOrDisconnectLibrarySessions(library);
497499
saveLibraryTimeout(library);
498500
disconnectILinkConnections(library);
499501
}
@@ -519,7 +521,7 @@ private void soleLibraryModeUnbind()
519521
}
520522
}
521523

522-
private void tryAcquireLibrarySessions(final LiveLibraryInfo library)
524+
private void tryAcquireOrDisconnectLibrarySessions(final LiveLibraryInfo library)
523525
{
524526
final int librarySessionId = library.aeronSessionId();
525527
final Image image = librarySubscription.imageBySessionId(librarySessionId);
@@ -533,7 +535,7 @@ private void tryAcquireLibrarySessions(final LiveLibraryInfo library)
533535
// just acquire
534536
if (!configuration.logOutboundMessages() || sentIndexedPosition(librarySessionId, libraryPosition))
535537
{
536-
acquireLibrarySessions(library);
538+
acquireOrDisconnectLibrarySessions(library);
537539
}
538540
else
539541
{
@@ -547,7 +549,7 @@ private boolean retryAcquireLibrarySessions(final LiveLibraryInfo library)
547549
if (!configuration.logOutboundMessages() ||
548550
sentIndexedPosition(library.aeronSessionId(), library.acquireAtPosition()))
549551
{
550-
acquireLibrarySessions(library);
552+
acquireOrDisconnectLibrarySessions(library);
551553
return true;
552554
}
553555

@@ -572,7 +574,7 @@ private void saveLibraryTimeout(final LibraryInfo library)
572574
schedule(() -> outboundPublication.saveLibraryTimeout(library, 0));
573575
}
574576

575-
private void acquireLibrarySessions(final LiveLibraryInfo library)
577+
private void acquireOrDisconnectLibrarySessions(final LiveLibraryInfo library)
576578
{
577579
final List<GatewaySession> sessions = library.gatewaySessions();
578580
for (int i = 0, size = sessions.size(); i < size; i++)
@@ -585,31 +587,46 @@ private void acquireLibrarySessions(final LiveLibraryInfo library)
585587
continue;
586588
}
587589

588-
final long sessionId = session.sessionId();
589-
final int sentSequenceNumber = sentSequenceNumberIndex.lastKnownSequenceNumber(sessionId);
590-
final int receivedSequenceNumber = receivedSequenceNumberIndex.lastKnownSequenceNumber(sessionId);
591-
final boolean hasLoggedIn = receivedSequenceNumber != UNK_SESSION;
592-
final SessionState state = hasLoggedIn ? ACTIVE : CONNECTED;
593-
594-
DebugLogger.log(
595-
LIBRARY_MANAGEMENT,
596-
acquiringSessionFormatter, session.sessionId(), library.libraryId());
597-
598-
((FixGatewaySessions)gatewaySessions).acquire(
599-
session,
600-
state,
601-
false,
602-
session.heartbeatIntervalInS(),
603-
sentSequenceNumber,
604-
receivedSequenceNumber,
605-
session.username(),
606-
session.password());
607-
608-
schedule(saveManageSession(ENGINE_LIBRARY_ID, session));
590+
if (soleLibraryMode)
591+
{
592+
DebugLogger.log(
593+
LIBRARY_MANAGEMENT,
594+
disconnectingSessionFormatter, session.sessionId(), library.libraryId());
595+
596+
final long connectionId = session.connectionId();
597+
receiverEndPoints.removeConnection(connectionId, DisconnectReason.LIBRARY_DISCONNECT);
598+
fixSenderEndPoints.removeConnection(connectionId);
599+
gatewaySessions.releaseByConnectionId(connectionId);
600+
}
609601

610-
if (performingDisconnectOperation)
602+
else
611603
{
612-
session.session().logoutAndDisconnect();
604+
final long sessionId = session.sessionId();
605+
final int sentSequenceNumber = sentSequenceNumberIndex.lastKnownSequenceNumber(sessionId);
606+
final int receivedSequenceNumber = receivedSequenceNumberIndex.lastKnownSequenceNumber(sessionId);
607+
final boolean hasLoggedIn = receivedSequenceNumber != UNK_SESSION;
608+
final SessionState state = hasLoggedIn ? ACTIVE : CONNECTED;
609+
610+
DebugLogger.log(
611+
LIBRARY_MANAGEMENT,
612+
acquiringSessionFormatter, session.sessionId(), library.libraryId());
613+
614+
((FixGatewaySessions)gatewaySessions).acquire(
615+
session,
616+
state,
617+
false,
618+
session.heartbeatIntervalInS(),
619+
sentSequenceNumber,
620+
receivedSequenceNumber,
621+
session.username(),
622+
session.password());
623+
624+
schedule(saveManageSession(ENGINE_LIBRARY_ID, session));
625+
626+
if (performingDisconnectOperation)
627+
{
628+
session.session().logoutAndDisconnect();
629+
}
613630
}
614631
}
615632
}

artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SoleLibrarySystemTest.java

+26-2
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515
*/
1616
package uk.co.real_logic.artio.system_tests;
1717

18+
import uk.co.real_logic.artio.Reply;
1819
import uk.co.real_logic.artio.engine.*;
1920
import uk.co.real_logic.artio.engine.framer.LibraryInfo;
2021
import uk.co.real_logic.artio.library.FixLibrary;
2122
import uk.co.real_logic.artio.library.LibraryConfiguration;
2223
import org.junit.Test;
24+
import uk.co.real_logic.artio.messages.SessionState;
25+
import uk.co.real_logic.artio.session.Session;
2326

2427
import java.util.List;
2528

@@ -204,7 +207,7 @@ public void shouldAllowReconnectingInitiatorsToReconnect()
204207
}
205208

206209
@Test(timeout = TEST_TIMEOUT_IN_MS)
207-
public void shouldAcquireSessionsWithLoggingSwitchedOff()
210+
public void shouldInitiatingLibraryDisconnectSessionOnLibraryTimeout()
208211
{
209212
// Equivalent invariant tested in Engine mode in NoLoggingGatewayToGatewaySystemTest
210213
launch(false, false);
@@ -216,6 +219,27 @@ public void shouldAcquireSessionsWithLoggingSwitchedOff()
216219
testSystem.remove(initiatingLibrary);
217220
awaitLibraryDisconnect(initiatingEngine, testSystem);
218221

219-
acceptingMessagesCanBeExchanged();
222+
assertEventuallyTrue("Accepting library did not recognize disconnected session",
223+
() ->
224+
{
225+
testSystem.poll();
226+
final List<Session> sessions = acceptingLibrary.sessions();
227+
assertEquals(1, sessions.size());
228+
final Session session = sessions.get(0);
229+
assertEquals(SessionState.DISCONNECTED, session.state());
230+
}
231+
);
232+
233+
assertEventuallyTrue("Initiating Engine did not disconnect session",
234+
() ->
235+
{
236+
final Reply<List<LibraryInfo>> libraryInfoReply = initiatingEngine.libraries();
237+
assertTrue(libraryInfoReply.hasCompleted());
238+
final List<LibraryInfo> libraryInfo = libraryInfoReply.resultIfPresent();
239+
assertEquals(1, libraryInfo.size());
240+
final LibraryInfo libInfo = libraryInfo.get(0);
241+
assertEquals(0, libInfo.sessions().size());
242+
}
243+
);
220244
}
221245
}

0 commit comments

Comments
 (0)