Skip to content

Commit

Permalink
count fds
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford committed Jul 1, 2024
1 parent 85dfae8 commit af8da67
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 11 deletions.
10 changes: 8 additions & 2 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def __init__(self, config: Config) -> None:
self.run_dir = make_rundir(config.run_dir)

if config.initialize_logging:
parsl.set_file_logger("{}/parsl.log".format(self.run_dir), level=logging.DEBUG)
_, self.logging_handler = parsl.set_file_logger("{}/parsl.log".format(self.run_dir), level=logging.DEBUG)

logger.info("Starting DataFlowKernel with config\n{}".format(config))

Expand Down Expand Up @@ -1340,7 +1340,13 @@ def cleanup(self) -> None:
else:
logger.debug("Cleaning up non-default DFK - not unregistering")

logger.info("DFK cleanup complete")
# TODO: do this in parsl/log_utils.py
logger.info("DFK cleanup complete - removing parsl.log handler")
logger_to_remove = logging.getLogger("parsl")
logger_to_remove.removeHandler(self.logging_handler)
self.logging_handler.close()

logger.info("handler closed - is this going to break things?")

def checkpoint(self, tasks: Optional[Sequence[TaskRecord]] = None) -> str:
"""Checkpoint the dfk incrementally to a checkpoint file.
Expand Down
2 changes: 2 additions & 0 deletions parsl/executors/taskvine/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,11 +607,13 @@ def shutdown(self, *args, **kwargs):
# Join all processes before exiting
logger.debug("Joining on submit process")
self._submit_process.join()
self._submit_process.close()
logger.debug("Joining on collector thread")
self._collector_thread.join()
if self.worker_launch_method == 'factory':
logger.debug("Joining on factory process")
self._factory_process.join()
self._factory_process.close()

# Shutdown multiprocessing queues
self._ready_task_queue.close()
Expand Down
2 changes: 2 additions & 0 deletions parsl/executors/workqueue/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,8 @@ def shutdown(self, *args, **kwargs):

logger.debug("Joining on submit process")
self.submit_process.join()
self.submit_process.close()

logger.debug("Joining on collector thread")
self.collector_thread.join()

Expand Down
6 changes: 3 additions & 3 deletions parsl/log_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"""
import io
import logging
from typing import Optional
from typing import Optional, Tuple

import typeguard

Expand Down Expand Up @@ -65,7 +65,7 @@ def set_stream_logger(name: str = 'parsl',
def set_file_logger(filename: str,
name: str = 'parsl',
level: int = logging.DEBUG,
format_string: Optional[str] = None) -> logging.Logger:
format_string: Optional[str] = None) -> Tuple[logging.Logger, logging.FileHandler]:
"""Add a file log handler.
Args:
Expand Down Expand Up @@ -93,4 +93,4 @@ def set_file_logger(filename: str,
futures_logger = logging.getLogger("concurrent.futures")
futures_logger.addHandler(handler)

return logger
return (logger, handler)
6 changes: 3 additions & 3 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,9 @@ def close(self) -> None:

@wrap_with_logs
def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage]", run_dir: str) -> None:
logger = set_file_logger("{}/monitoring_filesystem_radio.log".format(logdir),
name="monitoring_filesystem_radio",
level=logging.INFO)
logger, _ = set_file_logger("{}/monitoring_filesystem_radio.log".format(logdir),
name="monitoring_filesystem_radio",
level=logging.INFO)

logger.info("Starting filesystem radio receiver")
setproctitle("parsl: monitoring filesystem receiver")
Expand Down
6 changes: 3 additions & 3 deletions parsl/monitoring/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ def __init__(self,
"""
os.makedirs(logdir, exist_ok=True)
self.logger = set_file_logger("{}/monitoring_router.log".format(logdir),
name="monitoring_router",
level=logging_level)
self.logger, _ = set_file_logger("{}/monitoring_router.log".format(logdir),
name="monitoring_router",
level=logging_level)
self.logger.debug("Monitoring router starting")

self.hub_address = hub_address
Expand Down
12 changes: 12 additions & 0 deletions parsl/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ def load_dfk_session(request, pytestconfig, tmpd_cwd_session):
config = pytestconfig.getoption('config')[0]

if config != 'local':
this_process = psutil.Process()
start_fds = this_process.num_fds()
logger.error(f"BENC: open fds: {start_fds}")
assert threading.active_count() == 1, "precondition: only one thread can be running before this test: " + repr(threading.enumerate())

spec = importlib.util.spec_from_file_location('', config)
Expand Down Expand Up @@ -211,6 +214,9 @@ def load_dfk_session(request, pytestconfig, tmpd_cwd_session):
assert DataFlowKernelLoader._dfk is None

assert threading.active_count() == 1, "test left threads running: " + repr(threading.enumerate())
end_fds = this_process.num_fds()
logger.error(f"BENC: end open fds: {end_fds} (vs {start_fds} at start)")
assert start_fds == end_fds, "number of open fds changed across test run"

else:
yield
Expand Down Expand Up @@ -273,6 +279,12 @@ def load_dfk_local_module(request, pytestconfig, tmpd_cwd_session):
logger.error(f"BENC: end open fds: {end_fds} (vs start {start_fds}")

assert threading.active_count() == 1, "test left threads running: " + repr(threading.enumerate())
end_fds = this_process.num_fds()
logger.error(f"BENC: open fds END: {end_fds}")
if end_fds > start_fds:
logger.error(f"Open files (not all fds, though?): {this_process.open_files()!r}")
os.system(f"ls -l /proc/{os.getpid()}/fd")
pytest.fail("BENC: number of open fds increased across test")

else:
yield
Expand Down
1 change: 1 addition & 0 deletions parsl/tests/test_providers/test_local_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def test_ssh_channel():

# Stop the given sshd thread and then join it, so this call does not return
# until join() has returned.
# NOTE(review): presumably the join is here so the thread is gone before the
# conftest "test left threads running" assertion runs - confirm.
def _stop_sshd(sshd_thread):
sshd_thread.stop()
sshd_thread.join()


class SSHDThread(threading.Thread):
Expand Down

0 comments on commit af8da67

Please sign in to comment.