Skip to content
This repository was archived by the owner on Apr 9, 2024. It is now read-only.

refactor(DB): rollback tx when failed rather than close connection #319

Merged
merged 1 commit into from
Nov 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions core/storage/src/relational/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use common::{
PaginationResponse, Range, Result,
};
use common_logger::{tracing, tracing_async};
use db_xsql::{rbatis::Bytes as RbBytes, XSQLPool};
use db_xsql::{commit_transaction, rbatis::Bytes as RbBytes, XSQLPool};
use protocol::db::{DBDriver, DBInfo, SimpleBlock, SimpleTransaction, TransactionWrapper};

use ckb_types::core::{BlockNumber, BlockView, HeaderView};
Expand Down Expand Up @@ -51,7 +51,7 @@ impl Storage for RelationalStorage {
self.insert_transaction_table(ctx.clone(), &block, &mut tx)
.await?;

tx.commit().await?;
commit_transaction(tx).await?;
Ok(())
}

Expand All @@ -69,7 +69,7 @@ impl Storage for RelationalStorage {
.await?;
self.remove_block_table(ctx.clone(), block_number, block_hash, &mut tx)
.await?;
tx.commit().await?;
commit_transaction(tx).await?;

Ok(())
}
Expand Down Expand Up @@ -545,7 +545,7 @@ impl Storage for RelationalStorage {
let res = self
.insert_registered_address_table(addresses, &mut tx)
.await?;
tx.commit().await?;
commit_transaction(tx).await?;

Ok(res
.iter()
Expand Down
9 changes: 3 additions & 6 deletions core/synchronization/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use core_storage::relational::table::{
IO_TYPE_INPUT, IO_TYPE_OUTPUT,
};
use core_storage::relational::{generate_id, to_rb_bytes, BATCH_SIZE_THRESHOLD};
use db_xsql::{rbatis::crud::CRUDMut, XSQLPool};
use db_xsql::{commit_transaction, rbatis::crud::CRUDMut, XSQLPool};

use ckb_types::core::{BlockNumber, BlockView};
use ckb_types::prelude::*;
Expand Down Expand Up @@ -335,9 +335,8 @@ async fn sync_indexer_cell(task: Vec<BlockNumber>, rdb: XSQLPool) -> Result<()>
.for_each(|c| c.id = generate_id(c.block_number));
core_storage::save_batch_slice!(tx, indexer_cells, status_list);

tx.commit().await?;
commit_transaction(tx).await?;

let _ = tx.take_conn().unwrap().close().await;
free_one_task();

Ok(())
Expand Down Expand Up @@ -419,9 +418,7 @@ async fn sync_blocks<T: SyncAdapter>(
canonical_data_table_batch
);

tx.commit().await?;

let _ = tx.take_conn().unwrap().close().await;
commit_transaction(tx).await?;

Ok(())
}
Expand Down
11 changes: 10 additions & 1 deletion db/xsql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub mod page;

pub use rbatis;

use common::Result;
use common::{anyhow::anyhow, Result};
use protocol::db::DBDriver;

use log::LevelFilter;
Expand Down Expand Up @@ -146,6 +146,15 @@ impl XSQLPool {
}
}

pub async fn commit_transaction(mut tx: RBatisTxExecutor<'_>) -> Result<()> {
if tx.commit().await.is_err() {
tx.rollback().await?;
return Err(anyhow!("Commit transaction failed, transaction rollback!"));
}

Ok(())
}

fn build_url(
db_type: &str,
db_name: &str,
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[tokio::main(flavor = "multi_thread")]
async fn main() {
std::panic::set_hook(Box::new(move |info| {
println!("panic occurred {:?}", info);
log::error!("panic occurred {:?}", info);
std::process::exit(-1);
}));
Expand Down