Skip to content

Commit

Permalink
[INDY-1029] Change timeout for getting cathup replies (#499)
Browse files Browse the repository at this point in the history
* [INDY-1029] Change timeout for getting cathup replies

Signed-off-by: Andrew Nikitin <[email protected]>

* [indy-1029] Add scripts for generating and adding transactions to domain ledger for test purpose

Signed-off-by: Andrew Nikitin <[email protected]>

* [INDY-1029] Add new timeout var into config

Signed-off-by: Andrew Nikitin <[email protected]>

* [INDY-1029] Fix static validation

Signed-off-by: Andrew Nikitin <[email protected]>

* [INDY-1029] Change evaluating pool catchup timeout for tests

Signed-off-by: Andrew Nikitin <[email protected]>

* [INDY-1029] Make timeout depended from node count (for test logic)

Signed-off-by: Andrew Nikitin <[email protected]>

* [INDY-1029] Erase dublicate sdk_reqToTxn function

Signed-off-by: Andrew Nikitin <[email protected]>
  • Loading branch information
anikitinDSR authored and ashcherbakov committed Jan 17, 2018
1 parent 6d7a942 commit d49cb4b
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 7 deletions.
3 changes: 1 addition & 2 deletions plenum/common/ledger_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -844,8 +844,7 @@ def startCatchUpProcess(self, ledgerId: int, proof: ConsistencyProof):
.format(CATCH_UP_PREFIX, self, ledgerId))

def _getCatchupTimeout(self, numRequest, batchSize):
return numRequest * (self.config.CatchupTransactionsTimeout +
0.1 * batchSize)
return numRequest * self.config.CatchupTransactionsTimeout

def catchupCompleted(self, ledgerId: int, last_3PC: Tuple=(0, 0)):
if ledgerId not in self.ledgerRegistry:
Expand Down
32 changes: 32 additions & 0 deletions plenum/common/txn_util.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from collections import OrderedDict
import json

from ledger.genesis_txn.genesis_txn_file_util import create_genesis_txn_init_ledger
from plenum.common.constants import TXN_TIME, TXN_TYPE, TARGET_NYM, ROLE, \
Expand Down Expand Up @@ -107,3 +108,34 @@ def idr_from_req_data(data):
return data[f.IDENTIFIER.nm]
else:
return Request.gen_idr_from_sigs(data.get(f.SIGS.nm, {}))


def sdk_reqToTxn(sdk_req, cons_time=None):
"""
Transform a client request such that it can be stored in the ledger.
Also this is what will be returned to the client in the reply
:param sdk_req: sdk request in str or dict type
:param cons_time: UTC epoch at which consensus was reached
:return:
"""
# TODO: we should not reformat transaction this way
# When refactor keep in mind thought about back compatibility

if isinstance(sdk_req, dict):
data = sdk_req
elif isinstance(sdk_req, str):
data = json.loads(sdk_req)
else:
raise TypeError(
"Expected dict or str as input, but got: {}".format(type(sdk_req)))

res = {
f.IDENTIFIER.nm: data[f.IDENTIFIER.nm],
f.REQ_ID.nm: data[f.REQ_ID.nm],
f.SIG.nm: data.get(f.SIG.nm, None),
f.SIGS.nm: data.get(f.SIGS.nm, None),
TXN_TIME: cons_time or data.get(TXN_TIME)
}
res.update(data[OPERATION])
return res
7 changes: 6 additions & 1 deletion plenum/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,12 @@
ConsistencyProofsTimeout = 5

# Timeout factor after which a node starts requesting transactions
CatchupTransactionsTimeout = 5
# We assume, that making consistency proof + iterate over all transactions (getAllTxn)
# will take a little time (0.003 sec for making cp for 10 000 txns +
# 0.2 sec for getAllTxn for 10 000 txn)
# Therefore, node communication is the most cost operation
# Timeout for pool catchuping would be nodeCount * CatchupTransactionsTimeout
CatchupTransactionsTimeout = 6


# Log configuration
Expand Down
2 changes: 1 addition & 1 deletion plenum/test/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ def checkViewNoForNodes(nodes: Iterable[TestNode], expectedViewNo: int = None):
assert len(viewNos) == 1
vNo, = viewNos
if expectedViewNo is not None:
assert vNo == expectedViewNo, ','.join(['{} -> Ratio: {}'.format(
assert vNo >= expectedViewNo, ','.join(['{} -> Ratio: {}'.format(
node.name, node.monitor.masterThroughputRatio()) for node in nodes])
return vNo

Expand Down
4 changes: 1 addition & 3 deletions plenum/test/waits.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,7 @@ def expectedPoolCatchupTime(nodeCount):
To: each of the Nodes finished the the catchup procedure
"""
config = getConfig()
nodeCatchupTimeout = __Peer2PeerRequestExchangeTime + \
config.CatchupTransactionsTimeout
return nodeCount * nodeCatchupTimeout
return nodeCount * config.CatchupTransactionsTimeout


def expectedPoolGetReadyTimeout(nodeCount):
Expand Down
67 changes: 67 additions & 0 deletions scripts/add_json_txns_to_ledger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#! /usr/bin/env python3

import os
import sys
import json
import argparse

from stp_core.types import HA
from indy_common.config_util import getConfig
from plenum.server.node import Node
from indy_common.config_helper import NodeConfigHelper

config = getConfig()


def get_ha_cliha_node_name(path_to_env):
node_name_key = 'NODE_NAME'
node_port_key = 'NODE_PORT'
node_clien_port_key = 'NODE_CLIENT_PORT'
node_name = ''
node_port = ''
node_clieint_port = ''
with open(path_to_env) as fenv:
for line in fenv.readlines():
print(line)
if line.find(node_name_key) != -1:
node_name = line.split('=')[1].strip()
elif line.find(node_port_key) != -1:
node_port = int(line.split('=')[1].strip())
elif line.find(node_clien_port_key) != -1:
node_clieint_port = int(line.split('=')[1].strip())
return node_name, node_port, node_clieint_port


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('infpath', help="Path to previous generated txns", type=str, default='/tmp/generated_txns')
parser.add_argument('--env_file', help='Path to environment file with node name and ports', default='/etc/indy/indy.env')
args = parser.parse_args()
path_to_txns = os.path.realpath(args.infpath)
path_to_env = os.path.realpath(args.env_file)

if not os.path.exists(path_to_txns):
print("Path to txns file does not exist")
sys.exit(1)

if not os.path.exists(path_to_env):
print("Path to env file does not exist")
sys.exit(1)

nname, nport, ncliport = get_ha_cliha_node_name(path_to_env)
ha = HA("0.0.0.0", nport)
cliha = HA("0.0.0.0", ncliport)
config_helper = NodeConfigHelper(nname, config)

node = Node(nname, nodeRegistry=None,
ha=ha,
cliha=cliha,
config_helper=config_helper,
config=config)
i = 0
with open(path_to_txns) as txns:
for txn in txns:
node.domainLedger.add(json.loads(txn))
i += 1
if not i % 1000:
print("added {} txns".format(i))
94 changes: 94 additions & 0 deletions scripts/generate_txns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#! /usr/bin/env python3

import os
import json
import time
from contextlib import ExitStack
import argparse
import random
from typing import Sequence
from plenum.common.request import Request
from plenum.common.constants import CURRENT_PROTOCOL_VERSION
from plenum.common.util import randomString
from plenum.common.config_util import getConfig
from plenum.common.txn_util import sdk_reqToTxn
from indy.ledger import sign_request
from indy import signus, wallet
from stp_core.loop.looper import Looper

config = getConfig()


async def get_wallet_and_pool():
pool_name = 'pool' + randomString(3)
wallet_name = 'wallet' + randomString(10)
their_wallet_name = 'their_wallet' + randomString(10)
seed_trustee1 = "000000000000000000000000Trustee1"

await wallet.create_wallet(pool_name, wallet_name, None, None, None)
my_wallet_handle = await wallet.open_wallet(wallet_name, None, None)

await wallet.create_wallet(pool_name, their_wallet_name, None, None, None)
their_wallet_handle = await wallet.open_wallet(their_wallet_name, None, None)

await signus.create_and_store_my_did(my_wallet_handle, "{}")

(their_did, their_verkey) = await signus.create_and_store_my_did(their_wallet_handle,
json.dumps({"seed": seed_trustee1}))

await signus.store_their_did(my_wallet_handle, json.dumps({'did': their_did, 'verkey': their_verkey}))

return their_wallet_handle, their_did


def randomOperation():
return {
"type": "buy",
"amount": random.randint(10, 100000)
}


def random_requests(count):
return [randomOperation() for _ in range(count)]


def sdk_gen_request(operation, protocol_version=CURRENT_PROTOCOL_VERSION, identifier=None):
return Request(operation=operation, reqId=random.randint(10, 100000),
protocolVersion=protocol_version, identifier=identifier)


def sdk_random_request_objects(count, protocol_version, identifier=None):
ops = random_requests(count)
return [sdk_gen_request(op, protocol_version=protocol_version, identifier=identifier) for op in ops]


def sdk_sign_request_objects(looper, sdk_wallet, reqs: Sequence):
wallet_h, did = sdk_wallet
reqs_str = [json.dumps(req.as_dict) for req in reqs]
resp = [looper.loop.run_until_complete(sign_request(wallet_h, did, req)) for req in reqs_str]
return resp


def sdk_signed_random_requests(looper, sdk_wallet, count):
_, did = sdk_wallet
reqs_obj = sdk_random_request_objects(count, identifier=did, protocol_version=CURRENT_PROTOCOL_VERSION)
return sdk_sign_request_objects(looper, sdk_wallet, reqs_obj)


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('count', help="Count of generated txns", type=int)
parser.add_argument('outfpath', help="Path to save generated txns", type=str, default='/tmp/generated_txns')
args = parser.parse_args()
path_to_save = os.path.realpath(args.outfpath)

with ExitStack() as exit_stack:
with Looper() as looper:
sdk_wallet, did = looper.loop.run_until_complete(get_wallet_and_pool())
with open(path_to_save, 'w') as outpath:
for _ in range(args.count):
req = sdk_signed_random_requests(looper, (sdk_wallet, did), 1)[0]
txn = sdk_reqToTxn(req, int(time.time()))
outpath.write(json.dumps(txn))
outpath.write(os.linesep)
looper.stopall()

0 comments on commit d49cb4b

Please sign in to comment.