diff --git a/src/dummynet/process_monitor.py b/src/dummynet/process_monitor.py index 5e12a8a..7fb6b71 100644 --- a/src/dummynet/process_monitor.py +++ b/src/dummynet/process_monitor.py @@ -53,7 +53,8 @@ def update_sudo_password(): cached_sudo_password = os.environ.get("DUMMYNET_SUDO_PASSWD", None) if cached_sudo_password: # Environment variable was set, use it instead of asking for a password - cached_sudo_password += "\n" + if not cached_sudo_password.endswith("\n"): + cached_sudo_password += "\n" return prompt = f"\n[sudo] password for {getpass.getuser()}: " @@ -79,53 +80,32 @@ class ProcessMonitor: class Poller: def __init__(self, log): self.poller = select.poll() - self.fds = {} - self.streams = {} + self.callbacks = {} self.log = log - def add_fd(self, stream, callback): + def add_fd(self, fd, callback): # Note that flags POLLHUP and POLLERR can be returned at any time # (even if were not asked for). So we don't need to explicitly # register for them. - self.poller.register(stream.fileno(), select.POLLIN) + self.poller.register(fd, select.POLLIN) - self.fds[stream.fileno()] = callback - self.streams[stream.fileno()] = stream + self.callbacks[fd] = callback - self.log.debug(f"Poller: register process fd {stream.fileno()}") + self.log.debug(f"Poller: register process fd {fd}") def del_fd(self, fd): self.poller.unregister(fd) - del self.fds[fd] - del self.streams[fd] + del self.callbacks[fd] self.log.debug(f"Poller: unregister process fd {fd}") - # def read_fd(self, fd): - - # self.log.error(f"Poller: reading from fd {fd}") - # print(dir(self.streams[fd])) - # data = os.read(fd, 4096) # Read in chunks of 4096 bytes - - # self.log.error(f"Poller: read {len(data)}") - - # if not data: - # return - - # self.log.debug(f"Poller: read {len(data)} bytes from fd {fd}") - # self.log.debug(f"Poller: data: '{data}'") - - # # Call the callback - # self.fds[fd](data.decode(encoding="utf-8", errors="replace")) - def read_fd(self, fd): data = b"" while True: - self.log.error(f"Poller: reading from fd {fd}") chunk = os.read(fd, 4096) # Read in chunks of 4096 bytes - self.log.error(f"Poller: read {len(chunk)}") data += chunk - if not chunk or len(chunk) != 4096: + if len(chunk) < 4096: + # If we read less than 4096 bytes, we are done break if not data: @@ -135,7 +115,7 @@ def read_fd(self, fd): self.log.debug(f"Poller: data: '{data}'") # Call the callback - self.fds[fd](data.decode(encoding="utf-8", errors="replace")) + self.callbacks[fd](data.decode(encoding="utf-8", errors="replace")) def poll(self, timeout): fds = self.poller.poll(timeout) @@ -157,8 +137,7 @@ def poll(self, timeout): self.del_fd(fd=fd) def wait_fd(self, fd): - - while fd in self.fds: + while fd in self.callbacks: self.poll(timeout=0.1) class Process: @@ -190,6 +169,9 @@ def __init__( # Pipe possible sudo password to the process if sudo and (cached_sudo_password is not None): + assert cached_sudo_password.endswith( + "\n" + ) # Ensure the password ends with a newline as otherwise sudo will hang self.popen.stdin.write(cached_sudo_password) self.popen.stdin.flush() @@ -227,12 +209,12 @@ def stderr_callback(data): # Get the file descriptor poller.add_fd( - self.popen.stdout, + self.popen.stdout.fileno(), stdout_callback, ) poller.add_fd( - self.popen.stderr, + self.popen.stderr.fileno(), stderr_callback, )