Skip to content

Commit

Permalink
count fds
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford committed Aug 16, 2024
1 parent 23d9c4f commit e2b0448
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 11 deletions.
10 changes: 8 additions & 2 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def __init__(self, config: Config) -> None:
self.run_dir = make_rundir(config.run_dir)

if config.initialize_logging:
parsl.set_file_logger("{}/parsl.log".format(self.run_dir), level=logging.DEBUG)
_, self.logging_handler = parsl.set_file_logger("{}/parsl.log".format(self.run_dir), level=logging.DEBUG)

logger.info("Starting DataFlowKernel with config\n{}".format(config))

Expand Down Expand Up @@ -1321,7 +1321,13 @@ def cleanup(self) -> None:
else:
logger.debug("Cleaning up non-default DFK - not unregistering")

logger.info("DFK cleanup complete")
# TODO: do this in parsl/logutils.py
logger.info("DFK cleanup complete - removing parsl.log handler")
logger_to_remove = logging.getLogger("parsl")
logger_to_remove.removeHandler(self.logging_handler)
self.logging_handler.close()

logger.info("handler closed - is this going to break things?")

def checkpoint(self, tasks: Optional[Sequence[TaskRecord]] = None) -> str:
"""Checkpoint the dfk incrementally to a checkpoint file.
Expand Down
6 changes: 3 additions & 3 deletions parsl/log_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"""
import io
import logging
from typing import Optional
from typing import Optional, Tuple

import typeguard

Expand Down Expand Up @@ -65,7 +65,7 @@ def set_stream_logger(name: str = 'parsl',
def set_file_logger(filename: str,
name: str = 'parsl',
level: int = logging.DEBUG,
format_string: Optional[str] = None) -> logging.Logger:
format_string: Optional[str] = None) -> Tuple[logging.Logger, logging.FileHandler]:
"""Add a file log handler.
Args:
Expand Down Expand Up @@ -93,4 +93,4 @@ def set_file_logger(filename: str,
futures_logger = logging.getLogger("concurrent.futures")
futures_logger.addHandler(handler)

return logger
return (logger, handler)
6 changes: 3 additions & 3 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,9 @@ def close(self) -> None:

@wrap_with_logs
def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage]", run_dir: str) -> None:
logger = set_file_logger("{}/monitoring_filesystem_radio.log".format(logdir),
name="monitoring_filesystem_radio",
level=logging.INFO)
logger, _ = set_file_logger("{}/monitoring_filesystem_radio.log".format(logdir),
name="monitoring_filesystem_radio",
level=logging.INFO)

logger.info("Starting filesystem radio receiver")
setproctitle("parsl: monitoring filesystem receiver")
Expand Down
6 changes: 3 additions & 3 deletions parsl/monitoring/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ def __init__(self,
An event that the main Parsl process will set to signal that the monitoring router should shut down.
"""
os.makedirs(logdir, exist_ok=True)
self.logger = set_file_logger("{}/monitoring_router.log".format(logdir),
name="monitoring_router",
level=logging_level)
self.logger, _ = set_file_logger("{}/monitoring_router.log".format(logdir),
name="monitoring_router",
level=logging_level)
self.logger.debug("Monitoring router starting")

self.hub_address = hub_address
Expand Down
9 changes: 9 additions & 0 deletions parsl/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ def load_dfk_session(request, pytestconfig, tmpd_cwd_session):
logger.error(f"BENC: end open fds: {end_fds}")

assert threading.active_count() == 1, "test left threads running: " + repr(threading.enumerate())
end_fds = this_process.num_fds()
logger.error(f"BENC: end open fds: {end_fds} (vs {start_fds} at start)")
assert start_fds == end_fds, "number of open fds changed across test run"

else:
yield
Expand Down Expand Up @@ -281,6 +284,12 @@ def load_dfk_local_module(request, pytestconfig, tmpd_cwd_session):
logger.error(f"BENC: end threads: {threading.active_count()}")

assert threading.active_count() == 1, "test left threads running: " + repr(threading.enumerate())
end_fds = this_process.num_fds()
logger.error(f"BENC: open fds END: {end_fds}")
if end_fds > start_fds:
logger.error(f"Open files (not all fds, though?): {this_process.open_files()!r}")
os.system(f"ls -l /proc/{os.getpid()}/fd")
pytest.fail("BENC: number of open fds increased across test")

else:
yield
Expand Down

0 comments on commit e2b0448

Please sign in to comment.