diff --git a/plenum/common/ledger_manager.py b/plenum/common/ledger_manager.py index 5bc08677e0..57d657504d 100644 --- a/plenum/common/ledger_manager.py +++ b/plenum/common/ledger_manager.py @@ -844,8 +844,7 @@ def startCatchUpProcess(self, ledgerId: int, proof: ConsistencyProof): .format(CATCH_UP_PREFIX, self, ledgerId)) def _getCatchupTimeout(self, numRequest, batchSize): - return numRequest * (self.config.CatchupTransactionsTimeout + - 0.1 * batchSize) + return numRequest * self.config.CatchupTransactionsTimeout def catchupCompleted(self, ledgerId: int, last_3PC: Tuple=(0, 0)): if ledgerId not in self.ledgerRegistry: diff --git a/plenum/common/txn_util.py b/plenum/common/txn_util.py index b7126538fe..286d3e5b61 100644 --- a/plenum/common/txn_util.py +++ b/plenum/common/txn_util.py @@ -1,4 +1,5 @@ from collections import OrderedDict +import json from ledger.genesis_txn.genesis_txn_file_util import create_genesis_txn_init_ledger from plenum.common.constants import TXN_TIME, TXN_TYPE, TARGET_NYM, ROLE, \ @@ -107,3 +108,34 @@ def idr_from_req_data(data): return data[f.IDENTIFIER.nm] else: return Request.gen_idr_from_sigs(data.get(f.SIGS.nm, {})) + + +def sdk_reqToTxn(sdk_req, cons_time=None): + """ + Transform a client request such that it can be stored in the ledger. + Also this is what will be returned to the client in the reply + + :param sdk_req: sdk request in str or dict type + :param cons_time: UTC epoch at which consensus was reached + :return: + """ + # TODO: we should not reformat transaction this way + # When refactor keep in mind thought about back compatibility + + if isinstance(sdk_req, dict): + data = sdk_req + elif isinstance(sdk_req, str): + data = json.loads(sdk_req) + else: + raise TypeError( + "Expected dict or str as input, but got: {}".format(type(sdk_req))) + + res = { + f.IDENTIFIER.nm: data[f.IDENTIFIER.nm], + f.REQ_ID.nm: data[f.REQ_ID.nm], + f.SIG.nm: data.get(f.SIG.nm, None), + f.SIGS.nm: data.get(f.SIGS.nm, None), + TXN_TIME: cons_time or data.get(TXN_TIME) + } + res.update(data[OPERATION]) + return res diff --git a/plenum/config.py b/plenum/config.py index 7bbbf028e4..8856de8b99 100644 --- a/plenum/config.py +++ b/plenum/config.py @@ -140,7 +140,12 @@ ConsistencyProofsTimeout = 5 # Timeout factor after which a node starts requesting transactions -CatchupTransactionsTimeout = 5 +# We assume, that making consistency proof + iterate over all transactions (getAllTxn) +# will take a little time (0.003 sec for making cp for 10 000 txns + +# 0.2 sec for getAllTxn for 10 000 txn) +# Therefore, node communication is the most cost operation +# Timeout for pool catchuping would be nodeCount * CatchupTransactionsTimeout +CatchupTransactionsTimeout = 6 # Log configuration diff --git a/plenum/test/helper.py b/plenum/test/helper.py index f2081b3641..11eece3d4e 100644 --- a/plenum/test/helper.py +++ b/plenum/test/helper.py @@ -654,7 +654,7 @@ def checkViewNoForNodes(nodes: Iterable[TestNode], expectedViewNo: int = None): assert len(viewNos) == 1 vNo, = viewNos if expectedViewNo is not None: - assert vNo == expectedViewNo, ','.join(['{} -> Ratio: {}'.format( + assert vNo >= expectedViewNo, ','.join(['{} -> Ratio: {}'.format( node.name, node.monitor.masterThroughputRatio()) for node in nodes]) return vNo diff --git a/plenum/test/waits.py b/plenum/test/waits.py index 65666d0a4f..ac272bcb6c 100644 --- a/plenum/test/waits.py +++ b/plenum/test/waits.py @@ -104,9 +104,7 @@ def expectedPoolCatchupTime(nodeCount): To: each of the Nodes finished the the catchup procedure """ config = getConfig() - nodeCatchupTimeout = __Peer2PeerRequestExchangeTime + \ - config.CatchupTransactionsTimeout - return nodeCount * nodeCatchupTimeout + return nodeCount * config.CatchupTransactionsTimeout def expectedPoolGetReadyTimeout(nodeCount): diff --git a/scripts/add_json_txns_to_ledger.py b/scripts/add_json_txns_to_ledger.py new file mode 100644 index 0000000000..37f5caae0f --- /dev/null +++ b/scripts/add_json_txns_to_ledger.py @@ -0,0 +1,67 @@ +#! /usr/bin/env python3 + +import os +import sys +import json +import argparse + +from stp_core.types import HA +from indy_common.config_util import getConfig +from plenum.server.node import Node +from indy_common.config_helper import NodeConfigHelper + +config = getConfig() + + +def get_ha_cliha_node_name(path_to_env): + node_name_key = 'NODE_NAME' + node_port_key = 'NODE_PORT' + node_clien_port_key = 'NODE_CLIENT_PORT' + node_name = '' + node_port = '' + node_clieint_port = '' + with open(path_to_env) as fenv: + for line in fenv.readlines(): + print(line) + if line.find(node_name_key) != -1: + node_name = line.split('=')[1].strip() + elif line.find(node_port_key) != -1: + node_port = int(line.split('=')[1].strip()) + elif line.find(node_clien_port_key) != -1: + node_clieint_port = int(line.split('=')[1].strip()) + return node_name, node_port, node_clieint_port + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument('infpath', help="Path to previous generated txns", type=str, default='/tmp/generated_txns') + parser.add_argument('--env_file', help='Path to environment file with node name and ports', default='/etc/indy/indy.env') + args = parser.parse_args() + path_to_txns = os.path.realpath(args.infpath) + path_to_env = os.path.realpath(args.env_file) + + if not os.path.exists(path_to_txns): + print("Path to txns file does not exist") + sys.exit(1) + + if not os.path.exists(path_to_env): + print("Path to env file does not exist") + sys.exit(1) + + nname, nport, ncliport = get_ha_cliha_node_name(path_to_env) + ha = HA("0.0.0.0", nport) + cliha = HA("0.0.0.0", ncliport) + config_helper = NodeConfigHelper(nname, config) + + node = Node(nname, nodeRegistry=None, + ha=ha, + cliha=cliha, + config_helper=config_helper, + config=config) + i = 0 + with open(path_to_txns) as txns: + for txn in txns: + node.domainLedger.add(json.loads(txn)) + i += 1 + if not i % 1000: + print("added {} txns".format(i)) diff --git a/scripts/generate_txns.py b/scripts/generate_txns.py new file mode 100644 index 0000000000..b0900b10c6 --- /dev/null +++ b/scripts/generate_txns.py @@ -0,0 +1,94 @@ +#! /usr/bin/env python3 + +import os +import json +import time +from contextlib import ExitStack +import argparse +import random +from typing import Sequence +from plenum.common.request import Request +from plenum.common.constants import CURRENT_PROTOCOL_VERSION +from plenum.common.util import randomString +from plenum.common.config_util import getConfig +from plenum.common.txn_util import sdk_reqToTxn +from indy.ledger import sign_request +from indy import signus, wallet +from stp_core.loop.looper import Looper + +config = getConfig() + + +async def get_wallet_and_pool(): + pool_name = 'pool' + randomString(3) + wallet_name = 'wallet' + randomString(10) + their_wallet_name = 'their_wallet' + randomString(10) + seed_trustee1 = "000000000000000000000000Trustee1" + + await wallet.create_wallet(pool_name, wallet_name, None, None, None) + my_wallet_handle = await wallet.open_wallet(wallet_name, None, None) + + await wallet.create_wallet(pool_name, their_wallet_name, None, None, None) + their_wallet_handle = await wallet.open_wallet(their_wallet_name, None, None) + + await signus.create_and_store_my_did(my_wallet_handle, "{}") + + (their_did, their_verkey) = await signus.create_and_store_my_did(their_wallet_handle, + json.dumps({"seed": seed_trustee1})) + + await signus.store_their_did(my_wallet_handle, json.dumps({'did': their_did, 'verkey': their_verkey})) + + return their_wallet_handle, their_did + + +def randomOperation(): + return { + "type": "buy", + "amount": random.randint(10, 100000) + } + + +def random_requests(count): + return [randomOperation() for _ in range(count)] + + +def sdk_gen_request(operation, protocol_version=CURRENT_PROTOCOL_VERSION, identifier=None): + return Request(operation=operation, reqId=random.randint(10, 100000), + protocolVersion=protocol_version, identifier=identifier) + + +def sdk_random_request_objects(count, protocol_version, identifier=None): + ops = random_requests(count) + return [sdk_gen_request(op, protocol_version=protocol_version, identifier=identifier) for op in ops] + + +def sdk_sign_request_objects(looper, sdk_wallet, reqs: Sequence): + wallet_h, did = sdk_wallet + reqs_str = [json.dumps(req.as_dict) for req in reqs] + resp = [looper.loop.run_until_complete(sign_request(wallet_h, did, req)) for req in reqs_str] + return resp + + +def sdk_signed_random_requests(looper, sdk_wallet, count): + _, did = sdk_wallet + reqs_obj = sdk_random_request_objects(count, identifier=did, protocol_version=CURRENT_PROTOCOL_VERSION) + return sdk_sign_request_objects(looper, sdk_wallet, reqs_obj) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument('count', help="Count of generated txns", type=int) + parser.add_argument('outfpath', help="Path to save generated txns", type=str, default='/tmp/generated_txns') + args = parser.parse_args() + path_to_save = os.path.realpath(args.outfpath) + + with ExitStack() as exit_stack: + with Looper() as looper: + sdk_wallet, did = looper.loop.run_until_complete(get_wallet_and_pool()) + with open(path_to_save, 'w') as outpath: + for _ in range(args.count): + req = sdk_signed_random_requests(looper, (sdk_wallet, did), 1)[0] + txn = sdk_reqToTxn(req, int(time.time())) + outpath.write(json.dumps(txn)) + outpath.write(os.linesep) + looper.stopall()