Skip to content

Commit c25d492

Browse files
authored
Merge pull request sonic-net#83 from tahmed-dev/taahme/add-redis-pipeline-operation
[configdb] Add Ability to Query/Update Redis Using Pipelines
2 parents f574b95 + 198d143 commit c25d492

File tree

2 files changed

+143
-18
lines changed

2 files changed

+143
-18
lines changed

src/swsssdk/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
try:
1111
from .dbconnector import SonicDBConfig, SonicV2Connector
12-
from .configdb import ConfigDBConnector
12+
from .configdb import ConfigDBConnector, ConfigDBPipeConnector
1313
from .sonic_db_dump_load import sonic_db_dump_load
1414
except (KeyError, ValueError):
1515
msg = "Failed to database connector objects -- incorrect database config schema."

src/swsssdk/configdb.py

+142-17
Original file line numberDiff line numberDiff line change
@@ -108,19 +108,19 @@ def listen(self):
108108
(table, row) = key.split(self.TABLE_NAME_SEPARATOR, 1)
109109
if self.handlers.has_key(table):
110110
client = self.get_redis_client(self.db_name)
111-
data = self.__raw_to_typed(client.hgetall(key))
111+
data = self.raw_to_typed(client.hgetall(key))
112112
self.__fire(table, row, data)
113113
except ValueError:
114114
pass #Ignore non table-formated redis entries
115115

116-
def __raw_to_typed(self, raw_data):
117-
if raw_data == None:
116+
def raw_to_typed(self, raw_data):
117+
if raw_data is None:
118118
return None
119119
typed_data = {}
120120
for raw_key in raw_data:
121121
key = raw_key
122122
if PY3K:
123-
key = raw_key.decode('utf-8')
123+
key = raw_key.decode()
124124

125125
# "NULL:NULL" is used as a placeholder for objects with no attributes
126126
if key == "NULL":
@@ -136,13 +136,13 @@ def __raw_to_typed(self, raw_data):
136136
typed_data[key[:-1]] = value
137137
else:
138138
if PY3K:
139-
typed_data[key] = raw_data[raw_key].decode('utf-8')
139+
typed_data[key] = raw_data[raw_key].decode()
140140
else:
141141
typed_data[key] = raw_data[raw_key]
142142
return typed_data
143143

144-
def __typed_to_raw(self, typed_data):
145-
if typed_data == None:
144+
def typed_to_raw(self, typed_data):
145+
if typed_data is None:
146146
return None
147147
elif typed_data == {}:
148148
return { "NULL": "NULL" }
@@ -183,11 +183,11 @@ def set_entry(self, table, key, data):
183183
key = self.serialize_key(key)
184184
client = self.get_redis_client(self.db_name)
185185
_hash = '{}{}{}'.format(table.upper(), self.TABLE_NAME_SEPARATOR, key)
186-
if data == None:
186+
if data is None:
187187
client.delete(_hash)
188188
else:
189189
original = self.get_entry(table, key)
190-
client.hmset(_hash, self.__typed_to_raw(data))
190+
client.hmset(_hash, self.typed_to_raw(data))
191191
for k in [ k for k in original.keys() if k not in data.keys() ]:
192192
if type(original[k]) == list:
193193
k = k + '@'
@@ -205,10 +205,10 @@ def mod_entry(self, table, key, data):
205205
key = self.serialize_key(key)
206206
client = self.get_redis_client(self.db_name)
207207
_hash = '{}{}{}'.format(table.upper(), self.TABLE_NAME_SEPARATOR, key)
208-
if data == None:
208+
if data is None:
209209
client.delete(_hash)
210210
else:
211-
client.hmset(_hash, self.__typed_to_raw(data))
211+
client.hmset(_hash, self.typed_to_raw(data))
212212

213213
def get_entry(self, table, key):
214214
"""Read a table entry from config db.
@@ -222,7 +222,7 @@ def get_entry(self, table, key):
222222
key = self.serialize_key(key)
223223
client = self.get_redis_client(self.db_name)
224224
_hash = '{}{}{}'.format(table.upper(), self.TABLE_NAME_SEPARATOR, key)
225-
return self.__raw_to_typed(client.hgetall(_hash))
225+
return self.raw_to_typed(client.hgetall(_hash))
226226

227227
def get_keys(self, table, split=True):
228228
"""Read all keys of a table from config db.
@@ -240,7 +240,7 @@ def get_keys(self, table, split=True):
240240
for key in keys:
241241
try:
242242
if PY3K:
243-
key = key.decode('utf-8')
243+
key = key.decode()
244244
if split:
245245
(_, row) = key.split(self.TABLE_NAME_SEPARATOR, 1)
246246
data.append(self.deserialize_key(row))
@@ -266,10 +266,10 @@ def get_table(self, table):
266266
data = {}
267267
for key in keys:
268268
try:
269-
entry = self.__raw_to_typed(client.hgetall(key))
269+
entry = self.raw_to_typed(client.hgetall(key))
270270
if entry != None:
271271
if PY3K:
272-
key = key.decode('utf-8')
272+
key = key.decode()
273273
(_, row) = key.split(self.TABLE_NAME_SEPARATOR, 1)
274274
data[self.deserialize_key(row)] = entry
275275
else:
@@ -325,13 +325,138 @@ def get_config(self):
325325
data = {}
326326
for key in keys:
327327
if PY3K:
328-
key = key.decode('utf-8')
328+
key = key.decode()
329329
try:
330330
(table_name, row) = key.split(self.TABLE_NAME_SEPARATOR, 1)
331-
entry = self.__raw_to_typed(client.hgetall(key))
331+
entry = self.raw_to_typed(client.hgetall(key))
332332
if entry != None:
333333
data.setdefault(table_name, {})[self.deserialize_key(row)] = entry
334334
except ValueError:
335335
pass #Ignore non table-formated redis entries
336336
return data
337337

338+
339+
class ConfigDBPipeConnector(ConfigDBConnector):
340+
REDIS_SCAN_BATCH_SIZE = 30
341+
342+
def __init__(self, **kwargs):
343+
super(ConfigDBPipeConnector, self).__init__(**kwargs)
344+
345+
def __delete_entries(self, client, pipe, pattern, cursor):
346+
"""Helper method to delete table entries from config db using Redis pipeline
347+
with batch size of REDIS_SCAN_BATCH_SIZE.
348+
The caller should call pipeline execute once ready
349+
Args:
350+
client: Redis client
351+
pipe: Redis DB pipe
352+
pattern: key pattern
353+
cursor: position to start scanning from
354+
355+
Returns:
356+
cur: poition of next item to scan
357+
"""
358+
cur, keys = client.scan(cursor=cursor, match=pattern, count=self.REDIS_SCAN_BATCH_SIZE)
359+
for key in keys:
360+
pipe.delete(key)
361+
362+
return cur
363+
364+
def __delete_table(self, client, pipe, table):
365+
"""Helper method to delete table entries from config db using Redis pipeline.
366+
The caller should call pipeline execute once ready
367+
Args:
368+
client: Redis client
369+
pipe: Redis DB pipe
370+
table: Table name.
371+
"""
372+
pattern = '{}{}*'.format(table.upper(), self.TABLE_NAME_SEPARATOR)
373+
cur = self.__delete_entries(client, pipe, pattern, 0)
374+
while cur != 0:
375+
cur = self.__delete_entries(client, pipe, pattern, cur)
376+
377+
def __mod_entry(self, pipe, table, key, data):
378+
"""Modify a table entry to config db.
379+
Args:
380+
table: Table name.
381+
pipe: Redis DB pipe
382+
table: Table name.
383+
key: Key of table entry, or a tuple of keys if it is a multi-key table.
384+
data: Table row data in a form of dictionary {'column_key': 'value', ...}.
385+
Pass {} as data will create an entry with no column if not already existed.
386+
Pass None as data will delete the entry.
387+
"""
388+
key = self.serialize_key(key)
389+
_hash = '{}{}{}'.format(table.upper(), self.TABLE_NAME_SEPARATOR, key)
390+
if data is None:
391+
pipe.delete(_hash)
392+
else:
393+
pipe.hmset(_hash, self.typed_to_raw(data))
394+
395+
def mod_config(self, data):
396+
"""Write multiple tables into config db.
397+
Extra entries/fields in the db which are not in the data are kept.
398+
Args:
399+
data: config data in a dictionary form
400+
{
401+
'TABLE_NAME': { 'row_key': {'column_key': 'value', ...}, ...},
402+
'MULTI_KEY_TABLE_NAME': { ('l1_key', 'l2_key', ...) : {'column_key': 'value', ...}, ...},
403+
...
404+
}
405+
"""
406+
client = self.get_redis_client(self.db_name)
407+
pipe = client.pipeline()
408+
for table_name in data:
409+
table_data = data[table_name]
410+
if table_data is None:
411+
self.__delete_table(client, pipe, table_name)
412+
continue
413+
for key in table_data:
414+
self.__mod_entry(pipe, table_name, key, table_data[key])
415+
pipe.execute()
416+
client.bgsave()
417+
418+
def __get_config(self, client, pipe, data, cursor):
419+
"""Read config data in batches of size REDIS_SCAN_BATCH_SIZE using Redis pipelines
420+
Args:
421+
client: Redis client
422+
pipe: Redis DB pipe
423+
data: config dictionary
424+
cursor: position to start scanning from
425+
426+
Returns:
427+
cur: poition of next item to scan
428+
"""
429+
cur, keys = client.scan(cursor=cursor, match='*', count=self.REDIS_SCAN_BATCH_SIZE)
430+
keys = [key.decode() for key in keys if key != self.INIT_INDICATOR]
431+
for key in keys:
432+
pipe.hgetall(key)
433+
records = pipe.execute()
434+
435+
for index, key in enumerate(keys):
436+
(table_name, row) = key.split(self.TABLE_NAME_SEPARATOR, 1)
437+
entry = self.raw_to_typed(records[index])
438+
if entry is not None:
439+
data.setdefault(table_name, {})[self.deserialize_key(row)] = entry
440+
441+
return cur
442+
443+
def get_config(self):
444+
"""Read all config data.
445+
Returns:
446+
Config data in a dictionary form of
447+
{
448+
'TABLE_NAME': { 'row_key': {'column_key': 'value', ...}, ...},
449+
'MULTI_KEY_TABLE_NAME': { ('l1_key', 'l2_key', ...) : {'column_key': 'value', ...}, ...},
450+
...
451+
}
452+
"""
453+
client = self.get_redis_client(self.db_name)
454+
pipe = client.pipeline()
455+
data = {}
456+
457+
cur = self.__get_config(client, pipe, data, 0)
458+
while cur != 0:
459+
cur = self.__get_config(client, pipe, data, cur)
460+
461+
return data
462+

0 commit comments

Comments
 (0)