Skip to content

Commit 0153091

Browse files
Optimise JetStream and Raft locking (#6335)
This PR optimises some uses of the JetStream lock by reducing interactions between that and Raft group locks, which should mean that we don't end up in situations where we hold the JS lock while waiting to take a Raft lock etc. Also updates a couple of the Raft group helper functions to not take the lock as they are returning values that are never mutated, creating unnecessary contention. Signed-off-by: Neil Twigg <[email protected]>
2 parents 0449bb7 + 1c8290a commit 0153091

File tree

3 files changed

+23
-16
lines changed

3 files changed

+23
-16
lines changed

server/jetstream_api.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -4701,9 +4701,13 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
47014701
return
47024702
}
47034703

4704+
// Since these could wait on the Raft group lock, don't do so under the JS lock.
4705+
ourID := cc.meta.ID()
4706+
groupLeader := cc.meta.GroupLeader()
4707+
groupCreated := cc.meta.Created()
4708+
47044709
js.mu.RLock()
47054710
isLeader, sa, ca := cc.isLeader(), js.streamAssignment(acc.Name, streamName), js.consumerAssignment(acc.Name, streamName, consumerName)
4706-
ourID := cc.meta.ID()
47074711
var rg *raftGroup
47084712
var offline, isMember bool
47094713
if ca != nil {
@@ -4717,7 +4721,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
47174721
// Also capture if we think there is no meta leader.
47184722
var isLeaderLess bool
47194723
if !isLeader {
4720-
isLeaderLess = cc.meta.GroupLeader() == _EMPTY_ && time.Since(cc.meta.Created()) > lostQuorumIntervalDefault
4724+
isLeaderLess = groupLeader == _EMPTY_ && time.Since(groupCreated) > lostQuorumIntervalDefault
47214725
}
47224726
js.mu.RUnlock()
47234727

server/jetstream_cluster.go

+14-8
Original file line numberDiff line numberDiff line change
@@ -902,15 +902,17 @@ func (js *jetStream) server() *Server {
902902
// Will respond if we do not think we have a metacontroller leader.
903903
func (js *jetStream) isLeaderless() bool {
904904
js.mu.RLock()
905-
defer js.mu.RUnlock()
906-
907905
cc := js.cluster
908906
if cc == nil || cc.meta == nil {
907+
js.mu.RUnlock()
909908
return false
910909
}
910+
meta := cc.meta
911+
js.mu.RUnlock()
912+
911913
// If we don't have a leader.
912914
// Make sure we have been running for enough time.
913-
if cc.meta.GroupLeader() == _EMPTY_ && time.Since(cc.meta.Created()) > lostQuorumIntervalDefault {
915+
if meta.GroupLeader() == _EMPTY_ && time.Since(meta.Created()) > lostQuorumIntervalDefault {
914916
return true
915917
}
916918
return false
@@ -922,29 +924,32 @@ func (js *jetStream) isGroupLeaderless(rg *raftGroup) bool {
922924
return false
923925
}
924926
js.mu.RLock()
925-
defer js.mu.RUnlock()
926-
927927
cc := js.cluster
928+
started := js.started
928929

929930
// If we are not a member we can not say..
930931
if cc.meta == nil {
932+
js.mu.RUnlock()
931933
return false
932934
}
933935
if !rg.isMember(cc.meta.ID()) {
936+
js.mu.RUnlock()
934937
return false
935938
}
936939
// Single peer groups always have a leader if we are here.
937940
if rg.node == nil {
941+
js.mu.RUnlock()
938942
return false
939943
}
944+
js.mu.RUnlock()
940945
// If we don't have a leader.
941946
if rg.node.GroupLeader() == _EMPTY_ {
942947
// Threshold for jetstream startup.
943948
const startupThreshold = 10 * time.Second
944949

945950
if rg.node.HadPreviousLeader() {
946951
// Make sure we have been running long enough to intelligently determine this.
947-
if time.Since(js.started) > startupThreshold {
952+
if time.Since(started) > startupThreshold {
948953
return true
949954
}
950955
}
@@ -4487,10 +4492,11 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
44874492
} else {
44884493
// If we are clustered update the known peers.
44894494
js.mu.RLock()
4490-
if node := rg.node; node != nil {
4495+
node := rg.node
4496+
js.mu.RUnlock()
4497+
if node != nil {
44914498
node.UpdateKnownPeers(ca.Group.Peers)
44924499
}
4493-
js.mu.RUnlock()
44944500
}
44954501

44964502
// Check if we already have this consumer running.

server/raft.go

+3-6
Original file line numberDiff line numberDiff line change
@@ -1674,14 +1674,12 @@ func (n *raft) ID() string {
16741674
if n == nil {
16751675
return _EMPTY_
16761676
}
1677-
n.RLock()
1678-
defer n.RUnlock()
1677+
// Lock not needed as n.id is never changed after creation.
16791678
return n.id
16801679
}
16811680

16821681
func (n *raft) Group() string {
1683-
n.RLock()
1684-
defer n.RUnlock()
1682+
// Lock not needed as n.group is never changed after creation.
16851683
return n.group
16861684
}
16871685

@@ -1741,8 +1739,7 @@ func (n *raft) QuitC() <-chan struct{} { return n.quit }
17411739
func (n *raft) AppliedFloorC() <-chan struct{} { return n.aflrc }
17421740

17431741
func (n *raft) Created() time.Time {
1744-
n.RLock()
1745-
defer n.RUnlock()
1742+
// Lock not needed as n.created is never changed after creation.
17461743
return n.created
17471744
}
17481745

0 commit comments

Comments
 (0)