From 24b87d6957433cfcb5894df272a4b178f45489f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Fri, 5 Jul 2024 12:12:07 +0200 Subject: [PATCH 1/9] processors/isolate: Add NewLogProcessor --- processors/isolate/go.mod | 17 ++++++++++ processors/isolate/go.sum | 31 ++++++++++++++++++ processors/isolate/processor.go | 58 +++++++++++++++++++++++++++++++++ 3 files changed, 106 insertions(+) create mode 100644 processors/isolate/go.mod create mode 100644 processors/isolate/go.sum create mode 100644 processors/isolate/processor.go diff --git a/processors/isolate/go.mod b/processors/isolate/go.mod new file mode 100644 index 00000000000..ca90875cee3 --- /dev/null +++ b/processors/isolate/go.mod @@ -0,0 +1,17 @@ +module go.opentelemetry.io/contrib/processors/isolate + +go 1.21 + +require go.opentelemetry.io/otel/sdk/log v0.4.0 + +require ( + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect + go.opentelemetry.io/otel v1.28.0 // indirect + go.opentelemetry.io/otel/log v0.4.0 // indirect + go.opentelemetry.io/otel/metric v1.28.0 // indirect + go.opentelemetry.io/otel/sdk v1.28.0 // indirect + go.opentelemetry.io/otel/trace v1.28.0 // indirect + golang.org/x/sys v0.22.0 // indirect +) diff --git a/processors/isolate/go.sum b/processors/isolate/go.sum new file mode 100644 index 00000000000..3262fb855d0 --- /dev/null +++ b/processors/isolate/go.sum @@ -0,0 +1,31 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= +go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= +go.opentelemetry.io/otel/log v0.4.0 h1:/vZ+3Utqh18e8TPjuc3ecg284078KWrR8BRz+PQAj3o= +go.opentelemetry.io/otel/log v0.4.0/go.mod h1:DhGnQvky7pHy82MIRV43iXh3FlKN8UUKftn0KbLOq6I= +go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= +go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= +go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE= +go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg= +go.opentelemetry.io/otel/sdk/log v0.4.0 h1:1mMI22L82zLqf6KtkjrRy5BbagOTWdJsqMY/HSqILAA= +go.opentelemetry.io/otel/sdk/log v0.4.0/go.mod h1:AYJ9FVF0hNOgAVzUG/ybg/QttnXhUePWAupmCqtdESo= +go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= +go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/processors/isolate/processor.go b/processors/isolate/processor.go new file mode 100644 index 00000000000..4e6cae061a8 --- /dev/null +++ b/processors/isolate/processor.go @@ -0,0 +1,58 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package isolate // import "go.opentelemetry.io/contrib/processors/isolate" + +import ( + "context" + + "go.opentelemetry.io/otel/sdk/log" +) + +// NewLogProcessor returns a new [LogProcessor] that wraps the downstream +// [log.Processor]. +// +// If downstream is nil a default No-Op [log.Processor] is used. The returned +// processor will not be enabled for nor emit any records. +func NewLogProcessor(downstream log.Processor) *LogProcessor { + if downstream == nil { + downstream = defaultProcessor + } + return &LogProcessor{Processor: downstream} +} + +// LogProcessor is an [log.Processor] implementation clones the recieved log +// records in order to no share mutable data with subsequent registered processors. +// +// If the wrapped [log.Processor] is nil, calls to the LogProcessor methods +// will panic. +// +// Use [NewLogProcessor] to create a new LogProcessor that ensures +// no panics. +type LogProcessor struct { + log.Processor +} + +// Compile time assertion that LogProcessor implements log.Processor. +var _ log.Processor = (*LogProcessor)(nil) + +// OnEmit clones the record and calls the wrapped downstream processor. +func (p *LogProcessor) OnEmit(ctx context.Context, record log.Record) error { + record = record.Clone() + return p.Processor.OnEmit(ctx, record) +} + +// Enabled clones the record and calls the wrapped downstream processor. +func (p *LogProcessor) Enabled(ctx context.Context, record log.Record) bool { + record = record.Clone() + return p.Processor.Enabled(ctx, record) +} + +var defaultProcessor = noopProcessor{} + +type noopProcessor struct{} + +func (p noopProcessor) OnEmit(context.Context, log.Record) error { return nil } +func (p noopProcessor) Enabled(context.Context, log.Record) bool { return false } +func (p noopProcessor) Shutdown(context.Context) error { return nil } +func (p noopProcessor) ForceFlush(context.Context) error { return nil } From a119457dc274563c26ca10fa09bb334c23fce779 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Fri, 5 Jul 2024 12:13:15 +0200 Subject: [PATCH 2/9] Update versions.yaml --- versions.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/versions.yaml b/versions.yaml index 60a49cd2760..ed1c9d207fe 100644 --- a/versions.yaml +++ b/versions.yaml @@ -83,6 +83,7 @@ module-sets: modules: - go.opentelemetry.io/contrib/processors/baggage/baggagetrace - go.opentelemetry.io/contrib/processors/baggagecopy + - go.opentelemetry.io/contrib/processors/isolate - go.opentelemetry.io/contrib/processors/minsev experimental-detectors: version: v0.0.1 From f6500eea7fd74fb3de537845d4ace74c6dfed630 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Fri, 5 Jul 2024 12:16:07 +0200 Subject: [PATCH 3/9] Update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 24f112dd9b3..8ea5548a212 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add the new `go.opentelemetry.io/contrib/instrgen` package to provide auto-generated source code instrumentation. (#3068, #3108) - Support for stdoutlog exporter in `go.opentelemetry.io/contrib/config`. (#5850) +- Add `go.opentelemetry.io/contrib/processors/isolate` module. + This module provides an isolating log processor. (#5861) ## [1.28.0/0.53.0/0.22.0/0.8.0/0.3.0/0.1.0] - 2024-07-02 From 818659474c6284c2b03d8b1e90e4f36a44ca3890 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Fri, 5 Jul 2024 12:25:43 +0200 Subject: [PATCH 4/9] Fix typo --- processors/isolate/processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processors/isolate/processor.go b/processors/isolate/processor.go index 4e6cae061a8..e13f06a73bc 100644 --- a/processors/isolate/processor.go +++ b/processors/isolate/processor.go @@ -21,7 +21,7 @@ func NewLogProcessor(downstream log.Processor) *LogProcessor { return &LogProcessor{Processor: downstream} } -// LogProcessor is an [log.Processor] implementation clones the recieved log +// LogProcessor is an [log.Processor] implementation clones the received log // records in order to no share mutable data with subsequent registered processors. // // If the wrapped [log.Processor] is nil, calls to the LogProcessor methods From 6d3b7c020cb2aa64d762e69ccf8efcd7a0fa7db2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 8 Jul 2024 10:30:08 +0200 Subject: [PATCH 5/9] Add unit tests --- processors/isolate/go.mod | 10 ++- processors/isolate/go.sum | 2 + processors/isolate/processor_test.go | 99 ++++++++++++++++++++++++++++ 3 files changed, 109 insertions(+), 2 deletions(-) create mode 100644 processors/isolate/processor_test.go diff --git a/processors/isolate/go.mod b/processors/isolate/go.mod index ca90875cee3..33c12f6bafc 100644 --- a/processors/isolate/go.mod +++ b/processors/isolate/go.mod @@ -2,16 +2,22 @@ module go.opentelemetry.io/contrib/processors/isolate go 1.21 -require go.opentelemetry.io/otel/sdk/log v0.4.0 +require ( + github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/otel/log v0.4.0 + go.opentelemetry.io/otel/sdk/log v0.4.0 +) require ( + github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/otel v1.28.0 // indirect - go.opentelemetry.io/otel/log v0.4.0 // indirect go.opentelemetry.io/otel/metric v1.28.0 // indirect go.opentelemetry.io/otel/sdk v1.28.0 // indirect go.opentelemetry.io/otel/trace v1.28.0 // indirect golang.org/x/sys v0.22.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/processors/isolate/go.sum b/processors/isolate/go.sum index 3262fb855d0..20c9445e1d9 100644 --- a/processors/isolate/go.sum +++ b/processors/isolate/go.sum @@ -27,5 +27,7 @@ go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+ go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/processors/isolate/processor_test.go b/processors/isolate/processor_test.go new file mode 100644 index 00000000000..9539181dae1 --- /dev/null +++ b/processors/isolate/processor_test.go @@ -0,0 +1,99 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package isolate + +import ( + "context" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + + logapi "go.opentelemetry.io/otel/log" + "go.opentelemetry.io/otel/sdk/log" +) + +const testAttrCount = 10 + +var testCtx = context.WithValue(context.Background(), "k", "v") //nolint // Simplify for testing. + +func TestLogProcessorOnEmit(t *testing.T) { + wrapped := &processor{ReturnErr: assert.AnError} + + p := NewLogProcessor(wrapped) + + var r log.Record + for i := 0; i < testAttrCount; i++ { + r.AddAttributes(logapi.Int(strconv.Itoa(i), i)) + } + + assert.ErrorIs(t, p.OnEmit(testCtx, r), assert.AnError) + + // Assert passthrough of the arguments. + if assert.Len(t, wrapped.OnEmitCalls, 1) { + assert.Equal(t, testCtx, wrapped.OnEmitCalls[0].Ctx) + assert.Equal(t, r, wrapped.OnEmitCalls[0].Record) + } + + // Assert that the record is not being affected by subsequent modifications. + r.AddAttributes(logapi.String("foo", "bar")) + assert.Equal(t, testAttrCount, wrapped.OnEmitCalls[0].Record.AttributesLen(), "should be isolated from subsequent modifications") +} + +func TestLogProcessorEnabled(t *testing.T) { + wrapped := &processor{} + + p := NewLogProcessor(wrapped) + + var r log.Record + for i := 0; i < testAttrCount; i++ { + r.AddAttributes(logapi.Int(strconv.Itoa(i), i)) + } + + assert.True(t, p.Enabled(testCtx, r)) + + // Assert passthrough of the arguments. + if assert.Len(t, wrapped.EnabledCalls, 1) { + assert.Equal(t, testCtx, wrapped.EnabledCalls[0].Ctx) + assert.Equal(t, r, wrapped.EnabledCalls[0].Record) + } + + // Assert that the record is not being affected by subsequent modifications. + r.AddAttributes(logapi.String("foo", "bar")) + assert.Equal(t, testAttrCount, wrapped.EnabledCalls[0].Record.AttributesLen(), "should be isolated from subsequent modifications") +} + +type args struct { + Ctx context.Context + Record log.Record +} + +type processor struct { + ReturnErr error + + OnEmitCalls []args + EnabledCalls []args + ForceFlushCalls []context.Context + ShutdownCalls []context.Context +} + +func (p *processor) OnEmit(ctx context.Context, r log.Record) error { + p.OnEmitCalls = append(p.OnEmitCalls, args{ctx, r}) + return p.ReturnErr +} + +func (p *processor) Enabled(ctx context.Context, r log.Record) bool { + p.EnabledCalls = append(p.EnabledCalls, args{ctx, r}) + return true +} + +func (p *processor) Shutdown(ctx context.Context) error { + p.ShutdownCalls = append(p.ShutdownCalls, ctx) + return p.ReturnErr +} + +func (p *processor) ForceFlush(ctx context.Context) error { + p.ForceFlushCalls = append(p.ForceFlushCalls, ctx) + return p.ReturnErr +} From 73d60b694c09f19170b2ba4dd92421d27ca8a09f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 8 Jul 2024 10:35:08 +0200 Subject: [PATCH 6/9] Add package comment --- processors/isolate/processor.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/processors/isolate/processor.go b/processors/isolate/processor.go index e13f06a73bc..aa68e24c8df 100644 --- a/processors/isolate/processor.go +++ b/processors/isolate/processor.go @@ -1,6 +1,8 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +// Package isolate provides an isolating processor that can be used to +// configure independent processing pipelines. package isolate // import "go.opentelemetry.io/contrib/processors/isolate" import ( From 083328d0ea44d76fd5be2a658221656913b1d567 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 8 Jul 2024 10:48:03 +0200 Subject: [PATCH 7/9] Add benchmark --- processors/isolate/processor_test.go | 41 ++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/processors/isolate/processor_test.go b/processors/isolate/processor_test.go index 9539181dae1..34b6ae81a62 100644 --- a/processors/isolate/processor_test.go +++ b/processors/isolate/processor_test.go @@ -97,3 +97,44 @@ func (p *processor) ForceFlush(ctx context.Context) error { p.ForceFlushCalls = append(p.ForceFlushCalls, ctx) return p.ReturnErr } + +func BenchmarkLogProcessor(b *testing.B) { + var ok bool + var err error + + var r log.Record + r.SetBody(logapi.StringValue("message")) + + var rWithShared log.Record + for i := 0; i < testAttrCount; i++ { + rWithShared.AddAttributes(logapi.Int(strconv.Itoa(i), i)) + } + + testCases := []struct { + desc string + r log.Record + }{ + { + desc: "Record without shared data", + r: r, + }, + { + desc: "Record with shared data", + r: rWithShared, + }, + } + + p := NewLogProcessor(noopProcessor{}) + + for _, tc := range testCases { + b.Run(tc.desc, func(b *testing.B) { + b.ReportAllocs() + for n := 0; n < b.N; n++ { + ok = p.Enabled(testCtx, tc.r) + err = p.OnEmit(testCtx, tc.r) + } + }) + } + + _, _ = ok, err +} From 9b4e4f7d4af279341f399efdb6592ab954885ed3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 8 Jul 2024 10:57:10 +0200 Subject: [PATCH 8/9] Add example --- processors/isolate/example_test.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 processors/isolate/example_test.go diff --git a/processors/isolate/example_test.go b/processors/isolate/example_test.go new file mode 100644 index 00000000000..4d8347485ce --- /dev/null +++ b/processors/isolate/example_test.go @@ -0,0 +1,25 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package isolate_test + +import ( + "go.opentelemetry.io/contrib/processors/isolate" + "go.opentelemetry.io/otel/sdk/log" +) + +func Example() { + // Log processing pipelines that process and emit telemetry. + var p1 log.Processor + var p2 log.Processor + var p3 log.Processor + + // Register the processors using + // isolate.NewLogProcessor and the log.WithProcessor option + // so that the log records are not shared between pipelines. + _ = log.NewLoggerProvider( + log.WithProcessor(isolate.NewLogProcessor(p1)), + log.WithProcessor(isolate.NewLogProcessor(p2)), + log.WithProcessor(isolate.NewLogProcessor(p3)), + ) +} From a1f38621fb4bc9dfb875f5cac2700cb52a6f91a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 8 Jul 2024 16:03:17 +0200 Subject: [PATCH 9/9] Update CODEOWNERS --- CODEOWNERS | 1 + 1 file changed, 1 insertion(+) diff --git a/CODEOWNERS b/CODEOWNERS index 077f34e7d6c..f9f5c7080f6 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -57,6 +57,7 @@ instrumentation/net/http/otelhttp/ @open-te instrumentation/runtime/ @open-telemetry/go-approvers @MadVikingGod processors/baggagecopy @open-telemetry/go-approvers @codeboten @MikeGoldsmith +processors/isolate @open-telemetry/go-approvers @pellared processors/minsev @open-telemetry/go-approvers @MrAlias propagators/autoprop/ @open-telemetry/go-approvers @MrAlias