From 616b40150ded71f57f650067fcbc5c99d7c343e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Tue, 4 Dec 2018 11:50:31 +0100 Subject: [PATCH] Switch from `parity-rocksdb` to upstream `rust-rocksdb` --- kvdb-rocksdb/Cargo.toml | 2 +- kvdb-rocksdb/src/lib.rs | 191 +++++++++++++++++++++++----------------- 2 files changed, 109 insertions(+), 84 deletions(-) diff --git a/kvdb-rocksdb/Cargo.toml b/kvdb-rocksdb/Cargo.toml index 0465be84a..11a651c7d 100644 --- a/kvdb-rocksdb/Cargo.toml +++ b/kvdb-rocksdb/Cargo.toml @@ -15,7 +15,7 @@ log = "0.4" num_cpus = "1.0" parking_lot = "0.6" regex = "1.0" -parity-rocksdb = "0.5" +rocksdb = "0.10" [dev-dependencies] tempdir = "0.3" diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index eceebbc43..30d9763a5 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -23,7 +23,7 @@ extern crate interleaved_ordered; extern crate num_cpus; extern crate parking_lot; extern crate regex; -extern crate parity_rocksdb; +extern crate rocksdb; #[cfg(test)] extern crate ethereum_types; @@ -36,9 +36,10 @@ use std::{cmp, fs, io, mem, result, error}; use std::path::Path; use parking_lot::{Mutex, MutexGuard, RwLock}; -use parity_rocksdb::{ - DB, Writable, WriteBatch, WriteOptions, IteratorMode, DBIterator, - Options, BlockBasedOptions, Direction, Cache, Column, ReadOptions +use rocksdb::{ + DB, WriteBatch, WriteOptions, IteratorMode, DBIterator, + Options, BlockBasedOptions, Direction, ReadOptions, ColumnFamily, + Error }; use interleaved_ordered::{interleave_ordered, InterleaveOrdered}; @@ -206,7 +207,7 @@ impl Default for DatabaseConfig { // pub struct DatabaseIterator<'a> { iter: InterleaveOrdered<::std::vec::IntoIter<(Box<[u8]>, Box<[u8]>)>, DBIterator>, - _marker: PhantomData<&'a Database>, + _marker: PhantomData<&'a ()>, } impl<'a> Iterator for DatabaseIterator<'a> { @@ -219,38 +220,50 @@ impl<'a> Iterator for DatabaseIterator<'a> { struct DBAndColumns { db: DB, - cfs: Vec, + cfs: Vec>, } // get column family configuration from database config. fn col_config(config: &DatabaseConfig, block_opts: &BlockBasedOptions) -> io::Result { - let mut opts = Options::new(); - - opts.set_parsed_options("level_compaction_dynamic_level_bytes=true").map_err(other_io_err)?; + let mut opts = Options::default(); opts.set_block_based_table_factory(block_opts); - opts.set_parsed_options( - &format!("block_based_table_factory={{{};{}}}", - "cache_index_and_filter_blocks=true", - "pin_l0_filter_and_index_blocks_in_cache=true")).map_err(other_io_err)?; - - opts.optimize_level_style_compaction(config.memory_budget_per_col() as i32); + opts.optimize_level_style_compaction(config.memory_budget_per_col()); opts.set_target_file_size_base(config.compaction.initial_file_size); - opts.set_parsed_options("compression_per_level=").map_err(other_io_err)?; - Ok(opts) } +/// Utility structure that makes the given type implement `Send + Sync`. +/// YOU NEED TO BE SURE WHAT YOU ARE DOING! +struct MakeSendSync(T); + +unsafe impl Send for MakeSendSync {} +unsafe impl Sync for MakeSendSync {} + +impl ::std::ops::Deref for MakeSendSync { + type Target = T; + + fn deref(&self) -> &T { + &self.0 + } +} + +impl From for MakeSendSync { + fn from(data: T) -> MakeSendSync { + MakeSendSync(data) + } +} + /// Key-Value database. pub struct Database { db: RwLock>, config: DatabaseConfig, - write_opts: WriteOptions, - read_opts: ReadOptions, - block_opts: BlockBasedOptions, path: String, + write_opts: MakeSendSync, + read_opts: MakeSendSync, + block_opts: MakeSendSync, // Dirty values added with `write_buffered`. Cleaned on `flush`. overlay: RwLock, KeyState>>>, // Values currently being flushed. Cleared when `flush` completes. @@ -261,9 +274,12 @@ pub struct Database { } #[inline] -fn check_for_corruption>(path: P, res: result::Result) -> io::Result { +fn check_for_corruption>( + path: P, + res: result::Result +) -> io::Result { if let Err(ref s) = res { - if s.starts_with("Corruption:") { + if is_corrupted(s) { warn!("DB corrupted: {}. Repair will be triggered on next restart", s); let _ = fs::File::create(path.as_ref().join(Database::CORRUPTION_FILE_NAME)); } @@ -272,8 +288,26 @@ fn check_for_corruption>(path: P, res: result::Result bool { - s.starts_with("Corruption:") || s.starts_with("Invalid argument: You have to open all column families") +fn is_corrupted(err: &Error) -> bool { + err.as_ref().starts_with("Corruption:") + || err.as_ref().starts_with("Invalid argument: You have to open all column families") +} + +/// Generate the options for RocksDB, based on the given `DatabaseConfig`. +fn generate_options(config: &DatabaseConfig) -> Options { + let mut opts = Options::default(); + + //TODO: rate_limiter_bytes_per_sec={} was removed + + opts.set_use_fsync(false); + opts.create_if_missing(true); + opts.set_max_open_files(config.max_open_files); + opts.set_bytes_per_sync(1048576); + //TODO: keep_log_file_num=1 was removed + opts.set_write_buffer_size(config.memory_budget_per_col() / 2); + opts.increase_parallelism(cmp::max(1, ::num_cpus::get() as i32 / 2)); + + opts } impl Database { @@ -286,33 +320,16 @@ impl Database { /// Open database file. Creates if it does not exist. pub fn open(config: &DatabaseConfig, path: &str) -> io::Result { - let mut opts = Options::new(); - - if let Some(rate_limit) = config.compaction.write_rate_limit { - opts.set_parsed_options(&format!("rate_limiter_bytes_per_sec={}", rate_limit)).map_err(other_io_err)?; - } - opts.set_use_fsync(false); - opts.create_if_missing(true); - opts.set_max_open_files(config.max_open_files); - opts.set_parsed_options("keep_log_file_num=1").map_err(other_io_err)?; - opts.set_parsed_options("bytes_per_sync=1048576").map_err(other_io_err)?; - opts.set_db_write_buffer_size(config.memory_budget_per_col() / 2); - opts.increase_parallelism(cmp::max(1, ::num_cpus::get() as i32 / 2)); - - let mut block_opts = BlockBasedOptions::new(); - - { - block_opts.set_block_size(config.compaction.block_size); - let cache_size = cmp::max(8, config.memory_budget() / 3); - let cache = Cache::new(cache_size); - block_opts.set_cache(cache); - } + let mut block_opts = BlockBasedOptions::default(); + block_opts.set_block_size(config.compaction.block_size); + let cache_size = cmp::max(8, config.memory_budget() / 3); + block_opts.set_lru_cache(cache_size); // attempt database repair if it has been previously marked as corrupted let db_corrupted = Path::new(path).join(Database::CORRUPTION_FILE_NAME); if db_corrupted.exists() { warn!("DB has been previously marked as corrupted, attempting repair"); - DB::repair(&opts, path).map_err(other_io_err)?; + DB::repair(generate_options(config), path).map_err(other_io_err)?; fs::remove_file(db_corrupted)?; } @@ -327,13 +344,14 @@ impl Database { } let write_opts = WriteOptions::new(); - let mut read_opts = ReadOptions::new(); - read_opts.set_verify_checksums(false); + let read_opts = ReadOptions::default(); + //TODO: removed read_opts.set_verify_checksums(false); - let mut cfs: Vec = Vec::new(); + let opts = generate_options(config); + let mut cfs: Vec = Vec::new(); let db = match config.columns { Some(_) => { - match DB::open_cf(&opts, path, &cfnames, &cf_options) { + match DB::open_cf(&opts, path, &cfnames) { Ok(db) => { cfs = cfnames.iter().map(|n| db.cf_handle(n) .expect("rocksdb opens a cf_handle for each cfname; qed")).collect(); @@ -341,7 +359,7 @@ impl Database { } Err(_) => { // retry and create CFs - match DB::open_cf(&opts, path, &[], &[]) { + match DB::open_cf(&opts, path, &[]) { Ok(mut db) => { cfs = cfnames.iter() .enumerate() @@ -362,33 +380,32 @@ impl Database { Ok(db) => db, Err(ref s) if is_corrupted(s) => { warn!("DB corrupted: {}, attempting repair", s); - DB::repair(&opts, path).map_err(other_io_err)?; - - match cfnames.is_empty() { - true => DB::open(&opts, path).map_err(other_io_err)?, - false => { - let db = DB::open_cf(&opts, path, &cfnames, &cf_options).map_err(other_io_err)?; - cfs = cfnames.iter().map(|n| db.cf_handle(n) - .expect("rocksdb opens a cf_handle for each cfname; qed")).collect(); - db - }, + DB::repair(generate_options(config), path).map_err(other_io_err)?; + + if cfnames.is_empty() { + DB::open(&opts, path).map_err(other_io_err)? + } else { + let db = DB::open_cf(&opts, path, &cfnames).map_err(other_io_err)?; + cfs = cfnames.iter().map(|n| db.cf_handle(n) + .expect("rocksdb opens a cf_handle for each cfname; qed")).collect(); + db } - }, + } Err(s) => { return Err(other_io_err(s)) } }; let num_cols = cfs.len(); Ok(Database { - db: RwLock::new(Some(DBAndColumns{ db: db, cfs: cfs })), + db: RwLock::new(Some(DBAndColumns{ db, cfs: cfs.into_iter().map(Into::into).collect() })), config: config.clone(), - write_opts: write_opts, overlay: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()), flushing: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()), flushing_lock: Mutex::new(false), path: path.to_owned(), - read_opts: read_opts, - block_opts: block_opts, + read_opts: read_opts.into(), + write_opts: write_opts.into(), + block_opts: block_opts.into(), }) } @@ -423,7 +440,7 @@ impl Database { fn write_flushing_with_lock(&self, _lock: &mut MutexGuard) -> io::Result<()> { match *self.db.read() { Some(DBAndColumns { ref db, ref cfs }) => { - let batch = WriteBatch::new(); + let mut batch = WriteBatch::default(); mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write()); { for (c, column) in self.flushing.read().iter().enumerate() { @@ -431,14 +448,14 @@ impl Database { match *state { KeyState::Delete => { if c > 0 { - batch.delete_cf(cfs[c - 1], key).map_err(other_io_err)?; + batch.delete_cf(*cfs[c - 1], key).map_err(other_io_err)?; } else { batch.delete(key).map_err(other_io_err)?; } }, KeyState::Insert(ref value) => { if c > 0 { - batch.put_cf(cfs[c - 1], key, value).map_err(other_io_err)?; + batch.put_cf(*cfs[c - 1], key, value).map_err(other_io_err)?; } else { batch.put(key, value).map_err(other_io_err)?; } @@ -448,9 +465,7 @@ impl Database { } } - check_for_corruption( - &self.path, - db.write_opt(batch, &self.write_opts))?; + check_for_corruption(&self.path, db.write_opt(batch, &self.write_opts))?; for column in self.flushing.write().iter_mut() { column.clear(); @@ -481,7 +496,7 @@ impl Database { pub fn write(&self, tr: DBTransaction) -> io::Result<()> { match *self.db.read() { Some(DBAndColumns { ref db, ref cfs }) => { - let batch = WriteBatch::new(); + let mut batch = WriteBatch::default(); let ops = tr.ops; for op in ops { // remove any buffered operation for this key @@ -490,11 +505,11 @@ impl Database { match op { DBOp::Insert { col, key, value } => match col { None => batch.put(&key, &value).map_err(other_io_err)?, - Some(c) => batch.put_cf(cfs[c as usize], &key, &value).map_err(other_io_err)?, + Some(c) => batch.put_cf(*cfs[c as usize], &key, &value).map_err(other_io_err)?, }, DBOp::Delete { col, key } => match col { None => batch.delete(&key).map_err(other_io_err)?, - Some(c) => batch.delete_cf(cfs[c as usize], &key).map_err(other_io_err)?, + Some(c) => batch.delete_cf(*cfs[c as usize], &key).map_err(other_io_err)?, } } } @@ -520,8 +535,13 @@ impl Database { Some(&KeyState::Delete) => Ok(None), None => { col.map_or_else( - || db.get_opt(key, &self.read_opts).map(|r| r.map(|v| DBValue::from_slice(&v))), - |c| db.get_cf_opt(cfs[c as usize], key, &self.read_opts).map(|r| r.map(|v| DBValue::from_slice(&v)))) + || db + .get_opt(key, &self.read_opts) + .map(|r| r.map(|v| DBValue::from_slice(&v))), + |c| db + .get_cf_opt(*cfs[c as usize], key, &self.read_opts) + .map(|r| r.map(|v| DBValue::from_slice(&v))) + ) .map_err(other_io_err) }, } @@ -552,14 +572,19 @@ impl Database { let mut overlay_data = overlay.iter() .filter_map(|(k, v)| match *v { KeyState::Insert(ref value) => - Some((k.clone().into_vec().into_boxed_slice(), value.clone().into_vec().into_boxed_slice())), + Some( + ( + k.clone().into_vec().into_boxed_slice(), + value.clone().into_vec().into_boxed_slice() + ) + ), KeyState::Delete => None, }).collect::>(); overlay_data.sort(); let iter = col.map_or_else( - || db.iterator_opt(IteratorMode::Start, &self.read_opts), - |c| db.iterator_cf_opt(cfs[c as usize], IteratorMode::Start, &self.read_opts) + || db.iterator(IteratorMode::Start), + |c| db.iterator_cf(*cfs[c as usize], IteratorMode::Start) .expect("iterator params are valid; qed") ); @@ -575,8 +600,8 @@ impl Database { fn iter_from_prefix(&self, col: Option, prefix: &[u8]) -> Option { match *self.db.read() { Some(DBAndColumns { ref db, ref cfs }) => { - let iter = col.map_or_else(|| db.iterator_opt(IteratorMode::From(prefix, Direction::Forward), &self.read_opts), - |c| db.iterator_cf_opt(cfs[c as usize], IteratorMode::From(prefix, Direction::Forward), &self.read_opts) + let iter = col.map_or_else(|| db.iterator(IteratorMode::From(prefix, Direction::Forward)), + |c| db.iterator_cf(*cfs[c as usize], IteratorMode::From(prefix, Direction::Forward)) .expect("iterator params are valid; qed")); Some(DatabaseIterator { @@ -657,7 +682,7 @@ impl Database { Some(DBAndColumns { ref mut db, ref mut cfs }) => { let col = cfs.len() as u32; let name = format!("col{}", col); - cfs.push(db.create_cf(&name, &col_config(&self.config, &self.block_opts)?).map_err(other_io_err)?); + cfs.push(db.create_cf(&name, &col_config(&self.config, &self.block_opts)?).map_err(other_io_err)?.into()); Ok(()) }, None => Ok(()),