Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(embedded/store): allow skipping precommitted transactions durin… #2045

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 29 additions & 27 deletions embedded/store/immustore.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,43 +533,45 @@ func OpenWith(path string, vLogs []appendable.Appendable, txLog, cLog appendable
precommittedAlh := committedAlh
precommittedTxLogSize := committedTxLogSize

// read pre-committed txs from txLog and insert into cLogBuf to continue with the commit process
// txLog may be partially written, precommitted transactions loading is terminated if an inconsistency is found
txReader := appendable.NewReaderFrom(txLog, precommittedTxLogSize, multiapp.DefaultReadBufferSize)
if !opts.DiscardPrecommittedTransactions {
// read pre-committed txs from txLog and insert into cLogBuf to continue with the commit process
// txLog may be partially written, precommitted transactions loading is terminated if an inconsistency is found
txReader := appendable.NewReaderFrom(txLog, precommittedTxLogSize, multiapp.DefaultReadBufferSize)

tx, _ := txPool.Alloc()
tx, _ := txPool.Alloc()

for {
err = tx.readFrom(txReader, false)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
opts.logger.Infof("%v: discarding pre-committed transaction: %d", err, precommittedTxID+1)
break
}
for {
err = tx.readFrom(txReader, false)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
opts.logger.Infof("%v: discarding pre-committed transaction: %d", err, precommittedTxID+1)
break
}

if tx.header.ID != precommittedTxID+1 || tx.header.PrevAlh != precommittedAlh {
opts.logger.Infof("%v: discarding pre-committed transaction: %d", ErrCorruptedData, precommittedTxID+1)
break
}
if tx.header.ID != precommittedTxID+1 || tx.header.PrevAlh != precommittedAlh {
opts.logger.Infof("%v: discarding pre-committed transaction: %d", ErrCorruptedData, precommittedTxID+1)
break
}

precommittedTxID++
precommittedAlh = tx.header.Alh()
precommittedTxID++
precommittedAlh = tx.header.Alh()

txSize := int(txReader.ReadCount() - (precommittedTxLogSize - committedTxLogSize))
txSize := int(txReader.ReadCount() - (precommittedTxLogSize - committedTxLogSize))

err = cLogBuf.put(precommittedTxID, precommittedAlh, precommittedTxLogSize, txSize)
if err != nil {
txPool.Release(tx)
return nil, fmt.Errorf("%v: while loading pre-committed transaction: %v", err, precommittedTxID+1)
err = cLogBuf.put(precommittedTxID, precommittedAlh, precommittedTxLogSize, txSize)
if err != nil {
txPool.Release(tx)
return nil, fmt.Errorf("%v: while loading pre-committed transaction: %v", err, precommittedTxID+1)
}

precommittedTxLogSize += int64(txSize)
}

precommittedTxLogSize += int64(txSize)
txPool.Release(tx)
}

txPool.Release(tx)

vLogsMap := make(map[byte]*refVLog, len(vLogs))
vLogUnlockedList := list.New()

Expand Down
7 changes: 4 additions & 3 deletions embedded/store/ongoing_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,16 @@ func newOngoingTx(ctx context.Context, s *ImmuStore, opts *TxOptions) (*OngoingT

tx.mode = opts.Mode

if opts.Mode == WriteOnlyTx {
if opts.Mode != WriteOnlyTx {
return tx, nil
}

tx.snapshotMustIncludeTxID = opts.SnapshotMustIncludeTxID
tx.snapshotRenewalPeriod = opts.SnapshotRenewalPeriod
tx.mvccReadSet = &mvccReadSet{}

if opts.Mode == ReadWriteTx {
tx.mvccReadSet = &mvccReadSet{}
}
return tx, nil
}

Expand Down Expand Up @@ -562,7 +564,6 @@ func (tx *OngoingTx) NewKeyReader(spec KeyReaderSpec) (KeyReader, error) {

return snap.NewKeyReader(spec)
}

return newOngoingTxKeyReader(tx, spec)
}

Expand Down
29 changes: 18 additions & 11 deletions embedded/store/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ type Options struct {
CompressionLevel int
EmbeddedValues bool
PreallocFiles bool
// Discard processing of transactions that were precommitted before opening
DiscardPrecommittedTransactions bool

// options below affect indexing
IndexOpts *IndexOptions
Expand Down Expand Up @@ -247,17 +249,17 @@ func DefaultOptions() *Options {
WriteTxHeaderVersion: DefaultWriteTxHeaderVersion,

// options below are only set during initialization and stored as metadata
MaxTxEntries: DefaultMaxTxEntries,
MaxKeyLen: DefaultMaxKeyLen,
MaxValueLen: DefaultMaxValueLen,
FileSize: DefaultFileSize,
CompressionFormat: DefaultCompressionFormat,
CompressionLevel: DefaultCompressionLevel,
EmbeddedValues: DefaultEmbeddedValues,
PreallocFiles: DefaultPreallocFiles,

IndexOpts: DefaultIndexOptions(),
AHTOpts: DefaultAHTOptions(),
MaxTxEntries: DefaultMaxTxEntries,
MaxKeyLen: DefaultMaxKeyLen,
MaxValueLen: DefaultMaxValueLen,
FileSize: DefaultFileSize,
CompressionFormat: DefaultCompressionFormat,
CompressionLevel: DefaultCompressionLevel,
EmbeddedValues: DefaultEmbeddedValues,
PreallocFiles: DefaultPreallocFiles,
DiscardPrecommittedTransactions: false,
IndexOpts: DefaultIndexOptions(),
AHTOpts: DefaultAHTOptions(),
}
}

Expand Down Expand Up @@ -620,6 +622,11 @@ func (opts *Options) WithAHTOptions(ahtOptions *AHTOptions) *Options {
return opts
}

func (opts *Options) WithDiscardPrecommittedTransactions(discard bool) *Options {
opts.DiscardPrecommittedTransactions = discard
return opts
}

// IndexOptions

func (opts *IndexOptions) WithCacheSize(cacheSize int) *IndexOptions {
Expand Down
Loading