Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): included wrappers and tests for opentelemetry instrumentation #4

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions opentelemetry_instrumentation_django_stomp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""
This library supports the `django-stomp` library, it can be enabled by
using ``DjangoStompInstrumentor``.

*****************************************
USAGE
-----
In project manage.py you can include the example code below

.. code-block:: python
from opentelemetry_instrumentation_django_stomp import DjangoStompInstrumentor

def publisher_hook(span: Span, body: Dict, headers: Dict):
# Custom code
pass

def consumer_hook(span: Span, body: Dict, headers: Dict):
# Custom code
pass

provider = TracerProvider()
trace.set_tracer_provider(provider)
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))

DjangoStompInstrumentor().instrument(
trace_provider=trace,
publisher_hook=publisher_hook,
consumer_hook=consumer_hook,
)

*****************************************
PUBLISHER
-----
With the django-stomp we can publish a message to broker using `publisher.send` and the instrumentor
can include a span with telemetry data in this function utilization.

.. code-block:: python
publisher = build_publisher(f"publisher-unique-name-{uuid4()}")
publisher.send(
queue=DESTINATION,
body={"a": "random","body": "message},
)

*****************************************
CONSUMER
-----
With the django-stomp we create a simple consumer using pubsub command and the instrumentor
can include a span with telemetry data in this function utilization.

.. code-block:: python
python manage.py pubsub QUEUE_NAME callback_function_to_consume_message
"""
import threading
import typing

from django.conf import settings
from opentelemetry import trace
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.trace import TracerProvider
from opentelemetry.trace.span import Span

from .instrumentors.consumer_instrument import ConsumerInstrument
from .instrumentors.publisher_instrument import PublisherInstrument
from .package import _instruments
from .version import __version__

_CTX_KEY = "__otel_django_stomp_span"

_CallbackHookT = typing.Optional[typing.Callable[[Span, typing.Dict, typing.Dict], None]]

local_threading = threading.local()


class DjangoStompInstrumentor(BaseInstrumentor):
def instrumentation_dependencies(self) -> typing.Collection[str]:
"""
Function to check compatibility with dependencies package(django-stomp)
"""
return _instruments

def _uninstrument(self, **kwargs):
"""
Function to unwrap publisher and consumer functions from django-stomp
"""
if hasattr(self, "__opentelemetry_tracer_provider"):
delattr(self, "__opentelemetry_tracer_provider")
ConsumerInstrument().uninstrument()
PublisherInstrument().uninstrument()

def _instrument(self, **kwargs: typing.Dict[str, typing.Any]) -> None:
"""
Instrument function to initialize wrappers in publisher and consumer functions from django-stomp.

Args:
kwargs (Dict[str, Any])):
trace_provider (Optional[TracerProvider]): The tracer provider to use in open-telemetry spans.
publisher_hook (_CallbackHookT): The callable function to call before original function call, use
this to override or enrich the span created in main project.
consumer_hook (_CallbackHookT): The callable function to call before original function call, use
this to override or enrich the span created in main project.

Returns:
"""
instrument_django_stomp = getattr(settings, "OTEL_PYTHON_DJANGO_STOMP_INSTRUMENT", True)
if not instrument_django_stomp:
return None

tracer_provider: typing.Optional[TracerProvider] = kwargs.get("tracer_provider", None)
publisher_hook: _CallbackHookT = kwargs.get("publisher_hook", None)
consumer_hook: _CallbackHookT = kwargs.get("consumer_hook", None)

self.__setattr__("__opentelemetry_tracer_provider", tracer_provider)
tracer = trace.get_tracer(__name__, __version__, tracer_provider)

ConsumerInstrument().instrument(tracer=tracer, callback_hook=consumer_hook)
PublisherInstrument().instrument(tracer=tracer, callback_hook=publisher_hook)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import logging
import typing

import wrapt

from django_stomp.services.consumer import Listener
from django_stomp.settings import STOMP_PROCESS_MSG_WORKERS
from opentelemetry import context
from opentelemetry import propagate
from opentelemetry import trace
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.sdk.trace import Tracer
from opentelemetry.semconv.trace import MessagingOperationValues
from opentelemetry.trace import SpanKind
from stomp.connect import StompConnection11

from opentelemetry_instrumentation_django_stomp.utils.django_stomp_getter import DjangoStompGetter
from opentelemetry_instrumentation_django_stomp.utils.shared_types import CallbackHookT
from opentelemetry_instrumentation_django_stomp.utils.span import enrich_span_with_host_data
from opentelemetry_instrumentation_django_stomp.utils.span import get_span
from opentelemetry_instrumentation_django_stomp.utils.traced_thread_pool_executor import TracedThreadPoolExecutor

_django_stomp_getter = DjangoStompGetter()

_logger = logging.getLogger(__name__)


class ConsumerInstrument:
@staticmethod
def instrument(tracer: Tracer, callback_hook: CallbackHookT = None):
"""Instrumentor function to create span and instrument consumer"""

def wrapper_on_message(wrapped, instance, args, kwargs):
MatheusGeiger marked this conversation as resolved.
Show resolved Hide resolved
frame = args[0]
headers, body = frame.headers, frame.body

ctx = propagate.extract(headers, getter=_django_stomp_getter)
if not ctx:
ctx = context.get_current()
token = context.attach(ctx)

span = get_span(
tracer=tracer,
destination=headers.get("tshoot-destination"),
span_kind=SpanKind.CONSUMER,
headers=headers,
body=body,
span_name="CONSUMER",
operation=str(MessagingOperationValues.RECEIVE.value),
)

try:
with trace.use_span(span, end_on_exit=True):
if callback_hook:
try:
callback_hook(span, body, headers)
except Exception as hook_exception: # pylint: disable=W0703
_logger.exception(hook_exception)
return wrapped(*args, **kwargs)
finally:
context.detach(token)

def wrapper_create_new_worker_executor(wrapped, instance, args, kwargs):
return TracedThreadPoolExecutor(
tracer=trace.get_tracer(__name__),
max_workers=STOMP_PROCESS_MSG_WORKERS,
thread_name_prefix=instance._subscription_id,
)

def common_ack_or_nack_span(span_name: str, wrapped_function: typing.Callable):
span = tracer.start_span(name=span_name, kind=SpanKind.CONSUMER)
enrich_span_with_host_data(span)
with trace.use_span(span, end_on_exit=True):
return wrapped_function

def wrapper_nack(wrapped, instance, args, kwargs):
return common_ack_or_nack_span("NACK", wrapped(*args, **kwargs))

def wrapper_ack(wrapped, instance, args, kwargs):
return common_ack_or_nack_span("ACK", wrapped(*args, **kwargs))

wrapt.wrap_function_wrapper(Listener, "on_message", wrapper_on_message)
wrapt.wrap_function_wrapper(Listener, "_create_new_worker_executor", wrapper_create_new_worker_executor)
wrapt.wrap_function_wrapper(StompConnection11, "nack", wrapper_nack)
wrapt.wrap_function_wrapper(StompConnection11, "ack", wrapper_ack)

@staticmethod
def uninstrument():
unwrap(Listener, "on_message")
unwrap(Listener, "_create_new_worker_executor")
unwrap(StompConnection11, "nack")
unwrap(StompConnection11, "ack")
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import logging

import wrapt

from django_stomp.services.producer import Publisher
from opentelemetry import propagate
from opentelemetry import trace
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.sdk.trace import Tracer
from opentelemetry.trace import SpanKind

from opentelemetry_instrumentation_django_stomp.utils.shared_types import CallbackHookT
from opentelemetry_instrumentation_django_stomp.utils.span import get_span

_logger = logging.getLogger(__name__)


class PublisherInstrument:
@staticmethod
def instrument(tracer: Tracer, callback_hook: CallbackHookT = None):
"""Instrumentor to create span and instrument publisher"""

def wrapper_publisher(wrapped, instance, args, kwargs):
headers = args[0].get("headers")
body = args[0].get("body")
MatheusGeiger marked this conversation as resolved.
Show resolved Hide resolved

span = get_span(
tracer=tracer,
destination=headers.get("tshoot-destination"),
span_kind=SpanKind.PRODUCER,
headers=headers,
body=body,
span_name="PUBLISHER",
)

with trace.use_span(span, end_on_exit=True):
if span.is_recording():
propagate.inject(headers)
if callback_hook:
try:
callback_hook(span, body, headers)
except Exception as hook_exception: # pylint: disable=W0703
_logger.exception(hook_exception)
return wrapped(*args, **kwargs)

wrapt.wrap_function_wrapper(Publisher, "_send_to_broker_without_retry_attempts", wrapper_publisher)
wrapt.wrap_function_wrapper(Publisher, "_send_to_broker", wrapper_publisher)

@staticmethod
def uninstrument():
"""Uninstrument publisher functions from django-stomp"""
unwrap(Publisher, "_send_to_broker_without_retry_attempts")
unwrap(Publisher, "_send_to_broker")
3 changes: 3 additions & 0 deletions opentelemetry_instrumentation_django_stomp/package.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from typing import Collection

_instruments: Collection[str] = ("django-stomp >= 5.0.0",)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import typing

from opentelemetry.propagators.textmap import CarrierT
from opentelemetry.propagators.textmap import Getter


class DjangoStompGetter(Getter[CarrierT]):
"""Propagators class to get trace-parent header from a message from messaging broker"""

def get(self, carrier: CarrierT, key: str) -> typing.Optional[typing.List[str]]:
value = carrier.get(key, None)
return [value] if value is not None else None

def keys(self, carrier: CarrierT) -> typing.List[str]:
return []
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import typing

from opentelemetry.trace.span import Span

CallbackHookT = typing.Optional[typing.Callable[[Span, typing.Dict, typing.Dict], None]]
62 changes: 62 additions & 0 deletions opentelemetry_instrumentation_django_stomp/utils/span.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import json
import sys
import typing

from django.conf import settings
from opentelemetry.sdk.trace import Span, Tracer
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import SpanKind


def enrich_span_with_host_data(span: Span):
"""Helper function add broker SpanAttributes"""
system = getattr(settings, "STOMP_SYSTEM", None) or "rabbitmq"
attributes = {
SpanAttributes.NET_PEER_NAME: settings.STOMP_SERVER_HOST,
SpanAttributes.NET_PEER_PORT: settings.STOMP_SERVER_PORT,
SpanAttributes.MESSAGING_SYSTEM: system,
}
span.set_attributes(attributes)


def enrich_span(
span: Span,
operation: typing.Optional[str],
destination: str,
headers: typing.Dict,
body: typing.Dict,
) -> None:
"""Helper function add SpanAttributes"""
attributes = {
SpanAttributes.MESSAGING_DESTINATION: destination,
SpanAttributes.MESSAGING_CONVERSATION_ID: str(headers.get("correlation-id")),
SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: sys.getsizeof(
json.dumps(body)
),
}
if operation is not None:
attributes.update({SpanAttributes.MESSAGING_OPERATION: operation})
span.set_attributes(attributes)
enrich_span_with_host_data(span)


def get_span(
tracer: Tracer,
destination: str,
span_kind: SpanKind,
headers: typing.Dict,
body: typing.Dict,
span_name: str,
operation: typing.Optional[str] = None,
) -> Span:
"""Helper function to mount span and call function to set SpanAttributes"""
span = tracer.start_span(name=span_name, kind=span_kind)
if span.is_recording():
enrich_span(
span=span,
operation=operation,
destination=destination,
headers=headers,
body=body,
)
return span
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import typing

from concurrent.futures import ThreadPoolExecutor

from opentelemetry import context as otel_context
from opentelemetry.sdk.trace import Tracer


def with_otel_context(context: otel_context.Context, fn: typing.Callable):
otel_context.attach(context)
return fn()


class TracedThreadPoolExecutor(ThreadPoolExecutor):
"""Implementation of :class:`ThreadPoolExecutor` that will pass context into sub tasks."""

def __init__(self, tracer: Tracer, *args, **kwargs):
self.tracer = tracer
super().__init__(*args, **kwargs)

def submit(self, fn, *args, **kwargs):
"""Submit a new task to the thread pool."""
context = otel_context.get_current()
if context:
return super().submit(
lambda: with_otel_context(context, lambda: fn(*args, **kwargs)),
)
else:
return super().submit(lambda: fn(*args, **kwargs))
1 change: 1 addition & 0 deletions opentelemetry_instrumentation_django_stomp/version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__version__ = "0.0.1"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we change the version, we should update it here also.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch. The idea is update to the correct version of PR from release/first-version to main branch.

13 changes: 13 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[pytest]
trio_mode = true
env =
D:STOMP_SERVER_HOST=broker-rabbitmq
D:STOMP_SERVER_PORT=61613
D:STOMP_SERVER_USER=
D:STOMP_SERVER_PASSWORD=
D:STOMP_USE_SSL=False
D:STOMP_LISTENER_CLIENT_ID=your-app-name
D:STOMP_CORRELATION_ID_REQUIRED=True
D:STOMP_PROCESS_MSG_ON_BACKGROUND=False
D:STOMP_OUTGOING_HEARTBEAT=0
D:STOMP_INCOMING_HEARTBEAT=0
Empty file added tests/__init__.py
Empty file.
Loading