Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INDY-1029] Change timeout for getting cathup replies #499

Merged
merged 7 commits into from
Jan 17, 2018
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions plenum/common/ledger_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -844,8 +844,7 @@ def startCatchUpProcess(self, ledgerId: int, proof: ConsistencyProof):
.format(CATCH_UP_PREFIX, self, ledgerId))

def _getCatchupTimeout(self, numRequest, batchSize):
return numRequest * (self.config.CatchupTransactionsTimeout +
0.1 * batchSize)
return numRequest * self.config.CatchupTransactionsTimeout

def catchupCompleted(self, ledgerId: int, last_3PC: Tuple=(0, 0)):
if ledgerId not in self.ledgerRegistry:
Expand Down
32 changes: 32 additions & 0 deletions plenum/common/txn_util.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from collections import OrderedDict
import json

from ledger.genesis_txn.genesis_txn_file_util import create_genesis_txn_init_ledger
from plenum.common.constants import TXN_TIME, TXN_TYPE, TARGET_NYM, ROLE, \
Expand Down Expand Up @@ -107,3 +108,34 @@ def idr_from_req_data(data):
return data[f.IDENTIFIER.nm]
else:
return Request.gen_idr_from_sigs(data.get(f.SIGS.nm, {}))


def sdk_reqToTxn(sdk_req, cons_time=None):
"""
Transform a client request such that it can be stored in the ledger.
Also this is what will be returned to the client in the reply

:param sdk_req: sdk request in str or dict type
:param cons_time: UTC epoch at which consensus was reached
:return:
"""
# TODO: we should not reformat transaction this way
# When refactor keep in mind thought about back compatibility

if isinstance(sdk_req, dict):
data = sdk_req
elif isinstance(sdk_req, str):
data = json.loads(sdk_req)
else:
raise TypeError(
"Expected dict or str as input, but got: {}".format(type(sdk_req)))

res = {
f.IDENTIFIER.nm: data[f.IDENTIFIER.nm],
f.REQ_ID.nm: data[f.REQ_ID.nm],
f.SIG.nm: data.get(f.SIG.nm, None),
f.SIGS.nm: data.get(f.SIGS.nm, None),
TXN_TIME: cons_time or data.get(TXN_TIME)
}
res.update(data[OPERATION])
return res
7 changes: 6 additions & 1 deletion plenum/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,12 @@
ConsistencyProofsTimeout = 5

# Timeout factor after which a node starts requesting transactions
CatchupTransactionsTimeout = 5
# We assume, that making consistency proof + iterate over all transactions (getAllTxn)
# will take a little time (0.003 sec for making cp for 10 000 txns +
# 0.2 sec for getAllTxn for 10 000 txn)
# Therefore, node communication is the most cost operation
# Timeout for pool catchuping would be nodeCount * CatchupTransactionsTimeout
CatchupTransactionsTimeout = 6
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it just 6 seconds?



# Log configuration
Expand Down
2 changes: 1 addition & 1 deletion plenum/test/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ def checkViewNoForNodes(nodes: Iterable[TestNode], expectedViewNo: int = None):
assert len(viewNos) == 1
vNo, = viewNos
if expectedViewNo is not None:
assert vNo == expectedViewNo, ','.join(['{} -> Ratio: {}'.format(
assert vNo >= expectedViewNo, ','.join(['{} -> Ratio: {}'.format(
node.name, node.monitor.masterThroughputRatio()) for node in nodes])
return vNo

Expand Down
4 changes: 1 addition & 3 deletions plenum/test/waits.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,7 @@ def expectedPoolCatchupTime(nodeCount):
To: each of the Nodes finished the the catchup procedure
"""
config = getConfig()
nodeCatchupTimeout = __Peer2PeerRequestExchangeTime + \
config.CatchupTransactionsTimeout
return nodeCount * nodeCatchupTimeout
return nodeCount * config.CatchupTransactionsTimeout


def expectedPoolGetReadyTimeout(nodeCount):
Expand Down
99 changes: 99 additions & 0 deletions scripts/add_json_txns_to_ledger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#! /usr/bin/env python3

import os
import sys
import json
import argparse

from stp_core.types import HA
from indy_common.config_util import getConfig
from plenum.server.node import Node
from plenum.common.types import f, OPERATION
from plenum.common.constants import TXN_TIME
from indy_common.config_helper import NodeConfigHelper

config = getConfig()


def get_ha_cliha_node_name(path_to_env):
node_name_key = 'NODE_NAME'
node_port_key = 'NODE_PORT'
node_clien_port_key = 'NODE_CLIENT_PORT'
node_name = ''
node_port = ''
node_clieint_port = ''
with open(path_to_env) as fenv:
for line in fenv.readlines():
print(line)
if line.find(node_name_key) != -1:
node_name = line.split('=')[1].strip()
elif line.find(node_port_key) != -1:
node_port = int(line.split('=')[1].strip())
elif line.find(node_clien_port_key) != -1:
node_clieint_port = int(line.split('=')[1].strip())
return node_name, node_port, node_clieint_port


def sdk_reqToTxn(sdk_req, cons_time=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we duplicate it?

"""
Transform a client request such that it can be stored in the ledger.
Also this is what will be returned to the client in the reply
:param req:
:param cons_time: UTC epoch at which consensus was reached
:return:
"""
# TODO: we should not reformat transaction this way
# When refactor keep in mind thought about back compatibility

if isinstance(sdk_req, dict):
data = sdk_req
elif isinstance(sdk_req, str):
data = json.loads(sdk_req)
else:
raise TypeError(
"Expected dict or str as input, but got: {}".format(type(sdk_req)))

res = {
f.IDENTIFIER.nm: data[f.IDENTIFIER.nm],
f.REQ_ID.nm: data[f.REQ_ID.nm],
f.SIG.nm: data.get(f.SIG.nm, None),
f.SIGS.nm: data.get(f.SIGS.nm, None),
TXN_TIME: cons_time or data.get(TXN_TIME)
}
res.update(data[OPERATION])
return res


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('infpath', help="Path to previous generated txns", type=str, default='/tmp/generated_txns')
parser.add_argument('--env_file', help='Path to environment file with node name and ports', default='/etc/indy/indy.env')
args = parser.parse_args()
path_to_txns = os.path.realpath(args.infpath)
path_to_env = os.path.realpath(args.env_file)

if not os.path.exists(path_to_txns):
print("Path to txns file does not exist")
sys.exit(1)

if not os.path.exists(path_to_env):
print("Path to env file does not exist")
sys.exit(1)

nname, nport, ncliport = get_ha_cliha_node_name(path_to_env)
ha = HA("0.0.0.0", nport)
cliha = HA("0.0.0.0", ncliport)
config_helper = NodeConfigHelper(nname, config)

node = Node(nname, nodeRegistry=None,
ha=ha,
cliha=cliha,
config_helper=config_helper,
config=config)
i = 0
with open(path_to_txns) as txns:
for txn in txns:
node.domainLedger.add(json.loads(txn))
i += 1
if not i % 1000:
print("added {} txns".format(i))
122 changes: 122 additions & 0 deletions scripts/generate_txns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#! /usr/bin/env python3

import os
import json
import time
from contextlib import ExitStack
import argparse
import random
from typing import Sequence
from plenum.common.request import Request
from plenum.common.types import f, OPERATION
from plenum.common.constants import TXN_TIME, CURRENT_PROTOCOL_VERSION
from plenum.common.util import randomString
from plenum.common.config_util import getConfig
from indy.ledger import sign_request
from indy import signus, wallet
from stp_core.loop.looper import Looper

config = getConfig()


def sdk_reqToTxn(sdk_req, cons_time=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we duplicate it?

"""
Transform a client request such that it can be stored in the ledger.
Also this is what will be returned to the client in the reply
:param req:
:param cons_time: UTC epoch at which consensus was reached
:return:
"""

if isinstance(sdk_req, dict):
data = sdk_req
elif isinstance(sdk_req, str):
data = json.loads(sdk_req)
else:
raise TypeError(
"Expected dict or str as input, but got: {}".format(type(sdk_req)))

res = {
f.IDENTIFIER.nm: data[f.IDENTIFIER.nm],
f.REQ_ID.nm: data[f.REQ_ID.nm],
f.SIG.nm: data.get(f.SIG.nm, None),
f.SIGS.nm: data.get(f.SIGS.nm, None),
TXN_TIME: cons_time or data.get(TXN_TIME)
}
res.update(data[OPERATION])
return res


async def get_wallet_and_pool():
pool_name = 'pool' + randomString(3)
wallet_name = 'wallet' + randomString(10)
their_wallet_name = 'their_wallet' + randomString(10)
seed_trustee1 = "000000000000000000000000Trustee1"

await wallet.create_wallet(pool_name, wallet_name, None, None, None)
my_wallet_handle = await wallet.open_wallet(wallet_name, None, None)

await wallet.create_wallet(pool_name, their_wallet_name, None, None, None)
their_wallet_handle = await wallet.open_wallet(their_wallet_name, None, None)

await signus.create_and_store_my_did(my_wallet_handle, "{}")

(their_did, their_verkey) = await signus.create_and_store_my_did(their_wallet_handle,
json.dumps({"seed": seed_trustee1}))

await signus.store_their_did(my_wallet_handle, json.dumps({'did': their_did, 'verkey': their_verkey}))

return their_wallet_handle, their_did


def randomOperation():
return {
"type": "buy",
"amount": random.randint(10, 100000)
}


def random_requests(count):
return [randomOperation() for _ in range(count)]


def sdk_gen_request(operation, protocol_version=CURRENT_PROTOCOL_VERSION, identifier=None):
return Request(operation=operation, reqId=random.randint(10, 100000),
protocolVersion=protocol_version, identifier=identifier)


def sdk_random_request_objects(count, protocol_version, identifier=None):
ops = random_requests(count)
return [sdk_gen_request(op, protocol_version=protocol_version, identifier=identifier) for op in ops]


def sdk_sign_request_objects(looper, sdk_wallet, reqs: Sequence):
wallet_h, did = sdk_wallet
reqs_str = [json.dumps(req.as_dict) for req in reqs]
resp = [looper.loop.run_until_complete(sign_request(wallet_h, did, req)) for req in reqs_str]
return resp


def sdk_signed_random_requests(looper, sdk_wallet, count):
_, did = sdk_wallet
reqs_obj = sdk_random_request_objects(count, identifier=did, protocol_version=CURRENT_PROTOCOL_VERSION)
return sdk_sign_request_objects(looper, sdk_wallet, reqs_obj)


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('count', help="Count of generated txns", type=int)
parser.add_argument('outfpath', help="Path to save generated txns", type=str, default='/tmp/generated_txns')
args = parser.parse_args()
path_to_save = os.path.realpath(args.outfpath)

with ExitStack() as exit_stack:
with Looper() as looper:
sdk_wallet, did = looper.loop.run_until_complete(get_wallet_and_pool())
with open(path_to_save, 'w') as outpath:
for _ in range(args.count):
req = sdk_signed_random_requests(looper, (sdk_wallet, did), 1)[0]
txn = sdk_reqToTxn(req, int(time.time()))
outpath.write(json.dumps(txn))
outpath.write(os.linesep)
looper.stopall()