
Commit 4544872

Adaptive cleanup of mutex table
1 parent 3b41203 commit 4544872
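
This makes the MutexTable background cleaner adaptive: instead of unconditionally sweeping every cleanup_period, the cleaner task now wakes once per second and sweeps only when the table has grown past a new cleanup_entries_threshold parameter, or when cleanup_period has elapsed since the previous sweep. To support this, the table tracks its entry count in an Arc<AtomicUsize> that is incremented when a lock entry is inserted and decremented by the number of entries a sweep removes; cleanup() now returns that count, and a new size() accessor exposes the current total.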

2 files changed: +91 -10 lines

crates/sui-core/src/authority_server.rs (+1 -1)

@@ -191,7 +191,7 @@ impl ValidatorService {
         tokio::spawn(async move {
             narwhal_node::restarter::NodeRestarter::watch(
                 consensus_keypair,
-                &*consensus_committee,
+                &consensus_committee,
                 consensus_worker_cache,
                 consensus_storage_base_path,
                 consensus_execution_state,
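
A note on the authority_server.rs change: the explicit &* deref on the committee is dropped at this call site, presumably because a plain reference now satisfies what NodeRestarter::watch expects, whether via a signature change on the Narwhal side or ordinary deref coercion.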

crates/sui-storage/src/mutex_table.rs (+90 -9)

@@ -6,12 +6,13 @@ use std::collections::HashMap;
 use std::error::Error;
 use std::fmt;
 use std::hash::{BuildHasher, Hash, Hasher};
-use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::time::Duration;

 use tokio::sync::{Mutex, RwLock};
 use tokio::task::JoinHandle;
+use tokio::time::Instant;
 use tracing::info;

 type InnerLockTable<K> = HashMap<K, Arc<tokio::sync::Mutex<()>>>;
@@ -22,6 +23,7 @@ pub struct MutexTable<K: Hash> {
     _k: std::marker::PhantomData<K>,
     _cleaner: JoinHandle<()>,
     stop: Arc<AtomicBool>,
+    size: Arc<AtomicUsize>,
 }

 #[derive(Debug)]
@@ -46,6 +48,7 @@ impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
         shard_size: usize,
         cleanup_period: Duration,
         cleanup_initial_delay: Duration,
+        cleanup_entries_threshold: usize,
     ) -> Self {
         let lock_table: Arc<Vec<RwLock<InnerLockTable<K>>>> = Arc::new(
             (0..num_shards)
@@ -56,19 +59,29 @@ impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
         let cloned = lock_table.clone();
         let stop = Arc::new(AtomicBool::new(false));
         let stop_cloned = stop.clone();
+        let size: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
+        let size_cloned = size.clone();
         Self {
             random_state: RandomState::new(),
             lock_table,
             _k: std::marker::PhantomData {},
             _cleaner: tokio::spawn(async move {
                 tokio::time::sleep(cleanup_initial_delay).await;
+                let mut previous_cleanup_instant = Instant::now();
                 while !stop_cloned.load(Ordering::SeqCst) {
-                    Self::cleanup(cloned.clone());
-                    tokio::time::sleep(cleanup_period).await;
+                    if size_cloned.load(Ordering::SeqCst) >= cleanup_entries_threshold
+                        || previous_cleanup_instant.elapsed() >= cleanup_period
+                    {
+                        let num_removed = Self::cleanup(cloned.clone());
+                        size_cloned.fetch_sub(num_removed, Ordering::SeqCst);
+                        previous_cleanup_instant = Instant::now();
+                    }
+                    tokio::time::sleep(Duration::from_secs(1)).await;
                 }
                 info!("Stopping mutex table cleanup!");
             }),
             stop,
+            size,
         }
     }

@@ -78,10 +91,16 @@ impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
             shard_size,
             Duration::from_secs(10),
             Duration::from_secs(10),
+            10_000,
         )
     }

-    pub fn cleanup(lock_table: Arc<Vec<RwLock<InnerLockTable<K>>>>) {
+    pub fn size(&self) -> usize {
+        self.size.load(Ordering::SeqCst)
+    }
+
+    pub fn cleanup(lock_table: Arc<Vec<RwLock<InnerLockTable<K>>>>) -> usize {
+        let mut num_removed: usize = 0;
         for shard in lock_table.iter() {
             let map = shard.try_write();
             if map.is_err() {
@@ -93,12 +112,18 @@ impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
                 // This check is also likely sufficient e.g. you don't even need try_lock below, but keeping it just in case
                 if Arc::strong_count(v) == 1 {
                     let mutex_guard = v.try_lock();
-                    mutex_guard.is_err()
+                    if mutex_guard.is_ok() {
+                        num_removed += 1;
+                        false
+                    } else {
+                        true
+                    }
                 } else {
                     true
                 }
             });
         }
+        num_removed
     }

     fn get_lock_idx(&self, key: &K) -> usize {
@@ -144,7 +169,10 @@ impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
         let element = {
             let mut map = self.lock_table[lock_idx].write().await;
             map.entry(k)
-                .or_insert_with(|| Arc::new(Mutex::new(())))
+                .or_insert_with(|| {
+                    self.size.fetch_add(1, Ordering::SeqCst);
+                    Arc::new(Mutex::new(()))
+                })
                 .clone()
         };
         LockGuard(element.lock_owned().await)
@@ -171,7 +199,10 @@ impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
                 .try_write()
                 .map_err(|_| TryAcquireLockError::LockTableLocked)?;
             map.entry(k)
-                .or_insert_with(|| Arc::new(Mutex::new(())))
+                .or_insert_with(|| {
+                    self.size.fetch_add(1, Ordering::SeqCst);
+                    Arc::new(Mutex::new(()))
+                })
                 .clone()
         };
         let lock = element.try_lock_owned();
@@ -225,8 +256,13 @@ async fn test_mutex_table_concurrent_in_same_bucket() {
 #[tokio::test]
 async fn test_mutex_table() {
     // Disable bg cleanup with Duration.MAX for initial delay
-    let mutex_table =
-        MutexTable::<String>::new_with_cleanup(1, 128, Duration::from_secs(10), Duration::MAX);
+    let mutex_table = MutexTable::<String>::new_with_cleanup(
+        1,
+        128,
+        Duration::from_secs(10),
+        Duration::MAX,
+        1000,
+    );
     let john1 = mutex_table.try_acquire_lock("john".to_string());
     assert!(john1.is_ok());
     let john2 = mutex_table.try_acquire_lock("john".to_string());
@@ -259,6 +295,7 @@ async fn test_mutex_table_bg_cleanup() {
         128,
         Duration::from_secs(5),
         Duration::from_secs(1),
+        1000,
     );
     let lock1 = mutex_table.try_acquire_lock("lock1".to_string());
     let lock2 = mutex_table.try_acquire_lock("lock2".to_string());
@@ -296,3 +333,47 @@ async fn test_mutex_table_bg_cleanup() {
         assert!(locked.is_empty());
     }
 }
+
+#[tokio::test]
+async fn test_mutex_table_bg_cleanup_with_size_threshold() {
+    // set up the table to never trigger cleanup because of time period but only size threshold
+    let mutex_table =
+        MutexTable::<String>::new_with_cleanup(1, 128, Duration::MAX, Duration::from_secs(1), 5);
+    let lock1 = mutex_table.try_acquire_lock("lock1".to_string());
+    let lock2 = mutex_table.try_acquire_lock("lock2".to_string());
+    let lock3 = mutex_table.try_acquire_lock("lock3".to_string());
+    let lock4 = mutex_table.try_acquire_lock("lock4".to_string());
+    let lock5 = mutex_table.try_acquire_lock("lock5".to_string());
+    assert!(lock1.is_ok());
+    assert!(lock2.is_ok());
+    assert!(lock3.is_ok());
+    assert!(lock4.is_ok());
+    assert!(lock5.is_ok());
+    // Trigger cleanup
+    MutexTable::cleanup(mutex_table.lock_table.clone());
+    // Try acquiring locks again, these should still fail because locks have not been released
+    let lock11 = mutex_table.try_acquire_lock("lock1".to_string());
+    let lock22 = mutex_table.try_acquire_lock("lock2".to_string());
+    let lock33 = mutex_table.try_acquire_lock("lock3".to_string());
+    let lock44 = mutex_table.try_acquire_lock("lock4".to_string());
+    let lock55 = mutex_table.try_acquire_lock("lock5".to_string());
+    assert!(lock11.is_err());
+    assert!(lock22.is_err());
+    assert!(lock33.is_err());
+    assert!(lock44.is_err());
+    assert!(lock55.is_err());
+    assert_eq!(mutex_table.size(), 5);
+    // drop all locks
+    drop(lock1);
+    drop(lock2);
+    drop(lock3);
+    drop(lock4);
+    drop(lock5);
+    // Wait for bg cleanup to be triggered because of size threshold
+    tokio::time::sleep(Duration::from_secs(5)).await;
+    assert_eq!(mutex_table.size(), 0);
+    for entry in mutex_table.lock_table.iter() {
+        let locked = entry.read().await;
+        assert!(locked.is_empty());
+    }
+}
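
For reference, a minimal sketch of a call site using the five-argument constructor and the new size() accessor. The parameter values, the test name, and the sui_storage::mutex_table import path are illustrative assumptions, not part of this commit:

use std::time::Duration;
use sui_storage::mutex_table::MutexTable; // assumed import path

#[tokio::test]
async fn size_threshold_example() {
    let table = MutexTable::<String>::new_with_cleanup(
        4,                       // num_shards
        128,                     // shard_size
        Duration::from_secs(10), // cleanup_period: sweep at least this often
        Duration::from_secs(10), // cleanup_initial_delay before the cleaner starts
        1_000,                   // cleanup_entries_threshold: sweep once this many entries accumulate
    );
    let guard = table.try_acquire_lock("some-key".to_string());
    assert!(guard.is_ok());
    // One entry inserted; the cleaner has not run yet (10s initial delay),
    // so size() still reports it.
    assert_eq!(table.size(), 1);
}

Because the cleaner polls once per second, a threshold crossing triggers a sweep within about a second, while the cleanup_period bound still guarantees periodic sweeps even when the table stays small.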
