Skip to content

Commit

Permalink
config: added more grpc fields
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsains committed Nov 27, 2024
1 parent 9da47f9 commit 957dfbc
Show file tree
Hide file tree
Showing 8 changed files with 380 additions and 32 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Added support for providing `endpoint`, `pollingIntervalMs` and `initialSamplingRate` using environment variable `OTEL_TRACES_SAMPLER_ARG` in `go.opentelemetry.io/contrib/samples/jaegerremote`. (#6310)
- Added support exporting logs via OTLP over gRPC in `go.opentelemetry.io/contrib/config`. (#6340)
- Added support for configuring `Certificate` field when exporting OTLP over gRPC in `go.opentelemetry.io/contrib/config`. (#6376)
- Added support for configuring `Certificate`, `ClientCertificate`, `ClientKey`, and `HeadersList` field when exporting OTLP over gRPC in `go.opentelemetry.io/contrib/config`. (#6376) (TBD)

### Changed

Expand All @@ -39,7 +39,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Use `baggagecopy.NewLogProcessor` when configuring a Log Provider.
- `NewLogProcessor` accepts a `Filter` function type that selects which baggage members are added to the log record.

### Changed
### Changed

- Transform raw (`slog.KindAny`) attribute values to matching `log.Value` types.
For example, `[]string{"foo", "bar"}` attribute value is now transformed to `log.SliceValue(log.StringValue("foo"), log.StringValue("bar"))` instead of `log.String("[foo bar"])`. (#6254)
Expand Down
54 changes: 45 additions & 9 deletions config/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@ package config // import "go.opentelemetry.io/contrib/config"

import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"net/url"
"os"
"time"

"google.golang.org/grpc/credentials"

"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
"go.opentelemetry.io/otel/exporters/stdout/stdoutlog"
Expand Down Expand Up @@ -179,13 +183,30 @@ func otlpGRPCLogExporter(ctx context.Context, otlpConfig *OTLP) (sdklog.Exporter
}
if u.Scheme == "http" {
opts = append(opts, otlploggrpc.WithInsecure())
}
if otlpConfig.Certificate != nil {
creds, err := credentials.NewClientTLSFromFile(*otlpConfig.Certificate, "")
if err != nil {
return nil, fmt.Errorf("could not create client tls credentials: %w", err)
} else {
tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12}
if otlpConfig.Certificate != nil {
caText, err := os.ReadFile(*otlpConfig.Certificate)
if err != nil {
return nil, err
}
certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caText) {
return nil, errors.New("could not create certificate authority chain from certificate")
}
tlsConfig.RootCAs = certPool
}
opts = append(opts, otlploggrpc.WithTLSCredentials(creds))
if otlpConfig.ClientCertificate != nil {
if otlpConfig.ClientKey == nil {
return nil, errors.New("client certificate was provided but no client key was provided")
}
clientCert, err := tls.LoadX509KeyPair(*otlpConfig.ClientCertificate, *otlpConfig.ClientKey)
if err != nil {
return nil, fmt.Errorf("could not use client certificate: %w", err)
}
tlsConfig.Certificates = []tls.Certificate{clientCert}
}
opts = append(opts, otlploggrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)))
}
}
if otlpConfig.Compression != nil {
Expand All @@ -201,9 +222,24 @@ func otlpGRPCLogExporter(ctx context.Context, otlpConfig *OTLP) (sdklog.Exporter
if otlpConfig.Timeout != nil && *otlpConfig.Timeout > 0 {
opts = append(opts, otlploggrpc.WithTimeout(time.Millisecond*time.Duration(*otlpConfig.Timeout)))
}
if len(otlpConfig.Headers) > 0 {
opts = append(opts, otlploggrpc.WithHeaders(toStringMap(otlpConfig.Headers)))
}
if otlpConfig.HeadersList != nil || len(otlpConfig.Headers) > 0 {
headers := make(map[string]string, len(otlpConfig.Headers))

// Headers has priority over HeaderList, so we parse HeadersList first, and then upsert all items in Headers
if otlpConfig.HeadersList != nil {
bag, err := baggage.Parse(*otlpConfig.HeadersList)
if err != nil {
return nil, fmt.Errorf("failed to parse headers list: %w", err)
}
members := bag.Members()
for _, kv := range members {
headers[kv.Key()] = kv.Value()
}
}
for k, v := range toStringMap(otlpConfig.Headers) {
headers[k] = v
}
opts = append(opts, otlploggrpc.WithHeaders(headers))
}
return otlploggrpc.New(ctx, opts...)
}
86 changes: 85 additions & 1 deletion config/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,91 @@ func TestLogProcessor(t *testing.T) {
},
},
},
wantErr: fmt.Errorf("could not create client tls credentials: %w", errors.New("credentials: failed to append certificates")),
wantErr: fmt.Errorf("could not create certificate authority chain from certificate"),
},
{
name: "batch/otlp-grpc-bad-client-certificate",
processor: LogRecordProcessor{
Batch: &BatchLogRecordProcessor{
Exporter: LogRecordExporter{
OTLP: &OTLP{
Protocol: ptr("grpc"),
Endpoint: ptr("localhost:4317"),
Compression: ptr("gzip"),
Timeout: ptr(1000),
ClientCertificate: ptr(filepath.Join("testdata", "bad_cert.crt")),
ClientKey: ptr(filepath.Join("testdata", "bad_cert.crt")),
},
},
},
},
wantErr: fmt.Errorf("could not use client certificate: %w", errors.New("tls: failed to find any PEM data in certificate input")),
},
{
name: "batch/otlp-grpc-exporter-headerslist",
processor: LogRecordProcessor{
Batch: &BatchLogRecordProcessor{
MaxExportBatchSize: ptr(0),
ExportTimeout: ptr(0),
MaxQueueSize: ptr(0),
ScheduleDelay: ptr(0),
Exporter: LogRecordExporter{
OTLP: &OTLP{
Protocol: ptr("grpc"),
Endpoint: ptr("localhost:4317"),
Compression: ptr("gzip"),
Timeout: ptr(1000),
HeadersList: ptr("test=test1"),
},
},
},
},
wantProcessor: sdklog.NewBatchProcessor(otlpGRPCExporter),
},
{
name: "batch/otlp-grpc-exporter-bad-headerslist",
processor: LogRecordProcessor{
Batch: &BatchLogRecordProcessor{
MaxExportBatchSize: ptr(0),
ExportTimeout: ptr(0),
MaxQueueSize: ptr(0),
ScheduleDelay: ptr(0),
Exporter: LogRecordExporter{
OTLP: &OTLP{
Protocol: ptr("grpc"),
Endpoint: ptr("localhost:4317"),
Compression: ptr("gzip"),
Timeout: ptr(1000),
HeadersList: ptr("=bad"),
},
},
},
},
wantErr: fmt.Errorf("failed to parse headers list: %w", fmt.Errorf("%w: \"\"", errors.New("invalid key"))),
},
{
name: "batch/otlp-grpc-exporter-both-headers-headerslist",
processor: LogRecordProcessor{
Batch: &BatchLogRecordProcessor{
MaxExportBatchSize: ptr(0),
ExportTimeout: ptr(0),
MaxQueueSize: ptr(0),
ScheduleDelay: ptr(0),
Exporter: LogRecordExporter{
OTLP: &OTLP{
Protocol: ptr("grpc"),
Endpoint: ptr("localhost:4317"),
Compression: ptr("gzip"),
Timeout: ptr(1000),
Headers: []NameStringValuePair{
{Name: "test", Value: ptr("test1")},
},
HeadersList: ptr("test=test1"),
},
},
},
},
wantProcessor: sdklog.NewBatchProcessor(otlpGRPCExporter),
},
{
name: "batch/otlp-grpc-exporter-no-scheme",
Expand Down
52 changes: 44 additions & 8 deletions config/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package config // import "go.opentelemetry.io/contrib/config"

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
Expand All @@ -21,6 +23,7 @@ import (

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
Expand Down Expand Up @@ -205,13 +208,30 @@ func otlpGRPCMetricExporter(ctx context.Context, otlpConfig *OTLPMetric) (sdkmet
}
if u.Scheme == "http" {
opts = append(opts, otlpmetricgrpc.WithInsecure())
}
if otlpConfig.Certificate != nil {
creds, err := credentials.NewClientTLSFromFile(*otlpConfig.Certificate, "")
if err != nil {
return nil, fmt.Errorf("could not create client tls credentials: %w", err)
} else {
tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12}
if otlpConfig.Certificate != nil {
caText, err := os.ReadFile(*otlpConfig.Certificate)
if err != nil {
return nil, err
}
certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caText) {
return nil, errors.New("could not create certificate authority chain from certificate")
}
tlsConfig.RootCAs = certPool
}
opts = append(opts, otlpmetricgrpc.WithTLSCredentials(creds))
if otlpConfig.ClientCertificate != nil {
if otlpConfig.ClientKey == nil {
return nil, errors.New("client certificate was provided but no client key was provided")
}
clientCert, err := tls.LoadX509KeyPair(*otlpConfig.ClientCertificate, *otlpConfig.ClientKey)
if err != nil {
return nil, fmt.Errorf("could not use client certificate: %w", err)
}
tlsConfig.Certificates = []tls.Certificate{clientCert}
}
opts = append(opts, otlpmetricgrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)))
}
}

Expand All @@ -228,8 +248,24 @@ func otlpGRPCMetricExporter(ctx context.Context, otlpConfig *OTLPMetric) (sdkmet
if otlpConfig.Timeout != nil && *otlpConfig.Timeout > 0 {
opts = append(opts, otlpmetricgrpc.WithTimeout(time.Millisecond*time.Duration(*otlpConfig.Timeout)))
}
if len(otlpConfig.Headers) > 0 {
opts = append(opts, otlpmetricgrpc.WithHeaders(toStringMap(otlpConfig.Headers)))
if otlpConfig.HeadersList != nil || len(otlpConfig.Headers) > 0 {
headers := make(map[string]string, len(otlpConfig.Headers))

// Headers has priority over HeaderList, so we parse HeadersList first, and then upsert all items in Headers
if otlpConfig.HeadersList != nil {
bag, err := baggage.Parse(*otlpConfig.HeadersList)
if err != nil {
return nil, fmt.Errorf("failed to parse headers list: %w", err)
}
members := bag.Members()
for _, kv := range members {
headers[kv.Key()] = kv.Value()
}
}
for k, v := range toStringMap(otlpConfig.Headers) {
headers[k] = v
}
opts = append(opts, otlpmetricgrpc.WithHeaders(headers))
}
if otlpConfig.TemporalityPreference != nil {
switch *otlpConfig.TemporalityPreference {
Expand Down
74 changes: 73 additions & 1 deletion config/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,79 @@ func TestReader(t *testing.T) {
},
},
},
wantErr: fmt.Errorf("could not create client tls credentials: %w", errors.New("credentials: failed to append certificates")),
wantErr: errors.New("could not create certificate authority chain from certificate"),
},
{
name: "periodic/otlp-grpc-bad-client-certificate",
reader: MetricReader{
Periodic: &PeriodicMetricReader{
Exporter: PushMetricExporter{
OTLP: &OTLPMetric{
Protocol: ptr("grpc"),
Endpoint: ptr("localhost:4317"),
Compression: ptr("gzip"),
Timeout: ptr(1000),
ClientCertificate: ptr(filepath.Join("testdata", "bad_cert.crt")),
ClientKey: ptr(filepath.Join("testdata", "bad_cert.crt")),
},
},
},
},
wantErr: fmt.Errorf("could not use client certificate: %w", errors.New("tls: failed to find any PEM data in certificate input")),
},
{
name: "periodic/otlp-grpc-exporter-headerslist",
reader: MetricReader{
Periodic: &PeriodicMetricReader{
Exporter: PushMetricExporter{
OTLP: &OTLPMetric{
Protocol: ptr("grpc"),
Endpoint: ptr("localhost:4317"),
Compression: ptr("gzip"),
Timeout: ptr(1000),
HeadersList: ptr("test=test1"),
},
},
},
},
wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter),
},
{
name: "periodic/otlp-grpc-exporter-bad-headerslist",
reader: MetricReader{
Periodic: &PeriodicMetricReader{
Exporter: PushMetricExporter{
OTLP: &OTLPMetric{
Protocol: ptr("grpc"),
Endpoint: ptr("localhost:4317"),
Compression: ptr("gzip"),
Timeout: ptr(1000),
HeadersList: ptr("=bad"),
},
},
},
},
wantErr: fmt.Errorf("failed to parse headers list: %w", fmt.Errorf("%w: \"\"", errors.New("invalid key"))),
},
{
name: "periodic/otlp-grpc-exporter-both-headers-headerslist",
reader: MetricReader{
Periodic: &PeriodicMetricReader{
Exporter: PushMetricExporter{
OTLP: &OTLPMetric{
Protocol: ptr("grpc"),
Endpoint: ptr("localhost:4317"),
Compression: ptr("gzip"),
Timeout: ptr(1000),
Headers: []NameStringValuePair{
{Name: "test", Value: ptr("test1")},
},
HeadersList: ptr("test=test1"),
},
},
},
},
wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter),
},
{
name: "periodic/otlp-grpc-exporter-no-endpoint",
Expand Down
Loading

0 comments on commit 957dfbc

Please sign in to comment.