Merge pull request #1127 from newrelic/add-config-option
Add NEW_RELIC_K8S_OPERATOR_ENABLED
hmstepanek authored Apr 19, 2024
2 parents 3b46fad + fc7381a commit 185f6f9
Showing 11 changed files with 435 additions and 118 deletions.
3 changes: 2 additions & 1 deletion newrelic/bootstrap/sitecustomize.py
@@ -121,8 +121,9 @@ def log_message(text, *args, **kwargs):

log_message("python_prefix_matches = %r", python_prefix_matches)
log_message("python_version_matches = %r", python_version_matches)
k8s_operator_enabled = os.environ.get("NEW_RELIC_K8S_OPERATOR_ENABLED", False)

if python_prefix_matches and python_version_matches:
if k8s_operator_enabled or (python_prefix_matches and python_version_matches):
# We also need to skip agent initialisation if neither the license
# key nor the config file environment variable is set. We do this as
# some people like to use a common startup script which always uses
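As context for this hunk, here is a standalone sketch of the new gate (not the agent's actual bootstrap). Note that `os.environ.get` returns the raw string whenever the variable is set, so any non-empty value, even `"false"`, takes the k8s-operator path.

```python
import os

# The two *_matches flags are placeholders for values sitecustomize.py
# computes from the real interpreter environment.
python_prefix_matches = False
python_version_matches = False

# Returns the raw string when the variable is set; any non-empty value
# is truthy, mirroring the line added in this diff.
k8s_operator_enabled = os.environ.get("NEW_RELIC_K8S_OPERATOR_ENABLED", False)

if k8s_operator_enabled or (python_prefix_matches and python_version_matches):
    print("agent initialisation would proceed")
```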
16 changes: 1 addition & 15 deletions newrelic/config.py
@@ -564,6 +564,7 @@ def _process_configuration(section):
_process_setting(section, "ai_monitoring.enabled", "getboolean", None)
_process_setting(section, "ai_monitoring.record_content.enabled", "getboolean", None)
_process_setting(section, "ai_monitoring.streaming.enabled", "getboolean", None)
_process_setting(section, "k8s_operator.enabled", "getboolean", None)
_process_setting(section, "package_reporting.enabled", "getboolean", None)


@@ -4362,26 +4363,11 @@ def _process_module_builtin_defaults():
"instrument_celery_worker",
)

_process_module_definition(
"celery.execute.trace",
"newrelic.hooks.application_celery",
"instrument_celery_execute_trace",
)
_process_module_definition(
"celery.task.trace",
"newrelic.hooks.application_celery",
"instrument_celery_execute_trace",
)
_process_module_definition(
"celery.app.base",
"newrelic.hooks.application_celery",
"instrument_celery_app_base",
)
_process_module_definition(
"celery.app.trace",
"newrelic.hooks.application_celery",
"instrument_celery_execute_trace",
)
_process_module_definition("billiard.pool", "newrelic.hooks.application_celery", "instrument_billiard_pool")

_process_module_definition("flup.server.cgi", "newrelic.hooks.adapter_flup", "instrument_flup_server_cgi")
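The new `k8s_operator.enabled` entry is registered with the `getboolean` parser, so `newrelic.ini` values follow the usual INI boolean literals. A rough stdlib-only illustration of those semantics (the agent's own parsing in `newrelic/config.py` may differ in detail):

```python
import configparser

# getboolean accepts 1/0, yes/no, true/false, on/off (case-insensitive).
parser = configparser.ConfigParser()
parser.read_string("""
[newrelic]
k8s_operator.enabled = true
""")

print(parser.getboolean("newrelic", "k8s_operator.enabled"))  # True
```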
10 changes: 9 additions & 1 deletion newrelic/core/config.py
@@ -162,6 +162,10 @@ class AIMonitoringRecordContentSettings(Settings):
pass


class K8sOperatorSettings(Settings):
pass


class PackageReportingSettings(Settings):
pass

@@ -430,6 +434,7 @@ class EventHarvestConfigHarvestLimitSettings(Settings):
_settings.ai_monitoring = AIMonitoringSettings()
_settings.ai_monitoring.streaming = AIMonitoringStreamingSettings()
_settings.ai_monitoring.record_content = AIMonitoringRecordContentSettings()
_settings.k8s_operator = K8sOperatorSettings()
_settings.package_reporting = PackageReportingSettings()
_settings.attributes = AttributesSettings()
_settings.browser_monitoring = BrowserMonitorSettings()
@@ -745,7 +750,9 @@ def default_otlp_host(host):
_settings.gc_runtime_metrics.enabled = False
_settings.gc_runtime_metrics.top_object_count_limit = 5

_settings.memory_runtime_pid_metrics.enabled = _environ_as_bool("NEW_RELIC_MEMORY_RUNTIME_METRICS_ENABLED", default=True)
_settings.memory_runtime_pid_metrics.enabled = _environ_as_bool(
"NEW_RELIC_MEMORY_RUNTIME_METRICS_ENABLED", default=True
)

_settings.transaction_events.enabled = True
_settings.transaction_events.attributes.enabled = True
@@ -953,6 +960,7 @@ def default_otlp_host(host):
"NEW_RELIC_AI_MONITORING_RECORD_CONTENT_ENABLED", default=True
)
_settings.ai_monitoring._llm_token_count_callback = None
_settings.k8s_operator.enabled = _environ_as_bool("NEW_RELIC_K8S_OPERATOR_ENABLED", default=False)
_settings.package_reporting.enabled = _environ_as_bool("NEW_RELIC_PACKAGE_REPORTING_ENABLED", default=True)
_settings.ml_insights_events.enabled = _environ_as_bool("NEW_RELIC_ML_INSIGHTS_EVENTS_ENABLED", default=False)

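`_environ_as_bool` coerces an environment variable to a boolean with a default. A hypothetical stand-in with the same `default=False` behaviour for the new setting (illustrative only; the real helper in `newrelic/core/config.py` may accept different literals):

```python
import os

def environ_as_bool(name, default=False):
    # Hypothetical stand-in for the agent's _environ_as_bool helper.
    value = os.environ.get(name)
    if value is None:
        return default
    return value.strip().lower() in ("1", "true", "on", "yes")

# An unset variable falls back to the default, i.e. the setting ships disabled.
print(environ_as_bool("NEW_RELIC_K8S_OPERATOR_ENABLED", default=False))  # False
```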
110 changes: 49 additions & 61 deletions newrelic/hooks/application_celery.py
@@ -28,43 +28,45 @@
from newrelic.api.message_trace import MessageTrace
from newrelic.api.pre_function import wrap_pre_function
from newrelic.api.transaction import current_transaction
from newrelic.common.object_names import callable_name
from newrelic.common.object_wrapper import FunctionWrapper, wrap_function_wrapper
from newrelic.core.agent import shutdown_agent

UNKNOWN_TASK_NAME = "<Unknown Task>"
MAPPING_TASK_NAMES = {"celery.starmap", "celery.map"}

def CeleryTaskWrapper(wrapped, application=None, name=None):
def wrapper(wrapped, instance, args, kwargs):
transaction = current_transaction(active_only=False)

if callable(name):
# Start Hotfix v2.2.1.
# if instance and inspect.ismethod(wrapped):
# _name = name(instance, *args, **kwargs)
# else:
# _name = name(*args, **kwargs)
def task_name(*args, **kwargs):
# Grab the current task, which can be located in either place
if args:
task = args[0]
elif "task" in kwargs:
task = kwargs["task"]
else:
return UNKNOWN_TASK_NAME # Failsafe

if instance is not None:
_name = name(instance, *args, **kwargs)
else:
_name = name(*args, **kwargs)
# End Hotfix v2.2.1.
# Task can be either a task instance or a signature, which subclasses dict, or an actual dict in some cases.
task_name = getattr(task, "name", None) or task.get("task", UNKNOWN_TASK_NAME)

elif name is None:
_name = callable_name(wrapped)
# Under mapping tasks, the root task name isn't descriptive enough so we append the
# subtask name to differentiate between different mapping tasks
if task_name in MAPPING_TASK_NAMES:
try:
subtask = kwargs["task"]["task"]
task_name = "/".join((task_name, subtask))
except Exception:
pass

else:
_name = name
return task_name

# Helper for obtaining the appropriate application object. If it
# has an activate() method, assume it is a valid application
# object. Don't check by type so we can easily mock it for
# testing if need be.

def _application():
if hasattr(application, "activate"):
return application
return application_instance(application)
def CeleryTaskWrapper(wrapped):
def wrapper(wrapped, instance, args, kwargs):
transaction = current_transaction(active_only=False)

if instance is not None:
_name = task_name(instance, *args, **kwargs)
else:
_name = task_name(*args, **kwargs)

# A Celery Task can be called either outside of a transaction, or
# within the context of an existing transaction. There are 3
@@ -95,13 +97,14 @@ def _application():
return wrapped(*args, **kwargs)

else:
with BackgroundTask(_application(), _name, "Celery", source=instance) as transaction:
with BackgroundTask(application_instance(), _name, "Celery", source=instance) as transaction:
# Attempt to grab distributed tracing headers
try:
# Headers on earlier versions of Celery may end up as attributes
# on the request context instead of as custom headers. Handle this
# by defaulting to using vars() if headers is not available
headers = getattr(wrapped.request, "headers", None) or vars(wrapped.request)
request = instance.request
headers = getattr(request, "headers", None) or vars(request)

settings = transaction.settings
if headers is not None and settings is not None:
@@ -128,20 +131,30 @@ def _application():
# instrumentation via FunctionWrapper() relies on __call__ being called which
# in turn executes the wrapper() function defined above. Since the micro
# optimization bypasses __call__ method it breaks our instrumentation of
# celery. To circumvent this problem, we added a run() attribute to our
# celery.
#
# For versions of celery 2.5.5+
# Celery has included a monkey-patching provision which did not perform this
# optimization on functions that were monkey-patched. Unfortunately, our
# wrappers are too transparent for celery to detect that they've even been
# monkey-patched. To circumvent this, we set the __module__ of our wrapped task
# to this file which causes celery to properly detect that it has been patched.
#
# For versions of celery 2.5.3 to 2.5.5
# To circumvent this problem, we added a run() attribute to our
# FunctionWrapper which points to our __call__ method. This causes Celery
# to execute our __call__ method which in turn applies the wrapper
# correctly before executing the task.
#
# This is only a problem in Celery versions 2.5.3 to 2.5.5. The later
# versions included a monkey-patching provision which did not perform this
# optimization on functions that were monkey-patched.

class TaskWrapper(FunctionWrapper):
def run(self, *args, **kwargs):
return self.__call__(*args, **kwargs)

return TaskWrapper(wrapped, wrapper)
wrapped_task = TaskWrapper(wrapped, wrapper)
# Reset __module__ to be less transparent so celery detects our monkey-patching
wrapped_task.__module__ = CeleryTaskWrapper.__module__

return wrapped_task


def instrument_celery_app_task(module):
@@ -162,11 +175,8 @@ def instrument_celery_app_task(module):
# the task doesn't pass through it. For Celery 2.5+ need to wrap
# the tracer instead.

def task_name(task, *args, **kwargs):
return task.name

if module.BaseTask.__module__ == module.__name__:
module.BaseTask.__call__ = CeleryTaskWrapper(module.BaseTask.__call__, name=task_name)
module.BaseTask.__call__ = CeleryTaskWrapper(module.BaseTask.__call__)


def wrap_Celery_send_task(wrapped, instance, args, kwargs):
@@ -195,28 +205,6 @@ def instrument_celery_app_base(module):
wrap_function_wrapper(module, "Celery.send_task", wrap_Celery_send_task)


def instrument_celery_execute_trace(module):
# Triggered for 'celery.execute.trace'.

if hasattr(module, "build_tracer"):
# Need to add a wrapper for background task entry point.

# In Celery 2.5+ we need to wrap the task when tracer is being
# created. Note that in Celery 2.5 the 'build_tracer' function
# actually resided in the module 'celery.execute.task'. In
# Celery 3.0 the 'build_tracer' function moved to
# 'celery.task.trace'.

_build_tracer = module.build_tracer

def build_tracer(name, task, *args, **kwargs):
task = task or module.tasks[name]
task = CeleryTaskWrapper(task, name=name)
return _build_tracer(name, task, *args, **kwargs)

module.build_tracer = build_tracer


def instrument_celery_worker(module):
# Triggered for 'celery.worker' and 'celery.concurrency.processes'.

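The two workarounds described in the comments above (a `run()` alias for Celery's `__call__` micro optimization, and a `__module__` reset so Celery's monkey-patch detection notices the wrapper) can be sketched with the wrapt library, which `newrelic.common.object_wrapper` builds on. Names here are illustrative, not the agent's exact internals:

```python
import wrapt

def _trace_wrapper(wrapped, instance, args, kwargs):
    # Stand-in for the agent's wrapper(), which starts a BackgroundTask
    # or FunctionTrace around the task body.
    print("tracing task:", getattr(instance, "name", wrapped.__name__))
    return wrapped(*args, **kwargs)

class TaskWrapper(wrapt.FunctionWrapper):
    def run(self, *args, **kwargs):
        # Celery's micro optimization may invoke task.run() directly,
        # bypassing __call__; route it back through the wrapper.
        return self.__call__(*args, **kwargs)

def celery_task_wrapper(wrapped):
    wrapped_task = TaskWrapper(wrapped, _trace_wrapper)
    # wrapt proxies are otherwise fully transparent, so make the patching
    # visible to Celery's own monkey-patch detection.
    wrapped_task.__module__ = celery_task_wrapper.__module__
    return wrapped_task
```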
7 changes: 6 additions & 1 deletion tests/application_celery/_target_application.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from celery import Celery
from celery import Celery, shared_task
from testing_support.validators.validate_distributed_trace_accepted import (
validate_distributed_trace_accepted,
)
@@ -44,6 +44,11 @@ def nested_add(x, y):
return add(x, y)


@shared_task
def shared_task_add(x, y):
return x + y


@app.task
@validate_distributed_trace_accepted(transport_type="AMQP")
def assert_dt():
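For readers unfamiliar with `@shared_task`: unlike `@app.task`, it registers the task against whatever Celery app is current rather than one fixed instance, which is why the test app can exercise it alongside the `@app.task` tasks. A minimal standalone illustration using the same in-memory broker/backend as the test suite:

```python
from celery import Celery, shared_task

@shared_task
def shared_task_add(x, y):
    return x + y

# The shared task binds lazily once an app exists.
app = Celery("demo", broker="memory://", backend="cache+memory://")

if __name__ == "__main__":
    # Called directly (no worker involved) it runs like a plain function.
    assert shared_task_add(3, 4) == 7
```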
21 changes: 21 additions & 0 deletions tests/application_celery/conftest.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from testing_support.fixtures import ( # noqa: F401; pylint: disable=W0611
collector_agent_registration_fixture,
collector_available_fixture,
@@ -27,3 +28,23 @@
collector_agent_registration = collector_agent_registration_fixture(
app_name="Python Agent Test (application_celery)", default_settings=_default_settings
)


@pytest.fixture(scope="session")
def celery_config():
# Used by celery pytest plugin to configure Celery instance
return {
"broker_url": "memory://",
"result_backend": "cache+memory://",
}


@pytest.fixture(scope="session")
def celery_worker_parameters():
# Used by celery pytest plugin to configure worker instance
return {"shutdown_timeout": 120}


@pytest.fixture(scope="session", autouse=True)
def celery_worker_available(celery_session_worker):
yield celery_session_worker
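With these session fixtures in place, tests can lean on Celery's pytest plugin in the documented way. A hypothetical test (not part of this PR) would look roughly like:

```python
def test_mul_via_embedded_worker(celery_app, celery_worker):
    # celery_app/celery_worker come from Celery's pytest plugin and pick up
    # the celery_config fixture defined above.
    @celery_app.task
    def mul(x, y):
        return x * y

    celery_worker.reload()  # let the embedded worker see the new task
    assert mul.delay(4, 4).get(timeout=10) == 16
```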
tests/application_celery/{test_celery.py → test_application.py}
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from _target_application import add, nested_add, tsum
from _target_application import add, nested_add, shared_task_add, tsum
from testing_support.validators.validate_code_level_metrics import (
validate_code_level_metrics,
)
@@ -28,7 +28,7 @@


@validate_transaction_metrics(
name="test_celery:test_celery_task_as_function_trace",
name="test_application:test_celery_task_as_function_trace",
scoped_metrics=[("Function/_target_application.add", 1)],
background_task=True,
)
@@ -58,7 +58,7 @@ def test_celery_task_as_background_task():


@validate_transaction_metrics(
name="test_celery:test_celery_tasks_multiple_function_traces",
name="test_application:test_celery_tasks_multiple_function_traces",
scoped_metrics=[("Function/_target_application.add", 1), ("Function/_target_application.tsum", 1)],
background_task=True,
)
@@ -90,7 +90,7 @@ def test_celery_tasks_ignore_transaction():


@validate_transaction_metrics(
name="test_celery:test_celery_tasks_end_transaction",
name="test_application:test_celery_tasks_end_transaction",
scoped_metrics=[("Function/_target_application.add", 1)],
background_task=True,
)
@@ -126,3 +126,18 @@ def test_celery_nested_tasks():

add_result = nested_add(1, 2)
assert add_result == 3


@validate_transaction_metrics(
name="_target_application.shared_task_add", group="Celery", scoped_metrics=[], background_task=True
)
@validate_code_level_metrics("_target_application", "shared_task_add")
def test_celery_shared_task_as_background_task():
"""
Calling shared_task_add() outside of a transaction means the agent will create
a background transaction (with a group of 'Celery') and record shared_task_add()
as a background task.
"""
result = shared_task_add(3, 4)
assert result == 7
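The renamed expectations above follow the agent's naming rules: a task called outside a transaction produces a background transaction named for the task, while a task called inside a test's transaction shows up as a scoped `Function/` metric on a transaction named `module:function`. A condensed sketch of that second pattern, assuming the repo's `testing_support` helpers and the renamed test module:

```python
from testing_support.validators.validate_transaction_metrics import (
    validate_transaction_metrics,
)

from _target_application import add
from newrelic.api.background_task import background_task

# Hypothetical test: inside an existing transaction the task records as a
# function trace, so the transaction keeps this module:function name.
@validate_transaction_metrics(
    name="test_application:test_add_inside_transaction",
    scoped_metrics=[("Function/_target_application.add", 1)],
    background_task=True,
)
@background_task()
def test_add_inside_transaction():
    assert add(1, 2) == 3
```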
