Skip to content

Commit

Permalink
close channels!
Browse files Browse the repository at this point in the history
its architecturally a bit ambiguous who should be responsible for this... channels live inside providers inside executors and its arguable that providers shoudl do that, when closed by executors (which they are not).

however, the DFK contains a bunch of channel initialization, and so it is legitimate for it to also contain a bunch of channel cleanup

TESTING: nothing actually tests this - perhaps a mock channel that we check DFK setup and shutdown works on?

I have seent this test in CI on cleanup of:

parsl/tests/test_htex/test_connected_blocks.py

E               AssertionError: The provider model assumes a provider has channel(s)
1986

but that test works for me...
?
  • Loading branch information
benclifford committed Apr 27, 2024
1 parent c3aea48 commit ef0f2bf
Showing 1 changed file with 32 additions and 0 deletions.
32 changes: 32 additions & 0 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1256,7 +1256,39 @@ def cleanup(self) -> None:
executor.shutdown()
logger.info(f"Shut down executor {executor.label}")

if hasattr(executor, 'provider'):
logger.info(f"Closing channel(s) for {executor.label}")

# The reasoning about which of .channel or .channels should
# be present is not well described (or describable) in the
# type system, so there are a lot of asserts here that
# attempt to describe the non-checked assumptions.

# This logic is based on the logic in add_executors.

# These two asserts could be an XOR but the 'and' and 'or'
# components of the XOR are separated here to give different
# error text.
assert hasattr(executor.provider, 'channel') or hasattr(executor.provider, 'channels'), \
"The provider model assumes a provider has channel(s)"
assert not (hasattr(executor.provider, 'channel') and hasattr(executor.provider, 'channels')), \
"The provider model assumes a provider does not have .channel and .channels"

if hasattr(executor.provider, 'channels'):
for channel in executor.provider.channels:
channel.close()
else:
assert hasattr(executor.provider, 'channel'), "If provider has no .channels, it must have .channel"
executor.provider.channel.close()

logger.info(f"Closed executor channel(s) for {executor.label}")

logger.info("Terminated executors")

logger.info("Closing channels")

logger.info("Closed channels")

self.time_completed = datetime.datetime.now()

if self.monitoring:
Expand Down

0 comments on commit ef0f2bf

Please sign in to comment.