From 52b1bd967ebea2453caac6f88e84aa8795848e34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paulo=20Sim=C3=A3o?= <102320181+paulosimao-ardanlabs@users.noreply.github.com> Date: Thu, 2 Jan 2025 14:42:01 -0300 Subject: [PATCH 1/3] feat: added interpolation to the prefix property of gcp_cloud_storage. --- internal/impl/gcp/input_cloud_storage.go | 22 ++++++---- internal/impl/gcp/input_cloud_storage_test.go | 41 +++++++++++++++++++ 2 files changed, 55 insertions(+), 8 deletions(-) create mode 100644 internal/impl/gcp/input_cloud_storage_test.go diff --git a/internal/impl/gcp/input_cloud_storage.go b/internal/impl/gcp/input_cloud_storage.go index 98fb64e74..5662fd9dc 100644 --- a/internal/impl/gcp/input_cloud_storage.go +++ b/internal/impl/gcp/input_cloud_storage.go @@ -24,7 +24,7 @@ const ( type csiConfig struct { Bucket string - Prefix string + Prefix *service.InterpolatedString DeleteObjects bool Codec codec.DeprecatedFallbackCodec } @@ -33,7 +33,7 @@ func csiConfigFromParsed(pConf *service.ParsedConfig) (conf csiConfig, err error if conf.Bucket, err = pConf.FieldString(csiFieldBucket); err != nil { return } - if conf.Prefix, err = pConf.FieldString(csiFieldPrefix); err != nil { + if conf.Prefix, err = pConf.FieldInterpolatedString(csiFieldPrefix); err != nil { return } if conf.Codec, err = codec.DeprecatedCodecFromParsed(pConf); err != nil { @@ -74,8 +74,8 @@ By default Bento will use a shared credentials file when connecting to GCP servi Fields( service.NewStringField(csiFieldBucket). Description("The name of the bucket from which to download objects."), - service.NewStringField(csiFieldPrefix). - Description("An optional path prefix, if set only objects with the prefix are consumed."). + service.NewInterpolatedStringField(csiFieldPrefix). + Description("An optional path prefix, if set only objects with the prefix are consumed - supports interpolation."). Default(""), ). Fields(codec.DeprecatedCodecFields("to_the_end")...). @@ -124,7 +124,7 @@ func newGCPCloudStorageObjectTarget(key string, ackFn service.AckFunc) *gcpCloud return &gcpCloudStorageObjectTarget{key: key, ackFn: ackFn} } -//------------------------------------------------------------------------------ +// ------------------------------------------------------------------------------ func deleteGCPCloudStorageObjectAckFn( bucket *storage.BucketHandle, @@ -146,7 +146,7 @@ func deleteGCPCloudStorageObjectAckFn( } } -//------------------------------------------------------------------------------ +// ------------------------------------------------------------------------------ type gcpCloudStoragePendingObject struct { target *gcpCloudStorageObjectTarget @@ -173,7 +173,13 @@ func newGCPCloudStorageTargetReader( conf: conf, } - it := bucket.Objects(ctx, &storage.Query{Prefix: conf.Prefix}) + interpolatedPrefix, err := conf.Prefix.TryString(service.NewMessage([]byte{})) + if err != nil { + return nil, fmt.Errorf("error reading interpolated prefix: %w", err) + } + + it := bucket.Objects(ctx, &storage.Query{Prefix: interpolatedPrefix}) + for count := 0; count < maxGCPCloudStorageListObjectsResults; count++ { obj, err := it.Next() if errors.Is(err, iterator.Done) { @@ -221,7 +227,7 @@ func (r gcpCloudStorageTargetReader) Close(context.Context) error { return nil } -//------------------------------------------------------------------------------ +// ------------------------------------------------------------------------------ // gcpCloudStorage is a bento reader.Type implementation that reads messages // from a Google Cloud Storage bucket. diff --git a/internal/impl/gcp/input_cloud_storage_test.go b/internal/impl/gcp/input_cloud_storage_test.go new file mode 100644 index 000000000..6f1a302aa --- /dev/null +++ b/internal/impl/gcp/input_cloud_storage_test.go @@ -0,0 +1,41 @@ +package gcp + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/warpstreamlabs/bento/public/service" +) + +const yamlConfig = ` + bucket: bkt + prefix: ${!a * 2} + scanner: + to_the_end: { }` + +const msgBody = `{"a":1,"b":"asd"}` + +func gcpReaderConfig(t *testing.T, yamlStr string) csiConfig { + t.Helper() + spec := csiSpec() + conf, err := spec.ParseYAML(yamlStr, service.GlobalEnvironment()) + require.NoError(t, err) + + ret, err := csiConfigFromParsed(conf) + require.NoError(t, err) + + return ret +} + +func Test_csiConfigFromParsed(t *testing.T) { + cfg := gcpReaderConfig(t, yamlConfig) + + msg := service.NewMessage([]byte(msgBody)) + + pre, err := cfg.Prefix.TryString(msg) + + require.NoError(t, err) + + require.Equal(t, "2", pre) +} From a64d82dc1462f87dadfc9ea9b0a053e1f3e533ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paulo=20Sim=C3=A3o?= <102320181+paulosimao-ardanlabs@users.noreply.github.com> Date: Thu, 2 Jan 2025 14:49:14 -0300 Subject: [PATCH 2/3] feat: added interpolation to the prefix property of gcp_cloud_storage. --- website/docs/components/inputs/gcp_cloud_storage.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/website/docs/components/inputs/gcp_cloud_storage.md b/website/docs/components/inputs/gcp_cloud_storage.md index c128c7522..1e7294187 100644 --- a/website/docs/components/inputs/gcp_cloud_storage.md +++ b/website/docs/components/inputs/gcp_cloud_storage.md @@ -90,7 +90,8 @@ Type: `string` ### `prefix` -An optional path prefix, if set only objects with the prefix are consumed. +An optional path prefix, if set only objects with the prefix are consumed - supports interpolation. +This field supports [interpolation functions](/docs/configuration/interpolation#bloblang-queries). Type: `string` From 9cea8372c8ee6fff695a52dc65673acf77dd662b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paulo=20Sim=C3=A3o?= <102320181+paulosimao-ardanlabs@users.noreply.github.com> Date: Thu, 2 Jan 2025 16:52:04 -0300 Subject: [PATCH 3/3] chore: addressing comments from review. --- internal/impl/gcp/input_cloud_storage.go | 2 +- internal/impl/gcp/input_cloud_storage_test.go | 10 +++------- website/docs/components/inputs/gcp_cloud_storage.md | 2 +- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/internal/impl/gcp/input_cloud_storage.go b/internal/impl/gcp/input_cloud_storage.go index 5662fd9dc..7a1ae9216 100644 --- a/internal/impl/gcp/input_cloud_storage.go +++ b/internal/impl/gcp/input_cloud_storage.go @@ -75,7 +75,7 @@ By default Bento will use a shared credentials file when connecting to GCP servi service.NewStringField(csiFieldBucket). Description("The name of the bucket from which to download objects."), service.NewInterpolatedStringField(csiFieldPrefix). - Description("An optional path prefix, if set only objects with the prefix are consumed - supports interpolation."). + Description("An optional path prefix, if set only objects with the prefix are consumed."). Default(""), ). Fields(codec.DeprecatedCodecFields("to_the_end")...). diff --git a/internal/impl/gcp/input_cloud_storage_test.go b/internal/impl/gcp/input_cloud_storage_test.go index 6f1a302aa..488acec80 100644 --- a/internal/impl/gcp/input_cloud_storage_test.go +++ b/internal/impl/gcp/input_cloud_storage_test.go @@ -10,12 +10,10 @@ import ( const yamlConfig = ` bucket: bkt - prefix: ${!a * 2} + prefix: ${!2 * 2} scanner: to_the_end: { }` -const msgBody = `{"a":1,"b":"asd"}` - func gcpReaderConfig(t *testing.T, yamlStr string) csiConfig { t.Helper() spec := csiSpec() @@ -31,11 +29,9 @@ func gcpReaderConfig(t *testing.T, yamlStr string) csiConfig { func Test_csiConfigFromParsed(t *testing.T) { cfg := gcpReaderConfig(t, yamlConfig) - msg := service.NewMessage([]byte(msgBody)) - - pre, err := cfg.Prefix.TryString(msg) + pre, err := cfg.Prefix.TryString(service.NewMessage([]byte{})) require.NoError(t, err) - require.Equal(t, "2", pre) + require.Equal(t, "4", pre) } diff --git a/website/docs/components/inputs/gcp_cloud_storage.md b/website/docs/components/inputs/gcp_cloud_storage.md index 1e7294187..953e76282 100644 --- a/website/docs/components/inputs/gcp_cloud_storage.md +++ b/website/docs/components/inputs/gcp_cloud_storage.md @@ -90,7 +90,7 @@ Type: `string` ### `prefix` -An optional path prefix, if set only objects with the prefix are consumed - supports interpolation. +An optional path prefix, if set only objects with the prefix are consumed. This field supports [interpolation functions](/docs/configuration/interpolation#bloblang-queries).