diff --git a/CHANGELOG.md b/CHANGELOG.md index 1088576e76..c4c614bcd8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,12 @@ Changelog All notable changes to this project will be documented in this file. +## 4.49.0 - TBD + +### Added + +- Output `snowflake_streaming` has two new stats `snowflake_register_latency_ns` and `snowflake_commit_latency_ns`. (@rockwotj) + ## 4.48.0 - 2025-03-03 ### Added diff --git a/internal/impl/snowflake/metrics.go b/internal/impl/snowflake/metrics.go index 972f43925a..7f327f3854 100644 --- a/internal/impl/snowflake/metrics.go +++ b/internal/impl/snowflake/metrics.go @@ -11,6 +11,8 @@ package snowflake import ( + "time" + "github.com/redpanda-data/benthos/v4/public/service" "github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming" @@ -22,6 +24,8 @@ type snowpipeMetrics struct { buildTime *service.MetricTimer convertTime *service.MetricTimer serializeTime *service.MetricTimer + registerTime *service.MetricTimer + commitTime *service.MetricTimer } func newSnowpipeMetrics(m *service.Metrics) *snowpipeMetrics { @@ -30,14 +34,18 @@ func newSnowpipeMetrics(m *service.Metrics) *snowpipeMetrics { uploadTime: m.NewTimer("snowflake_upload_latency_ns"), convertTime: m.NewTimer("snowflake_convert_latency_ns"), serializeTime: m.NewTimer("snowflake_serialize_latency_ns"), + registerTime: m.NewTimer("snowflake_register_latency_ns"), + commitTime: m.NewTimer("snowflake_commit_latency_ns"), compressedOutput: m.NewCounter("snowflake_compressed_output_size_bytes"), } } -func (m *snowpipeMetrics) Report(stats streaming.InsertStats) { +func (m *snowpipeMetrics) Report(stats streaming.InsertStats, commitTime time.Duration) { m.compressedOutput.Incr(int64(stats.CompressedOutputSize)) m.uploadTime.Timing(stats.UploadTime.Nanoseconds()) m.buildTime.Timing(stats.BuildTime.Nanoseconds()) m.convertTime.Timing(stats.ConvertTime.Nanoseconds()) m.serializeTime.Timing(stats.SerializeTime.Nanoseconds()) + m.registerTime.Timing(stats.RegisterTime.Nanoseconds()) + m.commitTime.Timing(commitTime.Nanoseconds()) } diff --git a/internal/impl/snowflake/output_snowflake_streaming.go b/internal/impl/snowflake/output_snowflake_streaming.go index 16c81163e1..56b8f3d2aa 100644 --- a/internal/impl/snowflake/output_snowflake_streaming.go +++ b/internal/impl/snowflake/output_snowflake_streaming.go @@ -955,7 +955,7 @@ func (o *snowpipePooledOutput) WriteBatch(ctx context.Context, batch service.Mes return wrapInsertError(err) } o.logger.Debugf("done inserting %d rows using channel %s, stats: %+v", len(batch), channel.Name, stats) - o.metrics.Report(stats) + commitStart := time.Now() polls, err := channel.WaitUntilCommitted(ctx, o.commitTimeout) if err != nil { reopened, reopenErr := o.openChannel(ctx, channel.Name, channel.ID) @@ -968,7 +968,9 @@ func (o *snowpipePooledOutput) WriteBatch(ctx context.Context, batch service.Mes } return err } - o.logger.Tracef("batch committed in snowflake after %d polls", polls) + commitDuration := time.Since(commitStart) + o.logger.Debugf("batch of %d rows committed using channel %s after %d polls in %s", len(batch), channel.Name, polls, commitDuration) + o.metrics.Report(stats, commitDuration) o.channelPool.Release(channel) return nil } @@ -1056,7 +1058,7 @@ func (o *snowpipeIndexedOutput) WriteBatch(ctx context.Context, batch service.Me return wrapInsertError(err) } o.logger.Debugf("done inserting %d rows using channel %s, stats: %+v", len(batch), channel.Name, stats) - o.metrics.Report(stats) + commitStart := time.Now() polls, err := channel.WaitUntilCommitted(ctx, o.commitTimeout) if err != nil { reopened, reopenErr := o.openChannel(ctx, channel.Name, channel.ID) @@ -1069,7 +1071,9 @@ func (o *snowpipeIndexedOutput) WriteBatch(ctx context.Context, batch service.Me } return err } - o.logger.Tracef("batch committed in snowflake after %d polls", polls) + commitDuration := time.Since(commitStart) + o.logger.Debugf("batch of %d rows committed using channel %s after %d polls in %s", len(batch), channel.Name, polls, commitDuration) + o.metrics.Report(stats, commitDuration) o.channelPool.Release(channel.Name, channel) return nil }