Skip to content

Commit b03cc74

Browse files
authored
[Xcvrd] Soak duplicate events and process only updated interested events (#285)
* Subscribe to CONFIG_DB instead of APPL_DB * Filter out events * Fix build * improve code coverage
1 parent 3acb171 commit b03cc74

File tree

3 files changed

+107
-27
lines changed

3 files changed

+107
-27
lines changed

sonic-xcvrd/tests/test_xcvrd.py

+21-3
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,24 @@ def test_is_error_sfp_status(self):
331331
assert not is_error_block_eeprom_reading(int(SFP_STATUS_INSERTED))
332332
assert not is_error_block_eeprom_reading(int(SFP_STATUS_REMOVED))
333333

334+
@patch('swsscommon.swsscommon.Select.addSelectable', MagicMock())
335+
@patch('swsscommon.swsscommon.SubscriberStateTable')
336+
@patch('swsscommon.swsscommon.Select.select')
337+
def test_handle_port_update_event(self, mock_select, mock_sub_table):
338+
mock_selectable = MagicMock()
339+
mock_selectable.pop = MagicMock(
340+
side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), (None, None, None)])
341+
mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable)
342+
mock_sub_table.return_value = mock_selectable
343+
logger = MagicMock()
344+
345+
sel, asic_context = subscribe_port_update_event(DEFAULT_NAMESPACE, logger)
346+
port_mapping = PortMapping()
347+
stop_event = threading.Event()
348+
stop_event.is_set = MagicMock(return_value=False)
349+
handle_port_update_event(sel, asic_context, stop_event,
350+
logger, port_mapping.handle_port_change_event)
351+
334352
@patch('swsscommon.swsscommon.Select.addSelectable', MagicMock())
335353
@patch('swsscommon.swsscommon.SubscriberStateTable')
336354
@patch('swsscommon.swsscommon.Select.select')
@@ -443,7 +461,7 @@ def test_CmisManagerTask_handle_port_change_event(self):
443461
task.on_port_update_event(port_change_event)
444462
assert len(task.port_dict) == 1
445463

446-
464+
447465
@patch('xcvrd.xcvrd.XcvrTableHelper')
448466
def test_CmisManagerTask_get_configured_freq(self, mock_table_helper):
449467
port_mapping = PortMapping()
@@ -474,6 +492,7 @@ def test_CmisManagerTask_task_run_stop(self, mock_chassis):
474492

475493
port_mapping = PortMapping()
476494
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping)
495+
task.wait_for_port_config_done = MagicMock()
477496
task.task_run()
478497
task.task_stop()
479498
assert task.task_process is None
@@ -482,6 +501,7 @@ def test_CmisManagerTask_task_run_stop(self, mock_chassis):
482501
@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_update_event', MagicMock(return_value=(None, None)))
483502
@patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_update_event', MagicMock())
484503
@patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD'))
504+
@patch('xcvrd.xcvrd.CmisManagerTask.wait_for_port_config_done', MagicMock())
485505
def test_CmisManagerTask_task_worker(self, mock_chassis):
486506
mock_xcvr_api = MagicMock()
487507
mock_xcvr_api.set_datapath_deinit = MagicMock(return_value=True)
@@ -553,7 +573,6 @@ def test_CmisManagerTask_task_worker(self, mock_chassis):
553573
'DP8State': 'DataPathActivated'
554574
}
555575
])
556-
557576
mock_sfp = MagicMock()
558577
mock_sfp.get_presence = MagicMock(return_value=True)
559578
mock_sfp.get_xcvr_api = MagicMock(return_value=mock_xcvr_api)
@@ -584,7 +603,6 @@ def test_CmisManagerTask_task_worker(self, mock_chassis):
584603
task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True])
585604
task.task_worker()
586605
assert task.port_dict['Ethernet0']['cmis_state'] == 'DP_DEINIT'
587-
588606
task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True])
589607
task.task_worker()
590608
assert mock_xcvr_api.set_datapath_deinit.call_count == 1

sonic-xcvrd/xcvrd/xcvrd.py

+26-9
Original file line numberDiff line numberDiff line change
@@ -994,10 +994,7 @@ def on_port_update_event(self, port_change_event):
994994
self.port_dict[lport]['lanes'] = port_change_event.port_dict['lanes']
995995
if 'host_tx_ready' in port_change_event.port_dict:
996996
self.port_dict[lport]['host_tx_ready'] = port_change_event.port_dict['host_tx_ready']
997-
if 'admin_status' in port_change_event.port_dict and 'oper_status' in port_change_event.port_dict:
998-
# At times 'admin_status' is NOT the same in the PORT_TABLE of APPL_DB and STATE_DB
999-
# We dont have better way to check if 'admin_status' is from APPL_DB or STATE_DB so this
1000-
# check is put temporarily to listen only to APPL_DB's admin_status and ignore that of STATE_DB
997+
if 'admin_status' in port_change_event.port_dict:
1001998
self.port_dict[lport]['admin_status'] = port_change_event.port_dict['admin_status']
1002999
if 'laser_freq' in port_change_event.port_dict:
10031000
self.port_dict[lport]['laser_freq'] = int(port_change_event.port_dict['laser_freq'])
@@ -1277,13 +1274,36 @@ def configure_laser_frequency(self, api, lport, freq):
12771274
self.log_error("{} Tuning in progress, channel selection may fail!".format(lport))
12781275
return api.set_laser_freq(freq)
12791276

1277+
def wait_for_port_config_done(self, namespace):
1278+
# Connect to APPL_DB and subscribe to PORT table notifications
1279+
appl_db = daemon_base.db_connect("APPL_DB", namespace=namespace)
1280+
1281+
sel = swsscommon.Select()
1282+
port_tbl = swsscommon.SubscriberStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME)
1283+
sel.addSelectable(port_tbl)
1284+
1285+
# Make sure this daemon started after all port configured
1286+
while not self.task_stopping_event.is_set():
1287+
(state, c) = sel.select(port_mapping.SELECT_TIMEOUT_MSECS)
1288+
if state == swsscommon.Select.TIMEOUT:
1289+
continue
1290+
if state != swsscommon.Select.OBJECT:
1291+
self.log_warning("sel.select() did not return swsscommon.Select.OBJECT")
1292+
continue
1293+
1294+
(key, op, fvp) = port_tbl.pop()
1295+
if key in ["PortConfigDone", "PortInitDone"]:
1296+
break
1297+
12801298
def task_worker(self):
12811299
self.xcvr_table_helper = XcvrTableHelper(self.namespaces)
12821300

1283-
self.log_notice("Starting...")
1301+
self.log_notice("Waiting for PortConfigDone...")
1302+
for namespace in self.namespaces:
1303+
self.wait_for_port_config_done(namespace)
12841304

12851305
# APPL_DB for CONFIG updates, and STATE_DB for insertion/removal
1286-
sel, asic_context = port_mapping.subscribe_port_update_event(self.namespaces)
1306+
sel, asic_context = port_mapping.subscribe_port_update_event(self.namespaces, helper_logger)
12871307
while not self.task_stopping_event.is_set():
12881308
# Handle port change event from main thread
12891309
port_mapping.handle_port_update_event(sel,
@@ -1292,9 +1312,6 @@ def task_worker(self):
12921312
helper_logger,
12931313
self.on_port_update_event)
12941314

1295-
if not self.isPortConfigDone:
1296-
continue
1297-
12981315
for lport, info in self.port_dict.items():
12991316
if self.task_stopping_event.is_set():
13001317
break

sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py

+60-15
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ class PortChangeEvent:
1111
PORT_REMOVE = 1
1212
PORT_SET = 2
1313
PORT_DEL = 3
14+
PORT_EVENT = {}
1415

1516
def __init__(self, port_name, port_index, asic_id, event_type, port_dict=None):
1617
# Logical port name, e.g. Ethernet0
@@ -105,11 +106,16 @@ def subscribe_port_config_change(namespaces):
105106
sel.addSelectable(port_tbl)
106107
return sel, asic_context
107108

108-
def subscribe_port_update_event(namespaces):
109+
def subscribe_port_update_event(namespaces, logger):
110+
"""
111+
Subscribe to a particular DB's table and listen to only interested fields
112+
Format :
113+
{ <DB name> : <Table name> , <field1>, <field2>, .. } where only field<n> update will be received
114+
"""
109115
port_tbl_map = [
110-
{'APPL_DB': swsscommon.APP_PORT_TABLE_NAME},
116+
{'CONFIG_DB': swsscommon.CFG_PORT_TABLE_NAME},
111117
{'STATE_DB': 'TRANSCEIVER_INFO'},
112-
{'STATE_DB': 'PORT_TABLE'},
118+
{'STATE_DB': 'PORT_TABLE', 'FILTER': ['host_tx_ready']},
113119
]
114120

115121
sel = swsscommon.Select()
@@ -119,13 +125,18 @@ def subscribe_port_update_event(namespaces):
119125
db = daemon_base.db_connect(list(d.keys())[0], namespace=namespace)
120126
asic_id = multi_asic.get_asic_index_from_namespace(namespace)
121127
port_tbl = swsscommon.SubscriberStateTable(db, list(d.values())[0])
128+
port_tbl.db_name = list(d.keys())[0]
129+
port_tbl.table_name = list(d.values())[0]
130+
port_tbl.filter = d['FILTER'] if 'FILTER' in d else None
122131
asic_context[port_tbl] = asic_id
123132
sel.addSelectable(port_tbl)
133+
logger.log_warning("subscribing to port_tbl {} - {} DB of namespace {} ".format(
134+
port_tbl, list(d.values())[0], namespace))
124135
return sel, asic_context
125136

126137
def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_event_handler):
127138
"""
128-
Select PORT update events, notify the observers upon a port update in APPL_DB/CONFIG_DB
139+
Select PORT update events, notify the observers upon a port update in CONFIG_DB
129140
or a XCVR insertion/removal in STATE_DB
130141
"""
131142
if not stop_event.is_set():
@@ -135,6 +146,8 @@ def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_
135146
if state != swsscommon.Select.OBJECT:
136147
logger.log_warning('sel.select() did not return swsscommon.Select.OBJECT')
137148
return
149+
150+
port_event_cache = {}
138151
for port_tbl in asic_context.keys():
139152
while True:
140153
(key, op, fvp) = port_tbl.pop()
@@ -143,24 +156,56 @@ def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_
143156
if not validate_port(key):
144157
continue
145158
fvp = dict(fvp) if fvp is not None else {}
159+
logger.log_warning("$$$ {} handle_port_update_event() : op={} DB:{} Table:{} fvp {}".format(
160+
key, op, port_tbl.db_name, port_tbl.table_name, fvp))
161+
146162
if 'index' not in fvp:
147-
fvp['index'] = '-1'
148-
port_index = int(fvp['index'])
149-
port_change_event = None
150-
if op == swsscommon.SET_COMMAND:
151-
port_change_event = PortChangeEvent(key,
163+
fvp['index'] = '-1'
164+
fvp['key'] = key
165+
fvp['asic_id'] = asic_context[port_tbl]
166+
fvp['op'] = op
167+
fvp['FILTER'] = port_tbl.filter
168+
# Soak duplicate events and consider only the last event
169+
port_event_cache[key+port_tbl.db_name+port_tbl.table_name] = fvp
170+
171+
# Now apply filter over soaked events
172+
for key, fvp in port_event_cache.items():
173+
port_index = int(fvp['index'])
174+
port_change_event = None
175+
diff = {}
176+
filter = fvp['FILTER']
177+
del fvp['FILTER']
178+
if key in PortChangeEvent.PORT_EVENT:
179+
diff = dict(set(fvp.items()) - set(PortChangeEvent.PORT_EVENT[key].items()))
180+
# Ignore duplicate events
181+
if not diff:
182+
PortChangeEvent.PORT_EVENT[key] = fvp
183+
continue
184+
# Ensure only interested field update gets through for processing
185+
if filter is not None:
186+
if not (set(filter) & set(diff.keys())):
187+
PortChangeEvent.PORT_EVENT[key] = fvp
188+
continue
189+
PortChangeEvent.PORT_EVENT[key] = fvp
190+
191+
if fvp['op'] == swsscommon.SET_COMMAND:
192+
port_change_event = PortChangeEvent(fvp['key'],
152193
port_index,
153-
asic_context[port_tbl],
194+
fvp['asic_id'],
154195
PortChangeEvent.PORT_SET,
155196
fvp)
156-
elif op == swsscommon.DEL_COMMAND:
157-
port_change_event = PortChangeEvent(key,
197+
elif fvp['op'] == swsscommon.DEL_COMMAND:
198+
port_change_event = PortChangeEvent(fvp['key'],
158199
port_index,
159-
asic_context[port_tbl],
200+
fvp['asic_id'],
160201
PortChangeEvent.PORT_DEL,
161202
fvp)
162-
if port_change_event is not None:
163-
port_change_event_handler(port_change_event)
203+
# This is the final event considered for processing
204+
logger.log_warning("*** {} handle_port_update_event() fvp {}".format(
205+
key, fvp))
206+
if port_change_event is not None:
207+
port_change_event_handler(port_change_event)
208+
164209

165210
def handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_change_event_handler):
166211
"""Select CONFIG_DB PORT table changes, once there is a port configuration add/remove, notify observers

0 commit comments

Comments
 (0)