Skip to content

Commit 60e2982

Browse files
[FIXED] Subject state consistency (#6226)
Subject state would not always remain consistent given a specific pattern of message removals. There were three bugs:

- `recalculateFirstForSubj` in memstore would start its search at `startSeq+1`, but filestore would always start at `mb.first.seq`. Both now start at the same sequence.
- `recalculateFirstForSubj` was not called when `ss.Msgs == 1`, which could leave a stale `ss.First` if it still needed to be recalculated.
- If, after recalculation, `ss.First` turns out to equal the sequence of the message being removed, `recalculateFirstForSubj` must be called again, since `ss.Last` is also lazy and could be incorrect.

Apart from that, filestore and memstore now behave equivalently: both update the per-subject state first and only then remove the message, and `removeSeqPerSubject` updates the subject state the same way in both stores.

Signed-off-by: Maurice van Veen <[email protected]>
2 parents f264fb3 + 283b607 commit 60e2982

File tree

3 files changed

+131
-19
lines changed

3 files changed

+131
-19
lines changed

server/filestore.go

+17-9
Original file line numberDiff line numberDiff line change
@@ -2614,10 +2614,6 @@ func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *Si
26142614
// Always reset.
26152615
ss.First, ss.Last, ss.Msgs = 0, 0, 0
26162616

2617-
if filter == _EMPTY_ {
2618-
filter = fwcs
2619-
}
2620-
26212617
// We do need to figure out the first and last sequences.
26222618
wc := subjectHasWildcard(filter)
26232619
start, stop := uint32(math.MaxUint32), uint32(0)
@@ -7502,6 +7498,9 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
75027498
// Update fss
75037499
smb.removeSeqPerSubject(sm.subj, mseq)
75047500
fs.removePerSubject(sm.subj)
7501+
// Need to mark the sequence as deleted. Otherwise, recalculating ss.First
7502+
// for per-subject info would be able to find it still.
7503+
smb.dmap.Insert(mseq)
75057504
}
75067505
}
75077506

@@ -7943,11 +7942,16 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) {
79437942

79447943
// Only one left.
79457944
if ss.Msgs == 1 {
7946-
if seq == ss.Last {
7947-
ss.Last = ss.First
7948-
} else {
7949-
ss.First = ss.Last
7945+
// Update first if we need to, we must check if this removal is about what's going to be ss.First
7946+
if ss.firstNeedsUpdate {
7947+
mb.recalculateFirstForSubj(subj, ss.First, ss)
79507948
}
7949+
// If we're removing the first message, we must recalculate again.
7950+
// ss.Last is lazy as well, so need to calculate new ss.First and set ss.Last to it.
7951+
if ss.First == seq {
7952+
mb.recalculateFirstForSubj(subj, ss.First, ss)
7953+
}
7954+
ss.Last = ss.First
79517955
ss.firstNeedsUpdate = false
79527956
return
79537957
}
@@ -7977,8 +7981,12 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si
79777981
startSlot = 0
79787982
}
79797983

7984+
fseq := startSeq + 1
7985+
if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq {
7986+
fseq = mbFseq
7987+
}
79807988
var le = binary.LittleEndian
7981-
for slot, fseq := startSlot, atomic.LoadUint64(&mb.first.seq); slot < len(mb.cache.idx); slot++ {
7989+
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
79827990
bi := mb.cache.idx[slot] &^ hbit
79837991
if bi == dbit {
79847992
// delete marker so skip.

server/memstore.go

+20-10
Original file line numberDiff line numberDiff line change
@@ -1055,8 +1055,9 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) {
10551055
if sm := ms.msgs[seq]; sm != nil {
10561056
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
10571057
purged++
1058-
delete(ms.msgs, seq)
10591058
ms.removeSeqPerSubject(sm.subj, seq)
1059+
// Must delete message after updating per-subject info, to be consistent with file store.
1060+
delete(ms.msgs, seq)
10601061
}
10611062
}
10621063
if purged > ms.state.Msgs {
@@ -1144,8 +1145,9 @@ func (ms *memStore) Truncate(seq uint64) error {
11441145
if sm := ms.msgs[i]; sm != nil {
11451146
purged++
11461147
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
1147-
delete(ms.msgs, i)
11481148
ms.removeSeqPerSubject(sm.subj, i)
1149+
// Must delete message after updating per-subject info, to be consistent with file store.
1150+
delete(ms.msgs, i)
11491151
}
11501152
}
11511153
// Reset last.
@@ -1406,17 +1408,24 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
14061408
}
14071409
ss.Msgs--
14081410

1409-
// If we know we only have 1 msg left don't need to search for next first.
1411+
// Only one left.
14101412
if ss.Msgs == 1 {
1411-
if seq == ss.Last {
1412-
ss.Last = ss.First
1413-
} else {
1414-
ss.First = ss.Last
1413+
// Update first if we need to, we must check if this removal is about what's going to be ss.First
1414+
if ss.firstNeedsUpdate {
1415+
ms.recalculateFirstForSubj(subj, ss.First, ss)
14151416
}
1417+
// If we're removing the first message, we must recalculate again.
1418+
// ss.Last is lazy as well, so need to calculate new ss.First and set ss.Last to it.
1419+
if ss.First == seq {
1420+
ms.recalculateFirstForSubj(subj, ss.First, ss)
1421+
}
1422+
ss.Last = ss.First
14161423
ss.firstNeedsUpdate = false
1417-
} else {
1418-
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
1424+
return
14191425
}
1426+
1427+
// We can lazily calculate the first sequence when needed.
1428+
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
14201429
}
14211430

14221431
// Will recalculate the first sequence for this subject in this block.
@@ -1446,7 +1455,6 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
14461455

14471456
ss = memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
14481457

1449-
delete(ms.msgs, seq)
14501458
if ms.state.Msgs > 0 {
14511459
ms.state.Msgs--
14521460
if ss > ms.state.Bytes {
@@ -1471,6 +1479,8 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
14711479

14721480
// Remove any per subject tracking.
14731481
ms.removeSeqPerSubject(sm.subj, seq)
1482+
// Must delete message after updating per-subject info, to be consistent with file store.
1483+
delete(ms.msgs, seq)
14741484

14751485
if ms.scb != nil {
14761486
// We do not want to hold any locks here.

server/store_test.go

+94
Original file line numberDiff line numberDiff line change
@@ -141,3 +141,97 @@ func TestStoreDeleteRange(t *testing.T) {
141141
require_Equal(t, last, 2)
142142
require_Equal(t, num, 1)
143143
}
144+
145+
func TestStoreSubjectStateConsistency(t *testing.T) {
146+
testAllStoreAllPermutations(
147+
t, false,
148+
StreamConfig{Name: "TEST", Subjects: []string{"foo"}},
149+
func(t *testing.T, fs StreamStore) {
150+
getSubjectState := func() SimpleState {
151+
t.Helper()
152+
ss := fs.SubjectsState("foo")
153+
return ss["foo"]
154+
}
155+
156+
// Publish an initial batch of messages.
157+
for i := 0; i < 4; i++ {
158+
_, _, err := fs.StoreMsg("foo", nil, nil)
159+
require_NoError(t, err)
160+
}
161+
162+
// Expect 4 msgs, with first=1, last=4.
163+
ss := getSubjectState()
164+
require_Equal(t, ss.Msgs, 4)
165+
require_Equal(t, ss.First, 1)
166+
require_Equal(t, ss.Last, 4)
167+
168+
// Remove first message, ss.First is lazy so will only mark ss.firstNeedsUpdate.
169+
removed, err := fs.RemoveMsg(1)
170+
require_NoError(t, err)
171+
require_True(t, removed)
172+
173+
// Will update first, so corrects to seq 2.
174+
ss = getSubjectState()
175+
require_Equal(t, ss.Msgs, 3)
176+
require_Equal(t, ss.First, 2)
177+
require_Equal(t, ss.Last, 4)
178+
179+
// Remove last message.
180+
removed, err = fs.RemoveMsg(4)
181+
require_NoError(t, err)
182+
require_True(t, removed)
183+
184+
// ss.Last is lazy, just like ss.First, but it's not recalculated. Only total msg count decreases.
185+
ss = getSubjectState()
186+
require_Equal(t, ss.Msgs, 2)
187+
require_Equal(t, ss.First, 2)
188+
require_Equal(t, ss.Last, 4)
189+
190+
// Remove first message again.
191+
removed, err = fs.RemoveMsg(2)
192+
require_NoError(t, err)
193+
require_True(t, removed)
194+
195+
// Since we only have one message left, must update ss.First and set ss.Last to equal.
196+
ss = getSubjectState()
197+
require_Equal(t, ss.Msgs, 1)
198+
require_Equal(t, ss.First, 3)
199+
require_Equal(t, ss.Last, 3)
200+
201+
// Publish some more messages so we can test another scenario.
202+
for i := 0; i < 3; i++ {
203+
_, _, err := fs.StoreMsg("foo", nil, nil)
204+
require_NoError(t, err)
205+
}
206+
207+
// Just check the state is complete again.
208+
ss = getSubjectState()
209+
require_Equal(t, ss.Msgs, 4)
210+
require_Equal(t, ss.First, 3)
211+
require_Equal(t, ss.Last, 7)
212+
213+
// Remove last sequence, ss.Last is lazy so doesn't get updated.
214+
removed, err = fs.RemoveMsg(7)
215+
require_NoError(t, err)
216+
require_True(t, removed)
217+
218+
// Remove first sequence, ss.First is lazy so doesn't get updated.
219+
removed, err = fs.RemoveMsg(3)
220+
require_NoError(t, err)
221+
require_True(t, removed)
222+
223+
// Remove (now) first sequence, but because ss.First is lazy we first need to recalculate
224+
// to know seq 5 became ss.First. And since we're removing seq 5 we need to recalculate ss.First
225+
// yet again, since ss.Last is lazy and is not correct.
226+
removed, err = fs.RemoveMsg(5)
227+
require_NoError(t, err)
228+
require_True(t, removed)
229+
230+
// ss.First should equal ss.Last, last should have been updated now.
231+
ss = getSubjectState()
232+
require_Equal(t, ss.Msgs, 1)
233+
require_Equal(t, ss.First, 6)
234+
require_Equal(t, ss.Last, 6)
235+
},
236+
)
237+
}

0 commit comments

Comments
 (0)