From d62a86cb12a374f15e2393b2d3da8a2e6f1dfad1 Mon Sep 17 00:00:00 2001 From: Seyed Sajjad Tak Tehrani Date: Wed, 29 Jan 2025 18:04:00 +0100 Subject: [PATCH 1/2] feat(token): add support for eks pod identity cred provider --- kafka/config.go | 60 ++++++++++++++++++++++++++++++----------------- kafka/provider.go | 54 ++++++++++++++++++++++++++---------------- 2 files changed, 73 insertions(+), 41 deletions(-) diff --git a/kafka/config.go b/kafka/config.go index 306d2eb7..6e568a80 100644 --- a/kafka/config.go +++ b/kafka/config.go @@ -13,32 +13,35 @@ import ( "github.com/IBM/sarama" "github.com/aws/aws-msk-iam-sasl-signer-go/signer" "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/credentials/endpointcreds" "golang.org/x/net/proxy" "golang.org/x/oauth2" "golang.org/x/oauth2/clientcredentials" ) type Config struct { - BootstrapServers *[]string - Timeout int - CACert string - ClientCert string - ClientCertKey string - ClientCertKeyPassphrase string - KafkaVersion string - TLSEnabled bool - SkipTLSVerify bool - SASLUsername string - SASLPassword string - SASLMechanism string - SASLAWSRegion string - SASLAWSRoleArn string - SASLAWSProfile string - SASLAWSAccessKey string - SASLAWSSecretKey string - SASLAWSToken string - SASLAWSCredsDebug bool - SASLTokenUrl string + BootstrapServers *[]string + Timeout int + CACert string + ClientCert string + ClientCertKey string + ClientCertKeyPassphrase string + KafkaVersion string + TLSEnabled bool + SkipTLSVerify bool + SASLUsername string + SASLPassword string + SASLMechanism string + SASLAWSContainerAuthorizationTokenFile string + SASLAWSContainerCredentialsFullUri string + SASLAWSRegion string + SASLAWSRoleArn string + SASLAWSProfile string + SASLAWSAccessKey string + SASLAWSSecretKey string + SASLAWSToken string + SASLAWSCredsDebug bool + SASLTokenUrl string } type OAuth2Config interface { @@ -85,7 +88,20 @@ func (c *Config) Token() (*sarama.AccessToken, error) { signer.AwsDebugCreds = c.SASLAWSCredsDebug var token string var err error - if c.SASLAWSRoleArn != "" { + + if c.SASLAWSContainerAuthorizationTokenFile != "" && c.SASLAWSContainerCredentialsFullUri != "" { + log.Printf("[INFO] Generating auth token using container credentials in '%s'", c.SASLAWSRegion) + var containerAuthorizationToken []byte + containerAuthorizationToken, err = os.ReadFile(c.SASLAWSContainerAuthorizationTokenFile) + if err != nil { + return nil, fmt.Errorf("failed to read authorization token file: %w", err) + } + tokenOpt := func(o *endpointcreds.Options) { + o.AuthorizationToken = string(containerAuthorizationToken) + } + credProvider := endpointcreds.New(c.SASLAWSContainerCredentialsFullUri, tokenOpt) + token, _, err = signer.GenerateAuthTokenFromCredentialsProvider(context.TODO(), c.SASLAWSRegion, credProvider) + } else if c.SASLAWSRoleArn != "" { log.Printf("[INFO] Generating auth token with a role '%s' in '%s'", c.SASLAWSRoleArn, c.SASLAWSRegion) token, _, err = signer.GenerateAuthTokenFromRole(context.TODO(), c.SASLAWSRegion, c.SASLAWSRoleArn, "terraform-kafka-provider") } else if c.SASLAWSProfile != "" { @@ -303,6 +319,8 @@ func (config *Config) copyWithMaskedSensitiveValues() Config { config.SASLUsername, "*****", config.SASLMechanism, + config.SASLAWSContainerAuthorizationTokenFile, + config.SASLAWSContainerCredentialsFullUri, config.SASLAWSRegion, config.SASLAWSRoleArn, config.SASLAWSProfile, diff --git a/kafka/provider.go b/kafka/provider.go index 11236278..31d814a8 100644 --- a/kafka/provider.go +++ b/kafka/provider.go @@ -73,6 +73,18 @@ func Provider() *schema.Provider { DefaultFunc: schema.EnvDefaultFunc("KAFKA_VERSION", "2.7.0"), Description: "The version of Kafka protocol to use in `$MAJOR.$MINOR.$PATCH` format. Some features may not be available on older versions. Default is 2.7.0.", }, + "sasl_aws_container_authorization_token_file": { + Type: schema.TypeString, + Optional: true, + DefaultFunc: schema.EnvDefaultFunc("AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE", nil), + Description: "Path to a file containing the AWS pod identity authorization token", + }, + "sasl_aws_container_credentials_full_uri": { + Type: schema.TypeString, + Optional: true, + DefaultFunc: schema.EnvDefaultFunc("AWS_CONTAINER_CREDENTIALS_FULL_URI", nil), + Description: "URI to retrieve AWS credentials from", + }, "sasl_aws_role_arn": { Type: schema.TypeString, Optional: true, @@ -179,26 +191,28 @@ func providerConfigure(d *schema.ResourceData) (interface{}, error) { } config := &Config{ - BootstrapServers: brokers, - CACert: d.Get("ca_cert").(string), - ClientCert: d.Get("client_cert").(string), - ClientCertKey: d.Get("client_key").(string), - ClientCertKeyPassphrase: d.Get("client_key_passphrase").(string), - KafkaVersion: d.Get("kafka_version").(string), - SkipTLSVerify: d.Get("skip_tls_verify").(bool), - SASLAWSRegion: d.Get("sasl_aws_region").(string), - SASLUsername: d.Get("sasl_username").(string), - SASLPassword: d.Get("sasl_password").(string), - SASLTokenUrl: d.Get("sasl_token_url").(string), - SASLAWSRoleArn: d.Get("sasl_aws_role_arn").(string), - SASLAWSProfile: d.Get("sasl_aws_profile").(string), - SASLAWSAccessKey: d.Get("sasl_aws_access_key").(string), - SASLAWSSecretKey: d.Get("sasl_aws_secret_key").(string), - SASLAWSToken: d.Get("sasl_aws_token").(string), - SASLAWSCredsDebug: d.Get("sasl_aws_creds_debug").(bool), - SASLMechanism: saslMechanism, - TLSEnabled: d.Get("tls_enabled").(bool), - Timeout: d.Get("timeout").(int), + BootstrapServers: brokers, + CACert: d.Get("ca_cert").(string), + ClientCert: d.Get("client_cert").(string), + ClientCertKey: d.Get("client_key").(string), + ClientCertKeyPassphrase: d.Get("client_key_passphrase").(string), + KafkaVersion: d.Get("kafka_version").(string), + SkipTLSVerify: d.Get("skip_tls_verify").(bool), + SASLAWSRegion: d.Get("sasl_aws_region").(string), + SASLAWSContainerAuthorizationTokenFile: d.Get("sasl_aws_container_authorization_token_file").(string), + SASLAWSContainerCredentialsFullUri: d.Get("sasl_aws_container_credentials_full_uri").(string), + SASLUsername: d.Get("sasl_username").(string), + SASLPassword: d.Get("sasl_password").(string), + SASLTokenUrl: d.Get("sasl_token_url").(string), + SASLAWSRoleArn: d.Get("sasl_aws_role_arn").(string), + SASLAWSProfile: d.Get("sasl_aws_profile").(string), + SASLAWSAccessKey: d.Get("sasl_aws_access_key").(string), + SASLAWSSecretKey: d.Get("sasl_aws_secret_key").(string), + SASLAWSToken: d.Get("sasl_aws_token").(string), + SASLAWSCredsDebug: d.Get("sasl_aws_creds_debug").(bool), + SASLMechanism: saslMechanism, + TLSEnabled: d.Get("tls_enabled").(bool), + Timeout: d.Get("timeout").(int), } if config.CACert == "" { From 00f37d667dc53a4d805bf4920768c875d5835678 Mon Sep 17 00:00:00 2001 From: Seyed Sajjad Tak Tehrani Date: Wed, 29 Jan 2025 21:37:27 +0100 Subject: [PATCH 2/2] doc(readme): introduce the new cred provider --- README.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/README.md b/README.md index bf01539d..ef29a628 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,16 @@ provider "kafka" { sasl_aws_region = "us-east-1" } ``` + +Example provider with aws-iam(Container Creds) client authentication. You have to export `AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE` and `AWS_CONTAINER_CREDENTIALS_FULL_URI` +```hcl +provider "kafka" { + bootstrap_servers = ["localhost:9098"] + tls_enabled = true + sasl_mechanism = "aws-iam" + sasl_aws_region = "us-east-1" +} +``` #### Compatibility with Redpanda ```hcl @@ -151,6 +161,8 @@ Due to Redpanda not implementing some Metadata APIs, we need to force the Kafka | `sasl_password` | Password for SASL authentication. | `""` | | `sasl_mechanism` | Mechanism for SASL authentication. Allowed values are `plain`, `aws-iam`, `scram-sha256`, `scram-sha512` or `oauthbearer` | `plain` | | `sasl_aws_region` | AWS region for IAM authentication. | `""` | +| `sasl_aws_container_authorization_token_file` | Path to a file containing the AWS pod identity authorization token. | `""` | +| `sasl_aws_container_credentials_full_uri` | URI to retrieve AWS credentials from. | `""` | | `sasl_aws_role_arn` | Arn of AWS IAM role to assume for IAM authentication. | `""` | | `sasl_aws_profile` | AWS profile to use for IAM authentication. | `""` | | `sasl_aws_access_key` | AWS access key. | `""` |