Skip to content

Commit

Permalink
Merge pull request btcsuite#20 from mit-dci/ffldb-change
Browse files Browse the repository at this point in the history
ffldb: Support storing spend journals and switch to mutable treap
  • Loading branch information
kcalvinalvin authored Nov 8, 2021
2 parents a0a1645 + c1ebe85 commit d2ea7fc
Show file tree
Hide file tree
Showing 12 changed files with 657 additions and 159 deletions.
7 changes: 0 additions & 7 deletions blockchain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,13 +782,6 @@ func (b *BlockChain) disconnectBlock(node *blockNode, block *btcutil.Block, view
return err
}

// Update the transaction spend journal by removing the record
// that contains all txos spent by the block.
err = dbRemoveSpendJournalEntry(dbTx, block.Hash())
if err != nil {
return err
}

// Allow the index manager to call each of the currently active
// optional indexes with the block being disconnected so they
// can update themselves accordingly.
Expand Down
51 changes: 15 additions & 36 deletions blockchain/chainio.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@ const (
// latestUtxoSetBucketVersion is the current version of the utxo set
// bucket that is used to track all unspent outputs.
latestUtxoSetBucketVersion = 2

// latestSpendJournalBucketVersion is the current version of the spend
// journal bucket that is used to track all spent transactions for use
// in reorgs.
latestSpendJournalBucketVersion = 1
)

var (
Expand All @@ -55,14 +50,6 @@ var (
// consistency status of the utxo state.
utxoStateConsistencyKeyName = []byte("utxostateconsistency")

// spendJournalVersionKeyName is the name of the db key used to store
// the version of the spend journal currently in the database.
spendJournalVersionKeyName = []byte("spendjournalversion")

// spendJournalBucketName is the name of the db bucket used to house
// transactions outputs that are spent in each block.
spendJournalBucketName = []byte("spendjournal")

// utxoSetVersionKeyName is the name of the db key used to store the
// version of the utxo set currently in the database.
utxoSetVersionKeyName = []byte("utxosetversion")
Expand Down Expand Up @@ -462,9 +449,10 @@ func serializeSpendJournalEntry(stxos []SpentTxOut) []byte {
// was the final output spend in the containing transaction. It is up to the
// caller to handle this properly by looking the information up in the utxo set.
func dbFetchSpendJournalEntry(dbTx database.Tx, block *btcutil.Block) ([]SpentTxOut, error) {
// Exclude the coinbase transaction since it can't spend anything.
spendBucket := dbTx.Metadata().Bucket(spendJournalBucketName)
serialized := spendBucket.Get(block.Hash()[:])
serialized, err := dbTx.FetchSpendJournal(block.Hash())
if err != nil {
return nil, err
}
blockTxns := block.MsgBlock().Transactions[1:]
stxos, err := deserializeSpendJournalEntry(serialized, blockTxns)
if err != nil {
Expand All @@ -490,16 +478,8 @@ func dbFetchSpendJournalEntry(dbTx database.Tx, block *btcutil.Block) ([]SpentTx
// spent txouts. The spent txouts slice must contain an entry for every txout
// the transactions in the block spend in the order they are spent.
func dbPutSpendJournalEntry(dbTx database.Tx, blockHash *chainhash.Hash, stxos []SpentTxOut) error {
spendBucket := dbTx.Metadata().Bucket(spendJournalBucketName)
serialized := serializeSpendJournalEntry(stxos)
return spendBucket.Put(blockHash[:], serialized)
}

// dbRemoveSpendJournalEntry uses an existing database transaction to remove the
// spend journal entry for the passed block hash.
func dbRemoveSpendJournalEntry(dbTx database.Tx, blockHash *chainhash.Hash) error {
spendBucket := dbTx.Metadata().Bucket(spendJournalBucketName)
return spendBucket.Delete(blockHash[:])
return dbTx.StoreSpendJournal(blockHash, serialized)
}

// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -1211,12 +1191,6 @@ func (b *BlockChain) createChainState() error {
return err
}

// Create the bucket that houses the spend journal data and
// store its version.
_, err = meta.CreateBucket(spendJournalBucketName)
if err != nil {
return err
}
err = dbPutVersion(dbTx, utxoSetVersionKeyName,
latestUtxoSetBucketVersion)
if err != nil {
Expand All @@ -1231,11 +1205,6 @@ func (b *BlockChain) createChainState() error {
if err != nil {
return err
}
err = dbPutVersion(dbTx, spendJournalVersionKeyName,
latestSpendJournalBucketVersion)
if err != nil {
return err
}

// Save the genesis block to the block index database.
err = dbStoreBlockNode(dbTx, node)
Expand Down Expand Up @@ -1269,6 +1238,16 @@ func (b *BlockChain) createChainState() error {
}
}

// Store empty spend journal for the genesis block. This is needed
// because indexers will try to fetch the spend journal for the genesis.
//
// TODO fix the above because indexers don't need to do that. But like
// this works so eh.
err = dbTx.StoreSpendJournal(genesisBlock.Hash(), []byte{})
if err != nil {
return err
}

// Store the genesis block into the database.
return dbStoreBlock(dbTx, genesisBlock)
})
Expand Down
47 changes: 26 additions & 21 deletions database/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ const (
// exist in the database.
ErrBlockNotFound

// ErrSpendJournalNotFound indicates a spend journal with the provided hash does not
// exist in the database.
ErrSpendJournalNotFound

// ErrBlockExists indicates a block with the provided hash already
// exists in the database.
ErrBlockExists
Expand All @@ -133,27 +137,28 @@ const (

// Map of ErrorCode values back to their constant names for pretty printing.
var errorCodeStrings = map[ErrorCode]string{
ErrDbTypeRegistered: "ErrDbTypeRegistered",
ErrDbUnknownType: "ErrDbUnknownType",
ErrDbDoesNotExist: "ErrDbDoesNotExist",
ErrDbExists: "ErrDbExists",
ErrDbNotOpen: "ErrDbNotOpen",
ErrDbAlreadyOpen: "ErrDbAlreadyOpen",
ErrInvalid: "ErrInvalid",
ErrCorruption: "ErrCorruption",
ErrTxClosed: "ErrTxClosed",
ErrTxNotWritable: "ErrTxNotWritable",
ErrBucketNotFound: "ErrBucketNotFound",
ErrBucketExists: "ErrBucketExists",
ErrBucketNameRequired: "ErrBucketNameRequired",
ErrKeyRequired: "ErrKeyRequired",
ErrKeyTooLarge: "ErrKeyTooLarge",
ErrValueTooLarge: "ErrValueTooLarge",
ErrIncompatibleValue: "ErrIncompatibleValue",
ErrBlockNotFound: "ErrBlockNotFound",
ErrBlockExists: "ErrBlockExists",
ErrBlockRegionInvalid: "ErrBlockRegionInvalid",
ErrDriverSpecific: "ErrDriverSpecific",
ErrDbTypeRegistered: "ErrDbTypeRegistered",
ErrDbUnknownType: "ErrDbUnknownType",
ErrDbDoesNotExist: "ErrDbDoesNotExist",
ErrDbExists: "ErrDbExists",
ErrDbNotOpen: "ErrDbNotOpen",
ErrDbAlreadyOpen: "ErrDbAlreadyOpen",
ErrInvalid: "ErrInvalid",
ErrCorruption: "ErrCorruption",
ErrTxClosed: "ErrTxClosed",
ErrTxNotWritable: "ErrTxNotWritable",
ErrBucketNotFound: "ErrBucketNotFound",
ErrBucketExists: "ErrBucketExists",
ErrBucketNameRequired: "ErrBucketNameRequired",
ErrKeyRequired: "ErrKeyRequired",
ErrKeyTooLarge: "ErrKeyTooLarge",
ErrValueTooLarge: "ErrValueTooLarge",
ErrIncompatibleValue: "ErrIncompatibleValue",
ErrBlockNotFound: "ErrBlockNotFound",
ErrSpendJournalNotFound: "ErrSpendJournalNotFound",
ErrBlockExists: "ErrBlockExists",
ErrBlockRegionInvalid: "ErrBlockRegionInvalid",
ErrDriverSpecific: "ErrDriverSpecific",
}

// String returns the ErrorCode as a human-readable name.
Expand Down
1 change: 1 addition & 0 deletions database/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestErrorCodeStringer(t *testing.T) {
{database.ErrValueTooLarge, "ErrValueTooLarge"},
{database.ErrIncompatibleValue, "ErrIncompatibleValue"},
{database.ErrBlockNotFound, "ErrBlockNotFound"},
{database.ErrSpendJournalNotFound, "ErrSpendJournalNotFound"},
{database.ErrBlockExists, "ErrBlockExists"},
{database.ErrBlockRegionInvalid, "ErrBlockRegionInvalid"},
{database.ErrDriverSpecific, "ErrDriverSpecific"},
Expand Down
67 changes: 58 additions & 9 deletions database/ffldb/blockio.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ const (
// the future.
blockFilenameTemplate = "%09d.fdb"

// The name to be used for spend journals.
spendJournalFilenameTemplate = "%09d-undo.fdb"

// maxOpenFiles is the max number of open files to maintain in the
// open blocks cache. Note that this does not include the current
// write file, so there will typically be one more than this value open.
Expand Down Expand Up @@ -174,6 +177,9 @@ type blockStore struct {
openFileFunc func(fileNum uint32) (*lockableFile, error)
openWriteFileFunc func(fileNum uint32) (filer, error)
deleteFileFunc func(fileNum uint32) error

// filePathFunc returns the file path of where the data is stored.
filePathFunc func(dbPath string, fileNum uint32) string
}

// blockLocation identifies a particular block file and location.
Expand Down Expand Up @@ -223,6 +229,13 @@ func blockFilePath(dbPath string, fileNum uint32) string {
return filepath.Join(dbPath, fileName)
}

// spendJournalFilePath return the file path for the provided spend journal
// file number.
func spendJournalFilePath(dbPath string, fileNum uint32) string {
fileName := fmt.Sprintf(spendJournalFilenameTemplate, fileNum)
return filepath.Join(dbPath, fileName)
}

// openWriteFile returns a file handle for the passed flat file number in
// read/write mode. The file will be created if needed. It is typically used
// for the current file that will have all new data appended. Unlike openFile,
Expand All @@ -232,7 +245,7 @@ func (s *blockStore) openWriteFile(fileNum uint32) (filer, error) {
// The current block file needs to be read-write so it is possible to
// append to it. Also, it shouldn't be part of the least recently used
// file.
filePath := blockFilePath(s.basePath, fileNum)
filePath := s.filePathFunc(s.basePath, fileNum)
file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
str := fmt.Sprintf("failed to open file %q: %v", filePath, err)
Expand All @@ -251,7 +264,7 @@ func (s *blockStore) openWriteFile(fileNum uint32) (filer, error) {
// for WRITES.
func (s *blockStore) openFile(fileNum uint32) (*lockableFile, error) {
// Open the appropriate file as read-only.
filePath := blockFilePath(s.basePath, fileNum)
filePath := s.filePathFunc(s.basePath, fileNum)
file, err := os.Open(filePath)
if err != nil {
return nil, makeDbErr(database.ErrDriverSpecific, err.Error(),
Expand Down Expand Up @@ -299,7 +312,7 @@ func (s *blockStore) openFile(fileNum uint32) (*lockableFile, error) {
// must already be closed and it is the responsibility of the caller to do any
// other state cleanup necessary.
func (s *blockStore) deleteFile(fileNum uint32) error {
filePath := blockFilePath(s.basePath, fileNum)
filePath := s.filePathFunc(s.basePath, fileNum)
if err := os.Remove(filePath); err != nil {
return makeDbErr(database.ErrDriverSpecific, err.Error(), err)
}
Expand Down Expand Up @@ -518,7 +531,7 @@ func (s *blockStore) readBlock(hash *chainhash.Hash, loc blockLocation) ([]byte,
n, err := blockFile.file.ReadAt(serializedData, int64(loc.fileOffset))
blockFile.RUnlock()
if err != nil {
str := fmt.Sprintf("failed to read block %s from file %d, "+
str := fmt.Sprintf("failed to read data %s from file %d, "+
"offset %d: %v", hash, loc.blockFileNum, loc.fileOffset,
err)
return nil, makeDbErr(database.ErrDriverSpecific, str, err)
Expand All @@ -531,7 +544,7 @@ func (s *blockStore) readBlock(hash *chainhash.Hash, loc blockLocation) ([]byte,
serializedChecksum := binary.BigEndian.Uint32(serializedData[n-4:])
calculatedChecksum := crc32.Checksum(serializedData[:n-4], castagnoli)
if serializedChecksum != calculatedChecksum {
str := fmt.Sprintf("block data for block %s checksum "+
str := fmt.Sprintf("data for block %s checksum "+
"does not match - got %x, want %x", hash,
calculatedChecksum, serializedChecksum)
return nil, makeDbErr(database.ErrCorruption, str, nil)
Expand All @@ -542,7 +555,7 @@ func (s *blockStore) readBlock(hash *chainhash.Hash, loc blockLocation) ([]byte,
// wrong network in the directory.
serializedNet := byteOrder.Uint32(serializedData[:4])
if serializedNet != uint32(s.network) {
str := fmt.Sprintf("block data for block %s is for the "+
str := fmt.Sprintf("data for block %s is for the "+
"wrong network - got %d, want %d", hash, serializedNet,
uint32(s.network))
return nil, makeDbErr(database.ErrDriverSpecific, str, nil)
Expand Down Expand Up @@ -717,11 +730,13 @@ func (s *blockStore) handleRollback(oldBlockFileNum, oldBlockOffset uint32) {
// current write cursor which is also stored in the metadata. Thus, it is used
// to detect unexpected shutdowns in the middle of writes so the block files
// can be reconciled.
func scanBlockFiles(dbPath string) (int, uint32) {
func scanBlockFiles(dbPath string,
filePathFunc func(dbPath string, fileNum uint32) string) (int, uint32) {

lastFile := -1
fileLen := uint32(0)
for i := 0; ; i++ {
filePath := blockFilePath(dbPath, uint32(i))
filePath := filePathFunc(dbPath, uint32(i))
st, err := os.Stat(filePath)
if err != nil {
break
Expand All @@ -742,7 +757,40 @@ func newBlockStore(basePath string, network wire.BitcoinNet) *blockStore {
// Look for the end of the latest block to file to determine what the
// write cursor position is from the viewpoing of the block files on
// disk.
fileNum, fileOff := scanBlockFiles(basePath)
fileNum, fileOff := scanBlockFiles(basePath, blockFilePath)
if fileNum == -1 {
fileNum = 0
fileOff = 0
}

store := &blockStore{
network: network,
basePath: basePath,
maxBlockFileSize: maxBlockFileSize,
openBlockFiles: make(map[uint32]*lockableFile),
openBlocksLRU: list.New(),
fileNumToLRUElem: make(map[uint32]*list.Element),

writeCursor: &writeCursor{
curFile: &lockableFile{},
curFileNum: uint32(fileNum),
curOffset: fileOff,
},
}
store.openFileFunc = store.openFile
store.openWriteFileFunc = store.openWriteFile
store.deleteFileFunc = store.deleteFile
store.filePathFunc = blockFilePath
return store
}

// newSJStore returns a new spend journal store with the current spend journal file
// number and offset set and all fields initialized.
func newSJStore(basePath string, network wire.BitcoinNet) *blockStore {
// Look for the end of the latest block to file to determine what the
// write cursor position is from the viewpoing of the block files on
// disk.
fileNum, fileOff := scanBlockFiles(basePath, spendJournalFilePath)
if fileNum == -1 {
fileNum = 0
fileOff = 0
Expand All @@ -765,5 +813,6 @@ func newBlockStore(basePath string, network wire.BitcoinNet) *blockStore {
store.openFileFunc = store.openFile
store.openWriteFileFunc = store.openWriteFile
store.deleteFileFunc = store.deleteFile
store.filePathFunc = spendJournalFilePath
return store
}
Loading

0 comments on commit d2ea7fc

Please sign in to comment.