-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathqueue.go
102 lines (88 loc) · 2.76 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package main
import (
"time"
)
// Queue collects SecurityEventsContainer values received on Input into a
// batch and hands the batch over on NextChannel when it is flushed.
type Queue struct {
	// Input delivers the containers to be queued. Run treats a nil value
	// received here as a request to re-check the flush rules without adding
	// anything to the batch.
	Input chan *SecurityEventsContainer
	// Options holds the configuration; Init reads Options.IdleTimeout.
	Options *Options
	// NextChannel receives every flushed batch.
	NextChannel chan []*SecurityEventsContainer

	// _items is the batch accumulated since the last flush.
	_items []*SecurityEventsContainer
	// _lastRun is set by Init and refreshed at the end of every Flush.
	_lastRun time.Time
	// Size limits that trigger a flush: number of queued containers and total
	// number of security events across them.
	_maxItems, _maxSecurityEvents int
	// _containsCriticalEvent is true while the pending batch holds at least
	// one security event marked Critical.
	_containsCriticalEvent bool
	// Maximum time since _lastRun before a flush is due, for ordinary batches
	// and for batches that contain a critical event respectively.
	_idleTimeout, _idleTimeoutForCritical time.Duration
	// _firstRun stays true until the first Flush has completed.
	_firstRun bool
}
// Init sets the queue's limits, timeouts and bookkeeping state to their
// starting values. It dereferences queue.Options, so Options must be non-nil.
func (queue *Queue) Init() {
	// The idle clock starts now.
	queue._lastRun = time.Now()

	// Flush limits: at most 10 containers or 1000 security events per batch.
	queue._maxItems, queue._maxSecurityEvents = 10, 1000

	// The regular timeout is configurable; the critical one is fixed.
	queue._idleTimeout = queue.Options.IdleTimeout
	queue._idleTimeoutForCritical = 20 * time.Second

	// Nothing is queued yet and no flush has happened.
	queue._firstRun, queue._containsCriticalEvent = true, false
}
// Flush sends the pending batch on NextChannel and then resets the batching
// state. The batch is sent as it is, even when it is empty, and the send
// blocks until NextChannel can accept it.
func (queue *Queue) Flush() {
	queue.NextChannel <- queue._items

	// Start a fresh batch with no critical event recorded.
	queue._items, queue._containsCriticalEvent = nil, false
	queue._firstRun = false
	// Taken after the send, so the idle timeouts are measured from the moment
	// the batch was handed over.
	queue._lastRun = time.Now()
}
// Run consumes queue.Input until that channel is closed. Every received
// container is added to the pending batch, and after each wake-up the flush
// rules are evaluated and Flush is called when one of them fires.
//
// A nil value received on Input adds nothing to the batch; it only triggers a
// re-evaluation of the flush rules. Run also wakes up on its own every
// _idleTimeout and once 20 seconds after it starts.
//
// The wake-ups are received directly from timer channels rather than being
// injected into Input by helper goroutines: a sender on Input would panic
// with "send on closed channel" as soon as the producer closes it. Both
// timers are stopped when Run returns.
//
// Containers still pending when Input is closed are not flushed.
func (queue *Queue) Run() {
	// Periodic wake-up so a quiet queue is still flushed by time.
	// time.NewTicker panics on a non-positive interval; in that case tick
	// stays nil (a nil channel is never ready in a select) and the queue is
	// driven by Input and the first-run timer only.
	var tick <-chan time.Time
	if queue._idleTimeout > 0 {
		ticker := time.NewTicker(queue._idleTimeout)
		defer ticker.Stop()
		tick = ticker.C
	}

	// Speed up the first sending request.
	firstWake := time.NewTimer(20 * time.Second)
	defer firstWake.Stop()

	for {
		select {
		case eventsContainer, ok := <-queue.Input:
			if !ok {
				// Input was closed by the producer: stop processing.
				return
			}
			if eventsContainer != nil {
				queue.enqueue(eventsContainer)
			}
		case <-tick:
			// Nothing to add, just re-check the flush rules below.
		case <-firstWake.C:
			// Nothing to add, just re-check the flush rules below.
		}

		if queue.shouldFlush(time.Now()) {
			queue.Flush()
		}
	}
}

// enqueue appends eventsContainer to the pending batch and records whether
// the batch now holds a critical security event.
func (queue *Queue) enqueue(eventsContainer *SecurityEventsContainer) {
	queue._items = append(queue._items, eventsContainer)
	if queue._containsCriticalEvent {
		return
	}
	// A critical event makes the batch eligible for the shorter timeout.
	for _, securityEvent := range eventsContainer.SecurityEvents {
		if securityEvent.Critical {
			queue._containsCriticalEvent = true
			return
		}
	}
}

// shouldFlush reports whether the pending batch has to be flushed at time now.
func (queue *Queue) shouldFlush(now time.Time) bool {
	sinceLastRun := now.Sub(queue._lastRun)
	switch {
	case sinceLastRun > queue._idleTimeout:
		// Flush by time.
		return true
	case len(queue._items) >= queue._maxItems:
		// Flush by number of queued containers.
		return true
	case queue._containsCriticalEvent && sinceLastRun > queue._idleTimeoutForCritical:
		// Flush by critical event.
		return true
	case queue._firstRun:
		// First wake-up after start.
		return true
	}

	// Flush by total number of security events across the batch.
	totalEvents := 0
	for _, item := range queue._items {
		totalEvents += len(item.SecurityEvents)
	}
	return totalEvents >= queue._maxSecurityEvents
}