ci: replace no_op fixture with a noop api (#9997)
Eliminate a couple dozen YAML files controlling the no_op fixture,
each of which was tweaked a half dozen ways by different tests.

There were 124 usages of the no_op fixture, and it was very hard to
know what any particular test was trying to accomplish.  All of these
(except 6 from the custom searcher tests, which are removed in an
upcoming feature branch) have been rewritten to use a new Python module
for creating noop experiments with obvious behaviors.
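
For example, where a test used to point at a no_op yaml fixture plus
"--config" overrides, it now builds its workload inline.  A minimal
sketch, assembled from the usages in the diffs below (noop.Report and
noop.Sleep are the module's actions, and create_experiment returns a
determined.experimental.client experiment handle):

    from determined.experimental import client
    from tests import api_utils
    from tests.experiment import noop

    sess = api_utils.user_session()
    # Report a validation metric, sleep briefly, then report again.
    exp_ref = noop.create_experiment(
        sess,
        [noop.Report({"loss": 1}), noop.Sleep(5), noop.Report({"loss": 1})],
        config={"max_restarts": 0},
    )
    assert exp_ref.wait(interval=0.01) == client.ExperimentState.COMPLETED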

By my measurements, a combined total of 34 minutes of effective sleeping
was removed from the individual tests of our test suite.  The biggest
wins came from cases where the test author probably did not realize how
long some of the no_op experiments were configured to run.

Most tests were faithfully preserved, with the following exceptions:

- cluster/test_exp_continue:test_continue_config_file_and_args_cli
    - converted to a unit test
- cluster/test_exp_continue:test_continue_config_file_cli
    - deleted; with the new unit test, it adds nothing to test_continue_batches
- cluster/test_exp_continue:test_continue_fixing_broken_config
    - deleted; adds nothing to test_continue_batches
- cluster/test_exp_continue:test_continue_workloads_searcher
    - deleted since it was really a wlsq test
- cluster/test_exp_continue:test_continue_pytorch_completed_searcher
    - deleted since it was really a pytorch trainer test
- cluster/test_resource_manager:test_allocation_resources_incremental_release
    - the test has not been working, I think since at least when we
      defaulted to using det.launch.torch_distributed; the non-chief
      container was not exiting until the chief exited
- experiment/test_core:test_trial_logs
    - deleted due to cluster/test_logging
- experiment/test_core:test_log_null_bytes
    - deleted, but added null bytes to test_logging.py
- experiment/test_noop:test_noop_nan_validations
    - combined with test_noop_pause
- experiment/test_noop:test_cancel_ten_experiments
    - this test is dumb, and it was also pathologically slow
- experiment/test_noop:test_cancel_ten_paused_experiments
    - this test is dumb
- experiment/test_noop:test_startup_hook
    - test_logging tests startup hooks already
- run/test_api:test_run_pause_and_resume_filter_skip_empty
    - renamed to test_run_in_search_not_pausable_or_resumable to match
      its intended purpose; also simplified it, made it stricter, and
      stopped leaking adaptive searches onto the cluster after passing
rb-determined-ai authored Oct 2, 2024
1 parent 987b2a5 commit a0cc818
Showing 64 changed files with 1,556 additions and 2,355 deletions.
e2e_tests/tests/api_utils.py (9 additions, 0 deletions)
@@ -6,6 +6,7 @@
 
 from determined.common import api
 from determined.common.api import authentication, bindings, certs, errors
+from determined.experimental import client
 from tests import config as conf
 
 _cert: Optional[certs.Cert] = None
@@ -53,6 +54,14 @@ def create_test_user(
     return sess, password
 
 
+def create_linked_user(uid: int, agent_user: str, gid: int, group: str) -> api.Session:
+    sess, _ = create_test_user()
+    det_obj = client.Determined._from_session(admin_session())
+    user = det_obj.get_user_by_name(user_name=sess.username)
+    user.link_with_agent(agent_gid=gid, agent_uid=uid, agent_group=group, agent_user=agent_user)
+    return sess
+
+
 def assign_user_role(session: api.Session, user: str, role: str, workspace: Optional[str]) -> None:
     user_assign = api.create_user_assignment_request(
         session, user=user, role=role, workspace=workspace
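
The new create_linked_user helper above pairs a fresh test user with an
agent-side identity.  A hypothetical call site (the uid/gid/user/group
values here are illustrative, not taken from this commit):

    # Run the new user's tasks as a specific agent-side account.
    sess = api_utils.create_linked_user(uid=1000, agent_user="alice", gid=1000, group="users")
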
e2e_tests/tests/cluster/test_agent_disable.py (56 additions, 57 deletions)
@@ -6,11 +6,11 @@
 
 from determined.common import api
 from determined.common.api import bindings
-from tests import api_utils
-from tests import config as conf
-from tests import detproc
+from determined.experimental import client
+from tests import api_utils, detproc
 from tests import experiment as exp
 from tests.cluster import utils
+from tests.experiment import noop
 
 
 @pytest.mark.e2e_cpu
@@ -85,15 +85,11 @@ def test_disable_agent_experiment_resume() -> None:
     assert len(slots) == 1
     agent_id = slots[0]["agent_id"]
 
-    exp_id = exp.create_experiment(
-        sess,
-        conf.fixtures_path("no_op/single-medium-train-step.yaml"),
-        conf.fixtures_path("no_op"),
-        ["--config", "max_restarts=0"],
-    )
+    # Make the experiment preemptible.
+    exp_ref = noop.create_experiment(sess, [noop.Sleep(100)], config={"max_restarts": 0})
     exp.wait_for_experiment_state(
         sess,
-        exp_id,
+        exp_ref.id,
         bindings.experimentv1State.RUNNING,
         max_wait_secs=utils.KUBERNETES_EXPERIMENT_TIMEOUT,
     )
@@ -108,7 +104,18 @@
             time.sleep(1)
     else:
         pytest.fail("Experiment stayed scheduled after agent was disabled")
-    exp.wait_for_experiment_state(sess, exp_id, bindings.experimentv1State.COMPLETED)
+
+    # Wait for the experiment to be running again.
+    exp.wait_for_experiment_state(
+        sess,
+        exp_ref.id,
+        bindings.experimentv1State.RUNNING,
+        max_wait_secs=utils.KUBERNETES_EXPERIMENT_TIMEOUT,
+    )
+
+    # Now just kill it off.
+    exp_ref.kill()
+    assert exp_ref.wait(interval=0.01) == client.ExperimentState.CANCELED
 
 
 @pytest.mark.e2e_cpu
@@ -151,50 +158,48 @@ def test_drain_agent() -> None:
     assert len(slots) == 1
     agent_id = slots[0]["agent_id"]
 
-    experiment_id = exp.create_experiment(
+    exp_ref = noop.create_experiment(
         sess,
-        conf.fixtures_path("no_op/single-medium-train-step.yaml"),
-        conf.fixtures_path("no_op"),
-        ["--config", "hyperparameters.training_batch_seconds=0.15"],  # Take 15 seconds.
+        [
+            # Two Reports to meet the requirements of wait_for_workload_progress().
+            noop.Report({"loss": 1}),
+            noop.Report({"loss": 1}),
+            noop.Sleep(5),
+            # A third Report to prove we finished successfully.
+            noop.Report({"loss": 1}),
+        ],
    )
     exp.wait_for_experiment_state(
         sess,
-        experiment_id,
+        exp_ref.id,
         bindings.experimentv1State.RUNNING,
         max_wait_secs=utils.KUBERNETES_EXPERIMENT_TIMEOUT,
     )
-    exp.wait_for_experiment_active_workload(sess, experiment_id)
-    exp.wait_for_experiment_workload_progress(sess, experiment_id)
+    exp.wait_for_experiment_active_workload(sess, exp_ref.id)
+    exp.wait_for_experiment_workload_progress(sess, exp_ref.id)
 
     # Disable and quickly enable it back.
     with _disable_agent(admin, agent_id, drain=True):
         pass
 
     # Try to launch another experiment. It shouldn't get scheduled because the
     # slot is still busy with the first experiment.
-    experiment_id_no_start = exp.create_experiment(
-        sess,
-        conf.fixtures_path("no_op/single-medium-train-step.yaml"),
-        conf.fixtures_path("no_op"),
-        None,
-    )
-    time.sleep(5)
-    exp.wait_for_experiment_state(sess, experiment_id_no_start, bindings.experimentv1State.QUEUED)
+    no_start = noop.create_experiment(sess)
+    time.sleep(2)
+    no_start.reload()
+    assert no_start.state == client.ExperimentState.QUEUED, no_start.state
 
     with _disable_agent(admin, agent_id, drain=True):
         # Ensure the first one has finished with the correct number of workloads.
-        exp.wait_for_experiment_state(sess, experiment_id, bindings.experimentv1State.COMPLETED)
-        trials = exp.experiment_trials(sess, experiment_id)
+        assert exp_ref.wait(interval=0.01) == client.ExperimentState.COMPLETED
+        trials = exp.experiment_trials(sess, exp_ref.id)
         assert len(trials) == 1
-        assert len(trials[0].workloads) == 7
+        assert len(trials[0].workloads) == 3
 
-        # Check for 15 seconds it doesn't get scheduled into the same slot.
-        for _ in range(15):
-            assert (
-                exp.experiment_state(sess, experiment_id_no_start)
-                == bindings.experimentv1State.QUEUED
-            )
-            time.sleep(1)
+        # Make sure it doesn't get scheduled into the same slot.
+        time.sleep(2)
+        no_start.reload()
+        assert no_start.state == client.ExperimentState.QUEUED, no_start.state
 
         # Ensure the slot is empty.
         slots = _fetch_slots(admin)
@@ -211,7 +216,7 @@ def test_drain_agent() -> None:
     assert agent_data["enabled"] is False
     assert agent_data["draining"] is True
 
-    exp.kill_single(sess, experiment_id_no_start)
+    no_start.kill()
 
 
 @pytest.mark.e2e_cpu_2a
@@ -225,30 +230,27 @@ def test_drain_agent_sched() -> None:
     slots = _wait_for_slots(admin, 2)
     assert len(slots) == 2
 
-    exp_id1 = exp.create_experiment(
+    exp_ref1 = noop.create_experiment(
         sess,
-        conf.fixtures_path("no_op/single-medium-train-step.yaml"),
-        conf.fixtures_path("no_op"),
-        None,
+        [
+            # Two Reports to meet the requirements of wait_for_workload_progress().
+            noop.Report({"loss": 1}),
+            noop.Report({"loss": 1}),
+            noop.Sleep(100),
+        ],
     )
-    exp.wait_for_experiment_workload_progress(sess, exp_id1)
+    exp.wait_for_experiment_workload_progress(sess, exp_ref1.id)
 
     slots = _fetch_slots(admin)
    used_slots = [s for s in slots if s["allocation_id"] != "FREE"]
     assert len(used_slots) == 1
     agent_id1 = used_slots[0]["agent_id"]
 
     with _disable_agent(admin, agent_id1, drain=True):
-        exp_id2 = exp.create_experiment(
-            sess,
-            conf.fixtures_path("no_op/single-medium-train-step.yaml"),
-            conf.fixtures_path("no_op"),
-            None,
-        )
-        exp.wait_for_experiment_state(sess, exp_id2, bindings.experimentv1State.RUNNING)
+        exp_ref2 = noop.create_experiment(sess, [noop.Sleep(100)])
 
         # Wait for a state when *BOTH* experiments are scheduled.
-        for _ in range(20):
+        for _ in range(200):
             slots = _fetch_slots(admin)
             assert len(slots) == 2
             used_slots = [s for s in slots if s["allocation_id"] != "FREE"]
@@ -261,13 +263,10 @@
                 "while the first agent was draining"
             )
 
-        exp.wait_for_experiment_state(sess, exp_id1, bindings.experimentv1State.COMPLETED)
-        exp.wait_for_experiment_state(sess, exp_id2, bindings.experimentv1State.COMPLETED)
-
-        trials1 = exp.experiment_trials(sess, exp_id1)
-        trials2 = exp.experiment_trials(sess, exp_id2)
-        assert len(trials1) == len(trials2) == 1
-        assert len(trials1[0].workloads) == len(trials2[0].workloads) == 7
+        exp_ref1.kill()
+        exp_ref2.kill()
+        assert exp_ref1.wait(interval=0.01) == client.ExperimentState.CANCELED
+        assert exp_ref2.wait(interval=0.01) == client.ExperimentState.CANCELED
 
 
 def _task_data(sess: api.Session, task_id: str) -> Optional[Dict[str, Any]]:
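
Note the teardown pattern used throughout the rewritten tests above:
long-running noop experiments are killed explicitly instead of being
allowed to finish, and a killed experiment must land in CANCELED.  In
isolation (a sketch of the pattern, not new code from this commit):

    exp_ref = noop.create_experiment(sess, [noop.Sleep(100)])
    exp_ref.kill()
    # kill() cancels the experiment rather than completing it.
    assert exp_ref.wait(interval=0.01) == client.ExperimentState.CANCELED
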
e2e_tests/tests/cluster/test_agent_restart.py (50 additions, 38 deletions)
@@ -8,11 +8,13 @@
 
 from determined.common import api
 from determined.common.api import bindings
+from determined.experimental import client
 from tests import api_utils
 from tests import config as conf
 from tests import detproc
 from tests import experiment as exp
 from tests.cluster import managed_cluster, utils
+from tests.experiment import noop
 
 DEVCLUSTER_CONFIG_ROOT_PATH = conf.PROJECT_ROOT_PATH.joinpath(".circleci/devcluster")
 DEVCLUSTER_REATTACH_OFF_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "double.devcluster.yaml"
@@ -69,14 +71,17 @@ def test_agent_restart_exp_container_failure(
     sess = api_utils.user_session()
     restartable_managed_cluster.ensure_agent_ok()
     try:
-        exp_id = exp.create_experiment(
+        exp_ref = noop.create_experiment(
             sess,
-            conf.fixtures_path("no_op/single-medium-train-step.yaml"),
-            conf.fixtures_path("no_op"),
-            None,
+            [
+                # Two Reports to meet the requirements of wait_for_workload_progress().
+                noop.Report({"loss": 1}),
+                noop.Report({"loss": 1}),
+                noop.Sleep(5),
+            ],
         )
-        exp.wait_for_experiment_workload_progress(sess, exp_id)
-        container_ids = list(_local_container_ids_for_experiment(exp_id))
+        exp.wait_for_experiment_workload_progress(sess, exp_ref.id)
+        container_ids = list(_local_container_ids_for_experiment(exp_ref.id))
         if len(container_ids) != 1:
             pytest.fail(
                 f"unexpected number of local containers for the experiment: {len(container_ids)}"
@@ -95,24 +100,25 @@
         restartable_managed_cluster.restart_agent()
         # As soon as the agent is back, the original allocation should be considered dead,
         # but the new one should be allocated.
-        state = exp.experiment_state(sess, exp_id)
+        exp_ref.reload()
         # old STATE_ACTIVE got divided into three states
-        assert state in [
-            bindings.experimentv1State.ACTIVE,
-            bindings.experimentv1State.QUEUED,
-            bindings.experimentv1State.PULLING,
-            bindings.experimentv1State.STARTING,
-            bindings.experimentv1State.RUNNING,
+        assert exp_ref.state in [
+            client.ExperimentState.ACTIVE,
+            client.ExperimentState.QUEUED,
+            client.ExperimentState.PULLING,
+            client.ExperimentState.STARTING,
+            client.ExperimentState.RUNNING,
         ]
-        exp.wait_for_experiment_state(sess, exp_id, bindings.experimentv1State.RUNNING)
+        exp.wait_for_experiment_state(sess, exp_ref.id, bindings.experimentv1State.RUNNING)
         tasks_data = _task_list_json(sess)
         assert len(tasks_data) == 1
         exp_task_after = list(tasks_data.values())[0]
 
         assert exp_task_before["taskId"] == exp_task_after["taskId"]
         assert exp_task_before["allocationId"] != exp_task_after["allocationId"]
 
-        exp.wait_for_experiment_state(sess, exp_id, bindings.experimentv1State.COMPLETED)
+        exp_ref.kill()
+        assert exp_ref.wait(interval=0.01) == client.ExperimentState.CANCELED
 
 
 @pytest.mark.managed_devcluster
@@ -193,25 +199,30 @@ def test_agent_restart_recover_experiment(
     sess = api_utils.user_session()
     restartable_managed_cluster.ensure_agent_ok()
     try:
-        exp_id = exp.create_experiment(
+        exp_ref = noop.create_experiment(
             sess,
-            conf.fixtures_path("no_op/single-medium-train-step.yaml"),
-            conf.fixtures_path("no_op"),
-            None,
+            [
+                # Two Reports to meet the requirements of wait_for_workload_progress().
+                noop.Report({"loss": 1}),
+                noop.Report({"loss": 1}),
+                noop.Sleep(5),
+                # A third Report to prove we finished successfully.
+                noop.Report({"loss": 1}),
+            ],
         )
-        exp.wait_for_experiment_workload_progress(sess, exp_id)
+        exp.wait_for_experiment_workload_progress(sess, exp_ref.id)
 
         if downtime >= 0:
             restartable_managed_cluster.kill_agent()
             time.sleep(downtime)
             restartable_managed_cluster.restart_agent(wait_for_amnesia=False)
 
-        exp.wait_for_experiment_state(sess, exp_id, bindings.experimentv1State.COMPLETED)
-        trials = exp.experiment_trials(sess, exp_id)
+        assert exp_ref.wait(interval=0.01) == client.ExperimentState.COMPLETED
+        trials = exp.experiment_trials(sess, exp_ref.id)
 
         assert len(trials) == 1
         train_wls = exp.workloads_with_training(trials[0].workloads)
-        assert len(train_wls) == 5
+        assert len(train_wls) == 3
     except Exception:
         restartable_managed_cluster.restart_agent()
         raise
@@ -223,24 +234,29 @@
 ) -> None:
     sess = api_utils.user_session()
     try:
-        exp_id = exp.create_experiment(
+        exp_ref = noop.create_experiment(
             sess,
-            conf.fixtures_path("no_op/single-medium-train-step.yaml"),
-            conf.fixtures_path("no_op"),
-            None,
+            [
+                # Two Reports to meet the requirements of wait_for_workload_progress().
+                noop.Report({"loss": 1}),
+                noop.Report({"loss": 1}),
+                noop.Sleep(5),
+                # A third Report to prove we finished successfully.
+                noop.Report({"loss": 1}),
+            ],
         )
-        exp.wait_for_experiment_workload_progress(sess, exp_id)
+        exp.wait_for_experiment_workload_progress(sess, exp_ref.id)
 
         restartable_managed_cluster.kill_proxy()
         time.sleep(1)
         restartable_managed_cluster.restart_proxy()
 
-        exp.wait_for_experiment_state(sess, exp_id, bindings.experimentv1State.COMPLETED)
-        trials = exp.experiment_trials(sess, exp_id)
+        assert exp_ref.wait(interval=0.01) == client.ExperimentState.COMPLETED
+        trials = exp.experiment_trials(sess, exp_ref.id)
 
         assert len(trials) == 1
         train_wls = exp.workloads_with_training(trials[0].workloads)
-        assert len(train_wls) == 5
+        assert len(train_wls) == 3
     except Exception:
         restartable_managed_cluster.restart_proxy(wait_for_reconnect=False)
         restartable_managed_cluster.restart_agent()
@@ -294,13 +310,9 @@ def test_queued_experiment_restarts_with_correct_allocation_id(
     restartable_managed_cluster: managed_cluster.ManagedCluster,
 ) -> None:
     sess = api_utils.user_session()
-    exp_id = exp.create_experiment(
-        sess,
-        conf.fixtures_path("no_op/single-medium-train-step.yaml"),
-        conf.fixtures_path("no_op"),
-        ["--config", "resources.slots_per_trial=9999"],
-    )
-    exp.wait_for_experiment_state(sess, exp_id, bindings.experimentv1State.QUEUED)
+    config = {"resources": {"slots_per_trial": 9999}}
+    exp_ref = noop.create_experiment(sess, config=config)
+    assert exp_ref.state == client.ExperimentState.QUEUED, exp_ref.state
 
     restartable_managed_cluster.kill_master()
     log_marker = str(uuid.uuid4())