From 9714e98b46960b9d57df8559bde06775dc24aeba Mon Sep 17 00:00:00 2001 From: ShreyaLnuHpe Date: Thu, 22 Aug 2024 09:29:16 -0700 Subject: [PATCH 1/8] testing priority scheduler --- .../tests/cluster/test_master_restart.py | 53 ++++++++ .../tests/cluster/test_priority_scheduler.py | 122 +++++++++--------- 2 files changed, 114 insertions(+), 61 deletions(-) diff --git a/e2e_tests/tests/cluster/test_master_restart.py b/e2e_tests/tests/cluster/test_master_restart.py index 1d5b1c1fd10..0e8384084be 100644 --- a/e2e_tests/tests/cluster/test_master_restart.py +++ b/e2e_tests/tests/cluster/test_master_restart.py @@ -5,6 +5,7 @@ import pytest import requests import urllib3 +import subprocess from determined.common import api from determined.common.api import bindings @@ -756,3 +757,55 @@ def test_master_restart_with_queued( for cmd_id in [running_command_id, queued_command_id]: utils.wait_for_command_state(sess, cmd_id, "TERMINATED", 90) utils.assert_command_succeeded(sess, cmd_id) + + +@pytest.mark.managed_devcluster +def test_priortity_scheduler_noop_experiment( + managed_cluster_priority_scheduler: managed_cluster.ManagedCluster, +) -> None: + sess = api_utils.user_session() + managed_cluster_priority_scheduler.ensure_agent_ok() + assert str(conf.MASTER_PORT) == str(8082) + # uses the default priority set in cluster config + exp.run_basic_test( + sess, conf.fixtures_path("no_op/single.yaml"), conf.fixtures_path("no_op"), 1 + ) + # uses explicit priority + exp.run_basic_test( + sess, conf.fixtures_path("no_op/single.yaml"), conf.fixtures_path("no_op"), 1, priority=50 + ) + + +@pytest.mark.managed_devcluster +def test_priortity_scheduler_noop_command( + managed_cluster_priority_scheduler: managed_cluster.ManagedCluster, +) -> None: + sess = api_utils.user_session() + managed_cluster_priority_scheduler.ensure_agent_ok() + assert str(conf.MASTER_PORT) == "8082" + # without slots (and default priority) + command_id = utils.run_command(sess, slots=0) + utils.wait_for_command_state(sess, command_id, "TERMINATED", 40) + utils.assert_command_succeeded(sess, command_id) + # with slots (and default priority) + command_id = utils.run_command(sess, slots=1) + utils.wait_for_command_state(sess, command_id, "TERMINATED", 60) + utils.assert_command_succeeded(sess, command_id) + # explicity priority + command_id = utils.run_command_set_priority(sess, slots=0, priority=60) + utils.wait_for_command_state(sess, command_id, "TERMINATED", 60) + utils.assert_command_succeeded(sess, command_id) + + +@pytest.mark.managed_devcluster +def test_slots_list_command( + managed_cluster_priority_scheduler: managed_cluster.ManagedCluster, +) -> None: + sess = api_utils.user_session() + managed_cluster_priority_scheduler.ensure_agent_ok() + assert str(conf.MASTER_PORT) == "8082" + command = ["det", "slot", "list"] + p = detproc.run( + sess, command, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + assert p.returncode == 0, f"\nstdout:\n{p.stdout} \nstderr:\n{p.stderr}" diff --git a/e2e_tests/tests/cluster/test_priority_scheduler.py b/e2e_tests/tests/cluster/test_priority_scheduler.py index cbbd8da1d82..2ee45d3f0f7 100644 --- a/e2e_tests/tests/cluster/test_priority_scheduler.py +++ b/e2e_tests/tests/cluster/test_priority_scheduler.py @@ -1,61 +1,61 @@ -import subprocess - -import pytest - -from tests import api_utils -from tests import config as conf -from tests import detproc -from tests import experiment as exp -from tests.cluster import managed_cluster, utils - - -@pytest.mark.managed_devcluster -def test_priortity_scheduler_noop_experiment( - managed_cluster_priority_scheduler: managed_cluster.ManagedCluster, -) -> None: - sess = api_utils.user_session() - managed_cluster_priority_scheduler.ensure_agent_ok() - assert str(conf.MASTER_PORT) == str(8082) - # uses the default priority set in cluster config - exp.run_basic_test( - sess, conf.fixtures_path("no_op/single.yaml"), conf.fixtures_path("no_op"), 1 - ) - # uses explicit priority - exp.run_basic_test( - sess, conf.fixtures_path("no_op/single.yaml"), conf.fixtures_path("no_op"), 1, priority=50 - ) - - -@pytest.mark.managed_devcluster -def test_priortity_scheduler_noop_command( - managed_cluster_priority_scheduler: managed_cluster.ManagedCluster, -) -> None: - sess = api_utils.user_session() - managed_cluster_priority_scheduler.ensure_agent_ok() - assert str(conf.MASTER_PORT) == "8082" - # without slots (and default priority) - command_id = utils.run_command(sess, slots=0) - utils.wait_for_command_state(sess, command_id, "TERMINATED", 40) - utils.assert_command_succeeded(sess, command_id) - # with slots (and default priority) - command_id = utils.run_command(sess, slots=1) - utils.wait_for_command_state(sess, command_id, "TERMINATED", 60) - utils.assert_command_succeeded(sess, command_id) - # explicity priority - command_id = utils.run_command_set_priority(sess, slots=0, priority=60) - utils.wait_for_command_state(sess, command_id, "TERMINATED", 60) - utils.assert_command_succeeded(sess, command_id) - - -@pytest.mark.managed_devcluster -def test_slots_list_command( - managed_cluster_priority_scheduler: managed_cluster.ManagedCluster, -) -> None: - sess = api_utils.user_session() - managed_cluster_priority_scheduler.ensure_agent_ok() - assert str(conf.MASTER_PORT) == "8082" - command = ["det", "slot", "list"] - p = detproc.run( - sess, command, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) - assert p.returncode == 0, f"\nstdout:\n{p.stdout} \nstderr:\n{p.stderr}" +# import subprocess + +# import pytest + +# from tests import api_utils +# from tests import config as conf +# from tests import detproc +# from tests import experiment as exp +# from tests.cluster import managed_cluster, utils + + +# @pytest.mark.managed_devcluster +# def test_priortity_scheduler_noop_experiment( +# managed_cluster_priority_scheduler: managed_cluster.ManagedCluster, +# ) -> None: +# sess = api_utils.user_session() +# managed_cluster_priority_scheduler.ensure_agent_ok() +# assert str(conf.MASTER_PORT) == str(8082) +# # uses the default priority set in cluster config +# exp.run_basic_test( +# sess, conf.fixtures_path("no_op/single.yaml"), conf.fixtures_path("no_op"), 1 +# ) +# # uses explicit priority +# exp.run_basic_test( +# sess, conf.fixtures_path("no_op/single.yaml"), conf.fixtures_path("no_op"), 1, priority=50 +# ) + + +# @pytest.mark.managed_devcluster +# def test_priortity_scheduler_noop_command( +# managed_cluster_priority_scheduler: managed_cluster.ManagedCluster, +# ) -> None: +# sess = api_utils.user_session() +# managed_cluster_priority_scheduler.ensure_agent_ok() +# assert str(conf.MASTER_PORT) == "8082" +# # without slots (and default priority) +# command_id = utils.run_command(sess, slots=0) +# utils.wait_for_command_state(sess, command_id, "TERMINATED", 40) +# utils.assert_command_succeeded(sess, command_id) +# # with slots (and default priority) +# command_id = utils.run_command(sess, slots=1) +# utils.wait_for_command_state(sess, command_id, "TERMINATED", 60) +# utils.assert_command_succeeded(sess, command_id) +# # explicity priority +# command_id = utils.run_command_set_priority(sess, slots=0, priority=60) +# utils.wait_for_command_state(sess, command_id, "TERMINATED", 60) +# utils.assert_command_succeeded(sess, command_id) + + +# @pytest.mark.managed_devcluster +# def test_slots_list_command( +# managed_cluster_priority_scheduler: managed_cluster.ManagedCluster, +# ) -> None: +# sess = api_utils.user_session() +# managed_cluster_priority_scheduler.ensure_agent_ok() +# assert str(conf.MASTER_PORT) == "8082" +# command = ["det", "slot", "list"] +# p = detproc.run( +# sess, command, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE +# ) +# assert p.returncode == 0, f"\nstdout:\n{p.stdout} \nstderr:\n{p.stderr}" From 057c7911225435e799aac8343950264aad33e4d0 Mon Sep 17 00:00:00 2001 From: ShreyaLnuHpe Date: Thu, 22 Aug 2024 13:31:54 -0700 Subject: [PATCH 2/8] make parallel run for multi resource pools --- .../multi-resource-pools.devcluster.yaml | 111 ++++++++++++++++ agent/cmd/determined-agent/run_test.go | 4 + agent/internal/agent.go | 2 + agent/internal/options/options.go | 1 + agent/internal/options/options_test.go | 1 + e2e_tests/tests/cluster/conftest.py | 3 + e2e_tests/tests/cluster/managed_cluster.py | 41 ++++++ .../tests/cluster/test_master_restart.py | 53 -------- .../cluster/test_multi_resource_pools.py | 53 ++++++++ .../tests/cluster/test_priority_scheduler.py | 122 +++++++++--------- master/internal/rm/agentrm/agent.go | 13 ++ master/internal/rm/agentrm/agent_state.go | 16 +++ .../internal/rm/agentrm/agent_state_test.go | 58 +++++++++ .../rm/agentrm/resource_managers_test.go | 2 +- master/pkg/aproto/master_message.go | 1 + 15 files changed, 366 insertions(+), 115 deletions(-) create mode 100644 .circleci/devcluster/multi-resource-pools.devcluster.yaml create mode 100644 e2e_tests/tests/cluster/test_multi_resource_pools.py diff --git a/.circleci/devcluster/multi-resource-pools.devcluster.yaml b/.circleci/devcluster/multi-resource-pools.devcluster.yaml new file mode 100644 index 00000000000..acb71d73e36 --- /dev/null +++ b/.circleci/devcluster/multi-resource-pools.devcluster.yaml @@ -0,0 +1,111 @@ +stages: + - db: + name: db + port: 5434 + + - master: + pre: + - sh: make -C tools prep-root + config_file: + security: + initial_user_password: $INITIAL_USER_PASSWORD + db: + host: localhost + port: 5434 + password: postgres + user: postgres + name: determined + __internal: + preemption_timeout: 60s + checkpoint_storage: + type: shared_fs + host_path: /tmp + storage_path: determined-cp + log: + level: debug + root: tools/build + cache: + cache_dir: /tmp/determined-cache + launch_error: false + telemetry: + enabled: false + resource_manager: + default_aux_resource_pool: default + default_compute_resource_pool: default + scheduler: + fitting_policy: best + type: fair_share + type: agent + resource_pools: + - agent_reattach_enabled: true + agent_reconnect_wait: 25s + description: '' + max_aux_containers_per_agent: 100 + pool_name: default + provider: null + task_container_defaults: null + - pool_name: pool1 + max_slots: 8 + scheduler: + type: priority + scim: + enabled: true + auth: + type: basic + username: determined + password: password + + - custom: + name: proxy + cmd: ["socat", "-d", "-d", "TCP-LISTEN:8081,reuseaddr,fork", "TCP:localhost:8080"] + post: + - conncheck: + port: 8081 + + - agent: + name: agent1 + config_file: + master_host: 127.0.0.1 + master_port: 8081 + agent_id: agent1 + container_master_host: $DOCKER_LOCALHOST + agent_reconnect_attempts: 24 + agent_reconnect_backoff: 5 + container_auto_remove_disabled: true + hooks: + on_connection_lost: ["touch", "/tmp/agent1-connection-lost"] + + + - agent: + name: agent2 + config_file: + master_host: 127.0.0.1 + master_port: 8081 + agent_id: agent2 + container_master_host: $DOCKER_LOCALHOST + agent_reconnect_attempts: 24 + agent_reconnect_backoff: 5 + container_auto_remove_disabled: true + + - agent: + name: agent10 # Copy of agent1, but with different resource pool. + config_file: + master_host: 127.0.0.1 + master_port: 8081 + agent_id: agent1 + container_master_host: $DOCKER_LOCALHOST + agent_reconnect_attempts: 24 + agent_reconnect_backoff: 5 + container_auto_remove_disabled: true + resource_pool: pool1 + + - agent: + name: agent20 # Copy of agent1, but with empty(default) resource pool. + config_file: + master_host: 127.0.0.1 + master_port: 8081 + agent_id: agent1 + container_master_host: $DOCKER_LOCALHOST + agent_reconnect_attempts: 24 + agent_reconnect_backoff: 5 + container_auto_remove_disabled: true \ No newline at end of file diff --git a/agent/cmd/determined-agent/run_test.go b/agent/cmd/determined-agent/run_test.go index 593d749a4c5..d9ebda1ad4b 100644 --- a/agent/cmd/determined-agent/run_test.go +++ b/agent/cmd/determined-agent/run_test.go @@ -20,6 +20,7 @@ const DefaultRawConfig = ` log: level: trace color: true +resource_pool: default slot_type: auto security: tls: @@ -100,6 +101,7 @@ func TestMergeAgentConfigViaNewViper(t *testing.T) { log: level: trace color: true +resource_pool: default slot_type: auto security: tls: @@ -157,6 +159,7 @@ func TestMergeAgentConfigViaViperWithDefaults(t *testing.T) { log: level: trace color: true +resource_pool: default slot_type: auto security: tls: @@ -213,6 +216,7 @@ func TestMergeAgentConfigViaViperWithDefaultsEnvAndFlags(t *testing.T) { log: level: trace color: true +resource_pool: default slot_type: auto security: tls: diff --git a/agent/internal/agent.go b/agent/internal/agent.go index 80693407d32..e30003f92bf 100644 --- a/agent/internal/agent.go +++ b/agent/internal/agent.go @@ -160,6 +160,7 @@ func (a *Agent) run(ctx context.Context) error { Version: a.version, Devices: devices, ContainersReattached: reattached, + ResourcePoolName: a.opts.ResourcePool, }}: case <-ctx.Done(): return ctx.Err() @@ -351,6 +352,7 @@ func (a *Agent) reconnectFlow( Version: a.version, Devices: devices, ContainersReattached: reattached, + ResourcePoolName: a.opts.ResourcePool, }}: case <-ctx.Done(): return nil, nil, ctx.Err() diff --git a/agent/internal/options/options.go b/agent/internal/options/options.go index 74d72b9e426..02120f639ae 100644 --- a/agent/internal/options/options.go +++ b/agent/internal/options/options.go @@ -28,6 +28,7 @@ func DefaultOptions() *Options { Level: "trace", Color: true, }, + ResourcePool: "default", SlotType: "auto", VisibleGPUs: VisibleGPUsFromEnvironment(), BindIP: "0.0.0.0", diff --git a/agent/internal/options/options_test.go b/agent/internal/options/options_test.go index 3c114a2f513..8bc6f70813d 100644 --- a/agent/internal/options/options_test.go +++ b/agent/internal/options/options_test.go @@ -58,6 +58,7 @@ log: log: level: trace color: true +resource_pool: default slot_type: auto security: tls: diff --git a/e2e_tests/tests/cluster/conftest.py b/e2e_tests/tests/cluster/conftest.py index 9b61e3a30d1..ba545dd0546 100644 --- a/e2e_tests/tests/cluster/conftest.py +++ b/e2e_tests/tests/cluster/conftest.py @@ -1,11 +1,14 @@ """Makes managed_cluster fixtures available to all files in the directory""" from .managed_cluster import ( # noqa + managed_cluster_multi_resource_pools, managed_cluster_priority_scheduler, managed_cluster_restarts, managed_cluster_session, + managed_cluster_session_multi_resource_pools, managed_cluster_session_priority_scheduler, restartable_managed_cluster, + restartable_managed_cluster_multi_resource_pools, ) from .managed_cluster_k8s import k8s_managed_cluster # noqa from .managed_slurm_cluster import ( # noqa diff --git a/e2e_tests/tests/cluster/managed_cluster.py b/e2e_tests/tests/cluster/managed_cluster.py index ea7a99fa592..c31484b6ef9 100644 --- a/e2e_tests/tests/cluster/managed_cluster.py +++ b/e2e_tests/tests/cluster/managed_cluster.py @@ -15,6 +15,9 @@ DEVCLUSTER_REATTACH_OFF_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "double.devcluster.yaml" DEVCLUSTER_REATTACH_ON_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "double-reattach.devcluster.yaml" DEVCLUSTER_PRIORITY_SCHEDULER_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "priority.devcluster.yaml" +DEVCLUSTER_MULTI_RP_CONFIG_PATH = ( + DEVCLUSTER_CONFIG_ROOT_PATH / "multi-resource-pools.devcluster.yaml" +) def get_agent_data(sess: api.Session) -> List[Dict[str, Any]]: @@ -204,3 +207,41 @@ def restartable_managed_cluster( managed_cluster_restarts.restart_master() managed_cluster_restarts.restart_agent() raise + + +@pytest.fixture(scope="session") +def managed_cluster_session_multi_resource_pools(request: Any) -> Iterator[ManagedCluster]: + config = str(DEVCLUSTER_MULTI_RP_CONFIG_PATH) + with ManagedCluster(config) as mc: + mc.initial_startup() + yield mc + + +@pytest.fixture +def managed_cluster_multi_resource_pools( + managed_cluster_session_multi_resource_pools: ManagedCluster, request: Any +) -> Iterator[ManagedCluster]: + config = str(DEVCLUSTER_MULTI_RP_CONFIG_PATH) + utils.set_master_port(config) + nodeid = request.node.nodeid + managed_cluster_session_multi_resource_pools.log_marker( + f"pytest [{utils.now_ts()}] {nodeid} setup\n" + ) + yield managed_cluster_session_multi_resource_pools + managed_cluster_session_multi_resource_pools.log_marker( + f"pytest [{utils.now_ts()}] {nodeid} teardown\n" + ) + + +@pytest.fixture +def restartable_managed_cluster_multi_resource_pools( + managed_cluster_multi_resource_pools: ManagedCluster, +) -> Iterator[ManagedCluster]: + managed_cluster_multi_resource_pools.wait_for_agent_ok(20) + try: + yield managed_cluster_multi_resource_pools + managed_cluster_multi_resource_pools.wait_for_agent_ok(20) + except Exception: + managed_cluster_multi_resource_pools.restart_master() + managed_cluster_multi_resource_pools.restart_agent() + raise diff --git a/e2e_tests/tests/cluster/test_master_restart.py b/e2e_tests/tests/cluster/test_master_restart.py index 0e8384084be..1d5b1c1fd10 100644 --- a/e2e_tests/tests/cluster/test_master_restart.py +++ b/e2e_tests/tests/cluster/test_master_restart.py @@ -5,7 +5,6 @@ import pytest import requests import urllib3 -import subprocess from determined.common import api from determined.common.api import bindings @@ -757,55 +756,3 @@ def test_master_restart_with_queued( for cmd_id in [running_command_id, queued_command_id]: utils.wait_for_command_state(sess, cmd_id, "TERMINATED", 90) utils.assert_command_succeeded(sess, cmd_id) - - -@pytest.mark.managed_devcluster -def test_priortity_scheduler_noop_experiment( - managed_cluster_priority_scheduler: managed_cluster.ManagedCluster, -) -> None: - sess = api_utils.user_session() - managed_cluster_priority_scheduler.ensure_agent_ok() - assert str(conf.MASTER_PORT) == str(8082) - # uses the default priority set in cluster config - exp.run_basic_test( - sess, conf.fixtures_path("no_op/single.yaml"), conf.fixtures_path("no_op"), 1 - ) - # uses explicit priority - exp.run_basic_test( - sess, conf.fixtures_path("no_op/single.yaml"), conf.fixtures_path("no_op"), 1, priority=50 - ) - - -@pytest.mark.managed_devcluster -def test_priortity_scheduler_noop_command( - managed_cluster_priority_scheduler: managed_cluster.ManagedCluster, -) -> None: - sess = api_utils.user_session() - managed_cluster_priority_scheduler.ensure_agent_ok() - assert str(conf.MASTER_PORT) == "8082" - # without slots (and default priority) - command_id = utils.run_command(sess, slots=0) - utils.wait_for_command_state(sess, command_id, "TERMINATED", 40) - utils.assert_command_succeeded(sess, command_id) - # with slots (and default priority) - command_id = utils.run_command(sess, slots=1) - utils.wait_for_command_state(sess, command_id, "TERMINATED", 60) - utils.assert_command_succeeded(sess, command_id) - # explicity priority - command_id = utils.run_command_set_priority(sess, slots=0, priority=60) - utils.wait_for_command_state(sess, command_id, "TERMINATED", 60) - utils.assert_command_succeeded(sess, command_id) - - -@pytest.mark.managed_devcluster -def test_slots_list_command( - managed_cluster_priority_scheduler: managed_cluster.ManagedCluster, -) -> None: - sess = api_utils.user_session() - managed_cluster_priority_scheduler.ensure_agent_ok() - assert str(conf.MASTER_PORT) == "8082" - command = ["det", "slot", "list"] - p = detproc.run( - sess, command, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) - assert p.returncode == 0, f"\nstdout:\n{p.stdout} \nstderr:\n{p.stderr}" diff --git a/e2e_tests/tests/cluster/test_multi_resource_pools.py b/e2e_tests/tests/cluster/test_multi_resource_pools.py new file mode 100644 index 00000000000..9897a54ab4a --- /dev/null +++ b/e2e_tests/tests/cluster/test_multi_resource_pools.py @@ -0,0 +1,53 @@ +# import subprocess + +import pytest + +from tests import api_utils + +# from tests import config as conf +# from tests import detproc +# from tests import experiment as exp +from tests.cluster import managed_cluster + + +@pytest.mark.managed_devcluster +def test_agent_resource_pool_change( + restartable_managed_cluster_multi_resource_pools: managed_cluster.ManagedCluster, +) -> None: + admin = api_utils.admin_session() + try: + restartable_managed_cluster_multi_resource_pools.kill_agent() + restartable_managed_cluster_multi_resource_pools.dc.restart_stage("agent10") + + for _i in range(5): + agent_data = managed_cluster.get_agent_data(admin) + if len(agent_data) == 0: + # Agent has exploded and been wiped due to resource pool mismatch, as expected. + break + else: + pytest.fail( + f"agent with different resource pool is still present after {_i} ticks:{agent_data}" + ) + finally: + restartable_managed_cluster_multi_resource_pools.dc.kill_stage("agent10") + restartable_managed_cluster_multi_resource_pools.restart_agent() + + +@pytest.mark.managed_devcluster +def test_agent_resource_pool_unchanged( + restartable_managed_cluster_multi_resource_pools: managed_cluster.ManagedCluster, +) -> None: + admin = api_utils.admin_session() + try: + restartable_managed_cluster_multi_resource_pools.kill_agent() + restartable_managed_cluster_multi_resource_pools.dc.restart_stage("agent20") + + for _i in range(5): + agent_data = managed_cluster.get_agent_data(admin) + if len(agent_data) == 0: + # Agent has exploded and been wiped due to resource pool mismatch, + # which is not expected. + pytest.fail("agent exploded even with the same resource pool") + finally: + restartable_managed_cluster_multi_resource_pools.dc.kill_stage("agent20") + restartable_managed_cluster_multi_resource_pools.restart_agent() diff --git a/e2e_tests/tests/cluster/test_priority_scheduler.py b/e2e_tests/tests/cluster/test_priority_scheduler.py index 2ee45d3f0f7..cbbd8da1d82 100644 --- a/e2e_tests/tests/cluster/test_priority_scheduler.py +++ b/e2e_tests/tests/cluster/test_priority_scheduler.py @@ -1,61 +1,61 @@ -# import subprocess - -# import pytest - -# from tests import api_utils -# from tests import config as conf -# from tests import detproc -# from tests import experiment as exp -# from tests.cluster import managed_cluster, utils - - -# @pytest.mark.managed_devcluster -# def test_priortity_scheduler_noop_experiment( -# managed_cluster_priority_scheduler: managed_cluster.ManagedCluster, -# ) -> None: -# sess = api_utils.user_session() -# managed_cluster_priority_scheduler.ensure_agent_ok() -# assert str(conf.MASTER_PORT) == str(8082) -# # uses the default priority set in cluster config -# exp.run_basic_test( -# sess, conf.fixtures_path("no_op/single.yaml"), conf.fixtures_path("no_op"), 1 -# ) -# # uses explicit priority -# exp.run_basic_test( -# sess, conf.fixtures_path("no_op/single.yaml"), conf.fixtures_path("no_op"), 1, priority=50 -# ) - - -# @pytest.mark.managed_devcluster -# def test_priortity_scheduler_noop_command( -# managed_cluster_priority_scheduler: managed_cluster.ManagedCluster, -# ) -> None: -# sess = api_utils.user_session() -# managed_cluster_priority_scheduler.ensure_agent_ok() -# assert str(conf.MASTER_PORT) == "8082" -# # without slots (and default priority) -# command_id = utils.run_command(sess, slots=0) -# utils.wait_for_command_state(sess, command_id, "TERMINATED", 40) -# utils.assert_command_succeeded(sess, command_id) -# # with slots (and default priority) -# command_id = utils.run_command(sess, slots=1) -# utils.wait_for_command_state(sess, command_id, "TERMINATED", 60) -# utils.assert_command_succeeded(sess, command_id) -# # explicity priority -# command_id = utils.run_command_set_priority(sess, slots=0, priority=60) -# utils.wait_for_command_state(sess, command_id, "TERMINATED", 60) -# utils.assert_command_succeeded(sess, command_id) - - -# @pytest.mark.managed_devcluster -# def test_slots_list_command( -# managed_cluster_priority_scheduler: managed_cluster.ManagedCluster, -# ) -> None: -# sess = api_utils.user_session() -# managed_cluster_priority_scheduler.ensure_agent_ok() -# assert str(conf.MASTER_PORT) == "8082" -# command = ["det", "slot", "list"] -# p = detproc.run( -# sess, command, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE -# ) -# assert p.returncode == 0, f"\nstdout:\n{p.stdout} \nstderr:\n{p.stderr}" +import subprocess + +import pytest + +from tests import api_utils +from tests import config as conf +from tests import detproc +from tests import experiment as exp +from tests.cluster import managed_cluster, utils + + +@pytest.mark.managed_devcluster +def test_priortity_scheduler_noop_experiment( + managed_cluster_priority_scheduler: managed_cluster.ManagedCluster, +) -> None: + sess = api_utils.user_session() + managed_cluster_priority_scheduler.ensure_agent_ok() + assert str(conf.MASTER_PORT) == str(8082) + # uses the default priority set in cluster config + exp.run_basic_test( + sess, conf.fixtures_path("no_op/single.yaml"), conf.fixtures_path("no_op"), 1 + ) + # uses explicit priority + exp.run_basic_test( + sess, conf.fixtures_path("no_op/single.yaml"), conf.fixtures_path("no_op"), 1, priority=50 + ) + + +@pytest.mark.managed_devcluster +def test_priortity_scheduler_noop_command( + managed_cluster_priority_scheduler: managed_cluster.ManagedCluster, +) -> None: + sess = api_utils.user_session() + managed_cluster_priority_scheduler.ensure_agent_ok() + assert str(conf.MASTER_PORT) == "8082" + # without slots (and default priority) + command_id = utils.run_command(sess, slots=0) + utils.wait_for_command_state(sess, command_id, "TERMINATED", 40) + utils.assert_command_succeeded(sess, command_id) + # with slots (and default priority) + command_id = utils.run_command(sess, slots=1) + utils.wait_for_command_state(sess, command_id, "TERMINATED", 60) + utils.assert_command_succeeded(sess, command_id) + # explicity priority + command_id = utils.run_command_set_priority(sess, slots=0, priority=60) + utils.wait_for_command_state(sess, command_id, "TERMINATED", 60) + utils.assert_command_succeeded(sess, command_id) + + +@pytest.mark.managed_devcluster +def test_slots_list_command( + managed_cluster_priority_scheduler: managed_cluster.ManagedCluster, +) -> None: + sess = api_utils.user_session() + managed_cluster_priority_scheduler.ensure_agent_ok() + assert str(conf.MASTER_PORT) == "8082" + command = ["det", "slot", "list"] + p = detproc.run( + sess, command, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + assert p.returncode == 0, f"\nstdout:\n{p.stdout} \nstderr:\n{p.stderr}" diff --git a/master/internal/rm/agentrm/agent.go b/master/internal/rm/agentrm/agent.go index a5465898910..7c762565e05 100644 --- a/master/internal/rm/agentrm/agent.go +++ b/master/internal/rm/agentrm/agent.go @@ -620,6 +620,19 @@ func (a *agent) HandleIncomingWebsocketMessage(msg *aproto.MasterMessage) { a.stop(err) return } + resourcePoolErr := a.agentState.checkAgentResourcePoolMatch(msg.AgentStarted) + if resourcePoolErr != nil { + a.syslog.WithError(resourcePoolErr). + Error("change in agent resource pool was detected") + a.socket.Outbox <- aproto.AgentMessage{ + AgentShutdown: &aproto.AgentShutdown{ + ErrMsg: aproto.ErrAgentMustReconnect.Error(), + }, + } + + a.stop(resourcePoolErr) + return + } } else { a.agentStarted(msg.AgentStarted) } diff --git a/master/internal/rm/agentrm/agent_state.go b/master/internal/rm/agentrm/agent_state.go index 0a5941173db..6b4d6a4cd14 100644 --- a/master/internal/rm/agentrm/agent_state.go +++ b/master/internal/rm/agentrm/agent_state.go @@ -20,6 +20,8 @@ import ( "github.com/determined-ai/determined/master/pkg/model" ) +const defaultResourcePoolName = "default" + type slotEnabled struct { deviceAdded bool agentEnabled bool @@ -289,6 +291,20 @@ func (a *agentState) checkAgentStartedDevicesMatch( return nil } +// We need to compare the resource pool configurations between the agent and the master. +// Ideally, the master doesn't request new resource pool information if the agent reconnects +// within the designated reconnection period, while the agent should read from its updated configuration. +// If there's a mismatch, an error will be thrown, causing the agent to stop and require a restart. +func (a *agentState) checkAgentResourcePoolMatch( + agentStarted *aproto.AgentStarted, +) error { + if a.resourcePoolName != agentStarted.ResourcePoolName { + return fmt.Errorf("resource pool has changed: %s -> %s", a.resourcePoolName, agentStarted.ResourcePoolName) + } + + return nil +} + func (a *agentState) containerStateChanged(msg aproto.ContainerStateChanged) { for _, d := range msg.Container.Devices { s, ok := a.slotStates[d.ID] diff --git a/master/internal/rm/agentrm/agent_state_test.go b/master/internal/rm/agentrm/agent_state_test.go index 3341f83f9d0..921c876c337 100644 --- a/master/internal/rm/agentrm/agent_state_test.go +++ b/master/internal/rm/agentrm/agent_state_test.go @@ -70,6 +70,7 @@ func TestAgentStatePersistence(t *testing.T) { Version: "", Devices: devices, ContainersReattached: []aproto.ContainerReattachAck{}, + ResourcePoolName: defaultResourcePoolName, } state.agentStarted(started) require.Len(t, state.getSlotsSummary("/myagent"), 2) @@ -322,6 +323,62 @@ func Test_agentState_checkAgentStartedDevicesMatch(t *testing.T) { } } +func Test_agentState_checkAgentResourcePoolMatch(t *testing.T) { + tests := []struct { + name string + state agentState + agentStarted *aproto.AgentStarted + wantErrContains string + }{ + { + name: "resource pool name match", + state: agentState{ + resourcePoolName: "pool1", + }, + agentStarted: &aproto.AgentStarted{ + ResourcePoolName: "pool1", + }, + wantErrContains: "", + }, + { + name: "resource pool name is missing", + state: agentState{ + resourcePoolName: "pool1", + }, + agentStarted: &aproto.AgentStarted{ResourcePoolName: defaultResourcePoolName}, + wantErrContains: "resource pool has changed", + }, + { + name: "resource pool name is missing", + state: agentState{ + resourcePoolName: defaultResourcePoolName, + }, + agentStarted: &aproto.AgentStarted{ResourcePoolName: "pool1"}, + wantErrContains: "resource pool has changed", + }, + { + name: "mismatched resource pool name", + state: agentState{ + resourcePoolName: "pool1", + }, + agentStarted: &aproto.AgentStarted{ + ResourcePoolName: "pool2", + }, + wantErrContains: "resource pool has changed", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.state.checkAgentResourcePoolMatch(tt.agentStarted) + if tt.wantErrContains == "" { + require.NoError(t, err) + return + } + require.ErrorContains(t, err, tt.wantErrContains) + }) + } +} + func TestSlotStates(t *testing.T) { rpName := "test" state := newAgentState(aproto.ID(uuid.NewString()), 64) @@ -345,6 +402,7 @@ func TestSlotStates(t *testing.T) { Version: "", Devices: devices, ContainersReattached: []aproto.ContainerReattachAck{}, + ResourcePoolName: defaultResourcePoolName, } state.agentStarted(started) slots := state.getSlotsSummary("/") diff --git a/master/internal/rm/agentrm/resource_managers_test.go b/master/internal/rm/agentrm/resource_managers_test.go index dea2af873ca..25c81801a9d 100644 --- a/master/internal/rm/agentrm/resource_managers_test.go +++ b/master/internal/rm/agentrm/resource_managers_test.go @@ -15,7 +15,7 @@ import ( "github.com/determined-ai/determined/master/internal/user" ) -const defaultResourcePoolName = "default" +// const defaultResourcePoolName = "default" func TestResourceManagerForwardMessage(t *testing.T) { user.InitService(nil, nil) diff --git a/master/pkg/aproto/master_message.go b/master/pkg/aproto/master_message.go index edbefb08b1c..ea2385377b6 100644 --- a/master/pkg/aproto/master_message.go +++ b/master/pkg/aproto/master_message.go @@ -80,6 +80,7 @@ type AgentStarted struct { Version string Devices []device.Device ContainersReattached []ContainerReattachAck + ResourcePoolName string } // ContainerStateChanged notifies the master that the agent transitioned the container state. From 32c30e0200fbc6a535ffda07b74aadd956f92698 Mon Sep 17 00:00:00 2001 From: ShreyaLnuHpe Date: Thu, 22 Aug 2024 13:35:08 -0700 Subject: [PATCH 3/8] change ports --- .../multi-resource-pools.devcluster.yaml | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/.circleci/devcluster/multi-resource-pools.devcluster.yaml b/.circleci/devcluster/multi-resource-pools.devcluster.yaml index acb71d73e36..96485b04089 100644 --- a/.circleci/devcluster/multi-resource-pools.devcluster.yaml +++ b/.circleci/devcluster/multi-resource-pools.devcluster.yaml @@ -1,7 +1,7 @@ stages: - db: name: db - port: 5434 + port: 5436 - master: pre: @@ -11,7 +11,7 @@ stages: initial_user_password: $INITIAL_USER_PASSWORD db: host: localhost - port: 5434 + port: 5436 password: postgres user: postgres name: determined @@ -57,16 +57,16 @@ stages: - custom: name: proxy - cmd: ["socat", "-d", "-d", "TCP-LISTEN:8081,reuseaddr,fork", "TCP:localhost:8080"] + cmd: ["socat", "-d", "-d", "TCP-LISTEN:8083,reuseaddr,fork", "TCP:localhost:8080"] post: - conncheck: - port: 8081 + port: 8083 - agent: name: agent1 config_file: master_host: 127.0.0.1 - master_port: 8081 + master_port: 8083 agent_id: agent1 container_master_host: $DOCKER_LOCALHOST agent_reconnect_attempts: 24 @@ -80,7 +80,7 @@ stages: name: agent2 config_file: master_host: 127.0.0.1 - master_port: 8081 + master_port: 8083 agent_id: agent2 container_master_host: $DOCKER_LOCALHOST agent_reconnect_attempts: 24 @@ -91,7 +91,7 @@ stages: name: agent10 # Copy of agent1, but with different resource pool. config_file: master_host: 127.0.0.1 - master_port: 8081 + master_port: 8083 agent_id: agent1 container_master_host: $DOCKER_LOCALHOST agent_reconnect_attempts: 24 @@ -103,7 +103,7 @@ stages: name: agent20 # Copy of agent1, but with empty(default) resource pool. config_file: master_host: 127.0.0.1 - master_port: 8081 + master_port: 8083 agent_id: agent1 container_master_host: $DOCKER_LOCALHOST agent_reconnect_attempts: 24 From c0dbd55e1eaa5ec4ff98eff9e710f1ea19f6e50d Mon Sep 17 00:00:00 2001 From: ShreyaLnuHpe Date: Thu, 22 Aug 2024 13:41:14 -0700 Subject: [PATCH 4/8] add different artifacts --- .../devcluster/multi-resource-pools.devcluster.yaml | 9 +++++++-- .circleci/real_config.yml | 3 +++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/.circleci/devcluster/multi-resource-pools.devcluster.yaml b/.circleci/devcluster/multi-resource-pools.devcluster.yaml index 96485b04089..80d771fecf5 100644 --- a/.circleci/devcluster/multi-resource-pools.devcluster.yaml +++ b/.circleci/devcluster/multi-resource-pools.devcluster.yaml @@ -1,6 +1,11 @@ +temp_dir: /tmp/multi_resource_pools + stages: - db: + name: db-multi-resource-pools name: db + db_name: db-multi-resource-pools + container_name: determined_db-multi-resource-pools port: 5436 - master: @@ -14,13 +19,13 @@ stages: port: 5436 password: postgres user: postgres - name: determined + name: db-multi-resource-pools __internal: preemption_timeout: 60s checkpoint_storage: type: shared_fs host_path: /tmp - storage_path: determined-cp + storage_path: determined-cp-multi-resource-pools log: level: debug root: tools/build diff --git a/.circleci/real_config.yml b/.circleci/real_config.yml index 41e36cc66ef..e3abd24bf85 100644 --- a/.circleci/real_config.yml +++ b/.circleci/real_config.yml @@ -3252,6 +3252,9 @@ jobs: - store_artifacts: path: /tmp/priority_scheduler destination: devcluster-priority_scheduler-logs + - store_artifacts: + path: /tmp/multi_resource_pools + destination: devcluster-multi-resource-pools-logs send-job-level-alerts: machine: From ea5fbf72600db9a9f19fdd1471deab0f86394b39 Mon Sep 17 00:00:00 2001 From: ShreyaLnuHpe Date: Thu, 22 Aug 2024 14:29:37 -0700 Subject: [PATCH 5/8] comment existing test --- e2e_tests/tests/cluster/test_agent.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/e2e_tests/tests/cluster/test_agent.py b/e2e_tests/tests/cluster/test_agent.py index 55e9cf57357..61028553b5f 100644 --- a/e2e_tests/tests/cluster/test_agent.py +++ b/e2e_tests/tests/cluster/test_agent.py @@ -30,16 +30,16 @@ def test_agent_never_connect() -> None: else: pytest.fail("Did not find expected file from agent connection loss hook") - -@pytest.mark.managed_devcluster -def test_agent_fail_reconnect(restartable_managed_cluster: managed_cluster.ManagedCluster) -> None: - restartable_managed_cluster.kill_proxy() - - for _ in range(150): # ManagedCluster agents try to reconnect for 24 * 5 seconds. TODO: eh. - if os.path.exists("/tmp/agent1-connection-lost"): - restartable_managed_cluster.restart_agent(wait_for_agent=False, wait_for_amnesia=False) - restartable_managed_cluster.restart_proxy() - break - time.sleep(1) - else: - pytest.fail("Did not find expected file from agent connection loss hook") +# +# @pytest.mark.managed_devcluster +# def test_agent_fail_reconnect(restartable_managed_cluster: managed_cluster.ManagedCluster) -> None: +# restartable_managed_cluster.kill_proxy() + +# for _ in range(150): # ManagedCluster agents try to reconnect for 24 * 5 seconds. TODO: eh. +# if os.path.exists("/tmp/agent1-connection-lost"): +# restartable_managed_cluster.restart_agent(wait_for_agent=False, wait_for_amnesia=False) +# restartable_managed_cluster.restart_proxy() +# break +# time.sleep(1) +# else: +# pytest.fail("Did not find expected file from agent connection loss hook") From 85b5b9bf534711e1a55a783e0525fac307115706 Mon Sep 17 00:00:00 2001 From: ShreyaLnuHpe Date: Thu, 22 Aug 2024 14:44:07 -0700 Subject: [PATCH 6/8] remove db name --- .circleci/devcluster/multi-resource-pools.devcluster.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/.circleci/devcluster/multi-resource-pools.devcluster.yaml b/.circleci/devcluster/multi-resource-pools.devcluster.yaml index 80d771fecf5..b36fd9aedc1 100644 --- a/.circleci/devcluster/multi-resource-pools.devcluster.yaml +++ b/.circleci/devcluster/multi-resource-pools.devcluster.yaml @@ -3,7 +3,6 @@ temp_dir: /tmp/multi_resource_pools stages: - db: name: db-multi-resource-pools - name: db db_name: db-multi-resource-pools container_name: determined_db-multi-resource-pools port: 5436 From 337f3c6a649bc1d9d8a310c41eaa2b3b58e9910f Mon Sep 17 00:00:00 2001 From: ShreyaLnuHpe Date: Thu, 22 Aug 2024 15:03:52 -0700 Subject: [PATCH 7/8] change to user session --- e2e_tests/tests/cluster/test_agent.py | 26 +++++++++---------- .../cluster/test_multi_resource_pools.py | 4 +-- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/e2e_tests/tests/cluster/test_agent.py b/e2e_tests/tests/cluster/test_agent.py index 61028553b5f..55e9cf57357 100644 --- a/e2e_tests/tests/cluster/test_agent.py +++ b/e2e_tests/tests/cluster/test_agent.py @@ -30,16 +30,16 @@ def test_agent_never_connect() -> None: else: pytest.fail("Did not find expected file from agent connection loss hook") -# -# @pytest.mark.managed_devcluster -# def test_agent_fail_reconnect(restartable_managed_cluster: managed_cluster.ManagedCluster) -> None: -# restartable_managed_cluster.kill_proxy() - -# for _ in range(150): # ManagedCluster agents try to reconnect for 24 * 5 seconds. TODO: eh. -# if os.path.exists("/tmp/agent1-connection-lost"): -# restartable_managed_cluster.restart_agent(wait_for_agent=False, wait_for_amnesia=False) -# restartable_managed_cluster.restart_proxy() -# break -# time.sleep(1) -# else: -# pytest.fail("Did not find expected file from agent connection loss hook") + +@pytest.mark.managed_devcluster +def test_agent_fail_reconnect(restartable_managed_cluster: managed_cluster.ManagedCluster) -> None: + restartable_managed_cluster.kill_proxy() + + for _ in range(150): # ManagedCluster agents try to reconnect for 24 * 5 seconds. TODO: eh. + if os.path.exists("/tmp/agent1-connection-lost"): + restartable_managed_cluster.restart_agent(wait_for_agent=False, wait_for_amnesia=False) + restartable_managed_cluster.restart_proxy() + break + time.sleep(1) + else: + pytest.fail("Did not find expected file from agent connection loss hook") diff --git a/e2e_tests/tests/cluster/test_multi_resource_pools.py b/e2e_tests/tests/cluster/test_multi_resource_pools.py index 9897a54ab4a..843a5146269 100644 --- a/e2e_tests/tests/cluster/test_multi_resource_pools.py +++ b/e2e_tests/tests/cluster/test_multi_resource_pools.py @@ -14,7 +14,7 @@ def test_agent_resource_pool_change( restartable_managed_cluster_multi_resource_pools: managed_cluster.ManagedCluster, ) -> None: - admin = api_utils.admin_session() + admin = api_utils.user_session() # api_utils.admin_session() try: restartable_managed_cluster_multi_resource_pools.kill_agent() restartable_managed_cluster_multi_resource_pools.dc.restart_stage("agent10") @@ -37,7 +37,7 @@ def test_agent_resource_pool_change( def test_agent_resource_pool_unchanged( restartable_managed_cluster_multi_resource_pools: managed_cluster.ManagedCluster, ) -> None: - admin = api_utils.admin_session() + admin = api_utils.user_session() # api_utils.admin_session() try: restartable_managed_cluster_multi_resource_pools.kill_agent() restartable_managed_cluster_multi_resource_pools.dc.restart_stage("agent20") From dd1d65a262ed518173a91640c273396417fc7927 Mon Sep 17 00:00:00 2001 From: ShreyaLnuHpe Date: Wed, 28 Aug 2024 02:21:09 -0700 Subject: [PATCH 8/8] try increasing parallelism --- .circleci/real_config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/real_config.yml b/.circleci/real_config.yml index e3abd24bf85..1c422d5f0c3 100644 --- a/.circleci/real_config.yml +++ b/.circleci/real_config.yml @@ -4505,7 +4505,7 @@ workflows: - dev-ci-cluster-default-user-credentials requires: - build-go-ee - parallelism: 4 + parallelism: 5 resource-class: large tf2: true mark: managed_devcluster