From eded86e6343f7546751970663ecf00068f6416b2 Mon Sep 17 00:00:00 2001 From: Pandu POLUAN Date: Thu, 11 Mar 2021 22:00:58 +0700 Subject: [PATCH 01/15] Instrument controller.py Adding many logging points, and using the ContextLoggerAdapter to provide nicely-prefixed log entries. --- aiosmtpd/controller.py | 142 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 134 insertions(+), 8 deletions(-) diff --git a/aiosmtpd/controller.py b/aiosmtpd/controller.py index d3345b8c9..8daa810a6 100644 --- a/aiosmtpd/controller.py +++ b/aiosmtpd/controller.py @@ -3,12 +3,14 @@ import asyncio import errno +import logging import os import ssl import sys import threading import time from abc import ABCMeta, abstractmethod +from collections import deque from contextlib import ExitStack from pathlib import Path from socket import AF_INET6, SOCK_STREAM, create_connection, has_ipv6 @@ -19,7 +21,17 @@ from socket import AF_UNIX except ImportError: # pragma: on-not-win32 AF_UNIX = None -from typing import Any, Coroutine, Dict, Optional, Union +from typing import ( + Any, + Callable, + Coroutine, + Deque, + Dict, + MutableMapping, + Optional, + Tuple, + Union, +) if sys.version_info >= (3, 8): from typing import Literal # pragma: py-lt-38 @@ -32,10 +44,26 @@ from aiosmtpd.smtp import SMTP AsyncServer = asyncio.base_events.Server +ExceptionHandlerType = Callable[[asyncio.AbstractEventLoop, Dict[str, Any]], None] DEFAULT_READY_TIMEOUT: float = 5.0 +class ContextLoggerAdapter(logging.LoggerAdapter): + @property + def context(self): + return self.extra.get("context") + + def process( + self, msg: Any, kwargs: MutableMapping[str, Any] + ) -> Tuple[Any, MutableMapping[str, Any]]: + msg = f"[{self.context}] {msg}" if self.context else msg + return msg, kwargs + + +log = ContextLoggerAdapter(logging.getLogger("aiosmtpd.controller"), {}) + + @public class IP6_IS: # Apparently errno.E* constants adapts to the OS, so on Windows they will @@ -97,24 +125,84 @@ def _client_connected_cb(self, reader, writer): pass +@public +class ExceptionAccumulator: + """ + Provides a simple asyncio exception handler that only record unhandled exceptions + and not do anything else. + """ + __slots__ = ("accumulator", "peaked", "with_log") + + """Indicates if accumulator ever peaked (items appended > maxlen)""" + + def __init__(self, with_log: bool = True, maxlen: int = 20): + self.accumulator: Deque[Dict[str, str]] = deque(maxlen=maxlen) + self.peaked: bool = False + self.with_log: bool = with_log + + @property + def max_items(self): + return self.accumulator.maxlen + + @max_items.setter + def max_items(self, value): + if not isinstance(value, int) or value < 1: + raise ValueError("maxlen must be an int > 0") + accu = self.accumulator + if value == accu.maxlen: + return + self.accumulator = deque(accu, maxlen=value) + + def clear(self): + self.accumulator.clear() + self.peaked = False + + def __call__(self, loop, context): + msg = str(context.get("exception", context["message"])) + hnd = repr(context.get("handle")) + fut = repr(context.get("future")) + if self.with_log: + log.error("Caught exception %s", msg) + log.error(" Handle: %s", hnd) + log.error(" Future: %s", fut) + accu = self.accumulator + if len(accu) == accu.maxlen: + self.peaked = True + accu.append(dict(msg=msg, hnd=hnd, fut=fut)) + + +class BaseControllerMapping: + __slots__ = () + + def __get__(self, instance: "BaseController", owner): + return { + "context": instance.name, + } + + @public class BaseController(metaclass=ABCMeta): + _mapping = BaseControllerMapping() + smtpd = None server: Optional[AsyncServer] = None server_coro: Optional[Coroutine] = None - _factory_invoked: threading.Event = None def __init__( self, handler: Any, loop: asyncio.AbstractEventLoop = None, *, + name: Optional[str] = None, ssl_context: Optional[ssl.SSLContext] = None, # SMTP parameters server_hostname: Optional[str] = None, **SMTP_parameters, ): self.handler = handler + handler_name = getattr(handler, "Name", type(handler).__name__) + self.name = name or f"Controller({handler_name})" + log.extra = self._mapping if loop is None: self.loop = asyncio.new_event_loop() else: @@ -137,7 +225,8 @@ def __init__( # discussed in the docs. self.SMTP_kwargs.setdefault("enable_SMTPUTF8", True) # - self._factory_invoked = threading.Event() + self._factory_invoked: threading.Event = threading.Event() + self._cancel_done: threading.Event = threading.Event() def factory(self): """Subclasses can override this to customize the handler/server creation.""" @@ -148,7 +237,7 @@ def _factory_invoker(self): try: self.smtpd = self.factory() if self.smtpd is None: - raise RuntimeError("factory() returned None") + raise RuntimeError(f"[{self.name}] factory() returned None") return self.smtpd except Exception as err: self._thread_exception = err @@ -177,6 +266,8 @@ def cancel_tasks(self, stop_loop: bool = True): Convenience method to stop the loop and cancel all tasks. Use loop.call_soon_threadsafe() to invoke this. """ + self._cancel_done.clear() + log.info("cancel_tasks(stop_loop=%s)", stop_loop) if stop_loop: # pragma: nobranch self.loop.stop() try: @@ -186,10 +277,14 @@ def cancel_tasks(self, stop_loop: bool = True): for task in _all_tasks(self.loop): # This needs to be invoked in a thread-safe way task.cancel() + time.sleep(0.1) + self._cancel_done.set() @public class BaseThreadedController(BaseController, metaclass=ABCMeta): + DefaultExceptionHandler: ExceptionHandlerType = ExceptionAccumulator() + _thread: Optional[threading.Thread] = None _thread_exception: Optional[Exception] = None @@ -223,7 +318,7 @@ class (it's lazy initialization, done only on initial connection). """ raise NotImplementedError - def _run(self, ready_event: threading.Event): + def _run(self, ready_event: threading.Event) -> None: asyncio.set_event_loop(self.loop) try: # Need to do two-step assignments here to ensure IDEs can properly @@ -244,6 +339,10 @@ def _run(self, ready_event: threading.Event): self._thread_exception = error return self.loop.call_soon(ready_event.set) + if ( # pragma: nobranch + self.loop.get_exception_handler() is None and self.DefaultExceptionHandler + ): + self.loop.set_exception_handler(self.DefaultExceptionHandler) self.loop.run_forever() # We reach this point when loop is ended (by external code) # Perform some stoppages to ensure endpoint no longer bound. @@ -252,19 +351,25 @@ def _run(self, ready_event: threading.Event): self.loop.close() self.server = None - def start(self): + def start(self, thread_name: Optional[str] = None): """ Start a thread and run the asyncio event loop in that thread """ - assert self._thread is None, "SMTP daemon already running" + if self._thread is not None: + raise RuntimeError("SMTP daemon already running") + log.info("Starting") self._factory_invoked.clear() + thread_name = thread_name or f"{self.name}-1" ready_event = threading.Event() - self._thread = threading.Thread(target=self._run, args=(ready_event,)) + self._thread = threading.Thread( + target=self._run, args=(ready_event,), name=thread_name + ) self._thread.daemon = True self._thread.start() # Wait a while until the server is responding. start = time.monotonic() + log.debug("Waiting for server to start listening") if not ready_event.wait(self.ready_timeout): # An exception within self._run will also result in ready_event not set # So, we first test for that, before raising TimeoutError @@ -272,6 +377,7 @@ def start(self): # See comment about WSL1.0 in the _run() method raise self._thread_exception else: + log.critical("Server timeout") raise TimeoutError( "SMTP server failed to start within allotted time. " "This might happen if the system is too busy. " @@ -290,29 +396,38 @@ def start(self): except Exception: # Raise other exceptions though raise + log.debug("Waiting for server to start serving") if not self._factory_invoked.wait(respond_timeout): + log.critical("Server response timeout") raise TimeoutError( "SMTP server started, but not responding within allotted time. " "This might happen if the system is too busy. " "Try increasing the `ready_timeout` parameter." ) if self._thread_exception is not None: + log.exception( + "The following exception happened:", exc_info=self._thread_exception + ) raise self._thread_exception # Defensive if self.smtpd is None: raise RuntimeError("Unknown Error, failed to init SMTP server") + log.info("Started successfully") def stop(self, no_assert: bool = False): """ Stop the loop, the tasks in the loop, and terminate the thread as well. """ assert no_assert or self._thread is not None, "SMTP daemon not running" + log.info("Stopping") self.loop.call_soon_threadsafe(self.cancel_tasks) if self._thread is not None: + log.debug("Waiting to join thread...") self._thread.join() self._thread = None self._cleanup() + log.info("Stopped successfully") @public @@ -322,6 +437,7 @@ def __init__( handler: Any, loop: asyncio.AbstractEventLoop = None, *, + name: str = None, ssl_context: Optional[ssl.SSLContext] = None, # SMTP parameters server_hostname: Optional[str] = None, @@ -330,6 +446,7 @@ def __init__( super().__init__( handler, loop, + name=name, ssl_context=ssl_context, server_hostname=server_hostname, **SMTP_parameters, @@ -341,6 +458,7 @@ def begin(self): Sets up the asyncio server task and inject it into the asyncio event loop. Does NOT actually start the event loop itself. """ + log.info("Begins") asyncio.set_event_loop(self.loop) # Need to do two-step assignments here to ensure IDEs can properly # detect the types of the vars. Cannot use `assert isinstance`, because @@ -358,12 +476,14 @@ async def finalize(self): loop.run_until_complete() (if loop has stopped) """ self.ended.clear() + log.info("Finalizing") server = self.server server.close() await server.wait_closed() self.server_coro.close() self._cleanup() self.ended.set() + log.info("Finalized") def end(self): """ @@ -372,11 +492,13 @@ def end(self): if your loop is running in a different thread. You can afterwards .wait() on ended attribute (a threading.Event) to check for completion, if needed. """ + log.info("Ending") self.ended.clear() if self.loop.is_running(): self.loop.create_task(self.finalize()) else: self.loop.run_until_complete(self.finalize()) + log.info("Ended") @public @@ -404,6 +526,7 @@ def _create_server(self) -> Coroutine: Does NOT actually start the protocol object itself; _factory_invoker() is only called upon fist connection attempt. """ + log.debug("Creating listener on %s:%s", self.hostname, self.port) return self.loop.create_server( self._factory_invoker, host=self.hostname, @@ -420,6 +543,7 @@ def _trigger_server(self): # At this point, if self.hostname is Falsy, it most likely is "" (bind to all # addresses). In such case, it should be safe to connect to localhost) hostname = self.hostname or self._localhost + log.debug("Trying to trigger server on %s:%s", hostname, self.port) with ExitStack() as stk: s = stk.enter_context(create_connection((hostname, self.port), 1.0)) if self.ssl_context: @@ -449,6 +573,7 @@ def _create_server(self) -> Coroutine: Does NOT actually start the protocol object itself; _factory_invoker() is only called upon fist connection attempt. """ + log.debug("Creating listener on %s", self.unix_socket) return self.loop.create_unix_server( self._factory_invoker, path=self.unix_socket, @@ -461,6 +586,7 @@ def _trigger_server(self): Context if necessary, and read some data from it to ensure that factory() gets invoked. """ + log.debug("Trying to trigger server on %s", self.unix_socket) with ExitStack() as stk: s: makesock = stk.enter_context(makesock(AF_UNIX, SOCK_STREAM)) s.connect(self.unix_socket) From 625e2cd03ab5813ed843e16e61c5d609c19c189d Mon Sep 17 00:00:00 2001 From: Pandu POLUAN Date: Thu, 11 Mar 2021 22:03:53 +0700 Subject: [PATCH 02/15] Better instrumentation for smtp.py Logs that need peer endpoint prefixed now use PeerPrefixAdapter. This greatly simplifies a lot of code lines, making them tidier, easier to read and maintain. --- aiosmtpd/smtp.py | 98 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 67 insertions(+), 31 deletions(-) diff --git a/aiosmtpd/smtp.py b/aiosmtpd/smtp.py index 04a34979c..290c97e42 100644 --- a/aiosmtpd/smtp.py +++ b/aiosmtpd/smtp.py @@ -22,6 +22,7 @@ Dict, Iterable, List, + MutableMapping, NamedTuple, Optional, Tuple, @@ -36,6 +37,22 @@ from aiosmtpd.proxy_protocol import ProxyData, get_proxy +# region #### Logging System ########################################################## + +class PeerPrefixAdapter(logging.LoggerAdapter): + def process( + self, msg: Any, kwargs: MutableMapping[str, Any] + ) -> Tuple[Any, MutableMapping[str, Any]]: + peer = self.extra.get("peer") + return f"{peer!r} {msg}", kwargs + + +log = logging.getLogger("mail.log") +log_peer = PeerPrefixAdapter(log, {}) + +# endregion + + # region #### Custom Data Types ####################################################### class _Missing: @@ -72,7 +89,6 @@ class _DataState(enum.Enum): "__version__", ] # Will be added to by @public __ident__ = 'Python SMTP {}'.format(__version__) -log = logging.getLogger('mail.log') BOGUS_LIMIT = 5 @@ -147,8 +163,26 @@ def __repr__(self) -> str: return str(self) +class SessionMapping: + __slots__ = () + + def __get__(self, instance: "Session", owner) -> Dict[str, Any]: + return { + "peer": instance.peer, + "host_name": instance.host_name, + "extended_smtp": instance.extended_smtp, + "proxy_data": instance.proxy_data, + "authenticated": instance.authenticated + } + + @public class Session: + mapping = SessionMapping() + """Consider this as a pared-down vars()""" + # Do NOT set the __slots__ for this class; attribute add/del should be allowed + # for inter-hook coordination (if needed) + def __init__(self, loop): self.peer = None self.ssl = None @@ -189,6 +223,8 @@ def login_data(self, value): @public class Envelope: + # Do NOT set the __slots__ for this class; attribute add/del should be allowed + # for inter-hook coordination (if needed) def __init__(self): self.mail_from = None self.mail_options = [] @@ -509,12 +545,13 @@ def connection_made(self, transport): super().connection_made(transport) self.transport = transport log.info('Peer: %r', self.session.peer) + log_peer.extra = self.session.mapping # Process the client's requests. self._handler_coroutine = self.loop.create_task( self._handle_client()) def connection_lost(self, error): - log.info('%r connection lost', self.session.peer) + log_peer.info("connection lost") self._timeout_handle.cancel() # If STARTTLS was issued, then our transport is the SSL protocol # transport, and we need to close the original transport explicitly, @@ -528,7 +565,7 @@ def connection_lost(self, error): self.transport = None def eof_received(self): - log.info('%r EOF received', self.session.peer) + log_peer.info("EOF received") self._handler_coroutine.cancel() if self.session.ssl is not None: # If STARTTLS was issued, return False, because True has no effect @@ -545,7 +582,7 @@ def _reset_timeout(self, duration=None): ) def _timeout_cb(self): - log.info('%r connection timeout', self.session.peer) + log_peer.info("connection timeout") # Calling close() on the transport will trigger connection_lost(), # which gracefully closes the SSL transport if required and cleans @@ -574,7 +611,7 @@ async def push(self, status: AnyStr): response = status assert isinstance(response, bytes) self._writer.write(response + b"\r\n") - log.debug("%r << %r", self.session.peer, response) + log_peer.debug("<< %r", response) await self._writer.drain() async def handle_exception(self, error): @@ -582,28 +619,29 @@ async def handle_exception(self, error): status = await self.event_handler.handle_exception(error) return status else: - log.exception('%r SMTP session exception', self.session.peer) + log_peer.exception("SMTP session exception") status = '500 Error: ({}) {}'.format( error.__class__.__name__, str(error)) return status async def _handle_client(self): - log.info('%r handling connection', self.session.peer) + log_peer.info("handling connection") if self._proxy_timeout is not None: self._reset_timeout(self._proxy_timeout) - log.debug("%r waiting PROXY handshake", self.session.peer) + log_peer.debug("waiting PROXY handshake") self.session.proxy_data = await get_proxy(self._reader) if self.session.proxy_data: - log.info("%r valid PROXY handshake", self.session.peer) + log_peer.info("valid PROXY handshake") status = await self._call_handler_hook("PROXY", self.session.proxy_data) - log.debug("%r handle_PROXY returned %r", self.session.peer, status) + log_peer.debug("handle_PROXY returned %r", status) else: - log.warning("%r invalid PROXY handshake", self.session.peer) + log_peer.warning("invalid PROXY handshake") status = False if status is MISSING or not status: - log.info("%r rejected by handle_PROXY", self.session.peer) - self.transport.close() + log_peer.info("PROXY rejected") + if self.transport: # pragma: nobranch + self.transport.close() return self._reset_timeout() @@ -640,7 +678,7 @@ async def _handle_client(self): sanitized_log(log.debug, '_handle_client readline: %r', line) # XXX this rstrip may not completely preserve old behavior. line = line.rstrip(b'\r\n') - sanitized_log(log.info, '%r >> %r', self.session.peer, line) + sanitized_log(log_peer.info, ">> %r", line) if not line: await self.push('500 Error: bad syntax') continue @@ -698,9 +736,7 @@ async def _handle_client(self): if self._enforce_call_limit: budget = call_limit[command] if budget < 1: - log.warning( - "%r over limit for %s", self.session.peer, command - ) + log_peer.warning("over limit for %s", command) await self.push( f"421 4.7.0 {command} sent too many times" ) @@ -710,10 +746,10 @@ async def _handle_client(self): method = self._smtp_methods.get(command) if method is None: - log.warning("%r unrecognised: %s", self.session.peer, command) + log_peer.warning("unrecognised: %s", command) bogus_budget -= 1 if bogus_budget < 1: - log.warning("%r too many bogus commands", self.session.peer) + log_peer.warning("too many bogus commands") await self.push( "502 5.5.1 Too many unrecognized commands, goodbye." ) @@ -730,13 +766,11 @@ async def _handle_client(self): # The connection got reset during the DATA command. # XXX If handler method raises ConnectionResetError, we should # verify that it was actually self._reader that was reset. - log.info('%r Connection lost during _handle_client()', - self.session.peer) + log_peer.info("Connection lost during _handle_client()") self._writer.close() raise except ConnectionResetError: - log.info('%r Connection lost during _handle_client()', - self.session.peer) + log_peer.info("Connection lost during _handle_client()") self._writer.close() raise except Exception as error: @@ -745,8 +779,7 @@ async def _handle_client(self): status = await self.handle_exception(error) except Exception as inner_error: try: - log.exception('%r Exception in handle_exception()', - self.session.peer) + log_peer.exception("Exception in handle_exception()") status = '500 Error: ({}) {}'.format( inner_error.__class__.__name__, str(inner_error)) except Exception: @@ -758,6 +791,9 @@ async def _handle_client(self): else: await self.push(status) + log_peer.debug("exiting _handle_client()") + await asyncio.sleep(0) + async def check_helo_needed(self, helo: str = "HELO") -> bool: """ Check if HELO/EHLO is needed. @@ -995,23 +1031,23 @@ async def challenge_auth( # - https://tools.ietf.org/html/rfc4954#page-4 ΒΆ 5 # - https://tools.ietf.org/html/rfc4954#page-13 "continue-req" challenge = b"334 " + (b64encode(challenge) if encode_to_b64 else challenge) - log.debug("%r << challenge: %r", self.session.peer, challenge) + log_peer.debug("<< challenge: %r", challenge) await self.push(challenge) line = await self._reader.readline() if log_client_response: warn("AUTH interaction logging is enabled!") warn("Sensitive information might be leaked!") - log.debug("%r >> %r", self.session.peer, line) + log_peer.debug(">> %r", line) blob: bytes = line.strip() # '*' handling in accordance with RFC4954 if blob == b"*": - log.warning("%r aborted AUTH with '*'", self.session.peer) + log_peer.warning("aborted AUTH with '*'") await self.push("501 5.7.0 Auth aborted") return MISSING try: decoded_blob = b64decode(blob, validate=True) except binascii.Error: - log.debug("%r can't decode base64: %s", self.session.peer, blob) + log_peer.debug("can't decode base64: %r", blob) await self.push("501 5.5.2 Can't decode base64") return MISSING return decoded_blob @@ -1268,7 +1304,7 @@ async def smtp_MAIL(self, arg: str) -> None: self.envelope.mail_from = address self.envelope.mail_options.extend(mail_options) status = '250 OK' - log.info('%r sender: %s', self.session.peer, address) + log_peer.info("sender: %r", address) await self.push(status) @syntax('RCPT TO:
', extended=' [SP ]') @@ -1310,7 +1346,7 @@ async def smtp_RCPT(self, arg: str) -> None: self.envelope.rcpt_tos.append(address) self.envelope.rcpt_options.extend(rcpt_options) status = '250 OK' - log.info('%r recip: %s', self.session.peer, address) + log_peer.info("recip: %r", address) await self.push(status) @syntax('RSET') From 2802caad13be6e0856e3dbb3dd418a6c01e69433 Mon Sep 17 00:00:00 2001 From: Pandu POLUAN Date: Thu, 11 Mar 2021 22:04:36 +0700 Subject: [PATCH 03/15] Better instrumentation for main.py --- aiosmtpd/main.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/aiosmtpd/main.py b/aiosmtpd/main.py index e978c608b..25036d96a 100644 --- a/aiosmtpd/main.py +++ b/aiosmtpd/main.py @@ -215,10 +215,19 @@ def parseargs(args=None): @public def main(args=None): + logging.basicConfig(level=logging.ERROR) + log = logging.getLogger("mail.log") + parser, args = parseargs(args=args) + if args.debug > 0: + log.setLevel(logging.INFO) + if args.debug > 1: + log.setLevel(logging.DEBUG) + log.info(f"Started with args = {args}") if args.setuid: # pragma: on-win32 if pwd is None: + log.critical("Cannot import pwd") print( 'Cannot import module "pwd"; try running with -n option.', file=sys.stderr, @@ -228,6 +237,7 @@ def main(args=None): try: os.setuid(nobody) except PermissionError: + log.critical("Cannot setuid 'nobody'") print( 'Cannot setuid "nobody"; try running with -n option.', file=sys.stderr ) @@ -237,6 +247,7 @@ def main(args=None): tls_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) tls_context.check_hostname = False tls_context.load_cert_chain(str(args.tlscert), str(args.tlskey)) + log.debug(f"TLS Certificate Chain: {args.tlscert} {args.tlskey}") else: tls_context = None @@ -249,14 +260,7 @@ def main(args=None): require_starttls=args.requiretls, ) - logging.basicConfig(level=logging.ERROR) - log = logging.getLogger("mail.log") loop = asyncio.get_event_loop() - - if args.debug > 0: - log.setLevel(logging.INFO) - if args.debug > 1: - log.setLevel(logging.DEBUG) if args.debug > 2: loop.set_debug(enabled=True) @@ -264,6 +268,7 @@ def main(args=None): smtps_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) smtps_context.check_hostname = False smtps_context.load_cert_chain(str(args.smtpscert), str(args.smtpskey)) + log.debug(f"SMTPS Certificate Chain: {args.tlscert} {args.tlskey}") else: smtps_context = None @@ -275,6 +280,7 @@ def main(args=None): ) server_loop = loop.run_until_complete(server) except RuntimeError: # pragma: nocover + log.exception("Exception happened:") raise log.debug(f"server_loop = {server_loop}") log.info("Server is listening on %s:%s", args.host, args.port) @@ -288,7 +294,7 @@ def main(args=None): try: loop.run_forever() except KeyboardInterrupt: - pass + log.info("KeyboardInterrupt signaled") server_loop.close() log.debug("Completed asyncio loop") loop.run_until_complete(server_loop.wait_closed()) From 71a281df1a09ee0476d1d19d5f7d44fefbc28399 Mon Sep 17 00:00:00 2001 From: Pandu POLUAN Date: Thu, 11 Mar 2021 22:05:47 +0700 Subject: [PATCH 04/15] Implement logging to file for all tests --- aiosmtpd/proxy_protocol.py | 6 +++--- aiosmtpd/tests/conftest.py | 44 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 3 deletions(-) diff --git a/aiosmtpd/proxy_protocol.py b/aiosmtpd/proxy_protocol.py index 621098ce2..549186722 100644 --- a/aiosmtpd/proxy_protocol.py +++ b/aiosmtpd/proxy_protocol.py @@ -536,9 +536,9 @@ async def get_proxy(reader_func: AsyncReader) -> ProxyData: :param reader_func: Async function that implements the AsyncReader protocol. :return: Proxy Data """ - log.debug("Waiting for PROXY signature") - signature = await reader_func.readexactly(5) try: + log.debug("Waiting for PROXY signature") + signature = await reader_func.readexactly(5) if signature == b"PROXY": log.debug("PROXY version 1") return await _get_v1(reader_func, signature) @@ -548,4 +548,4 @@ async def get_proxy(reader_func: AsyncReader) -> ProxyData: else: return ProxyData(version=None).with_error("PROXY unrecognized signature") except Exception as e: - return ProxyData(version=None).with_error(f"PROXY exception: {str(e)}", False) + return ProxyData(version=None).with_error(f"PROXY exception: {e}", False) diff --git a/aiosmtpd/tests/conftest.py b/aiosmtpd/tests/conftest.py index d0a6cd36f..84a86f088 100644 --- a/aiosmtpd/tests/conftest.py +++ b/aiosmtpd/tests/conftest.py @@ -3,10 +3,13 @@ import asyncio import inspect +import logging +import os import socket import ssl from contextlib import suppress from functools import wraps +from pathlib import Path from smtplib import SMTP as SMTPClient from typing import Generator, NamedTuple, Optional, Type @@ -35,6 +38,39 @@ ] +# region #### Instrumentation (Logging) ############################################### + +if os.environ.get("AIOSMTPD_TEST_NOLOG") == "1": + log_handler = logging.NullHandler() +else: + log_path = Path(".").expanduser().absolute() + while not (log_path / "pyproject.toml").exists(): + log_path = log_path.parent + log_handler = logging.FileHandler(log_path / "aiosmtpd_test.log") + log_handler.setFormatter( + logging.Formatter("{name} {levelname} {message}", style="{") + ) +log_level = int(os.environ.get("AIOSMTPD_TEST_LOGLEVEL", logging.DEBUG)) +log_handler.setLevel(log_level) +# Attach to root logger +logging.getLogger().addHandler(log_handler) + +for logname in ( + "aiosmtpd", + "aiosmtpd.controller", + "aiosmtpd.tests", + "mail", + "mail.log", + "mail.debug", +): + _logger = logging.getLogger(logname) + _logger.propagate = True + _logger.setLevel(log_level) + +log = logging.getLogger("aiosmtpd.tests") + +# endregion + # region #### Aliases ################################################################# controller_data = pytest.mark.controller_data @@ -96,6 +132,14 @@ def cache_fqdn(session_mocker): # region #### Common Fixtures ######################################################### +@pytest.fixture(autouse=True) +def log_case(request: pytest.FixtureRequest): + node_id = request.node.nodeid + log.debug("Entering %s", node_id) + yield + log.debug("Exiting %s", node_id) + + @pytest.fixture def get_controller(request): """ From 9741e599d200250abe1dc00ce9583b92f2c527fe Mon Sep 17 00:00:00 2001 From: Pandu POLUAN Date: Thu, 11 Mar 2021 22:06:33 +0700 Subject: [PATCH 05/15] Test controller.py changes --- aiosmtpd/tests/test_server.py | 46 +++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/aiosmtpd/tests/test_server.py b/aiosmtpd/tests/test_server.py index 41225dc3d..df15208c8 100644 --- a/aiosmtpd/tests/test_server.py +++ b/aiosmtpd/tests/test_server.py @@ -21,6 +21,7 @@ from aiosmtpd.controller import ( Controller, + ExceptionAccumulator, UnixSocketController, UnthreadedController, UnixSocketMixin, @@ -358,6 +359,13 @@ def test_stop_noassert(self): controller = Controller(Sink()) controller.stop(no_assert=True) + def test_start_already(self): + cont = Controller(Sink()) + cont._thread = True # Anything not None + with pytest.raises(RuntimeError, match="SMTP daemon already running"): + cont.start() + del cont + @pytest.mark.skipif(in_cygwin(), reason="Cygwin AF_UNIX is problematic") @pytest.mark.skipif(in_win32(), reason="Win32 does not yet fully implement AF_UNIX") @@ -612,3 +620,41 @@ def test_version(self): from aiosmtpd.smtp import __version__ as smtp_version assert smtp_version is init_version + + +class TestAccumulator: + def test_resize(self): + ea = ExceptionAccumulator(with_log=False, maxlen=5) + assert ea.max_items == 5 + for i in range(5): + ea(None, {"message": i}) + assert len(ea.accumulator) == 5 + ea.max_items = 3 + assert ea.max_items == 3 + assert len(ea.accumulator) == 3 + + def test_samesize(self): + SIZE = 5 + ea = ExceptionAccumulator(with_log=False, maxlen=SIZE) + accu1 = ea.accumulator + ea.max_items = SIZE + assert ea.accumulator is accu1 + + def test_peaked_and_cleared(self): + SIZE = 5 + ea = ExceptionAccumulator(with_log=False, maxlen=SIZE) + for i in range(SIZE * 2): + ea(None, {"message": i}) + assert len(ea.accumulator) == SIZE + assert ea.peaked + ea.clear() + assert len(ea.accumulator) == 0 + assert not ea.peaked + + @pytest.mark.parametrize("size", [-1, 0, 1.5, None, "a", [], ()]) + def test_invalidsize(self, size): + ea = ExceptionAccumulator(with_log=False) + defsize = ea.max_items + with pytest.raises(ValueError, match="maxlen must be an int > 0"): + ea.max_items = size + assert ea.max_items == defsize From bbd237aa29a8112a5df5285ccdf3db4d105dccba Mon Sep 17 00:00:00 2001 From: Pandu POLUAN Date: Thu, 11 Mar 2021 22:07:08 +0700 Subject: [PATCH 06/15] Ignore log file generated by test suite --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 460753537..dccfea06d 100644 --- a/.gitignore +++ b/.gitignore @@ -36,3 +36,4 @@ diffcov-*.html prof/ .pytype/ ~temp* +aiosmtpd_test.log From 746244392fd6db7214b9d876fc5f29b692f7c0cb Mon Sep 17 00:00:00 2001 From: Pandu POLUAN Date: Thu, 11 Mar 2021 22:07:31 +0700 Subject: [PATCH 07/15] Pass through some env vars --- tox.ini | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tox.ini b/tox.ini index 17d246e4b..5000294be 100644 --- a/tox.ini +++ b/tox.ini @@ -46,6 +46,8 @@ setenv = pypy3: INTERP=pypy3 py: INTERP=py passenv = + ASYNCIO_CATCHUP_DELAY + AIOSMTPD_TEST_* PYTHON* TRAVIS CI From 1ff0aed08e8080b4e2e8fb41dca7b5efa4ba99b7 Mon Sep 17 00:00:00 2001 From: Pandu POLUAN Date: Thu, 11 Mar 2021 22:09:32 +0700 Subject: [PATCH 08/15] Some smtp.py optimizations --- aiosmtpd/smtp.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/aiosmtpd/smtp.py b/aiosmtpd/smtp.py index 290c97e42..000a49be7 100644 --- a/aiosmtpd/smtp.py +++ b/aiosmtpd/smtp.py @@ -56,6 +56,8 @@ def process( # region #### Custom Data Types ####################################################### class _Missing: + __slots__ = () + def __repr__(self): return "MISSING" @@ -120,7 +122,7 @@ class _DataState(enum.Enum): # endregion -@attr.s +@attr.s(slots=True) class AuthResult: """ Contains the result of authentication, to be returned to the smtp_AUTH method. @@ -762,17 +764,18 @@ async def _handle_client(self): # Received a valid command, reset the timer. self._reset_timeout() await method(arg) + await asyncio.sleep(0) except asyncio.CancelledError: # The connection got reset during the DATA command. # XXX If handler method raises ConnectionResetError, we should # verify that it was actually self._reader that was reset. log_peer.info("Connection lost during _handle_client()") self._writer.close() - raise + break except ConnectionResetError: log_peer.info("Connection lost during _handle_client()") self._writer.close() - raise + break except Exception as error: status = None try: From 69d30dda2906addafc0761e612c2c544a4bdc31e Mon Sep 17 00:00:00 2001 From: Pandu POLUAN Date: Thu, 11 Mar 2021 22:52:50 +0700 Subject: [PATCH 09/15] Optimize and tidy up controller.py --- aiosmtpd/controller.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/aiosmtpd/controller.py b/aiosmtpd/controller.py index 8daa810a6..a34e203d4 100644 --- a/aiosmtpd/controller.py +++ b/aiosmtpd/controller.py @@ -13,9 +13,14 @@ from collections import deque from contextlib import ExitStack from pathlib import Path -from socket import AF_INET6, SOCK_STREAM, create_connection, has_ipv6 -from socket import socket as makesock -from socket import timeout as socket_timeout +from socket import ( + AF_INET6, + SOCK_STREAM, + create_connection, + has_ipv6, + socket as makesock, + timeout as socket_timeout, +) try: from socket import AF_UNIX @@ -37,6 +42,7 @@ from typing import Literal # pragma: py-lt-38 else: # pragma: py-ge-38 from typing_extensions import Literal + from warnings import warn from public import public @@ -112,6 +118,7 @@ class _FakeServer(asyncio.StreamReaderProtocol): Returned by _factory_invoker() in lieu of an SMTP instance in case factory() failed to instantiate an SMTP instance. """ + __slots__ = () # 'Finalize' this class def __init__(self, loop): # Imitate what SMTP does @@ -121,7 +128,9 @@ def __init__(self, loop): loop=loop, ) - def _client_connected_cb(self, reader, writer): + def _client_connected_cb( + self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter + ) -> None: pass From 972b634abfdf509013473e1c65daa0cabbe2d2bf Mon Sep 17 00:00:00 2001 From: Pandu POLUAN Date: Thu, 11 Mar 2021 22:54:04 +0700 Subject: [PATCH 10/15] Tidy up mocked transport Originally this resulted in an ugly repr-ish "peer" --- aiosmtpd/tests/test_proxyprotocol.py | 1 + 1 file changed, 1 insertion(+) diff --git a/aiosmtpd/tests/test_proxyprotocol.py b/aiosmtpd/tests/test_proxyprotocol.py index bf7f939c3..a23577560 100644 --- a/aiosmtpd/tests/test_proxyprotocol.py +++ b/aiosmtpd/tests/test_proxyprotocol.py @@ -117,6 +117,7 @@ def setup_proxy_protocol(mocker: MockFixture, temp_event_loop): proxy_timeout = 1.0 responses = [] transport = mocker.Mock() + transport.get_extra_info.return_value = "" transport.write = responses.append handler = ProxyPeekerHandler() loop = temp_event_loop From 9ac5ca151002c11186d4a2debcfa7e9e6fcb8e21 Mon Sep 17 00:00:00 2001 From: Pandu POLUAN Date: Thu, 11 Mar 2021 22:54:30 +0700 Subject: [PATCH 11/15] Some blackification and removal of dead code --- aiosmtpd/tests/test_server.py | 8 ++------ aiosmtpd/tests/test_smtp.py | 5 ----- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/aiosmtpd/tests/test_server.py b/aiosmtpd/tests/test_server.py index df15208c8..47b726104 100644 --- a/aiosmtpd/tests/test_server.py +++ b/aiosmtpd/tests/test_server.py @@ -452,9 +452,7 @@ def test_unixsocket(self, safe_socket_dir, autostop_loop, runner): with pytest.raises((socket.timeout, ConnectionError)): assert_smtp_socket(cont) - @pytest.mark.filterwarnings( - "ignore::pytest.PytestUnraisableExceptionWarning" - ) + @pytest.mark.filterwarnings("ignore::pytest.PytestUnraisableExceptionWarning") def test_inet_loopstop(self, autostop_loop, runner): """ Verify behavior when the loop is stopped before controller is stopped @@ -490,9 +488,7 @@ def test_inet_loopstop(self, autostop_loop, runner): with pytest.raises((socket.timeout, ConnectionError)): SMTPClient(cont.hostname, cont.port, timeout=0.1) - @pytest.mark.filterwarnings( - "ignore::pytest.PytestUnraisableExceptionWarning" - ) + @pytest.mark.filterwarnings("ignore::pytest.PytestUnraisableExceptionWarning") def test_inet_contstop(self, temp_event_loop, runner): """ Verify behavior when the controller is stopped before loop is stopped diff --git a/aiosmtpd/tests/test_smtp.py b/aiosmtpd/tests/test_smtp.py index 6fd8bfb8c..8b9ebce06 100644 --- a/aiosmtpd/tests/test_smtp.py +++ b/aiosmtpd/tests/test_smtp.py @@ -52,11 +52,6 @@ MAIL_LOG = logging.getLogger("mail.log") MAIL_LOG.setLevel(logging.DEBUG) -# fh = logging.FileHandler("~smtp.log") -# fh.setFormatter(logging.Formatter("{asctime} - {levelname} - {message}", style="{")) -# fh.setLevel(logging.DEBUG) -# MAIL_LOG.addHandler(fh) - # region #### Test Helpers ############################################################ From 49f20a268d47d079e662b42992c69a7e79f9431e Mon Sep 17 00:00:00 2001 From: Pandu POLUAN Date: Thu, 11 Mar 2021 22:55:13 +0700 Subject: [PATCH 12/15] Implement close-before-release Mostly to reduce the ResourceWarning's --- aiosmtpd/controller.py | 15 ++++++++++++--- aiosmtpd/smtp.py | 2 ++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/aiosmtpd/controller.py b/aiosmtpd/controller.py index a34e203d4..770125332 100644 --- a/aiosmtpd/controller.py +++ b/aiosmtpd/controller.py @@ -266,8 +266,12 @@ def _cleanup(self): """Reset internal variables to prevent contamination""" self._thread_exception = None self._factory_invoked.clear() + if self.server: + self.server.close() + self.server = None + if self.server_coro: + self.server_coro.close() self.server_coro = None - self.server = None self.smtpd = None def cancel_tasks(self, stop_loop: bool = True): @@ -356,8 +360,11 @@ def _run(self, ready_event: threading.Event) -> None: # We reach this point when loop is ended (by external code) # Perform some stoppages to ensure endpoint no longer bound. self.server.close() - self.loop.run_until_complete(self.server.wait_closed()) - self.loop.close() + self.server_coro.close() + if not self.loop.is_closed(): # pragma: nobranch + self.loop.run_until_complete(self.server.wait_closed()) + time.sleep(0.1) + self.loop.close() self.server = None def start(self, thread_name: Optional[str] = None): @@ -431,6 +438,8 @@ def stop(self, no_assert: bool = False): assert no_assert or self._thread is not None, "SMTP daemon not running" log.info("Stopping") self.loop.call_soon_threadsafe(self.cancel_tasks) + if not self.loop.is_running(): + self.loop.close() if self._thread is not None: log.debug("Waiting to join thread...") self._thread.join() diff --git a/aiosmtpd/smtp.py b/aiosmtpd/smtp.py index 000a49be7..a6dc9f767 100644 --- a/aiosmtpd/smtp.py +++ b/aiosmtpd/smtp.py @@ -564,6 +564,8 @@ def connection_lost(self, error): self._original_transport.close() super().connection_lost(error) self._handler_coroutine.cancel() + if self.transport: # pragma: nobranch + self.transport.close() self.transport = None def eof_received(self): From 04a352dcd50b5e38426a3cc931aa6ea1faa0281b Mon Sep 17 00:00:00 2001 From: Pandu POLUAN Date: Thu, 11 Mar 2021 22:56:31 +0700 Subject: [PATCH 13/15] Include python executable in the env dump --- housekeep.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/housekeep.py b/housekeep.py index 88dddd529..f529f6a18 100644 --- a/housekeep.py +++ b/housekeep.py @@ -92,11 +92,13 @@ def deldir(targ: Path, verbose: bool = True): def dump_env(): + env = dict(os.environ) + env["PYTHON_EXE"] = str(sys.executable) dumpdir = Path(DUMP_DIR) dumpdir.mkdir(exist_ok=True) with (dumpdir / f"ENV.{TOX_ENV_NAME}.py").open("wt") as fout: print("ENV = \\", file=fout) - pprint.pprint(dict(os.environ), stream=fout) + pprint.pprint(env, stream=fout) def move_prof(verbose: bool = False): From db794e08efbdf04c5bc6b4278089772db4051cda Mon Sep 17 00:00:00 2001 From: Pandu POLUAN Date: Thu, 11 Mar 2021 22:56:43 +0700 Subject: [PATCH 14/15] Bump version to 1.5.0a2 --- aiosmtpd/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiosmtpd/__init__.py b/aiosmtpd/__init__.py index 9c7b938f3..e96d0ee81 100644 --- a/aiosmtpd/__init__.py +++ b/aiosmtpd/__init__.py @@ -1,4 +1,4 @@ # Copyright 2014-2021 The aiosmtpd Developers # SPDX-License-Identifier: Apache-2.0 -__version__ = "1.5.0a1" +__version__ = "1.5.0a2" From 29e1b7c299672cc33bd6db5ce2165273d93310b2 Mon Sep 17 00:00:00 2001 From: Pandu POLUAN Date: Thu, 11 Mar 2021 22:58:29 +0700 Subject: [PATCH 15/15] Update NEWS.rst --- aiosmtpd/docs/NEWS.rst | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/aiosmtpd/docs/NEWS.rst b/aiosmtpd/docs/NEWS.rst index ce627a7b3..185b03a4f 100644 --- a/aiosmtpd/docs/NEWS.rst +++ b/aiosmtpd/docs/NEWS.rst @@ -3,16 +3,18 @@ ################### -1.5.0 (aiosmtpd-next-next) -========================== +1.5.0 (aiosmtpd-next) +===================== Added ----- * Unthreaded Controllers (Closes #160) +* A whole lotta instrumentation (logging) Fixed/Improved -------------- * All Controllers now have more rationale design, as they are now composited from a Base + a Mixin +* SMTP logging uses LoggingAdapter feature to reduce complexity 1.4.2 (2021-03-08)