From 9b25fa3d02ba411d82da7e6dd89842a5c9414fa2 Mon Sep 17 00:00:00 2001 From: Mike Cohen Date: Tue, 4 Mar 2025 12:12:51 +1000 Subject: [PATCH] Added more fields to the tracker --- vql/parsers/event_logs/tracker.go | 25 ++++++++++++++++++++++--- vql/parsers/event_logs/watcher.go | 22 ++++++++++++++-------- 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/vql/parsers/event_logs/tracker.go b/vql/parsers/event_logs/tracker.go index 524499d132..54bb50c9a1 100644 --- a/vql/parsers/event_logs/tracker.go +++ b/vql/parsers/event_logs/tracker.go @@ -2,7 +2,6 @@ package event_logs import ( "context" - "fmt" "sort" "sync" "time" @@ -19,6 +18,9 @@ type EventLogWatcherStats struct { FindLastEvent int64 // Nanoseconds MonitorOnce int64 Count int64 + FirstScan time.Time + LastScan time.Time + NextScan time.Time } type EventLogWatcherTracker struct { @@ -40,6 +42,20 @@ func (self *EventLogWatcherTracker) AddRow( stats.Count++ } +func (self *EventLogWatcherTracker) SetNextScan( + filename *accessors.OSPath, accessor_name string, next time.Time) { + self.mu.Lock() + defer self.mu.Unlock() + + key := accessor_name + filename.String() + stats, pres := self.files[key] + if !pres { + return + } + + stats.NextScan = next +} + func (self *EventLogWatcherTracker) ChargeMonitorOnce( filename *accessors.OSPath, accessor_name string) func() { @@ -60,6 +76,7 @@ func (self *EventLogWatcherTracker) ChargeMonitorOnce( } stats.MonitorOnce += int64(duration) + stats.LastScan = start } } @@ -75,8 +92,6 @@ func (self *EventLogWatcherTracker) ChargeFindLastEvent( duration := utils.GetTime().Now().Sub(start) - fmt.Printf("Duration %v\n", int64(duration)) - stats, pres := self.files[key] if !pres { stats = &EventLogWatcherStats{ @@ -86,6 +101,7 @@ func (self *EventLogWatcherTracker) ChargeFindLastEvent( } stats.FindLastEvent += int64(duration) + stats.FirstScan = start } } @@ -110,6 +126,9 @@ func (self *EventLogWatcherTracker) WriteProfile(ctx context.Context, output_chan <- ordereddict.NewDict(). Set("Filename", stat.Filename). + Set("FirstScan", stat.FirstScan). + Set("LastScan", stat.LastScan). + Set("NextScan", stat.NextScan.Sub(utils.GetTime().Now()).String()). Set("FindLastEvent", time.Duration(stat.FindLastEvent).String()). Set("MonitorOnce", time.Duration(stat.MonitorOnce).String()). Set("Count", stat.Count) diff --git a/vql/parsers/event_logs/watcher.go b/vql/parsers/event_logs/watcher.go index 452785ae04..aa3c2d8f61 100644 --- a/vql/parsers/event_logs/watcher.go +++ b/vql/parsers/event_logs/watcher.go @@ -2,7 +2,6 @@ package event_logs import ( "context" - "math/rand" "sync" "time" @@ -19,7 +18,7 @@ import ( ) var ( - GlobalEventLogService = NewEventLogWatcherService() + GlobalEventLogService = NewEventLogWatcherService(context.Background()) ) // This service watches one or more event logs files and multiplexes @@ -27,12 +26,14 @@ var ( type EventLogWatcherService struct { mu sync.Mutex + ctx context.Context registrations map[string][]*Handle } -func NewEventLogWatcherService() *EventLogWatcherService { +func NewEventLogWatcherService(ctx context.Context) *EventLogWatcherService { return &EventLogWatcherService{ registrations: make(map[string][]*Handle), + ctx: ctx, } } @@ -74,7 +75,7 @@ func (self *EventLogWatcherService) Register( builder := services.ScopeBuilderFromScope(scope) subscope := manager.BuildScope(builder) - go self.StartMonitoring( + go self.StartMonitoring(self.ctx, subscope, filename, accessor, frequency) } @@ -89,6 +90,7 @@ func (self *EventLogWatcherService) Register( // Monitor the filename for new events and emit them to all interested // listeners. If no listeners exist we terminate. func (self *EventLogWatcherService) StartMonitoring( + ctx context.Context, scope vfilter.Scope, filename *accessors.OSPath, accessor_name string, frequency uint64) { @@ -102,9 +104,6 @@ func (self *EventLogWatcherService) StartMonitoring( frequency = 15 } - // Add some jitter to ensure evtx parsing is not synchronized. - frequency = uint64(rand.Intn(int(frequency)*2/10)) + frequency - // A resolver for messages resolver, _ := evtx.GetNativeResolver() accessor, err := accessors.GetAccessor(accessor_name, scope) @@ -128,7 +127,12 @@ func (self *EventLogWatcherService) StartMonitoring( last_event = self.monitorOnce( filename, accessor_name, accessor, last_event, resolver) - time.Sleep(time.Duration(frequency) * time.Second) + duration := utils.Jitter(time.Duration(frequency) * time.Second) + + eventLogWatchTracker.SetNextScan( + filename, accessor_name, utils.GetTime().Now().Add(duration)) + + utils.SleepWithCtx(ctx, duration) } } @@ -251,6 +255,8 @@ func (self *EventLogWatcherService) monitorOnce( event.Set("Message", evtx.ExpandMessage(event, resolver)) } + eventLogWatchTracker.AddRow(filename, accessor_name) + new_handles := make([]*Handle, 0, len(handles)) for _, handle := range handles { select {