diff --git a/CHANGELOG.md b/CHANGELOG.md index f3221b510eb..654e7c555e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added - Added support for providing `endpoint`, `pollingIntervalMs` and `initialSamplingRate` using environment variable `OTEL_TRACES_SAMPLER_ARG` in `go.opentelemetry.io/contrib/samples/jaegerremote`. (#6310) +- Added support exporting logs via OTLP over gRPC in `go.opentelemetry.io/contrib/config`. (#6340) ### Fixed diff --git a/config/go.mod b/config/go.mod index 91d6fa8cf58..ff7e5f46432 100644 --- a/config/go.mod +++ b/config/go.mod @@ -6,6 +6,7 @@ require ( github.com/prometheus/client_golang v1.20.5 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/otel v1.32.0 + go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.8.0 go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.8.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.32.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.32.0 diff --git a/config/go.sum b/config/go.sum index db5ff800267..b4d84b91392 100644 --- a/config/go.sum +++ b/config/go.sum @@ -45,6 +45,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.8.0 h1:WzNab7hOOLzdDF/EoWCt4glhrbMPVMOO5JYTmpz36Ls= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.8.0/go.mod h1:hKvJwTzJdp90Vh7p6q/9PAOd55dI6WA6sWj62a/JvSs= go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.8.0 h1:S+LdBGiQXtJdowoJoQPEtI52syEP/JYBUpjO49EQhV8= go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.8.0/go.mod h1:5KXybFvPGds3QinJWQT7pmXf+TN5YIa7CNYObWRkj50= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.32.0 h1:j7ZSD+5yn+lo3sGV69nW04rRR0jhYnBwjuX3r0HvnK0= diff --git a/config/log.go b/config/log.go index 63fc17a6b4c..81a769238cd 100644 --- a/config/log.go +++ b/config/log.go @@ -10,6 +10,7 @@ import ( "net/url" "time" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp" "go.opentelemetry.io/otel/exporters/stdout/stdoutlog" "go.opentelemetry.io/otel/log" @@ -79,6 +80,8 @@ func logExporter(ctx context.Context, exporter LogRecordExporter) (sdklog.Export switch *exporter.OTLP.Protocol { case protocolProtobufHTTP: return otlpHTTPLogExporter(ctx, exporter.OTLP) + case protocolProtobufGRPC: + return otlpGRPCLogExporter(ctx, exporter.OTLP) default: return nil, fmt.Errorf("unsupported protocol %q", *exporter.OTLP.Protocol) } @@ -153,3 +156,45 @@ func otlpHTTPLogExporter(ctx context.Context, otlpConfig *OTLP) (sdklog.Exporter return otlploghttp.New(ctx, opts...) } + +func otlpGRPCLogExporter(ctx context.Context, otlpConfig *OTLP) (sdklog.Exporter, error) { + var opts []otlploggrpc.Option + + if otlpConfig.Endpoint != nil { + u, err := url.ParseRequestURI(*otlpConfig.Endpoint) + if err != nil { + return nil, err + } + // ParseRequestURI leaves the Host field empty when no + // scheme is specified (i.e. localhost:4317). This check is + // here to support the case where a user may not specify a + // scheme. The code does its best effort here by using + // otlpConfig.Endpoint as-is in that case + if u.Host != "" { + opts = append(opts, otlploggrpc.WithEndpoint(u.Host)) + } else { + opts = append(opts, otlploggrpc.WithEndpoint(*otlpConfig.Endpoint)) + } + if u.Scheme == "http" { + opts = append(opts, otlploggrpc.WithInsecure()) + } + } + if otlpConfig.Compression != nil { + switch *otlpConfig.Compression { + case compressionGzip: + opts = append(opts, otlploggrpc.WithCompressor(*otlpConfig.Compression)) + case compressionNone: + // none requires no options + default: + return nil, fmt.Errorf("unsupported compression %q", *otlpConfig.Compression) + } + } + if otlpConfig.Timeout != nil && *otlpConfig.Timeout > 0 { + opts = append(opts, otlploggrpc.WithTimeout(time.Millisecond*time.Duration(*otlpConfig.Timeout))) + } + if len(otlpConfig.Headers) > 0 { + opts = append(opts, otlploggrpc.WithHeaders(toStringMap(otlpConfig.Headers))) + } + + return otlploggrpc.New(ctx, opts...) +} diff --git a/config/log_test.go b/config/log_test.go index 1fa978f4f16..3e1efe11b3a 100644 --- a/config/log_test.go +++ b/config/log_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp" "go.opentelemetry.io/otel/exporters/stdout/stdoutlog" "go.opentelemetry.io/otel/log" @@ -64,6 +65,9 @@ func TestLogProcessor(t *testing.T) { otlpHTTPExporter, err := otlploghttp.New(ctx) require.NoError(t, err) + otlpGRPCExporter, err := otlploggrpc.New(ctx) + require.NoError(t, err) + consoleExporter, err := stdoutlog.New( stdoutlog.WithPrettyPrint(), ) @@ -172,6 +176,120 @@ func TestLogProcessor(t *testing.T) { }, wantProcessor: sdklog.NewBatchProcessor(consoleExporter), }, + { + name: "batch/otlp-grpc-exporter-no-endpoint", + processor: LogRecordProcessor{ + Batch: &BatchLogRecordProcessor{ + MaxExportBatchSize: ptr(0), + ExportTimeout: ptr(0), + MaxQueueSize: ptr(0), + ScheduleDelay: ptr(0), + Exporter: LogRecordExporter{ + OTLP: &OTLP{ + Protocol: ptr("grpc"), + Compression: ptr("gzip"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + }, + }, + wantProcessor: sdklog.NewBatchProcessor(otlpGRPCExporter), + }, + { + name: "batch/otlp-grpc-exporter", + processor: LogRecordProcessor{ + Batch: &BatchLogRecordProcessor{ + MaxExportBatchSize: ptr(0), + ExportTimeout: ptr(0), + MaxQueueSize: ptr(0), + ScheduleDelay: ptr(0), + Exporter: LogRecordExporter{ + OTLP: &OTLP{ + Protocol: ptr("grpc"), + Endpoint: ptr("http://localhost:4317"), + Compression: ptr("gzip"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + }, + }, + wantProcessor: sdklog.NewBatchProcessor(otlpGRPCExporter), + }, + { + name: "batch/otlp-grpc-exporter-no-scheme", + processor: LogRecordProcessor{ + Batch: &BatchLogRecordProcessor{ + MaxExportBatchSize: ptr(0), + ExportTimeout: ptr(0), + MaxQueueSize: ptr(0), + ScheduleDelay: ptr(0), + Exporter: LogRecordExporter{ + OTLP: &OTLP{ + Protocol: ptr("grpc"), + Endpoint: ptr("localhost:4317"), + Compression: ptr("gzip"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + }, + }, + wantProcessor: sdklog.NewBatchProcessor(otlpGRPCExporter), + }, + { + name: "batch/otlp-grpc-invalid-endpoint", + processor: LogRecordProcessor{ + Batch: &BatchLogRecordProcessor{ + MaxExportBatchSize: ptr(0), + ExportTimeout: ptr(0), + MaxQueueSize: ptr(0), + ScheduleDelay: ptr(0), + Exporter: LogRecordExporter{ + OTLP: &OTLP{ + Protocol: ptr("grpc"), + Endpoint: ptr(" "), + Compression: ptr("gzip"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + }, + }, + wantErr: &url.Error{Op: "parse", URL: " ", Err: errors.New("invalid URI for request")}, + }, + { + name: "batch/otlp-grpc-invalid-compression", + processor: LogRecordProcessor{ + Batch: &BatchLogRecordProcessor{ + MaxExportBatchSize: ptr(0), + ExportTimeout: ptr(0), + MaxQueueSize: ptr(0), + ScheduleDelay: ptr(0), + Exporter: LogRecordExporter{ + OTLP: &OTLP{ + Protocol: ptr("grpc"), + Endpoint: ptr("localhost:4317"), + Compression: ptr("invalid"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + }, + }, + wantErr: errors.New("unsupported compression \"invalid\""), + }, { name: "batch/otlp-http-exporter", processor: LogRecordProcessor{