Skip to content

Commit ef6274d

Browse files
authored
[libbeat] Enable WriteAheadLimit in the disk queue (#21391)
1 parent 7a369ca commit ef6274d

File tree

2 files changed

+28
-12
lines changed

2 files changed

+28
-12
lines changed

libbeat/publisher/queue/diskqueue/config.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@ func DefaultSettings() Settings {
106106
MaxSegmentSize: 100 * (1 << 20), // 100MiB
107107
MaxBufferSize: (1 << 30), // 1GiB
108108

109-
ReadAheadLimit: 256,
110-
WriteAheadLimit: 1024,
109+
ReadAheadLimit: 512,
110+
WriteAheadLimit: 2048,
111111
}
112112
}
113113

@@ -129,6 +129,14 @@ func SettingsForUserConfig(config *common.Config) (Settings, error) {
129129
// divided by 10.
130130
settings.MaxSegmentSize = uint64(userConfig.MaxSize) / 10
131131
}
132+
133+
if userConfig.ReadAheadLimit != nil {
134+
settings.ReadAheadLimit = *userConfig.ReadAheadLimit
135+
}
136+
if userConfig.WriteAheadLimit != nil {
137+
settings.WriteAheadLimit = *userConfig.WriteAheadLimit
138+
}
139+
132140
return settings, nil
133141
}
134142

libbeat/publisher/queue/diskqueue/core_loop.go

+18-10
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,15 @@ func (dq *diskQueue) run() {
5858
// The writer loop completed a request, so check if there is more
5959
// data to be sent.
6060
dq.maybeWritePending()
61-
// We also check whether the reader loop is waiting for the data
62-
// that was just written.
61+
62+
// The data that was just written is now available for reading, so check
63+
// if we should start a new read request.
6364
dq.maybeReadPending()
6465

66+
// pendingFrames should now be empty. If any producers were blocked
67+
// because pendingFrames hit settings.WriteAheadLimit, wake them up.
68+
dq.maybeUnblockProducers()
69+
6570
// Reader loop handling
6671
case readerLoopResponse := <-dq.readerLoop.responseChan:
6772
dq.handleReaderLoopResponse(readerLoopResponse)
@@ -417,22 +422,25 @@ func (dq *diskQueue) enqueueWriteFrame(frame *writeFrame) {
417422
})
418423
}
419424

420-
// canAcceptFrameOfSize checks whether there is enough free space in the
421-
// queue (subject to settings.MaxBufferSize) to accept a new frame with
422-
// the given size. Size includes both the serialized data and the frame
423-
// header / footer; the easy way to do this for a writeFrame is to pass
425+
// canAcceptFrameOfSize checks whether there is enough free space in the queue
426+
// (subject to settings.{MaxBufferSize, WriteAheadLimit}) to accept a new
427+
// frame with the given size. Size includes both the serialized data and the
428+
// frame header / footer; the easy way to do this for a writeFrame is to pass
424429
// in frame.sizeOnDisk().
425430
// Capacity calculations do not include requests in the blockedProducers
426431
// list (that data is owned by its callers and we can't touch it until
427432
// we are ready to respond). That allows this helper to be used both while
428433
// handling producer requests and while deciding whether to unblock
429434
// producers after free capacity increases.
430-
// If we decide to add limits on how many events / bytes can be stored
431-
// in pendingFrames (to avoid unbounded memory use if the input is faster
432-
// than the disk), this is the function to modify.
433435
func (dq *diskQueue) canAcceptFrameOfSize(frameSize uint64) bool {
436+
// If pendingFrames is already at the WriteAheadLimit, we can't accept
437+
// any new frames right now.
438+
if len(dq.pendingFrames) >= dq.settings.WriteAheadLimit {
439+
return false
440+
}
441+
442+
// If the queue size is unbounded (max == 0), we accept.
434443
if dq.settings.MaxBufferSize == 0 {
435-
// Currently we impose no limitations if the queue size is unbounded.
436444
return true
437445
}
438446

0 commit comments

Comments
 (0)