Skip to content

Commit

Permalink
Refactor ModelObserver to prepare messages before transaction commit
Browse files Browse the repository at this point in the history
  • Loading branch information
tumblingman committed Nov 26, 2024
1 parent b4a8f70 commit 097dae2
Showing 1 changed file with 28 additions and 34 deletions.
62 changes: 28 additions & 34 deletions djangochannelsrestframework/observer/model_observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,27 +103,27 @@ def post_delete_receiver(self, instance: Model, **kwargs):
self.database_event(instance, Action.DELETE)

def database_event(self, instance: Model, action: Action):
if action == Action.DELETE:
self.post_change_receiver(instance, action)
return
"""
Handles database events and prepares messages for sending on commit.
"""
messages = list(self.prepare_messages(instance, action))

connection = transaction.get_connection()

if connection.in_atomic_block:
if len(connection.savepoint_ids) > 0:
warnings.warn(
"Model observation with save points is unsupported and will"
" result in unexpected beauvoir.",
" result in unexpected behavior.",
UnsupportedWarning,
)

connection.on_commit(partial(self.post_change_receiver, instance, action))
connection.on_commit(partial(self.send_prepared_messages, messages))

def post_change_receiver(self, instance: Model, action: Action, **kwargs):
def prepare_messages(self, instance: Model, action: Action, **kwargs):
"""
Triggers the old_binding to possibly send to its group.
Prepares messages for sending based on the given action and instance.
"""

if action == Action.CREATE:
old_group_names = set()
else:
Expand All @@ -136,37 +136,31 @@ def post_change_receiver(self, instance: Model, action: Action, **kwargs):

self.get_observer_state(instance).current_groups = new_group_names

# if post delete, new_group_names should be []
yield from self.generate_messages(instance, old_group_names, new_group_names, action, **kwargs)

# Django DDP had used the ordering of DELETE, UPDATE then CREATE for good reasons.
self.send_messages(
instance, old_group_names - new_group_names, Action.DELETE, **kwargs
)
# the object has been updated so that its groups are not the same.
self.send_messages(
instance, old_group_names & new_group_names, Action.UPDATE, **kwargs
)

#
self.send_messages(
instance, new_group_names - old_group_names, Action.CREATE, **kwargs
)
def generate_messages(self, instance: Model, old_group_names: Set[str], new_group_names: Set[str], action: Action, **kwargs):
"""
Generates messages for the given group names and action.
"""
for group_name in old_group_names - new_group_names:
yield {**self.serialize(instance, Action.DELETE, **kwargs), "group": group_name}

def send_messages(
self, instance: Model, group_names: Set[str], action: Action, **kwargs
):
if not group_names:
return
message = self.serialize(instance, action, **kwargs)
channel_layer = get_channel_layer()
for group_name in old_group_names & new_group_names:
yield {**self.serialize(instance, Action.UPDATE, **kwargs), "group": group_name}

for group_name in group_names:
message_to_send = deepcopy(message)
for group_name in new_group_names - old_group_names:
yield {**self.serialize(instance, Action.CREATE, **kwargs), "group": group_name}

# Include the group name in the message being sent
message_to_send["group"] = group_name
def send_prepared_messages(self, messages):
"""
Sends prepared messages to the channel layer.
"""
if not messages:
return

async_to_sync(channel_layer.group_send)(group_name, message_to_send)
channel_layer = get_channel_layer()
for message in messages:
async_to_sync(channel_layer.group_send)(message["group"], deepcopy(message))

def group_names(self, *args, **kwargs):
# one channel for all updates.
Expand Down

0 comments on commit 097dae2

Please sign in to comment.