Skip to content

Commit fa9093c

Browse files
committed
[configdb] Add Ability to Query/Update Redis Using Pipelines
Redis recommend using pipeline in order to obtain optimal speen when handling large volume of data. Te pipeline API also given an ability devide work in batches. singed-off-by: Tamer Ahmed <[email protected]>
1 parent 2df4f40 commit fa9093c

File tree

2 files changed

+134
-9
lines changed

2 files changed

+134
-9
lines changed

src/swsssdk/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
try:
1111
from .dbconnector import SonicDBConfig, SonicV2Connector
12-
from .configdb import ConfigDBConnector
12+
from .configdb import ConfigDBConnector, ConfigDBPipeConnector
1313
from .sonic_db_dump_load import sonic_db_dump_load
1414
except (KeyError, ValueError):
1515
msg = "Failed to database connector objects -- incorrect database config schema."

src/swsssdk/configdb.py

+133-8
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,12 @@ def listen(self):
108108
(table, row) = key.split(self.TABLE_NAME_SEPARATOR, 1)
109109
if self.handlers.has_key(table):
110110
client = self.get_redis_client(self.db_name)
111-
data = self.__raw_to_typed(client.hgetall(key))
111+
data = self.raw_to_typed(client.hgetall(key))
112112
self.__fire(table, row, data)
113113
except ValueError:
114114
pass #Ignore non table-formated redis entries
115115

116-
def __raw_to_typed(self, raw_data):
116+
def raw_to_typed(self, raw_data):
117117
if raw_data == None:
118118
return None
119119
typed_data = {}
@@ -141,7 +141,7 @@ def __raw_to_typed(self, raw_data):
141141
typed_data[key] = raw_data[raw_key]
142142
return typed_data
143143

144-
def __typed_to_raw(self, typed_data):
144+
def typed_to_raw(self, typed_data):
145145
if typed_data == None:
146146
return None
147147
elif typed_data == {}:
@@ -187,7 +187,7 @@ def set_entry(self, table, key, data):
187187
client.delete(_hash)
188188
else:
189189
original = self.get_entry(table, key)
190-
client.hmset(_hash, self.__typed_to_raw(data))
190+
client.hmset(_hash, self.typed_to_raw(data))
191191
for k in [ k for k in original.keys() if k not in data.keys() ]:
192192
if type(original[k]) == list:
193193
k = k + '@'
@@ -208,7 +208,7 @@ def mod_entry(self, table, key, data):
208208
if data == None:
209209
client.delete(_hash)
210210
else:
211-
client.hmset(_hash, self.__typed_to_raw(data))
211+
client.hmset(_hash, self.typed_to_raw(data))
212212

213213
def get_entry(self, table, key):
214214
"""Read a table entry from config db.
@@ -222,7 +222,7 @@ def get_entry(self, table, key):
222222
key = self.serialize_key(key)
223223
client = self.get_redis_client(self.db_name)
224224
_hash = '{}{}{}'.format(table.upper(), self.TABLE_NAME_SEPARATOR, key)
225-
return self.__raw_to_typed(client.hgetall(_hash))
225+
return self.raw_to_typed(client.hgetall(_hash))
226226

227227
def get_keys(self, table, split=True):
228228
"""Read all keys of a table from config db.
@@ -266,7 +266,7 @@ def get_table(self, table):
266266
data = {}
267267
for key in keys:
268268
try:
269-
entry = self.__raw_to_typed(client.hgetall(key))
269+
entry = self.raw_to_typed(client.hgetall(key))
270270
if entry != None:
271271
if PY3K:
272272
key = key.decode('utf-8')
@@ -328,10 +328,135 @@ def get_config(self):
328328
key = key.decode('utf-8')
329329
try:
330330
(table_name, row) = key.split(self.TABLE_NAME_SEPARATOR, 1)
331-
entry = self.__raw_to_typed(client.hgetall(key))
331+
entry = self.raw_to_typed(client.hgetall(key))
332332
if entry != None:
333333
data.setdefault(table_name, {})[self.deserialize_key(row)] = entry
334334
except ValueError:
335335
pass #Ignore non table-formated redis entries
336336
return data
337337

338+
339+
class ConfigDBPipeConnector(ConfigDBConnector):
340+
REDIS_SCAN_BATCH_SIZE = 30
341+
342+
def __init__(self, **kwargs):
343+
super(ConfigDBPipeConnector, self).__init__(**kwargs)
344+
345+
def __delete_entries(self, client, pipe, pattern, cursor):
346+
"""Helper method to delete table entries from config db using Redis pipeline
347+
with batch size of REDIS_SCAN_BATCH_SIZE.
348+
The caller should call pipeline execute once ready
349+
Args:
350+
client: Redis client
351+
pipe: Redis DB pipe
352+
pattern: key pattern
353+
cursor: position to start scanning from
354+
355+
Returns:
356+
cur: poition of next item to scan
357+
"""
358+
cur, keys = client.scan(cursor=cursor, match=pattern, count=self.REDIS_SCAN_BATCH_SIZE)
359+
for key in keys:
360+
pipe.delete(key)
361+
362+
return cur
363+
364+
def __delete_table(self, client, pipe, table):
365+
"""Helper method to delete table entries from config db using Redis pipeline.
366+
The caller should call pipeline execute once ready
367+
Args:
368+
client: Redis client
369+
pipe: Redis DB pipe
370+
table: Table name.
371+
"""
372+
pattern = '{}{}*'.format(table.upper(), self.TABLE_NAME_SEPARATOR)
373+
cur = self.__delete_entries(client, pipe, pattern, 0)
374+
while cur != 0:
375+
cur = self.__delete_entries(client, pipe, pattern, cur)
376+
377+
def __mod_entry(self, pipe, table, key, data):
378+
"""Modify a table entry to config db.
379+
Args:
380+
table: Table name.
381+
pipe: Redis DB pipe
382+
table: Table name.
383+
key: Key of table entry, or a tuple of keys if it is a multi-key table.
384+
data: Table row data in a form of dictionary {'column_key': 'value', ...}.
385+
Pass {} as data will create an entry with no column if not already existed.
386+
Pass None as data will delete the entry.
387+
"""
388+
key = self.serialize_key(key)
389+
_hash = '{}{}{}'.format(table.upper(), self.TABLE_NAME_SEPARATOR, key)
390+
if data == None:
391+
pipe.delete(_hash)
392+
else:
393+
pipe.hmset(_hash, self.typed_to_raw(data))
394+
395+
def mod_config(self, data):
396+
"""Write multiple tables into config db.
397+
Extra entries/fields in the db which are not in the data are kept.
398+
Args:
399+
data: config data in a dictionary form
400+
{
401+
'TABLE_NAME': { 'row_key': {'column_key': 'value', ...}, ...},
402+
'MULTI_KEY_TABLE_NAME': { ('l1_key', 'l2_key', ...) : {'column_key': 'value', ...}, ...},
403+
...
404+
}
405+
"""
406+
client = self.get_redis_client(self.db_name)
407+
pipe = client.pipeline()
408+
for table_name in data:
409+
table_data = data[table_name]
410+
if table_data == None:
411+
self.__delete_table(client, pipe, table_name)
412+
continue
413+
for key in table_data:
414+
self.__mod_entry(pipe, table_name, key, table_data[key])
415+
pipe.execute()
416+
client.bgsave()
417+
418+
def _get_config(self, client, pipe, data, cursor):
419+
"""Read config data in batches of size REDIS_SCAN_BATCH_SIZE using Redis pipelines
420+
Args:
421+
client: Redis client
422+
pipe: Redis DB pipe
423+
data: config dictionary
424+
cursor: position to start scanning from
425+
426+
Returns:
427+
cur: poition of next item to scan
428+
"""
429+
cur, keys = client.scan(cursor=cursor, match='*', count=self.REDIS_SCAN_BATCH_SIZE)
430+
keys = [key.decode('utf-8') for key in keys if key != self.INIT_INDICATOR]
431+
for key in keys:
432+
pipe.hgetall(key)
433+
records = pipe.execute()
434+
435+
for index, key in enumerate(keys):
436+
(table_name, row) = key.split(self.TABLE_NAME_SEPARATOR, 1)
437+
entry = self.raw_to_typed(records[index])
438+
if entry is not None:
439+
data.setdefault(table_name, {})[self.deserialize_key(row)] = entry
440+
441+
return cur
442+
443+
def get_config(self):
444+
"""Read all config data.
445+
Returns:
446+
Config data in a dictionary form of
447+
{
448+
'TABLE_NAME': { 'row_key': {'column_key': 'value', ...}, ...},
449+
'MULTI_KEY_TABLE_NAME': { ('l1_key', 'l2_key', ...) : {'column_key': 'value', ...}, ...},
450+
...
451+
}
452+
"""
453+
client = self.get_redis_client(self.db_name)
454+
pipe = client.pipeline()
455+
data = {}
456+
457+
cur = self._get_config(client, pipe, data, 0)
458+
while cur != 0:
459+
cur = self._get_config(client, pipe, data, cur)
460+
461+
return data
462+

0 commit comments

Comments
 (0)