@@ -4276,87 +4276,91 @@ func TestJetStreamClusterPreserveWALDuringCatchupWithMatchingTerm(t *testing.T)
 	nc, js := jsClientConnect(t, c.randomServer())
 	defer nc.Close()
 
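+	// checkConsistency waits up to 3s for every server in the cluster to
+	// report the stream with exactly 3 messages and 99 bytes.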
+	checkConsistency := func() {
+		t.Helper()
+		checkFor(t, 3*time.Second, 250*time.Millisecond, func() error {
+			for _, s := range c.servers {
+				acc, err := s.lookupAccount(globalAccountName)
+				if err != nil {
+					return err
+				}
+				mset, err := acc.lookupStream("TEST")
+				if err != nil {
+					return err
+				}
+				state := mset.state()
+				if state.Msgs != 3 || state.Bytes != 99 {
+					return fmt.Errorf("stream state didn't match, got %d messages with %d bytes", state.Msgs, state.Bytes)
+				}
+			}
+			return nil
+		})
+	}
+
 	_, err := js.AddStream(&nats.StreamConfig{
 		Name:     "TEST",
-		Subjects: []string{"foo.>"},
+		Subjects: []string{"foo"},
 		Replicas: 3,
 	})
-	nc.Close()
 	require_NoError(t, err)
 
-	// Pick one server that will only store a part of the messages in its WAL.
-	rs := c.randomNonStreamLeader(globalAccountName, "TEST")
-	ts := time.Now().UnixNano()
-
-	// Manually add 3 append entries to each node's WAL, except for one node who is one behind.
-	var scratch [1024]byte
-	for _, s := range c.servers {
-		for _, n := range s.raftNodes {
-			rn := n.(*raft)
-			if rn.accName == globalAccountName {
-				for i := uint64(0); i < 3; i++ {
-					// One server will be one behind and need to catchup.
-					if s.Name() == rs.Name() && i >= 2 {
-						break
-					}
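+	// Look up the stream leader's RAFT node and remember its ID, so we can
+	// later assert that every WAL entry was written by this leader.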
+	sl := c.streamLeader(globalAccountName, "TEST")
+	require_NoError(t, err)
+	acc, err := sl.lookupAccount(globalAccountName)
+	require_NoError(t, err)
+	mset, err := acc.lookupStream("TEST")
+	require_NoError(t, err)
+	rn := mset.raftNode().(*raft)
+	leaderId := rn.ID()
 
-					esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, i, ts, true, false)
-					entries := []*Entry{newEntry(EntryNormal, esm)}
-					rn.Lock()
-					ae := rn.buildAppendEntry(entries)
-					ae.buf, err = ae.encode(scratch[:])
-					require_NoError(t, err)
-					err = rn.storeToWAL(ae)
-					rn.Unlock()
-					require_NoError(t, err)
-				}
-			}
-		}
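+	// Publish 3 messages through the normal client path; with R3 they are
+	// replicated via RAFT to all three servers.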
+	for i := 0; i < 3; i++ {
+		_, err = js.Publish("foo", nil)
+		require_NoError(t, err)
 	}
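+	// Done publishing; close the client and wait for all replicas to converge.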
+	nc.Close()
+	checkConsistency()
 
-	// Restart all.
-	c.stopAll()
-	c.restartAll()
-	c.waitOnAllCurrent()
-	c.waitOnStreamLeader(globalAccountName, "TEST")
+	// Pick one server that will only store a part of the messages in its WAL.
+	rs := c.randomNonStreamLeader(globalAccountName, "TEST")
+	acc, err = rs.lookupAccount(globalAccountName)
+	require_NoError(t, err)
+	mset, err = acc.lookupStream("TEST")
+	require_NoError(t, err)
+	rn = mset.raftNode().(*raft)
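+	// Sanity check: the follower's last WAL index matches its commit index,
+	// i.e. it is fully caught up before we roll its state back.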
+	index, commit, _ := rn.Progress()
+	require_Equal(t, index, 4)
+	require_Equal(t, index, commit)
 
-	rs = c.serverByName(rs.Name())
+	// We'll simulate as if the last message was never received/stored.
+	// We need to truncate the stream, correct lseq (so the msg isn't skipped), and truncate the WAL.
+	// This lets us verify that the RAFT layer can restore it.
+	mset.mu.Lock()
+	mset.lseq--
+	err = mset.store.Truncate(2)
+	mset.mu.Unlock()
+	require_NoError(t, err)
+	rn.Lock()
+	rn.truncateWAL(rn.pterm, rn.pindex-1)
+	rn.Unlock()
 
 	// Check all servers ended up with all published messages, which had quorum.
-	checkFor(t, 3*time.Second, 250*time.Millisecond, func() error {
-		for _, s := range c.servers {
-			acc, err := s.lookupAccount(globalAccountName)
-			if err != nil {
-				return err
-			}
-			mset, err := acc.lookupStream("TEST")
-			if err != nil {
-				return err
-			}
-			state := mset.state()
-			if state.Msgs != 3 || state.Bytes != 99 {
-				return fmt.Errorf("stream state didn't match, got %d messages with %d bytes", state.Msgs, state.Bytes)
-			}
-		}
-		return nil
-	})
+	checkConsistency()
 
-	// Check that the first two published messages came from our WAL, and
-	// the last came from a catchup by another leader.
+	// Check that all entries came from the expected leader.
 	for _, n := range rs.raftNodes {
 		rn := n.(*raft)
 		if rn.accName == globalAccountName {
-			ae, err := rn.loadEntry(2)
+			ae, err := rn.loadEntry(1)
 			require_NoError(t, err)
-			require_True(t, ae.leader == rn.ID())
+			require_Equal(t, ae.leader, leaderId)
 
-			ae, err = rn.loadEntry(3)
+			ae, err = rn.loadEntry(2)
 			require_NoError(t, err)
-			require_True(t, ae.leader == rn.ID())
+			require_Equal(t, ae.leader, leaderId)
 
-			ae, err = rn.loadEntry(4)
+			ae, err = rn.loadEntry(3)
 			require_NoError(t, err)
-			require_True(t, ae.leader != rn.ID())
+			require_Equal(t, ae.leader, leaderId)
 		}
 	}
 }