Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dumping collective metadata to csv for each rank #154

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/converter/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def convert_text(args: argparse.Namespace) -> None:
def convert_pytorch(args: argparse.Namespace) -> None:
    """Convert a PyTorch execution trace into the Chakra protobuf format."""
    PyTorchConverter().convert(
        args.input,
        args.output,
        args.simulate,
        args.dump_collective_nodes,
    )


def main() -> None:
Expand Down Expand Up @@ -72,6 +72,13 @@ def main() -> None:
"of a trace. Disabled by default because it takes a long time."
),
)
pytorch_parser.add_argument(
"--dump_collective_nodes",
action="store_true",
help=(
        "Dump all collective operations in the trace to a CSV file for further processing."
),
)
pytorch_parser.set_defaults(func=convert_pytorch)

text_parser = subparsers.add_parser(
Expand Down
55 changes: 54 additions & 1 deletion src/converter/pytorch_converter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import logging
import os
from typing import IO, Dict, List, Optional, Set, Tuple

from ...schema.protobuf.et_def_pb2 import (
Expand Down Expand Up @@ -30,7 +31,7 @@ class PyTorchConverter:
of dependencies, removal of dangling nodes, and writing the final protobuf trace to the output file.
"""

def convert(self, input_filename: str, output_filename: str, simulate: bool) -> None:
def convert(self, input_filename: str, output_filename: str, simulate: bool, dump_collective_nodes: bool) -> None:
"""
Convert Chakra host + device execution traces in JSON format into the Chakra protobuf format.

Expand All @@ -39,6 +40,10 @@ def convert(self, input_filename: str, output_filename: str, simulate: bool) ->
output_filename (str): Output Chakra host + device execution trace in the protobuf format.
simulate (bool): Flag to indicate whether to simulate the execution of the converted trace. If True,
the method will simulate the execution after writing the protobuf trace to the output file.
            dump_collective_nodes (bool): Flag to indicate whether to dump all collective operations' basic
metadata to a csv file. If True, the method will dump the information after writing the protobuf
trace to the output file. The flag assumes the filename is {some-string}_{rank}.{some-string}
because the dump file uses the rank to create unique file for each rank.
"""
json_trace = self.load_json_execution_traces(input_filename)
json_metadata, json_node_map = self.parse_json_trace(json_trace)
Expand All @@ -57,6 +62,9 @@ def convert(self, input_filename: str, output_filename: str, simulate: bool) ->

self.write_protobuf_execution_trace(output_filename, json_metadata, protobuf_node_map)

if dump_collective_nodes:
self.dump_collective_operation(protobuf_node_map, input_filename)

if simulate:
self.simulate_execution(json_node_map, protobuf_node_map, parent_to_children_map)

Expand Down Expand Up @@ -739,3 +747,48 @@ def simulate_execution(
issued_nodes.clear()

logging.debug("Simulation of Chakra node execution completed.")

def dump_collective_operation(self, protobuf_node_map, filename):
    """Dump basic metadata of every collective communication node to a CSV file.

    One CSV per rank is written to the current working directory, named
    ``chakra_collectives_dump.{rank}.csv``. The rank is recovered from the
    input trace filename, which is assumed to look like
    ``{prefix}_{rank}.{ext}`` (e.g. ``et_7.json``).

    Args:
        protobuf_node_map: Map of node IDs to Chakra protobuf nodes
            (Dict[int, ChakraNode] — TODO confirm key type against callers).
        filename (str): Input trace filename the rank is parsed from.

    Raises:
        ValueError: If the rank cannot be parsed from ``filename``.
    """

    def rank_from_file_name(filename_str: str) -> str:
        # Strip the extension BEFORE splitting on '_' so it does not leak
        # into the rank: "et_7.json" -> "7" (the original parts[1] of the
        # basename would have yielded "7.json").
        stem = os.path.splitext(os.path.basename(filename_str))[0]
        return stem.split("_")[1]

    try:
        rank = rank_from_file_name(filename)
    except Exception as e:
        # Include the actual filename in the error and chain the cause.
        raise ValueError(
            f"dump_collective_operation: assuming the filename to be et_<rank>.json, but instead: {filename}"
        ) from e

    output = f"chakra_collectives_dump.{rank}.csv"
    with open(output, "w") as f:
        f.write("rank,node_id,coll_name,comm_size,og_comm_size,pg_name,og_pg_name,root_rank,og_root_rank\n")
        for node in protobuf_node_map.values():
            # '==' not 'is': protobuf enum values are plain ints, and
            # identity comparison only works by CPython caching accident.
            if node.type != COMM_COLL_NODE:
                continue

            node_name = node.name.replace(",", "_")  # protect the csv
            comm_size = None
            pg_name = None
            root_rank = None
            for attr in node.attr:
                if attr.name == "pg_name":
                    pg_name = attr.string_val
                elif attr.name == "comm_size":
                    comm_size = attr.int64_val
                elif attr.name == "root_rank":
                    root_rank = attr.int32_val

            # og_* flags record whether the attribute was present in the
            # trace; absent attributes are reported as 0 with og_*=False.
            og_pg_name = pg_name is not None
            og_comm_size = comm_size is not None
            og_root_rank = root_rank is not None
            pg_name = 0 if pg_name is None else pg_name
            comm_size = 0 if comm_size is None else comm_size
            root_rank = 0 if root_rank is None else root_rank

            f.write(
                f"{rank},{node.id},{node_name},{comm_size},{og_comm_size},"
                f"{pg_name},{og_pg_name},{root_rank},{og_root_rank}\n"
            )
Loading