Skip to content

Commit

Permalink
fix: avoid infinite recursion of openedx-event (#312)
Browse files Browse the repository at this point in the history
Prevent event bus recursion: #79
  • Loading branch information
Ian2012 authored Feb 12, 2024
1 parent 33cd7d5 commit 3211a4f
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 6 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ Change Log
Unreleased
----------

[9.5.1] - 2024-02-12
--------------------
Changed
~~~~~~~
* Fixed recursion error when consuming events on the same service that produced them.

[9.5.0] - 2024-02-07
--------------------
Added
Expand Down
2 changes: 1 addition & 1 deletion openedx_events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
more information about the project.
"""

__version__ = "9.5.0"
__version__ = "9.5.1"
12 changes: 11 additions & 1 deletion openedx_events/apps.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
"""
openedx_events Django application initialization.
"""
import logging

from django.apps import AppConfig
from django.conf import settings

from openedx_events.event_bus import get_producer
from openedx_events.exceptions import ProducerConfigurationError
from openedx_events.tooling import OpenEdxPublicSignal, load_all_signals
from openedx_events.tooling import SIGNAL_PROCESSED_FROM_EVENT_BUS, OpenEdxPublicSignal, load_all_signals

logger = logging.getLogger(__name__)


def general_signal_handler(sender, signal, **kwargs): # pylint: disable=unused-argument
Expand All @@ -20,6 +23,13 @@ def general_signal_handler(sender, signal, **kwargs): # pylint: disable=unused-
# "topic_a": { "event_key_field": "my.key.field", "enabled": True },
# "topic_b": { "event_key_field": "my.key.field", "enabled": False }
# }"
if kwargs.get(SIGNAL_PROCESSED_FROM_EVENT_BUS) is True:
logger.debug(
"Declining to send signal to the Event Bus since that's "
f"where it was sent from: {signal.event_type} (preventing recursion)"
)
return

event_data = {key: kwargs.get(key) for key in signal.init_data}

for topic in event_type_producer_configs.keys():
Expand Down
23 changes: 23 additions & 0 deletions openedx_events/tests/test_producer_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,29 @@ def test_enabled_disabled_events(self, mock_producer):
call_args
)

@patch("openedx_events.apps.logger")
@patch('openedx_events.apps.get_producer')
def test_send_events_with_custom_metadata_not_replayed_by_handler(self, mock_producer, mock_logger):
"""
Check wheter XBLOCK_PUBLISHED is connected to the handler and the handler
do not send any events as the signal is marked "from_event_bus".
Args:
mock_producer: mock get_producer to inspect the arguments.
mock_logger: mock logger to inspect the arguments.
"""
mock_send = Mock()
mock_producer.return_value = mock_send
metadata = XBLOCK_PUBLISHED.generate_signal_metadata()

XBLOCK_PUBLISHED.send_event_with_custom_metadata(metadata, xblock_info=self.xblock_info)

mock_send.send.assert_not_called()
mock_logger.debug.assert_called_once_with(
"Declining to send signal to the Event Bus since that's "
f"where it was sent from: {XBLOCK_PUBLISHED.event_type} (preventing recursion)"
)

@patch('openedx_events.apps.get_producer')
@override_settings(EVENT_BUS_PRODUCER_CONFIG={})
def test_events_not_in_config(self, mock_producer):
Expand Down
6 changes: 4 additions & 2 deletions openedx_events/tests/test_tooling.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ def test_send_event_allow_failure_successfully(self, send_mock, fake_metadata):
sender=None,
user=self.user_mock,
metadata=expected_metadata,
from_event_bus=False,
)

@patch("openedx_events.tooling.OpenEdxPublicSignal.generate_signal_metadata")
Expand All @@ -198,7 +199,8 @@ def test_send_robust_event_successfully(self, format_responses_mock, log_mock, f
self.public_signal.send_event(user=self.user_mock)

self.ok_receiver.assert_called_once_with(
signal=self.public_signal, sender=None, user=self.user_mock, metadata=expected_metadata
signal=self.public_signal, sender=None, user=self.user_mock, metadata=expected_metadata,
from_event_bus=False
)
# format_responses is mocked out because its output is
# complicated enough to warrant its own set of tests.
Expand Down Expand Up @@ -253,7 +255,7 @@ def test_send_event_with_custom_metadata(self, mock_send_event_with_metadata):

assert response == expected_response
mock_send_event_with_metadata.assert_called_once_with(
metadata=metadata, send_robust=True, foo="bar",
metadata=metadata, send_robust=True, foo="bar", from_event_bus=True
)

@ddt.data(
Expand Down
13 changes: 11 additions & 2 deletions openedx_events/tooling.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
"org.openedx.learning.course.notification.requested.v1",
]

SIGNAL_PROCESSED_FROM_EVENT_BUS = "from_event_bus"


class OpenEdxPublicSignal(Signal):
"""
Expand Down Expand Up @@ -115,7 +117,7 @@ def generate_signal_metadata(self, time=None):
time=time,
)

def _send_event_with_metadata(self, metadata, send_robust=True, **kwargs):
def _send_event_with_metadata(self, metadata, send_robust=True, from_event_bus=False, **kwargs):
"""
Send events to all connected receivers with the provided metadata.
Expand All @@ -124,6 +126,10 @@ def _send_event_with_metadata(self, metadata, send_robust=True, **kwargs):
Arguments:
metadata (EventsMetadata): The metadata to be sent with the signal.
send_robust (bool): Defaults to True. See Django signal docs.
from_event_bus (bool): Defaults to False. If True, the signal is
being sent from the event bus. This is used to prevent infinite
loops when the event bus is consuming events. It should not be
used when sending events from the application.
See ``send_event`` docstring for more details on its usage and behavior.
"""
Expand Down Expand Up @@ -163,6 +169,7 @@ def validate_sender():
validate_sender()

kwargs["metadata"] = metadata
kwargs[SIGNAL_PROCESSED_FROM_EVENT_BUS] = from_event_bus

if self._allow_send_event_failure or settings.DEBUG or not send_robust:
return super().send(sender=None, **kwargs)
Expand Down Expand Up @@ -234,7 +241,9 @@ def send_event_with_custom_metadata(
See ``send_event`` docstring for more details.
"""
return self._send_event_with_metadata(metadata=metadata, send_robust=send_robust, **kwargs)
return self._send_event_with_metadata(
metadata=metadata, send_robust=send_robust, from_event_bus=True, **kwargs
)

def send(self, sender, **kwargs): # pylint: disable=unused-argument
"""
Expand Down

0 comments on commit 3211a4f

Please sign in to comment.