Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added interpolation to the prefix property of gcp_cloud_storage. #203

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions internal/impl/gcp/input_cloud_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (

type csiConfig struct {
Bucket string
Prefix string
Prefix *service.InterpolatedString
DeleteObjects bool
Codec codec.DeprecatedFallbackCodec
}
Expand All @@ -33,7 +33,7 @@ func csiConfigFromParsed(pConf *service.ParsedConfig) (conf csiConfig, err error
if conf.Bucket, err = pConf.FieldString(csiFieldBucket); err != nil {
return
}
if conf.Prefix, err = pConf.FieldString(csiFieldPrefix); err != nil {
if conf.Prefix, err = pConf.FieldInterpolatedString(csiFieldPrefix); err != nil {
return
}
if conf.Codec, err = codec.DeprecatedCodecFromParsed(pConf); err != nil {
Expand Down Expand Up @@ -74,7 +74,7 @@ By default Bento will use a shared credentials file when connecting to GCP servi
Fields(
service.NewStringField(csiFieldBucket).
Description("The name of the bucket from which to download objects."),
service.NewStringField(csiFieldPrefix).
service.NewInterpolatedStringField(csiFieldPrefix).
Description("An optional path prefix, if set only objects with the prefix are consumed.").
Default(""),
).
Expand Down Expand Up @@ -124,7 +124,7 @@ func newGCPCloudStorageObjectTarget(key string, ackFn service.AckFunc) *gcpCloud
return &gcpCloudStorageObjectTarget{key: key, ackFn: ackFn}
}

//------------------------------------------------------------------------------
// ------------------------------------------------------------------------------

func deleteGCPCloudStorageObjectAckFn(
bucket *storage.BucketHandle,
Expand All @@ -146,7 +146,7 @@ func deleteGCPCloudStorageObjectAckFn(
}
}

//------------------------------------------------------------------------------
// ------------------------------------------------------------------------------

type gcpCloudStoragePendingObject struct {
target *gcpCloudStorageObjectTarget
Expand All @@ -173,7 +173,13 @@ func newGCPCloudStorageTargetReader(
conf: conf,
}

it := bucket.Objects(ctx, &storage.Query{Prefix: conf.Prefix})
interpolatedPrefix, err := conf.Prefix.TryString(service.NewMessage([]byte{}))
if err != nil {
return nil, fmt.Errorf("error reading interpolated prefix: %w", err)
}

it := bucket.Objects(ctx, &storage.Query{Prefix: interpolatedPrefix})

for count := 0; count < maxGCPCloudStorageListObjectsResults; count++ {
obj, err := it.Next()
if errors.Is(err, iterator.Done) {
Expand Down Expand Up @@ -221,7 +227,7 @@ func (r gcpCloudStorageTargetReader) Close(context.Context) error {
return nil
}

//------------------------------------------------------------------------------
// ------------------------------------------------------------------------------

// gcpCloudStorage is a bento reader.Type implementation that reads messages
// from a Google Cloud Storage bucket.
Expand Down
37 changes: 37 additions & 0 deletions internal/impl/gcp/input_cloud_storage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package gcp

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/warpstreamlabs/bento/public/service"
)

const yamlConfig = `
bucket: bkt
prefix: ${!2 * 2}
scanner:
to_the_end: { }`

func gcpReaderConfig(t *testing.T, yamlStr string) csiConfig {
t.Helper()
spec := csiSpec()
conf, err := spec.ParseYAML(yamlStr, service.GlobalEnvironment())
require.NoError(t, err)

ret, err := csiConfigFromParsed(conf)
require.NoError(t, err)

return ret
}

func Test_csiConfigFromParsed(t *testing.T) {
cfg := gcpReaderConfig(t, yamlConfig)

pre, err := cfg.Prefix.TryString(service.NewMessage([]byte{}))

require.NoError(t, err)

require.Equal(t, "4", pre)
}
1 change: 1 addition & 0 deletions website/docs/components/inputs/gcp_cloud_storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ Type: `string`
### `prefix`

An optional path prefix, if set only objects with the prefix are consumed.
This field supports [interpolation functions](/docs/configuration/interpolation#bloblang-queries).


Type: `string`
Expand Down
Loading