Try to fix rare memory allocation deadlocks
vitalif committed Apr 5, 2023
1 parent f98e72c commit fdc2d17
Showing 3 changed files with 18 additions and 29 deletions.
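The three diffs below remove the BufferPool-internal cond/wantFree machinery (buffer_pool.go), move the wantFree flag up to the Goofys struct, and make FreeSomeCleanBuffers drop the buffer-pool lock, raise fs.wantFree and call the new WakeupFlusherAndWait(true) helper, which marks a flush as pending, wakes the flusher via flusherCond and waits for it to signal back (goofys.go); file.go switches its wantFree checks to the new field and drops the pool.cond broadcasts. (In the hunks below, a leading "-" marks a removed line, a leading "+" an added line; unprefixed lines are unchanged context.)

As a rough illustration of the wake-the-worker-and-wait pattern behind WakeupFlusherAndWait, here is a minimal, self-contained Go sketch; the worker, pending and loop names are hypothetical and the code is not taken from this repository:

package main

import (
    "fmt"
    "sync"
    "time"
)

// Sketch of the wake-and-wait pattern: a caller sets a "work pending"
// flag, broadcasts on a condition variable to wake the worker, and
// optionally waits on the same condition variable until the worker
// reports back.
type worker struct {
    mu      sync.Mutex
    cond    *sync.Cond
    pending int
}

func newWorker() *worker {
    w := &worker{}
    w.cond = sync.NewCond(&w.mu)
    go w.loop()
    return w
}

func (w *worker) loop() {
    for {
        w.mu.Lock()
        for w.pending == 0 {
            w.cond.Wait() // sleep until someone requests a pass
        }
        w.pending = 0
        w.mu.Unlock()

        time.Sleep(10 * time.Millisecond) // stand-in for one flush pass

        w.mu.Lock()
        w.cond.Broadcast() // report progress to anyone waiting
        w.mu.Unlock()
    }
}

// wakeAndWait mirrors the shape of WakeupFlusherAndWait(true): request a
// pass and, if none was already pending, block until the worker signals
// back. Cond.Wait releases w.mu while blocked, so the worker can run.
func (w *worker) wakeAndWait() {
    w.mu.Lock()
    if w.pending == 0 {
        w.pending = 1
        w.cond.Broadcast()
        w.cond.Wait()
    }
    w.mu.Unlock()
}

func main() {
    w := newWorker()
    w.wakeAndWait()
    fmt.Println("worker completed a pass")
}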
15 changes: 0 additions & 15 deletions internal/buffer_pool.go
@@ -31,8 +31,6 @@ var bufferLog = GetLogger("buffer")
// BufferPool tracks memory used by cache buffers
type BufferPool struct {
mu sync.Mutex
- cond *sync.Cond
- wantFree int32

curDirtyID uint64

@@ -184,8 +182,6 @@ func NewBufferPool(limit int64, gcInterval uint64) *BufferPool {
gcInterval: gcInterval,
}

- pool.cond = sync.NewCond(&pool.mu)

return &pool
}

@@ -202,9 +198,6 @@ func (pool *BufferPool) recomputeBufferLimit() {
func (pool *BufferPool) Use(size int64, ignoreMemoryLimit bool) (err error) {
if size <= 0 {
atomic.AddInt64(&pool.cur, size)
- if atomic.LoadInt32(&pool.wantFree) > 0 {
- pool.cond.Broadcast()
- }
} else {
pool.mu.Lock()
err = pool.UseUnlocked(size, ignoreMemoryLimit)
@@ -231,9 +224,6 @@ func (pool *BufferPool) UseUnlocked(size int64, ignoreMemoryLimit bool) error {
freed, canFreeMoreAsync := pool.FreeSomeCleanBuffers(newSize - pool.max)
bufferLog.Debugf("Freed %v, now: %v/%v", freed, newSize, pool.max)
for atomic.LoadInt64(&pool.cur) > pool.max && canFreeMoreAsync && !ignoreMemoryLimit {
- atomic.AddInt32(&pool.wantFree, 1)
- pool.cond.Wait()
- atomic.AddInt32(&pool.wantFree, -1)
freed, canFreeMoreAsync = pool.FreeSomeCleanBuffers(atomic.LoadInt64(&pool.cur) - pool.max)
bufferLog.Debugf("Freed %v, now: %v/%v", freed, atomic.LoadInt64(&pool.cur), pool.max)
}
@@ -245,16 +235,11 @@ func (pool *BufferPool) UseUnlocked(size int64, ignoreMemoryLimit bool) error {
// free memory AND correct our limits, yet we still can't allocate.
// it's likely that we are simply asking for too much
atomic.AddInt64(&pool.cur, -size)
- pool.cond.Broadcast()
log.Errorf("Unable to allocate %d bytes, used %d bytes, limit is %d bytes", size, atomic.LoadInt64(&pool.cur)-size, pool.max)
return syscall.ENOMEM
}
}
}

- if size < 0 && atomic.LoadInt32(&pool.wantFree) > 0 {
- pool.cond.Broadcast()
- }

return nil
}
15 changes: 3 additions & 12 deletions internal/file.go
@@ -1494,7 +1494,7 @@ func (inode *Inode) SendUpload() bool {

if inode.Attributes.Size <= inode.fs.flags.SinglePartMB*1024*1024 && inode.mpu == nil {
// Don't flush small files with active file handles (if not under memory pressure)
- if inode.IsFlushing == 0 && (inode.fileHandles == 0 || inode.forceFlush || atomic.LoadInt32(&inode.fs.bufferPool.wantFree) > 0) {
+ if inode.IsFlushing == 0 && (inode.fileHandles == 0 || inode.forceFlush || atomic.LoadInt32(&inode.fs.wantFree) > 0) {
// Don't accidentally trigger a parallel multipart flush
inode.IsFlushing += inode.fs.flags.MaxParallelParts
atomic.AddInt64(&inode.fs.activeFlushers, 1)
@@ -1543,7 +1543,7 @@ func (inode *Inode) SendUpload() bool {
// Pick part(s) to flush
initiated := false
lastPart := uint64(0)
- flushInode := inode.fileHandles == 0 || inode.forceFlush || atomic.LoadInt32(&inode.fs.bufferPool.wantFree) > 0
+ flushInode := inode.fileHandles == 0 || inode.forceFlush || atomic.LoadInt32(&inode.fs.wantFree) > 0
partDirty := false
partLocked := false
partEvicted := false
@@ -1624,7 +1624,7 @@ func (inode *Inode) SendUpload() bool {
}
}
if canComplete && (inode.fileHandles == 0 || inode.forceFlush ||
- atomic.LoadInt32(&inode.fs.bufferPool.wantFree) > 0 && hasEvictedParts) {
+ atomic.LoadInt32(&inode.fs.wantFree) > 0 && hasEvictedParts) {
// Complete the multipart upload
inode.IsFlushing += inode.fs.flags.MaxParallelParts
atomic.AddInt64(&inode.fs.activeFlushers, 1)
@@ -1793,9 +1793,6 @@ func (inode *Inode) FlushSmallObject() {
} else {
inode.SetCacheState(ST_MODIFIED)
}
- if atomic.LoadInt32(&inode.fs.bufferPool.wantFree) > 0 {
- inode.fs.bufferPool.cond.Broadcast()
- }
}
inode.updateFromFlush(sz, resp.ETag, resp.LastModified, resp.StorageClass)
}
@@ -1984,9 +1981,6 @@ func (inode *Inode) FlushPart(part uint64) {
}
}
}
- if atomic.LoadInt32(&inode.fs.bufferPool.wantFree) > 0 {
- inode.fs.bufferPool.cond.Broadcast()
- }
}
}

@@ -2062,9 +2056,6 @@ func (inode *Inode) completeMultipart() {
} else {
inode.SetCacheState(ST_MODIFIED)
}
- if atomic.LoadInt32(&inode.fs.bufferPool.wantFree) > 0 {
- inode.fs.bufferPool.cond.Broadcast()
- }
}
}
}
17 changes: 15 additions & 2 deletions internal/goofys.go
@@ -63,6 +63,7 @@ type Goofys struct {
rootAttrs InodeAttributes

bufferPool *BufferPool
+ wantFree int32

// A lock protecting the state of the file system struct itself (distinct
// from per-inode locks). Should be always taken after any inode locks.
@@ -537,20 +538,32 @@ func (fs *Goofys) FreeSomeCleanBuffers(size int64) (int64, bool) {
}
}
if haveDirty {
- fs.WakeupFlusher()
+ fs.bufferPool.mu.Unlock()
+ atomic.AddInt32(&fs.wantFree, 1)
+ fs.WakeupFlusherAndWait(true)
+ atomic.AddInt32(&fs.wantFree, -1)
+ fs.bufferPool.mu.Lock()
}
return freed, haveDirty
}

- func (fs *Goofys) WakeupFlusher() {
+ func (fs *Goofys) WakeupFlusherAndWait(wait bool) {
fs.flusherMu.Lock()
if fs.flushPending == 0 {
fs.flushPending = 1
fs.flusherCond.Broadcast()
+ if wait {
+ // Wait for any result
+ fs.flusherCond.Wait()
+ }
}
fs.flusherMu.Unlock()
}

+ func (fs *Goofys) WakeupFlusher() {
+ fs.WakeupFlusherAndWait(false)
+ }

func (fs *Goofys) ScheduleRetryFlush() {
if atomic.CompareAndSwapInt32(&fs.flushRetrySet, 0, 1) {
time.AfterFunc(fs.flags.RetryInterval, func() {
