chore: testing priority scheduler #9857

Draft · wants to merge 8 commits into main
115 changes: 115 additions & 0 deletions .circleci/devcluster/multi-resource-pools.devcluster.yaml
@@ -0,0 +1,115 @@
temp_dir: /tmp/multi_resource_pools

stages:
- db:
name: db-multi-resource-pools
db_name: db-multi-resource-pools
container_name: determined_db-multi-resource-pools
port: 5436

- master:
pre:
- sh: make -C tools prep-root
config_file:
security:
initial_user_password: $INITIAL_USER_PASSWORD
db:
host: localhost
port: 5436
password: postgres
user: postgres
name: db-multi-resource-pools
__internal:
preemption_timeout: 60s
checkpoint_storage:
type: shared_fs
host_path: /tmp
storage_path: determined-cp-multi-resource-pools
log:
level: debug
root: tools/build
cache:
cache_dir: /tmp/determined-cache
launch_error: false
telemetry:
enabled: false
resource_manager:
default_aux_resource_pool: default
default_compute_resource_pool: default
scheduler:
fitting_policy: best
type: fair_share
type: agent
resource_pools:
- agent_reattach_enabled: true
agent_reconnect_wait: 25s
description: ''
max_aux_containers_per_agent: 100
pool_name: default
provider: null
task_container_defaults: null
- pool_name: pool1
max_slots: 8
scheduler:
type: priority
scim:
enabled: true
auth:
type: basic
username: determined
password: password

- custom:
name: proxy
cmd: ["socat", "-d", "-d", "TCP-LISTEN:8083,reuseaddr,fork", "TCP:localhost:8080"]
post:
- conncheck:
port: 8083

- agent:
name: agent1
config_file:
master_host: 127.0.0.1
master_port: 8083
agent_id: agent1
container_master_host: $DOCKER_LOCALHOST
agent_reconnect_attempts: 24
agent_reconnect_backoff: 5
container_auto_remove_disabled: true
hooks:
on_connection_lost: ["touch", "/tmp/agent1-connection-lost"]


- agent:
name: agent2
config_file:
master_host: 127.0.0.1
master_port: 8083
agent_id: agent2
container_master_host: $DOCKER_LOCALHOST
agent_reconnect_attempts: 24
agent_reconnect_backoff: 5
container_auto_remove_disabled: true

- agent:
name: agent10 # Copy of agent1, but with different resource pool.
config_file:
master_host: 127.0.0.1
master_port: 8083
agent_id: agent1
container_master_host: $DOCKER_LOCALHOST
agent_reconnect_attempts: 24
agent_reconnect_backoff: 5
container_auto_remove_disabled: true
resource_pool: pool1

- agent:
name: agent20 # Copy of agent1, but with an empty (default) resource pool.
config_file:
master_host: 127.0.0.1
master_port: 8083
agent_id: agent1
container_master_host: $DOCKER_LOCALHOST
agent_reconnect_attempts: 24
agent_reconnect_backoff: 5
container_auto_remove_disabled: true
5 changes: 4 additions & 1 deletion .circleci/real_config.yml
@@ -3252,6 +3252,9 @@ jobs:
- store_artifacts:
path: /tmp/priority_scheduler
destination: devcluster-priority_scheduler-logs
- store_artifacts:
path: /tmp/multi_resource_pools
destination: devcluster-multi-resource-pools-logs

send-job-level-alerts:
machine:
@@ -4502,7 +4505,7 @@ workflows:
- dev-ci-cluster-default-user-credentials
requires:
- build-go-ee
parallelism: 4
parallelism: 5
resource-class: large
tf2: true
mark: managed_devcluster
4 changes: 4 additions & 0 deletions agent/cmd/determined-agent/run_test.go
@@ -20,6 +20,7 @@ const DefaultRawConfig = `
log:
level: trace
color: true
resource_pool: default
slot_type: auto
security:
tls:
@@ -100,6 +101,7 @@ func TestMergeAgentConfigViaNewViper(t *testing.T) {
log:
level: trace
color: true
resource_pool: default
slot_type: auto
security:
tls:
@@ -157,6 +159,7 @@ func TestMergeAgentConfigViaViperWithDefaults(t *testing.T) {
log:
level: trace
color: true
resource_pool: default
slot_type: auto
security:
tls:
@@ -213,6 +216,7 @@ func TestMergeAgentConfigViaViperWithDefaultsEnvAndFlags(t *testing.T) {
log:
level: trace
color: true
resource_pool: default
slot_type: auto
security:
tls:
2 changes: 2 additions & 0 deletions agent/internal/agent.go
@@ -160,6 +160,7 @@ func (a *Agent) run(ctx context.Context) error {
Version: a.version,
Devices: devices,
ContainersReattached: reattached,
ResourcePoolName: a.opts.ResourcePool,
}}:
case <-ctx.Done():
return ctx.Err()
@@ -351,6 +352,7 @@ func (a *Agent) reconnectFlow(
Version: a.version,
Devices: devices,
ContainersReattached: reattached,
ResourcePoolName: a.opts.ResourcePool,
}}:
case <-ctx.Done():
return nil, nil, ctx.Err()
1 change: 1 addition & 0 deletions agent/internal/options/options.go
@@ -28,6 +28,7 @@ func DefaultOptions() *Options {
Level: "trace",
Color: true,
},
ResourcePool: "default",
SlotType: "auto",
VisibleGPUs: VisibleGPUsFromEnvironment(),
BindIP: "0.0.0.0",
1 change: 1 addition & 0 deletions agent/internal/options/options_test.go
@@ -58,6 +58,7 @@ log:
log:
level: trace
color: true
resource_pool: default
slot_type: auto
security:
tls:
3 changes: 3 additions & 0 deletions e2e_tests/tests/cluster/conftest.py
@@ -1,11 +1,14 @@
"""Makes managed_cluster fixtures available to all files in the directory"""

from .managed_cluster import ( # noqa
managed_cluster_multi_resource_pools,
managed_cluster_priority_scheduler,
managed_cluster_restarts,
managed_cluster_session,
managed_cluster_session_multi_resource_pools,
managed_cluster_session_priority_scheduler,
restartable_managed_cluster,
restartable_managed_cluster_multi_resource_pools,
)
from .managed_cluster_k8s import k8s_managed_cluster # noqa
from .managed_slurm_cluster import ( # noqa
41 changes: 41 additions & 0 deletions e2e_tests/tests/cluster/managed_cluster.py
@@ -15,6 +15,9 @@
DEVCLUSTER_REATTACH_OFF_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "double.devcluster.yaml"
DEVCLUSTER_REATTACH_ON_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "double-reattach.devcluster.yaml"
DEVCLUSTER_PRIORITY_SCHEDULER_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "priority.devcluster.yaml"
DEVCLUSTER_MULTI_RP_CONFIG_PATH = (
DEVCLUSTER_CONFIG_ROOT_PATH / "multi-resource-pools.devcluster.yaml"
)


def get_agent_data(sess: api.Session) -> List[Dict[str, Any]]:
@@ -204,3 +207,41 @@ def restartable_managed_cluster(
managed_cluster_restarts.restart_master()
managed_cluster_restarts.restart_agent()
raise


@pytest.fixture(scope="session")
def managed_cluster_session_multi_resource_pools(request: Any) -> Iterator[ManagedCluster]:
config = str(DEVCLUSTER_MULTI_RP_CONFIG_PATH)
with ManagedCluster(config) as mc:
mc.initial_startup()
yield mc


@pytest.fixture
def managed_cluster_multi_resource_pools(
managed_cluster_session_multi_resource_pools: ManagedCluster, request: Any
) -> Iterator[ManagedCluster]:
config = str(DEVCLUSTER_MULTI_RP_CONFIG_PATH)
utils.set_master_port(config)
nodeid = request.node.nodeid
managed_cluster_session_multi_resource_pools.log_marker(
f"pytest [{utils.now_ts()}] {nodeid} setup\n"
)
yield managed_cluster_session_multi_resource_pools
managed_cluster_session_multi_resource_pools.log_marker(
f"pytest [{utils.now_ts()}] {nodeid} teardown\n"
)


@pytest.fixture
def restartable_managed_cluster_multi_resource_pools(
managed_cluster_multi_resource_pools: ManagedCluster,
) -> Iterator[ManagedCluster]:
managed_cluster_multi_resource_pools.wait_for_agent_ok(20)
try:
yield managed_cluster_multi_resource_pools
managed_cluster_multi_resource_pools.wait_for_agent_ok(20)
except Exception:
managed_cluster_multi_resource_pools.restart_master()
managed_cluster_multi_resource_pools.restart_agent()
raise
53 changes: 53 additions & 0 deletions e2e_tests/tests/cluster/test_multi_resource_pools.py
@@ -0,0 +1,53 @@
# import subprocess

import pytest

from tests import api_utils

# from tests import config as conf
# from tests import detproc
# from tests import experiment as exp
from tests.cluster import managed_cluster


@pytest.mark.managed_devcluster
def test_agent_resource_pool_change(
restartable_managed_cluster_multi_resource_pools: managed_cluster.ManagedCluster,
) -> None:
admin = api_utils.user_session() # api_utils.admin_session()
try:
restartable_managed_cluster_multi_resource_pools.kill_agent()
restartable_managed_cluster_multi_resource_pools.dc.restart_stage("agent10")

for _i in range(5):
agent_data = managed_cluster.get_agent_data(admin)
if len(agent_data) == 0:
# Agent has exploded and been wiped due to resource pool mismatch, as expected.
break
else:
pytest.fail(
f"agent with different resource pool is still present after {_i} ticks:{agent_data}"
)
finally:
restartable_managed_cluster_multi_resource_pools.dc.kill_stage("agent10")
restartable_managed_cluster_multi_resource_pools.restart_agent()


@pytest.mark.managed_devcluster
def test_agent_resource_pool_unchanged(
restartable_managed_cluster_multi_resource_pools: managed_cluster.ManagedCluster,
) -> None:
admin = api_utils.user_session() # api_utils.admin_session()
try:
restartable_managed_cluster_multi_resource_pools.kill_agent()
restartable_managed_cluster_multi_resource_pools.dc.restart_stage("agent20")

for _i in range(5):
agent_data = managed_cluster.get_agent_data(admin)
if len(agent_data) == 0:
# Agent has exploded and been wiped due to resource pool mismatch,
# which is not expected.
pytest.fail("agent exploded even with the same resource pool")
finally:
restartable_managed_cluster_multi_resource_pools.dc.kill_stage("agent20")
restartable_managed_cluster_multi_resource_pools.restart_agent()
13 changes: 13 additions & 0 deletions master/internal/rm/agentrm/agent.go
@@ -620,6 +620,19 @@ func (a *agent) HandleIncomingWebsocketMessage(msg *aproto.MasterMessage) {
a.stop(err)
return
}
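// On reconnect, the agent must still report the resource pool the master recorded for it;
// otherwise, ask the agent to shut down and stop tracking it.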
resourcePoolErr := a.agentState.checkAgentResourcePoolMatch(msg.AgentStarted)
if resourcePoolErr != nil {
a.syslog.WithError(resourcePoolErr).
Error("change in agent resource pool was detected")
a.socket.Outbox <- aproto.AgentMessage{
AgentShutdown: &aproto.AgentShutdown{
ErrMsg: aproto.ErrAgentMustReconnect.Error(),
},
}

a.stop(resourcePoolErr)
return
}
} else {
a.agentStarted(msg.AgentStarted)
}
16 changes: 16 additions & 0 deletions master/internal/rm/agentrm/agent_state.go
@@ -20,6 +20,8 @@ import (
"github.com/determined-ai/determined/master/pkg/model"
)

const defaultResourcePoolName = "default"

type slotEnabled struct {
deviceAdded bool
agentEnabled bool
@@ -289,6 +291,20 @@ func (a *agentState) checkAgentStartedDevicesMatch(
return nil
}

// checkAgentResourcePoolMatch compares the resource pool reported by a reconnecting agent
// with the resource pool the master has recorded for it. The master does not refresh its
// resource pool information when an agent reconnects within the designated reconnection
// period, while the agent reports whatever its (possibly updated) configuration specifies.
// On a mismatch an error is returned, causing the agent to stop and require a restart.
func (a *agentState) checkAgentResourcePoolMatch(
agentStarted *aproto.AgentStarted,
) error {
if a.resourcePoolName != agentStarted.ResourcePoolName {
return fmt.Errorf("resource pool has changed: %s -> %s", a.resourcePoolName, agentStarted.ResourcePoolName)
}

return nil
}

func (a *agentState) containerStateChanged(msg aproto.ContainerStateChanged) {
for _, d := range msg.Container.Devices {
s, ok := a.slotStates[d.ID]
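The new check is small enough to cover with a unit test. Below is a minimal table-driven sketch, not part of this PR: it assumes the test lives in package agentrm so it can construct agentState with only resourcePoolName set (the method reads no other fields), and it assumes the repository's usual testify/require dependency and aproto import path.

```go
package agentrm

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/determined-ai/determined/master/pkg/aproto"
)

func TestCheckAgentResourcePoolMatch(t *testing.T) {
	cases := []struct {
		name     string
		recorded string // pool the master has recorded for the agent
		reported string // pool the reconnecting agent reports
		wantErr  bool
	}{
		{"same pool", "default", "default", false},
		{"changed pool", "default", "pool1", true},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Only resourcePoolName is read by the check, so a zero-value
			// agentState with that one field set is enough here.
			state := agentState{resourcePoolName: tc.recorded}
			err := state.checkAgentResourcePoolMatch(&aproto.AgentStarted{
				ResourcePoolName: tc.reported,
			})
			if tc.wantErr {
				require.Error(t, err)
			} else {
				require.NoError(t, err)
			}
		})
	}
}
```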