From 69e240354a562380e444fb2e2e5fbd62ff67764b Mon Sep 17 00:00:00 2001
From: Thang Nguyen <46436648+thangckt@users.noreply.github.com>
Date: Fri, 6 Sep 2024 18:21:47 +0900
Subject: [PATCH 01/11] u

---
 mace/cli/run_train.py           |  3 ++-
 mace/tools/arg_parser.py        |  8 ++++++++
 mace/tools/slurm_distributed.py | 26 +++++++++++++++++++++++++-
 3 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/mace/cli/run_train.py b/mace/cli/run_train.py
index 7acafaa6..7a0a2862 100644
--- a/mace/cli/run_train.py
+++ b/mace/cli/run_train.py
@@ -87,7 +87,8 @@ def run(args: argparse.Namespace) -> None:
         rank = distr_env.rank
         if rank == 0:
             print(distr_env)
-        torch.distributed.init_process_group(backend="nccl")
+
+        torch.distributed.init_process_group(backend=args.distributed_backend)
     else:
         rank = int(0)
diff --git a/mace/tools/arg_parser.py b/mace/tools/arg_parser.py
index 046f04d6..158be7e1 100644
--- a/mace/tools/arg_parser.py
+++ b/mace/tools/arg_parser.py
@@ -76,6 +76,14 @@ def build_default_arg_parser() -> argparse.ArgumentParser:
         action="store_true",
         default=False,
     )
+    parser.add_argument(
+        "--distributed_backend",
+        help="PyTorch distributed backend",
+        type=str,
+        choices=["nccl", "gloo", "mpi"],
+        default="nccl",
+    )
+
     parser.add_argument("--log_level", help="log level", type=str, default="INFO")

     parser.add_argument(
diff --git a/mace/tools/slurm_distributed.py b/mace/tools/slurm_distributed.py
index 78de52a1..d62f4a58 100644
--- a/mace/tools/slurm_distributed.py
+++ b/mace/tools/slurm_distributed.py
@@ -10,7 +10,31 @@
 import hostlist


-class DistributedEnvironment:
+class DistributedEnvironmentSLURM:
+    def __init__(self):
+        self._setup_distr_env()
+        self.master_addr = os.environ["MASTER_ADDR"]
+        self.master_port = os.environ["MASTER_PORT"]
+        self.world_size = int(os.environ["WORLD_SIZE"])
+        self.local_rank = int(os.environ["LOCAL_RANK"])
+        self.rank = int(os.environ["RANK"])
+
+    def _setup_distr_env(self):
+        hostname = hostlist.expand_hostlist(os.environ["SLURM_JOB_NODELIST"])[0]
+        os.environ["MASTER_ADDR"] = hostname
+        os.environ["MASTER_PORT"] = os.environ.get("MASTER_PORT", "33333")
+        os.environ["WORLD_SIZE"] = os.environ.get(
+            "SLURM_NTASKS",
+            str(
+                int(os.environ["SLURM_NTASKS_PER_NODE"])
+                * int(os.environ["SLURM_NNODES"])
+            ),
+        )
+        os.environ["LOCAL_RANK"] = os.environ["SLURM_LOCALID"]
+        os.environ["RANK"] = os.environ["SLURM_PROCID"]
+
+
+class DistributedEnvironmentSGE:
     def __init__(self):
         self._setup_distr_env()
         self.master_addr = os.environ["MASTER_ADDR"]

From 0a8daa65cd01e2fd3f5b424ea5e41647a2915fee Mon Sep 17 00:00:00 2001
From: Thang Nguyen <46436648+thangckt@users.noreply.github.com>
Date: Fri, 6 Sep 2024 21:05:57 +0900
Subject: [PATCH 02/11] u

---
 mace/cli/run_train.py           | 10 ++++++++--
 mace/tools/arg_parser.py        |  7 +++++++
 mace/tools/slurm_distributed.py | 27 +++++----------------------
 3 files changed, 20 insertions(+), 24 deletions(-)

diff --git a/mace/cli/run_train.py b/mace/cli/run_train.py
index 7a0a2862..635a3174 100644
--- a/mace/cli/run_train.py
+++ b/mace/cli/run_train.py
@@ -50,7 +50,10 @@
     print_git_commit,
     setup_wandb,
 )
-from mace.tools.slurm_distributed import DistributedEnvironment
+from mace.tools.slurm_distributed import (
+    DistributedEnvironmentOpenmpi,
+    DistributedEnvironmentSlurm,
+)
 from mace.tools.utils import AtomicNumberTable

@@ -78,7 +81,10 @@ def run(args: argparse.Namespace) -> None:
         ) from e
     if args.distributed:
         try:
-            distr_env = DistributedEnvironment()
+            if args.distributed_env == "slurm":
+                distr_env = DistributedEnvironmentSlurm()
+            elif args.distributed_env == "openmpi":
+                distr_env = DistributedEnvironmentOpenmpi()
         except Exception as e:  # pylint: disable=W0703
             logging.error(f"Failed to initialize distributed environment: {e}")
             return
diff --git a/mace/tools/arg_parser.py b/mace/tools/arg_parser.py
index 158be7e1..dbe5a853 100644
--- a/mace/tools/arg_parser.py
+++ b/mace/tools/arg_parser.py
@@ -83,6 +83,13 @@ def build_default_arg_parser() -> argparse.ArgumentParser:
         choices=["nccl", "gloo", "mpi"],
         default="nccl",
     )
+    parser.add_argument(
+        "--distributed_env",
+        help="HPC cluster's job scheduler",
+        type=str,
+        choices=["slurm", "openmpi"],
+        default="slurm",
+    )

     parser.add_argument("--log_level", help="log level", type=str, default="INFO")

     parser.add_argument(
diff --git a/mace/tools/slurm_distributed.py b/mace/tools/slurm_distributed.py
index d62f4a58..866cbab3 100644
--- a/mace/tools/slurm_distributed.py
+++ b/mace/tools/slurm_distributed.py
@@ -10,7 +10,7 @@
 import hostlist


-class DistributedEnvironmentSLURM:
+class DistributedEnvironmentSlurm:
     def __init__(self):
         self._setup_distr_env()
         self.master_addr = os.environ["MASTER_ADDR"]
@@ -34,25 +34,8 @@ def _setup_distr_env(self):
         os.environ["RANK"] = os.environ["SLURM_PROCID"]


-class DistributedEnvironmentSGE:
+class DistributedEnvironmentOpenmpi:
     def __init__(self):
-        self._setup_distr_env()
-        self.master_addr = os.environ["MASTER_ADDR"]
-        self.master_port = os.environ["MASTER_PORT"]
-        self.world_size = int(os.environ["WORLD_SIZE"])
-        self.local_rank = int(os.environ["LOCAL_RANK"])
-        self.rank = int(os.environ["RANK"])
-
-    def _setup_distr_env(self):
-        hostname = hostlist.expand_hostlist(os.environ["SLURM_JOB_NODELIST"])[0]
-        os.environ["MASTER_ADDR"] = hostname
-        os.environ["MASTER_PORT"] = os.environ.get("MASTER_PORT", "33333")
-        os.environ["WORLD_SIZE"] = os.environ.get(
-            "SLURM_NTASKS",
-            str(
-                int(os.environ["SLURM_NTASKS_PER_NODE"])
-                * int(os.environ["SLURM_NNODES"])
-            ),
-        )
-        os.environ["LOCAL_RANK"] = os.environ["SLURM_LOCALID"]
-        os.environ["RANK"] = os.environ["SLURM_PROCID"]
+        self.world_size = int(os.environ["OMPI_COMM_WORLD_SIZE"])
+        self.local_rank = int(os.environ["OMPI_COMM_WORLD_LOCAL_RANK"])
+        self.rank = int(os.environ["OMPI_COMM_WORLD_RANK"])

From 4ceb97d3eee1599ea319190933550a229aa16777 Mon Sep 17 00:00:00 2001
From: Thang Nguyen <46436648+thangckt@users.noreply.github.com>
Date: Fri, 6 Sep 2024 21:08:02 +0900
Subject: [PATCH 03/11] Update run_train.py

---
 mace/cli/run_train.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/mace/cli/run_train.py b/mace/cli/run_train.py
index 635a3174..edf7a078 100644
--- a/mace/cli/run_train.py
+++ b/mace/cli/run_train.py
@@ -671,7 +671,7 @@ def run(args: argparse.Namespace) -> None:
         )
         try:
             drop_last = test_set.drop_last
-        except AttributeError as e:  # pylint: disable=W0612
+        except AttributeError as e:  # pylint: disable=W0612 # noqa: F841
            drop_last = False
         test_loader = torch_geometric.dataloader.DataLoader(
             test_set,
@@ -758,7 +758,7 @@ def run(args: argparse.Namespace) -> None:
                     path_complied,
                     _extra_files=extra_files,
                 )
-            except Exception as e:  # pylint: disable=W0703
+            except Exception as e:  # pylint: disable=W0703 # noqa: F841
                 pass
         else:
             torch.save(model, Path(args.model_dir) / (args.name + ".model"))
@@ -773,7 +773,7 @@ def run(args: argparse.Namespace) -> None:
                     path_complied,
                     _extra_files=extra_files,
                 )
-            except Exception as e:  # pylint: disable=W0703
+            except Exception as e:  # pylint: disable=W0703 # noqa: F841
                 pass

     if args.distributed:

From 20649049f16bc99d5cf9284d43f6212df62cf5f5 Mon Sep 17 00:00:00 2001
From: Thang Nguyen <46436648+thangckt@users.noreply.github.com>
Date: Fri, 6 Sep 2024 21:33:01 +0900
Subject: [PATCH 04/11] o

---
 mace/cli/run_train.py           | 6 +++---
 mace/tools/arg_parser.py        | 2 +-
 mace/tools/slurm_distributed.py | 8 ++++----
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/mace/cli/run_train.py b/mace/cli/run_train.py
index edf7a078..d9415496 100644
--- a/mace/cli/run_train.py
+++ b/mace/cli/run_train.py
@@ -51,7 +51,7 @@
     setup_wandb,
 )
 from mace.tools.slurm_distributed import (
-    DistributedEnvironmentOpenmpi,
+    DistributedEnvironmentDefault,
     DistributedEnvironmentSlurm,
 )
 from mace.tools.utils import AtomicNumberTable
@@ -83,8 +83,8 @@ def run(args: argparse.Namespace) -> None:
         try:
             if args.distributed_env == "slurm":
                 distr_env = DistributedEnvironmentSlurm()
-            elif args.distributed_env == "openmpi":
-                distr_env = DistributedEnvironmentOpenmpi()
+            elif args.distributed_env == "default":
+                distr_env = DistributedEnvironmentDefault()
         except Exception as e:  # pylint: disable=W0703
             logging.error(f"Failed to initialize distributed environment: {e}")
             return
diff --git a/mace/tools/arg_parser.py b/mace/tools/arg_parser.py
index dbe5a853..5ec13d8b 100644
--- a/mace/tools/arg_parser.py
+++ b/mace/tools/arg_parser.py
@@ -87,7 +87,7 @@ def build_default_arg_parser() -> argparse.ArgumentParser:
         "--distributed_env",
         help="HPC cluster's job scheduler",
         type=str,
-        choices=["slurm", "openmpi"],
+        choices=["slurm", "default"],
         default="slurm",
     )
diff --git a/mace/tools/slurm_distributed.py b/mace/tools/slurm_distributed.py
index 866cbab3..c390afd2 100644
--- a/mace/tools/slurm_distributed.py
+++ b/mace/tools/slurm_distributed.py
@@ -34,8 +34,8 @@ def _setup_distr_env(self):
         os.environ["RANK"] = os.environ["SLURM_PROCID"]


-class DistributedEnvironmentOpenmpi:
+class DistributedEnvironmentDefault:
     def __init__(self):
-        self.world_size = int(os.environ["OMPI_COMM_WORLD_SIZE"])
-        self.local_rank = int(os.environ["OMPI_COMM_WORLD_LOCAL_RANK"])
-        self.rank = int(os.environ["OMPI_COMM_WORLD_RANK"])
+        self.world_size = int(os.environ["WORLD_SIZE"])
+        self.local_rank = int(os.environ["LOCAL_RANK"])
+        self.rank = int(os.environ["RANK"])

From 65d0924932e8d587d2dee99ab33d2955cee3b87d Mon Sep 17 00:00:00 2001
From: Thang Nguyen <46436648+thangckt@users.noreply.github.com>
Date: Fri, 6 Sep 2024 21:52:07 +0900
Subject: [PATCH 05/11] u

---
 mace/cli/run_train.py           | 6 +++---
 mace/tools/arg_parser.py        | 2 +-
 mace/tools/slurm_distributed.py | 8 ++++----
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/mace/cli/run_train.py b/mace/cli/run_train.py
index d9415496..edf7a078 100644
--- a/mace/cli/run_train.py
+++ b/mace/cli/run_train.py
@@ -51,7 +51,7 @@
     setup_wandb,
 )
 from mace.tools.slurm_distributed import (
-    DistributedEnvironmentDefault,
+    DistributedEnvironmentOpenmpi,
     DistributedEnvironmentSlurm,
 )
 from mace.tools.utils import AtomicNumberTable
@@ -83,8 +83,8 @@ def run(args: argparse.Namespace) -> None:
         try:
             if args.distributed_env == "slurm":
                 distr_env = DistributedEnvironmentSlurm()
-            elif args.distributed_env == "default":
-                distr_env = DistributedEnvironmentDefault()
+            elif args.distributed_env == "openmpi":
+                distr_env = DistributedEnvironmentOpenmpi()
         except Exception as e:  # pylint: disable=W0703
             logging.error(f"Failed to initialize distributed environment: {e}")
             return
diff --git a/mace/tools/arg_parser.py b/mace/tools/arg_parser.py
index 5ec13d8b..dbe5a853 100644
--- a/mace/tools/arg_parser.py
+++ b/mace/tools/arg_parser.py
@@ -87,7 +87,7 @@ def build_default_arg_parser() -> argparse.ArgumentParser:
         "--distributed_env",
         help="HPC cluster's job scheduler",
         type=str,
-        choices=["slurm", "default"],
+        choices=["slurm", "openmpi"],
         default="slurm",
     )
diff --git a/mace/tools/slurm_distributed.py b/mace/tools/slurm_distributed.py
index c390afd2..866cbab3 100644
--- a/mace/tools/slurm_distributed.py
+++ b/mace/tools/slurm_distributed.py
@@ -34,8 +34,8 @@ def _setup_distr_env(self):
         os.environ["RANK"] = os.environ["SLURM_PROCID"]


-class DistributedEnvironmentDefault:
+class DistributedEnvironmentOpenmpi:
     def __init__(self):
-        self.world_size = int(os.environ["WORLD_SIZE"])
-        self.local_rank = int(os.environ["LOCAL_RANK"])
-        self.rank = int(os.environ["RANK"])
+        self.world_size = int(os.environ["OMPI_COMM_WORLD_SIZE"])
+        self.local_rank = int(os.environ["OMPI_COMM_WORLD_LOCAL_RANK"])
+        self.rank = int(os.environ["OMPI_COMM_WORLD_RANK"])

From d1c683f29b854331a6eab53a824e445d9b0627e9 Mon Sep 17 00:00:00 2001
From: Thang Nguyen <46436648+thangckt@users.noreply.github.com>
Date: Fri, 6 Sep 2024 22:15:55 +0900
Subject: [PATCH 06/11] u

---
 mace/cli/run_train.py    | 3 ++-
 mace/tools/arg_parser.py | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/mace/cli/run_train.py b/mace/cli/run_train.py
index edf7a078..cdfdf20b 100644
--- a/mace/cli/run_train.py
+++ b/mace/cli/run_train.py
@@ -106,7 +106,8 @@ def run(args: argparse.Namespace) -> None:
         logging.log(level=loglevel, msg=message)

     if args.distributed:
-        torch.cuda.set_device(local_rank)
+        if args.device == "cuda":
+            torch.cuda.set_device(local_rank)
         logging.info(f"Process group initialized: {torch.distributed.is_initialized()}")
         logging.info(f"Processes: {world_size}")
diff --git a/mace/tools/arg_parser.py b/mace/tools/arg_parser.py
index dbe5a853..511bee70 100644
--- a/mace/tools/arg_parser.py
+++ b/mace/tools/arg_parser.py
@@ -85,7 +85,7 @@ def build_default_arg_parser() -> argparse.ArgumentParser:
     )
     parser.add_argument(
         "--distributed_env",
-        help="HPC cluster's job scheduler",
+        help="The parallel environment to use for distributed training",
         type=str,
         choices=["slurm", "openmpi"],
         default="slurm",

From 5f61f6fd80691ad4f8107923a721084b42ab3c74 Mon Sep 17 00:00:00 2001
From: Thang Nguyen <46436648+thangckt@users.noreply.github.com>
Date: Fri, 6 Sep 2024 22:30:48 +0900
Subject: [PATCH 07/11] Update run_train.py

---
 mace/cli/run_train.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/mace/cli/run_train.py b/mace/cli/run_train.py
index cdfdf20b..2b2e3cd3 100644
--- a/mace/cli/run_train.py
+++ b/mace/cli/run_train.py
@@ -575,7 +575,7 @@ def run(args: argparse.Namespace) -> None:
     if args.wandb:
         setup_wandb(args)

-    if args.distributed:
+    if args.distributed and args.device == "cuda":
         distributed_model = DDP(model, device_ids=[local_rank])
     else:
         distributed_model = None
@@ -693,7 +693,7 @@ def run(args: argparse.Namespace) -> None:
             device=device,
         )
         model.to(device)
-        if args.distributed:
+        if args.distributed and args.device == "cuda":
             distributed_model = DDP(model, device_ids=[local_rank])
         model_to_evaluate = model if not args.distributed else distributed_model
         if swa_eval:

From f25be298b04135b5901cce09a81507b07e8acd92 Mon Sep 17 00:00:00 2001
From: Thang Nguyen <46436648+thangckt@users.noreply.github.com>
Date: Tue, 10 Sep 2024 14:28:51 +0900
Subject: [PATCH 08/11] Update run_train.py

---
 mace/cli/run_train.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/mace/cli/run_train.py b/mace/cli/run_train.py
index 2b2e3cd3..28c3dd26 100644
--- a/mace/cli/run_train.py
+++ b/mace/cli/run_train.py
@@ -575,8 +575,11 @@ def run(args: argparse.Namespace) -> None:
     if args.wandb:
         setup_wandb(args)

-    if args.distributed and args.device == "cuda":
-        distributed_model = DDP(model, device_ids=[local_rank])
+    if args.distributed:
+        if args.device == "cuda":
+            distributed_model = DDP(model, device_ids=[local_rank])
+        elif args.device == "cpu":
+            distributed_model = DDP(model, device_ids=[rank])
     else:
         distributed_model = None

From a5b0b12660c530dba8f7378b363ef75a0bad6dff Mon Sep 17 00:00:00 2001
From: Thang Nguyen <46436648+thangckt@users.noreply.github.com>
Date: Tue, 10 Sep 2024 14:59:00 +0900
Subject: [PATCH 09/11] u

---
 mace/cli/run_train.py           | 7 +++++--
 mace/tools/slurm_distributed.py | 1 +
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/mace/cli/run_train.py b/mace/cli/run_train.py
index 28c3dd26..c0953956 100644
--- a/mace/cli/run_train.py
+++ b/mace/cli/run_train.py
@@ -696,8 +696,11 @@ def run(args: argparse.Namespace) -> None:
             device=device,
         )
         model.to(device)
-        if args.distributed and args.device == "cuda":
-            distributed_model = DDP(model, device_ids=[local_rank])
+        if args.distributed:
+            if args.device == "cuda":
+                distributed_model = DDP(model, device_ids=[local_rank])
+            elif args.device == "cpu":
+                distributed_model = DDP(model, device_ids=[rank])
         model_to_evaluate = model if not args.distributed else distributed_model
         if swa_eval:
             logging.info(f"Loaded Stage two model from epoch {epoch} for evaluation")
diff --git a/mace/tools/slurm_distributed.py b/mace/tools/slurm_distributed.py
index 866cbab3..f58ac675 100644
--- a/mace/tools/slurm_distributed.py
+++ b/mace/tools/slurm_distributed.py
@@ -39,3 +39,4 @@ def __init__(self):
         self.world_size = int(os.environ["OMPI_COMM_WORLD_SIZE"])
         self.local_rank = int(os.environ["OMPI_COMM_WORLD_LOCAL_RANK"])
         self.rank = int(os.environ["OMPI_COMM_WORLD_RANK"])
+        return

From 1ace578df2bf5c70bedc95e9fbe4ebcf2fa64b40 Mon Sep 17 00:00:00 2001
From: Thang Nguyen <46436648+thangckt@users.noreply.github.com>
Date: Tue, 10 Sep 2024 15:25:22 +0900
Subject: [PATCH 10/11] u

---
 mace/cli/run_train.py           | 7 +++----
 mace/tools/slurm_distributed.py | 1 -
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/mace/cli/run_train.py b/mace/cli/run_train.py
index c0953956..715a7c53 100644
--- a/mace/cli/run_train.py
+++ b/mace/cli/run_train.py
@@ -92,8 +92,7 @@ def run(args: argparse.Namespace) -> None:
         local_rank = distr_env.local_rank
         rank = distr_env.rank
         if rank == 0:
-            print(distr_env)
-
+            print("Using distributed Environment: ", distr_env)
         torch.distributed.init_process_group(backend=args.distributed_backend)
     else:
         rank = int(0)
@@ -579,7 +578,7 @@ def run(args: argparse.Namespace) -> None:
         if args.device == "cuda":
             distributed_model = DDP(model, device_ids=[local_rank])
         elif args.device == "cpu":
-            distributed_model = DDP(model, device_ids=[rank])
+            distributed_model = DDP(model)
     else:
         distributed_model = None
@@ -700,7 +699,7 @@ def run(args: argparse.Namespace) -> None:
             if args.device == "cuda":
                 distributed_model = DDP(model, device_ids=[local_rank])
             elif args.device == "cpu":
-                distributed_model = DDP(model, device_ids=[rank])
+                distributed_model = DDP(model)
         model_to_evaluate = model if not args.distributed else distributed_model
         if swa_eval:
             logging.info(f"Loaded Stage two model from epoch {epoch} for evaluation")
diff --git a/mace/tools/slurm_distributed.py b/mace/tools/slurm_distributed.py
index f58ac675..866cbab3 100644
--- a/mace/tools/slurm_distributed.py
+++ b/mace/tools/slurm_distributed.py
@@ -39,4 +39,3 @@ def __init__(self):
         self.world_size = int(os.environ["OMPI_COMM_WORLD_SIZE"])
         self.local_rank = int(os.environ["OMPI_COMM_WORLD_LOCAL_RANK"])
         self.rank = int(os.environ["OMPI_COMM_WORLD_RANK"])
-        return

From 8e66bbcf247aabb42a1630ed7a8e317d8b361127 Mon Sep 17 00:00:00 2001
From: Thang Nguyen <46436648+thangckt@users.noreply.github.com>
Date: Tue, 10 Sep 2024 16:09:24 +0900
Subject: [PATCH 11/11] Update run_train.py

---
 mace/cli/run_train.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mace/cli/run_train.py b/mace/cli/run_train.py
index 715a7c53..4ed6e733 100644
--- a/mace/cli/run_train.py
+++ b/mace/cli/run_train.py
@@ -92,7 +92,7 @@ def run(args: argparse.Namespace) -> None:
         local_rank = distr_env.local_rank
         rank = distr_env.rank
         if rank == 0:
-            print("Using distributed Environment: ", distr_env)
+            print("Using Distributed Environment: ", distr_env)
         torch.distributed.init_process_group(backend=args.distributed_backend)
     else:
         rank = int(0)
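
Editor's note, not part of the patch series above: with the full series applied, a CPU-only multi-node run would presumably combine the new flags along the lines of --distributed --distributed_env openmpi --distributed_backend gloo --device cpu. The sketch below is a minimal, self-contained illustration of the mechanism the patches rely on, assuming mpirun has set the OMPI_COMM_WORLD_* variables and that MASTER_ADDR and MASTER_PORT are exported by the job script; the Linear model is a stand-in, not the MACE model.

    import os
    import torch
    import torch.distributed as dist
    from torch.nn.parallel import DistributedDataParallel as DDP

    # Rank discovery mirrors DistributedEnvironmentOpenmpi above
    # (assumes mpirun-provided environment variables).
    rank = int(os.environ["OMPI_COMM_WORLD_RANK"])
    world_size = int(os.environ["OMPI_COMM_WORLD_SIZE"])

    # gloo is the CPU-capable backend; nccl (the parser default) requires GPUs.
    # MASTER_ADDR and MASTER_PORT must already be set in the environment.
    dist.init_process_group(backend="gloo", rank=rank, world_size=world_size)

    model = torch.nn.Linear(4, 4)  # stand-in for the MACE model
    ddp_model = DDP(model)  # no device_ids on CPU, as in the patched run_train.py

    loss = ddp_model(torch.randn(8, 4)).sum()
    loss.backward()  # DDP all-reduces gradients across ranks here
    dist.destroy_process_group()

The same flow applies to the SLURM path: DistributedEnvironmentSlurm exports WORLD_SIZE, RANK, and LOCAL_RANK from the SLURM_* variables before init_process_group is called.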