From 0bf1b12ce2ee0ab12bfa08695fda08eb314809e2 Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Tue, 28 Jan 2025 20:27:37 +0530 Subject: [PATCH 1/7] feat: S2 Input and Output plugins --- .gitignore | 1 + go.mod | 45 ++-- go.sum | 80 +++--- internal/impl/s2/bento.go | 93 +++++++ internal/impl/s2/embeds/input_description.md | 23 ++ .../impl/s2/embeds/input_with_prefix_eg.yaml | 17 ++ internal/impl/s2/embeds/output_description.md | 16 ++ .../impl/s2/embeds/output_starwars_eg.yaml | 14 + internal/impl/s2/input.go | 253 ++++++++++++++++++ internal/impl/s2/output.go | 248 +++++++++++++++++ public/components/all/package.go | 1 + public/components/s2/package.go | 6 + 12 files changed, 742 insertions(+), 55 deletions(-) create mode 100644 internal/impl/s2/bento.go create mode 100644 internal/impl/s2/embeds/input_description.md create mode 100644 internal/impl/s2/embeds/input_with_prefix_eg.yaml create mode 100644 internal/impl/s2/embeds/output_description.md create mode 100644 internal/impl/s2/embeds/output_starwars_eg.yaml create mode 100644 internal/impl/s2/input.go create mode 100644 internal/impl/s2/output.go create mode 100644 public/components/s2/package.go diff --git a/.gitignore b/.gitignore index 3f6372575..a2d2f5745 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ release_notes.md .vscode .op .opp +go.work* diff --git a/go.mod b/go.mod index 04bb803ea..df0af2e2a 100644 --- a/go.mod +++ b/go.mod @@ -119,7 +119,7 @@ require ( github.com/smira/go-statsd v1.3.4 github.com/snowflakedb/gosnowflake v1.7.2 github.com/sourcegraph/conc v0.3.0 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 github.com/tetratelabs/wazero v1.6.0 github.com/tilinna/z85 v1.0.0 github.com/trinodb/trino-go-client v0.313.0 @@ -136,30 +136,37 @@ require ( go.etcd.io/etcd/client/v3 v3.5.16 go.mongodb.org/mongo-driver v1.13.4 go.nanomsg.org/mangos/v3 v3.4.2 - go.opentelemetry.io/otel v1.29.0 + go.opentelemetry.io/otel v1.32.0 go.opentelemetry.io/otel/exporters/jaeger v1.17.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.23.1 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.23.1 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.23.1 - go.opentelemetry.io/otel/sdk v1.29.0 - go.opentelemetry.io/otel/trace v1.29.0 + go.opentelemetry.io/otel/sdk v1.32.0 + go.opentelemetry.io/otel/trace v1.32.0 go.uber.org/multierr v1.11.0 - golang.org/x/crypto v0.31.0 + golang.org/x/crypto v0.32.0 golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f - golang.org/x/net v0.33.0 - golang.org/x/oauth2 v0.23.0 + golang.org/x/net v0.34.0 + golang.org/x/oauth2 v0.24.0 golang.org/x/sync v0.10.0 golang.org/x/text v0.21.0 google.golang.org/api v0.203.0 - google.golang.org/grpc v1.67.1 - google.golang.org/protobuf v1.35.1 + google.golang.org/grpc v1.70.0 + google.golang.org/protobuf v1.36.4 gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/yaml.v3 v3.0.1 modernc.org/sqlite v1.28.0 ) require ( - cel.dev/expr v0.16.2 // indirect + github.com/orcaman/concurrent-map/v2 v2.0.1 // indirect + github.com/s2-streamstore/optr v1.0.0 // indirect + github.com/s2-streamstore/s2-sdk-go v0.3.0 + github.com/tidwall/btree v1.7.0 // indirect +) + +require ( + cel.dev/expr v0.19.0 // indirect cloud.google.com/go v0.116.0 // indirect cloud.google.com/go/auth v0.9.9 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.5 // indirect @@ -180,7 +187,7 @@ require ( github.com/ClickHouse/ch-go v0.61.5 // indirect github.com/DataDog/zstd v1.5.6 // indirect github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.2 // indirect - github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.3 // indirect + github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.25.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.45.0 // indirect github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect github.com/Microsoft/go-winio v0.6.2 // indirect @@ -344,23 +351,23 @@ require ( go.etcd.io/bbolt v1.3.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.16 // indirect go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/contrib/detectors/gcp v1.29.0 // indirect + go.opentelemetry.io/contrib/detectors/gcp v1.32.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect - go.opentelemetry.io/otel/metric v1.29.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.29.0 // indirect + go.opentelemetry.io/otel/metric v1.32.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect go.opentelemetry.io/proto/otlp v1.1.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect golang.org/x/mod v0.22.0 // indirect - golang.org/x/sys v0.28.0 // indirect - golang.org/x/term v0.27.0 // indirect + golang.org/x/sys v0.29.0 // indirect + golang.org/x/term v0.28.0 // indirect golang.org/x/time v0.7.0 // indirect golang.org/x/tools v0.27.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect google.golang.org/genproto v0.0.0-20241104194629-dd2ea8efbc28 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect @@ -378,7 +385,7 @@ require ( modernc.org/token v1.1.0 // indirect ) -go 1.22.0 +go 1.23.4 // This (indirect) dependency is needed for github.com/AthenZ/athenz but the domain no longer resolves. // Remove once upstream issue fixed. See: https://github.com/AthenZ/athenz/issues/2842 diff --git a/go.sum b/go.sum index 704db0e48..8f2ff8acf 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -cel.dev/expr v0.16.2 h1:RwRhoH17VhAu9U5CMvMhH1PDVgf0tuz9FT+24AfMLfU= -cel.dev/expr v0.16.2/go.mod h1:gXngZQMkWJoSbE8mOzehJlXQyubn/Vg0vR9/F3W7iw8= +cel.dev/expr v0.19.0 h1:lXuo+nDhpyJSpWxpPVi5cPUwzKb+dsdOiw6IreM5yt0= +cel.dev/expr v0.19.0/go.mod h1:MrpN08Q+lEBs+bGYdLxxHkZoUSsCp0nSKTs0nTymJgw= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -731,8 +731,8 @@ github.com/DataDog/zstd v1.5.6/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwS github.com/GoogleCloudPlatform/cloudsql-proxy v1.29.0/go.mod h1:spvB9eLJH9dutlbPSRmHvSXXHOwGRyeXh1jVdquA2G8= github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.2 h1:DBjmt6/otSdULyJdVg2BlG0qGZO5tKL4VzOs0jpvw5Q= github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.2/go.mod h1:dppbR7CwXD4pgtV9t3wD1812RaLDcBjtblcDF5f1vI0= -github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.3 h1:cb3br57K508pQEFgBxn9GDhPS9HefpyMPK1RzmtMNzk= -github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.3/go.mod h1:itPGVDKf9cC/ov4MdvJ2QZ0khw4bfoo9jzwTJlaxy2k= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.25.0 h1:3c8yed4lgqTt+oTQ+JNMDo+F4xprBf+O/il4ZC0nRLw= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.25.0/go.mod h1:obipzmGjfSjam60XLwGfqUkJsfiheAl+TUjG+4yzyPM= github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.21.0 h1:OEgjQy1rH4Fbn5IpuI9d0uhLl+j6DkDvh9Q2Ucd6GK8= github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.21.0/go.mod h1:EUfJ8lb3pjD8VasPPwqIvG2XVCE6DOT8tY5tcwbWA+A= github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0.45.0 h1:/BF7rO6PYcmFoyJrq6HA3LqQpFSQei9aNuO1fvV3OqU= @@ -1746,6 +1746,8 @@ github.com/opencontainers/runc v1.1.12/go.mod h1:S+lQwSfncpBha7XTy/5lBwWgm5+y5Ma github.com/opensearch-project/opensearch-go/v4 v4.3.0 h1:gmQ+ILFJW6AJimivf+lHGVqCS2SCr/PBBf2Qr1xOCgE= github.com/opensearch-project/opensearch-go/v4 v4.3.0/go.mod h1:+w6KAvEX3S0fVVmZciNLN0CkXhxxem26+F6Y7DoPp04= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= +github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= github.com/ory/dockertest/v3 v3.10.0 h1:4K3z2VMe8Woe++invjaTB7VRyQXQy5UY+loujO4aNE4= github.com/ory/dockertest/v3 v3.10.0/go.mod h1:nr57ZbRWMqfsdGdFNLHz5jjNdDb7VVFnzAeW1n5N1Lg= github.com/oschwald/geoip2-golang v1.9.0 h1:uvD3O6fXAXs+usU+UGExshpdP13GAqp4GBrzN7IgKZc= @@ -1860,6 +1862,10 @@ github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w= github.com/ruudk/golang-pdf417 v0.0.0-20201230142125-a7e3863a1245/go.mod h1:pQAZKsJ8yyVxGRWYNEm9oFB8ieLgKFnamEyDmSA0BRk= +github.com/s2-streamstore/optr v1.0.0 h1:OqJpOO2uYsG+iXW9mNTlsCePDvxTS8mh9EtGcnyq8w8= +github.com/s2-streamstore/optr v1.0.0/go.mod h1:x/wSUCzWgpnvWWQG4dZ4JBFW2Kw0pE0yhLGb+0xiN8g= +github.com/s2-streamstore/s2-sdk-go v0.3.0 h1:BxsYdsW+w8BLzjBn84rn+IlH+T6UdQ3vBO+AGIt0SEA= +github.com/s2-streamstore/s2-sdk-go v0.3.0/go.mod h1:qfyTLgT32aijTNQeN478iobzZ7EeWg3K8v2gxc3xVa0= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= @@ -1920,13 +1926,15 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tailscale/peercred v0.0.0-20240214030740-b535050b2aa4/go.mod h1:phI29ccmHQBc+wvroosENp1IF9195449VDnFDhJ4rJU= github.com/testcontainers/testcontainers-go v0.28.0 h1:1HLm9qm+J5VikzFDYhOd+Zw12NtOl+8drH2E8nTY1r8= github.com/testcontainers/testcontainers-go v0.28.0/go.mod h1:COlDpUXbwW3owtpMkEB1zo9gwb1CoKVKlyrVPejF4AU= github.com/tetratelabs/wazero v1.6.0 h1:z0H1iikCdP8t+q341xqepY4EWvHEw8Es7tlqiVzlP3g= github.com/tetratelabs/wazero v1.6.0/go.mod h1:0U0G41+ochRKoPKCJlh0jMg1CHkyfK8kDqiirMmKY8A= +github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI= +github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY= github.com/tidwall/gjson v1.17.1 h1:wlYEnwqAHgzmhNUFfw7Xalt2JzQvsMx2Se4PcoFCT/U= github.com/tidwall/gjson v1.17.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= @@ -2033,14 +2041,14 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= -go.opentelemetry.io/contrib/detectors/gcp v1.29.0 h1:TiaiXB4DpGD3sdzNlYQxruQngn5Apwzi1X0DRhuGvDQ= -go.opentelemetry.io/contrib/detectors/gcp v1.29.0/go.mod h1:GW2aWZNwR2ZxDLdv8OyC2G8zkRoQBuURgV7RPQgcPoU= +go.opentelemetry.io/contrib/detectors/gcp v1.32.0 h1:P78qWqkLSShicHmAzfECaTgvslqHxblNE9j62Ws1NK8= +go.opentelemetry.io/contrib/detectors/gcp v1.32.0/go.mod h1:TVqo0Sda4Cv8gCIixd7LuLwW4EylumVWfhjZJjDD4DU= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 h1:r6I7RJCN86bpD/FQwedZ0vSixDpwuWREjW9oRMsmqDc= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0/go.mod h1:B9yO6b04uB80CzjedvewuqDhxJxi11s7/GtiGa8bAjI= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 h1:TT4fX+nBOA/+LUkobKGW1ydGcn+G3vRw9+g5HwCphpk= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0/go.mod h1:L7UH0GbB0p47T4Rri3uHjbpCFYrVrwc1I25QhNPiGK8= -go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= -go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8= +go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= +go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= go.opentelemetry.io/otel/exporters/jaeger v1.17.0 h1:D7UpUy2Xc2wsi1Ras6V40q806WM07rqoCWzXu7Sqy+4= go.opentelemetry.io/otel/exporters/jaeger v1.17.0/go.mod h1:nPCqOnEH9rNLKqH/+rrUjiMzHJdV1BlpKcTwRTyKkKI= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.23.1 h1:o8iWeVFa1BcLtVEV0LzrCxV2/55tB3xLxADr6Kyoey4= @@ -2049,14 +2057,14 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.23.1 h1:p3A5+ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.23.1/go.mod h1:OClrnXUjBqQbInvjJFjYSnMxBSCXBF8r3b34WqjiIrQ= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.23.1 h1:cfuy3bXmLJS7M1RZmAL6SuhGtKUp2KEsrm00OlAXkq4= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.23.1/go.mod h1:22jr92C6KwlwItJmQzfixzQM3oyyuYLCfHiMY+rpsPU= -go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc= -go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8= -go.opentelemetry.io/otel/sdk v1.29.0 h1:vkqKjk7gwhS8VaWb0POZKmIEDimRCMsopNYnriHyryo= -go.opentelemetry.io/otel/sdk v1.29.0/go.mod h1:pM8Dx5WKnvxLCb+8lG1PRNIDxu9g9b9g59Qr7hfAAok= -go.opentelemetry.io/otel/sdk/metric v1.29.0 h1:K2CfmJohnRgvZ9UAj2/FhIf/okdWcNdBwe1m8xFXiSY= -go.opentelemetry.io/otel/sdk/metric v1.29.0/go.mod h1:6zZLdCl2fkauYoZIOn/soQIDSWFmNSRcICarHfuhNJQ= -go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4= -go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ= +go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= +go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= +go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= +go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU= +go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU= +go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ= +go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= +go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= @@ -2124,8 +2132,8 @@ golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0 golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ= -golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= -golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= +golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -2264,8 +2272,8 @@ golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= -golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= -golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= +golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= +golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -2297,8 +2305,8 @@ golang.org/x/oauth2 v0.4.0/go.mod h1:RznEsdpjGAINPTOF0UH/t+xJ75L18YO3Ho6Pyn+uRec golang.org/x/oauth2 v0.5.0/go.mod h1:9/XBHVqLaWO3/BRHs5jbpYCnOZVjj5V0ndyaAM7KB4I= golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw= golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4= -golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs= -golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/oauth2 v0.24.0 h1:KTBBxWqUa0ykRPLtV69rRto9TLXcqYkeswu48x/gvNE= +golang.org/x/oauth2 v0.24.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -2430,8 +2438,8 @@ golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -2445,8 +2453,8 @@ golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= -golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= -golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= +golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg= +golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -2806,10 +2814,10 @@ google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633/go.mod h1:UUQDJDOl google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= google.golang.org/genproto v0.0.0-20241104194629-dd2ea8efbc28 h1:KJjNNclfpIkVqrZlTWcgOOaVQ00LdBnoEaRfkUx760s= google.golang.org/genproto v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:mt9/MofW7AWQ+Gy179ChOnvmJatV8YHUmrcedo9CIFI= -google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 h1:M0KvPgPmDZHPlbRbaNU1APr28TvwvvdUPlSv7PUvy8g= -google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:dguCy7UOdZhTvLzDyt15+rOrawrpM4q7DD9dQ1P11P4= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 h1:XVhgTWWV3kGQlwJHR3upFWZeTsei6Oks1apkZSeonIE= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= +google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a h1:OAiGFfOiA0v9MRYsSidp3ubZaBnteRUyn3xB2ZQ5G/E= +google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a/go.mod h1:jehYqy3+AhJU9ve55aNOaSml7wUXjF9x6z2LcCfpAhY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287 h1:J1H9f+LEdWAfHcez/4cvaVBox7cOYT+IU6rgqj5x++8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287/go.mod h1:8BS3B93F/U1juMFq9+EDk+qOT5CO1R9IzXxG3PTqiRk= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -2853,8 +2861,8 @@ google.golang.org/grpc v1.52.3/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5v google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw= google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g= google.golang.org/grpc v1.56.3/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s= -google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= -google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= +google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= @@ -2873,8 +2881,8 @@ google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.29.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= -google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM= +google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/impl/s2/bento.go b/internal/impl/s2/bento.go new file mode 100644 index 000000000..d8bd46705 --- /dev/null +++ b/internal/impl/s2/bento.go @@ -0,0 +1,93 @@ +package s2 + +import ( + s2bentobox "github.com/s2-streamstore/s2-sdk-go/s2-bentobox" + "github.com/warpstreamlabs/bento/public/service" +) + +const ( + basinField = "basin" + authTokenField = "auth_token" +) + +var ( + basinFieldSpec = service.NewStringField(basinField). + Description("Basin name") + + authTokenFieldSpec = service.NewStringField(authTokenField). + Description("Authentication token for S2 account"). + Secret() +) + +func newConfigSpec() *service.ConfigSpec { + // TODO: Add summary, description etc. + return service.NewConfigSpec(). + Fields(basinFieldSpec, authTokenFieldSpec) +} + +func newConfig(conf *service.ParsedConfig) (*s2bentobox.Config, error) { + basin, err := conf.FieldString(basinField) + if err != nil { + return nil, err + } + + authToken, err := conf.FieldString(authTokenField) + if err != nil { + return nil, err + } + + return &s2bentobox.Config{ + Basin: basin, + AuthToken: authToken, + }, nil +} + +type bentoLogger struct { + L *service.Logger +} + +func (bl *bentoLogger) Tracef(template string, args ...any) { + bl.L.Tracef(template, args...) +} + +func (bl *bentoLogger) Trace(message string) { + bl.L.Trace(message) +} + +func (bl *bentoLogger) Debugf(template string, args ...any) { + bl.L.Debugf(template, args...) +} + +func (bl *bentoLogger) Debug(message string) { + bl.L.Debug(message) +} + +func (bl *bentoLogger) Infof(template string, args ...any) { + bl.L.Infof(template, args...) +} + +func (bl *bentoLogger) Info(message string) { + bl.L.Info(message) +} + +func (bl *bentoLogger) Warnf(template string, args ...any) { + bl.L.Warnf(template, args...) +} + +func (bl *bentoLogger) Warn(message string) { + bl.L.Warn(message) +} + +func (bl *bentoLogger) Errorf(template string, args ...any) { + bl.L.Errorf(template, args...) +} + +func (bl *bentoLogger) Error(message string) { + bl.L.Error(message) +} + +func (bl *bentoLogger) With(keyValuePairs ...any) s2bentobox.Logger { + return &bentoLogger{ + L: bl.L.With(keyValuePairs...), + } +} diff --git a/internal/impl/s2/embeds/input_description.md b/internal/impl/s2/embeds/input_description.md new file mode 100644 index 000000000..eee8e58fc --- /dev/null +++ b/internal/impl/s2/embeds/input_description.md @@ -0,0 +1,23 @@ +Generate an authentication token by logging onto the web console at +[s2.dev](https://s2.dev/dashboard). + +### Cache + +The plugin requires setting up a caching mechanism to resume the input after +the last acknowledged record. + +To know more about setting up a cache resource, see +[Cache docs for Bento](https://warpstreamlabs.github.io/bento/docs/components/caches/about). + +### Metadata + +This input adds the following metadata fields to each message in addition to the +record headers: + +- `s2_stream`: The origin S2 stream. +- `s2_seq_num`: Sequence number of the record in the origin stream formatted as a string. + +All the header values are loosely converted to strings as metadata attributes. + +**Note:** An [S2 command record](https://s2.dev/docs/stream#command-records) has no header +name. This is set as the `s2_command` meta key. diff --git a/internal/impl/s2/embeds/input_with_prefix_eg.yaml b/internal/impl/s2/embeds/input_with_prefix_eg.yaml new file mode 100644 index 000000000..048c4ecf9 --- /dev/null +++ b/internal/impl/s2/embeds/input_with_prefix_eg.yaml @@ -0,0 +1,17 @@ +cache_resources: + - label: s2_seq_num + file: + directory: s2_seq_num_cache + +input: + label: s2_input + s2: + basin: my-favorite-basin + streams: my-favorite-prefix- + auth_token: "${S2_AUTH_TOKEN}" + cache: s2_seq_num + +output: + label: stdout + stdout: + codec: lines diff --git a/internal/impl/s2/embeds/output_description.md b/internal/impl/s2/embeds/output_description.md new file mode 100644 index 000000000..bd039e4b8 --- /dev/null +++ b/internal/impl/s2/embeds/output_description.md @@ -0,0 +1,16 @@ +Generate an authentication token by logging onto the web console at +[s2.dev](https://s2.dev/dashboard). + +### Metadata + +The metadata attributes are set as S2 record headers. Currently, only string +attribute values are supported. + +### Batching + +The plugin expects batched inputs. Messages are batched automatically by Bento. + +By default, Bento disables batching based on `count`, `byte_size`, and `period` +parameters, but the plugin enables batching setting both `count` and +`byte_size` to the maximum values supported by S2 (`1000` and `1MiB`). It also +sets a flush period of `5ms` as a reasonable default. diff --git a/internal/impl/s2/embeds/output_starwars_eg.yaml b/internal/impl/s2/embeds/output_starwars_eg.yaml new file mode 100644 index 000000000..a9382d711 --- /dev/null +++ b/internal/impl/s2/embeds/output_starwars_eg.yaml @@ -0,0 +1,14 @@ +input: + label: towel_blinkenlights_nl + socket: + network: tcp + address: towel.blinkenlights.nl:23 + scanner: + lines: {} + +output: + label: s2_starwars + s2: + basin: my-favorite-basin + stream: starwars + auth_token: "${S2_AUTH_TOKEN}" diff --git a/internal/impl/s2/input.go b/internal/impl/s2/input.go new file mode 100644 index 000000000..7102abd52 --- /dev/null +++ b/internal/impl/s2/input.go @@ -0,0 +1,253 @@ +package s2 + +import ( + "context" + _ "embed" + "encoding/base64" + "encoding/binary" + "errors" + "fmt" + "strconv" + + s2bentobox "github.com/s2-streamstore/s2-sdk-go/s2-bentobox" + "github.com/warpstreamlabs/bento/public/service" +) + +var ( + //go:embed embeds/input_description.md + inputDescription string + + //go:embed embeds/input_with_prefix_eg.yaml + inputWithPrefixExample string +) + +var errCacheNotFound = errors.New("cache not found") + +func init() { + if err := service.RegisterBatchInput( + s2bentobox.PluginName, + newInputConfigSpec(), + func(conf *service.ParsedConfig, r *service.Resources) (service.BatchInput, error) { + config, err := newInputConfig(conf, r) + if err != nil { + return nil, err + } + + return &Input{ + inner: nil, // Will be instantiated during `Connect` + config: config, + logger: r.Logger(), + }, nil + }, + ); err != nil { + panic(err) + } +} + +const ( + streamsField = "streams" + cacheField = "cache" + updateStreamsIntervalField = "update_streams_interval" +) + +var ( + streamsFieldSpec = service.NewAnyField(streamsField). + Description("Streams prefix or list of streams to subscribe to") + + inputMaxInFlightSpec = service.NewInputMaxInFlightField() + + cacheFieldSpec = service.NewStringField(cacheField). + Description("Cache resource label for storing sequence number") + + updateStreamsIntervalSpec = service.NewDurationField(updateStreamsIntervalField). + Advanced(). + Default("60s"). + Description("Interval after which the streams list should update dynamically") +) + +func newInputConfigSpec() *service.ConfigSpec { + return newConfigSpec(). + Fields( + streamsFieldSpec, + inputMaxInFlightSpec, + cacheFieldSpec, + updateStreamsIntervalSpec, + ). + Summary("Consumes records from S2 streams"). + Description(inputDescription). + Example( + "Input with Prefix", + "Fetch records from all the streams with the prefix `my-favorite-prefix-` in the basin.", + inputWithPrefixExample, + ) +} + +func newInputConfig(conf *service.ParsedConfig, r *service.Resources) (*s2bentobox.InputConfig, error) { + config, err := newConfig(conf) + if err != nil { + return nil, err + } + + var inputStreams s2bentobox.InputStreams + + if streams, err := conf.FieldStringList(streamsField); err != nil { + // Try just a prefix. + prefix, err := conf.FieldString(streamsField) + if err != nil { + return nil, err + } + + inputStreams = s2bentobox.PrefixedInputStreams{ + Prefix: prefix, + } + } else { + inputStreams = s2bentobox.StaticInputStreams{ + Streams: streams, + } + } + + maxInFlight, err := conf.FieldMaxInFlight() + if err != nil { + return nil, err + } + + cacheLabel, err := conf.FieldString(cacheField) + if err != nil { + return nil, err + } + + if !r.HasCache(cacheLabel) { + return nil, fmt.Errorf("%w: %q", errCacheNotFound, cacheLabel) + } + + cache := &bentoSeqNumCache{ + Resources: r, + Label: cacheLabel, + } + + updateStreamsInterval, err := conf.FieldDuration(updateStreamsIntervalField) + if err != nil { + return nil, err + } + + return &s2bentobox.InputConfig{ + Config: config, + Streams: inputStreams, + MaxInFlight: maxInFlight, + Logger: &bentoLogger{L: r.Logger()}, + Cache: cache, + UpdateStreamsInterval: updateStreamsInterval, + }, nil +} + +type bentoSeqNumCache struct { + Resources *service.Resources + Label string +} + +func streamCacheKey(stream string) string { + return base64.URLEncoding.EncodeToString([]byte(stream)) +} + +func (b *bentoSeqNumCache) Get(ctx context.Context, stream string) (uint64, error) { + var ( + seqNum uint64 + err error + ) + + if aErr := b.Resources.AccessCache(ctx, b.Label, func(c service.Cache) { + var seqNumBytes []byte + seqNumBytes, err = c.Get(ctx, streamCacheKey(stream)) + if err != nil { + return + } + + seqNum = binary.BigEndian.Uint64(seqNumBytes) + }); aErr != nil { + return 0, aErr + } + + return seqNum, err +} + +func (b *bentoSeqNumCache) Set(ctx context.Context, stream string, seqNum uint64) error { + var err error + + if aErr := b.Resources.AccessCache(ctx, b.Label, func(c service.Cache) { + seqNumBytes := binary.BigEndian.AppendUint64(make([]byte, 0, 8), seqNum) + + err = c.Set(ctx, streamCacheKey(stream), seqNumBytes, nil) + }); aErr != nil { + return aErr + } + + return err +} + +type Input struct { + inner *s2bentobox.Input + config *s2bentobox.InputConfig + logger *service.Logger +} + +func (i *Input) Connect(ctx context.Context) error { + i.logger.Debug("Connecting S2 input") + + inner, err := s2bentobox.ConnectInput(ctx, i.config) + if err != nil { + return err + } + + // Initialize inner connection. + i.inner = inner + + return nil +} + +func (i *Input) ReadBatch(ctx context.Context) (service.MessageBatch, service.AckFunc, error) { + i.logger.Debug("Reading batch from S2") + + batch, stream, err := i.inner.ReadBatch(ctx) + if err != nil { + if errors.Is(err, s2bentobox.ErrInputClosed) { + return nil, nil, service.ErrNotConnected + } + + return nil, nil, err + } + + messages := make([]*service.Message, 0, len(batch.Records)) + + for _, record := range batch.Records { + msg := service.NewMessage(record.Body) + + if len(record.Headers) == 1 && len(record.Headers[0].Name) == 0 { + // Command record + msg.MetaSet("s2_command", string(record.Headers[0].Value)) + } else { + for _, header := range record.Headers { + // TODO: Use `MetaSetMut`. + msg.MetaSet(string(header.Name), string(header.Value)) + } + } + + msg.MetaSet("s2_seq_num", strconv.FormatUint(record.SeqNum, 10)) + msg.MetaSet("s2_stream", stream) + + messages = append(messages, msg) + } + + return messages, i.inner.AckFunc(stream, batch), nil +} + +func (i *Input) Close(ctx context.Context) error { + i.logger.Debug("Closing S2 input") + + if err := i.inner.Close(ctx); err != nil { + return err + } + + i.inner = nil + + return nil +} diff --git a/internal/impl/s2/output.go b/internal/impl/s2/output.go new file mode 100644 index 000000000..56ccbc1ee --- /dev/null +++ b/internal/impl/s2/output.go @@ -0,0 +1,248 @@ +package s2 + +import ( + "context" + _ "embed" + "encoding/base64" + "errors" + "fmt" + + "github.com/s2-streamstore/s2-sdk-go/s2" + s2bentobox "github.com/s2-streamstore/s2-sdk-go/s2-bentobox" + "github.com/warpstreamlabs/bento/public/service" +) + +var ( + //go:embed embeds/output_description.md + outputDescription string + + //go:embed embeds/output_starwars_eg.yaml + outputStarwars string +) + +var ( + errInvalidBatchingByteSize = errors.New("invalid batch policy byte size") + errInvalidBatchingCount = errors.New("invalid batch policy count") +) + +func init() { + if err := service.RegisterBatchOutput( + s2bentobox.PluginName, + newOutputConfigSpec(), + func(conf *service.ParsedConfig, r *service.Resources) ( + service.BatchOutput, service.BatchPolicy, int, error, + ) { + policy, err := parseBatchPolicy(conf) + if err != nil { + return nil, policy, 0, err + } + + config, err := newOutputConfig(conf) + if err != nil { + return nil, policy, 0, err + } + + return &Output{ + inner: nil, // Will be instantiated during `Connect` + config: config, + logger: r.Logger(), + }, policy, config.MaxInFlight, nil + }, + ); err != nil { + panic(err) + } +} + +const ( + batchingField = "batching" + streamField = "stream" + matchSeqNumField = "match_seq_num" + fencingTokenField = "fencing_token" +) + +var ( + batchingFieldSpec = service.NewBatchPolicyField(batchingField) + + streamFieldSpec = service.NewStringField(streamField). + Description("Stream name") + + outputMaxInFlightSpec = service.NewOutputMaxInFlightField() + + fencingTokenFieldSpec = service.NewStringField(fencingTokenField). + Optional(). + Description("Enforce a fencing token (base64 encoded)"). + Example("aGVsbG8gczI=") +) + +func newOutputConfigSpec() *service.ConfigSpec { + return newConfigSpec(). + Fields( + batchingFieldSpec, + streamFieldSpec, + outputMaxInFlightSpec, + fencingTokenFieldSpec, + ). + Summary("Sends messages to an S2 stream."). + Description(outputDescription). + Example( + "ASCII Starwars", + "Consume a network stream into an S2 stream", + outputStarwars, + ) +} + +func parseBatchPolicy(conf *service.ParsedConfig) (service.BatchPolicy, error) { + policy, err := conf.FieldBatchPolicy(batchingField) + if err != nil { + return policy, err + } + + // Set required defaults + + if policy.ByteSize <= 0 { + // TODO: We might need to decrease the size since bento message size != s2 + // record metered size. + policy.ByteSize = int(s2.MaxBatchBytes) + } + + if policy.Count <= 0 { + policy.Count = s2.MaxBatchRecords + } + + // This feels sensible to have. Not sure if we should let the user have infinite + // retention until a batch of required size is formed. + if policy.Period == "" { + policy.Period = "5ms" + } + + // Validate limits + + if policy.ByteSize > int(s2.MaxBatchBytes) { + return policy, fmt.Errorf("%w: must not exceed %d", errInvalidBatchingByteSize, s2.MaxBatchBytes) + } + + if policy.Count > s2.MaxBatchRecords { + return policy, fmt.Errorf("%w: must not exceed %d", errInvalidBatchingCount, s2.MaxBatchRecords) + } + + return policy, nil +} + +func newOutputConfig(conf *service.ParsedConfig) (*s2bentobox.OutputConfig, error) { + config, err := newConfig(conf) + if err != nil { + return nil, err + } + + stream, err := conf.FieldString(streamField) + if err != nil { + return nil, err + } + + maxInFlight, err := conf.FieldMaxInFlight() + if err != nil { + return nil, err + } + + var fencingToken []byte + + if conf.Contains(fencingTokenField) { + field, err := conf.FieldString(fencingTokenField) + if err != nil { + return nil, err + } + + fencingToken, err = base64.StdEncoding.DecodeString(field) + if err != nil { + return nil, err + } + } + + return &s2bentobox.OutputConfig{ + Config: config, + Stream: stream, + MaxInFlight: maxInFlight, + FencingToken: fencingToken, + }, nil +} + +type Output struct { + inner *s2bentobox.Output + config *s2bentobox.OutputConfig + logger *service.Logger +} + +func (o *Output) Connect(ctx context.Context) error { + o.logger.Debug("Connecting S2 output") + + inner, err := s2bentobox.ConnectOutput(ctx, o.config) + if err != nil { + return err + } + + // Initialize inner connection. + o.inner = inner + + return nil +} + +func (o *Output) WriteBatch(ctx context.Context, batch service.MessageBatch) error { + o.logger.Debug("Writing batch to S2") + + recordBatch, err := s2.NewAppendRecordBatch() + if err != nil { + panic("empty record batch shouldn't error") + } + + if err := batch.WalkWithBatchedErrors(func(_ int, m *service.Message) error { + body, err := m.AsBytes() + if err != nil { + return err + } + + var headers []s2.Header + if err := m.MetaWalk(func(k, v string) error { + headers = append(headers, s2.Header{ + Name: []byte(k), + Value: []byte(v), + }) + + return nil + }); err != nil { + return err + } + + if !recordBatch.Append(s2.AppendRecord{ + Headers: headers, + Body: body, + }) { + return s2bentobox.ErrAppendRecordBatchFull + } + + return nil + }); err != nil { + return err + } + + if err := o.inner.WriteBatch(ctx, recordBatch); err != nil { + if errors.Is(err, s2bentobox.ErrOutputClosed) { + return service.ErrNotConnected + } + + return err + } + + return nil +} + +func (o *Output) Close(ctx context.Context) error { + o.logger.Debug("Closing S2 output") + + if err := o.inner.Close(ctx); err != nil { + return err + } + + o.inner = nil + + return nil +} diff --git a/public/components/all/package.go b/public/components/all/package.go index b819e09e5..bc7c8f786 100644 --- a/public/components/all/package.go +++ b/public/components/all/package.go @@ -46,6 +46,7 @@ import ( _ "github.com/warpstreamlabs/bento/public/components/pusher" _ "github.com/warpstreamlabs/bento/public/components/questdb" _ "github.com/warpstreamlabs/bento/public/components/redis" + _ "github.com/warpstreamlabs/bento/public/components/s2" _ "github.com/warpstreamlabs/bento/public/components/sentry" _ "github.com/warpstreamlabs/bento/public/components/sftp" _ "github.com/warpstreamlabs/bento/public/components/snowflake" diff --git a/public/components/s2/package.go b/public/components/s2/package.go new file mode 100644 index 000000000..4755ac4ee --- /dev/null +++ b/public/components/s2/package.go @@ -0,0 +1,6 @@ +package s2 + +import ( + // Bring in the internal plugin definitions. + _ "github.com/warpstreamlabs/bento/internal/impl/s2" +) From c4deee556bf92f4f4964fc05c191816a30dc1c79 Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Wed, 29 Jan 2025 20:31:14 +0530 Subject: [PATCH 2/7] update --- go.mod | 2 +- go.sum | 2 ++ internal/impl/s2/embeds/input_description.md | 1 + internal/impl/s2/input.go | 9 +++++---- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index df0af2e2a..750398615 100644 --- a/go.mod +++ b/go.mod @@ -161,7 +161,7 @@ require ( require ( github.com/orcaman/concurrent-map/v2 v2.0.1 // indirect github.com/s2-streamstore/optr v1.0.0 // indirect - github.com/s2-streamstore/s2-sdk-go v0.3.0 + github.com/s2-streamstore/s2-sdk-go v0.4.0 github.com/tidwall/btree v1.7.0 // indirect ) diff --git a/go.sum b/go.sum index 8f2ff8acf..8e4b44a87 100644 --- a/go.sum +++ b/go.sum @@ -1866,6 +1866,8 @@ github.com/s2-streamstore/optr v1.0.0 h1:OqJpOO2uYsG+iXW9mNTlsCePDvxTS8mh9EtGcny github.com/s2-streamstore/optr v1.0.0/go.mod h1:x/wSUCzWgpnvWWQG4dZ4JBFW2Kw0pE0yhLGb+0xiN8g= github.com/s2-streamstore/s2-sdk-go v0.3.0 h1:BxsYdsW+w8BLzjBn84rn+IlH+T6UdQ3vBO+AGIt0SEA= github.com/s2-streamstore/s2-sdk-go v0.3.0/go.mod h1:qfyTLgT32aijTNQeN478iobzZ7EeWg3K8v2gxc3xVa0= +github.com/s2-streamstore/s2-sdk-go v0.4.0 h1:ySyLfs7XELm4NdT+RSonOK9ZxLpjKHXKanlwd79mr6U= +github.com/s2-streamstore/s2-sdk-go v0.4.0/go.mod h1:qfyTLgT32aijTNQeN478iobzZ7EeWg3K8v2gxc3xVa0= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= diff --git a/internal/impl/s2/embeds/input_description.md b/internal/impl/s2/embeds/input_description.md index eee8e58fc..11d0fd350 100644 --- a/internal/impl/s2/embeds/input_description.md +++ b/internal/impl/s2/embeds/input_description.md @@ -14,6 +14,7 @@ To know more about setting up a cache resource, see This input adds the following metadata fields to each message in addition to the record headers: +- `s2_basin`: The S2 basin where the origin stream lives. - `s2_stream`: The origin S2 stream. - `s2_seq_num`: Sequence number of the record in the origin stream formatted as a string. diff --git a/internal/impl/s2/input.go b/internal/impl/s2/input.go index 7102abd52..33a45e6af 100644 --- a/internal/impl/s2/input.go +++ b/internal/impl/s2/input.go @@ -185,7 +185,7 @@ func (b *bentoSeqNumCache) Set(ctx context.Context, stream string, seqNum uint64 } type Input struct { - inner *s2bentobox.Input + inner *s2bentobox.MultiStreamInput config *s2bentobox.InputConfig logger *service.Logger } @@ -193,7 +193,7 @@ type Input struct { func (i *Input) Connect(ctx context.Context) error { i.logger.Debug("Connecting S2 input") - inner, err := s2bentobox.ConnectInput(ctx, i.config) + inner, err := s2bentobox.ConnectMultiStreamInput(ctx, i.config) if err != nil { return err } @@ -207,7 +207,7 @@ func (i *Input) Connect(ctx context.Context) error { func (i *Input) ReadBatch(ctx context.Context) (service.MessageBatch, service.AckFunc, error) { i.logger.Debug("Reading batch from S2") - batch, stream, err := i.inner.ReadBatch(ctx) + batch, aFn, stream, err := i.inner.ReadBatch(ctx) if err != nil { if errors.Is(err, s2bentobox.ErrInputClosed) { return nil, nil, service.ErrNotConnected @@ -233,11 +233,12 @@ func (i *Input) ReadBatch(ctx context.Context) (service.MessageBatch, service.Ac msg.MetaSet("s2_seq_num", strconv.FormatUint(record.SeqNum, 10)) msg.MetaSet("s2_stream", stream) + msg.MetaSet("s2_basin", i.config.Basin) messages = append(messages, msg) } - return messages, i.inner.AckFunc(stream, batch), nil + return messages, aFn, nil } func (i *Input) Close(ctx context.Context) error { From e0aff1477adde46e4d531443932893c91d934aa4 Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Wed, 29 Jan 2025 20:34:33 +0530 Subject: [PATCH 3/7] update go.sum --- go.sum | 2 -- 1 file changed, 2 deletions(-) diff --git a/go.sum b/go.sum index 8e4b44a87..ae98193b3 100644 --- a/go.sum +++ b/go.sum @@ -1864,8 +1864,6 @@ github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfF github.com/ruudk/golang-pdf417 v0.0.0-20201230142125-a7e3863a1245/go.mod h1:pQAZKsJ8yyVxGRWYNEm9oFB8ieLgKFnamEyDmSA0BRk= github.com/s2-streamstore/optr v1.0.0 h1:OqJpOO2uYsG+iXW9mNTlsCePDvxTS8mh9EtGcnyq8w8= github.com/s2-streamstore/optr v1.0.0/go.mod h1:x/wSUCzWgpnvWWQG4dZ4JBFW2Kw0pE0yhLGb+0xiN8g= -github.com/s2-streamstore/s2-sdk-go v0.3.0 h1:BxsYdsW+w8BLzjBn84rn+IlH+T6UdQ3vBO+AGIt0SEA= -github.com/s2-streamstore/s2-sdk-go v0.3.0/go.mod h1:qfyTLgT32aijTNQeN478iobzZ7EeWg3K8v2gxc3xVa0= github.com/s2-streamstore/s2-sdk-go v0.4.0 h1:ySyLfs7XELm4NdT+RSonOK9ZxLpjKHXKanlwd79mr6U= github.com/s2-streamstore/s2-sdk-go v0.4.0/go.mod h1:qfyTLgT32aijTNQeN478iobzZ7EeWg3K8v2gxc3xVa0= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= From 0e6cd3643c54214c7e7138670368cc3d10bfc7bd Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Wed, 29 Jan 2025 22:40:50 +0530 Subject: [PATCH 4/7] update byte size limit --- internal/impl/s2/embeds/output_description.md | 7 ++++++- internal/impl/s2/output.go | 11 +++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/internal/impl/s2/embeds/output_description.md b/internal/impl/s2/embeds/output_description.md index bd039e4b8..c4aa6c683 100644 --- a/internal/impl/s2/embeds/output_description.md +++ b/internal/impl/s2/embeds/output_description.md @@ -12,5 +12,10 @@ The plugin expects batched inputs. Messages are batched automatically by Bento. By default, Bento disables batching based on `count`, `byte_size`, and `period` parameters, but the plugin enables batching setting both `count` and -`byte_size` to the maximum values supported by S2 (`1000` and `1MiB`). It also +`byte_size` to the maximum values supported by S2. It also sets a flush period of `5ms` as a reasonable default. + +**Note:** An S2 record batch can be a maximum of 1MiB but the plugin limits the +size of a message to 256KiB since the Bento size limit doesn't take metadata into +account. Moreover, the metered size of the same Bento message will be greater +than the byte size of a Bento message. diff --git a/internal/impl/s2/output.go b/internal/impl/s2/output.go index 56ccbc1ee..4287162cd 100644 --- a/internal/impl/s2/output.go +++ b/internal/impl/s2/output.go @@ -99,10 +99,13 @@ func parseBatchPolicy(conf *service.ParsedConfig) (service.BatchPolicy, error) { // Set required defaults + // Setting the batch byte size max a bit conservatively since Bento does not + // take metadata size into account. Moreover, the S2 metered size of a record + // will be > bento message size. + const maxBatchBytes = 256 * 1024 + if policy.ByteSize <= 0 { - // TODO: We might need to decrease the size since bento message size != s2 - // record metered size. - policy.ByteSize = int(s2.MaxBatchBytes) + policy.ByteSize = maxBatchBytes } if policy.Count <= 0 { @@ -117,7 +120,7 @@ func parseBatchPolicy(conf *service.ParsedConfig) (service.BatchPolicy, error) { // Validate limits - if policy.ByteSize > int(s2.MaxBatchBytes) { + if policy.ByteSize > maxBatchBytes { return policy, fmt.Errorf("%w: must not exceed %d", errInvalidBatchingByteSize, s2.MaxBatchBytes) } From cc3028db1e86298d8c419ffccd7a7e19bf18b7c1 Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Thu, 30 Jan 2025 14:51:38 +0530 Subject: [PATCH 5/7] update --- go.mod | 2 +- go.sum | 4 ++-- internal/impl/s2/embeds/input_with_prefix_eg.yaml | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 750398615..c3ff35784 100644 --- a/go.mod +++ b/go.mod @@ -161,7 +161,7 @@ require ( require ( github.com/orcaman/concurrent-map/v2 v2.0.1 // indirect github.com/s2-streamstore/optr v1.0.0 // indirect - github.com/s2-streamstore/s2-sdk-go v0.4.0 + github.com/s2-streamstore/s2-sdk-go v0.4.1 github.com/tidwall/btree v1.7.0 // indirect ) diff --git a/go.sum b/go.sum index ae98193b3..3aff51ccb 100644 --- a/go.sum +++ b/go.sum @@ -1864,8 +1864,8 @@ github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfF github.com/ruudk/golang-pdf417 v0.0.0-20201230142125-a7e3863a1245/go.mod h1:pQAZKsJ8yyVxGRWYNEm9oFB8ieLgKFnamEyDmSA0BRk= github.com/s2-streamstore/optr v1.0.0 h1:OqJpOO2uYsG+iXW9mNTlsCePDvxTS8mh9EtGcnyq8w8= github.com/s2-streamstore/optr v1.0.0/go.mod h1:x/wSUCzWgpnvWWQG4dZ4JBFW2Kw0pE0yhLGb+0xiN8g= -github.com/s2-streamstore/s2-sdk-go v0.4.0 h1:ySyLfs7XELm4NdT+RSonOK9ZxLpjKHXKanlwd79mr6U= -github.com/s2-streamstore/s2-sdk-go v0.4.0/go.mod h1:qfyTLgT32aijTNQeN478iobzZ7EeWg3K8v2gxc3xVa0= +github.com/s2-streamstore/s2-sdk-go v0.4.1 h1:KQpu6in3D5E93CXcE0XV0WTn+fBla06n23Y4K1/ZHno= +github.com/s2-streamstore/s2-sdk-go v0.4.1/go.mod h1:qfyTLgT32aijTNQeN478iobzZ7EeWg3K8v2gxc3xVa0= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= diff --git a/internal/impl/s2/embeds/input_with_prefix_eg.yaml b/internal/impl/s2/embeds/input_with_prefix_eg.yaml index 048c4ecf9..ef343be02 100644 --- a/internal/impl/s2/embeds/input_with_prefix_eg.yaml +++ b/internal/impl/s2/embeds/input_with_prefix_eg.yaml @@ -7,7 +7,7 @@ input: label: s2_input s2: basin: my-favorite-basin - streams: my-favorite-prefix- + streams: my-favorite-prefix/ auth_token: "${S2_AUTH_TOKEN}" cache: s2_seq_num From 92d900d86b905b1d08b1bf7057ce98a7dd162eaf Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Sat, 1 Feb 2025 00:28:03 +0530 Subject: [PATCH 6/7] address comments --- internal/impl/s2/bento.go | 59 +------- internal/impl/s2/cache.go | 53 +++++++ internal/impl/s2/embeds/input_description.md | 24 --- .../impl/s2/embeds/input_with_prefix_eg.yaml | 17 --- internal/impl/s2/embeds/output_description.md | 21 --- .../impl/s2/embeds/output_starwars_eg.yaml | 14 -- internal/impl/s2/input.go | 138 ++++++++---------- internal/impl/s2/output.go | 116 ++++++++++----- 8 files changed, 192 insertions(+), 250 deletions(-) create mode 100644 internal/impl/s2/cache.go delete mode 100644 internal/impl/s2/embeds/input_description.md delete mode 100644 internal/impl/s2/embeds/input_with_prefix_eg.yaml delete mode 100644 internal/impl/s2/embeds/output_description.md delete mode 100644 internal/impl/s2/embeds/output_starwars_eg.yaml diff --git a/internal/impl/s2/bento.go b/internal/impl/s2/bento.go index d8bd46705..ad5698537 100644 --- a/internal/impl/s2/bento.go +++ b/internal/impl/s2/bento.go @@ -10,21 +10,6 @@ const ( authTokenField = "auth_token" ) -var ( - basinFieldSpec = service.NewStringField(basinField). - Description("Basin name") - - authTokenFieldSpec = service.NewStringField(authTokenField). - Description("Authentication token for S2 account"). - Secret() -) - -func newConfigSpec() *service.ConfigSpec { - // TODO: Add summary, description etc. - return service.NewConfigSpec(). - Fields(basinFieldSpec, authTokenFieldSpec) -} - func newConfig(conf *service.ParsedConfig) (*s2bentobox.Config, error) { basin, err := conf.FieldString(basinField) if err != nil { @@ -43,51 +28,11 @@ func newConfig(conf *service.ParsedConfig) (*s2bentobox.Config, error) { } type bentoLogger struct { - L *service.Logger -} - -func (bl *bentoLogger) Tracef(template string, args ...any) { - bl.L.Tracef(template, args...) -} - -func (bl *bentoLogger) Trace(message string) { - bl.L.Trace(message) -} - -func (bl *bentoLogger) Debugf(template string, args ...any) { - bl.L.Debugf(template, args...) -} - -func (bl *bentoLogger) Debug(message string) { - bl.L.Debug(message) -} - -func (bl *bentoLogger) Infof(template string, args ...any) { - bl.L.Infof(template, args...) -} - -func (bl *bentoLogger) Info(message string) { - bl.L.Info(message) -} - -func (bl *bentoLogger) Warnf(template string, args ...any) { - bl.L.Warnf(template, args...) -} - -func (bl *bentoLogger) Warn(message string) { - bl.L.Warn(message) -} - -func (bl *bentoLogger) Errorf(template string, args ...any) { - bl.L.Errorf(template, args...) -} - -func (bl *bentoLogger) Error(message string) { - bl.L.Error(message) + *service.Logger } func (bl *bentoLogger) With(keyValuePairs ...any) s2bentobox.Logger { return &bentoLogger{ - L: bl.L.With(keyValuePairs...), + Logger: bl.Logger.With(keyValuePairs...), } } diff --git a/internal/impl/s2/cache.go b/internal/impl/s2/cache.go new file mode 100644 index 000000000..4e026a8dd --- /dev/null +++ b/internal/impl/s2/cache.go @@ -0,0 +1,53 @@ +package s2 + +import ( + "context" + "encoding/base64" + "encoding/binary" + + "github.com/warpstreamlabs/bento/public/service" +) + +type bentoSeqNumCache struct { + Resources *service.Resources + Label string +} + +func streamCacheKey(stream string) string { + return base64.URLEncoding.EncodeToString([]byte(stream)) +} + +func (b *bentoSeqNumCache) Get(ctx context.Context, stream string) (uint64, error) { + var ( + seqNum uint64 + err error + ) + + if aErr := b.Resources.AccessCache(ctx, b.Label, func(c service.Cache) { + var seqNumBytes []byte + seqNumBytes, err = c.Get(ctx, streamCacheKey(stream)) + if err != nil { + return + } + + seqNum = binary.BigEndian.Uint64(seqNumBytes) + }); aErr != nil { + return 0, aErr + } + + return seqNum, err +} + +func (b *bentoSeqNumCache) Set(ctx context.Context, stream string, seqNum uint64) error { + var err error + + if aErr := b.Resources.AccessCache(ctx, b.Label, func(c service.Cache) { + seqNumBytes := binary.BigEndian.AppendUint64(make([]byte, 0, 8), seqNum) + + err = c.Set(ctx, streamCacheKey(stream), seqNumBytes, nil) + }); aErr != nil { + return aErr + } + + return err +} diff --git a/internal/impl/s2/embeds/input_description.md b/internal/impl/s2/embeds/input_description.md deleted file mode 100644 index 11d0fd350..000000000 --- a/internal/impl/s2/embeds/input_description.md +++ /dev/null @@ -1,24 +0,0 @@ -Generate an authentication token by logging onto the web console at -[s2.dev](https://s2.dev/dashboard). - -### Cache - -The plugin requires setting up a caching mechanism to resume the input after -the last acknowledged record. - -To know more about setting up a cache resource, see -[Cache docs for Bento](https://warpstreamlabs.github.io/bento/docs/components/caches/about). - -### Metadata - -This input adds the following metadata fields to each message in addition to the -record headers: - -- `s2_basin`: The S2 basin where the origin stream lives. -- `s2_stream`: The origin S2 stream. -- `s2_seq_num`: Sequence number of the record in the origin stream formatted as a string. - -All the header values are loosely converted to strings as metadata attributes. - -**Note:** An [S2 command record](https://s2.dev/docs/stream#command-records) has no header -name. This is set as the `s2_command` meta key. diff --git a/internal/impl/s2/embeds/input_with_prefix_eg.yaml b/internal/impl/s2/embeds/input_with_prefix_eg.yaml deleted file mode 100644 index ef343be02..000000000 --- a/internal/impl/s2/embeds/input_with_prefix_eg.yaml +++ /dev/null @@ -1,17 +0,0 @@ -cache_resources: - - label: s2_seq_num - file: - directory: s2_seq_num_cache - -input: - label: s2_input - s2: - basin: my-favorite-basin - streams: my-favorite-prefix/ - auth_token: "${S2_AUTH_TOKEN}" - cache: s2_seq_num - -output: - label: stdout - stdout: - codec: lines diff --git a/internal/impl/s2/embeds/output_description.md b/internal/impl/s2/embeds/output_description.md deleted file mode 100644 index c4aa6c683..000000000 --- a/internal/impl/s2/embeds/output_description.md +++ /dev/null @@ -1,21 +0,0 @@ -Generate an authentication token by logging onto the web console at -[s2.dev](https://s2.dev/dashboard). - -### Metadata - -The metadata attributes are set as S2 record headers. Currently, only string -attribute values are supported. - -### Batching - -The plugin expects batched inputs. Messages are batched automatically by Bento. - -By default, Bento disables batching based on `count`, `byte_size`, and `period` -parameters, but the plugin enables batching setting both `count` and -`byte_size` to the maximum values supported by S2. It also -sets a flush period of `5ms` as a reasonable default. - -**Note:** An S2 record batch can be a maximum of 1MiB but the plugin limits the -size of a message to 256KiB since the Bento size limit doesn't take metadata into -account. Moreover, the metered size of the same Bento message will be greater -than the byte size of a Bento message. diff --git a/internal/impl/s2/embeds/output_starwars_eg.yaml b/internal/impl/s2/embeds/output_starwars_eg.yaml deleted file mode 100644 index a9382d711..000000000 --- a/internal/impl/s2/embeds/output_starwars_eg.yaml +++ /dev/null @@ -1,14 +0,0 @@ -input: - label: towel_blinkenlights_nl - socket: - network: tcp - address: towel.blinkenlights.nl:23 - scanner: - lines: {} - -output: - label: s2_starwars - s2: - basin: my-favorite-basin - stream: starwars - auth_token: "${S2_AUTH_TOKEN}" diff --git a/internal/impl/s2/input.go b/internal/impl/s2/input.go index 33a45e6af..287505ba7 100644 --- a/internal/impl/s2/input.go +++ b/internal/impl/s2/input.go @@ -3,8 +3,6 @@ package s2 import ( "context" _ "embed" - "encoding/base64" - "encoding/binary" "errors" "fmt" "strconv" @@ -13,14 +11,6 @@ import ( "github.com/warpstreamlabs/bento/public/service" ) -var ( - //go:embed embeds/input_description.md - inputDescription string - - //go:embed embeds/input_with_prefix_eg.yaml - inputWithPrefixExample string -) - var errCacheNotFound = errors.New("cache not found") func init() { @@ -50,35 +40,73 @@ const ( updateStreamsIntervalField = "update_streams_interval" ) -var ( - streamsFieldSpec = service.NewAnyField(streamsField). - Description("Streams prefix or list of streams to subscribe to") +func newInputConfigSpec() *service.ConfigSpec { + return service.NewConfigSpec(). + Fields( + service.NewStringField(basinField).Description("Basin name"), + service.NewStringField(authTokenField). + Description("Authentication token for S2 account"). + Secret(), + service.NewAnyField(streamsField). + Description("Streams prefix or list of streams to subscribe to"), + service.NewStringField(cacheField). + Description("Cache resource label for storing sequence number"), + service.NewInputMaxInFlightField().Advanced(), + service.NewDurationField(updateStreamsIntervalField). + Advanced(). + Default("1m"). + Description("Interval after which the streams list should update dynamically"), + ). + Summary("Consumes records from S2 streams"). + Description(` +Generate an authentication token by logging onto the web console at +[s2.dev](https://s2.dev/dashboard). - inputMaxInFlightSpec = service.NewInputMaxInFlightField() +### Cache - cacheFieldSpec = service.NewStringField(cacheField). - Description("Cache resource label for storing sequence number") +The plugin requires setting up a caching mechanism to resume the input after +the last acknowledged record. - updateStreamsIntervalSpec = service.NewDurationField(updateStreamsIntervalField). - Advanced(). - Default("60s"). - Description("Interval after which the streams list should update dynamically") -) +To know more about setting up a cache resource, see +[Cache docs for Bento](https://warpstreamlabs.github.io/bento/docs/components/caches/about). -func newInputConfigSpec() *service.ConfigSpec { - return newConfigSpec(). - Fields( - streamsFieldSpec, - inputMaxInFlightSpec, - cacheFieldSpec, - updateStreamsIntervalSpec, +### Metadata + +This input adds the following metadata fields to each message in addition to the +record headers: + +- `+"`s2_basin`"+`: The S2 basin where the origin stream lives. +- `+"`s2_stream`"+`: The origin S2 stream. +- `+"`s2_seq_num`"+`: Sequence number of the record in the origin stream formatted as a string. + +All the header values are loosely converted to strings as metadata attributes. + +**Note:** An [S2 command record](https://s2.dev/docs/stream#command-records) has no header +name. This is set as the `+"`s2_command`"+` meta key. +`, ). - Summary("Consumes records from S2 streams"). - Description(inputDescription). Example( "Input with Prefix", - "Fetch records from all the streams with the prefix `my-favorite-prefix-` in the basin.", - inputWithPrefixExample, + "Fetch records from all the streams with the prefix `my-favorite-prefix/` in the basin.", + ` +cache_resources: + - label: s2_seq_num + file: + directory: s2_seq_num_cache + +input: + label: s2_input + s2: + basin: my-favorite-basin + streams: my-favorite-prefix/ + auth_token: "${S2_AUTH_TOKEN}" + cache: s2_seq_num + +output: + label: stdout + stdout: + codec: lines +`, ) } @@ -134,56 +162,12 @@ func newInputConfig(conf *service.ParsedConfig, r *service.Resources) (*s2bentob Config: config, Streams: inputStreams, MaxInFlight: maxInFlight, - Logger: &bentoLogger{L: r.Logger()}, + Logger: &bentoLogger{r.Logger()}, Cache: cache, UpdateStreamsInterval: updateStreamsInterval, }, nil } -type bentoSeqNumCache struct { - Resources *service.Resources - Label string -} - -func streamCacheKey(stream string) string { - return base64.URLEncoding.EncodeToString([]byte(stream)) -} - -func (b *bentoSeqNumCache) Get(ctx context.Context, stream string) (uint64, error) { - var ( - seqNum uint64 - err error - ) - - if aErr := b.Resources.AccessCache(ctx, b.Label, func(c service.Cache) { - var seqNumBytes []byte - seqNumBytes, err = c.Get(ctx, streamCacheKey(stream)) - if err != nil { - return - } - - seqNum = binary.BigEndian.Uint64(seqNumBytes) - }); aErr != nil { - return 0, aErr - } - - return seqNum, err -} - -func (b *bentoSeqNumCache) Set(ctx context.Context, stream string, seqNum uint64) error { - var err error - - if aErr := b.Resources.AccessCache(ctx, b.Label, func(c service.Cache) { - seqNumBytes := binary.BigEndian.AppendUint64(make([]byte, 0, 8), seqNum) - - err = c.Set(ctx, streamCacheKey(stream), seqNumBytes, nil) - }); aErr != nil { - return aErr - } - - return err -} - type Input struct { inner *s2bentobox.MultiStreamInput config *s2bentobox.InputConfig diff --git a/internal/impl/s2/output.go b/internal/impl/s2/output.go index 4287162cd..2099ae963 100644 --- a/internal/impl/s2/output.go +++ b/internal/impl/s2/output.go @@ -12,13 +12,10 @@ import ( "github.com/warpstreamlabs/bento/public/service" ) -var ( - //go:embed embeds/output_description.md - outputDescription string - - //go:embed embeds/output_starwars_eg.yaml - outputStarwars string -) +// Setting the batch byte size max a bit conservatively since Bento does not +// take metadata size into account. Moreover, the S2 metered size of a record +// will be > bento message size. +const maxBatchBytes = 256 * 1024 var ( errInvalidBatchingByteSize = errors.New("invalid batch policy byte size") @@ -60,34 +57,78 @@ const ( fencingTokenField = "fencing_token" ) -var ( - batchingFieldSpec = service.NewBatchPolicyField(batchingField) +func newOutputConfigSpec() *service.ConfigSpec { + return service.NewConfigSpec(). + Fields( + service.NewStringField(basinField).Description("Basin name"), + service.NewStringField(authTokenField). + Description("Authentication token for S2 account"). + Secret(), + service.NewStringField(streamField).Description("Stream name"), + service.NewStringField(fencingTokenField). + Optional(). + Description("Enforce a fencing token (base64 encoded)"). + Example("aGVsbG8gczI="), + service.NewBatchPolicyField(batchingField). + Advanced(). + LintRule( + fmt.Sprintf(` +root = if this.count > %d { + "the number of messages in a batch cannot exceed 1000" +} +root = if this.byte_size > %d { + "the amount of bytes in a batch cannot exceed 256KiB" +} + `, + s2.MaxBatchRecords, + maxBatchBytes, + ), + ), + service.NewOutputMaxInFlightField().Advanced(), + ). + Summary("Sends messages to an S2 stream."). + Description(` +Generate an authentication token by logging onto the web console at +[s2.dev](https://s2.dev/dashboard). - streamFieldSpec = service.NewStringField(streamField). - Description("Stream name") +### Metadata - outputMaxInFlightSpec = service.NewOutputMaxInFlightField() +The metadata attributes are set as S2 record headers. Currently, only string +attribute values are supported. - fencingTokenFieldSpec = service.NewStringField(fencingTokenField). - Optional(). - Description("Enforce a fencing token (base64 encoded)"). - Example("aGVsbG8gczI=") -) +### Batching -func newOutputConfigSpec() *service.ConfigSpec { - return newConfigSpec(). - Fields( - batchingFieldSpec, - streamFieldSpec, - outputMaxInFlightSpec, - fencingTokenFieldSpec, +The plugin expects batched inputs. Messages are batched automatically by Bento. + +By default, Bento disables batching based on `+"`count`"+`, `+"`byte_size`"+`, and `+"`period`"+` +parameters, but the plugin enables batching setting both `+"`count`"+` and `+"`byte_size`"+` to +the maximum values supported by S2. It also sets a flush period of `+"`5ms`"+` as a reasonable default. + +**Note:** An S2 record batch can be a maximum of 1MiB but the plugin limits the +size of a message to 256KiB since the Bento size limit doesn't take metadata into +account. Moreover, the metered size of the same Bento message will be greater +than the byte size of a Bento message. +`, ). - Summary("Sends messages to an S2 stream."). - Description(outputDescription). Example( "ASCII Starwars", "Consume a network stream into an S2 stream", - outputStarwars, + ` +input: + label: towel_blinkenlights_nl + socket: + network: tcp + address: towel.blinkenlights.nl:23 + scanner: + lines: {} + +output: + label: s2_starwars + s2: + basin: my-favorite-basin + stream: starwars + auth_token: "${S2_AUTH_TOKEN}" +`, ) } @@ -99,11 +140,6 @@ func parseBatchPolicy(conf *service.ParsedConfig) (service.BatchPolicy, error) { // Set required defaults - // Setting the batch byte size max a bit conservatively since Bento does not - // take metadata size into account. Moreover, the S2 metered size of a record - // will be > bento message size. - const maxBatchBytes = 256 * 1024 - if policy.ByteSize <= 0 { policy.ByteSize = maxBatchBytes } @@ -192,10 +228,7 @@ func (o *Output) Connect(ctx context.Context) error { func (o *Output) WriteBatch(ctx context.Context, batch service.MessageBatch) error { o.logger.Debug("Writing batch to S2") - recordBatch, err := s2.NewAppendRecordBatch() - if err != nil { - panic("empty record batch shouldn't error") - } + records := make([]s2.AppendRecord, 0, len(batch)) if err := batch.WalkWithBatchedErrors(func(_ int, m *service.Message) error { body, err := m.AsBytes() @@ -215,18 +248,21 @@ func (o *Output) WriteBatch(ctx context.Context, batch service.MessageBatch) err return err } - if !recordBatch.Append(s2.AppendRecord{ + records = append(records, s2.AppendRecord{ Headers: headers, Body: body, - }) { - return s2bentobox.ErrAppendRecordBatchFull - } + }) return nil }); err != nil { return err } + recordBatch, leftOver := s2.NewAppendRecordBatch(records...) + if len(leftOver) > 0 { + return s2bentobox.ErrAppendRecordBatchFull + } + if err := o.inner.WriteBatch(ctx, recordBatch); err != nil { if errors.Is(err, s2bentobox.ErrOutputClosed) { return service.ErrNotConnected From b70397957ebf8c856d4e4a488259aa5474582c55 Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Mon, 3 Feb 2025 15:17:07 +0530 Subject: [PATCH 7/7] go back to 1.22 --- go.mod | 6 +++--- go.sum | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index c3ff35784..eef1b017f 100644 --- a/go.mod +++ b/go.mod @@ -160,8 +160,8 @@ require ( require ( github.com/orcaman/concurrent-map/v2 v2.0.1 // indirect - github.com/s2-streamstore/optr v1.0.0 // indirect - github.com/s2-streamstore/s2-sdk-go v0.4.1 + github.com/s2-streamstore/optr v1.1.0 // indirect + github.com/s2-streamstore/s2-sdk-go v0.5.0 github.com/tidwall/btree v1.7.0 // indirect ) @@ -385,7 +385,7 @@ require ( modernc.org/token v1.1.0 // indirect ) -go 1.23.4 +go 1.22.0 // This (indirect) dependency is needed for github.com/AthenZ/athenz but the domain no longer resolves. // Remove once upstream issue fixed. See: https://github.com/AthenZ/athenz/issues/2842 diff --git a/go.sum b/go.sum index 3aff51ccb..09493e6ae 100644 --- a/go.sum +++ b/go.sum @@ -1862,10 +1862,10 @@ github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w= github.com/ruudk/golang-pdf417 v0.0.0-20201230142125-a7e3863a1245/go.mod h1:pQAZKsJ8yyVxGRWYNEm9oFB8ieLgKFnamEyDmSA0BRk= -github.com/s2-streamstore/optr v1.0.0 h1:OqJpOO2uYsG+iXW9mNTlsCePDvxTS8mh9EtGcnyq8w8= -github.com/s2-streamstore/optr v1.0.0/go.mod h1:x/wSUCzWgpnvWWQG4dZ4JBFW2Kw0pE0yhLGb+0xiN8g= -github.com/s2-streamstore/s2-sdk-go v0.4.1 h1:KQpu6in3D5E93CXcE0XV0WTn+fBla06n23Y4K1/ZHno= -github.com/s2-streamstore/s2-sdk-go v0.4.1/go.mod h1:qfyTLgT32aijTNQeN478iobzZ7EeWg3K8v2gxc3xVa0= +github.com/s2-streamstore/optr v1.1.0 h1:ExwYuwxb4Z1mVI+717xKUEDZ1y0ikFAV55jtk8XCYQk= +github.com/s2-streamstore/optr v1.1.0/go.mod h1:ujwXWMeanje1NE1aaaylBwmBkMalZ+wxFxSFOSHmJis= +github.com/s2-streamstore/s2-sdk-go v0.5.0 h1:IgLFvT2CtO1AE52HmZuurCRVpDz6CdfH+elFxDtUDdo= +github.com/s2-streamstore/s2-sdk-go v0.5.0/go.mod h1:VAK2MYAXfT2eMq4IpaWHRuL5ZBbf5mLyJ4X8Ee+9qdY= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=