Skip to content

Commit

Permalink
INDY-1113: Node disconnection, view change tests (hyperledger#551)
Browse files Browse the repository at this point in the history
* INDY-1113: Node disconnection, view change tests

- Corrected the procedures of node disconnection and reconnection in tests.
- Removed a workaround patch from the start_stopped_node function in tests, since it had become unnecessary.
- Corrected existing tests according to the fixes specified above.
- Added a test verifying that a view change is restarted each time the previous one times out.
- Added a test verifying that a disconnected node with a lagged view accepts the current view from the other nodes on re-connection.

Signed-off-by: Nikita Spivachuk <[email protected]>

* INDY-1113: Return not strict view no comparison

- Restored the non-strict view number comparison in the checkViewNoForNodes function in tests, since the strict comparison may cause failures in some tests.
- Corrected parameter lists of some test helper functions.

Signed-off-by: Nikita Spivachuk <[email protected]>

* INDY-1113: Fixed error with future

- Fixed the cause of a potential error in the sdk_get_replies function where a future's exception was never retrieved.
- Corrected test helpers for Indy SDK.

Signed-off-by: Nikita Spivachuk <[email protected]>

* INDY-1113: Avoided test hang-ups

- Added a workaround to avoid hang-ups of test_6_nodes_pool_cannot_reach_quorum_with_2_disconnected.

Signed-off-by: Nikita Spivachuk <[email protected]>

* INDY-1113: Corrected a test

Signed-off-by: Nikita Spivachuk <[email protected]>
  • Loading branch information
spivachuk authored and Andrew Nikitin committed Apr 10, 2018
1 parent c70a322 commit 7ac303a
Show file tree
Hide file tree
Showing 19 changed files with 305 additions and 129 deletions.
20 changes: 8 additions & 12 deletions plenum/test/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,11 +635,11 @@ def checkViewNoForNodes(nodes: Iterable[TestNode], expectedViewNo: int = None):
for node in nodes:
logger.debug("{}'s view no is {}".format(node, node.viewNo))
viewNos.add(node.viewNo)
assert len(viewNos) == 1
assert len(viewNos) == 1, 'Expected 1, but got {}'.format(len(viewNos))
vNo, = viewNos
if expectedViewNo is not None:
assert vNo >= expectedViewNo, ','.join(['{} -> Ratio: {}'.format(
node.name, node.monitor.masterThroughputRatio()) for node in nodes])
assert vNo >= expectedViewNo, \
'Expected at least {}, but got {}'.format(expectedViewNo, vNo)
return vNo


Expand Down Expand Up @@ -1023,8 +1023,6 @@ def sdk_get_reply(looper, sdk_req_resp, timeout=None):
try:
resp = looper.run(asyncio.wait_for(resp_task, timeout=timeout))
resp = json.loads(resp)
except asyncio.TimeoutError:
resp = None
except IndyError as e:
resp = e.error_code

Expand All @@ -1047,19 +1045,17 @@ def get_res(task, done_list):
resp = None
return resp

done, pend = looper.run(asyncio.wait(resp_tasks, timeout=timeout))
if pend:
raise AssertionError("{} transactions are still pending. Timeout: {}."
.format(len(pend), timeout))
done, pending = looper.run(asyncio.wait(resp_tasks, timeout=timeout))
if pending:
for task in pending:
task.cancel()
raise TimeoutError("{} requests timed out".format(len(pending)))
ret = [(req, get_res(resp, done)) for req, resp in sdk_req_resp]
return ret


def sdk_check_reply(req_res):
req, res = req_res
if res is None:
raise AssertionError("Got no confirmed result for request {}"
.format(req))
if isinstance(res, ErrorCode):
raise AssertionError("Got an error with code {} for request {}"
.format(res, req))
Expand Down
2 changes: 0 additions & 2 deletions plenum/test/node_catchup/test_config_ledger.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ def test_disconnected_node_catchup_config_ledger_txns(looper,
new_node = newNodeCaughtUp
disconnect_node_and_ensure_disconnected(
looper, txnPoolNodeSet, new_node, stopNode=False)
looper.removeProdable(new_node)

# Do some config txns; using a fixture as a method, passing some arguments
# as None as they only make sense for the fixture (pre-requisites)
Expand All @@ -144,6 +143,5 @@ def test_disconnected_node_catchup_config_ledger_txns(looper,
# Make sure new node got out of sync
waitNodeDataInequality(looper, new_node, *txnPoolNodeSet[:-1])

looper.add(new_node)
reconnect_node_and_ensure_connected(looper, txnPoolNodeSet, new_node)
waitNodeDataEquality(looper, new_node, *txnPoolNodeSet[:-1])
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ def testNodeCatchupAfterDisconnect(newNodeCaughtUp, txnPoolNodeSet,
format(newNode, newNode.poolManager.txnSeqNo))
disconnect_node_and_ensure_disconnected(
looper, txnPoolNodeSet, newNode, stopNode=False)
looper.removeProdable(newNode)

# TODO: Check if the node has really stopped processing requests?
logger.debug("Sending requests")
Expand All @@ -36,7 +35,6 @@ def testNodeCatchupAfterDisconnect(newNodeCaughtUp, txnPoolNodeSet,
waitNodeDataInequality(looper, newNode, *txnPoolNodeSet[:-1])

logger.debug("Starting the stopped node, {}".format(newNode))
looper.add(newNode)
reconnect_node_and_ensure_connected(looper, txnPoolNodeSet, newNode)

logger.debug("Waiting for the node to catch up, {}".format(newNode))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from plenum.test.pool_transactions.helper import \
disconnect_node_and_ensure_disconnected
disconnect_node_and_ensure_disconnected, \
reconnect_node_and_ensure_connected
from plenum.test.test_node import ensure_node_disconnected
from stp_core.common.log import getlogger
from plenum.test.helper import sendReqsToNodesAndVerifySuffReplies
Expand Down Expand Up @@ -28,7 +29,6 @@ def testNodeCatchupAfterLostConnection(newNodeCaughtUp, txnPoolNodeSet,
format(newNode, newNode.domainLedger.size))
disconnect_node_and_ensure_disconnected(looper, txnPoolNodeSet, newNode,
stopNode=False)
looper.removeProdable(newNode)

# TODO: Check if the node has really stopped processing requests?
logger.debug("Sending requests")
Expand All @@ -41,7 +41,7 @@ def testNodeCatchupAfterLostConnection(newNodeCaughtUp, txnPoolNodeSet,

logger.debug("Connecting the node {} back, ledger size {}".
format(newNode, newNode.domainLedger.size))
looper.add(newNode)
reconnect_node_and_ensure_connected(looper, txnPoolNodeSet, newNode)

logger.debug("Waiting for the node to catch up, {}".format(newNode))
waitNodeDataEquality(looper, newNode, *txnPoolNodeSet[:-1])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ def test_node_catchup_causes_no_desync(looper, txnPoolNodeSet, client1,
txnPoolNodeSet,
lagging_node,
stopNode=False)
looper.removeProdable(lagging_node)
sendReqsToNodesAndVerifySuffReplies(looper, wallet, client, 5)
looper.add(lagging_node)
reconnect_node_and_ensure_connected(looper, txnPoolNodeSet, lagging_node)

# Check that catchup done
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ def test_node_requests_missing_three_phase_messages(looper, txnPoolNodeSet,
INIT_REQS_CNT)
init_ledger_size = txnPoolNodeSet[0].domainLedger.size

current_node_set = set(txnPoolNodeSet)
for node in disconnected_nodes:
disconnect_node_and_ensure_disconnected(looper, txnPoolNodeSet, node, stopNode=False)
disconnect_node_and_ensure_disconnected(looper, current_node_set, node, stopNode=False)
current_node_set.remove(node)

sdk_send_random_requests(looper, sdk_pool_handle, sdk_wallet_client, MISSING_REQS_CNT)

Expand All @@ -60,7 +62,8 @@ def get_last_pp(node):
len(txnPoolNodeSet))))

for node in disconnected_nodes:
reconnect_node_and_ensure_connected(looper, txnPoolNodeSet, node)
current_node_set.add(node)
reconnect_node_and_ensure_connected(looper, current_node_set, node)
sdk_send_random_and_check(looper,
txnPoolNodeSet,
sdk_pool_handle,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,13 @@ def test_node_requests_missing_three_phase_messages_after_long_disconnection(loo
waitNodeDataEquality(looper, disconnected_nodes[0], *txnPoolNodeSet)
init_ledger_size = txnPoolNodeSet[0].domainLedger.size

current_node_set = set(txnPoolNodeSet)
for node in disconnected_nodes:
disconnect_node_and_ensure_disconnected(looper,
txnPoolNodeSet,
current_node_set,
node,
stopNode=False)
looper.removeProdable(node)
current_node_set.remove(node)

sdk_send_random_requests(looper,
sdk_pool_handle,
Expand Down Expand Up @@ -88,9 +89,8 @@ def get_last_pp(node):
time.sleep(preprepare_deviation * 2)

for node in disconnected_nodes:
looper.add(node)
for node in disconnected_nodes:
reconnect_node_and_ensure_connected(looper, txnPoolNodeSet, node)
current_node_set.add(node)
reconnect_node_and_ensure_connected(looper, current_node_set, node)

sdk_send_random_and_check(looper,
txnPoolNodeSet,
Expand Down
37 changes: 21 additions & 16 deletions plenum/test/node_request/test_quorum_disconnected.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import json

from plenum.test.node_request.helper import nodes_by_rank
from plenum.test.pool_transactions.helper import disconnect_node_and_ensure_disconnected
from stp_core.common.util import adict
from plenum.test.pool_transactions.helper import \
disconnect_node_and_ensure_disconnected, \
reconnect_node_and_ensure_connected
from plenum.test.helper import check_request_is_not_returned_to_nodes, \
sdk_send_and_check, sdk_json_to_request_object
from plenum.test.helper import sdk_signed_random_requests
Expand All @@ -15,27 +16,31 @@
whitelist = ['InvalidSignature']


def stop_nodes(looper, txnPoolNodeSet):
def test_6_nodes_pool_cannot_reach_quorum_with_2_disconnected(
txnPoolNodeSet, looper, sdk_pool_handle,
sdk_wallet_client):
'''
Check that we can not reach consensus when more than n-f nodes
are disconnected: disconnect 2 of 6 nodes
'''
faulties = nodes_by_rank(txnPoolNodeSet)[-faultyNodes:]

current_node_set = set(txnPoolNodeSet)
for node in faulties:
for r in node.replicas:
assert not r.isPrimary
disconnect_node_and_ensure_disconnected(
looper, txnPoolNodeSet, node, stopNode=False)
looper.removeProdable(node)
return adict(faulties=faulties)

looper, current_node_set, node, stopNode=False)
current_node_set.remove(node)

def test_6_nodes_pool_cannot_reach_quorum_with_2_disconnected(
txnPoolNodeSet, looper, sdk_pool_handle,
sdk_wallet_client):
'''
Check that we can not reach consensus when more than n-f nodes are disconnected:
disconnect 2 of 6 nodes
'''
stop_nodes(looper, txnPoolNodeSet)
reqs = sdk_signed_random_requests(looper, sdk_wallet_client, 1)
with pytest.raises(AssertionError):
with pytest.raises(TimeoutError):
sdk_send_and_check(reqs, looper, txnPoolNodeSet, sdk_pool_handle)
check_request_is_not_returned_to_nodes(
txnPoolNodeSet, sdk_json_to_request_object(json.loads(reqs[0])))

# The following reconnection of nodes is needed in this test to avoid
# pytest process hangup
for node in faulties:
current_node_set.add(node)
reconnect_node_and_ensure_connected(looper, current_node_set, node)
2 changes: 1 addition & 1 deletion plenum/test/node_request/test_quorum_faulty.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def test_6_nodes_pool_cannot_reach_quorum_with_2_faulty(afterElection, looper,
txnPoolNodeSet, prepared1,
sdk_wallet_client, sdk_pool_handle):
reqs = sdk_signed_random_requests(looper, sdk_wallet_client, 1)
with pytest.raises(AssertionError):
with pytest.raises(TimeoutError):
sdk_send_and_check(reqs, looper, txnPoolNodeSet, sdk_pool_handle)
check_request_is_not_returned_to_nodes(
txnPoolNodeSet, sdk_json_to_request_object(json.loads(reqs[0])))
2 changes: 0 additions & 2 deletions plenum/test/plugin/demo_plugin/test_catchup.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,12 @@ def test_disconnected_node_catchup_plugin_ledger_txns(looper,
new_node = newNodeCaughtUp
disconnect_node_and_ensure_disconnected(
looper, txnPoolNodeSet, new_node, stopNode=False)
looper.removeProdable(new_node)

# Do some demo txns;
some_demo_txns(looper, sdk_wallet_client, sdk_pool_handle)

# Make sure new node got out of sync
waitNodeDataInequality(looper, new_node, *txnPoolNodeSet[:-1])

looper.add(new_node)
reconnect_node_and_ensure_connected(looper, txnPoolNodeSet, new_node)
waitNodeDataEquality(looper, new_node, *txnPoolNodeSet[:-1])
54 changes: 44 additions & 10 deletions plenum/test/pool_transactions/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from indy.pool import refresh_pool_ledger
from plenum.test.node_catchup.helper import waitNodeDataEquality, \
ensureClientConnectedToNodesAndPoolLedgerSame
from stp_core.loop.looper import Looper
from stp_core.types import HA
from typing import Iterable, Union, Callable

Expand Down Expand Up @@ -543,7 +544,8 @@ def new_client(looper, poolTxnClientData, txnPoolNodeSet, client_tdir):


def disconnectPoolNode(poolNodes: Iterable,
disconnect: Union[str, TestNode], stopNode=True):
disconnect: Union[str, TestNode],
stopNode=True):
if isinstance(disconnect, TestNode):
disconnect = disconnect.name
assert isinstance(disconnect, str)
Expand All @@ -552,43 +554,75 @@ def disconnectPoolNode(poolNodes: Iterable,
if node.name == disconnect:
if stopNode:
node.stop()
else:
else:
node.clientstack.close()
node.nodestack.close()
break
else:
raise AssertionError('The node {} which should be disconnected '
'is not found in the passed pool node list {}'
.format(disconnect, poolNodes))

for node in poolNodes:
if node.name != disconnect:
node.nodestack.disconnectByName(disconnect)


def reconnectPoolNode(poolNodes: Iterable,
connect: Union[str, TestNode], looper):
def reconnectPoolNode(looper: Looper,
poolNodes: Iterable,
connect: Union[str, TestNode]):
if isinstance(connect, TestNode):
connect = connect.name
assert isinstance(connect, str)

for node in poolNodes:
if node.name == connect:
node.start(looper)
else:
if node.isGoing():
node.nodestack.open()
node.clientstack.open()
node.nodestack.maintainConnections(force=True)
else:
node.start(looper)
break
else:
raise AssertionError('The node {} which should be reconnected '
'is not found in the passed pool node list {}'
.format(connect, poolNodes))

for node in poolNodes:
if node.name != connect:
node.nodestack.reconnectRemoteWithName(connect)


def disconnect_node_and_ensure_disconnected(looper, poolNodes,
def disconnect_node_and_ensure_disconnected(looper: Looper,
poolNodes: Iterable[TestNode],
disconnect: Union[str, TestNode],
timeout=None,
stopNode=True):
if isinstance(disconnect, TestNode):
disconnect = disconnect.name
assert isinstance(disconnect, str)

matches = [n for n in poolNodes if n.name == disconnect]
assert len(matches) == 1
node_to_disconnect = matches[0]

disconnectPoolNode(poolNodes, disconnect, stopNode=stopNode)
ensure_node_disconnected(looper, disconnect, poolNodes,
ensure_node_disconnected(looper,
node_to_disconnect,
set(poolNodes) - {node_to_disconnect},
timeout=timeout)


def reconnect_node_and_ensure_connected(looper, poolNodes,
def reconnect_node_and_ensure_connected(looper: Looper,
poolNodes: Iterable[TestNode],
connect: Union[str, TestNode],
timeout=None):
if isinstance(connect, TestNode):
connect = connect.name
assert isinstance(connect, str)

reconnectPoolNode(poolNodes, connect, looper)
reconnectPoolNode(looper, poolNodes, connect)
looper.run(checkNodesConnected(poolNodes, customTimeout=timeout))


Expand Down
2 changes: 1 addition & 1 deletion plenum/test/pool_transactions/test_z_node_key_changed.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def testNodeKeysChanged(looper, txnPoolNodeSet, tdir,
# stopped
txnPoolNodeSet[-1] = node

looper.run(checkNodesConnected(stacks=txnPoolNodeSet))
looper.run(checkNodesConnected(txnPoolNodeSet))
waitNodeDataEquality(looper, node, *txnPoolNodeSet[:-1])
ensureClientConnectedToNodesAndPoolLedgerSame(looper, steward1,
*txnPoolNodeSet)
Expand Down
Loading

0 comments on commit 7ac303a

Please sign in to comment.