From 08ddb3c9e1f64a80a51ad8074d7a83b9c1a9ecc3 Mon Sep 17 00:00:00 2001 From: Ed Robinson Date: Wed, 24 Jan 2024 15:27:59 +0000 Subject: [PATCH] AWS IAM authentication AWS have now provided a library https://github.com/aws/aws-msk-iam-sasl-signer-go to provide IAM authentication with MSK clusters. This should be very similar to #242 in operation, but by using this library there is much less code to maintain here. --- go.mod | 13 +++++++++++++ go.sum | 30 ++++++++++++++++++++++++++++++ kafka/client.go | 2 +- kafka/config.go | 37 +++++++++++++++++++++++++++++++++---- kafka/provider.go | 13 ++++++++++--- 5 files changed, 87 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index dac6aa5c..167006ce 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,19 @@ require ( github.com/agext/levenshtein v1.2.2 // indirect github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect github.com/armon/go-radix v1.0.0 // indirect + github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 // indirect + github.com/aws/aws-sdk-go-v2 v1.19.0 // indirect + github.com/aws/aws-sdk-go-v2/config v1.18.28 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.13.27 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.12.13 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.19.3 // indirect + github.com/aws/smithy-go v1.13.5 // indirect github.com/bgentry/speakeasy v0.1.0 // indirect github.com/cloudflare/circl v1.3.3 // indirect github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index 8f9d3e41..d949e334 100644 --- a/go.sum +++ b/go.sum @@ -19,6 +19,32 @@ github.com/apparentlymart/go-textseg/v15 v15.0.0/go.mod h1:K8XmNZdhEBkdlyDdvbmms github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI= github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= +github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 h1:UyjtGmO0Uwl/K+zpzPwLoXzMhcN9xmnR2nrqJoBrg3c= +github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0/go.mod h1:TJAXuFs2HcMib3sN5L0gUC+Q01Qvy3DemvA55WuC+iA= +github.com/aws/aws-sdk-go-v2 v1.19.0 h1:klAT+y3pGFBU/qVf1uzwttpBbiuozJYWzNLHioyDJ+k= +github.com/aws/aws-sdk-go-v2 v1.19.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= +github.com/aws/aws-sdk-go-v2/config v1.18.28 h1:TINEaKyh1Td64tqFvn09iYpKiWjmHYrG1fa91q2gnqw= +github.com/aws/aws-sdk-go-v2/config v1.18.28/go.mod h1:nIL+4/8JdAuNHEjn/gPEXqtnS02Q3NXB/9Z7o5xE4+A= +github.com/aws/aws-sdk-go-v2/credentials v1.13.27 h1:dz0yr/yR1jweAnsCx+BmjerUILVPQ6FS5AwF/OyG1kA= +github.com/aws/aws-sdk-go-v2/credentials v1.13.27/go.mod h1:syOqAek45ZXZp29HlnRS/BNgMIW6uiRmeuQsz4Qh2UE= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5 h1:kP3Me6Fy3vdi+9uHd7YLr6ewPxRL+PU6y15urfTaamU= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5/go.mod h1:Gj7tm95r+QsDoN2Fhuz/3npQvcZbkEf5mL70n3Xfluc= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 h1:hMUCiE3Zi5AHrRNGf5j985u0WyqI6r2NULhUfo0N/No= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35/go.mod h1:ipR5PvpSPqIqL5Mi82BxLnfMkHVbmco8kUwO2xrCi0M= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 h1:yOpYx+FTBdpk/g+sBU6Cb1H0U/TLEcYYp66mYqsPpcc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29/go.mod h1:M/eUABlDbw2uVrdAn+UsI6M727qp2fxkp8K0ejcBDUY= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36 h1:8r5m1BoAWkn0TDC34lUculryf7nUF25EgIMdjvGCkgo= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36/go.mod h1:Rmw2M1hMVTwiUhjwMoIBFWFJMhvJbct06sSidxInkhY= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 h1:IiDolu/eLmuB18DRZibj77n1hHQT7z12jnGO7Ze3pLc= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29/go.mod h1:fDbkK4o7fpPXWn8YAPmTieAMuB9mk/VgvW64uaUqxd4= +github.com/aws/aws-sdk-go-v2/service/sso v1.12.13 h1:sWDv7cMITPcZ21QdreULwxOOAmE05JjEsT6fCDtDA9k= +github.com/aws/aws-sdk-go-v2/service/sso v1.12.13/go.mod h1:DfX0sWuT46KpcqbMhJ9QWtxAIP1VozkDWf8VAkByjYY= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13 h1:BFubHS/xN5bjl818QaroN6mQdjneYQ+AOx44KNXlyH4= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13/go.mod h1:BzqsVVFduubEmzrVtUFQQIQdFqvUItF8XUq2EnS8Wog= +github.com/aws/aws-sdk-go-v2/service/sts v1.19.3 h1:e5mnydVdCVWxP+5rPAGi2PYxC7u2OZgH1ypC114H04U= +github.com/aws/aws-sdk-go-v2/service/sts v1.19.3/go.mod h1:yVGZA1CPkmUhBdA039jXNJJG7/6t+G+EBWmFq23xqnY= +github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8= +github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/bgentry/speakeasy v0.1.0 h1:ByYyxL9InA1OWqxJqqp2A5pYHUrCiAL6K3J+LKSsQkY= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZlaQsNA= @@ -55,6 +81,7 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -130,6 +157,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4= github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= @@ -300,6 +329,7 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/warnings.v0 v0.1.2 h1:wFXVbFY8DY5/xOe1ECiWdKCzZlxgshcYVNkBHstARME= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/kafka/client.go b/kafka/client.go index 93293a70..a7affbdf 100644 --- a/kafka/client.go +++ b/kafka/client.go @@ -543,7 +543,7 @@ func (c *Client) versionForKey(apiKey, wantedMaxVersion int) int { return 0 } -//topicConfig retrives the non-default config map for a topic +// topicConfig retrives the non-default config map for a topic func (c *Client) topicConfig(topic string) (map[string]*string, error) { conf := map[string]*string{} request := &sarama.DescribeConfigsRequest{ diff --git a/kafka/config.go b/kafka/config.go index e0028f93..0d58de24 100644 --- a/kafka/config.go +++ b/kafka/config.go @@ -1,6 +1,7 @@ package kafka import ( + "context" "crypto/tls" "crypto/x509" "encoding/pem" @@ -10,6 +11,7 @@ import ( "time" "github.com/IBM/sarama" + "github.com/aws/aws-msk-iam-sasl-signer-go/signer" "golang.org/x/net/proxy" ) @@ -25,6 +27,16 @@ type Config struct { SASLUsername string SASLPassword string SASLMechanism string + SASLAWSRegion string +} + +type MSKAccessTokenProvider struct { + region string +} + +func (m *MSKAccessTokenProvider) Token() (*sarama.AccessToken, error) { + token, _, err := signer.GenerateAuthToken(context.TODO(), m.region) + return &sarama.AccessToken{Token: token}, err } func (c *Config) newKafkaConfig() (*sarama.Config, error) { @@ -46,14 +58,30 @@ func (c *Config) newKafkaConfig() (*sarama.Config, error) { case "scram-sha256": kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} } kafkaConfig.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256) + case "aws-iam": + kafkaConfig.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeOAuth) + region := c.SASLAWSRegion + if region == "" { + region = os.Getenv("AWS_REGION") + } + if region == "" { + log.Fatalf("[ERROR] aws region must be configured or AWS_REGION environment variable must be set to use aws-iam sasl mechanism") + } + kafkaConfig.Net.SASL.TokenProvider = &MSKAccessTokenProvider{region: region} case "plain": default: - log.Fatalf("[ERROR] Invalid sasl mechanism \"%s\": can only be \"scram-sha256\", \"scram-sha512\" or \"plain\"", c.SASLMechanism) + log.Fatalf("[ERROR] Invalid sasl mechanism \"%s\": can only be \"scram-sha256\", \"scram-sha512\", \"aws-iam\" or \"plain\"", c.SASLMechanism) } + kafkaConfig.Net.SASL.Enable = true - kafkaConfig.Net.SASL.Password = c.SASLPassword - kafkaConfig.Net.SASL.User = c.SASLUsername kafkaConfig.Net.SASL.Handshake = true + + if c.SASLUsername != "" { + kafkaConfig.Net.SASL.User = c.SASLUsername + } + if c.SASLPassword != "" { + kafkaConfig.Net.SASL.Password = c.SASLPassword + } } else { log.Printf("[WARN] SASL disabled username: '%s', password '%s'", c.SASLUsername, "****") } @@ -79,7 +107,7 @@ func (c *Config) newKafkaConfig() (*sarama.Config, error) { } func (c *Config) saslEnabled() bool { - return c.SASLUsername != "" || c.SASLPassword != "" + return c.SASLUsername != "" || c.SASLPassword != "" || c.SASLMechanism == "aws-iam" } func NewTLSConfig(clientCert, clientKey, caCert, clientKeyPassphrase string) (*tls.Config, error) { @@ -181,6 +209,7 @@ func (config *Config) copyWithMaskedSensitiveValues() Config { "*****", config.TLSEnabled, config.SkipTLSVerify, + config.SASLAWSRegion, config.SASLUsername, "*****", config.SASLMechanism, diff --git a/kafka/provider.go b/kafka/provider.go index d4239fc5..a071d5e4 100644 --- a/kafka/provider.go +++ b/kafka/provider.go @@ -61,6 +61,12 @@ func Provider() *schema.Provider { DefaultFunc: schema.EnvDefaultFunc("KAFKA_CLIENT_KEY_PASSPHRASE", nil), Description: "The passphrase for the private key that the certificate was issued for.", }, + "sasl_aws_region": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + DefaultFunc: schema.EnvDefaultFunc("KAFKA_SASL_IAM_AWS_REGION", nil), + Description: "AWS region where MSK is deployed.", + }, "sasl_username": &schema.Schema{ Type: schema.TypeString, Optional: true, @@ -77,7 +83,7 @@ func Provider() *schema.Provider { Type: schema.TypeString, Optional: true, DefaultFunc: schema.EnvDefaultFunc("KAFKA_SASL_MECHANISM", "plain"), - Description: "SASL mechanism, can be plain, scram-sha512, scram-sha256", + Description: "SASL mechanism, can be plain, scram-sha512, scram-sha256, aws-iam", }, "skip_tls_verify": &schema.Schema{ Type: schema.TypeBool, @@ -119,9 +125,9 @@ func providerConfigure(d *schema.ResourceData) (interface{}, error) { saslMechanism := d.Get("sasl_mechanism").(string) switch saslMechanism { - case "scram-sha512", "scram-sha256", "plain": + case "scram-sha512", "scram-sha256", "aws-iam", "plain": default: - return nil, fmt.Errorf("[ERROR] Invalid sasl mechanism \"%s\": can only be \"scram-sha256\", \"scram-sha512\" or \"plain\"", saslMechanism) + return nil, fmt.Errorf("[ERROR] Invalid sasl mechanism \"%s\": can only be \"scram-sha256\", \"scram-sha512\", \"aws-iam\" or \"plain\"", saslMechanism) } config := &Config{ @@ -131,6 +137,7 @@ func providerConfigure(d *schema.ResourceData) (interface{}, error) { ClientCertKey: d.Get("client_key").(string), ClientCertKeyPassphrase: d.Get("client_key_passphrase").(string), SkipTLSVerify: d.Get("skip_tls_verify").(bool), + SASLAWSRegion: d.Get("sasl_aws_region").(string), SASLUsername: d.Get("sasl_username").(string), SASLPassword: d.Get("sasl_password").(string), SASLMechanism: saslMechanism,