diff --git a/.gitignore b/.gitignore index 3f6372575..a2d2f5745 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ release_notes.md .vscode .op .opp +go.work* diff --git a/go.mod b/go.mod index 04bb803ea..eef1b017f 100644 --- a/go.mod +++ b/go.mod @@ -119,7 +119,7 @@ require ( github.com/smira/go-statsd v1.3.4 github.com/snowflakedb/gosnowflake v1.7.2 github.com/sourcegraph/conc v0.3.0 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 github.com/tetratelabs/wazero v1.6.0 github.com/tilinna/z85 v1.0.0 github.com/trinodb/trino-go-client v0.313.0 @@ -136,30 +136,37 @@ require ( go.etcd.io/etcd/client/v3 v3.5.16 go.mongodb.org/mongo-driver v1.13.4 go.nanomsg.org/mangos/v3 v3.4.2 - go.opentelemetry.io/otel v1.29.0 + go.opentelemetry.io/otel v1.32.0 go.opentelemetry.io/otel/exporters/jaeger v1.17.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.23.1 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.23.1 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.23.1 - go.opentelemetry.io/otel/sdk v1.29.0 - go.opentelemetry.io/otel/trace v1.29.0 + go.opentelemetry.io/otel/sdk v1.32.0 + go.opentelemetry.io/otel/trace v1.32.0 go.uber.org/multierr v1.11.0 - golang.org/x/crypto v0.31.0 + golang.org/x/crypto v0.32.0 golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f - golang.org/x/net v0.33.0 - golang.org/x/oauth2 v0.23.0 + golang.org/x/net v0.34.0 + golang.org/x/oauth2 v0.24.0 golang.org/x/sync v0.10.0 golang.org/x/text v0.21.0 google.golang.org/api v0.203.0 - google.golang.org/grpc v1.67.1 - google.golang.org/protobuf v1.35.1 + google.golang.org/grpc v1.70.0 + google.golang.org/protobuf v1.36.4 gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/yaml.v3 v3.0.1 modernc.org/sqlite v1.28.0 ) require ( - cel.dev/expr v0.16.2 // indirect + github.com/orcaman/concurrent-map/v2 v2.0.1 // indirect + github.com/s2-streamstore/optr v1.1.0 // indirect + github.com/s2-streamstore/s2-sdk-go v0.5.0 + github.com/tidwall/btree v1.7.0 // indirect +) + +require ( + cel.dev/expr v0.19.0 // indirect cloud.google.com/go v0.116.0 // indirect cloud.google.com/go/auth v0.9.9 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.5 // indirect @@ -180,7 +187,7 @@ require ( github.com/ClickHouse/ch-go v0.61.5 // indirect github.com/DataDog/zstd v1.5.6 // indirect github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.2 // indirect - github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.3 // indirect + github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.25.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.45.0 // indirect github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect github.com/Microsoft/go-winio v0.6.2 // indirect @@ -344,23 +351,23 @@ require ( go.etcd.io/bbolt v1.3.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.16 // indirect go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/contrib/detectors/gcp v1.29.0 // indirect + go.opentelemetry.io/contrib/detectors/gcp v1.32.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect - go.opentelemetry.io/otel/metric v1.29.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.29.0 // indirect + go.opentelemetry.io/otel/metric v1.32.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect go.opentelemetry.io/proto/otlp v1.1.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect golang.org/x/mod v0.22.0 // indirect - golang.org/x/sys v0.28.0 // indirect - golang.org/x/term v0.27.0 // indirect + golang.org/x/sys v0.29.0 // indirect + golang.org/x/term v0.28.0 // indirect golang.org/x/time v0.7.0 // indirect golang.org/x/tools v0.27.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect google.golang.org/genproto v0.0.0-20241104194629-dd2ea8efbc28 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect diff --git a/go.sum b/go.sum index 704db0e48..09493e6ae 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -cel.dev/expr v0.16.2 h1:RwRhoH17VhAu9U5CMvMhH1PDVgf0tuz9FT+24AfMLfU= -cel.dev/expr v0.16.2/go.mod h1:gXngZQMkWJoSbE8mOzehJlXQyubn/Vg0vR9/F3W7iw8= +cel.dev/expr v0.19.0 h1:lXuo+nDhpyJSpWxpPVi5cPUwzKb+dsdOiw6IreM5yt0= +cel.dev/expr v0.19.0/go.mod h1:MrpN08Q+lEBs+bGYdLxxHkZoUSsCp0nSKTs0nTymJgw= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -731,8 +731,8 @@ github.com/DataDog/zstd v1.5.6/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwS github.com/GoogleCloudPlatform/cloudsql-proxy v1.29.0/go.mod h1:spvB9eLJH9dutlbPSRmHvSXXHOwGRyeXh1jVdquA2G8= github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.2 h1:DBjmt6/otSdULyJdVg2BlG0qGZO5tKL4VzOs0jpvw5Q= github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.2/go.mod h1:dppbR7CwXD4pgtV9t3wD1812RaLDcBjtblcDF5f1vI0= -github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.3 h1:cb3br57K508pQEFgBxn9GDhPS9HefpyMPK1RzmtMNzk= -github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.3/go.mod h1:itPGVDKf9cC/ov4MdvJ2QZ0khw4bfoo9jzwTJlaxy2k= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.25.0 h1:3c8yed4lgqTt+oTQ+JNMDo+F4xprBf+O/il4ZC0nRLw= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.25.0/go.mod h1:obipzmGjfSjam60XLwGfqUkJsfiheAl+TUjG+4yzyPM= github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.21.0 h1:OEgjQy1rH4Fbn5IpuI9d0uhLl+j6DkDvh9Q2Ucd6GK8= github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.21.0/go.mod h1:EUfJ8lb3pjD8VasPPwqIvG2XVCE6DOT8tY5tcwbWA+A= github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0.45.0 h1:/BF7rO6PYcmFoyJrq6HA3LqQpFSQei9aNuO1fvV3OqU= @@ -1746,6 +1746,8 @@ github.com/opencontainers/runc v1.1.12/go.mod h1:S+lQwSfncpBha7XTy/5lBwWgm5+y5Ma github.com/opensearch-project/opensearch-go/v4 v4.3.0 h1:gmQ+ILFJW6AJimivf+lHGVqCS2SCr/PBBf2Qr1xOCgE= github.com/opensearch-project/opensearch-go/v4 v4.3.0/go.mod h1:+w6KAvEX3S0fVVmZciNLN0CkXhxxem26+F6Y7DoPp04= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= +github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= github.com/ory/dockertest/v3 v3.10.0 h1:4K3z2VMe8Woe++invjaTB7VRyQXQy5UY+loujO4aNE4= github.com/ory/dockertest/v3 v3.10.0/go.mod h1:nr57ZbRWMqfsdGdFNLHz5jjNdDb7VVFnzAeW1n5N1Lg= github.com/oschwald/geoip2-golang v1.9.0 h1:uvD3O6fXAXs+usU+UGExshpdP13GAqp4GBrzN7IgKZc= @@ -1860,6 +1862,10 @@ github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w= github.com/ruudk/golang-pdf417 v0.0.0-20201230142125-a7e3863a1245/go.mod h1:pQAZKsJ8yyVxGRWYNEm9oFB8ieLgKFnamEyDmSA0BRk= +github.com/s2-streamstore/optr v1.1.0 h1:ExwYuwxb4Z1mVI+717xKUEDZ1y0ikFAV55jtk8XCYQk= +github.com/s2-streamstore/optr v1.1.0/go.mod h1:ujwXWMeanje1NE1aaaylBwmBkMalZ+wxFxSFOSHmJis= +github.com/s2-streamstore/s2-sdk-go v0.5.0 h1:IgLFvT2CtO1AE52HmZuurCRVpDz6CdfH+elFxDtUDdo= +github.com/s2-streamstore/s2-sdk-go v0.5.0/go.mod h1:VAK2MYAXfT2eMq4IpaWHRuL5ZBbf5mLyJ4X8Ee+9qdY= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= @@ -1920,13 +1926,15 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tailscale/peercred v0.0.0-20240214030740-b535050b2aa4/go.mod h1:phI29ccmHQBc+wvroosENp1IF9195449VDnFDhJ4rJU= github.com/testcontainers/testcontainers-go v0.28.0 h1:1HLm9qm+J5VikzFDYhOd+Zw12NtOl+8drH2E8nTY1r8= github.com/testcontainers/testcontainers-go v0.28.0/go.mod h1:COlDpUXbwW3owtpMkEB1zo9gwb1CoKVKlyrVPejF4AU= github.com/tetratelabs/wazero v1.6.0 h1:z0H1iikCdP8t+q341xqepY4EWvHEw8Es7tlqiVzlP3g= github.com/tetratelabs/wazero v1.6.0/go.mod h1:0U0G41+ochRKoPKCJlh0jMg1CHkyfK8kDqiirMmKY8A= +github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI= +github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY= github.com/tidwall/gjson v1.17.1 h1:wlYEnwqAHgzmhNUFfw7Xalt2JzQvsMx2Se4PcoFCT/U= github.com/tidwall/gjson v1.17.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= @@ -2033,14 +2041,14 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= -go.opentelemetry.io/contrib/detectors/gcp v1.29.0 h1:TiaiXB4DpGD3sdzNlYQxruQngn5Apwzi1X0DRhuGvDQ= -go.opentelemetry.io/contrib/detectors/gcp v1.29.0/go.mod h1:GW2aWZNwR2ZxDLdv8OyC2G8zkRoQBuURgV7RPQgcPoU= +go.opentelemetry.io/contrib/detectors/gcp v1.32.0 h1:P78qWqkLSShicHmAzfECaTgvslqHxblNE9j62Ws1NK8= +go.opentelemetry.io/contrib/detectors/gcp v1.32.0/go.mod h1:TVqo0Sda4Cv8gCIixd7LuLwW4EylumVWfhjZJjDD4DU= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 h1:r6I7RJCN86bpD/FQwedZ0vSixDpwuWREjW9oRMsmqDc= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0/go.mod h1:B9yO6b04uB80CzjedvewuqDhxJxi11s7/GtiGa8bAjI= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 h1:TT4fX+nBOA/+LUkobKGW1ydGcn+G3vRw9+g5HwCphpk= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0/go.mod h1:L7UH0GbB0p47T4Rri3uHjbpCFYrVrwc1I25QhNPiGK8= -go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= -go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8= +go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= +go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= go.opentelemetry.io/otel/exporters/jaeger v1.17.0 h1:D7UpUy2Xc2wsi1Ras6V40q806WM07rqoCWzXu7Sqy+4= go.opentelemetry.io/otel/exporters/jaeger v1.17.0/go.mod h1:nPCqOnEH9rNLKqH/+rrUjiMzHJdV1BlpKcTwRTyKkKI= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.23.1 h1:o8iWeVFa1BcLtVEV0LzrCxV2/55tB3xLxADr6Kyoey4= @@ -2049,14 +2057,14 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.23.1 h1:p3A5+ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.23.1/go.mod h1:OClrnXUjBqQbInvjJFjYSnMxBSCXBF8r3b34WqjiIrQ= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.23.1 h1:cfuy3bXmLJS7M1RZmAL6SuhGtKUp2KEsrm00OlAXkq4= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.23.1/go.mod h1:22jr92C6KwlwItJmQzfixzQM3oyyuYLCfHiMY+rpsPU= -go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc= -go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8= -go.opentelemetry.io/otel/sdk v1.29.0 h1:vkqKjk7gwhS8VaWb0POZKmIEDimRCMsopNYnriHyryo= -go.opentelemetry.io/otel/sdk v1.29.0/go.mod h1:pM8Dx5WKnvxLCb+8lG1PRNIDxu9g9b9g59Qr7hfAAok= -go.opentelemetry.io/otel/sdk/metric v1.29.0 h1:K2CfmJohnRgvZ9UAj2/FhIf/okdWcNdBwe1m8xFXiSY= -go.opentelemetry.io/otel/sdk/metric v1.29.0/go.mod h1:6zZLdCl2fkauYoZIOn/soQIDSWFmNSRcICarHfuhNJQ= -go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4= -go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ= +go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= +go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= +go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= +go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU= +go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU= +go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ= +go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= +go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= @@ -2124,8 +2132,8 @@ golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0 golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ= -golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= -golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= +golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -2264,8 +2272,8 @@ golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= -golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= -golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= +golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= +golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -2297,8 +2305,8 @@ golang.org/x/oauth2 v0.4.0/go.mod h1:RznEsdpjGAINPTOF0UH/t+xJ75L18YO3Ho6Pyn+uRec golang.org/x/oauth2 v0.5.0/go.mod h1:9/XBHVqLaWO3/BRHs5jbpYCnOZVjj5V0ndyaAM7KB4I= golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw= golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4= -golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs= -golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/oauth2 v0.24.0 h1:KTBBxWqUa0ykRPLtV69rRto9TLXcqYkeswu48x/gvNE= +golang.org/x/oauth2 v0.24.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -2430,8 +2438,8 @@ golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -2445,8 +2453,8 @@ golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= -golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= -golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= +golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg= +golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -2806,10 +2814,10 @@ google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633/go.mod h1:UUQDJDOl google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= google.golang.org/genproto v0.0.0-20241104194629-dd2ea8efbc28 h1:KJjNNclfpIkVqrZlTWcgOOaVQ00LdBnoEaRfkUx760s= google.golang.org/genproto v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:mt9/MofW7AWQ+Gy179ChOnvmJatV8YHUmrcedo9CIFI= -google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 h1:M0KvPgPmDZHPlbRbaNU1APr28TvwvvdUPlSv7PUvy8g= -google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:dguCy7UOdZhTvLzDyt15+rOrawrpM4q7DD9dQ1P11P4= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 h1:XVhgTWWV3kGQlwJHR3upFWZeTsei6Oks1apkZSeonIE= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= +google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a h1:OAiGFfOiA0v9MRYsSidp3ubZaBnteRUyn3xB2ZQ5G/E= +google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a/go.mod h1:jehYqy3+AhJU9ve55aNOaSml7wUXjF9x6z2LcCfpAhY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287 h1:J1H9f+LEdWAfHcez/4cvaVBox7cOYT+IU6rgqj5x++8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287/go.mod h1:8BS3B93F/U1juMFq9+EDk+qOT5CO1R9IzXxG3PTqiRk= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -2853,8 +2861,8 @@ google.golang.org/grpc v1.52.3/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5v google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw= google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g= google.golang.org/grpc v1.56.3/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s= -google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= -google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= +google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= @@ -2873,8 +2881,8 @@ google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.29.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= -google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM= +google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/impl/s2/bento.go b/internal/impl/s2/bento.go new file mode 100644 index 000000000..ad5698537 --- /dev/null +++ b/internal/impl/s2/bento.go @@ -0,0 +1,38 @@ +package s2 + +import ( + s2bentobox "github.com/s2-streamstore/s2-sdk-go/s2-bentobox" + "github.com/warpstreamlabs/bento/public/service" +) + +const ( + basinField = "basin" + authTokenField = "auth_token" +) + +func newConfig(conf *service.ParsedConfig) (*s2bentobox.Config, error) { + basin, err := conf.FieldString(basinField) + if err != nil { + return nil, err + } + + authToken, err := conf.FieldString(authTokenField) + if err != nil { + return nil, err + } + + return &s2bentobox.Config{ + Basin: basin, + AuthToken: authToken, + }, nil +} + +type bentoLogger struct { + *service.Logger +} + +func (bl *bentoLogger) With(keyValuePairs ...any) s2bentobox.Logger { + return &bentoLogger{ + Logger: bl.Logger.With(keyValuePairs...), + } +} diff --git a/internal/impl/s2/cache.go b/internal/impl/s2/cache.go new file mode 100644 index 000000000..4e026a8dd --- /dev/null +++ b/internal/impl/s2/cache.go @@ -0,0 +1,53 @@ +package s2 + +import ( + "context" + "encoding/base64" + "encoding/binary" + + "github.com/warpstreamlabs/bento/public/service" +) + +type bentoSeqNumCache struct { + Resources *service.Resources + Label string +} + +func streamCacheKey(stream string) string { + return base64.URLEncoding.EncodeToString([]byte(stream)) +} + +func (b *bentoSeqNumCache) Get(ctx context.Context, stream string) (uint64, error) { + var ( + seqNum uint64 + err error + ) + + if aErr := b.Resources.AccessCache(ctx, b.Label, func(c service.Cache) { + var seqNumBytes []byte + seqNumBytes, err = c.Get(ctx, streamCacheKey(stream)) + if err != nil { + return + } + + seqNum = binary.BigEndian.Uint64(seqNumBytes) + }); aErr != nil { + return 0, aErr + } + + return seqNum, err +} + +func (b *bentoSeqNumCache) Set(ctx context.Context, stream string, seqNum uint64) error { + var err error + + if aErr := b.Resources.AccessCache(ctx, b.Label, func(c service.Cache) { + seqNumBytes := binary.BigEndian.AppendUint64(make([]byte, 0, 8), seqNum) + + err = c.Set(ctx, streamCacheKey(stream), seqNumBytes, nil) + }); aErr != nil { + return aErr + } + + return err +} diff --git a/internal/impl/s2/input.go b/internal/impl/s2/input.go new file mode 100644 index 000000000..287505ba7 --- /dev/null +++ b/internal/impl/s2/input.go @@ -0,0 +1,238 @@ +package s2 + +import ( + "context" + _ "embed" + "errors" + "fmt" + "strconv" + + s2bentobox "github.com/s2-streamstore/s2-sdk-go/s2-bentobox" + "github.com/warpstreamlabs/bento/public/service" +) + +var errCacheNotFound = errors.New("cache not found") + +func init() { + if err := service.RegisterBatchInput( + s2bentobox.PluginName, + newInputConfigSpec(), + func(conf *service.ParsedConfig, r *service.Resources) (service.BatchInput, error) { + config, err := newInputConfig(conf, r) + if err != nil { + return nil, err + } + + return &Input{ + inner: nil, // Will be instantiated during `Connect` + config: config, + logger: r.Logger(), + }, nil + }, + ); err != nil { + panic(err) + } +} + +const ( + streamsField = "streams" + cacheField = "cache" + updateStreamsIntervalField = "update_streams_interval" +) + +func newInputConfigSpec() *service.ConfigSpec { + return service.NewConfigSpec(). + Fields( + service.NewStringField(basinField).Description("Basin name"), + service.NewStringField(authTokenField). + Description("Authentication token for S2 account"). + Secret(), + service.NewAnyField(streamsField). + Description("Streams prefix or list of streams to subscribe to"), + service.NewStringField(cacheField). + Description("Cache resource label for storing sequence number"), + service.NewInputMaxInFlightField().Advanced(), + service.NewDurationField(updateStreamsIntervalField). + Advanced(). + Default("1m"). + Description("Interval after which the streams list should update dynamically"), + ). + Summary("Consumes records from S2 streams"). + Description(` +Generate an authentication token by logging onto the web console at +[s2.dev](https://s2.dev/dashboard). + +### Cache + +The plugin requires setting up a caching mechanism to resume the input after +the last acknowledged record. + +To know more about setting up a cache resource, see +[Cache docs for Bento](https://warpstreamlabs.github.io/bento/docs/components/caches/about). + +### Metadata + +This input adds the following metadata fields to each message in addition to the +record headers: + +- `+"`s2_basin`"+`: The S2 basin where the origin stream lives. +- `+"`s2_stream`"+`: The origin S2 stream. +- `+"`s2_seq_num`"+`: Sequence number of the record in the origin stream formatted as a string. + +All the header values are loosely converted to strings as metadata attributes. + +**Note:** An [S2 command record](https://s2.dev/docs/stream#command-records) has no header +name. This is set as the `+"`s2_command`"+` meta key. +`, + ). + Example( + "Input with Prefix", + "Fetch records from all the streams with the prefix `my-favorite-prefix/` in the basin.", + ` +cache_resources: + - label: s2_seq_num + file: + directory: s2_seq_num_cache + +input: + label: s2_input + s2: + basin: my-favorite-basin + streams: my-favorite-prefix/ + auth_token: "${S2_AUTH_TOKEN}" + cache: s2_seq_num + +output: + label: stdout + stdout: + codec: lines +`, + ) +} + +func newInputConfig(conf *service.ParsedConfig, r *service.Resources) (*s2bentobox.InputConfig, error) { + config, err := newConfig(conf) + if err != nil { + return nil, err + } + + var inputStreams s2bentobox.InputStreams + + if streams, err := conf.FieldStringList(streamsField); err != nil { + // Try just a prefix. + prefix, err := conf.FieldString(streamsField) + if err != nil { + return nil, err + } + + inputStreams = s2bentobox.PrefixedInputStreams{ + Prefix: prefix, + } + } else { + inputStreams = s2bentobox.StaticInputStreams{ + Streams: streams, + } + } + + maxInFlight, err := conf.FieldMaxInFlight() + if err != nil { + return nil, err + } + + cacheLabel, err := conf.FieldString(cacheField) + if err != nil { + return nil, err + } + + if !r.HasCache(cacheLabel) { + return nil, fmt.Errorf("%w: %q", errCacheNotFound, cacheLabel) + } + + cache := &bentoSeqNumCache{ + Resources: r, + Label: cacheLabel, + } + + updateStreamsInterval, err := conf.FieldDuration(updateStreamsIntervalField) + if err != nil { + return nil, err + } + + return &s2bentobox.InputConfig{ + Config: config, + Streams: inputStreams, + MaxInFlight: maxInFlight, + Logger: &bentoLogger{r.Logger()}, + Cache: cache, + UpdateStreamsInterval: updateStreamsInterval, + }, nil +} + +type Input struct { + inner *s2bentobox.MultiStreamInput + config *s2bentobox.InputConfig + logger *service.Logger +} + +func (i *Input) Connect(ctx context.Context) error { + i.logger.Debug("Connecting S2 input") + + inner, err := s2bentobox.ConnectMultiStreamInput(ctx, i.config) + if err != nil { + return err + } + + // Initialize inner connection. + i.inner = inner + + return nil +} + +func (i *Input) ReadBatch(ctx context.Context) (service.MessageBatch, service.AckFunc, error) { + i.logger.Debug("Reading batch from S2") + + batch, aFn, stream, err := i.inner.ReadBatch(ctx) + if err != nil { + if errors.Is(err, s2bentobox.ErrInputClosed) { + return nil, nil, service.ErrNotConnected + } + + return nil, nil, err + } + + messages := make([]*service.Message, 0, len(batch.Records)) + + for _, record := range batch.Records { + msg := service.NewMessage(record.Body) + + if len(record.Headers) == 1 && len(record.Headers[0].Name) == 0 { + // Command record + msg.MetaSet("s2_command", string(record.Headers[0].Value)) + } else { + for _, header := range record.Headers { + // TODO: Use `MetaSetMut`. + msg.MetaSet(string(header.Name), string(header.Value)) + } + } + + msg.MetaSet("s2_seq_num", strconv.FormatUint(record.SeqNum, 10)) + msg.MetaSet("s2_stream", stream) + msg.MetaSet("s2_basin", i.config.Basin) + + messages = append(messages, msg) + } + + return messages, aFn, nil +} + +func (i *Input) Close(ctx context.Context) error { + i.logger.Debug("Closing S2 input") + + if err := i.inner.Close(ctx); err != nil { + return err + } + + i.inner = nil + + return nil +} diff --git a/internal/impl/s2/output.go b/internal/impl/s2/output.go new file mode 100644 index 000000000..2099ae963 --- /dev/null +++ b/internal/impl/s2/output.go @@ -0,0 +1,287 @@ +package s2 + +import ( + "context" + _ "embed" + "encoding/base64" + "errors" + "fmt" + + "github.com/s2-streamstore/s2-sdk-go/s2" + s2bentobox "github.com/s2-streamstore/s2-sdk-go/s2-bentobox" + "github.com/warpstreamlabs/bento/public/service" +) + +// Setting the batch byte size max a bit conservatively since Bento does not +// take metadata size into account. Moreover, the S2 metered size of a record +// will be > bento message size. +const maxBatchBytes = 256 * 1024 + +var ( + errInvalidBatchingByteSize = errors.New("invalid batch policy byte size") + errInvalidBatchingCount = errors.New("invalid batch policy count") +) + +func init() { + if err := service.RegisterBatchOutput( + s2bentobox.PluginName, + newOutputConfigSpec(), + func(conf *service.ParsedConfig, r *service.Resources) ( + service.BatchOutput, service.BatchPolicy, int, error, + ) { + policy, err := parseBatchPolicy(conf) + if err != nil { + return nil, policy, 0, err + } + + config, err := newOutputConfig(conf) + if err != nil { + return nil, policy, 0, err + } + + return &Output{ + inner: nil, // Will be instantiated during `Connect` + config: config, + logger: r.Logger(), + }, policy, config.MaxInFlight, nil + }, + ); err != nil { + panic(err) + } +} + +const ( + batchingField = "batching" + streamField = "stream" + matchSeqNumField = "match_seq_num" + fencingTokenField = "fencing_token" +) + +func newOutputConfigSpec() *service.ConfigSpec { + return service.NewConfigSpec(). + Fields( + service.NewStringField(basinField).Description("Basin name"), + service.NewStringField(authTokenField). + Description("Authentication token for S2 account"). + Secret(), + service.NewStringField(streamField).Description("Stream name"), + service.NewStringField(fencingTokenField). + Optional(). + Description("Enforce a fencing token (base64 encoded)"). + Example("aGVsbG8gczI="), + service.NewBatchPolicyField(batchingField). + Advanced(). + LintRule( + fmt.Sprintf(` +root = if this.count > %d { + "the number of messages in a batch cannot exceed 1000" +} +root = if this.byte_size > %d { + "the amount of bytes in a batch cannot exceed 256KiB" +} + `, + s2.MaxBatchRecords, + maxBatchBytes, + ), + ), + service.NewOutputMaxInFlightField().Advanced(), + ). + Summary("Sends messages to an S2 stream."). + Description(` +Generate an authentication token by logging onto the web console at +[s2.dev](https://s2.dev/dashboard). + +### Metadata + +The metadata attributes are set as S2 record headers. Currently, only string +attribute values are supported. + +### Batching + +The plugin expects batched inputs. Messages are batched automatically by Bento. + +By default, Bento disables batching based on `+"`count`"+`, `+"`byte_size`"+`, and `+"`period`"+` +parameters, but the plugin enables batching setting both `+"`count`"+` and `+"`byte_size`"+` to +the maximum values supported by S2. It also sets a flush period of `+"`5ms`"+` as a reasonable default. + +**Note:** An S2 record batch can be a maximum of 1MiB but the plugin limits the +size of a message to 256KiB since the Bento size limit doesn't take metadata into +account. Moreover, the metered size of the same Bento message will be greater +than the byte size of a Bento message. +`, + ). + Example( + "ASCII Starwars", + "Consume a network stream into an S2 stream", + ` +input: + label: towel_blinkenlights_nl + socket: + network: tcp + address: towel.blinkenlights.nl:23 + scanner: + lines: {} + +output: + label: s2_starwars + s2: + basin: my-favorite-basin + stream: starwars + auth_token: "${S2_AUTH_TOKEN}" +`, + ) +} + +func parseBatchPolicy(conf *service.ParsedConfig) (service.BatchPolicy, error) { + policy, err := conf.FieldBatchPolicy(batchingField) + if err != nil { + return policy, err + } + + // Set required defaults + + if policy.ByteSize <= 0 { + policy.ByteSize = maxBatchBytes + } + + if policy.Count <= 0 { + policy.Count = s2.MaxBatchRecords + } + + // This feels sensible to have. Not sure if we should let the user have infinite + // retention until a batch of required size is formed. + if policy.Period == "" { + policy.Period = "5ms" + } + + // Validate limits + + if policy.ByteSize > maxBatchBytes { + return policy, fmt.Errorf("%w: must not exceed %d", errInvalidBatchingByteSize, s2.MaxBatchBytes) + } + + if policy.Count > s2.MaxBatchRecords { + return policy, fmt.Errorf("%w: must not exceed %d", errInvalidBatchingCount, s2.MaxBatchRecords) + } + + return policy, nil +} + +func newOutputConfig(conf *service.ParsedConfig) (*s2bentobox.OutputConfig, error) { + config, err := newConfig(conf) + if err != nil { + return nil, err + } + + stream, err := conf.FieldString(streamField) + if err != nil { + return nil, err + } + + maxInFlight, err := conf.FieldMaxInFlight() + if err != nil { + return nil, err + } + + var fencingToken []byte + + if conf.Contains(fencingTokenField) { + field, err := conf.FieldString(fencingTokenField) + if err != nil { + return nil, err + } + + fencingToken, err = base64.StdEncoding.DecodeString(field) + if err != nil { + return nil, err + } + } + + return &s2bentobox.OutputConfig{ + Config: config, + Stream: stream, + MaxInFlight: maxInFlight, + FencingToken: fencingToken, + }, nil +} + +type Output struct { + inner *s2bentobox.Output + config *s2bentobox.OutputConfig + logger *service.Logger +} + +func (o *Output) Connect(ctx context.Context) error { + o.logger.Debug("Connecting S2 output") + + inner, err := s2bentobox.ConnectOutput(ctx, o.config) + if err != nil { + return err + } + + // Initialize inner connection. + o.inner = inner + + return nil +} + +func (o *Output) WriteBatch(ctx context.Context, batch service.MessageBatch) error { + o.logger.Debug("Writing batch to S2") + + records := make([]s2.AppendRecord, 0, len(batch)) + + if err := batch.WalkWithBatchedErrors(func(_ int, m *service.Message) error { + body, err := m.AsBytes() + if err != nil { + return err + } + + var headers []s2.Header + if err := m.MetaWalk(func(k, v string) error { + headers = append(headers, s2.Header{ + Name: []byte(k), + Value: []byte(v), + }) + + return nil + }); err != nil { + return err + } + + records = append(records, s2.AppendRecord{ + Headers: headers, + Body: body, + }) + + return nil + }); err != nil { + return err + } + + recordBatch, leftOver := s2.NewAppendRecordBatch(records...) + if len(leftOver) > 0 { + return s2bentobox.ErrAppendRecordBatchFull + } + + if err := o.inner.WriteBatch(ctx, recordBatch); err != nil { + if errors.Is(err, s2bentobox.ErrOutputClosed) { + return service.ErrNotConnected + } + + return err + } + + return nil +} + +func (o *Output) Close(ctx context.Context) error { + o.logger.Debug("Closing S2 output") + + if err := o.inner.Close(ctx); err != nil { + return err + } + + o.inner = nil + + return nil +} diff --git a/public/components/all/package.go b/public/components/all/package.go index b819e09e5..bc7c8f786 100644 --- a/public/components/all/package.go +++ b/public/components/all/package.go @@ -46,6 +46,7 @@ import ( _ "github.com/warpstreamlabs/bento/public/components/pusher" _ "github.com/warpstreamlabs/bento/public/components/questdb" _ "github.com/warpstreamlabs/bento/public/components/redis" + _ "github.com/warpstreamlabs/bento/public/components/s2" _ "github.com/warpstreamlabs/bento/public/components/sentry" _ "github.com/warpstreamlabs/bento/public/components/sftp" _ "github.com/warpstreamlabs/bento/public/components/snowflake" diff --git a/public/components/s2/package.go b/public/components/s2/package.go new file mode 100644 index 000000000..4755ac4ee --- /dev/null +++ b/public/components/s2/package.go @@ -0,0 +1,6 @@ +package s2 + +import ( + // Bring in the internal plugin definitions. + _ "github.com/warpstreamlabs/bento/internal/impl/s2" +)