Skip to content

Commit

Permalink
Added more fields to the tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
scudette committed Mar 4, 2025
1 parent d969fb3 commit 9b25fa3
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 11 deletions.
25 changes: 22 additions & 3 deletions vql/parsers/event_logs/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package event_logs

import (
"context"
"fmt"
"sort"
"sync"
"time"
Expand All @@ -19,6 +18,9 @@ type EventLogWatcherStats struct {
FindLastEvent int64 // Nanoseconds
MonitorOnce int64
Count int64
FirstScan time.Time
LastScan time.Time
NextScan time.Time
}

type EventLogWatcherTracker struct {
Expand All @@ -40,6 +42,20 @@ func (self *EventLogWatcherTracker) AddRow(
stats.Count++
}

func (self *EventLogWatcherTracker) SetNextScan(
filename *accessors.OSPath, accessor_name string, next time.Time) {
self.mu.Lock()
defer self.mu.Unlock()

key := accessor_name + filename.String()
stats, pres := self.files[key]
if !pres {
return
}

stats.NextScan = next
}

func (self *EventLogWatcherTracker) ChargeMonitorOnce(
filename *accessors.OSPath, accessor_name string) func() {

Expand All @@ -60,6 +76,7 @@ func (self *EventLogWatcherTracker) ChargeMonitorOnce(
}

stats.MonitorOnce += int64(duration)
stats.LastScan = start
}
}

Expand All @@ -75,8 +92,6 @@ func (self *EventLogWatcherTracker) ChargeFindLastEvent(

duration := utils.GetTime().Now().Sub(start)

fmt.Printf("Duration %v\n", int64(duration))

stats, pres := self.files[key]
if !pres {
stats = &EventLogWatcherStats{
Expand All @@ -86,6 +101,7 @@ func (self *EventLogWatcherTracker) ChargeFindLastEvent(
}

stats.FindLastEvent += int64(duration)
stats.FirstScan = start
}
}

Expand All @@ -110,6 +126,9 @@ func (self *EventLogWatcherTracker) WriteProfile(ctx context.Context,

output_chan <- ordereddict.NewDict().
Set("Filename", stat.Filename).
Set("FirstScan", stat.FirstScan).
Set("LastScan", stat.LastScan).
Set("NextScan", stat.NextScan.Sub(utils.GetTime().Now()).String()).
Set("FindLastEvent", time.Duration(stat.FindLastEvent).String()).
Set("MonitorOnce", time.Duration(stat.MonitorOnce).String()).
Set("Count", stat.Count)
Expand Down
22 changes: 14 additions & 8 deletions vql/parsers/event_logs/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package event_logs

import (
"context"
"math/rand"
"sync"
"time"

Expand All @@ -19,20 +18,22 @@ import (
)

var (
GlobalEventLogService = NewEventLogWatcherService()
GlobalEventLogService = NewEventLogWatcherService(context.Background())
)

// This service watches one or more event logs files and multiplexes
// events to multiple readers.
type EventLogWatcherService struct {
mu sync.Mutex

ctx context.Context
registrations map[string][]*Handle
}

func NewEventLogWatcherService() *EventLogWatcherService {
func NewEventLogWatcherService(ctx context.Context) *EventLogWatcherService {
return &EventLogWatcherService{
registrations: make(map[string][]*Handle),
ctx: ctx,
}
}

Expand Down Expand Up @@ -74,7 +75,7 @@ func (self *EventLogWatcherService) Register(
builder := services.ScopeBuilderFromScope(scope)
subscope := manager.BuildScope(builder)

go self.StartMonitoring(
go self.StartMonitoring(self.ctx,
subscope, filename, accessor, frequency)
}

Expand All @@ -89,6 +90,7 @@ func (self *EventLogWatcherService) Register(
// Monitor the filename for new events and emit them to all interested
// listeners. If no listeners exist we terminate.
func (self *EventLogWatcherService) StartMonitoring(
ctx context.Context,
scope vfilter.Scope,
filename *accessors.OSPath,
accessor_name string, frequency uint64) {
Expand All @@ -102,9 +104,6 @@ func (self *EventLogWatcherService) StartMonitoring(
frequency = 15
}

// Add some jitter to ensure evtx parsing is not synchronized.
frequency = uint64(rand.Intn(int(frequency)*2/10)) + frequency

// A resolver for messages
resolver, _ := evtx.GetNativeResolver()
accessor, err := accessors.GetAccessor(accessor_name, scope)
Expand All @@ -128,7 +127,12 @@ func (self *EventLogWatcherService) StartMonitoring(
last_event = self.monitorOnce(
filename, accessor_name, accessor, last_event, resolver)

time.Sleep(time.Duration(frequency) * time.Second)
duration := utils.Jitter(time.Duration(frequency) * time.Second)

eventLogWatchTracker.SetNextScan(
filename, accessor_name, utils.GetTime().Now().Add(duration))

utils.SleepWithCtx(ctx, duration)
}
}

Expand Down Expand Up @@ -251,6 +255,8 @@ func (self *EventLogWatcherService) monitorOnce(
event.Set("Message", evtx.ExpandMessage(event, resolver))
}

eventLogWatchTracker.AddRow(filename, accessor_name)

new_handles := make([]*Handle, 0, len(handles))
for _, handle := range handles {
select {
Expand Down

0 comments on commit 9b25fa3

Please sign in to comment.