Skip to content

Commit

Permalink
[chore] Fix bug in exporter.Request.MergeSplit() (open-telemetry#12009)
Browse files Browse the repository at this point in the history
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

This PR fixes `logsRequest`, `tracesRequest` and `metricsRequest` to NOT
modify the input if `MergeSplit()` returns an error. Copy-pasted the doc
string of `MergeSplit()` for references:


// MergeSplit is a function that merge and/or splits this request with
another one into multiple requests based on the
// configured limit provided in MaxSizeConfig.
// All the returned requests MUST have a number of items that does not
exceed the maximum number of items.
// Size of the last returned request MUST be less or equal than the size
of any other returned request.
// **The original request MUST not be mutated if error is returned after
mutation** or if the exporter is
// marked as not mutable. The length of the returned slice MUST not be
0.
// Experimental: This API is at the early stage of development and may
change without backward compatibility
// until
open-telemetry#8122 is
resolved.
MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, Request)
([]Request, error)


<!-- Issue number if applicable -->
#### Link to tracking issue
Fixes #

<!--Describe what testing was performed and which tests were added.-->
#### Testing

Added `TestMergeSplit{Logs, Traces,
Metrics}InputNotModifiedIfErrorReturned`

<!--Describe the documentation added.-->
#### Documentation

<!--Please delete paragraphs that you did not use before submitting.-->
  • Loading branch information
sfc-gh-sili authored Jan 4, 2025
1 parent afdd296 commit 3e87d0d
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 24 deletions.
17 changes: 11 additions & 6 deletions exporter/exporterhelper/logs_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,24 @@ func (req *logsRequest) Merge(_ context.Context, r2 Request) (Request, error) {
// MergeSplit splits and/or merges the provided logs request and the current request into one or more requests
// conforming with the MaxSizeConfig.
func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) {
var req2 *logsRequest
if r2 != nil {
var ok bool
req2, ok = r2.(*logsRequest)
if !ok {
return nil, errors.New("invalid input type")
}
}

var (
res []Request
destReq *logsRequest
capacityLeft = cfg.MaxSizeItems
)
for _, req := range []Request{req, r2} {
if req == nil {
for _, srcReq := range []*logsRequest{req, req2} {
if srcReq == nil {
continue
}
srcReq, ok := req.(*logsRequest)
if !ok {
return nil, errors.New("invalid input type")
}
if srcReq.ld.LogRecordCount() <= capacityLeft {
if destReq == nil {
destReq = srcReq
Expand Down
12 changes: 10 additions & 2 deletions exporter/exporterhelper/logs_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestMergeLogsInvalidInput(t *testing.T) {
lr1 := &tracesRequest{td: testdata.GenerateTraces(2)}
lr2 := &logsRequest{ld: testdata.GenerateLogs(3)}
_, err := lr1.Merge(context.Background(), lr2)
assert.Error(t, err)
require.Error(t, err)
}

func TestMergeSplitLogs(t *testing.T) {
Expand Down Expand Up @@ -129,11 +129,19 @@ func TestMergeSplitLogs(t *testing.T) {
}
}

func TestMergeSplitLogsInputNotModifiedIfErrorReturned(t *testing.T) {
r1 := &logsRequest{ld: testdata.GenerateLogs(18)}
r2 := &tracesRequest{td: testdata.GenerateTraces(3)}
_, err := r1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, r2)
require.Error(t, err)
assert.Equal(t, 18, r1.ItemsCount())
}

func TestMergeSplitLogsInvalidInput(t *testing.T) {
r1 := &tracesRequest{td: testdata.GenerateTraces(2)}
r2 := &logsRequest{ld: testdata.GenerateLogs(3)}
_, err := r1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, r2)
assert.Error(t, err)
require.Error(t, err)
}

func TestExtractLogs(t *testing.T) {
Expand Down
17 changes: 11 additions & 6 deletions exporter/exporterhelper/metrics_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,24 @@ func (req *metricsRequest) Merge(_ context.Context, r2 Request) (Request, error)
// MergeSplit splits and/or merges the provided metrics request and the current request into one or more requests
// conforming with the MaxSizeConfig.
func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) {
var req2 *metricsRequest
if r2 != nil {
var ok bool
req2, ok = r2.(*metricsRequest)
if !ok {
return nil, errors.New("invalid input type")
}
}

var (
res []Request
destReq *metricsRequest
capacityLeft = cfg.MaxSizeItems
)
for _, req := range []Request{req, r2} {
if req == nil {
for _, srcReq := range []*metricsRequest{req, req2} {
if srcReq == nil {
continue
}
srcReq, ok := req.(*metricsRequest)
if !ok {
return nil, errors.New("invalid input type")
}
if srcReq.md.DataPointCount() <= capacityLeft {
if destReq == nil {
destReq = srcReq
Expand Down
12 changes: 10 additions & 2 deletions exporter/exporterhelper/metrics_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestMergeMetricsInvalidInput(t *testing.T) {
mr1 := &tracesRequest{td: testdata.GenerateTraces(2)}
mr2 := &metricsRequest{md: testdata.GenerateMetrics(3)}
_, err := mr1.Merge(context.Background(), mr2)
assert.Error(t, err)
require.Error(t, err)
}

func TestMergeSplitMetrics(t *testing.T) {
Expand Down Expand Up @@ -129,11 +129,19 @@ func TestMergeSplitMetrics(t *testing.T) {
}
}

func TestMergeSplitMetricsInputNotModifiedIfErrorReturned(t *testing.T) {
r1 := &metricsRequest{md: testdata.GenerateMetrics(18)} // 18 metrics, 36 data points
r2 := &logsRequest{ld: testdata.GenerateLogs(3)}
_, err := r1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, r2)
require.Error(t, err)
assert.Equal(t, 36, r1.ItemsCount())
}

func TestMergeSplitMetricsInvalidInput(t *testing.T) {
r1 := &tracesRequest{td: testdata.GenerateTraces(2)}
r2 := &metricsRequest{md: testdata.GenerateMetrics(3)}
_, err := r1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, r2)
assert.Error(t, err)
require.Error(t, err)
}

func TestExtractMetrics(t *testing.T) {
Expand Down
17 changes: 11 additions & 6 deletions exporter/exporterhelper/traces_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,24 @@ func (req *tracesRequest) Merge(_ context.Context, r2 Request) (Request, error)
// MergeSplit splits and/or merges the provided traces request and the current request into one or more requests
// conforming with the MaxSizeConfig.
func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) {
var req2 *tracesRequest
if r2 != nil {
var ok bool
req2, ok = r2.(*tracesRequest)
if !ok {
return nil, errors.New("invalid input type")
}
}

var (
res []Request
destReq *tracesRequest
capacityLeft = cfg.MaxSizeItems
)
for _, req := range []Request{req, r2} {
if req == nil {
for _, srcReq := range []*tracesRequest{req, req2} {
if srcReq == nil {
continue
}
srcReq, ok := req.(*tracesRequest)
if !ok {
return nil, errors.New("invalid input type")
}
if srcReq.td.SpanCount() <= capacityLeft {
if destReq == nil {
destReq = srcReq
Expand Down
12 changes: 10 additions & 2 deletions exporter/exporterhelper/traces_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestMergeTracesInvalidInput(t *testing.T) {
tr1 := &logsRequest{ld: testdata.GenerateLogs(2)}
tr2 := &tracesRequest{td: testdata.GenerateTraces(3)}
_, err := tr1.Merge(context.Background(), tr2)
assert.Error(t, err)
require.Error(t, err)
}

func TestMergeSplitTraces(t *testing.T) {
Expand Down Expand Up @@ -136,11 +136,19 @@ func TestMergeSplitTraces(t *testing.T) {
}
}

func TestMergeSplitTracesInputNotModifiedIfErrorReturned(t *testing.T) {
r1 := &tracesRequest{td: testdata.GenerateTraces(18)}
r2 := &logsRequest{ld: testdata.GenerateLogs(3)}
_, err := r1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, r2)
require.Error(t, err)
assert.Equal(t, 18, r1.ItemsCount())
}

func TestMergeSplitTracesInvalidInput(t *testing.T) {
r1 := &tracesRequest{td: testdata.GenerateTraces(2)}
r2 := &metricsRequest{md: testdata.GenerateMetrics(3)}
_, err := r1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, r2)
assert.Error(t, err)
require.Error(t, err)
}

func TestExtractTraces(t *testing.T) {
Expand Down

0 comments on commit 3e87d0d

Please sign in to comment.