@@ -108,12 +108,12 @@ def listen(self):
108
108
(table , row ) = key .split (self .TABLE_NAME_SEPARATOR , 1 )
109
109
if self .handlers .has_key (table ):
110
110
client = self .get_redis_client (self .db_name )
111
- data = self .__raw_to_typed (client .hgetall (key ))
111
+ data = self .raw_to_typed (client .hgetall (key ))
112
112
self .__fire (table , row , data )
113
113
except ValueError :
114
114
pass #Ignore non table-formated redis entries
115
115
116
- def __raw_to_typed (self , raw_data ):
116
+ def raw_to_typed (self , raw_data ):
117
117
if raw_data == None :
118
118
return None
119
119
typed_data = {}
@@ -141,7 +141,7 @@ def __raw_to_typed(self, raw_data):
141
141
typed_data [key ] = raw_data [raw_key ]
142
142
return typed_data
143
143
144
- def __typed_to_raw (self , typed_data ):
144
+ def typed_to_raw (self , typed_data ):
145
145
if typed_data == None :
146
146
return None
147
147
elif typed_data == {}:
@@ -187,7 +187,7 @@ def set_entry(self, table, key, data):
187
187
client .delete (_hash )
188
188
else :
189
189
original = self .get_entry (table , key )
190
- client .hmset (_hash , self .__typed_to_raw (data ))
190
+ client .hmset (_hash , self .typed_to_raw (data ))
191
191
for k in [ k for k in original .keys () if k not in data .keys () ]:
192
192
if type (original [k ]) == list :
193
193
k = k + '@'
@@ -208,7 +208,7 @@ def mod_entry(self, table, key, data):
208
208
if data == None :
209
209
client .delete (_hash )
210
210
else :
211
- client .hmset (_hash , self .__typed_to_raw (data ))
211
+ client .hmset (_hash , self .typed_to_raw (data ))
212
212
213
213
def get_entry (self , table , key ):
214
214
"""Read a table entry from config db.
@@ -222,7 +222,7 @@ def get_entry(self, table, key):
222
222
key = self .serialize_key (key )
223
223
client = self .get_redis_client (self .db_name )
224
224
_hash = '{}{}{}' .format (table .upper (), self .TABLE_NAME_SEPARATOR , key )
225
- return self .__raw_to_typed (client .hgetall (_hash ))
225
+ return self .raw_to_typed (client .hgetall (_hash ))
226
226
227
227
def get_keys (self , table , split = True ):
228
228
"""Read all keys of a table from config db.
@@ -266,7 +266,7 @@ def get_table(self, table):
266
266
data = {}
267
267
for key in keys :
268
268
try :
269
- entry = self .__raw_to_typed (client .hgetall (key ))
269
+ entry = self .raw_to_typed (client .hgetall (key ))
270
270
if entry != None :
271
271
if PY3K :
272
272
key = key .decode ('utf-8' )
@@ -328,10 +328,135 @@ def get_config(self):
328
328
key = key .decode ('utf-8' )
329
329
try :
330
330
(table_name , row ) = key .split (self .TABLE_NAME_SEPARATOR , 1 )
331
- entry = self .__raw_to_typed (client .hgetall (key ))
331
+ entry = self .raw_to_typed (client .hgetall (key ))
332
332
if entry != None :
333
333
data .setdefault (table_name , {})[self .deserialize_key (row )] = entry
334
334
except ValueError :
335
335
pass #Ignore non table-formated redis entries
336
336
return data
337
337
338
+
339
+ class ConfigDBPipeConnector (ConfigDBConnector ):
340
+ REDIS_SCAN_BATCH_SIZE = 30
341
+
342
+ def __init__ (self , ** kwargs ):
343
+ super (ConfigDBPipeConnector , self ).__init__ (** kwargs )
344
+
345
+ def __delete_entries (self , client , pipe , pattern , cursor ):
346
+ """Helper method to delete table entries from config db using Redis pipeline
347
+ with batch size of REDIS_SCAN_BATCH_SIZE.
348
+ The caller should call pipeline execute once ready
349
+ Args:
350
+ client: Redis client
351
+ pipe: Redis DB pipe
352
+ pattern: key pattern
353
+ cursor: position to start scanning from
354
+
355
+ Returns:
356
+ cur: poition of next item to scan
357
+ """
358
+ cur , keys = client .scan (cursor = cursor , match = pattern , count = self .REDIS_SCAN_BATCH_SIZE )
359
+ for key in keys :
360
+ pipe .delete (key )
361
+
362
+ return cur
363
+
364
+ def __delete_table (self , client , pipe , table ):
365
+ """Helper method to delete table entries from config db using Redis pipeline.
366
+ The caller should call pipeline execute once ready
367
+ Args:
368
+ client: Redis client
369
+ pipe: Redis DB pipe
370
+ table: Table name.
371
+ """
372
+ pattern = '{}{}*' .format (table .upper (), self .TABLE_NAME_SEPARATOR )
373
+ cur = self .__delete_entries (client , pipe , pattern , 0 )
374
+ while cur != 0 :
375
+ cur = self .__delete_entries (client , pipe , pattern , cur )
376
+
377
+ def __mod_entry (self , pipe , table , key , data ):
378
+ """Modify a table entry to config db.
379
+ Args:
380
+ table: Table name.
381
+ pipe: Redis DB pipe
382
+ table: Table name.
383
+ key: Key of table entry, or a tuple of keys if it is a multi-key table.
384
+ data: Table row data in a form of dictionary {'column_key': 'value', ...}.
385
+ Pass {} as data will create an entry with no column if not already existed.
386
+ Pass None as data will delete the entry.
387
+ """
388
+ key = self .serialize_key (key )
389
+ _hash = '{}{}{}' .format (table .upper (), self .TABLE_NAME_SEPARATOR , key )
390
+ if data == None :
391
+ pipe .delete (_hash )
392
+ else :
393
+ pipe .hmset (_hash , self .typed_to_raw (data ))
394
+
395
+ def mod_config (self , data ):
396
+ """Write multiple tables into config db.
397
+ Extra entries/fields in the db which are not in the data are kept.
398
+ Args:
399
+ data: config data in a dictionary form
400
+ {
401
+ 'TABLE_NAME': { 'row_key': {'column_key': 'value', ...}, ...},
402
+ 'MULTI_KEY_TABLE_NAME': { ('l1_key', 'l2_key', ...) : {'column_key': 'value', ...}, ...},
403
+ ...
404
+ }
405
+ """
406
+ client = self .get_redis_client (self .db_name )
407
+ pipe = client .pipeline ()
408
+ for table_name in data :
409
+ table_data = data [table_name ]
410
+ if table_data == None :
411
+ self .__delete_table (client , pipe , table_name )
412
+ continue
413
+ for key in table_data :
414
+ self .__mod_entry (pipe , table_name , key , table_data [key ])
415
+ pipe .execute ()
416
+ client .bgsave ()
417
+
418
+ def _get_config (self , client , pipe , data , cursor ):
419
+ """Read config data in batches of size REDIS_SCAN_BATCH_SIZE using Redis pipelines
420
+ Args:
421
+ client: Redis client
422
+ pipe: Redis DB pipe
423
+ data: config dictionary
424
+ cursor: position to start scanning from
425
+
426
+ Returns:
427
+ cur: poition of next item to scan
428
+ """
429
+ cur , keys = client .scan (cursor = cursor , match = '*' , count = self .REDIS_SCAN_BATCH_SIZE )
430
+ keys = [key .decode ('utf-8' ) for key in keys if key != self .INIT_INDICATOR ]
431
+ for key in keys :
432
+ pipe .hgetall (key )
433
+ records = pipe .execute ()
434
+
435
+ for index , key in enumerate (keys ):
436
+ (table_name , row ) = key .split (self .TABLE_NAME_SEPARATOR , 1 )
437
+ entry = self .raw_to_typed (records [index ])
438
+ if entry is not None :
439
+ data .setdefault (table_name , {})[self .deserialize_key (row )] = entry
440
+
441
+ return cur
442
+
443
+ def get_config (self ):
444
+ """Read all config data.
445
+ Returns:
446
+ Config data in a dictionary form of
447
+ {
448
+ 'TABLE_NAME': { 'row_key': {'column_key': 'value', ...}, ...},
449
+ 'MULTI_KEY_TABLE_NAME': { ('l1_key', 'l2_key', ...) : {'column_key': 'value', ...}, ...},
450
+ ...
451
+ }
452
+ """
453
+ client = self .get_redis_client (self .db_name )
454
+ pipe = client .pipeline ()
455
+ data = {}
456
+
457
+ cur = self ._get_config (client , pipe , data , 0 )
458
+ while cur != 0 :
459
+ cur = self ._get_config (client , pipe , data , cur )
460
+
461
+ return data
462
+
0 commit comments