diff --git a/CHANGELOG.md b/CHANGELOG.md index 65dcca7a2a6..93de64a993b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add the `WithSource` option to the `go.opentelemetry.io/contrib/bridges/otelslog` log bridge to set the `code.*` attributes in the log record that includes the source location where the record was emitted. (#6253) - Add `ContextWithStartTime` and `StartTimeFromContext` to `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp`, which allows setting the start time using go context. (#6137) - Set the `code.*` attributes in `go.opentelemetry.io/contrib/bridges/otelzap` if the `zap.Logger` was created with the `AddCaller` or `AddStacktrace` option. (#6268) +- Add new `WithMetricsAttributesFn` option to allow setting dynamic, per-request metric attributes in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#6281) ### Changed diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/config.go b/instrumentation/google.golang.org/grpc/otelgrpc/config.go index 9e87fb4bb19..621c4341a43 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/config.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/config.go @@ -4,6 +4,8 @@ package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" import ( + "context" + "google.golang.org/grpc/stats" "go.opentelemetry.io/otel" @@ -29,6 +31,10 @@ const ( // Deprecated: Use stats handlers instead. type InterceptorFilter func(*InterceptorInfo) bool +// MetricAttributesFn is a optional function which will be called per request returning a slice +// of attribute.KeyValue to be appended to metric attributes recorded by the stats.Handler. +type MetricAttributesFn func(ctx context.Context, payload any) []attribute.KeyValue + // Filter is a predicate used to determine whether a given request in // should be instrumented by the attached RPC tag info. // A Filter must return true if the request should be instrumented. @@ -36,14 +42,15 @@ type Filter func(*stats.RPCTagInfo) bool // config is a group of options for this instrumentation. type config struct { - Filter Filter - InterceptorFilter InterceptorFilter - Propagators propagation.TextMapPropagator - TracerProvider trace.TracerProvider - MeterProvider metric.MeterProvider - SpanStartOptions []trace.SpanStartOption - SpanAttributes []attribute.KeyValue - MetricAttributes []attribute.KeyValue + Filter Filter + InterceptorFilter InterceptorFilter + Propagators propagation.TextMapPropagator + TracerProvider trace.TracerProvider + MeterProvider metric.MeterProvider + SpanStartOptions []trace.SpanStartOption + SpanAttributes []attribute.KeyValue + MetricAttributes []attribute.KeyValue + MetricAttributesFn MetricAttributesFn ReceivedEvent bool SentEvent bool @@ -303,3 +310,18 @@ func (o metricAttributesOption) apply(c *config) { func WithMetricAttributes(a ...attribute.KeyValue) Option { return metricAttributesOption{a: a} } + +type metricAttributesFnOption struct{ f MetricAttributesFn } + +func (o metricAttributesFnOption) apply(c *config) { + if o.f != nil { + c.MetricAttributesFn = o.f + } +} + +// WithMetricAttributesFn returns an Option to set the MetricAttributesFn function which +// will be called on each gRPC request to append metric attributes to recorded instruments +// by the stats.Handler. +func WithMetricAttributesFn(f MetricAttributesFn) Option { + return metricAttributesFnOption{f: f} +} diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/internal/test/test_utils.go b/instrumentation/google.golang.org/grpc/otelgrpc/internal/test/test_utils.go index 717db324b72..72c63f1b8c9 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/internal/test/test_utils.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/internal/test/test_utils.go @@ -36,11 +36,10 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/grpclog" + testpb "google.golang.org/grpc/interop/grpc_testing" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" - - testpb "google.golang.org/grpc/interop/grpc_testing" ) var ( diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go index c01cb897cd3..d996912fe8d 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go @@ -152,6 +152,16 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool case *stats.InPayload: if gctx != nil { messageId = atomic.AddInt64(&gctx.inMessages, 1) + + // If a MetricAttributesFn is defined call this function and update the gRPCContext with the metric attributes + // returned by ths function. + if f := c.MetricAttributesFn; f != nil { + attrs := f(ctx, rs.Payload) + + gctx.metricAttrs = append(gctx.metricAttrs, attrs...) + metricAttrs = append(metricAttrs, attrs...) + } + c.rpcInBytes.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...))) } diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go index 884c60c1c21..9c4cfd3dfc0 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go @@ -19,6 +19,9 @@ import ( testpb "google.golang.org/grpc/interop/grpc_testing" "google.golang.org/grpc/status" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/filters" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal/test" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric" @@ -27,15 +30,12 @@ import ( "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" semconv "go.opentelemetry.io/otel/semconv/v1.17.0" - - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/filters" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal/test" ) var ( - testSpanAttr = attribute.String("test_span", "OK") - testMetricAttr = attribute.String("test_metric", "OK") + testSpanAttr = attribute.String("test_span", "OK") + testMetricAttr = attribute.String("test_metric", "OK") + customTestMetricAttr = attribute.String("custom_test_metric", "OK") ) func TestStatsHandler(t *testing.T) { @@ -79,7 +79,10 @@ func TestStatsHandler(t *testing.T) { otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents), otelgrpc.WithFilter(filters.ServiceName(tt.filterSvcName)), otelgrpc.WithSpanAttributes(testSpanAttr), - otelgrpc.WithMetricAttributes(testMetricAttr)), + otelgrpc.WithMetricAttributes(testMetricAttr), + otelgrpc.WithMetricAttributesFn(func(context.Context, any) []attribute.KeyValue { + return []attribute.KeyValue{customTestMetricAttr} + })), ), }, []grpc.ServerOption{ @@ -89,7 +92,10 @@ func TestStatsHandler(t *testing.T) { otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents), otelgrpc.WithFilter(filters.ServiceName(tt.filterSvcName)), otelgrpc.WithSpanAttributes(testSpanAttr), - otelgrpc.WithMetricAttributes(testMetricAttr)), + otelgrpc.WithMetricAttributes(testMetricAttr), + otelgrpc.WithMetricAttributesFn(func(context.Context, any) []attribute.KeyValue { + return []attribute.KeyValue{customTestMetricAttr} + })), ), }, ) @@ -737,7 +743,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("EmptyCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), }, { Attributes: attribute.NewSet( @@ -745,7 +753,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("UnaryCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), }, { Attributes: attribute.NewSet( @@ -753,7 +763,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("StreamingInputCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), }, { Attributes: attribute.NewSet( @@ -761,7 +773,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("StreamingOutputCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), }, { Attributes: attribute.NewSet( @@ -769,7 +783,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("FullDuplexCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), }, }, }, @@ -786,7 +802,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("EmptyCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(0)), @@ -799,7 +817,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("UnaryCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, Max: metricdata.NewExtrema(int64(271840)), @@ -812,7 +832,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("StreamingInputCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2}, Max: metricdata.NewExtrema(int64(45912)), @@ -825,7 +847,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("StreamingOutputCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(21)), @@ -838,7 +862,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("FullDuplexCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2}, Max: metricdata.NewExtrema(int64(45918)), @@ -861,7 +887,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("EmptyCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(0)), @@ -874,7 +902,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("UnaryCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, Max: metricdata.NewExtrema(int64(314167)), @@ -887,7 +917,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("StreamingInputCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(4)), @@ -900,7 +932,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("StreamingOutputCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2}, Max: metricdata.NewExtrema(int64(58987)), @@ -913,7 +947,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("FullDuplexCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2}, Max: metricdata.NewExtrema(int64(58987)), @@ -937,7 +973,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("EmptyCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(1)), @@ -951,7 +989,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("UnaryCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(1)), @@ -965,7 +1005,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("StreamingInputCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(4)), @@ -979,7 +1021,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("StreamingOutputCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(1)), @@ -993,7 +1037,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("FullDuplexCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(4)), @@ -1017,7 +1063,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("EmptyCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(1)), @@ -1031,7 +1079,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("UnaryCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(1)), @@ -1045,7 +1095,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("StreamingInputCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(1)), @@ -1059,7 +1111,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("StreamingOutputCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(4)), @@ -1073,7 +1127,9 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("FullDuplexCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(4)), @@ -1115,7 +1171,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("EmptyCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), }, { Attributes: attribute.NewSet( @@ -1123,7 +1181,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("UnaryCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), }, { Attributes: attribute.NewSet( @@ -1131,7 +1191,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("StreamingInputCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), }, { Attributes: attribute.NewSet( @@ -1139,7 +1201,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("StreamingOutputCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), }, { Attributes: attribute.NewSet( @@ -1147,7 +1211,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("FullDuplexCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), }, }, }, @@ -1164,7 +1230,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("EmptyCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(0)), @@ -1177,7 +1245,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("UnaryCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, Max: metricdata.NewExtrema(int64(271840)), @@ -1190,7 +1260,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("StreamingInputCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2}, Max: metricdata.NewExtrema(int64(45912)), @@ -1203,7 +1275,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("StreamingOutputCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(21)), @@ -1216,7 +1290,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("FullDuplexCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2}, Max: metricdata.NewExtrema(int64(45918)), @@ -1239,7 +1315,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("EmptyCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(0)), @@ -1252,7 +1330,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("UnaryCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, Max: metricdata.NewExtrema(int64(314167)), @@ -1265,7 +1345,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("StreamingInputCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(4)), @@ -1278,7 +1360,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("StreamingOutputCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2}, Max: metricdata.NewExtrema(int64(58987)), @@ -1291,7 +1375,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("FullDuplexCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2}, Max: metricdata.NewExtrema(int64(58987)), @@ -1315,7 +1401,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("EmptyCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(1)), @@ -1329,7 +1417,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("UnaryCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(1)), @@ -1343,7 +1433,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("StreamingInputCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(4)), @@ -1357,7 +1449,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("StreamingOutputCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(1)), @@ -1371,7 +1465,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("FullDuplexCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(4)), @@ -1395,7 +1491,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("EmptyCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(1)), @@ -1409,7 +1507,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("UnaryCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(1)), @@ -1423,7 +1523,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("StreamingInputCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(1)), @@ -1437,7 +1539,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("StreamingOutputCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(4)), @@ -1451,7 +1555,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { semconv.RPCMethod("FullDuplexCall"), semconv.RPCService("grpc.testing.TestService"), semconv.RPCSystemGRPC, - testMetricAttr), + testMetricAttr, + customTestMetricAttr, + ), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Max: metricdata.NewExtrema(int64(4)), diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/stats_handler_test.go index 1f96bbd4f8e..22ca3dd4f77 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/stats_handler_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/stats_handler_test.go @@ -38,16 +38,25 @@ func TestStatsHandlerHandleRPCServerErrors(t *testing.T) { otelgrpc.WithTracerProvider(tp), otelgrpc.WithMeterProvider(mp), otelgrpc.WithMetricAttributes(testMetricAttr), + otelgrpc.WithMetricAttributesFn(func(context.Context, any) []attribute.KeyValue { + return []attribute.KeyValue{customTestMetricAttr} + }), ) serviceName := "TestGrpcService" methodName := serviceName + "/" + name fullMethodName := "/" + methodName + // call the server handler ctx := serverHandler.TagRPC(context.Background(), &stats.RPCTagInfo{ FullMethodName: fullMethodName, }) + // Ensure dynamic metrics are added + serverHandler.HandleRPC(ctx, &stats.InPayload{ + Payload: []byte(""), + }) + grpcErr := status.Error(check.grpcCode, check.grpcCode.String()) serverHandler.HandleRPC(ctx, &stats.End{ Error: grpcErr, @@ -82,6 +91,26 @@ func assertStatsHandlerServerMetrics(t *testing.T, reader metric.Reader, service otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(code)), testMetricAttr, + customTestMetricAttr, + ), + }, + }, + }, + }, + { + Name: "rpc.server.request.size", + Description: "Measures size of RPC request messages (uncompressed).", + Unit: "By", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet( + semconv.RPCMethod(name), + semconv.RPCService(serviceName), + semconv.RPCSystemGRPC, + testMetricAttr, + customTestMetricAttr, ), }, }, @@ -101,6 +130,7 @@ func assertStatsHandlerServerMetrics(t *testing.T, reader metric.Reader, service otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(code)), testMetricAttr, + customTestMetricAttr, ), }, }, @@ -120,6 +150,7 @@ func assertStatsHandlerServerMetrics(t *testing.T, reader metric.Reader, service otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(code)), testMetricAttr, + customTestMetricAttr, ), }, },