Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AsyncHttpConsumer improvements #1255

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions channels/generic/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class AsyncHttpConsumer(AsyncConsumer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.body = []
self.headers_sent = False
self.more_body = True

async def send_headers(self, *, status=200, headers=None):
"""
Expand All @@ -27,6 +29,8 @@ async def send_headers(self, *, status=200, headers=None):
elif isinstance(headers, dict):
headers = list(headers.items())

self.headers_sent = True

await self.send(
{"type": "http.response.start", "status": status, "headers": headers}
)
Expand All @@ -40,9 +44,12 @@ async def send_body(self, body, *, more_body=False):
the channel will be ignored.
"""
assert isinstance(body, bytes), "Body is not bytes"
await self.send(
{"type": "http.response.body", "body": body, "more_body": more_body}
)

if self.more_body:
self.more_body = more_body
await self.send(
{"type": "http.response.body", "body": body, "more_body": more_body}
)

async def send_response(self, status, body, **kwargs):
"""
Expand Down Expand Up @@ -70,6 +77,19 @@ async def disconnect(self):
"""
pass

async def close(self, body=b"", status=500, headers=None):
"""
Closes the HTTP response from the server end.
"""
if not self.more_body:
# HTTP Response is already closed, nothing to do.
return

if not self.headers_sent:
await self.send_headers(status=status, headers=headers)

await self.send_body(body)

async def http_request(self, message):
"""
Async entrypoint - concatenates body fragments and hands off control
Expand All @@ -80,10 +100,15 @@ async def http_request(self, message):
if not message.get("more_body"):
try:
await self.handle(b"".join(self.body))
finally:
await self.disconnect()
raise StopConsumer()
except StopConsumer:
await self.close(status=200)
except:
# TODO This is just a patch, after bubbling up the exception no body is calling http_disconnect.
await self.close(body=b"Internal Server Error", status=500)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wander if it would be nicer to have a exception_handler method on the AsyncHttpConsumer that sends this 500 down the wire.

In some cases we need to format this 500 response a little differently (eg, if we got a Redis connection exception we might well want to push some retry after headers on the response)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting... if you want to format it differently you can always catch the exception in your class and handle it in your code.
Honestly I'm not familiar enough with the code base, I think when any exception bubbles up the http connection should be closed and the consumer stopped, but that doesn't happen for some reason. So this is the fix that I found.

Copy link

@adamhooper adamhooper Sep 24, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw ... be sure to re-raise asyncio.CancelledError. If you don't re-raise it, asyncio produces undefined behavior.

For that matter ... shouldn't every exception be re-raised? I'm a keen user of asyncio.EventLoop.set_exception_handler() -- it helps find bugs on production. (I imagine this is why finally was used in the first place.)

(To be clear: I don't know much about Channels' innards. I'm just worried because changing catch-all behavior seems like it'll have far-reaching effects....)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree un-expected exceptions should not be swallowed so catching in self.handle and sending a response would not be ideal.

the reason for wanted to be able to manage the 500 response body is that for our server we want to return a JSON error bodys that is parsable. At the moment our solution for this is to put a middleware layer between the Consume and the top level Application that proxies all send messages however since this does not have access to the source exception it can't produce more than a very generic error.

@adamhooper The issue with the finally approach currently in master is that it replaces effectively swallows your exception, since finally raises StopConsumer any exception that was raised is then dropped by the AsyncConsume parent classes __call__ method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@carltongibson what do we think about:set_exception_handler or CancelledError being used here? That/and/or having us create an exception_handler method to keep the flow more organized?

Are any of the pieces in this discussion required vs optional, or is the flow a bit beyond straightforward rule-logic?

Copy link
Member

@carltongibson carltongibson Mar 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thought on this, when I looked at it, was that we're entering a loosing battle. (I catch an exception, try and call close, and end-up awaiting body — but hang-on! I was closing.)

Rather, for this kind of something I really wasn't expecting went wrong case, I think we should just let the exception bubble up to Daphne, where it will be handled with a plain response (and perhaps we can add some logging to help the related issues around this space...).

Then, assuming that all looks good, we need to make sure Daphne sends the appropriate http_disconnect to stop the consumer. (Which looks like it's not happening.)

(Of course, this isn't fresh in my mind so I may need to fire up the debugger again to get back on top of it.)

raise

# TODO This should be called by daphne whenever any exception bubbles up (As it does with websockets)
# IMHO this should be parallel to websocket_disconnect, removing the consumer from potential channel_layer groups.
async def http_disconnect(self, message):
"""
Let the user do their cleanup and close the consumer.
Expand Down
56 changes: 56 additions & 0 deletions tests/test_generic_http.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import asyncio
import json

import pytest

from channels.generic.http import AsyncHttpConsumer
from channels.testing import HttpCommunicator
from django.test import override_settings
from channels.layers import get_channel_layer


@pytest.mark.asyncio
Expand Down Expand Up @@ -32,3 +35,56 @@ async def handle(self, body):
assert response["body"] == b'{"value": 42}'
assert response["status"] == 200
assert response["headers"] == [(b"Content-Type", b"application/json")]


@pytest.mark.asyncio
async def test_async_http_consumer_with_channel_layer():
"""
Tests that AsyncHttpConsumer is implemented correctly.
"""

class TestConsumer(AsyncHttpConsumer):
"""
Abstract consumer that provides a method that handles running a command and getting a response on a
device.
"""

groups = ['test_group']

async def handle(self, body):
print("Latest Channel ID: ")
await self.send_headers(
status=200,
headers=[
(b"Content-Type", b"application/json"),
],
)

async def send_to_long_poll(self, event):
print("RunCommandConsumer: Event received on send to long poll.")
command_output = str(event['data']).encode('utf8')
await self.send_body(command_output, more_body=False)

channel_layers_setting = {
"default": {"BACKEND": "channels.layers.InMemoryChannelLayer"}
}
with override_settings(CHANNEL_LAYERS=channel_layers_setting):
# Open a connection
communicator = HttpCommunicator(
TestConsumer,
method="POST",
path="/test/",
body=json.dumps({"value": 42, "anything": False}).encode("utf-8"),
)

def send_to_channel_layer():
print("send to channel layer called")
channel_layer = get_channel_layer()
# Test that the websocket channel was added to the group on connect
message = {"type": "send.to.long.poll", "data": "hello"}
asyncio.ensure_future(channel_layer.group_send("chat", message))

asyncio.get_event_loop().call_later(3, send_to_channel_layer)
print("Making http requests.")
response = await communicator.get_response(timeout=10)
assert True