Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: wait to allow PodDefaults to synced in Job's namespace before running tests #120

Merged
merged 10 commits into from
Sep 17, 2024
29 changes: 28 additions & 1 deletion driver/test_kubeflow_workloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import os
import subprocess
import time
from pathlib import Path
from typing import Dict

Expand All @@ -15,7 +16,13 @@
load_in_cluster_generic_resources,
)
from lightkube.types import CascadeType
from utils import assert_namespace_active, assert_profile_deleted, fetch_job_logs, wait_for_job
from utils import (
assert_namespace_active,
assert_poddefault_created_in_namespace,
assert_profile_deleted,
fetch_job_logs,
wait_for_job,
)

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -48,6 +55,8 @@
)
PODDEFAULT_WITH_PROXY_PATH = Path("tests") / "proxy-poddefault.yaml.j2"

KFP_PODDEFAULT_NAME = "access-ml-pipeline"


@pytest.fixture(scope="session")
def pytest_filter(request):
Expand Down Expand Up @@ -146,6 +155,24 @@ async def test_create_profile(lightkube_client, create_profile):

assert_namespace_active(lightkube_client, NAMESPACE)

# Wait until KFP PodDefault is created in the namespace
assert_poddefault_created_in_namespace(lightkube_client, KFP_PODDEFAULT_NAME, NAMESPACE)

# Sync of other PodDefaults to the namespace can take up to 10 seconds
# Wait here is necessary to allow the creation of PodDefaults before Job is created
sleep_time_seconds = 10
log.info(
f"Sleeping for {sleep_time_seconds}s to allow the creation of PodDefaults in {NAMESPACE} namespace.."
)
time.sleep(sleep_time_seconds)

# Get PodDefaults in the test namespace
created_poddefaults_list = lightkube_client.list(PODDEFAULT_RESOURCE, namespace=NAMESPACE)
created_poddefaults_names = [pd.metadata.name for pd in created_poddefaults_list]

# Print the names of PodDefaults in the test namespace
log.info(f"PodDefaults in {NAMESPACE} namespace are {created_poddefaults_names}.")


def test_kubeflow_workloads(
lightkube_client, pytest_cmd, tests_checked_out_commit, request, create_poddefaults_on_proxy
Expand Down
31 changes: 30 additions & 1 deletion driver/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import tenacity
from lightkube import ApiError, Client
from lightkube.generic_resource import create_global_resource
from lightkube.generic_resource import create_global_resource, create_namespaced_resource
from lightkube.resources.batch_v1 import Job
from lightkube.resources.core_v1 import Namespace

Expand All @@ -19,6 +19,13 @@

log = logging.getLogger(__name__)

PODDEFAULT_RESOURCE = create_namespaced_resource(
group="kubeflow.org",
version="v1alpha1",
kind="poddefault",
plural="poddefaults",
)


@tenacity.retry(
wait=tenacity.wait_exponential(multiplier=2, min=1, max=10),
Expand All @@ -41,6 +48,28 @@ def assert_namespace_active(
assert phase == "Active", f"Waited too long for namespace {namespace}!"


@tenacity.retry(
wait=tenacity.wait_exponential(multiplier=2, min=1, max=10),
stop=tenacity.stop_after_attempt(15),
reraise=True,
)
def assert_poddefault_created_in_namespace(
client: Client,
name: str,
namespace: str,
):
"""Test that the given namespace contains the PodDefault required.

Retries multiple times to allow for the PodDefault to be synced to the namespace.
"""
pod_default = None
try:
pod_default = client.get(PODDEFAULT_RESOURCE, name, namespace=namespace)
except ApiError:
log.info(f"Waiting for PodDefault {name} to be created in namespace {namespace}..")
assert pod_default is not None, f"Waited too long for PodDefault {name} to be created."


def _log_before_sleep(retry_state):
"""Custom callback to log the number of seconds before the next attempt."""
next_attempt = retry_state.attempt_number
Expand Down