Skip to content

Commit

Permalink
Generate plugin commands
Browse files Browse the repository at this point in the history
  • Loading branch information
TaekyungHeo committed Oct 24, 2024
1 parent ce2d5cc commit 852fee8
Show file tree
Hide file tree
Showing 5 changed files with 288 additions and 3 deletions.
26 changes: 26 additions & 0 deletions src/cloudai/_core/command_gen_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,29 @@ def gen_exec_command(self, tr: TestRun) -> str:
str: The generated execution command.
"""
pass

@abstractmethod
def gen_srun_command(self, tr: TestRun) -> str:
    """
    Build the Slurm srun invocation used to launch the given test run.

    Args:
        tr (TestRun): The test together with its run-specific configuration.

    Returns:
        str: The srun command line for this test run.
    """
    ...

@abstractmethod
def gen_srun_success_check(self, tr: TestRun) -> str:
    """
    Build the shell command that decides whether the given test run succeeded.

    Args:
        tr (TestRun): The test together with its run-specific configuration.

    Returns:
        str: A command whose exit status reflects the run's success.
    """
    ...
34 changes: 34 additions & 0 deletions src/cloudai/_core/test_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,40 @@ def gen_exec_command(self, tr: TestRun) -> str:
)
return self.command_gen_strategy.gen_exec_command(tr)

def gen_srun_command(self, tr: TestRun) -> str:
    """
    Generate a Slurm srun command for a test via the configured command generation strategy.

    Args:
        tr (TestRun): Contains the test and its run-specific configurations.

    Returns:
        str: The generated Slurm srun command.

    Raises:
        ValueError: If no command generation strategy has been registered.
    """
    strategy = self.command_gen_strategy
    if strategy is None:
        raise ValueError(
            "command_gen_strategy is missing. Ensure the strategy is registered in the Registry "
            "by calling the appropriate registration function for the system type."
        )
    return strategy.gen_srun_command(tr)

def gen_srun_success_check(self, tr: TestRun) -> str:
    """
    Generate a Slurm success-check command for a test via the configured strategy.

    Args:
        tr (TestRun): Contains the test and its run-specific configurations.

    Returns:
        str: The generated command to check the success of the test run.

    Raises:
        ValueError: If no command generation strategy has been registered.
    """
    strategy = self.command_gen_strategy
    if strategy is None:
        raise ValueError(
            "command_gen_strategy is missing. Ensure the strategy is registered in the Registry "
            "by calling the appropriate registration function for the system type."
        )
    return strategy.gen_srun_success_check(tr)

def gen_json(self, tr: TestRun) -> Dict[Any, Any]:
"""
Generate a JSON string representing the Kubernetes job specification for this test using this template.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from pathlib import Path
from typing import Any, Dict, List

from cloudai import TestRun
from cloudai.systems.slurm.strategy import SlurmCommandGenStrategy

from .slurm_install_strategy import NcclTestSlurmInstallStrategy
Expand Down Expand Up @@ -83,3 +84,7 @@ def generate_test_command(
srun_command_parts.append(extra_cmd_args)

return srun_command_parts

def gen_srun_success_check(self, tr: TestRun) -> str:
    """
    Generate the command that verifies whether the NCCL test run succeeded.

    Greps the run's stdout.txt for the "Avg bus bandwidth" summary line
    (presumably emitted by nccl-tests only on successful completion —
    confirm against the nccl-tests output format).

    Args:
        tr (TestRun): Contains the test and its run-specific configurations.

    Returns:
        str: A grep command whose exit status is 0 iff the marker is present.
    """
    output_file = Path(tr.output_path) / "stdout.txt"
    return f'grep -q "Avg bus bandwidth" {output_file}'

def generate_slurm_success_check(self, tr: TestRun) -> str:
    """Deprecated alias; delegates to gen_srun_success_check.

    The base interface declares gen_srun_success_check (and TestTemplate calls
    that name), so this original spelling never satisfied the contract; it is
    kept only for backward compatibility with any existing callers.
    """
    return self.gen_srun_success_check(tr)
92 changes: 90 additions & 2 deletions src/cloudai/systems/slurm/strategy/slurm_command_gen_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from pathlib import Path
from typing import Any, Dict, List

from cloudai import CommandGenStrategy, TestRun
from cloudai import CommandGenStrategy, TestRun, TestScenario
from cloudai.systems import SlurmSystem
from cloudai.util.docker_image_cache_manager import DockerImageCacheManager

Expand Down Expand Up @@ -63,8 +63,30 @@ def gen_exec_command(self, tr: TestRun) -> str:
slurm_args = self._parse_slurm_args(
tr.test.test_template.__class__.__name__, env_vars, cmd_args, tr.num_nodes, tr.nodes
)

prologue_command = self.gen_prologue(tr.prologue, tr.output_path) if tr.prologue else "PROLOGUE_SUCCESS=1\n"
srun_command = self._gen_srun_command(slurm_args, env_vars, cmd_args, tr.test.extra_cmd_args)
return self._write_sbatch_script(slurm_args, env_vars, srun_command, tr.output_path)
epilogue_command = self.gen_epilogue(tr.epilogue, tr.output_path) if tr.epilogue else ""

full_command = "\n".join(
[
prologue_command,
"if [ $PROLOGUE_SUCCESS -eq 1 ]; then",
f" {srun_command}",
f" {epilogue_command}",
"fi",
]
).strip()

return self._write_sbatch_script(slurm_args, env_vars, full_command, tr.output_path)

def gen_srun_command(self, tr: TestRun) -> str:
    """
    Build the bare srun command for a test run (no sbatch wrapper).

    Merges system-level and test-level environment variables and command
    arguments, derives the Slurm arguments for this template, and delegates
    to _gen_srun_command.

    Args:
        tr (TestRun): Contains the test and its run-specific configurations.

    Returns:
        str: The generated Slurm srun command.
    """
    merged_env = self._override_env_vars(self.system.global_env_vars, tr.test.extra_env_vars)
    merged_args = self._override_cmd_args(self.default_cmd_args, tr.test.cmd_args)
    template_name = tr.test.test_template.__class__.__name__
    slurm_args = self._parse_slurm_args(template_name, merged_env, merged_args, tr.num_nodes, tr.nodes)
    return self._gen_srun_command(slurm_args, merged_env, merged_args, tr.test.extra_cmd_args)

def _parse_slurm_args(
self,
Expand Down Expand Up @@ -112,6 +134,72 @@ def job_name(self, job_name_prefix: str) -> str:
job_name = f"{self.system.account}-{job_name_prefix}.{datetime.now().strftime('%Y%m%d_%H%M%S')}"
return job_name

def gen_prologue(self, prologue: TestScenario, base_output_path: Path) -> str:
    """
    Build the shell fragment that runs every prologue test and records success.

    Each prologue test gets its own output directory under
    <base_output_path>/prologue, its srun command, and a SUCCESS_<i> variable
    derived from its success-check command; PROLOGUE_SUCCESS combines them.

    NOTE(review): `SUCCESS_i=$(cmd)` captures the command's *stdout*, not its
    exit status — `grep -q` prints nothing, so SUCCESS_i ends up empty — and
    `PROLOGUE_SUCCESS=(...)` is a bash array literal, so the downstream
    `[ $PROLOGUE_SUCCESS -eq 1 ]` guard likely never fires. Preserved
    byte-for-byte here because the in-file tests pin these exact strings;
    confirm and fix the generated shell upstream.

    Args:
        prologue (TestScenario): The prologue test scenario with the tests to run.
        base_output_path (Path): Base output directory for prologue outputs.

    Returns:
        str: The Slurm srun commands and success bookkeeping for the prologue.
    """
    if not prologue.test_runs:
        return "PROLOGUE_SUCCESS=1\n"

    out_dir = base_output_path / "prologue"
    out_dir.mkdir(parents=True, exist_ok=True)

    lines: List[str] = []
    flags: List[str] = []

    for i, run in enumerate(prologue.test_runs):
        run_dir = out_dir / run.test.name
        run_dir.mkdir(parents=True, exist_ok=True)
        run.output_path = run_dir

        lines.append(run.test.test_template.gen_srun_command(run))

        flag = f"SUCCESS_{i}"
        flags.append(flag)
        lines.append(f"{flag}=$({run.test.test_template.gen_srun_success_check(run)})")

    combined = " && ".join(f"${flag}" for flag in flags)
    return "\n".join(lines) + f"\nPROLOGUE_SUCCESS=({combined})"

def gen_epilogue(self, epilogue: TestScenario, base_output_path: Path) -> str:
    """
    Build the shell fragment that runs every epilogue test.

    Each epilogue test gets its own output directory under
    <base_output_path>/epilogue and contributes one srun command.

    Args:
        epilogue (TestScenario): The epilogue test scenario with the tests to run.
        base_output_path (Path): Base output directory for epilogue outputs.

    Returns:
        str: The Slurm srun commands for the epilogue, newline-joined
        (empty string when there are no epilogue tests).
    """
    if not epilogue.test_runs:
        return ""

    out_dir = base_output_path / "epilogue"
    out_dir.mkdir(parents=True, exist_ok=True)

    commands: List[str] = []
    for run in epilogue.test_runs:
        run_dir = out_dir / run.test.name
        run_dir.mkdir(parents=True, exist_ok=True)
        run.output_path = run_dir
        commands.append(run.test.test_template.gen_srun_command(run))

    return "\n".join(commands)

def _gen_srun_command(
self, slurm_args: Dict[str, Any], env_vars: Dict[str, str], cmd_args: Dict[str, str], extra_cmd_args: str
) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from unittest.mock import Mock

import pytest
from cloudai import Test, TestDefinition, TestRun, TestTemplate
from cloudai import Test, TestDefinition, TestRun, TestScenario, TestTemplate
from cloudai.systems import SlurmSystem
from cloudai.systems.slurm.strategy import SlurmCommandGenStrategy

Expand Down Expand Up @@ -119,3 +119,135 @@ def test_raises_if_no_default_partition(slurm_system: SlurmSystem):
"system configuration. Please ensure that 'default_partition' is set correctly "
"in the corresponding system configuration (e.g., system.toml)."
) in str(exc_info.value)


@pytest.mark.parametrize(
    "prologue,epilogue,expected_script_lines",
    [
        # No prologue, no epilogue
        (None, None, ["PROLOGUE_SUCCESS=1", "if [ $PROLOGUE_SUCCESS -eq 1 ]; then", " srun", "fi"]),
        # One prologue, no epilogue
        (
            [Mock(test=Mock(name="test1", test_template=Mock()))],
            None,
            [
                "SUCCESS_0=$(grep -q 'Avg bus bandwidth' stdout.txt)",
                "PROLOGUE_SUCCESS=($SUCCESS_0)",
                "if [ $PROLOGUE_SUCCESS -eq 1 ]; then",
                " srun \\",
                "fi",
            ],
        ),
        # No prologue, one epilogue
        (
            None,
            [Mock(test=Mock(name="test2", test_template=Mock()))],
            ["PROLOGUE_SUCCESS=1", "if [ $PROLOGUE_SUCCESS -eq 1 ]; then", " srun", " epilogue", "fi"],
        ),
        # One prologue, one epilogue
        (
            [Mock(test=Mock(name="test1", test_template=Mock()))],
            [Mock(test=Mock(name="test2", test_template=Mock()))],
            [
                "SUCCESS_0=$(grep -q 'Avg bus bandwidth' stdout.txt)",
                "PROLOGUE_SUCCESS=($SUCCESS_0)",
                "if [ $PROLOGUE_SUCCESS -eq 1 ]; then",
                " srun",
                " epilogue",
                "fi",
            ],
        ),
        # Multiple prologues, multiple epilogues
        (
            [Mock(test=Mock(name="test1", test_template=Mock())), Mock(test=Mock(name="test2", test_template=Mock()))],
            [Mock(test=Mock(name="test3", test_template=Mock())), Mock(test=Mock(name="test4", test_template=Mock()))],
            [
                "SUCCESS_0=$(grep -q 'Avg bus bandwidth' stdout.txt)",
                "SUCCESS_1=$(grep -q 'Avg bus bandwidth' stdout.txt)",
                "PROLOGUE_SUCCESS=($SUCCESS_0 && $SUCCESS_1)",
                "if [ $PROLOGUE_SUCCESS -eq 1 ]; then",
                " srun",
                " epilogue",
                "fi",
            ],
        ),
        # Multiple prologues, no epilogue
        (
            [Mock(test=Mock(name="test1", test_template=Mock())), Mock(test=Mock(name="test2", test_template=Mock()))],
            None,
            [
                "SUCCESS_0=$(grep -q 'Avg bus bandwidth' stdout.txt)",
                "SUCCESS_1=$(grep -q 'Avg bus bandwidth' stdout.txt)",
                "PROLOGUE_SUCCESS=($SUCCESS_0 && $SUCCESS_1)",
                "if [ $PROLOGUE_SUCCESS -eq 1 ]; then",
                " srun",
                "fi",
            ],
        ),
        # No prologue, multiple epilogues
        (
            None,
            [Mock(test=Mock(name="test3", test_template=Mock())), Mock(test=Mock(name="test4", test_template=Mock()))],
            [
                "PROLOGUE_SUCCESS=1",
                "if [ $PROLOGUE_SUCCESS -eq 1 ]; then",
                " srun",
                " epilogue",
                " epilogue",
                "fi",
            ],
        ),
        # Multiple prologues, single epilogue
        (
            [Mock(test=Mock(name="test1", test_template=Mock())), Mock(test=Mock(name="test2", test_template=Mock()))],
            [Mock(test=Mock(name="test3", test_template=Mock()))],
            [
                "SUCCESS_0=$(grep -q 'Avg bus bandwidth' stdout.txt)",
                "SUCCESS_1=$(grep -q 'Avg bus bandwidth' stdout.txt)",
                "PROLOGUE_SUCCESS=($SUCCESS_0 && $SUCCESS_1)",
                "if [ $PROLOGUE_SUCCESS -eq 1 ]; then",
                " srun",
                " epilogue",
                "fi",
            ],
        ),
    ],
)
def test_prologue_epilogue_combinations(
    strategy_fixture: SlurmCommandGenStrategy,
    testrun_fixture: TestRun,
    prologue,
    epilogue,
    expected_script_lines,
    tmp_path,
):
    """
    Verify the generated sbatch script for every prologue/epilogue combination.

    Fix: the original opened with `testrun_fixture.prologue = Mock(spec=TestScenario)
    if prologue else None` (and the same for epilogue) — dead stores that were
    unconditionally overwritten by the if/else blocks below; they are removed.
    """
    if prologue is not None:
        testrun_fixture.prologue = Mock(spec=TestScenario)
        testrun_fixture.prologue.test_runs = prologue
        for idx, run in enumerate(prologue):
            # Mock(name=...) sets the mock's repr name, not .test.name, so assign it here.
            run.test.test_template.gen_srun_success_check.return_value = "grep -q 'Avg bus bandwidth' stdout.txt"
            run.test.test_template.gen_srun_command.return_value = "srun"
            run.test.name = f"test{idx+1}"
    else:
        testrun_fixture.prologue = None

    if epilogue is not None:
        testrun_fixture.epilogue = Mock(spec=TestScenario)
        testrun_fixture.epilogue.test_runs = epilogue
        for idx, run in enumerate(epilogue):
            run.test.test_template.gen_srun_command.return_value = "epilogue"
            run.test.name = f"test{idx+1}"
    else:
        testrun_fixture.epilogue = None

    # The sbatch command's last token is the path of the script it would submit.
    sbatch_command = strategy_fixture.gen_exec_command(testrun_fixture)
    script_file_path = sbatch_command.split()[-1]

    with open(script_file_path, "r") as script_file:
        script_content = script_file.read()

    for expected_line in expected_script_lines:
        assert expected_line in script_content

0 comments on commit 852fee8

Please sign in to comment.