diff --git a/src/tribler/core/components/ipv8/eva_protocol.py b/src/tribler/core/components/ipv8/eva_protocol.py index 37df2c7e6f0..91e899340aa 100644 --- a/src/tribler/core/components/ipv8/eva_protocol.py +++ b/src/tribler/core/components/ipv8/eva_protocol.py @@ -1,32 +1,38 @@ -# EVA protocol: a protocol for transferring big binary data over ipv8. -# -# Limitations and other useful information described in the corresponding class. -# Example of use: -# -# class MyCommunity(EVAProtocolMixin, Community): -# community_id = os.urandom(20) -# -# def __init__(self, *args, **kwargs): -# super().__init__(*args, **kwargs) -# self.eva_init() -# -# self.eva_register_receive_callback(self.on_receive) -# self.eva_register_send_complete_callback(self.on_send_complete) -# self.eva_register_error_callback(self.on_error) -# -# def my_function(self, peer): -# self.eva_send_binary(peer, b'info1', b'data1') -# self.eva_send_binary(peer, b'info2', b'data2') -# self.eva_send_binary(peer, b'info3', b'data3') -# -# def on_receive(self, peer, binary_info, binary_data, nonce): -# logger.info(f'Data has been received: {binary_info}') -# -# def on_send_complete(self, peer, binary_info, binary_data, nonce): -# logger.info(f'Transfer has been completed: {binary_info}') -# -# def on_error(self, peer, exception): -# logger.error(f'Error has been occurred: {exception}') +"""EVA protocol: a protocol for transferring big binary data over ipv8. + + Limitations and other useful information described in the corresponding class. + An example of use: + + >>> import os + >>> from ipv8.community import Community + >>> class MyCommunity(EVAProtocolMixin, Community): + ... community_id = os.urandom(20) + ... + >>> def __init__(self, *args, **kwargs): + ... super().__init__(*args, **kwargs) + ... self.eva_init() + ... + ... self.eva.register_receive_callback(self.on_receive) + ... self.eva.register_send_complete_callback(self.on_send_complete) + ... self.eva.register_error_callback(self.on_error) + ... + >>> async def my_function(self, peer): + ... await self.eva.send_binary(peer, b'info1', b'data1') + ... await self.eva.send_binary(peer, b'info2', b'data2') + ... await self.eva.send_binary(peer, b'info3', b'data3') + ... + >>> async def on_receive(self, result): + ... self.logger.info(f'Data has been received: {result}') + ... + >>> async def on_send_complete(self, result): + ... self.logger.info(f'Transfer has been completed: {result}') + ... + >>> async def on_error(self, peer, exception): + ... self.logger.error(f'Error has been occurred: {exception}') + +""" +__version__ = '2.0.0' + import asyncio import logging import math @@ -36,7 +42,7 @@ from dataclasses import dataclass from enum import Enum, auto from random import randint -from typing import Dict, Optional +from typing import Awaitable, Callable, Dict, Optional, Type from ipv8.lazy_community import lazy_wrapper from ipv8.messaging.lazy_payload import VariablePayload, vp_compile @@ -74,66 +80,24 @@ class Error(VariablePayload): class EVAProtocolMixin: """This mixin makes it possible to transfer big binary data over ipv8. - The protocol based on TFTP with windowsize (RFC 7440). - Features: - * timeout - * retransmit - * dynamic window size + The protocol based on TFTP with windowsize (RFC 7440). + Features: + * timeout + * retransmit + * dynamic window size - The maximum data size that can be transferred through the protocol can be - calculated as "block_size * 4294967295" where 4294967295 is the max segment - number (4B unsigned int). + The maximum data size that can be transferred through the protocol can be + calculated as "block_size * 4294967295" where 4294967295 is the max segment + number (4B unsigned int). """ - def eva_init( # pylint: disable=too-many-arguments - self, - block_size=1000, - window_size_in_blocks=16, - start_message_id=186, - retransmit_interval_in_sec=3, - retransmit_attempt_count=3, - timeout_interval_in_sec=10, - binary_size_limit=1024 * 1024 * 1024, - terminate_by_timeout_enabled=True, - max_simultaneous_transfers=10, - ): + def eva_init(self, start_message_id: int = 186, **kwargs): """Init should be called manually within his parent class. - - Args: - block_size: a single block size in bytes. Please keep in mind that - ipv8 adds approx. 177 bytes to each packet. - window_size_in_blocks: size of consecutive blocks to send - start_message_id: a started id that will be used to assigning - protocol's messages ids - retransmit_interval_in_sec: an interval until the next attempt - to retransmit will be made - retransmit_attempt_count: a limit for retransmit attempts - timeout_interval_in_sec: an interval after which the transfer will - be considered as "dead" and will be terminated - binary_size_limit: limit for binary data size. If this limit will be - exceeded, the exception will be returned through a registered - error handler - terminate_by_timeout_enabled: the flag indicating is termination-by-timeout - mechanism enabled or not - max_simultaneous_transfers: an upper limit of simultaneously served peers. - The reason for introducing this parameter is to have a tool for - limiting socket load which could lead to packet loss. + For the arguments description see `EVAProtocol` class. """ + self.eva = EVAProtocol(community=self, **kwargs) self.last_message_id = start_message_id - self.eva_messages = dict() - - self.eva_protocol = EVAProtocol( - community=self, - block_size=block_size, - window_size_in_blocks=window_size_in_blocks, - retransmit_interval_in_sec=retransmit_interval_in_sec, - retransmit_attempt_count=retransmit_attempt_count, - scheduled_send_interval_in_sec=5, - timeout_interval_in_sec=timeout_interval_in_sec, - binary_size_limit=binary_size_limit, - terminate_by_timeout_enabled=terminate_by_timeout_enabled, - max_simultaneous_transfers=max_simultaneous_transfers, - ) + self.eva_messages: Dict[Type[VariablePayload], int] = {} # note: # The order in which _eva_register_message_handler is called defines @@ -143,93 +107,26 @@ def eva_init( # pylint: disable=too-many-arguments self._eva_register_message_handler(Data, self.on_eva_data) self._eva_register_message_handler(Error, self.on_eva_error) - def eva_send_binary(self, peer: Peer, info: bytes, data: bytes, nonce: int = None) -> Future: - """Send a big binary data. - - Due to ipv8 specifics, we can use only one socket port per one peer. - Therefore, at one point in time, the protocol can only transmit one particular - piece of data for one particular peer. - - In case "eva_send_binary" is invoked multiply times for a single peer, the data - transfer will be scheduled and performed when the current sending session is finished. - - An example: - - self.eva_send_binary(peer, b'binary_data0', b'binary_info0') - self.eva_send_binary(peer, b'binary_data1', b'binary_info1') - self.eva_send_binary(peer, b'binary_data2', b'binary_info2') - - Args: - peer: the target peer - info: a binary info, limited by bytes - data: binary data that will be sent to the target. - It is limited by several GB, but the protocol is slow by design, so - try to send less rather than more. - nonce: a unique number for identifying the session. If not specified, generated randomly - """ - return self.eva_protocol.send_binary(peer, info, data, nonce) - - def eva_register_receive_callback(self, callback): - """Register callback that will be invoked when a data receiving is complete. - - An example: - - def on_receive(peer, info, data, nonce): - pass - - self.eva_register_receive_callback(on_receive) - """ - self.eva_protocol.receive_callbacks.add(callback) - - def eva_register_send_complete_callback(self, callback): - """Register callback that will be invoked when a data sending is complete. - - An example: - - def on_send_complete(peer, info, data, nonce): - pass - - self.eva_register_send_complete_callback(on_receive) - """ - self.eva_protocol.send_complete_callbacks.add(callback) - - def eva_register_error_callback(self, callback): - """Register callback that will be invoked in case of an error. - - An example: - - def on_error(self, peer, exception): - pass - - self.eva_register_error_callback(on_error) - """ - self.eva_protocol.error_callbacks.add(callback) - - def eva_send_message(self, peer, message): + def eva_send_message(self, peer: Peer, message: VariablePayload): self.endpoint.send(peer.address, self.ezr_pack(self.eva_messages[type(message)], message)) - def eva_shutdown(self): - """ This method terminates all current transfers - """ - self.eva_protocol.shutdown() - @lazy_wrapper(WriteRequest) - async def on_eva_write_request(self, peer, payload): - await self.eva_protocol.on_write_request(peer, payload) + async def on_eva_write_request(self, peer: Peer, payload: WriteRequest): + await self.eva.on_write_request(peer, payload) @lazy_wrapper(Acknowledgement) - async def on_eva_acknowledgement(self, peer, payload): - await self.eva_protocol.on_acknowledgement(peer, payload) + async def on_eva_acknowledgement(self, peer: Peer, payload: Acknowledgement): + await self.eva.on_acknowledgement(peer, payload) @lazy_wrapper(Data) - async def on_eva_data(self, peer, payload): - await self.eva_protocol.on_data(peer, payload) + async def on_eva_data(self, peer: Peer, payload: Data): + await self.eva.on_data(peer, payload) @lazy_wrapper(Error) - async def on_eva_error(self, peer, payload): - await self.eva_protocol.on_error(peer, payload) + async def on_eva_error(self, peer: Peer, payload: Error): + await self.eva.on_error(peer, payload) - def _eva_register_message_handler(self, message_class, handler): + def _eva_register_message_handler(self, message_class: Type[VariablePayload], handler: Callable): self.add_message_handler(self.last_message_id, handler) self.eva_messages[message_class] = self.last_message_id self.last_message_id += 1 @@ -300,24 +197,24 @@ def terminate(self, result: Optional[TransferResult] = None, exception: Optional self.terminated = True def _terminate_with_result(self, result: TransferResult): - if self.future: - self.future.set_result(result) - callbacks = self.protocol.receive_callbacks if self.type == TransferType.INCOMING \ else self.protocol.send_complete_callbacks for callback in callbacks: - callback(result) + asyncio.create_task(callback(result)) + + if self.future: + self.future.set_result(result) def _terminate_with_exception(self, exception: Exception): logger.warning(f'Peer hash: {self.peer}: "{exception.__class__.__name__}: {exception}".') + for callback in self.protocol.error_callbacks: + asyncio.create_task(callback(self.peer, exception)) + if self.future: self.future.set_exception(exception) - for callback in self.protocol.error_callbacks: - callback(self.peer, exception) - def __str__(self): return ( f'Type: {self.type}. Info: {self.info}. Block: {self.block_number}({self.block_count}). ' @@ -365,6 +262,28 @@ def __init__( # pylint: disable=too-many-arguments terminate_by_timeout_enabled=True, max_simultaneous_transfers=10 ): + """Init should be called manually within his parent class. + + Args: + block_size: a single block size in bytes. Please keep in mind that + ipv8 adds approx. 177 bytes to each packet. + window_size_in_blocks: size of consecutive blocks to send + start_message_id: a started id that will be used to assigning + protocol's messages ids + retransmit_interval_in_sec: an interval until the next attempt + to retransmit will be made + retransmit_attempt_count: a limit for retransmit attempts + timeout_interval_in_sec: an interval after which the transfer will + be considered as "dead" and will be terminated + binary_size_limit: limit for binary data size. If this limit will be + exceeded, the exception will be returned through a registered + error handler + terminate_by_timeout_enabled: the flag indicating is termination-by-timeout + mechanism enabled or not + max_simultaneous_transfers: an upper limit of simultaneously served peers. + The reason for introducing this parameter is to have a tool for + limiting socket load which could lead to packet loss. + """ self.community = community self.scheduled = defaultdict(deque) @@ -397,7 +316,36 @@ def __init__( # pylint: disable=too-many-arguments f'Binary size limit: {binary_size_limit}.' ) - def send_binary(self, peer: Peer, info: bytes, data: bytes, nonce: int = None) -> Future: + def send_binary(self, peer: Peer, info: bytes, data: bytes, nonce: Optional[int] = None) -> Future: + """Send a big binary data. + + Due to ipv8 specifics, we can use only one socket port per one peer. + Therefore, at one point in time, the protocol can only transmit one particular + piece of data for one particular peer. + + In case "eva_send_binary" is invoked multiply times for a single peer, the data + transfer will be scheduled and performed when the current sending session is finished. + + An example: + >>> from ipv8.community import Community + >>> class MyCommunity(EVAProtocolMixin, Community) + >>> def __init__(self, *args, **kwargs): + ... super().__init__(*args, **kwargs) + ... self.eva_init() + ... + >>> async def my_function(self, peer): + ... await self.eva.send_binary(peer, b'binary_data0', b'binary_info0') + ... await self.eva.send_binary(peer, b'binary_data1', b'binary_info1') + ... await self.eva.send_binary(peer, b'binary_data2', b'binary_info2') + + Args: + peer: the target peer + info: a binary info, limited by bytes + data: binary data that will be sent to the target. + It is limited by several GB, but the protocol is slow by design, so + try to send less rather than more. + nonce: a unique number for identifying the session. If not specified, generated randomly + """ if not data: raise ValueException('The empty data binary passed') @@ -428,6 +376,61 @@ def send_binary(self, peer: Peer, info: bytes, data: bytes, nonce: int = None) - self.start_outgoing_transfer(transfer) return transfer.future + def register_receive_callback(self, callback: Callable[[TransferResult], Awaitable[None]]): + """Register callback that will be invoked when a data receiving is complete. + + An example: + >>> import os + >>> from ipv8.community import Community + >>> class MyCommunity(EVAProtocolMixin, Community): + >>> def __init__(self, *args, **kwargs): + ... super().__init__(*args, **kwargs) + ... self.eva_init() + ... self.eva.register_receive_callback(self.on_receive) + ... + >>> async def on_receive(self, result): + ... self.logger.info(f'Data has been received: {result}') + + """ + self.receive_callbacks.add(callback) + + def register_send_complete_callback(self, callback: Callable[[TransferResult], Awaitable[None]]): + """Register callback that will be invoked when a data sending is complete. + + An example: + >>> import os + >>> from ipv8.community import Community + >>> class MyCommunity(EVAProtocolMixin, Community): + >>> def __init__(self, *args, **kwargs): + ... super().__init__(*args, **kwargs) + ... self.eva_init() + ... self.eva.register_send_complete_callback(self.on_send_complete) + ... + >>> async def on_send_complete(self, result): + ... self.logger.info(f'Transfer has been completed: {result}') + + """ + self.send_complete_callbacks.add(callback) + + def register_error_callback(self, callback: Callable[[Peer, TransferException], Awaitable[None]]): + """Register callback that will be invoked in case of an error. + + An example: + + >>> import os + >>> from ipv8.community import Community + >>> class MyCommunity(EVAProtocolMixin, Community): + >>> def __init__(self, *args, **kwargs): + ... super().__init__(*args, **kwargs) + ... self.eva_init() + ... self.eva.register_error_callback(self.on_error) + ... + >>> async def on_error(self, peer, exception): + ... self.logger.error(f'Error has been occurred: {exception}') + + """ + self.error_callbacks.add(callback) + def start_outgoing_transfer(self, transfer: Transfer): self.outgoing[transfer.peer] = transfer @@ -590,6 +593,8 @@ def send_scheduled(self): self.start_outgoing_transfer(transfer) def shutdown(self): + """ This method terminates all current transfers + """ logger.info('Shutting down...') transfers = list(self.incoming.values()) + list(self.outgoing.values()) for transfer in transfers: diff --git a/src/tribler/core/components/ipv8/tests/test_eva_protocol.py b/src/tribler/core/components/ipv8/tests/test_eva_protocol.py index e312c6651b3..4009a4c77e1 100644 --- a/src/tribler/core/components/ipv8/tests/test_eva_protocol.py +++ b/src/tribler/core/components/ipv8/tests/test_eva_protocol.py @@ -64,19 +64,19 @@ def __init__(self, *args, **kwargs): terminate_by_timeout_enabled=False # by default disable the termination ) - self.eva_register_receive_callback(self.on_receive) - self.eva_register_send_complete_callback(self.on_send_complete) - self.eva_register_error_callback(self.on_error) + self.eva.register_receive_callback(self.on_receive) + self.eva.register_send_complete_callback(self.on_send_complete) + self.eva.register_error_callback(self.on_error) - def on_receive(self, result: TransferResult): + async def on_receive(self, result: TransferResult): self.most_recent_received_data = result.info, result.data, result.nonce self.received_data[result.peer].append(self.most_recent_received_data) - def on_send_complete(self, result: TransferResult): + async def on_send_complete(self, result: TransferResult): self.most_recent_sent_data = result.info, result.data, result.nonce self.sent_data[result.peer].append(self.most_recent_sent_data) - def on_error(self, _, exception): + async def on_error(self, _, exception): self.most_recent_received_exception = exception @@ -110,7 +110,7 @@ async def test_one_block_binary(self): # could be send as a single packet. data = (b'test1', b'1234', 42) - await self.alice.eva_send_binary(self.bob.my_peer, *data) + await self.alice.eva.send_binary(self.bob.my_peer, *data) assert self.alice.most_recent_sent_data == data assert self.bob.most_recent_received_data == data @@ -119,7 +119,7 @@ async def test_self_send(self): # In this test we send a single transfer from Alice to Alice. # `ValueException` should be raised. with pytest.raises(ValueException): - await self.alice.eva_send_binary(self.alice.my_peer, b'test1', b'1234') + await self.alice.eva.send_binary(self.alice.my_peer, b'test1', b'1234') async def test_two_blocks_binary(self): # In this test we send a single transfer from Alice to Bob. @@ -127,7 +127,7 @@ async def test_two_blocks_binary(self): # could be send as a two packets. data = b'test2', b'4321', 42 self.alice.block_size = 2 - await self.alice.eva_send_binary(self.bob.my_peer, *data) + await self.alice.eva.send_binary(self.bob.my_peer, *data) assert self.alice.most_recent_sent_data == data assert self.bob.most_recent_received_data == data @@ -137,7 +137,7 @@ async def test_zero_transfer(self): # The transfer size is equal to zero and the transfer attempt should # lead to `ValueException`. with pytest.raises(ValueException): - await self.alice.eva_send_binary(self.bob.my_peer, b'', b'') + await self.alice.eva.send_binary(self.bob.my_peer, b'', b'') @pytest.mark.timeout(PYTEST_TIMEOUT_IN_SEC) async def test_one_megabyte_transfer(self): @@ -145,7 +145,7 @@ async def test_one_megabyte_transfer(self): data_size = 1024 * 1024 data = os.urandom(1), os.urandom(data_size), random.randrange(0, 256) - await self.alice.eva_send_binary(self.bob.my_peer, *data) + await self.alice.eva.send_binary(self.bob.my_peer, *data) assert self.alice.most_recent_sent_data == data assert self.bob.most_recent_received_data == data @@ -161,18 +161,18 @@ async def test_termination_by_timeout(self): # After a failed sending attempt from Alice to Bob we should see that both # instances had terminated their transfers by timeout. for participant in [self.alice, self.bob]: - participant.eva_protocol.terminate_by_timeout_enabled = True + participant.eva.terminate_by_timeout_enabled = True - self.bob.eva_protocol.timeout_interval_in_sec = TEST_DEFAULT_TERMINATE_INTERVAL_IN_SEC / 2 + self.bob.eva.timeout_interval_in_sec = TEST_DEFAULT_TERMINATE_INTERVAL_IN_SEC / 2 # replace `on_data` function to make this community silent - self.bob.eva_protocol.on_data = AsyncMock() + self.bob.eva.on_data = AsyncMock() with pytest.raises(TimeoutException): - await self.alice.eva_send_binary(self.bob.my_peer, b'info', b'data') + await self.alice.eva.send_binary(self.bob.my_peer, b'info', b'data') - assert len(self.alice.eva_protocol.outgoing) == 0 - assert len(self.bob.eva_protocol.incoming) == 0 + assert len(self.alice.eva.outgoing) == 0 + assert len(self.bob.eva.incoming) == 0 assert isinstance(self.bob.most_recent_received_exception, TimeoutException) @@ -184,16 +184,16 @@ async def test_retransmit_enabled(self): # EVA should make `retransmit_attempt_count + 1` failed attempts to # send an acknowledgement. - self.alice.eva_protocol.terminate_by_timeout_enabled = True - self.bob.eva_protocol.retransmit_interval_in_sec = 0 + self.alice.eva.terminate_by_timeout_enabled = True + self.bob.eva.retransmit_interval_in_sec = 0 - self.bob.eva_protocol.send_acknowledgement = Mock() + self.bob.eva.send_acknowledgement = Mock() with pytest.raises(TimeoutException): - await self.alice.eva_send_binary(self.bob.my_peer, b'info', b'data') + await self.alice.eva.send_binary(self.bob.my_peer, b'info', b'data') - expected = self.bob.eva_protocol.retransmit_attempt_count + 1 - assert self.bob.eva_protocol.send_acknowledgement.call_count == expected + expected = self.bob.eva.retransmit_attempt_count + 1 + assert self.bob.eva.send_acknowledgement.call_count == expected async def test_retransmit_disabled(self): # In this test we send a single transfer from Alice to Bob. @@ -203,15 +203,15 @@ async def test_retransmit_disabled(self): # # Bob should make a single attempt to send an acknowledgement. - self.alice.eva_protocol.terminate_by_timeout_enabled = True - self.bob.eva_protocol.retransmit_enabled = False + self.alice.eva.terminate_by_timeout_enabled = True + self.bob.eva.retransmit_enabled = False - self.bob.eva_protocol.send_acknowledgement = Mock() + self.bob.eva.send_acknowledgement = Mock() with pytest.raises(TimeoutException): - await self.alice.eva_send_binary(self.bob.my_peer, b'info', b'data') + await self.alice.eva.send_binary(self.bob.my_peer, b'info', b'data') - assert self.bob.eva_protocol.send_acknowledgement.call_count == 1 + assert self.bob.eva.send_acknowledgement.call_count == 1 async def test_size_limit(self): # In this test we send a single transfer from Alice to Bob. @@ -219,14 +219,14 @@ async def test_size_limit(self): # exceeded binary size limit. # First, try to exceed size limit on a receiver (bob) side. - self.bob.eva_protocol.binary_size_limit = 4 + self.bob.eva.binary_size_limit = 4 with pytest.raises(TransferException): - await self.alice.eva_send_binary(self.bob.my_peer, b'info', b'12345') + await self.alice.eva.send_binary(self.bob.my_peer, b'info', b'12345') # Second, try to exceed size limit on a sender (alice) side. - self.alice.eva_protocol.binary_size_limit = 4 + self.alice.eva.binary_size_limit = 4 with pytest.raises(SizeException): - await self.alice.eva_send_binary(self.bob.my_peer, b'info', b'12345') + await self.alice.eva.send_binary(self.bob.my_peer, b'info', b'12345') @pytest.mark.timeout(PYTEST_TIMEOUT_IN_SEC) async def test_duplex_transfer(self): @@ -236,14 +236,14 @@ async def test_duplex_transfer(self): block_count = 100 block_size = 10 - self.alice.eva_protocol.block_size = block_size - self.bob.eva_protocol.block_size = block_size + self.alice.eva.block_size = block_size + self.bob.eva.block_size = block_size alice_data = os.urandom(1), os.urandom(block_size * block_count), random.randrange(0, 256) bob_data = os.urandom(1), os.urandom(block_size * block_count), random.randrange(0, 256) - alice_feature = self.alice.eva_send_binary(self.bob.my_peer, *alice_data) - bob_feature = self.bob.eva_send_binary(self.alice.my_peer, *bob_data) + alice_feature = self.alice.eva.send_binary(self.bob.my_peer, *alice_data) + bob_feature = self.bob.eva.send_binary(self.alice.my_peer, *bob_data) await drain_loop(asyncio.get_event_loop()) @@ -252,10 +252,10 @@ async def test_duplex_transfer(self): assert self.alice.most_recent_received_data == bob_data assert self.bob.most_recent_received_data == alice_data - assert not self.alice.eva_protocol.incoming - assert not self.alice.eva_protocol.outgoing - assert not self.bob.eva_protocol.incoming - assert not self.bob.eva_protocol.outgoing + assert not self.alice.eva.incoming + assert not self.alice.eva.outgoing + assert not self.bob.eva.incoming + assert not self.bob.eva.outgoing @pytest.mark.timeout(PYTEST_TIMEOUT_IN_SEC) async def test_scheduled_send(self): @@ -270,8 +270,8 @@ async def test_scheduled_send(self): range(data_set_count)] futures = [] for data in alice_data_list: - futures.append(self.alice.eva_send_binary(self.bob.my_peer, *data)) - assert len(self.alice.eva_protocol.scheduled[self.bob.my_peer]) == data_set_count - 1 + futures.append(self.alice.eva.send_binary(self.bob.my_peer, *data)) + assert len(self.alice.eva.scheduled[self.bob.my_peer]) == data_set_count - 1 await drain_loop(asyncio.get_event_loop()) # wait for transfer's complete @@ -279,7 +279,7 @@ async def test_scheduled_send(self): assert future.done() assert self.bob.received_data[self.alice.my_peer] == alice_data_list - assert not self.alice.eva_protocol.scheduled + assert not self.alice.eva.scheduled @pytest.mark.timeout(PYTEST_TIMEOUT_IN_SEC) async def test_multiply_duplex(self): @@ -305,7 +305,7 @@ async def test_multiply_duplex(self): ] for _, community in participants: - community.eva_protocol.block_size = 10 + community.eva.block_size = 10 data = [ (p, list((os.urandom(1), os.urandom(50), random.randrange(0, 256)) for _ in range(data_set_count))) @@ -315,7 +315,7 @@ async def test_multiply_duplex(self): futures = [] for ((_, community), (peer, _)), data_set in data: for d in data_set: - futures.append(community.eva_send_binary(peer, *d)) + futures.append(community.eva.send_binary(peer, *d)) await drain_loop(asyncio.get_event_loop()) @@ -344,12 +344,12 @@ async def test_survive_when_multiply_packets_lost(self): packet_loss_probability = lost_packets_count_estimation / (block_count * data_set_count) - self.bob.eva_protocol.retransmit_attempt_count = lost_packets_count_estimation + self.bob.eva.retransmit_attempt_count = lost_packets_count_estimation for participant in [self.alice, self.bob]: - participant.eva_protocol.retransmit_interval_in_sec = 0 - participant.eva_protocol.block_size = 3 - participant.eva_protocol.window_size = 10 + participant.eva.retransmit_interval_in_sec = 0 + participant.eva.block_size = 3 + participant.eva.window_size = 10 data = [(os.urandom(1), os.urandom(block_size * block_count), 0) for _ in range(data_set_count)] @@ -360,7 +360,7 @@ async def test_survive_when_multiply_packets_lost(self): # modify "on_data" function to proxying all calls and to add a probability # to a packet loss - bob_on_data = self.bob.eva_protocol.on_data + bob_on_data = self.bob.eva.on_data async def fake_bob_on_data(peer, payload): chance_to_fake = random.random() < self.test_store.packet_loss_probability @@ -373,10 +373,10 @@ async def fake_bob_on_data(peer, payload): await bob_on_data(peer, payload) - self.bob.eva_protocol.on_data = fake_bob_on_data + self.bob.eva.on_data = fake_bob_on_data for d in data: - await self.alice.eva_send_binary(self.bob.my_peer, *d) + await self.alice.eva.send_binary(self.bob.my_peer, *d) logging.info(f'Estimated packet lost block_count/probability: ' f'{lost_packets_count_estimation}/{packet_loss_probability}') @@ -392,22 +392,22 @@ async def test_write_request_packets_lost(self): # The EVA protocol should handle this situation by retransmitting # dropped packets. - self.alice.eva_protocol.retransmit_interval_in_sec = 0 + self.alice.eva.retransmit_interval_in_sec = 0 self.lost_packet_count = 2 # replace `real_send_writerequest` function by `fake_write_request` which # ignores (drops) first `lost_packet_count` messages - alice_send_write_request = self.alice.eva_protocol.send_write_request + alice_send_write_request = self.alice.eva.send_write_request def fake_write_request(transfer): self.lost_packet_count -= 1 if self.lost_packet_count < 0: alice_send_write_request(transfer) - self.alice.eva_protocol.send_write_request = fake_write_request + self.alice.eva.send_write_request = fake_write_request data = b'info', b'data', 0 - await self.alice.eva_send_binary(self.bob.my_peer, *data) + await self.alice.eva.send_binary(self.bob.my_peer, *data) assert self.bob.most_recent_received_data == data @@ -425,12 +425,12 @@ async def test_dynamically_changed_window_size(self): block_size = 2 - self.alice.eva_protocol.block_size = block_size - self.bob.eva_protocol.window_size = window_size + self.alice.eva.block_size = block_size + self.bob.eva.window_size = window_size data = os.urandom(1), os.urandom(block_size * 100), 42 - bob_send_acknowledgement = self.bob.eva_protocol.send_acknowledgement + bob_send_acknowledgement = self.bob.eva.send_acknowledgement def bob_fake_send_acknowledgement(transfer): if transfer.window_size == 1: @@ -442,16 +442,16 @@ def bob_fake_send_acknowledgement(transfer): self.test_store.actual_window_size = transfer.window_size bob_send_acknowledgement(transfer) - self.bob.eva_protocol.send_acknowledgement = bob_fake_send_acknowledgement + self.bob.eva.send_acknowledgement = bob_fake_send_acknowledgement - await self.alice.eva_send_binary(self.bob.my_peer, *data) + await self.alice.eva.send_binary(self.bob.my_peer, *data) assert self.bob.received_data[self.alice.my_peer][0] == data async def test_cheating_send_over_size(self): # In this test we send a single transfer from Alice to Bob. # Alice will try to send b`extra` binary data over the original size. - self.bob.eva_protocol.binary_size_limit = 5 + self.bob.eva.binary_size_limit = 5 await self.send_sequence_from_alice_to_bob( WriteRequest(4, 1, b'info'), @@ -467,7 +467,7 @@ async def test_wrong_message_order(self): # Alice will try to send packets in invalid order. These packets # should be dropped - self.bob.eva_protocol.block_size = 2 + self.bob.eva.block_size = 2 await self.send_sequence_from_alice_to_bob( WriteRequest(4, 1, b'info'), @@ -489,7 +489,7 @@ async def test_wrong_message_order_and_wrong_nonce(self): # Alice will try to send packets in invalid order. These packets # should be dropped - self.bob.eva_protocol.block_size = 2 + self.bob.eva.block_size = 2 await self.send_sequence_from_alice_to_bob( WriteRequest(4, 1, b'info'), @@ -516,7 +516,9 @@ async def test_received_packet_that_have_no_transfer(self): @pytest.fixture def eva(): - return EVAProtocol(Mock()) + protocol = EVAProtocol(Mock()) + yield protocol + protocol.shutdown() @pytest.fixture diff --git a/src/tribler/core/components/metadata_store/remote_query_community/remote_query_community.py b/src/tribler/core/components/metadata_store/remote_query_community/remote_query_community.py index 6cc3dac7905..e64c86a4aa8 100644 --- a/src/tribler/core/components/metadata_store/remote_query_community/remote_query_community.py +++ b/src/tribler/core/components/metadata_store/remote_query_community/remote_query_community.py @@ -7,7 +7,6 @@ from ipv8.lazy_community import lazy_wrapper from ipv8.messaging.lazy_payload import VariablePayload, vp_compile from ipv8.requestcache import NumberCache, RandomNumberCache, RequestCache - from pony.orm import db_session from pony.orm.dbapiprovider import OperationalError @@ -150,19 +149,19 @@ def __init__(self, my_peer, endpoint, network, self.add_message_handler(SelectResponsePayload, self.on_remote_select_response) self.eva_init() - self.eva_register_receive_callback(self.on_receive) - self.eva_register_send_complete_callback(self.on_send_complete) - self.eva_register_error_callback(self.on_error) + self.eva.register_receive_callback(self.on_receive) + self.eva.register_send_complete_callback(self.on_send_complete) + self.eva.register_error_callback(self.on_error) - def on_receive(self, result: TransferResult): + async def on_receive(self, result: TransferResult): self.logger.debug(f"EVA data received: peer {hexlify(result.peer.mid)}, info {result.info}") packet = (result.peer.address, result.data) self.on_packet(packet) - def on_send_complete(self, result: TransferResult): + async def on_send_complete(self, result: TransferResult): self.logger.debug(f"EVA outgoing transfer complete: peer {hexlify(result.peer.mid)}, info {result.info}") - def on_error(self, peer, exception): + async def on_error(self, peer, exception): self.logger.warning(f"EVA transfer error: peer {hexlify(peer.mid)}, exception: {exception}") def send_remote_select(self, peer, processing_callback=None, force_eva_response=False, **kwargs): @@ -220,14 +219,13 @@ def send_db_results(self, peer, request_payload_id, db_results, force_eva_respon index = 0 while index < len(db_results): transfer_size = ( - self.eva_protocol.binary_size_limit if force_eva_response else self.rqc_settings.maximum_payload_size + self.eva.binary_size_limit if force_eva_response else self.rqc_settings.maximum_payload_size ) data, index = entries_to_chunk(db_results, transfer_size, start_index=index, include_health=True) payload = SelectResponsePayload(request_payload_id, data) if force_eva_response or (len(data) > self.rqc_settings.maximum_payload_size): - self.eva_send_binary( - peer, struct.pack('>i', request_payload_id), self.ezr_pack(payload.msg_id, payload) - ) + self.eva.send_binary(peer, struct.pack('>i', request_payload_id), + self.ezr_pack(payload.msg_id, payload)) else: self.ez_send(peer, payload) @@ -326,6 +324,6 @@ def _on_query_timeout(self, request_cache): self.network.remove_peer(request_cache.peer) async def unload(self): - self.eva_shutdown() + self.eva.shutdown() await self.request_cache.shutdown() await super().unload() diff --git a/src/tribler/core/components/metadata_store/remote_query_community/tests/test_remote_query_community.py b/src/tribler/core/components/metadata_store/remote_query_community/tests/test_remote_query_community.py index 65ad9879e97..14809f39e0d 100644 --- a/src/tribler/core/components/metadata_store/remote_query_community/tests/test_remote_query_community.py +++ b/src/tribler/core/components/metadata_store/remote_query_community/tests/test_remote_query_community.py @@ -571,9 +571,9 @@ async def test_remote_select_force_eva(self): kwargs_dict = {"metadata_type": [CHANNEL_THUMBNAIL]} - self.nodes[1].overlay.eva_send_binary = Mock() + self.nodes[1].overlay.eva.send_binary = Mock() self.nodes[0].overlay.send_remote_select(self.nodes[1].my_peer, **kwargs_dict, force_eva_response=True) await self.deliver_messages(timeout=0.5) - self.nodes[1].overlay.eva_send_binary.assert_called_once() + self.nodes[1].overlay.eva.send_binary.assert_called_once()