Skip to content

Commit

Permalink
AWS IAM authentication
Browse files Browse the repository at this point in the history
AWS have now provided a library
https://github.com/aws/aws-msk-iam-sasl-signer-go to provide IAM
authentication with MSK clusters.

This should be very similar to #242 in operation, but by using this library there is
much less code to maintain here.
  • Loading branch information
errm authored and Mongey committed Jan 28, 2024
1 parent 74c5627 commit 08ddb3c
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 8 deletions.
13 changes: 13 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,19 @@ require (
github.com/agext/levenshtein v1.2.2 // indirect
github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect
github.com/armon/go-radix v1.0.0 // indirect
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 // indirect
github.com/aws/aws-sdk-go-v2 v1.19.0 // indirect
github.com/aws/aws-sdk-go-v2/config v1.18.28 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.27 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.13 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.19.3 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/bgentry/speakeasy v0.1.0 // indirect
github.com/cloudflare/circl v1.3.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand Down
30 changes: 30 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,32 @@ github.com/apparentlymart/go-textseg/v15 v15.0.0/go.mod h1:K8XmNZdhEBkdlyDdvbmms
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI=
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 h1:UyjtGmO0Uwl/K+zpzPwLoXzMhcN9xmnR2nrqJoBrg3c=
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0/go.mod h1:TJAXuFs2HcMib3sN5L0gUC+Q01Qvy3DemvA55WuC+iA=
github.com/aws/aws-sdk-go-v2 v1.19.0 h1:klAT+y3pGFBU/qVf1uzwttpBbiuozJYWzNLHioyDJ+k=
github.com/aws/aws-sdk-go-v2 v1.19.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2/config v1.18.28 h1:TINEaKyh1Td64tqFvn09iYpKiWjmHYrG1fa91q2gnqw=
github.com/aws/aws-sdk-go-v2/config v1.18.28/go.mod h1:nIL+4/8JdAuNHEjn/gPEXqtnS02Q3NXB/9Z7o5xE4+A=
github.com/aws/aws-sdk-go-v2/credentials v1.13.27 h1:dz0yr/yR1jweAnsCx+BmjerUILVPQ6FS5AwF/OyG1kA=
github.com/aws/aws-sdk-go-v2/credentials v1.13.27/go.mod h1:syOqAek45ZXZp29HlnRS/BNgMIW6uiRmeuQsz4Qh2UE=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5 h1:kP3Me6Fy3vdi+9uHd7YLr6ewPxRL+PU6y15urfTaamU=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5/go.mod h1:Gj7tm95r+QsDoN2Fhuz/3npQvcZbkEf5mL70n3Xfluc=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 h1:hMUCiE3Zi5AHrRNGf5j985u0WyqI6r2NULhUfo0N/No=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35/go.mod h1:ipR5PvpSPqIqL5Mi82BxLnfMkHVbmco8kUwO2xrCi0M=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 h1:yOpYx+FTBdpk/g+sBU6Cb1H0U/TLEcYYp66mYqsPpcc=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29/go.mod h1:M/eUABlDbw2uVrdAn+UsI6M727qp2fxkp8K0ejcBDUY=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36 h1:8r5m1BoAWkn0TDC34lUculryf7nUF25EgIMdjvGCkgo=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36/go.mod h1:Rmw2M1hMVTwiUhjwMoIBFWFJMhvJbct06sSidxInkhY=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 h1:IiDolu/eLmuB18DRZibj77n1hHQT7z12jnGO7Ze3pLc=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29/go.mod h1:fDbkK4o7fpPXWn8YAPmTieAMuB9mk/VgvW64uaUqxd4=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.13 h1:sWDv7cMITPcZ21QdreULwxOOAmE05JjEsT6fCDtDA9k=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.13/go.mod h1:DfX0sWuT46KpcqbMhJ9QWtxAIP1VozkDWf8VAkByjYY=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13 h1:BFubHS/xN5bjl818QaroN6mQdjneYQ+AOx44KNXlyH4=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13/go.mod h1:BzqsVVFduubEmzrVtUFQQIQdFqvUItF8XUq2EnS8Wog=
github.com/aws/aws-sdk-go-v2/service/sts v1.19.3 h1:e5mnydVdCVWxP+5rPAGi2PYxC7u2OZgH1ypC114H04U=
github.com/aws/aws-sdk-go-v2/service/sts v1.19.3/go.mod h1:yVGZA1CPkmUhBdA039jXNJJG7/6t+G+EBWmFq23xqnY=
github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/bgentry/speakeasy v0.1.0 h1:ByYyxL9InA1OWqxJqqp2A5pYHUrCiAL6K3J+LKSsQkY=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZlaQsNA=
Expand Down Expand Up @@ -55,6 +81,7 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand Down Expand Up @@ -130,6 +157,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
Expand Down Expand Up @@ -300,6 +329,7 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/warnings.v0 v0.1.2 h1:wFXVbFY8DY5/xOe1ECiWdKCzZlxgshcYVNkBHstARME=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
2 changes: 1 addition & 1 deletion kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ func (c *Client) versionForKey(apiKey, wantedMaxVersion int) int {
return 0
}

//topicConfig retrives the non-default config map for a topic
// topicConfig retrives the non-default config map for a topic
func (c *Client) topicConfig(topic string) (map[string]*string, error) {
conf := map[string]*string{}
request := &sarama.DescribeConfigsRequest{
Expand Down
37 changes: 33 additions & 4 deletions kafka/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/pem"
Expand All @@ -10,6 +11,7 @@ import (
"time"

"github.com/IBM/sarama"
"github.com/aws/aws-msk-iam-sasl-signer-go/signer"
"golang.org/x/net/proxy"
)

Expand All @@ -25,6 +27,16 @@ type Config struct {
SASLUsername string
SASLPassword string
SASLMechanism string
SASLAWSRegion string
}

type MSKAccessTokenProvider struct {
region string
}

func (m *MSKAccessTokenProvider) Token() (*sarama.AccessToken, error) {
token, _, err := signer.GenerateAuthToken(context.TODO(), m.region)
return &sarama.AccessToken{Token: token}, err
}

func (c *Config) newKafkaConfig() (*sarama.Config, error) {
Expand All @@ -46,14 +58,30 @@ func (c *Config) newKafkaConfig() (*sarama.Config, error) {
case "scram-sha256":
kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} }
kafkaConfig.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
case "aws-iam":
kafkaConfig.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeOAuth)
region := c.SASLAWSRegion
if region == "" {
region = os.Getenv("AWS_REGION")
}
if region == "" {
log.Fatalf("[ERROR] aws region must be configured or AWS_REGION environment variable must be set to use aws-iam sasl mechanism")
}
kafkaConfig.Net.SASL.TokenProvider = &MSKAccessTokenProvider{region: region}
case "plain":
default:
log.Fatalf("[ERROR] Invalid sasl mechanism \"%s\": can only be \"scram-sha256\", \"scram-sha512\" or \"plain\"", c.SASLMechanism)
log.Fatalf("[ERROR] Invalid sasl mechanism \"%s\": can only be \"scram-sha256\", \"scram-sha512\", \"aws-iam\" or \"plain\"", c.SASLMechanism)
}

kafkaConfig.Net.SASL.Enable = true
kafkaConfig.Net.SASL.Password = c.SASLPassword
kafkaConfig.Net.SASL.User = c.SASLUsername
kafkaConfig.Net.SASL.Handshake = true

if c.SASLUsername != "" {
kafkaConfig.Net.SASL.User = c.SASLUsername
}
if c.SASLPassword != "" {
kafkaConfig.Net.SASL.Password = c.SASLPassword
}
} else {
log.Printf("[WARN] SASL disabled username: '%s', password '%s'", c.SASLUsername, "****")
}
Expand All @@ -79,7 +107,7 @@ func (c *Config) newKafkaConfig() (*sarama.Config, error) {
}

func (c *Config) saslEnabled() bool {
return c.SASLUsername != "" || c.SASLPassword != ""
return c.SASLUsername != "" || c.SASLPassword != "" || c.SASLMechanism == "aws-iam"
}

func NewTLSConfig(clientCert, clientKey, caCert, clientKeyPassphrase string) (*tls.Config, error) {
Expand Down Expand Up @@ -181,6 +209,7 @@ func (config *Config) copyWithMaskedSensitiveValues() Config {
"*****",
config.TLSEnabled,
config.SkipTLSVerify,
config.SASLAWSRegion,
config.SASLUsername,
"*****",
config.SASLMechanism,
Expand Down
13 changes: 10 additions & 3 deletions kafka/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ func Provider() *schema.Provider {
DefaultFunc: schema.EnvDefaultFunc("KAFKA_CLIENT_KEY_PASSPHRASE", nil),
Description: "The passphrase for the private key that the certificate was issued for.",
},
"sasl_aws_region": &schema.Schema{
Type: schema.TypeString,
Optional: true,
DefaultFunc: schema.EnvDefaultFunc("KAFKA_SASL_IAM_AWS_REGION", nil),
Description: "AWS region where MSK is deployed.",
},
"sasl_username": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Expand All @@ -77,7 +83,7 @@ func Provider() *schema.Provider {
Type: schema.TypeString,
Optional: true,
DefaultFunc: schema.EnvDefaultFunc("KAFKA_SASL_MECHANISM", "plain"),
Description: "SASL mechanism, can be plain, scram-sha512, scram-sha256",
Description: "SASL mechanism, can be plain, scram-sha512, scram-sha256, aws-iam",
},
"skip_tls_verify": &schema.Schema{
Type: schema.TypeBool,
Expand Down Expand Up @@ -119,9 +125,9 @@ func providerConfigure(d *schema.ResourceData) (interface{}, error) {

saslMechanism := d.Get("sasl_mechanism").(string)
switch saslMechanism {
case "scram-sha512", "scram-sha256", "plain":
case "scram-sha512", "scram-sha256", "aws-iam", "plain":
default:
return nil, fmt.Errorf("[ERROR] Invalid sasl mechanism \"%s\": can only be \"scram-sha256\", \"scram-sha512\" or \"plain\"", saslMechanism)
return nil, fmt.Errorf("[ERROR] Invalid sasl mechanism \"%s\": can only be \"scram-sha256\", \"scram-sha512\", \"aws-iam\" or \"plain\"", saslMechanism)
}

config := &Config{
Expand All @@ -131,6 +137,7 @@ func providerConfigure(d *schema.ResourceData) (interface{}, error) {
ClientCertKey: d.Get("client_key").(string),
ClientCertKeyPassphrase: d.Get("client_key_passphrase").(string),
SkipTLSVerify: d.Get("skip_tls_verify").(bool),
SASLAWSRegion: d.Get("sasl_aws_region").(string),
SASLUsername: d.Get("sasl_username").(string),
SASLPassword: d.Get("sasl_password").(string),
SASLMechanism: saslMechanism,
Expand Down

0 comments on commit 08ddb3c

Please sign in to comment.