Implement persistence with the new structures #965

Merged
merged 2 commits into from
May 10, 2023
4 changes: 4 additions & 0 deletions crates/chain/src/indexed_tx_graph.rs
@@ -301,6 +301,10 @@ impl<A: Anchor, IA: Append> Append for IndexedAdditions<A, IA> {
self.graph_additions.append(other.graph_additions);
self.index_additions.append(other.index_additions);
}

fn is_empty(&self) -> bool {
self.graph_additions.is_empty() && self.index_additions.is_empty()
}
}

/// Represents a structure that can index transaction data.
4 changes: 4 additions & 0 deletions crates/chain/src/keychain.rs
@@ -84,6 +84,10 @@ impl<K: Ord> Append for DerivationAdditions<K> {

self.0.append(&mut other.0);
}

fn is_empty(&self) -> bool {
self.0.is_empty()
}
}

impl<K> Default for DerivationAdditions<K> {
2 changes: 2 additions & 0 deletions crates/chain/src/lib.rs
@@ -33,6 +33,8 @@ pub mod tx_graph;
pub use tx_data_traits::*;
mod chain_oracle;
pub use chain_oracle::*;
mod persist;
pub use persist::*;

#[doc(hidden)]
pub mod example_utils;
89 changes: 89 additions & 0 deletions crates/chain/src/persist.rs
@@ -0,0 +1,89 @@
use core::convert::Infallible;

use crate::Append;

/// `Persist` wraps a [`PersistBackend`] (`B`) to create a convenient staging area for changes (`C`)
/// before they are persisted.
///
/// Not all changes to the in-memory representation need to be written to disk right away, so
/// [`Persist::stage`] can be used to *stage* changes first and then [`Persist::commit`] can be used
/// to write changes to disk.
#[derive(Debug)]
pub struct Persist<B, C> {
backend: B,
stage: C,
Member
nit: maybe it should be S for stage, instead of C?

Member Author
C for changeset though 😮

Member
Then do changeset: C :P

Member
@notmandatory May 23, 2023
I support changeset: C 🙂

}

impl<B, C> Persist<B, C>
where
B: PersistBackend<C>,
C: Default + Append,
{
/// Create a new [`Persist`] from [`PersistBackend`].
pub fn new(backend: B) -> Self {
Self {
backend,
stage: Default::default(),
}
}

/// Stage a `changeset` to be committed later with [`commit`].
///
/// [`commit`]: Self::commit
pub fn stage(&mut self, changeset: C) {
self.stage.append(changeset)
}

/// Get the changes that have not been committed yet.
pub fn staged(&self) -> &C {
&self.stage
}

/// Commit the staged changes to the underlying persistence backend.
///
/// Returns a backend-defined error if this fails.
pub fn commit(&mut self) -> Result<(), B::WriteError> {
let mut temp = C::default();
core::mem::swap(&mut temp, &mut self.stage);
self.backend.write_changes(&temp)
Comment on lines +46 to +48
Member
Mhhhh maybe put a small comment here, it took me a second to figure out what you were trying to do (commit changes and clean self.stage at the same time)

Also, a shorter way of writing this, but not necessarily a cleaner one, might be:

self.backend.write_changes(&core::mem::take(&mut self.stage))

Contributor
Actually, shouldn't we only clear it if the write succeeds?

Member Author
That's a good point

Member Author
    /// Commit the staged changes to the underlying persistence backend.
    ///
    /// Changes that are committed (if any) are returned.
    ///
    /// # Errors
    ///
    /// Returns a backend-defined error if this fails.
    pub fn commit(&mut self) -> Result<Option<C>, B::WriteError> {
        if self.stage.is_empty() {
            return Ok(None);
        }
        self.backend
            .write_changes(&self.stage)
            // if written successfully, take and return `self.stage`
            .map(|_| Some(core::mem::take(&mut self.stage)))
    }
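
To make the point of this thread concrete, here is a minimal, self-contained sketch of the revised `commit` behaviour: a failed write leaves the staged changes in place so a later retry can still persist them. `FlakyBackend`, the non-generic `Persist` and `failed_commit_keeps_stage` are hypothetical stand-ins written for this illustration; they are not the types added in this PR.

```rust
use std::collections::BTreeSet;

/// Simplified stand-in for the `Append` trait discussed in this PR.
pub trait Append {
    fn append(&mut self, other: Self);
    fn is_empty(&self) -> bool;
}

impl<T: Ord> Append for BTreeSet<T> {
    fn append(&mut self, mut other: Self) {
        BTreeSet::append(self, &mut other)
    }
    fn is_empty(&self) -> bool {
        BTreeSet::is_empty(self)
    }
}

/// Hypothetical backend whose writes fail while `fail` is set.
#[derive(Default)]
pub struct FlakyBackend {
    pub fail: bool,
    pub written: Vec<BTreeSet<u32>>,
}

impl FlakyBackend {
    fn write_changes(&mut self, changeset: &BTreeSet<u32>) -> Result<(), &'static str> {
        if self.fail {
            return Err("disk unavailable");
        }
        self.written.push(changeset.clone());
        Ok(())
    }
}

/// Stripped-down `Persist`, fixed to one backend and one changeset type (an assumption).
#[derive(Default)]
pub struct Persist {
    pub backend: FlakyBackend,
    pub stage: BTreeSet<u32>,
}

impl Persist {
    pub fn stage(&mut self, changeset: BTreeSet<u32>) {
        Append::append(&mut self.stage, changeset)
    }

    /// Writes the staged changes; the stage is only emptied if the write succeeds.
    pub fn commit(&mut self) -> Result<Option<BTreeSet<u32>>, &'static str> {
        if Append::is_empty(&self.stage) {
            return Ok(None);
        }
        self.backend
            .write_changes(&self.stage)
            .map(|_| Some(core::mem::take(&mut self.stage)))
    }
}

/// Fails one commit, checks the stage size, then retries successfully.
pub fn failed_commit_keeps_stage() -> (bool, usize, Option<BTreeSet<u32>>) {
    let mut persist = Persist::default();
    persist.backend.fail = true;
    persist.stage(BTreeSet::from([1, 2]));
    let first_failed = persist.commit().is_err();
    let still_staged = persist.stage.len();
    persist.backend.fail = false;
    let committed = persist.commit().unwrap();
    (first_failed, still_staged, committed)
}
```

With the earlier swap-then-write version, the same sequence would have lost both staged values after the first failed write.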

}
}

/// A persistence backend for [`Persist`].
///
/// `C` represents the changeset: a datatype that records changes made to in-memory data structures
/// that are to be persisted, or retrieved from persistence.
pub trait PersistBackend<C> {
/// The error the backend returns when it fails to write.
type WriteError: core::fmt::Debug;

/// The error the backend returns when it fails to load changesets `C`.
type LoadError: core::fmt::Debug;

/// Writes a changeset to the persistence backend.
///
/// It is up to the backend what it does with this. It could store every changeset in a list, or
/// it could insert the actual changes into a more structured database. All it needs to guarantee is
/// that [`load_from_persistence`] restores a keychain tracker to what it should be if all
/// changesets had been applied sequentially.
///
/// [`load_from_persistence`]: Self::load_from_persistence
fn write_changes(&mut self, changeset: &C) -> Result<(), Self::WriteError>;

/// Return the aggregate changeset `C` from persistence.
fn load_from_persistence(&mut self) -> Result<C, Self::LoadError>;
}

impl<C: Default> PersistBackend<C> for () {
type WriteError = Infallible;

type LoadError = Infallible;

fn write_changes(&mut self, _changeset: &C) -> Result<(), Self::WriteError> {
Ok(())
}

fn load_from_persistence(&mut self) -> Result<C, Self::LoadError> {
Ok(C::default())
}
}
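
The `write_changes` documentation mentions that a backend may simply store every changeset in a list. A rough sketch of that option follows, assuming a hypothetical in-memory `LogBackend` (not part of this PR) and local copies of the two traits so the example compiles on its own. Loading replays the log in order, which is the guarantee the trait documentation asks for.

```rust
use std::collections::BTreeMap;
use std::convert::Infallible;

/// Local copy of the `Append` trait, reduced to what the sketch needs.
pub trait Append {
    fn append(&mut self, other: Self);
    fn is_empty(&self) -> bool;
}

impl<K: Ord, V> Append for BTreeMap<K, V> {
    fn append(&mut self, mut other: Self) {
        BTreeMap::append(self, &mut other)
    }
    fn is_empty(&self) -> bool {
        BTreeMap::is_empty(self)
    }
}

/// Local copy of the `PersistBackend` trait from this diff.
pub trait PersistBackend<C> {
    type WriteError: core::fmt::Debug;
    type LoadError: core::fmt::Debug;
    fn write_changes(&mut self, changeset: &C) -> Result<(), Self::WriteError>;
    fn load_from_persistence(&mut self) -> Result<C, Self::LoadError>;
}

/// Hypothetical backend that keeps every written changeset in a list.
pub struct LogBackend<C> {
    log: Vec<C>,
}

impl<C> Default for LogBackend<C> {
    fn default() -> Self {
        Self { log: Vec::new() }
    }
}

impl<C: Clone + Default + Append> PersistBackend<C> for LogBackend<C> {
    type WriteError = Infallible;
    type LoadError = Infallible;

    fn write_changes(&mut self, changeset: &C) -> Result<(), Self::WriteError> {
        self.log.push(changeset.clone());
        Ok(())
    }

    fn load_from_persistence(&mut self) -> Result<C, Self::LoadError> {
        // Replay the log in write order to rebuild the aggregate changeset.
        let mut aggregate = C::default();
        for changeset in self.log.iter().cloned() {
            aggregate.append(changeset);
        }
        Ok(aggregate)
    }
}

/// Writes two overlapping changesets and loads the aggregate back.
pub fn replay_example() -> BTreeMap<u32, &'static str> {
    let mut backend: LogBackend<BTreeMap<u32, &'static str>> = LogBackend::default();
    backend.write_changes(&BTreeMap::from([(0, "a")])).unwrap();
    backend
        .write_changes(&BTreeMap::from([(1, "b"), (0, "c")]))
        .unwrap();
    backend.load_from_persistence().unwrap()
}
```

Because `BTreeMap::append` lets entries from the later map overwrite earlier ones, the value written last for a key is the one that survives the replay.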
37 changes: 37 additions & 0 deletions crates/chain/src/tx_data_traits.rs
@@ -1,6 +1,7 @@
use crate::collections::BTreeMap;
use crate::collections::BTreeSet;
use crate::BlockId;
use alloc::vec::Vec;
use bitcoin::{Block, OutPoint, Transaction, TxOut};

/// Trait to do something with every txout contained in a structure.
@@ -64,20 +65,56 @@ impl<A: Anchor> Anchor for &'static A {
pub trait Append {
/// Append another object of the same type onto `self`.
fn append(&mut self, other: Self);

/// Returns whether the structure is considered empty.
fn is_empty(&self) -> bool;
Contributor
Let's say I wanted to implement Append for a u32 to represent a monotonically increasing value -- I guess u32::MIN would return true for is_empty.

Member Author
I think implementing Append for u32 is a rather confusing combination. Option<u32> or using a wrapped type makes more sense for me.
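
One way to read the wrapped-type suggestion is sketched below. `HighestSeen` is a hypothetical name chosen for this illustration, and the trait is a local copy so the snippet stands alone. Wrapping an `Option<u32>` lets "nothing recorded" be distinguished from "recorded the value 0", which a bare `u32` cannot express.

```rust
/// Local copy of the `Append` trait from this diff.
pub trait Append {
    fn append(&mut self, other: Self);
    fn is_empty(&self) -> bool;
}

/// Hypothetical changeset recording the highest value seen so far, if any.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct HighestSeen(pub Option<u32>);

impl Append for HighestSeen {
    fn append(&mut self, other: Self) {
        // `None` orders below every `Some(_)`, so `max` never discards a recorded value.
        self.0 = self.0.max(other.0);
    }

    fn is_empty(&self) -> bool {
        // Empty means "no value recorded", which is distinct from `Some(0)`.
        self.0.is_none()
    }
}
```

Appending `HighestSeen(Some(0))` to a default value makes it non-empty, so a staged zero would still be written by a `commit` that skips empty changesets.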

}

impl Append for () {
fn append(&mut self, _other: Self) {}

fn is_empty(&self) -> bool {
true
}
}

impl<K: Ord, V> Append for BTreeMap<K, V> {
fn append(&mut self, mut other: Self) {
BTreeMap::append(self, &mut other)
}

fn is_empty(&self) -> bool {
BTreeMap::is_empty(self)
}
}

impl<T: Ord> Append for BTreeSet<T> {
fn append(&mut self, mut other: Self) {
BTreeSet::append(self, &mut other)
}

fn is_empty(&self) -> bool {
BTreeSet::is_empty(self)
}
}

impl<T> Append for Vec<T> {
fn append(&mut self, mut other: Self) {
Vec::append(self, &mut other)
}

fn is_empty(&self) -> bool {
Vec::is_empty(self)
}
}

impl<A: Append, B: Append> Append for (A, B) {
fn append(&mut self, other: Self) {
Append::append(&mut self.0, other.0);
Append::append(&mut self.1, other.1);
}

fn is_empty(&self) -> bool {
Append::is_empty(&self.0) && Append::is_empty(&self.1)
}
}
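
As a small illustration of how the tuple implementation composes, the sketch below pairs two `Vec` changesets. The trait and impls are local copies, and `tuple_demo` is a hypothetical helper. A pair counts as empty only when both halves are empty, so a change in either half is enough to make it non-empty.

```rust
/// Local copy of the `Append` trait from this diff.
pub trait Append {
    fn append(&mut self, other: Self);
    fn is_empty(&self) -> bool;
}

impl<T> Append for Vec<T> {
    fn append(&mut self, mut other: Self) {
        Vec::append(self, &mut other)
    }
    fn is_empty(&self) -> bool {
        Vec::is_empty(self)
    }
}

impl<A: Append, B: Append> Append for (A, B) {
    fn append(&mut self, other: Self) {
        Append::append(&mut self.0, other.0);
        Append::append(&mut self.1, other.1);
    }
    fn is_empty(&self) -> bool {
        Append::is_empty(&self.0) && Append::is_empty(&self.1)
    }
}

/// Returns emptiness before and after the first append, plus the final pair.
pub fn tuple_demo() -> (bool, bool, (Vec<u8>, Vec<char>)) {
    let mut changes: (Vec<u8>, Vec<char>) = Default::default();
    let empty_at_start = Append::is_empty(&changes);
    // Only the second half carries data here, which is enough to be non-empty.
    Append::append(&mut changes, (vec![], vec!['x']));
    let empty_after_one = Append::is_empty(&changes);
    Append::append(&mut changes, (vec![1, 2], vec!['y']));
    (empty_at_start, empty_after_one, changes)
}
```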
7 changes: 7 additions & 0 deletions crates/chain/src/tx_graph.rs
@@ -940,6 +940,13 @@ impl<A: Ord> Append for Additions<A> {
.collect::<Vec<_>>(),
);
}

fn is_empty(&self) -> bool {
self.tx.is_empty()
&& self.txout.is_empty()
&& self.anchors.is_empty()
&& self.last_seen.is_empty()
}
}

impl<A> AsRef<TxGraph<A>> for TxGraph<A> {
100 changes: 100 additions & 0 deletions crates/file_store/src/entry_iter.rs
@@ -0,0 +1,100 @@
use bincode::Options;
use std::{
fs::File,
io::{self, Seek},
marker::PhantomData,
};

use crate::bincode_options;

/// Iterator over entries in a file store.
///
/// Reads and returns an entry each time [`next`] is called. If an error occurs while reading, the
/// iterator will yield a `Result::Err(_)` instead and then `None` for the next call to `next`.
///
/// [`next`]: Self::next
pub struct EntryIter<'t, T> {
db_file: Option<&'t mut File>,

/// The file position for the first read of `db_file`.
start_pos: Option<u64>,
Member
nit: maybe expand in the docs why it's an Option and what happens when this is None
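
Judging from the `next` implementation further down in this file, the `Option` appears to act as a one-shot flag: the first read seeks to the stored position and `take()` leaves `None` behind, so later reads continue from wherever the previous one stopped. The sketch below shows that pattern in isolation. It assumes a hypothetical `RecordReader` over an in-memory `Cursor` with fixed-size `u32` records, rather than the `File` and bincode entries used by `EntryIter`.

```rust
use std::io::{self, Cursor, Read, Seek, SeekFrom};

/// Hypothetical reader of fixed-size little-endian `u32` records.
pub struct RecordReader {
    source: Cursor<Vec<u8>>,
    /// `Some(pos)` until the first read, which seeks to `pos`; `None` afterwards.
    start_pos: Option<u64>,
}

impl RecordReader {
    pub fn new(start_pos: u64, bytes: Vec<u8>) -> Self {
        Self {
            source: Cursor::new(bytes),
            start_pos: Some(start_pos),
        }
    }

    pub fn read_next(&mut self) -> io::Result<u32> {
        // `take` leaves `None` behind, so the seek happens at most once.
        if let Some(pos) = self.start_pos.take() {
            self.source.seek(SeekFrom::Start(pos))?;
        }
        let mut buf = [0u8; 4];
        self.source.read_exact(&mut buf)?;
        Ok(u32::from_le_bytes(buf))
    }
}

/// Skips a two-byte header once, then reads two consecutive records.
pub fn skip_header_demo() -> Vec<u32> {
    let mut bytes = vec![0xAA, 0xBB];
    bytes.extend_from_slice(&10u32.to_le_bytes());
    bytes.extend_from_slice(&20u32.to_le_bytes());
    let mut reader = RecordReader::new(2, bytes);
    vec![reader.read_next().unwrap(), reader.read_next().unwrap()]
}
```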

types: PhantomData<T>,
}

impl<'t, T> EntryIter<'t, T> {
pub fn new(start_pos: u64, db_file: &'t mut File) -> Self {
Self {
db_file: Some(db_file),
start_pos: Some(start_pos),
types: PhantomData,
}
}
}

impl<'t, T> Iterator for EntryIter<'t, T>
where
T: serde::de::DeserializeOwned,
{
type Item = Result<T, IterError>;

fn next(&mut self) -> Option<Self::Item> {
// closure which reads a single entry starting from `start_pos`, or from the current file position if it is `None`
let read_one = |f: &mut File, start_pos: Option<u64>| -> Result<Option<T>, IterError> {
let pos = match start_pos {
Some(pos) => f.seek(io::SeekFrom::Start(pos))?,
None => f.stream_position()?,
};

match bincode_options().deserialize_from(&*f) {
Ok(changeset) => {
f.stream_position()?;
Ok(Some(changeset))
}
Err(e) => {
if let bincode::ErrorKind::Io(inner) = &*e {
if inner.kind() == io::ErrorKind::UnexpectedEof {
let eof = f.seek(io::SeekFrom::End(0))?;
if pos == eof {
return Ok(None);
}
}
}
f.seek(io::SeekFrom::Start(pos))?;
Err(IterError::Bincode(*e))
}
}
};

let result = read_one(self.db_file.as_mut()?, self.start_pos.take());
if result.is_err() {
self.db_file = None;
}
result.transpose()
}
}
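
The "yield one `Err`, then `None`" behaviour of this iterator rests on two small tricks: the source is held in an `Option` that is cleared on the first error, and `transpose` converts between `Result<Option<T>, E>` and `Option<Result<T, E>>`. The following is a minimal sketch of the same shape without files or bincode. `ParseIter` is a hypothetical iterator that parses string slices instead of decoding entries.

```rust
/// Hypothetical iterator that parses each input string as a `u32`.
pub struct ParseIter<'a> {
    /// Set to `None` after the first error so that later calls yield `None`.
    inputs: Option<std::slice::Iter<'a, &'a str>>,
}

impl<'a> ParseIter<'a> {
    pub fn new(inputs: &'a [&'a str]) -> Self {
        Self {
            inputs: Some(inputs.iter()),
        }
    }
}

impl<'a> Iterator for ParseIter<'a> {
    type Item = Result<u32, std::num::ParseIntError>;

    fn next(&mut self) -> Option<Self::Item> {
        // `?` on the `Option` returns `None` once the source has been dropped.
        let source = self.inputs.as_mut()?;
        // `Result<Option<u32>, _>`: `Ok(None)` signals a clean end of input.
        let result = source.next().map(|s| s.parse::<u32>()).transpose();
        if result.is_err() {
            self.inputs = None;
        }
        // Flip into `Option<Result<u32, _>>`, the shape `Iterator::next` returns.
        result.transpose()
    }
}
```

For the inputs `["1", "x", "3"]` this yields `Some(Ok(1))`, then one `Some(Err(_))`, then `None`; the trailing `"3"` is never reached.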

impl From<io::Error> for IterError {
fn from(value: io::Error) -> Self {
IterError::Io(value)
}
}

/// Error type for [`EntryIter`].
#[derive(Debug)]
pub enum IterError {
/// Failure to read from the file.
Io(io::Error),
/// Failure to decode data from the file.
Bincode(bincode::ErrorKind),
}

impl core::fmt::Display for IterError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
IterError::Io(e) => write!(f, "io error trying to read entry {}", e),
IterError::Bincode(e) => write!(f, "bincode error while reading entry {}", e),
}
}
}

impl std::error::Error for IterError {}