diff --git a/common/src/lib.rs b/common/src/lib.rs index b0d714a6d..2d4157726 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -209,14 +209,26 @@ impl PaginationRequest { } } - pub fn set_order(mut self, order: Order) -> Self { + pub fn order(mut self, order: Order) -> Self { + self.set_order(order); + self + } + + pub fn set_order(&mut self, order: Order) { self.order = order; + } + + pub fn limit(mut self, limit: Option) -> Self { + self.set_limit(limit); self } - pub fn set_limit(mut self, limit: Option) -> Self { + pub fn set_limit(&mut self, limit: Option) { self.limit = limit; - self + } + + pub fn update_by_response(&mut self, response: PaginationResponse) { + self.cursor = response.next_cursor; } } @@ -227,6 +239,22 @@ pub struct PaginationResponse { pub count: Option, } +impl PaginationResponse { + pub fn new(response: Vec) -> Self { + Self { + response, + next_cursor: None, + count: None, + } + } +} + +impl Default for PaginationResponse { + fn default() -> Self { + Self::new(vec![]) + } +} + #[derive(Clone, Debug, Hash, PartialEq, Eq)] pub struct DetailedCell { pub epoch_number: u64, diff --git a/core/cli/src/config.rs b/core/cli/src/config.rs index 5aaa5debb..0f2ab1668 100644 --- a/core/cli/src/config.rs +++ b/core/cli/src/config.rs @@ -116,6 +116,9 @@ pub struct MercuryConfig { #[serde(default = "default_extensions_config")] pub extensions_config: Vec, + + #[serde(default = "default_pool_cache_size")] + pub pool_cache_size: u64, } impl MercuryConfig { @@ -205,6 +208,10 @@ fn default_extensions_config() -> Vec { vec![] } +fn default_pool_cache_size() -> u64 { + 100u64 +} + fn default_file_size_limit() -> u64 { 1073741824 // 1GiB } diff --git a/core/cli/src/lib.rs b/core/cli/src/lib.rs index 1511a7c81..2dde3dcb8 100644 --- a/core/cli/src/lib.rs +++ b/core/cli/src/lib.rs @@ -124,6 +124,7 @@ impl<'a> Cli<'a> { self.parse_cmd_args("ckb_uri", self.config.network_config.ckb_uri.clone()), self.config.cheque_since, LevelFilter::from_str(&self.config.db_config.db_log_level).unwrap(), + self.config.pool_cache_size, ); let stop_handle = service diff --git a/core/rpc/core/src/impl.rs b/core/rpc/core/src/impl.rs index e50c62725..134285e0e 100644 --- a/core/rpc/core/src/impl.rs +++ b/core/rpc/core/src/impl.rs @@ -62,6 +62,7 @@ pub struct MercuryRpcImpl { cheque_timeout: RationalU256, cellbase_maturity: RationalU256, sync_state: Arc>, + pool_cache_size: u64, } #[async_trait] @@ -277,6 +278,7 @@ impl MercuryRpcImpl { cheque_timeout: RationalU256, cellbase_maturity: RationalU256, sync_state: Arc>, + pool_cache_size: u64, ) -> Self { SECP256K1_CODE_HASH.swap(Arc::new( builtin_scripts @@ -332,6 +334,7 @@ impl MercuryRpcImpl { cheque_timeout, cellbase_maturity, sync_state, + pool_cache_size, } } } diff --git a/core/rpc/core/src/impl/adjust_account.rs b/core/rpc/core/src/impl/adjust_account.rs index 4e2550d20..6c42cabd4 100644 --- a/core/rpc/core/src/impl/adjust_account.rs +++ b/core/rpc/core/src/impl/adjust_account.rs @@ -11,7 +11,9 @@ use core_rpc_types::{ use common::hash::blake2b_256_to_160; use common::utils::decode_udt_amount; -use common::{Address, AddressPayload, Context, DetailedCell, ACP, SECP256K1, SUDT}; +use common::{ + Address, AddressPayload, Context, DetailedCell, PaginationRequest, ACP, SECP256K1, SUDT, +}; use common_logger::tracing_async; use ckb_types::core::TransactionView; @@ -48,6 +50,7 @@ impl MercuryRpcImpl { Some((**ACP_CODE_HASH.load()).clone()), None, false, + &mut PaginationRequest::default(), ) .await?; let live_acps_len = live_acps.len(); diff --git a/core/rpc/core/src/impl/build_tx.rs b/core/rpc/core/src/impl/build_tx.rs index 80c6537a5..bae7d99bf 100644 --- a/core/rpc/core/src/impl/build_tx.rs +++ b/core/rpc/core/src/impl/build_tx.rs @@ -3,21 +3,21 @@ use crate::{error::CoreError, InnerResult, MercuryRpcImpl}; use common::hash::blake2b_256_to_160; use common::utils::decode_udt_amount; -use common::{Address, Context, DetailedCell, ACP, CHEQUE, DAO, SECP256K1, SUDT}; +use common::{ + Address, Context, DetailedCell, PaginationRequest, ACP, CHEQUE, DAO, SECP256K1, SUDT, +}; use common_logger::tracing_async; use core_ckb_client::CkbRpc; use core_rpc_types::consts::{ BYTE_SHANNONS, CHEQUE_CELL_CAPACITY, DEFAULT_FEE_RATE, INIT_ESTIMATE_FEE, MAX_ITEM_NUM, - MIN_CKB_CAPACITY, MIN_DAO_CAPACITY, STANDARD_SUDT_CAPACITY, -}; -use core_rpc_types::lazy::{ - ACP_CODE_HASH, CHEQUE_CODE_HASH, CURRENT_EPOCH_NUMBER, SECP256K1_CODE_HASH, + MIN_CKB_CAPACITY, MIN_DAO_CAPACITY, }; +use core_rpc_types::lazy::{ACP_CODE_HASH, CURRENT_EPOCH_NUMBER, SECP256K1_CODE_HASH}; use core_rpc_types::{ AssetInfo, AssetType, DaoClaimPayload, DaoDepositPayload, DaoWithdrawPayload, ExtraType, From, - GetBalancePayload, HashAlgorithm, Item, JsonItem, Mode, Ownership, RequiredUDT, SignAlgorithm, - SignatureAction, SimpleTransferPayload, SinceConfig, SinceFlag, SinceType, Source, - SudtIssuePayload, To, ToInfo, TransactionCompletionResponse, TransferPayload, UDTInfo, + GetBalancePayload, HashAlgorithm, Item, JsonItem, Mode, SignAlgorithm, SignatureAction, + SimpleTransferPayload, SinceConfig, SinceFlag, SinceType, Source, SudtIssuePayload, To, ToInfo, + TransactionCompletionResponse, TransferPayload, }; use core_storage::Storage; @@ -25,8 +25,7 @@ use ckb_jsonrpc_types::TransactionView as JsonTransactionView; use ckb_types::core::{ EpochNumberWithFraction, ScriptHashType, TransactionBuilder, TransactionView, }; -use ckb_types::{bytes::Bytes, constants::TX_VERSION, packed, prelude::*, H160, H256, U256}; -use num_traits::Zero; +use ckb_types::{bytes::Bytes, constants::TX_VERSION, packed, prelude::*, H160, U256}; use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; @@ -68,78 +67,6 @@ impl MercuryRpcImpl { .await } - #[tracing_async] - async fn prebuild_dao_deposit_transaction_deprecated( - &self, - ctx: Context, - payload: DaoDepositPayload, - fixed_fee: u64, - ) -> InnerResult<(TransactionView, Vec, usize)> { - let mut inputs = Vec::new(); - let (mut outputs, mut cells_data) = (vec![], vec![]); - let mut script_set = HashSet::new(); - let mut signature_entries = HashMap::new(); - let mut input_index = 0; - - // pool - let mut items = vec![]; - for json_item in payload.from.items.clone() { - let item = Item::try_from(json_item)?; - items.push(item) - } - let change_fee_cell_index = self - .build_required_ckb_and_change_tx_part( - ctx.clone(), - items.clone(), - Some(payload.from.source), - payload.amount + fixed_fee, - None, - None, - &mut inputs, - &mut script_set, - &mut signature_entries, - &mut outputs, - &mut cells_data, - &mut input_index, - ) - .await?; - - // build output deposit cell - let deposit_address = match payload.to { - Some(address) => match Address::from_str(&address) { - Ok(address) => address, - Err(error) => return Err(CoreError::InvalidRpcParams(error).into()), - }, - None => self.get_secp_address_by_item(items[0].clone())?, - }; - let type_script = self - .get_script_builder(DAO)? - .hash_type(ScriptHashType::Type.into()) - .build(); - let output_deposit = packed::CellOutputBuilder::default() - .capacity(payload.amount.pack()) - .lock(deposit_address.payload().into()) - .type_(Some(type_script).pack()) - .build(); - let output_data_deposit: packed::Bytes = Bytes::from(vec![0u8; 8]).pack(); - outputs.push(output_deposit); - cells_data.push(output_data_deposit); - - // build resp - let inputs = self._build_tx_cell_inputs(&inputs, None, Source::Free)?; - script_set.insert(DAO.to_string()); - self.prebuild_tx_complete( - inputs, - outputs, - cells_data, - script_set, - vec![], - signature_entries, - HashMap::new(), - ) - .map(|(tx_view, signature_actions)| (tx_view, signature_actions, change_fee_cell_index)) - } - #[tracing_async] async fn prebuild_dao_deposit_transaction( &self, @@ -205,158 +132,6 @@ impl MercuryRpcImpl { .await } - #[tracing_async] - async fn prebuild_dao_withdraw_transaction_deprecated( - &self, - ctx: Context, - payload: DaoWithdrawPayload, - estimate_fee: u64, - ) -> InnerResult<(TransactionView, Vec, usize)> { - let item = Item::try_from(payload.clone().from)?; - let pay_item = match payload.clone().pay_fee { - Some(pay_fee) => Item::Address(pay_fee), - None => item.clone(), - }; - - // pool ckb for fee - let mut input_cells = Vec::new(); - let mut script_set = HashSet::new(); - let mut signature_actions = HashMap::new(); - let mut input_index = 0; - - self.pool_live_cells_by_items( - ctx.clone(), - vec![pay_item.clone()], - MIN_CKB_CAPACITY + estimate_fee, - vec![], - None, - &mut 0, - &mut input_cells, - &mut script_set, - &mut signature_actions, - &mut input_index, - ) - .await?; - - // build output change cell - let change_fee_cell_index = 0; - let pay_cell_capacity: u64 = input_cells[change_fee_cell_index] - .cell_output - .capacity() - .unpack(); - let change_address = self.get_secp_address_by_item(pay_item.clone())?; - let output_change = packed::CellOutputBuilder::default() - .capacity((pay_cell_capacity - estimate_fee).pack()) - .lock(change_address.payload().into()) - .build(); - - // This check ensures that only one pay fee cell is placed first in the input - // and the change cell is placed first in the output, - // so that the index of each input deposit cell - // and the corresponding withdrawing cell are the same, - // which meets the withdrawing tx(phase I) requirements - if input_cells.len() > 1 { - return Err(CoreError::CannotFindChangeCell.into()); - } - - // get deposit cells - let mut asset_ckb_set = HashSet::new(); - asset_ckb_set.insert(AssetInfo::new_ckb()); - let cells = self - .get_live_cells_by_item( - ctx.clone(), - item.clone(), - asset_ckb_set.clone(), - None, - None, - Some((**SECP256K1_CODE_HASH.load()).clone()), - Some(ExtraType::Dao), - false, - ) - .await?; - - let tip_epoch_number = (**CURRENT_EPOCH_NUMBER.load()).clone(); - let deposit_cells = cells - .into_iter() - .filter(|cell| cell.cell_data == Box::new([0u8; 8]).to_vec()) - .filter(|cell| { - (EpochNumberWithFraction::from_full_value(cell.epoch_number).to_rational() - + U256::from(4u64)) - < tip_epoch_number - }) - .collect::>(); - if deposit_cells.is_empty() { - return Err(CoreError::CannotFindDepositCell.into()); - } - - // build header_deps - let mut header_deps = HashSet::new(); - for cell in &deposit_cells { - header_deps.insert(cell.block_hash.pack()); - } - let header_deps: Vec = header_deps.into_iter().collect(); - - // build inputs - input_cells.extend_from_slice(&deposit_cells); - let inputs = self._build_tx_cell_inputs(&input_cells, None, Source::Free)?; - - // build output withdrawing cells - let mut outputs_withdraw: Vec = deposit_cells - .iter() - .map(|cell| { - let cell_output = &cell.cell_output; - packed::CellOutputBuilder::default() - .capacity(cell_output.capacity()) - .lock(cell_output.lock()) - .type_(cell_output.type_()) - .build() - }) - .collect(); - let mut outputs_data_withdraw: Vec = deposit_cells - .iter() - .map(|cell| { - let data: packed::Uint64 = cell.block_number.pack(); - data.as_bytes().pack() - }) - .collect(); - - // build outputs - let (mut outputs, mut cells_data) = (vec![output_change], vec![Default::default()]); - outputs.append(&mut outputs_withdraw); - cells_data.append(&mut outputs_data_withdraw); - - // add signatures - // let cell_sigs: Vec<&SignatureEntry> = signature_entries.iter().map(|(_, s)| s).collect(); - // let mut last_index = cell_sigs[0].index; // ensure there is only one sig of pay fee cell - let address = self.get_secp_address_by_item(item)?; - for cell in deposit_cells { - let lock_hash = cell.cell_output.calc_lock_hash().to_string(); - utils::add_signature_action( - address.to_string(), - lock_hash, - SignAlgorithm::Secp256k1, - HashAlgorithm::Blake2b, - &mut signature_actions, - input_index, - ); - input_index += 1; - } - - // build resp - script_set.insert(DAO.to_string()); - - self.prebuild_tx_complete( - inputs, - outputs, - cells_data, - script_set, - header_deps, - signature_actions, - HashMap::new(), - ) - .map(|(tx_view, signature_actions)| (tx_view, signature_actions, change_fee_cell_index)) - } - #[tracing_async] async fn prebuild_dao_withdraw_transaction( &self, @@ -382,6 +157,7 @@ impl MercuryRpcImpl { Some((**SECP256K1_CODE_HASH.load()).clone()), Some(ExtraType::Dao), false, + &mut PaginationRequest::default(), ) .await?; @@ -521,6 +297,7 @@ impl MercuryRpcImpl { Some((**SECP256K1_CODE_HASH.load()).clone()), Some(ExtraType::Dao), false, + &mut PaginationRequest::default(), ) .await?; let tip_epoch_number = (**CURRENT_EPOCH_NUMBER.load()).clone(); @@ -670,47 +447,6 @@ impl MercuryRpcImpl { .map(|(tx_view, signature_actions)| (tx_view, signature_actions, change_cell_index)) } - #[tracing_async] - pub(crate) async fn inner_build_transfer_transaction_deprecated( - &self, - ctx: Context, - payload: TransferPayload, - ) -> InnerResult { - if payload.from.items.is_empty() || payload.to.to_infos.is_empty() { - return Err(CoreError::NeedAtLeastOneFromAndOneTo.into()); - } - if payload.from.items.len() > MAX_ITEM_NUM || payload.to.to_infos.len() > MAX_ITEM_NUM { - return Err(CoreError::ExceedMaxItemNum.into()); - } - utils::check_same_enum_value(payload.from.items.iter().collect())?; - let mut payload = payload; - payload.from.items = utils::dedup_json_items(payload.from.items); - - for to_info in &payload.to.to_infos { - match u128::from_str(&to_info.amount) { - Ok(amount) => { - if amount == 0u128 { - return Err(CoreError::TransferAmountMustPositive.into()); - } - } - Err(_) => { - return Err(CoreError::InvalidRpcParams( - "To amount should be a valid u128 number".to_string(), - ) - .into()); - } - } - } - - self.build_transaction_with_adjusted_fee( - Self::prebuild_transfer_transaction_deprecated, - ctx, - payload.clone(), - payload.fee_rate, - ) - .await - } - #[tracing_async] pub(crate) async fn inner_build_transfer_transaction( &self, @@ -760,33 +496,6 @@ impl MercuryRpcImpl { .await } - #[tracing_async] - async fn prebuild_transfer_transaction_deprecated( - &self, - ctx: Context, - payload: TransferPayload, - fixed_fee: u64, - ) -> InnerResult<(TransactionView, Vec, usize)> { - match (&payload.asset_info.asset_type, &payload.to.mode) { - (AssetType::CKB, Mode::HoldByFrom) => { - self.prebuild_secp_transfer_transaction(ctx.clone(), payload, fixed_fee) - .await - } - (AssetType::CKB, Mode::HoldByTo) => { - self.prebuild_acp_transfer_transaction_with_ckb(ctx.clone(), payload, fixed_fee) - .await - } - (AssetType::UDT, Mode::HoldByFrom) => { - self.prebuild_cheque_transfer_transaction(ctx.clone(), payload, fixed_fee) - .await - } - (AssetType::UDT, Mode::HoldByTo) => { - self.prebuild_acp_transfer_transaction_with_udt(ctx.clone(), payload, fixed_fee) - .await - } - } - } - #[tracing_async] async fn prebuild_transfer_transaction( &self, @@ -814,132 +523,6 @@ impl MercuryRpcImpl { } } - #[tracing_async] - async fn prebuild_secp_transfer_transaction( - &self, - ctx: Context, - payload: TransferPayload, - fixed_fee: u64, - ) -> InnerResult<(TransactionView, Vec, usize)> { - let mut script_set = HashSet::new(); - let (mut outputs, mut cells_data) = (vec![], vec![]); - let mut signature_actions: HashMap = HashMap::new(); - let mut change_fee_cell_index = 0usize; - let mut input_index = 0; - - // tx part I: build pay fee input and change output - let mut inputs_part_1 = vec![]; - let mut required_ckb_part_1 = 0; - - if let Some(ref pay_address) = payload.pay_fee { - let items = vec![Item::Identity(utils::address_to_identity(pay_address)?)]; - required_ckb_part_1 += fixed_fee; - change_fee_cell_index = self - .build_required_ckb_and_change_tx_part( - ctx.clone(), - items, - None, - required_ckb_part_1, - None, - None, - &mut inputs_part_1, - &mut script_set, - &mut signature_actions, - &mut outputs, - &mut cells_data, - &mut input_index, - ) - .await?; - } - - // tx part II - let mut inputs_part_2 = vec![]; - let mut required_ckb_part_2 = 0; - - // build the outputs - for to in &payload.to.to_infos { - let capacity = to - .amount - .parse::() - .map_err(|err| CoreError::InvalidRpcParams(err.to_string()))?; - if capacity < MIN_CKB_CAPACITY { - return Err(CoreError::RequiredCKBLessThanMin.into()); - } - let item = Item::Address(to.address.to_owned()); - let secp_address = self.get_secp_address_by_item(item)?; - required_ckb_part_2 += capacity; - self.build_cell_for_output( - capacity, - secp_address.payload().into(), - None, - None, - &mut outputs, - &mut cells_data, - )?; - } - - // build the inputs and the change cell - let mut items = vec![]; - for json_item in &payload.from.items { - let item = Item::try_from(json_item.to_owned())?; - items.push(item) - } - if required_ckb_part_1.is_zero() { - change_fee_cell_index = self - .build_required_ckb_and_change_tx_part( - ctx.clone(), - items, - Some(payload.from.source.clone()), - required_ckb_part_2 + fixed_fee, - payload.change, - None, - &mut inputs_part_2, - &mut script_set, - &mut signature_actions, - &mut outputs, - &mut cells_data, - &mut input_index, - ) - .await?; - } else { - self.build_required_ckb_and_change_tx_part( - ctx.clone(), - items, - Some(payload.from.source.clone()), - required_ckb_part_2, - payload.change, - None, - &mut inputs_part_2, - &mut script_set, - &mut signature_actions, - &mut outputs, - &mut cells_data, - &mut input_index, - ) - .await?; - }; - - // build resp - let mut inputs = vec![]; - inputs.append(&mut inputs_part_1); - inputs.append(&mut inputs_part_2); - let inputs = self._build_tx_cell_inputs( - &inputs, - payload.since.clone(), - payload.from.source.clone(), - )?; - self.prebuild_tx_complete( - inputs, - outputs, - cells_data, - script_set, - vec![], - signature_actions, - HashMap::new(), - ) - .map(|(tx_view, signature_actions)| (tx_view, signature_actions, change_fee_cell_index)) - } - #[tracing_async] async fn prebuild_ckb_secp_transfer_transaction( &self, @@ -974,163 +557,13 @@ impl MercuryRpcImpl { ctx.clone(), map_json_items(payload.from.items)?, payload.since, - map_option_address_to_identity(payload.pay_fee)?, - payload.change, - payload.from.source, - fixed_fee, - transfer_components, - ) - .await - } - - #[tracing_async] - async fn prebuild_acp_transfer_transaction_with_ckb( - &self, - ctx: Context, - payload: TransferPayload, - fixed_fee: u64, - ) -> InnerResult<(TransactionView, Vec, usize)> { - let mut script_set = HashSet::new(); - let (mut outputs, mut cells_data) = (vec![], vec![]); - let mut signature_actions: HashMap = HashMap::new(); - let mut change_fee_cell_index = 0usize; - let mut input_index = 0; - - // tx part I: build pay fee input and change output - let mut inputs_part_1 = vec![]; - let mut required_ckb_part_1 = 0; - - if let Some(ref pay_address) = payload.pay_fee { - let items = vec![Item::Identity(utils::address_to_identity(pay_address)?)]; - required_ckb_part_1 += fixed_fee; - change_fee_cell_index = self - .build_required_ckb_and_change_tx_part( - ctx.clone(), - items, - None, - required_ckb_part_1, - None, - None, - &mut inputs_part_1, - &mut script_set, - &mut signature_actions, - &mut outputs, - &mut cells_data, - &mut input_index, - ) - .await?; - } - - // tx part II: build acp inputs and outputs - let mut required_ckb_part_2 = 0; - let mut inputs_part_2 = vec![]; - - for to in &payload.to.to_infos { - let item = Item::Identity(utils::address_to_identity(&to.address)?); - - // build acp input - let asset_set = HashSet::new(); - let live_acps = self - .get_live_cells_by_item( - ctx.clone(), - item.clone(), - asset_set, - None, - None, - Some((**ACP_CODE_HASH.load()).clone()), - None, - false, - ) - .await?; - if live_acps.is_empty() { - return Err(CoreError::CannotFindACPCell.into()); - } - inputs_part_2.push(live_acps[0].clone()); - input_index += 1; - - // build acp output - let current_capacity: u64 = live_acps[0].cell_output.capacity().unpack(); - let current_udt_amount = decode_udt_amount(&live_acps[0].cell_data); - let required_capacity = to - .amount - .parse::() - .map_err(|err| CoreError::InvalidRpcParams(err.to_string()))?; - self.build_cell_for_output( - current_capacity + required_capacity, - live_acps[0].cell_output.lock(), - live_acps[0].cell_output.type_().to_opt(), - Some(current_udt_amount), - &mut outputs, - &mut cells_data, - )?; - - script_set.insert(ACP.to_string()); - - required_ckb_part_2 += required_capacity; - } - - // tx part III: - let mut from_items = vec![]; - for json_item in payload.from.items { - let item = Item::try_from(json_item)?; - from_items.push(item) - } - let mut inputs_part_3 = vec![]; - if required_ckb_part_1.is_zero() { - change_fee_cell_index = self - .build_required_ckb_and_change_tx_part( - ctx.clone(), - from_items, - Some(payload.from.source.clone()), - required_ckb_part_2 + fixed_fee, - payload.change, - None, - &mut inputs_part_3, - &mut script_set, - &mut signature_actions, - &mut outputs, - &mut cells_data, - &mut input_index, - ) - .await?; - } else { - self.build_required_ckb_and_change_tx_part( - ctx.clone(), - from_items, - Some(payload.from.source.clone()), - required_ckb_part_2, - payload.change, - None, - &mut inputs_part_3, - &mut script_set, - &mut signature_actions, - &mut outputs, - &mut cells_data, - &mut input_index, - ) - .await?; - }; - - // build resp - let mut inputs = vec![]; - inputs.append(&mut inputs_part_1); - inputs.append(&mut inputs_part_2); - inputs.append(&mut inputs_part_3); - let inputs = self._build_tx_cell_inputs( - &inputs, - payload.since.clone(), - payload.from.source.clone(), - )?; - self.prebuild_tx_complete( - inputs, - outputs, - cells_data, - script_set, - vec![], - signature_actions, - HashMap::new(), + map_option_address_to_identity(payload.pay_fee)?, + payload.change, + payload.from.source, + fixed_fee, + transfer_components, ) - .map(|(tx_view, signature_actions)| (tx_view, signature_actions, change_fee_cell_index)) + .await } #[tracing_async] @@ -1156,6 +589,7 @@ impl MercuryRpcImpl { Some((**ACP_CODE_HASH.load()).clone()), None, false, + &mut PaginationRequest::default().limit(Some(1)), ) .await?; if live_acps.is_empty() { @@ -1196,187 +630,6 @@ impl MercuryRpcImpl { .await } - #[tracing_async] - async fn prebuild_cheque_transfer_transaction( - &self, - ctx: Context, - payload: TransferPayload, - fixed_fee: u64, - ) -> InnerResult<(TransactionView, Vec, usize)> { - let mut script_set = HashSet::new(); - let (mut outputs, mut cells_data) = (vec![], vec![]); - let mut signature_actions: HashMap = HashMap::new(); - let mut change_fee_cell_index = 0usize; - let mut input_index = 0; - script_set.insert(SUDT.to_string()); - - // tx part I: build pay fee input and change output - let mut inputs_part_1 = vec![]; - let mut required_ckb_part_1 = 0; - - if let Some(ref pay_address) = payload.pay_fee { - let items = vec![Item::Identity(utils::address_to_identity(pay_address)?)]; - required_ckb_part_1 += fixed_fee; - change_fee_cell_index = self - .build_required_ckb_and_change_tx_part( - ctx.clone(), - items, - None, - required_ckb_part_1, - None, - None, - &mut inputs_part_1, - &mut script_set, - &mut signature_actions, - &mut outputs, - &mut cells_data, - &mut input_index, - ) - .await?; - } - - // tx part II: build cheque outputs - let mut inputs_part_2: Vec = vec![]; - let mut required_udt = 0; - let mut required_ckb_part_2 = 0; - - for to in &payload.to.to_infos { - let receiver_address = - Address::from_str(&to.address).map_err(CoreError::InvalidRpcParams)?; - if !receiver_address.is_secp256k1() { - return Err(CoreError::InvalidRpcParams( - "Every to address should be secp/256k1 address".to_string(), - ) - .into()); - } - - // build cheque output - let sudt_type_script = self - .build_sudt_type_script( - ctx.clone(), - blake2b_256_to_160(&payload.asset_info.udt_hash), - ) - .await?; - let to_udt_amount = to - .amount - .parse::() - .map_err(|err| CoreError::InvalidRpcParams(err.to_string()))?; - let sender_address = { - let json_item = &payload.from.items[0]; - let item = Item::try_from(json_item.to_owned())?; - self.get_secp_address_by_item(item)? - }; - let cheque_args = utils::build_cheque_args(receiver_address, sender_address); - let cheque_lock = self - .get_script_builder(CHEQUE)? - .args(cheque_args) - .hash_type(ScriptHashType::Type.into()) - .build(); - self.build_cell_for_output( - CHEQUE_CELL_CAPACITY, - cheque_lock, - Some(sudt_type_script), - Some(to_udt_amount), - &mut outputs, - &mut cells_data, - )?; - script_set.insert(CHEQUE.to_string()); - - required_udt += to_udt_amount; - required_ckb_part_2 += CHEQUE_CELL_CAPACITY; - } - - // tx_part III: pool udt - let mut pool_udt_amount: u128 = 0; - let mut inputs_part_3 = vec![]; - let mut from_items = vec![]; - for json_item in payload.from.items { - let item = Item::try_from(json_item)?; - from_items.push(item) - } - - self.build_required_udt_tx_part( - ctx.clone(), - from_items.clone(), - Some(payload.from.source.clone()), - payload.asset_info.udt_hash.clone(), - required_udt, - &mut pool_udt_amount, - &mut inputs_part_3, - &mut script_set, - &mut signature_actions, - &mut outputs, - &mut cells_data, - &mut input_index, - ) - .await?; - - // tx_part IV: pool ckb - let mut inputs_part_4 = vec![]; - if required_ckb_part_1.is_zero() { - change_fee_cell_index = self - .build_required_ckb_and_change_tx_part( - ctx.clone(), - from_items, - Some(payload.from.source.clone()), - required_ckb_part_2 + fixed_fee, - payload.change, - Some(UDTInfo { - asset_info: payload.asset_info, - amount: pool_udt_amount - required_udt, - }), - &mut inputs_part_4, - &mut script_set, - &mut signature_actions, - &mut outputs, - &mut cells_data, - &mut input_index, - ) - .await?; - } else { - self.build_required_ckb_and_change_tx_part( - ctx.clone(), - from_items, - Some(payload.from.source.clone()), - required_ckb_part_2, - payload.change, - Some(UDTInfo { - asset_info: payload.asset_info, - amount: pool_udt_amount - required_udt, - }), - &mut inputs_part_4, - &mut script_set, - &mut signature_actions, - &mut outputs, - &mut cells_data, - &mut input_index, - ) - .await?; - }; - - // build resp - let mut inputs = vec![]; - inputs.append(&mut inputs_part_1); - inputs.append(&mut inputs_part_2); - inputs.append(&mut inputs_part_3); - inputs.append(&mut inputs_part_4); - let inputs = self._build_tx_cell_inputs( - &inputs, - payload.since.clone(), - payload.from.source.clone(), - )?; - self.prebuild_tx_complete( - inputs, - outputs, - cells_data, - script_set, - vec![], - signature_actions, - HashMap::new(), - ) - .map(|(tx_view, signature_actions)| (tx_view, signature_actions, change_fee_cell_index)) - } - #[tracing_async] async fn prebuild_udt_cheque_transfer_transaction( &self, @@ -1459,230 +712,6 @@ impl MercuryRpcImpl { .await } - #[tracing_async] - async fn prebuild_acp_transfer_transaction_with_udt( - &self, - ctx: Context, - payload: TransferPayload, - fixed_fee: u64, - ) -> InnerResult<(TransactionView, Vec, usize)> { - let mut script_set = HashSet::new(); - let (mut outputs, mut cells_data) = (vec![], vec![]); - let mut signature_actions: HashMap = HashMap::new(); - let mut change_fee_cell_index = 0; - let mut input_index = 0; - script_set.insert(SUDT.to_string()); - - // tx part I: build pay fee input and change output - let mut inputs_part_1 = vec![]; - let mut required_ckb_part_1 = 0; - - if let Some(ref pay_address) = payload.pay_fee { - let items = vec![Item::Identity(utils::address_to_identity(pay_address)?)]; - required_ckb_part_1 += fixed_fee; - change_fee_cell_index = self - .build_required_ckb_and_change_tx_part( - ctx.clone(), - items, - None, - required_ckb_part_1, - None, - None, - &mut inputs_part_1, - &mut script_set, - &mut signature_actions, - &mut outputs, - &mut cells_data, - &mut input_index, - ) - .await?; - } - - // tx part II: build acp inputs and outputs - let mut required_udt = 0; - let mut inputs_part_2 = vec![]; - - for to in &payload.to.to_infos { - let item = Item::Identity(utils::address_to_identity(&to.address)?); - - // build acp input - let mut asset_set = HashSet::new(); - asset_set.insert(payload.asset_info.clone()); - let live_acps = self - .get_live_cells_by_item( - ctx.clone(), - item.clone(), - asset_set, - None, - None, - Some((**ACP_CODE_HASH.load()).clone()), - None, - false, - ) - .await?; - if live_acps.is_empty() { - return Err(CoreError::CannotFindACPCell.into()); - } - let existing_udt_amount = decode_udt_amount(&live_acps[0].cell_data); - inputs_part_2.push(live_acps[0].clone()); - input_index += 1; - script_set.insert(ACP.to_string()); - - // build acp output - let to_udt_amount = to - .amount - .parse::() - .map_err(|err| CoreError::InvalidRpcParams(err.to_string()))?; - self.build_cell_for_output( - live_acps[0].cell_output.capacity().unpack(), - live_acps[0].cell_output.lock(), - live_acps[0].cell_output.type_().to_opt(), - Some(existing_udt_amount + to_udt_amount), - &mut outputs, - &mut cells_data, - )?; - - required_udt += to_udt_amount; - } - - // tx part III: pool udt - let mut pool_udt_amount: u128 = 0; - let mut inputs_part_3 = vec![]; - let mut from_items = vec![]; - - for json_item in payload.from.items { - let item = Item::try_from(json_item)?; - from_items.push(item) - } - - self.pool_live_cells_by_items( - ctx.clone(), - from_items.clone(), - 0, - vec![RequiredUDT { - udt_hash: payload.asset_info.udt_hash.clone(), - amount_required: required_udt as i128, - }], - Some(payload.from.source.clone()), - &mut 0, - &mut inputs_part_3, - &mut script_set, - &mut signature_actions, - &mut input_index, - ) - .await?; - - for cell in &inputs_part_3 { - let udt_amount = decode_udt_amount(&cell.cell_data); - pool_udt_amount += udt_amount; - - let code_hash: H256 = cell.cell_output.lock().code_hash().unpack(); - if code_hash == **CHEQUE_CODE_HASH.load() { - let address = match self.generate_ckb_ownership(ctx.clone(), cell).await? { - Ownership::Address(address) => address, - Ownership::LockHash(_) => return Err(CoreError::CannotFindAddressByH160.into()), - }; - let address = Address::from_str(&address).map_err(CoreError::InvalidRpcParams)?; - let lock = address_to_script(address.payload()); - self.build_cell_for_output( - cell.cell_output.capacity().unpack(), - lock, - None, - None, - &mut outputs, - &mut cells_data, - )?; - } else if code_hash == **ACP_CODE_HASH.load() { - self.build_cell_for_output( - cell.cell_output.capacity().unpack(), - cell.cell_output.lock(), - cell.cell_output.type_().to_opt(), - Some(0), - &mut outputs, - &mut cells_data, - )?; - } else { - self.build_cell_for_output( - cell.cell_output.capacity().unpack(), - cell.cell_output.lock(), - None, - None, - &mut outputs, - &mut cells_data, - )?; - } - } - - // tx part IV: - // pool ckb for fee(if needed) - // and build change cell(both for ckb and udt) - // if pooling ckb fails, an error will be returned, - // ckb from the udt cell will no longer be collected - let mut inputs_part_4 = vec![]; - if required_ckb_part_1.is_zero() { - change_fee_cell_index = self - .build_required_ckb_and_change_tx_part( - ctx.clone(), - from_items, - Some(payload.from.source.clone()), - fixed_fee, - payload.change, - Some(UDTInfo { - asset_info: payload.asset_info, - amount: pool_udt_amount - required_udt, - }), - &mut inputs_part_4, - &mut script_set, - &mut signature_actions, - &mut outputs, - &mut cells_data, - &mut input_index, - ) - .await?; - } else { - self.build_required_ckb_and_change_tx_part( - ctx.clone(), - from_items, - Some(payload.from.source.clone()), - 0, - payload.change, - Some(UDTInfo { - asset_info: payload.asset_info, - amount: pool_udt_amount - required_udt, - }), - &mut inputs_part_4, - &mut script_set, - &mut signature_actions, - &mut outputs, - &mut cells_data, - &mut input_index, - ) - .await?; - }; - - // build tx - let mut inputs = vec![]; - inputs.append(&mut inputs_part_1); - inputs.append(&mut inputs_part_2); - inputs.append(&mut inputs_part_3); - inputs.append(&mut inputs_part_4); - let inputs = self._build_tx_cell_inputs( - &inputs, - payload.since.clone(), - payload.from.source.clone(), - )?; - self.prebuild_tx_complete( - inputs, - outputs, - cells_data, - script_set, - vec![], - signature_actions, - HashMap::new(), - ) - .map(|(tx_view, signature_actions)| (tx_view, signature_actions, change_fee_cell_index)) - } - #[tracing_async] async fn prebuild_udt_acp_transfer_transaction( &self, @@ -1708,6 +737,7 @@ impl MercuryRpcImpl { Some((**ACP_CODE_HASH.load()).clone()), None, false, + &mut PaginationRequest::default().limit(Some(1)), ) .await?; if live_acps.is_empty() { @@ -1962,6 +992,7 @@ impl MercuryRpcImpl { Some((**ACP_CODE_HASH.load()).clone()), None, false, + &mut PaginationRequest::default().limit(Some(1)), ) .await?; if live_acps.is_empty() { @@ -2300,163 +1331,6 @@ impl MercuryRpcImpl { Ok(inputs) } - #[tracing_async] - async fn build_required_ckb_and_change_tx_part( - &self, - ctx: Context, - items: Vec, - source: Option, - required_ckb: u64, - change_address: Option, - udt_change_info: Option, - inputs: &mut Vec, - script_set: &mut HashSet, - signature_actions: &mut HashMap, - outputs: &mut Vec, - cells_data: &mut Vec, - input_index: &mut usize, - ) -> InnerResult { - let required_ckb = if let Some(udt_info) = &udt_change_info { - if udt_info.amount != 0 { - required_ckb + STANDARD_SUDT_CAPACITY + MIN_CKB_CAPACITY - } else { - required_ckb + MIN_CKB_CAPACITY - } - } else { - required_ckb + MIN_CKB_CAPACITY - }; - - self.pool_live_cells_by_items( - ctx.clone(), - items.to_owned(), - required_ckb, - vec![], - source, - &mut 0, - inputs, - script_set, - signature_actions, - input_index, - ) - .await?; - - // build change cell - let pool_capacity = _get_pool_capacity(inputs)?; - let item = if let Some(address) = change_address { - Item::Address(address) - } else { - items[0].to_owned() - }; - let secp_address = self.get_secp_address_by_item(item)?; - - if let Some(udt_info) = udt_change_info { - if udt_info.amount != 0 { - let type_script = self - .build_sudt_type_script( - ctx.clone(), - blake2b_256_to_160(&udt_info.asset_info.udt_hash), - ) - .await?; - self.build_cell_for_output( - STANDARD_SUDT_CAPACITY, - secp_address.payload().into(), - Some(type_script), - Some(udt_info.amount), - outputs, - cells_data, - )?; - } - } - - let change_cell_capacity = pool_capacity - required_ckb + MIN_CKB_CAPACITY; - - let change_cell_index = self.build_cell_for_output( - change_cell_capacity, - secp_address.payload().into(), - None, - None, - outputs, - cells_data, - )?; - Ok(change_cell_index) - } - - #[tracing_async] - async fn build_required_udt_tx_part( - &self, - ctx: Context, - from_items: Vec, - source: Option, - udt_hash: H256, - required_udt: u128, - pool_udt_amount: &mut u128, - inputs: &mut Vec, - script_set: &mut HashSet, - signature_actions: &mut HashMap, - outputs: &mut Vec, - cells_data: &mut Vec, - input_index: &mut usize, - ) -> InnerResult<()> { - self.pool_live_cells_by_items( - ctx.clone(), - from_items.clone(), - 0, - vec![RequiredUDT { - udt_hash, - amount_required: required_udt as i128, - }], - source, - &mut 0, - inputs, - script_set, - signature_actions, - input_index, - ) - .await?; - - for cell in inputs { - let udt_amount = decode_udt_amount(&cell.cell_data); - *pool_udt_amount += udt_amount; - - let code_hash: H256 = cell.cell_output.lock().code_hash().unpack(); - if code_hash == **CHEQUE_CODE_HASH.load() { - let address = match self.generate_ckb_ownership(ctx.clone(), cell).await? { - Ownership::Address(address) => address, - Ownership::LockHash(_) => return Err(CoreError::CannotFindAddressByH160.into()), - }; - let address = Address::from_str(&address).map_err(CoreError::InvalidRpcParams)?; - let lock = address_to_script(address.payload()); - self.build_cell_for_output( - cell.cell_output.capacity().unpack(), - lock, - None, - None, - outputs, - cells_data, - )?; - } else if code_hash == **ACP_CODE_HASH.load() { - self.build_cell_for_output( - cell.cell_output.capacity().unpack(), - cell.cell_output.lock(), - cell.cell_output.type_().to_opt(), - Some(0), - outputs, - cells_data, - )?; - } else { - self.build_cell_for_output( - cell.cell_output.capacity().unpack(), - cell.cell_output.lock(), - None, - None, - outputs, - cells_data, - )?; - } - } - Ok(()) - } - #[tracing_async] pub(crate) async fn inner_build_sudt_issue_transaction( &self, @@ -2616,6 +1490,7 @@ impl MercuryRpcImpl { Some((**ACP_CODE_HASH.load()).clone()), None, false, + &mut PaginationRequest::default().limit(Some(1)), ) .await?; if live_acps.is_empty() { diff --git a/core/rpc/core/src/impl/query.rs b/core/rpc/core/src/impl/query.rs index 65515cac7..56f15b4fc 100644 --- a/core/rpc/core/src/impl/query.rs +++ b/core/rpc/core/src/impl/query.rs @@ -61,6 +61,7 @@ impl MercuryRpcImpl { None, None, true, + &mut PaginationRequest::default(), ) .await?; diff --git a/core/rpc/core/src/impl/utils.rs b/core/rpc/core/src/impl/utils.rs index 3067e1490..1461e1e8c 100644 --- a/core/rpc/core/src/impl/utils.rs +++ b/core/rpc/core/src/impl/utils.rs @@ -22,8 +22,8 @@ use core_rpc_types::lazy::{ use core_rpc_types::{ decode_record_id, encode_record_id, AssetInfo, AssetType, Balance, DaoInfo, DaoState, ExtraFilter, ExtraType, HashAlgorithm, IOType, Identity, IdentityFlag, Item, JsonItem, - Ownership, Record, RequiredUDT, SignAlgorithm, SignatureAction, SignatureInfo, - SignatureLocation, SinceConfig, SinceFlag, SinceType, Source, Status, + Ownership, Record, SignAlgorithm, SignatureAction, SignatureInfo, SignatureLocation, + SinceConfig, SinceFlag, SinceType, Source, Status, }; use core_storage::{Storage, TransactionWrapper}; @@ -240,6 +240,7 @@ impl MercuryRpcImpl { lock_filter: Option, extra: Option, for_get_balance: bool, + pagination: &mut PaginationRequest, ) -> InnerResult> { let type_hashes = asset_infos .into_iter() @@ -259,7 +260,7 @@ impl MercuryRpcImpl { }) .collect(); - let ret = match item { + let mut ret = match item { Item::Identity(ident) => { let scripts = self .get_scripts_by_identity(ctx.clone(), ident.clone(), lock_filter) @@ -269,6 +270,7 @@ impl MercuryRpcImpl { .map(|script| script.calc_script_hash().unpack()) .collect::>(); if lock_hashes.is_empty() { + pagination.cursor = None; return Ok(vec![]); } let cells = self @@ -279,10 +281,11 @@ impl MercuryRpcImpl { type_hashes, tip_block_number, None, - PaginationRequest::default(), + pagination.clone(), ) .await .map_err(|e| CoreError::DBError(e.to_string()))?; + pagination.update_by_response(cells.clone()); let (_flag, pubkey_hash) = ident.parse()?; let secp_lock_hash: H256 = self .get_script_builder(SECP256K1)? @@ -311,6 +314,7 @@ impl MercuryRpcImpl { .collect::>(); if lock_hashes.is_empty() { + pagination.cursor = None; return Ok(vec![]); } @@ -322,10 +326,11 @@ impl MercuryRpcImpl { type_hashes, tip_block_number, None, - PaginationRequest::default(), + pagination.clone(), ) .await .map_err(|e| CoreError::DBError(e.to_string()))?; + pagination.update_by_response(cells.clone()); cells .response @@ -372,6 +377,7 @@ impl MercuryRpcImpl { .map(|script| script.calc_script_hash().unpack()) .collect::>(); if lock_hashes.is_empty() { + pagination.cursor = None; return Ok(vec![]); } @@ -383,10 +389,11 @@ impl MercuryRpcImpl { type_hashes, tip_block_number, None, - PaginationRequest::default(), + pagination.clone(), ) .await .map_err(|e| CoreError::DBError(e.to_string()))?; + pagination.update_by_response(cell.clone()); if !cell.response.is_empty() { let cell = cell.response.get(0).cloned().unwrap(); @@ -449,10 +456,9 @@ impl MercuryRpcImpl { }; if extra == Some(ExtraType::CellBase) { - Ok(ret.into_iter().filter(|cell| cell.tx_index == 0).collect()) - } else { - Ok(ret) + ret = ret.into_iter().filter(|cell| cell.tx_index == 0).collect(); } + Ok(ret) } #[tracing_async] @@ -597,106 +603,6 @@ impl MercuryRpcImpl { } } - #[allow(dead_code)] - pub(crate) fn pool_asset( - &self, - pool_cells: &mut Vec, - amount_required: &mut BigInt, - resource_cells: Vec, - is_ckb: bool, - input_capacity_sum: &mut u64, - script_set: &mut HashSet, - signature_actions: &mut HashMap, - script_type: AssetScriptType, - input_index: &mut usize, - ) -> bool { - let zero = BigInt::from(0); - for cell in resource_cells.iter() { - if *amount_required <= zero { - return true; - } - - if self.is_in_cache(&cell.out_point) { - continue; - } - - let amount = if is_ckb { - let capacity: u64 = cell.cell_output.capacity().unpack(); - capacity as u128 - } else { - decode_udt_amount(&cell.cell_data) - }; - - if amount == 0 { - continue; - } - - *amount_required -= amount; - - let addr = match script_type { - AssetScriptType::Secp256k1 => { - script_set.insert(SECP256K1.to_string()); - Address::new( - self.network_type, - AddressPayload::from(cell.cell_output.lock()), - true, - ) - .to_string() - } - AssetScriptType::ACP => { - script_set.insert(ACP.to_string()); - Address::new( - self.network_type, - AddressPayload::from_pubkey_hash( - H160::from_slice(&cell.cell_output.lock().args().raw_data()[0..20]) - .unwrap(), - ), - true, - ) - .to_string() - } - AssetScriptType::ChequeReceiver(ref s) => { - script_set.insert(CHEQUE.to_string()); - s.clone() - } - AssetScriptType::ChequeSender(ref s) => { - script_set.insert(CHEQUE.to_string()); - s.clone() - } - AssetScriptType::Dao(_) => { - // script_set.insert(DAO.to_string()); - // script_set.insert(SECP256K1.to_string()); - // Address::new( - // self.network_type, - // AddressPayload::from_pubkey_hash( - // self.network_type, - // H160::from_slice(&cell.cell_output.lock().args().raw_data()[0..20]) - // .unwrap(), - // ), - // ) - // .to_string() - unreachable!() - } - }; - - pool_cells.push(cell.clone()); - let capacity: u64 = cell.cell_output.capacity().unpack(); - *input_capacity_sum += capacity; - - add_signature_action( - addr, - cell.cell_output.calc_lock_hash().to_string(), - SignAlgorithm::Secp256k1, - HashAlgorithm::Blake2b, - signature_actions, - *input_index, - ); - *input_index += 1; - } - - *amount_required <= zero - } - pub(crate) fn get_secp_lock_hash_by_item(&self, item: Item) -> InnerResult { match item { Item::Identity(ident) => { @@ -1222,140 +1128,6 @@ impl MercuryRpcImpl { Ok(withdraw_capacity) } - #[tracing_async] - pub(crate) async fn pool_live_cells_by_items( - &self, - ctx: Context, - items: Vec, - required_ckb: u64, - required_udts: Vec, - source: Option, - input_capacity_sum: &mut u64, - pool_cells: &mut Vec, - script_set: &mut HashSet, - signature_actions: &mut HashMap, - input_index: &mut usize, - ) -> InnerResult<()> { - let zero = BigInt::from(0); - let mut asset_ckb_set = HashSet::new(); - - if !required_udts.is_empty() { - self.pool_udt( - ctx.clone(), - &required_udts, - &items, - source.clone(), - pool_cells, - input_capacity_sum, - script_set, - signature_actions, - input_index, - ) - .await?; - } - let ckb_collect_already = pool_cells - .iter() - .map::(|cell| cell.cell_output.capacity().unpack()) - .sum::(); - let mut required_ckb = BigInt::from(required_ckb) - ckb_collect_already; - - if required_ckb <= zero { - return Ok(()); - } - - asset_ckb_set.insert(AssetInfo::new_ckb()); - - for item in items.iter() { - // let dao_cells = self - // .get_live_cells_by_item( - // item.clone(), - // asset_ckb_set.clone(), - // None, - // None, - // Some((**SECP256K1_CODE_HASH.load()).clone()), - // Some(ExtraFilter::Dao(DaoInfo::new_deposit(0, 0))), - // false, - // ) - // .await?; - // - // let dao_cells = dao_cells - // .into_iter() - // .filter(|cell| is_dao_unlock(cell)) - // .collect::>(); - // - // if self.pool_asset( - // pool_cells, - // &mut required_ckb, - // dao_cells, - // true, - // input_capacity_sum, - // script_set - // sig_entries, - // AssetScriptType::Dao, - // ) { - // return Ok(()); - // } - - let ckb_cells = self - .get_live_cells_by_item( - ctx.clone(), - item.clone(), - asset_ckb_set.clone(), - None, - None, - Some((**SECP256K1_CODE_HASH.load()).clone()), - None, - false, - ) - .await?; - - let cell_base_cells = ckb_cells - .clone() - .into_iter() - .filter(|cell| cell.tx_index.is_zero() && self.is_cellbase_mature(cell)) - .collect::>(); - - if self.pool_asset( - pool_cells, - &mut required_ckb, - cell_base_cells, - true, - input_capacity_sum, - script_set, - signature_actions, - AssetScriptType::Secp256k1, - input_index, - ) { - return Ok(()); - } - - let normal_ckb_cells = ckb_cells - .into_iter() - .filter(|cell| !cell.tx_index.is_zero() && cell.cell_data.is_empty()) - .collect::>(); - - if self.pool_asset( - pool_cells, - &mut required_ckb, - normal_ckb_cells, - true, - input_capacity_sum, - script_set, - signature_actions, - AssetScriptType::Secp256k1, - input_index, - ) { - return Ok(()); - } - } - - if required_ckb > zero { - return Err(CoreError::TokenIsNotEnough(AssetInfo::new_ckb().to_string()).into()); - } - - Ok(()) - } - #[tracing_async] /// We do not use the accurate `occupied` definition in ckb, which indicates the capacity consumed for storage of the live cells. /// Because by this definition, `occupied` and `free` are both not good indicators for spendable balance. @@ -1457,172 +1229,6 @@ impl MercuryRpcImpl { Ok(header.epoch().to_rational()) } - #[tracing_async] - async fn pool_udt( - &self, - ctx: Context, - required_udts: &[RequiredUDT], - items: &[Item], - source: Option, - pool_cells: &mut Vec, - input_capacity_sum: &mut u64, - script_set: &mut HashSet, - signature_action: &mut HashMap, - input_index: &mut usize, - ) -> InnerResult<()> { - let zero = BigInt::from(0); - for required_udt in required_udts.iter() { - let mut udt_required = BigInt::from(required_udt.amount_required); - let asset_info = AssetInfo::new_udt(required_udt.udt_hash.clone()); - let mut asset_udt_set = HashSet::new(); - asset_udt_set.insert(asset_info.clone()); - - for item in items { - let item_lock_hash = self.get_secp_lock_hash_by_item(item.clone())?; - let cheque_cells = self - .get_live_cells_by_item( - ctx.clone(), - item.clone(), - asset_udt_set.clone(), - None, - None, - Some((**CHEQUE_CODE_HASH.load()).clone()), - None, - false, - ) - .await?; - - if source.is_none() || source == Some(Source::Claimable) { - let cheque_cells_in_time = cheque_cells - .clone() - .into_iter() - .filter(|cell| { - let receiver_lock_hash = - H160::from_slice(&cell.cell_output.lock().args().raw_data()[0..20]) - .unwrap(); - - receiver_lock_hash == item_lock_hash - }) - .collect::>(); - - if !cheque_cells_in_time.is_empty() { - let receiver_addr = - self.get_secp_address_by_item(item.clone())?.to_string(); - - if self.pool_asset( - pool_cells, - &mut udt_required, - cheque_cells_in_time, - false, - input_capacity_sum, - script_set, - signature_action, - AssetScriptType::ChequeReceiver(receiver_addr), - input_index, - ) { - break; - } - } - } - - if source.is_none() || source == Some(Source::Free) { - let cheque_cells_time_out = cheque_cells - .into_iter() - .filter(|cell| { - let sender_lock_hash = H160::from_slice( - &cell.cell_output.lock().args().raw_data()[20..40], - ) - .unwrap(); - sender_lock_hash == item_lock_hash - }) - .collect::>(); - - if !cheque_cells_time_out.is_empty() { - let sender_addr = self.get_secp_address_by_item(item.clone())?.to_string(); - - if self.pool_asset( - pool_cells, - &mut udt_required, - cheque_cells_time_out, - false, - input_capacity_sum, - script_set, - signature_action, - AssetScriptType::ChequeSender(sender_addr), - input_index, - ) { - break; - } - } - - let secp_cells = self - .get_live_cells_by_item( - ctx.clone(), - item.clone(), - asset_udt_set.clone(), - None, - None, - Some((**SECP256K1_CODE_HASH.load()).clone()), - None, - false, - ) - .await?; - - if !secp_cells.is_empty() - && self.pool_asset( - pool_cells, - &mut udt_required, - secp_cells, - false, - input_capacity_sum, - script_set, - signature_action, - AssetScriptType::Secp256k1, - input_index, - ) - { - break; - } - - let acp_cells = self - .get_live_cells_by_item( - ctx.clone(), - item.clone(), - asset_udt_set.clone(), - None, - None, - Some((**ACP_CODE_HASH.load()).clone()), - None, - false, - ) - .await?; - - if !acp_cells.is_empty() - && self.pool_asset( - pool_cells, - &mut udt_required, - acp_cells, - false, - input_capacity_sum, - script_set, - signature_action, - AssetScriptType::ACP, - input_index, - ) - { - break; - } - } - } - - if udt_required > zero { - return Err(CoreError::TokenIsNotEnough(asset_info.to_string()).into()); - } - } - - Ok(()) - } - fn filter_useless_cheque( &self, cell: &DetailedCell, @@ -1764,6 +1370,9 @@ impl MercuryRpcImpl { // balance capacity based on database // add new inputs let mut ckb_cells_cache = CkbCellsCache::new(from_items.clone()); + ckb_cells_cache + .pagination + .set_limit(Some(self.pool_cache_size)); loop { if required_capacity <= 0 { break; @@ -1836,6 +1445,7 @@ impl MercuryRpcImpl { // change acp cell from db let mut cells_cache = AcpCellsCache::new(from_items.clone(), None); + cells_cache.pagination.set_limit(Some(self.pool_cache_size)); loop { let ret = self .poll_next_live_acp_cell(ctx.clone(), &mut cells_cache) @@ -1989,6 +1599,9 @@ impl MercuryRpcImpl { // add new inputs let mut udt_cells_cache = UdtCellsCache::new(from_items.clone(), asset_info.clone(), source.clone()); + udt_cells_cache + .pagination + .set_limit(Some(self.pool_cache_size)); loop { if required_udt_amount <= zero { @@ -2049,6 +1662,7 @@ impl MercuryRpcImpl { vec![Item::Identity(address_to_identity(&receiver_address)?)], Some(asset_info.clone()), ); + cells_cache.pagination.set_limit(Some(self.pool_cache_size)); loop { let ret = self .poll_next_live_acp_cell(ctx.clone(), &mut cells_cache) @@ -2141,6 +1755,7 @@ impl MercuryRpcImpl { Some((**SECP256K1_CODE_HASH.load()).clone()), Some(ExtraType::Dao), false, + &mut ckb_cells_cache.pagination, ) .await?; let tip_epoch_number = (**CURRENT_EPOCH_NUMBER.load()).clone(); @@ -2209,6 +1824,7 @@ impl MercuryRpcImpl { Some((**SECP256K1_CODE_HASH.load()).clone()), None, false, + &mut ckb_cells_cache.pagination, ) .await?; let cell_base_cells = ckb_cells @@ -2240,6 +1856,7 @@ impl MercuryRpcImpl { Some((**SECP256K1_CODE_HASH.load()).clone()), None, false, + &mut ckb_cells_cache.pagination, ) .await?; let secp_udt_cells = secp_udt_cells @@ -2267,6 +1884,7 @@ impl MercuryRpcImpl { Some((**ACP_CODE_HASH.load()).clone()), None, false, + &mut ckb_cells_cache.pagination, ) .await?; let acp_cells = acp_cells @@ -2276,7 +1894,9 @@ impl MercuryRpcImpl { ckb_cells_cache.cell_deque = acp_cells; } } - ckb_cells_cache.array_index += 1; + if ckb_cells_cache.pagination.cursor.is_none() { + ckb_cells_cache.array_index += 1; + } } } @@ -2321,6 +1941,7 @@ impl MercuryRpcImpl { Some((**CHEQUE_CODE_HASH.load()).clone()), None, false, + &mut udt_cells_cache.pagination, ) .await?; let cheque_cells_in_time = cheque_cells @@ -2352,6 +1973,7 @@ impl MercuryRpcImpl { Some((**CHEQUE_CODE_HASH.load()).clone()), None, false, + &mut udt_cells_cache.pagination, ) .await?; let cheque_cells_time_out = cheque_cells @@ -2378,6 +2000,7 @@ impl MercuryRpcImpl { Some((**SECP256K1_CODE_HASH.load()).clone()), None, false, + &mut udt_cells_cache.pagination, ) .await?; let secp_cells = secp_cells @@ -2397,6 +2020,7 @@ impl MercuryRpcImpl { Some((**ACP_CODE_HASH.load()).clone()), None, false, + &mut udt_cells_cache.pagination, ) .await?; let acp_cells = acp_cells @@ -2406,7 +2030,9 @@ impl MercuryRpcImpl { udt_cells_cache.cell_deque = acp_cells; } } - udt_cells_cache.array_index += 1; + if udt_cells_cache.pagination.cursor.is_none() { + udt_cells_cache.array_index += 1; + } } } @@ -2442,11 +2068,14 @@ impl MercuryRpcImpl { Some((**ACP_CODE_HASH.load()).clone()), None, false, + &mut acp_cells_cache.pagination, ) .await?; let acp_cells = acp_cells.into_iter().collect::>(); acp_cells_cache.cell_deque = acp_cells; - acp_cells_cache.current_index += 1; + if acp_cells_cache.pagination.cursor.is_none() { + acp_cells_cache.current_index += 1; + } } } diff --git a/core/rpc/core/src/impl/utils_types.rs b/core/rpc/core/src/impl/utils_types.rs index b93ec1b0d..4cefa8313 100644 --- a/core/rpc/core/src/impl/utils_types.rs +++ b/core/rpc/core/src/impl/utils_types.rs @@ -1,4 +1,4 @@ -use common::DetailedCell; +use common::{DetailedCell, PaginationRequest}; use core_rpc_types::{AssetInfo, Item, SignatureAction, Source}; use ckb_types::packed; @@ -57,6 +57,7 @@ pub struct CkbCellsCache { pub item_category_array: Vec<(usize, PoolCkbCategory)>, pub array_index: usize, pub cell_deque: VecDeque<(DetailedCell, AssetScriptType)>, + pub pagination: PaginationRequest, } impl CkbCellsCache { @@ -78,6 +79,7 @@ impl CkbCellsCache { item_category_array, array_index: 0, cell_deque: VecDeque::new(), + pagination: PaginationRequest::default(), } } } @@ -88,6 +90,7 @@ pub struct UdtCellsCache { pub item_category_array: Vec<(usize, PoolUdtCategory)>, pub array_index: usize, pub cell_deque: VecDeque<(DetailedCell, AssetScriptType)>, + pub pagination: PaginationRequest, } impl UdtCellsCache { @@ -120,6 +123,7 @@ impl UdtCellsCache { item_category_array, array_index: 0, cell_deque: VecDeque::new(), + pagination: PaginationRequest::default(), } } } @@ -129,6 +133,7 @@ pub struct AcpCellsCache { pub asset_info: Option, pub current_index: usize, pub cell_deque: VecDeque, + pub pagination: PaginationRequest, } impl AcpCellsCache { @@ -138,6 +143,7 @@ impl AcpCellsCache { asset_info, current_index: 0, cell_deque: VecDeque::new(), + pagination: PaginationRequest::default(), } } } diff --git a/core/rpc/core/src/tests/mod.rs b/core/rpc/core/src/tests/mod.rs index b694aca76..5e4d7430d 100644 --- a/core/rpc/core/src/tests/mod.rs +++ b/core/rpc/core/src/tests/mod.rs @@ -346,6 +346,7 @@ impl RpcTestEngine { RationalU256::from_u256(6u64.into()), RationalU256::from_u256(6u64.into()), Arc::new(RwLock::new(SyncState::ReadOnly)), + 100u64, ) } diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index 6d6a6cad4..4c8c2f512 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -39,9 +39,11 @@ pub struct Service { cheque_since: RationalU256, use_tx_pool_cache: bool, sync_state: Arc>, + pool_cache_size: u64, } impl Service { + #[allow(clippy::too_many_arguments)] pub fn new( center_id: u16, machine_id: u16, @@ -59,6 +61,7 @@ impl Service { ckb_uri: String, cheque_since: u64, log_level: LevelFilter, + pool_cache_size: u64, ) -> Self { let ckb_client = CkbRpcClient::new(ckb_uri); let store = RelationalStorage::new( @@ -89,6 +92,7 @@ impl Service { cheque_since, use_tx_pool_cache, sync_state, + pool_cache_size, } } @@ -134,6 +138,7 @@ impl Service { self.cheque_since.clone(), self.cellbase_maturity.clone(), Arc::clone(&self.sync_state), + self.pool_cache_size, ); info!("Mercury Running!"); diff --git a/core/storage/src/relational/mod.rs b/core/storage/src/relational/mod.rs index 2c6332603..1c6fd1307 100644 --- a/core/storage/src/relational/mod.rs +++ b/core/storage/src/relational/mod.rs @@ -371,7 +371,7 @@ impl Storage for RelationalStorage { let pag = if is_asc { PaginationRequest::default() } else { - PaginationRequest::default().set_order(Order::Desc) + PaginationRequest::default().order(Order::Desc) }; let tx_tables = self .query_transactions( diff --git a/devtools/config/mainnet_config.toml b/devtools/config/mainnet_config.toml index ef77f425b..ead0873af 100644 --- a/devtools/config/mainnet_config.toml +++ b/devtools/config/mainnet_config.toml @@ -11,6 +11,8 @@ cellbase_maturity = 4 cheque_since = 6 +pool_cache_size = 100 + [db_config] center_id = 0 machine_id = 0 diff --git a/devtools/config/testnet_config.toml b/devtools/config/testnet_config.toml index 4160d9bfa..64c847294 100644 --- a/devtools/config/testnet_config.toml +++ b/devtools/config/testnet_config.toml @@ -11,6 +11,8 @@ cellbase_maturity = 4 cheque_since = 6 +pool_cache_size = 100 + [db_config] center_id = 0 machine_id = 0