diff --git a/internal/impl/gcp/input_cloud_storage.go b/internal/impl/gcp/input_cloud_storage.go index 98fb64e74..7a1ae9216 100644 --- a/internal/impl/gcp/input_cloud_storage.go +++ b/internal/impl/gcp/input_cloud_storage.go @@ -24,7 +24,7 @@ const ( type csiConfig struct { Bucket string - Prefix string + Prefix *service.InterpolatedString DeleteObjects bool Codec codec.DeprecatedFallbackCodec } @@ -33,7 +33,7 @@ func csiConfigFromParsed(pConf *service.ParsedConfig) (conf csiConfig, err error if conf.Bucket, err = pConf.FieldString(csiFieldBucket); err != nil { return } - if conf.Prefix, err = pConf.FieldString(csiFieldPrefix); err != nil { + if conf.Prefix, err = pConf.FieldInterpolatedString(csiFieldPrefix); err != nil { return } if conf.Codec, err = codec.DeprecatedCodecFromParsed(pConf); err != nil { @@ -74,7 +74,7 @@ By default Bento will use a shared credentials file when connecting to GCP servi Fields( service.NewStringField(csiFieldBucket). Description("The name of the bucket from which to download objects."), - service.NewStringField(csiFieldPrefix). + service.NewInterpolatedStringField(csiFieldPrefix). Description("An optional path prefix, if set only objects with the prefix are consumed."). Default(""), ). @@ -124,7 +124,7 @@ func newGCPCloudStorageObjectTarget(key string, ackFn service.AckFunc) *gcpCloud return &gcpCloudStorageObjectTarget{key: key, ackFn: ackFn} } -//------------------------------------------------------------------------------ +// ------------------------------------------------------------------------------ func deleteGCPCloudStorageObjectAckFn( bucket *storage.BucketHandle, @@ -146,7 +146,7 @@ func deleteGCPCloudStorageObjectAckFn( } } -//------------------------------------------------------------------------------ +// ------------------------------------------------------------------------------ type gcpCloudStoragePendingObject struct { target *gcpCloudStorageObjectTarget @@ -173,7 +173,13 @@ func newGCPCloudStorageTargetReader( conf: conf, } - it := bucket.Objects(ctx, &storage.Query{Prefix: conf.Prefix}) + interpolatedPrefix, err := conf.Prefix.TryString(service.NewMessage([]byte{})) + if err != nil { + return nil, fmt.Errorf("error reading interpolated prefix: %w", err) + } + + it := bucket.Objects(ctx, &storage.Query{Prefix: interpolatedPrefix}) + for count := 0; count < maxGCPCloudStorageListObjectsResults; count++ { obj, err := it.Next() if errors.Is(err, iterator.Done) { @@ -221,7 +227,7 @@ func (r gcpCloudStorageTargetReader) Close(context.Context) error { return nil } -//------------------------------------------------------------------------------ +// ------------------------------------------------------------------------------ // gcpCloudStorage is a bento reader.Type implementation that reads messages // from a Google Cloud Storage bucket. diff --git a/internal/impl/gcp/input_cloud_storage_test.go b/internal/impl/gcp/input_cloud_storage_test.go new file mode 100644 index 000000000..488acec80 --- /dev/null +++ b/internal/impl/gcp/input_cloud_storage_test.go @@ -0,0 +1,37 @@ +package gcp + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/warpstreamlabs/bento/public/service" +) + +const yamlConfig = ` + bucket: bkt + prefix: ${!2 * 2} + scanner: + to_the_end: { }` + +func gcpReaderConfig(t *testing.T, yamlStr string) csiConfig { + t.Helper() + spec := csiSpec() + conf, err := spec.ParseYAML(yamlStr, service.GlobalEnvironment()) + require.NoError(t, err) + + ret, err := csiConfigFromParsed(conf) + require.NoError(t, err) + + return ret +} + +func Test_csiConfigFromParsed(t *testing.T) { + cfg := gcpReaderConfig(t, yamlConfig) + + pre, err := cfg.Prefix.TryString(service.NewMessage([]byte{})) + + require.NoError(t, err) + + require.Equal(t, "4", pre) +} diff --git a/website/docs/components/inputs/gcp_cloud_storage.md b/website/docs/components/inputs/gcp_cloud_storage.md index c128c7522..953e76282 100644 --- a/website/docs/components/inputs/gcp_cloud_storage.md +++ b/website/docs/components/inputs/gcp_cloud_storage.md @@ -91,6 +91,7 @@ Type: `string` ### `prefix` An optional path prefix, if set only objects with the prefix are consumed. +This field supports [interpolation functions](/docs/configuration/interpolation#bloblang-queries). Type: `string`