ffldb: Support storing spend journals and switch to mutable treap
1: Switch the dbcache to use mutable treaps.

ffldb was using an immutable treap for the database cache.  While this is
good in that it supports snapshotting of database transactions, a generic
Bitcoin block store has no need for that.

Even worse, the immutable treap copies and allocates new nodes along the
entire modified path on every insert and deletion, leading to a huge amount
of memory being allocated.

This PR switches away from immutable treaps, since there is no real need
for snapshotting here and the extra memory allocation is too costly.
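
As an illustration only (a plain binary search tree with made-up names, not
the actual ffldb treap, which also carries randomized priorities and performs
rotations), the difference in allocation behavior looks roughly like this:

```go
package main

import "fmt"

// node is a simplified binary-search-tree node.  The real ffldb treap also
// stores a randomized priority and rebalances with rotations; those details
// are omitted because only the allocation pattern matters here.
type node struct {
	key, value  string
	left, right *node
}

// putMutable inserts in place.  At most one new node is allocated per call,
// which is the behavior a mutable treap gives the dbcache.
func putMutable(n *node, key, value string) *node {
	if n == nil {
		return &node{key: key, value: value}
	}
	switch {
	case key < n.key:
		n.left = putMutable(n.left, key, value)
	case key > n.key:
		n.right = putMutable(n.right, key, value)
	default:
		n.value = value
	}
	return n
}

// putImmutable never modifies an existing node.  It copies every node on the
// search path so older references remain valid snapshots, allocating one new
// node per level visited on every insert.
func putImmutable(n *node, key, value string) *node {
	if n == nil {
		return &node{key: key, value: value}
	}
	cp := *n // copy the node on the search path
	switch {
	case key < cp.key:
		cp.left = putImmutable(n.left, key, value)
	case key > cp.key:
		cp.right = putImmutable(n.right, key, value)
	default:
		cp.value = value
	}
	return &cp
}

func main() {
	var mutable, immutable *node
	for _, k := range []string{"b", "a", "c", "d"} {
		mutable = putMutable(mutable, k, "v")
		immutable = putImmutable(immutable, k, "v")
	}
	fmt.Println(mutable.key, immutable.key) // b b
}
```

The copy-per-level behavior is what makes cheap snapshots possible, and it is
the overhead this change trades away since the dbcache does not need them.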

2: Save spend journals as flat files.

Spend journals are only needed during reorgs, which rarely happen.  They
were being stored in leveldb, which isn't necessary.  This PR saves the
spend journals in flat files, just like blocks.

There is now no support for deleting spend journals, but this isn't really
a concern.  btcd already stores any received block that passes the initial
proof-of-work and sanity checks.  Since spend journals are only created once
a block passes full verification, they are even less likely than blocks to
be affected by disk-fill attacks.

There is no need to delete blocks, so there is even less of a need to
delete spend journals.
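
Callers never touch the undo files directly; they go through the
FetchSpendJournal and StoreSpendJournal methods on database.Tx that this
commit wires up (visible in the chainio.go diff below).  The following is a
minimal usage sketch with a made-up helper name, assuming only those two
methods plus the existing View/Update transaction helpers:

```go
package example

import (
	"fmt"

	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/btcsuite/btcd/database"
)

// roundTripSpendJournal stores a serialized spend journal entry for a block
// and reads it back.  The serialization of the entry itself is unchanged by
// this commit; only where it lives changes (flat files instead of a leveldb
// bucket).
func roundTripSpendJournal(db database.DB, hash *chainhash.Hash, serialized []byte) error {
	// Write the journal entry inside a writable transaction.
	err := db.Update(func(dbTx database.Tx) error {
		return dbTx.StoreSpendJournal(hash, serialized)
	})
	if err != nil {
		return err
	}

	// Read it back inside a read-only transaction.
	return db.View(func(dbTx database.Tx) error {
		got, err := dbTx.FetchSpendJournal(hash)
		if err != nil {
			return err
		}
		fmt.Printf("spend journal for %v: %d bytes\n", hash, len(got))
		return nil
	})
}
```

Judging from the blockio.go changes below, the ffldb backend appends each
entry to the current %09d-undo.fdb file using the same network and checksum
framing it already uses for blocks.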
kcalvinalvin committed Nov 8, 2021
1 parent a0a1645 commit c1ebe85
Showing 12 changed files with 657 additions and 159 deletions.
7 changes: 0 additions & 7 deletions blockchain/chain.go
@@ -782,13 +782,6 @@ func (b *BlockChain) disconnectBlock(node *blockNode, block *btcutil.Block, view
return err
}

// Update the transaction spend journal by removing the record
// that contains all txos spent by the block.
err = dbRemoveSpendJournalEntry(dbTx, block.Hash())
if err != nil {
return err
}

// Allow the index manager to call each of the currently active
// optional indexes with the block being disconnected so they
// can update themselves accordingly.
51 changes: 15 additions & 36 deletions blockchain/chainio.go
@@ -27,11 +27,6 @@ const (
// latestUtxoSetBucketVersion is the current version of the utxo set
// bucket that is used to track all unspent outputs.
latestUtxoSetBucketVersion = 2

// latestSpendJournalBucketVersion is the current version of the spend
// journal bucket that is used to track all spent transactions for use
// in reorgs.
latestSpendJournalBucketVersion = 1
)

var (
@@ -55,14 +50,6 @@ var (
// consistency status of the utxo state.
utxoStateConsistencyKeyName = []byte("utxostateconsistency")

// spendJournalVersionKeyName is the name of the db key used to store
// the version of the spend journal currently in the database.
spendJournalVersionKeyName = []byte("spendjournalversion")

// spendJournalBucketName is the name of the db bucket used to house
// transactions outputs that are spent in each block.
spendJournalBucketName = []byte("spendjournal")

// utxoSetVersionKeyName is the name of the db key used to store the
// version of the utxo set currently in the database.
utxoSetVersionKeyName = []byte("utxosetversion")
@@ -462,9 +449,10 @@ func serializeSpendJournalEntry(stxos []SpentTxOut) []byte {
// was the final output spend in the containing transaction. It is up to the
// caller to handle this properly by looking the information up in the utxo set.
func dbFetchSpendJournalEntry(dbTx database.Tx, block *btcutil.Block) ([]SpentTxOut, error) {
// Exclude the coinbase transaction since it can't spend anything.
spendBucket := dbTx.Metadata().Bucket(spendJournalBucketName)
serialized := spendBucket.Get(block.Hash()[:])
serialized, err := dbTx.FetchSpendJournal(block.Hash())
if err != nil {
return nil, err
}
blockTxns := block.MsgBlock().Transactions[1:]
stxos, err := deserializeSpendJournalEntry(serialized, blockTxns)
if err != nil {
@@ -490,16 +478,8 @@ func dbFetchSpendJournalEntry(dbTx database.Tx, block *btcutil.Block) ([]SpentTx
// spent txouts. The spent txouts slice must contain an entry for every txout
// the transactions in the block spend in the order they are spent.
func dbPutSpendJournalEntry(dbTx database.Tx, blockHash *chainhash.Hash, stxos []SpentTxOut) error {
spendBucket := dbTx.Metadata().Bucket(spendJournalBucketName)
serialized := serializeSpendJournalEntry(stxos)
return spendBucket.Put(blockHash[:], serialized)
}

// dbRemoveSpendJournalEntry uses an existing database transaction to remove the
// spend journal entry for the passed block hash.
func dbRemoveSpendJournalEntry(dbTx database.Tx, blockHash *chainhash.Hash) error {
spendBucket := dbTx.Metadata().Bucket(spendJournalBucketName)
return spendBucket.Delete(blockHash[:])
return dbTx.StoreSpendJournal(blockHash, serialized)
}

// -----------------------------------------------------------------------------
@@ -1211,12 +1191,6 @@ func (b *BlockChain) createChainState() error {
return err
}

// Create the bucket that houses the spend journal data and
// store its version.
_, err = meta.CreateBucket(spendJournalBucketName)
if err != nil {
return err
}
err = dbPutVersion(dbTx, utxoSetVersionKeyName,
latestUtxoSetBucketVersion)
if err != nil {
@@ -1231,11 +1205,6 @@ func (b *BlockChain) createChainState() error {
if err != nil {
return err
}
err = dbPutVersion(dbTx, spendJournalVersionKeyName,
latestSpendJournalBucketVersion)
if err != nil {
return err
}

// Save the genesis block to the block index database.
err = dbStoreBlockNode(dbTx, node)
@@ -1269,6 +1238,16 @@ func (b *BlockChain) createChainState() error {
}
}

// Store empty spend journal for the genesis block. This is needed
// because indexers will try to fetch the spend journal for the genesis.
//
// TODO fix the above because indexers don't need to do that. But like
// this works so eh.
err = dbTx.StoreSpendJournal(genesisBlock.Hash(), []byte{})
if err != nil {
return err
}

// Store the genesis block into the database.
return dbStoreBlock(dbTx, genesisBlock)
})
47 changes: 26 additions & 21 deletions database/error.go
@@ -107,6 +107,10 @@ const (
// exist in the database.
ErrBlockNotFound

// ErrSpendJournalNotFound indicates a spend journal with the provided hash does not
// exist in the database.
ErrSpendJournalNotFound

// ErrBlockExists indicates a block with the provided hash already
// exists in the database.
ErrBlockExists
@@ -133,27 +137,28 @@ const (

// Map of ErrorCode values back to their constant names for pretty printing.
var errorCodeStrings = map[ErrorCode]string{
ErrDbTypeRegistered: "ErrDbTypeRegistered",
ErrDbUnknownType: "ErrDbUnknownType",
ErrDbDoesNotExist: "ErrDbDoesNotExist",
ErrDbExists: "ErrDbExists",
ErrDbNotOpen: "ErrDbNotOpen",
ErrDbAlreadyOpen: "ErrDbAlreadyOpen",
ErrInvalid: "ErrInvalid",
ErrCorruption: "ErrCorruption",
ErrTxClosed: "ErrTxClosed",
ErrTxNotWritable: "ErrTxNotWritable",
ErrBucketNotFound: "ErrBucketNotFound",
ErrBucketExists: "ErrBucketExists",
ErrBucketNameRequired: "ErrBucketNameRequired",
ErrKeyRequired: "ErrKeyRequired",
ErrKeyTooLarge: "ErrKeyTooLarge",
ErrValueTooLarge: "ErrValueTooLarge",
ErrIncompatibleValue: "ErrIncompatibleValue",
ErrBlockNotFound: "ErrBlockNotFound",
ErrBlockExists: "ErrBlockExists",
ErrBlockRegionInvalid: "ErrBlockRegionInvalid",
ErrDriverSpecific: "ErrDriverSpecific",
ErrDbTypeRegistered: "ErrDbTypeRegistered",
ErrDbUnknownType: "ErrDbUnknownType",
ErrDbDoesNotExist: "ErrDbDoesNotExist",
ErrDbExists: "ErrDbExists",
ErrDbNotOpen: "ErrDbNotOpen",
ErrDbAlreadyOpen: "ErrDbAlreadyOpen",
ErrInvalid: "ErrInvalid",
ErrCorruption: "ErrCorruption",
ErrTxClosed: "ErrTxClosed",
ErrTxNotWritable: "ErrTxNotWritable",
ErrBucketNotFound: "ErrBucketNotFound",
ErrBucketExists: "ErrBucketExists",
ErrBucketNameRequired: "ErrBucketNameRequired",
ErrKeyRequired: "ErrKeyRequired",
ErrKeyTooLarge: "ErrKeyTooLarge",
ErrValueTooLarge: "ErrValueTooLarge",
ErrIncompatibleValue: "ErrIncompatibleValue",
ErrBlockNotFound: "ErrBlockNotFound",
ErrSpendJournalNotFound: "ErrSpendJournalNotFound",
ErrBlockExists: "ErrBlockExists",
ErrBlockRegionInvalid: "ErrBlockRegionInvalid",
ErrDriverSpecific: "ErrDriverSpecific",
}

// String returns the ErrorCode as a human-readable name.
1 change: 1 addition & 0 deletions database/error_test.go
@@ -35,6 +35,7 @@ func TestErrorCodeStringer(t *testing.T) {
{database.ErrValueTooLarge, "ErrValueTooLarge"},
{database.ErrIncompatibleValue, "ErrIncompatibleValue"},
{database.ErrBlockNotFound, "ErrBlockNotFound"},
{database.ErrSpendJournalNotFound, "ErrSpendJournalNotFound"},
{database.ErrBlockExists, "ErrBlockExists"},
{database.ErrBlockRegionInvalid, "ErrBlockRegionInvalid"},
{database.ErrDriverSpecific, "ErrDriverSpecific"},
67 changes: 58 additions & 9 deletions database/ffldb/blockio.go
@@ -34,6 +34,9 @@ const (
// the future.
blockFilenameTemplate = "%09d.fdb"

// The name to be used for spend journals.
spendJournalFilenameTemplate = "%09d-undo.fdb"

// maxOpenFiles is the max number of open files to maintain in the
// open blocks cache. Note that this does not include the current
// write file, so there will typically be one more than this value open.
@@ -174,6 +177,9 @@ type blockStore struct {
openFileFunc func(fileNum uint32) (*lockableFile, error)
openWriteFileFunc func(fileNum uint32) (filer, error)
deleteFileFunc func(fileNum uint32) error

// filePathFunc returns the file path of where the data is stored.
filePathFunc func(dbPath string, fileNum uint32) string
}

// blockLocation identifies a particular block file and location.
@@ -223,6 +229,13 @@ func blockFilePath(dbPath string, fileNum uint32) string {
return filepath.Join(dbPath, fileName)
}

// spendJournalFilePath returns the file path for the provided spend journal
// file number.
func spendJournalFilePath(dbPath string, fileNum uint32) string {
fileName := fmt.Sprintf(spendJournalFilenameTemplate, fileNum)
return filepath.Join(dbPath, fileName)
}

// openWriteFile returns a file handle for the passed flat file number in
// read/write mode. The file will be created if needed. It is typically used
// for the current file that will have all new data appended. Unlike openFile,
@@ -232,7 +245,7 @@ func (s *blockStore) openWriteFile(fileNum uint32) (filer, error) {
// The current block file needs to be read-write so it is possible to
// append to it. Also, it shouldn't be part of the least recently used
// file.
filePath := blockFilePath(s.basePath, fileNum)
filePath := s.filePathFunc(s.basePath, fileNum)
file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
str := fmt.Sprintf("failed to open file %q: %v", filePath, err)
Expand All @@ -251,7 +264,7 @@ func (s *blockStore) openWriteFile(fileNum uint32) (filer, error) {
// for WRITES.
func (s *blockStore) openFile(fileNum uint32) (*lockableFile, error) {
// Open the appropriate file as read-only.
filePath := blockFilePath(s.basePath, fileNum)
filePath := s.filePathFunc(s.basePath, fileNum)
file, err := os.Open(filePath)
if err != nil {
return nil, makeDbErr(database.ErrDriverSpecific, err.Error(),
@@ -299,7 +312,7 @@ func (s *blockStore) openFile(fileNum uint32) (*lockableFile, error) {
// must already be closed and it is the responsibility of the caller to do any
// other state cleanup necessary.
func (s *blockStore) deleteFile(fileNum uint32) error {
filePath := blockFilePath(s.basePath, fileNum)
filePath := s.filePathFunc(s.basePath, fileNum)
if err := os.Remove(filePath); err != nil {
return makeDbErr(database.ErrDriverSpecific, err.Error(), err)
}
@@ -518,7 +531,7 @@ func (s *blockStore) readBlock(hash *chainhash.Hash, loc blockLocation) ([]byte,
n, err := blockFile.file.ReadAt(serializedData, int64(loc.fileOffset))
blockFile.RUnlock()
if err != nil {
str := fmt.Sprintf("failed to read block %s from file %d, "+
str := fmt.Sprintf("failed to read data %s from file %d, "+
"offset %d: %v", hash, loc.blockFileNum, loc.fileOffset,
err)
return nil, makeDbErr(database.ErrDriverSpecific, str, err)
@@ -531,7 +544,7 @@ func (s *blockStore) readBlock(hash *chainhash.Hash, loc blockLocation) ([]byte,
serializedChecksum := binary.BigEndian.Uint32(serializedData[n-4:])
calculatedChecksum := crc32.Checksum(serializedData[:n-4], castagnoli)
if serializedChecksum != calculatedChecksum {
str := fmt.Sprintf("block data for block %s checksum "+
str := fmt.Sprintf("data for block %s checksum "+
"does not match - got %x, want %x", hash,
calculatedChecksum, serializedChecksum)
return nil, makeDbErr(database.ErrCorruption, str, nil)
@@ -542,7 +555,7 @@ func (s *blockStore) readBlock(hash *chainhash.Hash, loc blockLocation) ([]byte,
// wrong network in the directory.
serializedNet := byteOrder.Uint32(serializedData[:4])
if serializedNet != uint32(s.network) {
str := fmt.Sprintf("block data for block %s is for the "+
str := fmt.Sprintf("data for block %s is for the "+
"wrong network - got %d, want %d", hash, serializedNet,
uint32(s.network))
return nil, makeDbErr(database.ErrDriverSpecific, str, nil)
@@ -717,11 +730,13 @@ func (s *blockStore) handleRollback(oldBlockFileNum, oldBlockOffset uint32) {
// current write cursor which is also stored in the metadata. Thus, it is used
// to detect unexpected shutdowns in the middle of writes so the block files
// can be reconciled.
func scanBlockFiles(dbPath string) (int, uint32) {
func scanBlockFiles(dbPath string,
filePathFunc func(dbPath string, fileNum uint32) string) (int, uint32) {

lastFile := -1
fileLen := uint32(0)
for i := 0; ; i++ {
filePath := blockFilePath(dbPath, uint32(i))
filePath := filePathFunc(dbPath, uint32(i))
st, err := os.Stat(filePath)
if err != nil {
break
@@ -742,7 +757,40 @@ func newBlockStore(basePath string, network wire.BitcoinNet) *blockStore {
// Look for the end of the latest block file to determine what the
// write cursor position is from the viewpoint of the block files on
// disk.
fileNum, fileOff := scanBlockFiles(basePath)
fileNum, fileOff := scanBlockFiles(basePath, blockFilePath)
if fileNum == -1 {
fileNum = 0
fileOff = 0
}

store := &blockStore{
network: network,
basePath: basePath,
maxBlockFileSize: maxBlockFileSize,
openBlockFiles: make(map[uint32]*lockableFile),
openBlocksLRU: list.New(),
fileNumToLRUElem: make(map[uint32]*list.Element),

writeCursor: &writeCursor{
curFile: &lockableFile{},
curFileNum: uint32(fileNum),
curOffset: fileOff,
},
}
store.openFileFunc = store.openFile
store.openWriteFileFunc = store.openWriteFile
store.deleteFileFunc = store.deleteFile
store.filePathFunc = blockFilePath
return store
}

// newSJStore returns a new spend journal store with the current spend journal file
// number and offset set and all fields initialized.
func newSJStore(basePath string, network wire.BitcoinNet) *blockStore {
// Look for the end of the latest spend journal file to determine what
// the write cursor position is from the viewpoint of the spend journal
// files on disk.
fileNum, fileOff := scanBlockFiles(basePath, spendJournalFilePath)
if fileNum == -1 {
fileNum = 0
fileOff = 0
Expand All @@ -765,5 +813,6 @@ func newBlockStore(basePath string, network wire.BitcoinNet) *blockStore {
store.openFileFunc = store.openFile
store.openWriteFileFunc = store.openWriteFile
store.deleteFileFunc = store.deleteFile
store.filePathFunc = spendJournalFilePath
return store
}