Skip to content

Commit

Permalink
chore(embedded/store): allow discarding precommitted transactions dur…
Browse files Browse the repository at this point in the history
…ing open

Signed-off-by: Stefano Scafiti <[email protected]>
  • Loading branch information
ostafen committed Feb 14, 2025
1 parent cdd00cb commit 3119645
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 38 deletions.
56 changes: 29 additions & 27 deletions embedded/store/immustore.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,43 +533,45 @@ func OpenWith(path string, vLogs []appendable.Appendable, txLog, cLog appendable
precommittedAlh := committedAlh
precommittedTxLogSize := committedTxLogSize

// read pre-committed txs from txLog and insert into cLogBuf to continue with the commit process
// txLog may be partially written, precommitted transactions loading is terminated if an inconsistency is found
txReader := appendable.NewReaderFrom(txLog, precommittedTxLogSize, multiapp.DefaultReadBufferSize)
if !opts.DiscardPrecommittedTransactions {
// read pre-committed txs from txLog and insert into cLogBuf to continue with the commit process
// txLog may be partially written, precommitted transactions loading is terminated if an inconsistency is found
txReader := appendable.NewReaderFrom(txLog, precommittedTxLogSize, multiapp.DefaultReadBufferSize)

tx, _ := txPool.Alloc()
tx, _ := txPool.Alloc()

for {
err = tx.readFrom(txReader, false)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
opts.logger.Infof("%v: discarding pre-committed transaction: %d", err, precommittedTxID+1)
break
}
for {
err = tx.readFrom(txReader, false)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
opts.logger.Infof("%v: discarding pre-committed transaction: %d", err, precommittedTxID+1)
break
}

if tx.header.ID != precommittedTxID+1 || tx.header.PrevAlh != precommittedAlh {
opts.logger.Infof("%v: discarding pre-committed transaction: %d", ErrCorruptedData, precommittedTxID+1)
break
}
if tx.header.ID != precommittedTxID+1 || tx.header.PrevAlh != precommittedAlh {
opts.logger.Infof("%v: discarding pre-committed transaction: %d", ErrCorruptedData, precommittedTxID+1)
break
}

precommittedTxID++
precommittedAlh = tx.header.Alh()
precommittedTxID++
precommittedAlh = tx.header.Alh()

txSize := int(txReader.ReadCount() - (precommittedTxLogSize - committedTxLogSize))
txSize := int(txReader.ReadCount() - (precommittedTxLogSize - committedTxLogSize))

err = cLogBuf.put(precommittedTxID, precommittedAlh, precommittedTxLogSize, txSize)
if err != nil {
txPool.Release(tx)
return nil, fmt.Errorf("%v: while loading pre-committed transaction: %v", err, precommittedTxID+1)
err = cLogBuf.put(precommittedTxID, precommittedAlh, precommittedTxLogSize, txSize)
if err != nil {
txPool.Release(tx)
return nil, fmt.Errorf("%v: while loading pre-committed transaction: %v", err, precommittedTxID+1)
}

precommittedTxLogSize += int64(txSize)
}

precommittedTxLogSize += int64(txSize)
txPool.Release(tx)
}

txPool.Release(tx)

vLogsMap := make(map[byte]*refVLog, len(vLogs))
vLogUnlockedList := list.New()

Expand Down
29 changes: 18 additions & 11 deletions embedded/store/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ type Options struct {
CompressionLevel int
EmbeddedValues bool
PreallocFiles bool
// Discard processing of transactions that were precommitted before opening
DiscardPrecommittedTransactions bool

// options below affect indexing
IndexOpts *IndexOptions
Expand Down Expand Up @@ -247,17 +249,17 @@ func DefaultOptions() *Options {
WriteTxHeaderVersion: DefaultWriteTxHeaderVersion,

// options below are only set during initialization and stored as metadata
MaxTxEntries: DefaultMaxTxEntries,
MaxKeyLen: DefaultMaxKeyLen,
MaxValueLen: DefaultMaxValueLen,
FileSize: DefaultFileSize,
CompressionFormat: DefaultCompressionFormat,
CompressionLevel: DefaultCompressionLevel,
EmbeddedValues: DefaultEmbeddedValues,
PreallocFiles: DefaultPreallocFiles,

IndexOpts: DefaultIndexOptions(),
AHTOpts: DefaultAHTOptions(),
MaxTxEntries: DefaultMaxTxEntries,
MaxKeyLen: DefaultMaxKeyLen,
MaxValueLen: DefaultMaxValueLen,
FileSize: DefaultFileSize,
CompressionFormat: DefaultCompressionFormat,
CompressionLevel: DefaultCompressionLevel,
EmbeddedValues: DefaultEmbeddedValues,
PreallocFiles: DefaultPreallocFiles,
DiscardPrecommittedTransactions: false,
IndexOpts: DefaultIndexOptions(),
AHTOpts: DefaultAHTOptions(),
}
}

Expand Down Expand Up @@ -620,6 +622,11 @@ func (opts *Options) WithAHTOptions(ahtOptions *AHTOptions) *Options {
return opts
}

func (opts *Options) WithDiscardPrecommittedTransactions(discard bool) *Options {
opts.DiscardPrecommittedTransactions = discard
return opts
}

// IndexOptions

func (opts *IndexOptions) WithCacheSize(cacheSize int) *IndexOptions {
Expand Down

0 comments on commit 3119645

Please sign in to comment.