Skip to content

Commit d813311

Browse files
authored
fix(otel): ensure default processors are applied in fbreceiver (#42658)
* fix(otel): ensure default processors are applied in fbreceiver * set default processors in NewBeatReceiver
1 parent 643d153 commit d813311

File tree

6 files changed

+120
-7
lines changed

6 files changed

+120
-7
lines changed

libbeat/beat/info.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ type Info struct {
4646
DefaultUsername string // The default username to be used to connect to Elasticsearch Monitoring
4747
Namespace *monitoring.Namespace // a monitor namespace that is unique per beat instance
4848
}
49-
LogConsumer consumer.Logs // otel log consumer
50-
49+
LogConsumer consumer.Logs // otel log consumer
50+
UseDefaultProcessors bool // Whether to use the default processors
5151
}
5252

5353
func (i Info) FQDNAwareHostname(useFQDN bool) string {

libbeat/cmd/instance/beat.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ func NewBeat(name, indexPrefix, v string, elasticLicensed bool, initFuncs []func
263263
}
264264

265265
// NewBeatReceiver creates a Beat that will be used in the context of an otel receiver
266-
func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, consumer consumer.Logs, core zapcore.Core) (*Beat, error) {
266+
func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, useDefaultProcessors bool, consumer consumer.Logs, core zapcore.Core) (*Beat, error) {
267267
b, err := NewBeat(settings.Name,
268268
settings.IndexPrefix,
269269
settings.Version,
@@ -440,6 +440,7 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c
440440
return nil, fmt.Errorf("error setting index supporter: %w", err)
441441
}
442442

443+
b.Info.UseDefaultProcessors = useDefaultProcessors
443444
processingFactory := settings.Processing
444445
if processingFactory == nil {
445446
processingFactory = processing.MakeDefaultBeatSupport(true)

libbeat/publisher/processing/default.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,11 @@ func MakeDefaultSupport(
114114
// don't try to "merge" the two lists somehow, if the supportFactory caller requests its own processors, use those
115115
// also makes it easier to disable global processors if needed, since they're otherwise hardcoded
116116
var rawProcessors processors.PluginConfig
117+
shouldLoadDefaultProcessors := info.UseDefaultProcessors || fleetmode.Enabled()
118+
117119
// don't check the array directly, use HasField, that way processors can easily be bypassed with -E processors=[]
118-
if fleetmode.Enabled() && !beatCfg.HasField("processors") {
119-
log.Debugf("In fleet mode with no processors specified, defaulting to global processors")
120+
if shouldLoadDefaultProcessors && !beatCfg.HasField("processors") {
121+
log.Debugf("In fleet/otel mode with no processors specified, defaulting to global processors")
120122
rawProcessors = fleetDefaultProcessors
121123

122124
} else {

x-pack/filebeat/fbreceiver/factory.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func createReceiver(_ context.Context, set receiver.Settings, baseCfg component.
4545
settings.ElasticLicensed = true
4646
settings.Initialize = append(settings.Initialize, include.InitializeModule)
4747

48-
b, err := instance.NewBeatReceiver(settings, cfg.Beatconfig, consumer, set.Logger.Core())
48+
b, err := instance.NewBeatReceiver(settings, cfg.Beatconfig, true, consumer, set.Logger.Core())
4949
if err != nil {
5050
return nil, fmt.Errorf("error creating %s: %w", Name, err)
5151
}

x-pack/filebeat/fbreceiver/receiver_test.go

+110
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,15 @@
55
package fbreceiver
66

77
import (
8+
"bufio"
89
"bytes"
910
"context"
11+
"strings"
1012
"testing"
1113
"time"
1214

15+
"github.com/elastic/elastic-agent-libs/mapstr"
16+
1317
"github.com/stretchr/testify/assert"
1418
"github.com/stretchr/testify/require"
1519
"go.opentelemetry.io/collector/consumer"
@@ -91,6 +95,112 @@ found:
9195
assert.NoError(t, err, "Error shutting down filebeatreceiver")
9296
}
9397

98+
func TestReceiverDefaultProcessors(t *testing.T) {
99+
config := Config{
100+
Beatconfig: map[string]interface{}{
101+
"filebeat": map[string]interface{}{
102+
"inputs": []map[string]interface{}{
103+
{
104+
"type": "benchmark",
105+
"enabled": true,
106+
"message": "test",
107+
"count": 1,
108+
},
109+
},
110+
},
111+
"output": map[string]interface{}{
112+
"otelconsumer": map[string]interface{}{},
113+
},
114+
"logging": map[string]interface{}{
115+
"level": "debug",
116+
"selectors": []string{
117+
"*",
118+
},
119+
},
120+
"path.home": t.TempDir(),
121+
},
122+
}
123+
124+
var zapLogs bytes.Buffer
125+
core := zapcore.NewCore(
126+
zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()),
127+
zapcore.AddSync(&zapLogs),
128+
zapcore.DebugLevel)
129+
130+
receiverSettings := receiver.Settings{}
131+
receiverSettings.Logger = zap.New(core)
132+
133+
logsCh := make(chan []mapstr.M, 1)
134+
logConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
135+
var logs []mapstr.M
136+
for i := 0; i < ld.ResourceLogs().Len(); i++ {
137+
rl := ld.ResourceLogs().At(i)
138+
for j := 0; j < rl.ScopeLogs().Len(); j++ {
139+
sl := rl.ScopeLogs().At(j)
140+
for k := 0; k < sl.LogRecords().Len(); k++ {
141+
log := sl.LogRecords().At(k)
142+
logs = append(logs, log.Body().Map().AsRaw())
143+
}
144+
}
145+
}
146+
147+
logsCh <- logs
148+
return nil
149+
})
150+
assert.NoError(t, err, "Error creating log consumer")
151+
152+
r, err := NewFactory().CreateLogs(context.Background(), receiverSettings, &config, logConsumer)
153+
assert.NoErrorf(t, err, "Error creating receiver. Logs:\n %s", zapLogs.String())
154+
155+
err = r.Start(context.Background(), nil)
156+
assert.NoError(t, err, "Error starting filebeatreceiver")
157+
defer func() {
158+
require.NoError(t, r.Shutdown(context.Background()))
159+
}()
160+
161+
var logs []mapstr.M
162+
select {
163+
case logs = <-logsCh:
164+
case <-time.After(1 * time.Minute):
165+
t.Fatal("timeout waiting for logs")
166+
}
167+
168+
require.Len(t, logs, 1)
169+
t.Log("ingested log: ", logs[0])
170+
171+
scanner := bufio.NewScanner(&zapLogs)
172+
wantKeywords := []string{
173+
"Generated new processors",
174+
"add_host_metadata",
175+
"add_cloud_metadata",
176+
"add_docker_metadata",
177+
"add_kubernetes_metadata",
178+
}
179+
180+
var processorsLoaded bool
181+
for scanner.Scan() {
182+
line := scanner.Text()
183+
if stringContainsAll(line, wantKeywords) {
184+
processorsLoaded = true
185+
break
186+
}
187+
}
188+
189+
require.True(t, processorsLoaded, "processors not loaded")
190+
// Check that add_host_metadata works, other processors are not guaranteed to add fields in all environments
191+
require.Contains(t, logs[0].Flatten(), "host.architecture")
192+
}
193+
194+
func stringContainsAll(s string, want []string) bool {
195+
for _, w := range want {
196+
if !strings.Contains(s, w) {
197+
return false
198+
}
199+
}
200+
201+
return true
202+
}
203+
94204
func BenchmarkFactory(b *testing.B) {
95205
tmpDir := b.TempDir()
96206

x-pack/metricbeat/mbreceiver/factory.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func createReceiver(_ context.Context, set receiver.Settings, baseCfg component.
3333
settings := cmd.MetricbeatSettings(Name)
3434
settings.ElasticLicensed = true
3535

36-
b, err := instance.NewBeatReceiver(settings, cfg.Beatconfig, consumer, set.Logger.Core())
36+
b, err := instance.NewBeatReceiver(settings, cfg.Beatconfig, false, consumer, set.Logger.Core())
3737
if err != nil {
3838
return nil, fmt.Errorf("error creating %s: %w", Name, err)
3939
}

0 commit comments

Comments
 (0)