Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

snowflake: add stats around register and commit latency #3234

Merged
merged 3 commits into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ Changelog

All notable changes to this project will be documented in this file.

## 4.49.0 - TBD

### Added

- Output `snowflake_streaming` has two new stats `snowflake_register_latency_ns` and `snowflake_commit_latency_ns`. (@rockwotj)

## 4.48.0 - 2025-03-03

### Added
Expand Down
10 changes: 9 additions & 1 deletion internal/impl/snowflake/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
package snowflake

import (
"time"

"github.com/redpanda-data/benthos/v4/public/service"

"github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming"
Expand All @@ -22,6 +24,8 @@ type snowpipeMetrics struct {
buildTime *service.MetricTimer
convertTime *service.MetricTimer
serializeTime *service.MetricTimer
registerTime *service.MetricTimer
commitTime *service.MetricTimer
}

func newSnowpipeMetrics(m *service.Metrics) *snowpipeMetrics {
Expand All @@ -30,14 +34,18 @@ func newSnowpipeMetrics(m *service.Metrics) *snowpipeMetrics {
uploadTime: m.NewTimer("snowflake_upload_latency_ns"),
convertTime: m.NewTimer("snowflake_convert_latency_ns"),
serializeTime: m.NewTimer("snowflake_serialize_latency_ns"),
registerTime: m.NewTimer("snowflake_register_latency_ns"),
commitTime: m.NewTimer("snowflake_commit_latency_ns"),
compressedOutput: m.NewCounter("snowflake_compressed_output_size_bytes"),
}
}

func (m *snowpipeMetrics) Report(stats streaming.InsertStats) {
func (m *snowpipeMetrics) Report(stats streaming.InsertStats, commitTime time.Duration) {
m.compressedOutput.Incr(int64(stats.CompressedOutputSize))
m.uploadTime.Timing(stats.UploadTime.Nanoseconds())
m.buildTime.Timing(stats.BuildTime.Nanoseconds())
m.convertTime.Timing(stats.ConvertTime.Nanoseconds())
m.serializeTime.Timing(stats.SerializeTime.Nanoseconds())
m.registerTime.Timing(stats.RegisterTime.Nanoseconds())
m.commitTime.Timing(commitTime.Nanoseconds())
}
12 changes: 8 additions & 4 deletions internal/impl/snowflake/output_snowflake_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,7 @@ func (o *snowpipePooledOutput) WriteBatch(ctx context.Context, batch service.Mes
return wrapInsertError(err)
}
o.logger.Debugf("done inserting %d rows using channel %s, stats: %+v", len(batch), channel.Name, stats)
o.metrics.Report(stats)
commitStart := time.Now()
polls, err := channel.WaitUntilCommitted(ctx, o.commitTimeout)
if err != nil {
reopened, reopenErr := o.openChannel(ctx, channel.Name, channel.ID)
Expand All @@ -968,7 +968,9 @@ func (o *snowpipePooledOutput) WriteBatch(ctx context.Context, batch service.Mes
}
return err
}
o.logger.Tracef("batch committed in snowflake after %d polls", polls)
commitDuration := time.Since(commitStart)
o.logger.Debugf("batch of %d rows committed using channel %s after %d polls in %s", len(batch), channel.Name, polls, commitDuration)
o.metrics.Report(stats, commitDuration)
o.channelPool.Release(channel)
return nil
}
Expand Down Expand Up @@ -1056,7 +1058,7 @@ func (o *snowpipeIndexedOutput) WriteBatch(ctx context.Context, batch service.Me
return wrapInsertError(err)
}
o.logger.Debugf("done inserting %d rows using channel %s, stats: %+v", len(batch), channel.Name, stats)
o.metrics.Report(stats)
commitStart := time.Now()
polls, err := channel.WaitUntilCommitted(ctx, o.commitTimeout)
if err != nil {
reopened, reopenErr := o.openChannel(ctx, channel.Name, channel.ID)
Expand All @@ -1069,7 +1071,9 @@ func (o *snowpipeIndexedOutput) WriteBatch(ctx context.Context, batch service.Me
}
return err
}
o.logger.Tracef("batch committed in snowflake after %d polls", polls)
commitDuration := time.Since(commitStart)
o.logger.Debugf("batch of %d rows committed using channel %s after %d polls in %s", len(batch), channel.Name, polls, commitDuration)
o.metrics.Report(stats, commitDuration)
o.channelPool.Release(channel.Name, channel)
return nil
}
Expand Down