Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix truncated output #14

Merged
merged 7 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion NEWS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ every change, see the Git log.

Latest
------
* tbd
* Patch: Fix issue where the output of a process was being truncated at 4096
bytes.

4.0.0
-----
Expand Down
4 changes: 2 additions & 2 deletions examples/quick_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ def _proc1_stdout(data):
proc1.match(stdout="10 packets transmitted*", stderr=None)

# Since proc0 is a daemon we automatically kill it when the last
# non-daemon process is done. However we can still see the output it
# generated.
# non-daemon process is done. However we can still see the output
# it generated.
print(f"proc0: {proc0.stdout}")

finally:
Expand Down
26 changes: 18 additions & 8 deletions src/dummynet/process_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ def update_sudo_password():
cached_sudo_password = os.environ.get("DUMMYNET_SUDO_PASSWD", None)
if cached_sudo_password:
# Environment variable was set, use it instead of asking for a password
if not cached_sudo_password.endswith("\n"):
cached_sudo_password += "\n"
return

prompt = f"\n[sudo] password for {getpass.getuser()}: "
Expand All @@ -78,7 +80,7 @@ class ProcessMonitor:
class Poller:
def __init__(self, log):
self.poller = select.poll()
self.fds = {}
self.callbacks = {}
self.log = log

def add_fd(self, fd, callback):
Expand All @@ -87,18 +89,24 @@ def add_fd(self, fd, callback):
# register for them.
self.poller.register(fd, select.POLLIN)

self.fds[fd] = callback
self.callbacks[fd] = callback

self.log.debug(f"Poller: register process fd {fd}")

def del_fd(self, fd):
self.poller.unregister(fd)
del self.fds[fd]
del self.callbacks[fd]

self.log.debug(f"Poller: unregister process fd {fd}")

def read_fd(self, fd):
data = os.read(fd, 4096)
data = b""
while True:
chunk = os.read(fd, 4096) # Read in chunks of 4096 bytes
data += chunk
if len(chunk) < 4096:
# If we read less than 4096 bytes, we are done
break

if not data:
return
Expand All @@ -107,7 +115,7 @@ def read_fd(self, fd):
self.log.debug(f"Poller: data: '{data}'")

# Call the callback
self.fds[fd](data.decode(encoding="utf-8", errors="replace"))
self.callbacks[fd](data.decode(encoding="utf-8", errors="replace"))

def poll(self, timeout):
fds = self.poller.poll(timeout)
Expand All @@ -129,8 +137,7 @@ def poll(self, timeout):
self.del_fd(fd=fd)

def wait_fd(self, fd):

while fd in self.fds:
while fd in self.callbacks:
self.poll(timeout=0.1)

class Process:
Expand Down Expand Up @@ -161,7 +168,10 @@ def __init__(
)

# Pipe possible sudo password to the process
if sudo and (cached_sudo_password != None):
if sudo and (cached_sudo_password is not None):
assert cached_sudo_password.endswith(
"\n"
) # Ensure the password ends with a newline as otherwise sudo will hang
self.popen.stdin.write(cached_sudo_password)
self.popen.stdin.flush()

Expand Down
181 changes: 116 additions & 65 deletions test/test_dummynet.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pytest
import os


log = logging.getLogger("dummynet")
log.setLevel(logging.DEBUG)

Expand Down Expand Up @@ -263,49 +264,137 @@ def test_hostshell_timeout():
pass


@pytest.fixture
def sad_path():
def _hostshell_timeout_daemon():
# Seperated this in to a function to look like a typical integration
# test

# Check if we need to run as sudo
sudo = os.getuid() != 0

# Create a process monitor
process_monitor = ProcessMonitor(log=log)

# The host shell
shell = HostShell(log=log, sudo=sudo, process_monitor=process_monitor)
net = DummyNet(shell=shell)

sad_cgroup = net.add_cgroup(
name="test_cgroup_sad",
shell=shell,
log=log,
default_path="/sys/fs/cgroup",
controllers={"cpu.wrongname": 0, "memory.high": -200000000},
pid=12345,
)
return sad_cgroup
# Start a deamon process (those should not exit before the test is over)
shell.run_async(cmd="sleep 2", daemon=True)

# Next we run a blocking command that will timeout
# we expect to also be notified that the daemon process exited
# prematurely

@pytest.fixture
def happy_path():
shell.run(cmd="sleep 10", timeout=5)

# Check if we need to run as sudo
sudo = os.getuid() != 0
# Nothing to do
while process_monitor.keep_running():
pass


def test_hostshell_timeout_daemon():

# Check that we get a timeout if we run a command that takes too long

with pytest.raises(ExceptionGroup) as e:
_hostshell_timeout_daemon()

assert e.group_contains(dummynet.TimeoutError)
assert e.group_contains(dummynet.DaemonExitError)


def test_run_stdout():

# Create a process monitor
process_monitor = ProcessMonitor(log=log)
shell = HostShell(log=log, sudo=sudo, process_monitor=process_monitor)
net = DummyNet(shell=shell)

happy_cgroup = net.add_cgroup(
name="test_cgroup_happy",
shell=shell,
log=log,
default_path="/sys/fs/cgroup",
controllers={"cpu.max": 0.5, "memory.high": 200000000},
pid=os.getpid(),
)
return happy_cgroup
# The host shell used if we don't have a recording
shell = HostShell(log=log, sudo=False, process_monitor=process_monitor)
message = "Hello World"
info = shell.run(cmd=f"echo '{message}'")

assert len(info.stdout) == len(message) + 1
assert info.stdout == f"{message}\n"

long_message = "A" * 4096

info = shell.run(cmd=f"echo '{long_message}'")

assert len(info.stdout) == 4096 + 1
assert info.stdout == f"{long_message}\n"

very_long_message = "A" * 4096 * 10

info = shell.run(cmd=f"echo '{very_long_message}'")

assert len(info.stdout) == 4096 * 10 + 1
assert info.stdout == f"{very_long_message}\n"

# check timeout of function with a long message
with pytest.raises(dummynet.TimeoutError):
shell.run(cmd=f"sleep 10; echo '{very_long_message}'", timeout=1)


def test_run_async_output():

process_monitor = ProcessMonitor(log=log)

shell = HostShell(log=log, sudo=False, process_monitor=process_monitor)

out1 = shell.run_async(cmd="ping -i 0.1 -c 5 127.0.0.1")
out2 = shell.run_async(cmd="ping -i 0.1 -c 3 127.0.0.1")

out1.stdout_callback = lambda data: print("stdout1: {}".format(data))
out2.stdout_callback = lambda data: print("stdout2: {}".format(data))

while process_monitor.keep_running():
pass

out1.match(stdout="5 packets transmitted*", stderr=None)
out2.match(stdout="3 packets transmitted*", stderr=None)


# @todo re-enable this test
# @pytest.fixture
# def sad_path():

# # Check if we need to run as sudo
# sudo = os.getuid() != 0

# process_monitor = ProcessMonitor(log=log)
# shell = HostShell(log=log, sudo=sudo, process_monitor=process_monitor)
# net = DummyNet(shell=shell)

# sad_cgroup = net.add_cgroup(
# name="test_cgroup_sad",
# shell=shell,
# log=log,
# default_path="/sys/fs/cgroup",
# controllers={"cpu.wrongname": 0, "memory.high": -200000000},
# pid=12345,
# )
# return sad_cgroup


# @pytest.fixture
# def happy_path():

# # Check if we need to run as sudo
# sudo = os.getuid() != 0

# process_monitor = ProcessMonitor(log=log)
# shell = HostShell(log=log, sudo=sudo, process_monitor=process_monitor)
# net = DummyNet(shell=shell)

# happy_cgroup = net.add_cgroup(
# name="test_cgroup_happy",
# shell=shell,
# log=log,
# default_path="/sys/fs/cgroup",
# controllers={"cpu.max": 0.5, "memory.high": 200000000},
# pid=os.getpid(),
# )
# return happy_cgroup

# def test_cgroup_build(happy_path):
# cgroup_build = happy_path

Expand Down Expand Up @@ -381,41 +470,3 @@ def happy_path():
# cgroup_cleanup.hard_clean()
# except dummynet.errors.RunInfoError as e:
# assert "No such file or directory" in str(e)


def _hostshell_timeout_daemon():
# Seperated this in to a function to look like a typical integration
# test

# Check if we need to run as sudo
sudo = os.getuid() != 0

# Create a process monitor
process_monitor = ProcessMonitor(log=log)

# The host shell
shell = HostShell(log=log, sudo=sudo, process_monitor=process_monitor)

# Start a deamon process (those should not exit before the test is over)
shell.run_async(cmd="sleep 2", daemon=True)

# Next we run a blocking command that will timeout
# we expect to also be notified that the daemon process exited
# prematurely

shell.run(cmd="sleep 10", timeout=5)

# Nothing to do
while process_monitor.keep_running():
pass


def test_hostshell_timeout_daemon():

# Check that we get a timeout if we run a command that takes too long

with pytest.raises(ExceptionGroup) as e:
_hostshell_timeout_daemon()

assert e.group_contains(dummynet.TimeoutError)
assert e.group_contains(dummynet.DaemonExitError)
28 changes: 16 additions & 12 deletions wscript
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ def options(opt):
"--run_tests", default=False, action="store_true", help="Run all unit tests"
)

gr.add_option(
"--filter",
default=None,
help='Select tests based on their name. E.g. "test_run_stdout"',
)

gr.add_option(
"--pytest_basetemp",
default="pytest_temp",
Expand All @@ -44,6 +50,7 @@ def build(bld):
# Create a virtualenv in the source folder and build universal wheel
with bld.create_virtualenv() as venv:

venv.run(cmd="python -m pip install setuptools")
venv.run(cmd="python -m pip install wheel")
venv.run(cmd="python setup.py bdist_wheel --universal", cwd=bld.path)

Expand Down Expand Up @@ -122,7 +129,7 @@ def _pytest(bld):
_pytest_dev(bld=bld)

else:
_pytest_run(bld=bld)
_pytest_run(ctx=bld)


def _pytest_dev(bld):
Expand All @@ -131,13 +138,13 @@ def _pytest_dev(bld):
venv.run("python -m pip install -e .")


def _pytest_run(bld):
def _pytest_run(ctx):

venv = bld.create_virtualenv(overwrite=True)
venv = ctx.create_virtualenv(overwrite=True)
venv.run("python -m pip install -r test/requirements.txt")

# Install the dummynet plugin in the virtualenv
wheel = _find_wheel(ctx=bld)
wheel = _find_wheel(ctx=ctx)

venv.run(f"python -m pip install {wheel}")

Expand All @@ -147,23 +154,20 @@ def _pytest_run(bld):
# We override the pytest temp folder with the basetemp option,
# so the test folders will be available at the specified location
# on all platforms. The default location is the "pytest" local folder.
basetemp = os.path.abspath(os.path.expanduser(bld.options.pytest_basetemp))
basetemp = os.path.abspath(os.path.expanduser(ctx.options.pytest_basetemp))

# We need to manually remove the previously created basetemp folder,
# because pytest uses os.listdir in the removal process, and that fails
# if there are any broken symlinks in that folder.
if os.path.exists(basetemp):
waflib.extras.wurf.directory.remove_directory(path=basetemp)

# Run all tests by just passing the test directory. Specific tests can
# be enabled by specifying the full path e.g.:
#
# 'test/test_run.py::test_create_context'
#
test_filter = "test"
test_filter = ""
if ctx.options.filter:
test_filter = f"-k '{ctx.options.filter}'"

# Main test command
venv.run(f"python -B -m pytest {test_filter} --basetemp {basetemp}")
venv.run(f"python -B -m pytest -xrA {test_filter} --basetemp {basetemp}")

# Check the package
venv.run(f"twine check {wheel}")
Loading