
Merge pull request #140 from JoongunPark/main
Encode communicator groups in Chakra traces
srinivas212 authored Sep 6, 2024
2 parents d5c6802 + 52de625 commit 73edb74
Showing 7 changed files with 46 additions and 3 deletions.
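
Taken together, the changes thread a process-group identifier end to end: the trace linker copies the Kineto "Process Group Name" argument onto dependent GPU operators, the PyTorch-to-Chakra converter encodes it as a pg_name string attribute on collective and send/recv communication nodes, and the C++ ET feeder exposes it through ETFeederNode::pg_name(). A minimal sketch of how a consumer might read the attribute back follows; the protobuf import path is an assumption based on the repository layout, and get_pg_name is a hypothetical helper, not part of this PR.

    from chakra.schema.protobuf.et_def_pb2 import (  # assumed install path, adjust as needed
        COMM_COLL_NODE,
        AttributeProto as ChakraAttr,
        Node as ChakraNode,
    )

    def get_pg_name(node: ChakraNode) -> str:
        """Return the process-group name encoded on a communication node, or "" if absent."""
        for attr in node.attr:
            if attr.name == "pg_name":
                return attr.string_val
        return ""

    # Example: a collective communication node tagged with process group "0".
    node = ChakraNode(id=1, name="nccl:all_reduce", type=COMM_COLL_NODE)
    node.attr.extend([ChakraAttr(name="pg_name", string_val="0")])
    assert get_pg_name(node) == "0"
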
8 changes: 6 additions & 2 deletions src/converter/pytorch_converter.py
@@ -225,8 +225,10 @@ def convert_json_to_protobuf_nodes(
protobuf_node_map (Dict[int, ChakraNode]): Dictionary where the converted Protobuf nodes will be stored.
"""
for _, json_node in json_node_map.items():
if (json_node.get_op_type() == PyTorchNodeType.CPU_OP) or (
json_node.get_op_type() == PyTorchNodeType.LABEL
if (
(json_node.get_op_type() == PyTorchNodeType.CPU_OP)
or (json_node.get_op_type() == PyTorchNodeType.LABEL)
or (json_node.get_op_type() == PyTorchNodeType.METADATA)
):
chakra_node = self.convert_json_to_protobuf_node(json_node_map, protobuf_node_map, json_node)
protobuf_node_map[chakra_node.id] = chakra_node
@@ -242,13 +244,15 @@ def convert_json_to_protobuf_nodes(
[
ChakraAttr(name="comm_type", int64_val=collective_comm_type),
ChakraAttr(name="comm_size", int64_val=pytorch_gpu_node.comm_size),
*([ChakraAttr(name="pg_name", string_val=pytorch_gpu_node.pg_name)] if pytorch_gpu_node.pg_name != "" else []),
]
)

elif chakra_gpu_node.type in {COMM_SEND_NODE, COMM_RECV_NODE}:
chakra_gpu_node.attr.extend(
[
ChakraAttr(name="comm_size", int64_val=pytorch_gpu_node.comm_size),
*([ChakraAttr(name="pg_name", string_val=pytorch_gpu_node.pg_name)] if pytorch_gpu_node.pg_name != "" else []),
]
)

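
The converter change above relies on conditional unpacking inside a list literal, so the pg_name attribute is appended only when the source node actually carries a process-group name. A standalone illustration of the idiom, using plain dicts as stand-ins for ChakraAttr rather than the converter's real objects:

    def build_comm_attrs(comm_size: int, pg_name: str = "") -> list:
        # Append the pg_name entry only when a non-empty group name is available.
        return [
            {"name": "comm_size", "int64_val": comm_size},
            *([{"name": "pg_name", "string_val": pg_name}] if pg_name != "" else []),
        ]

    print(build_comm_attrs(1024))               # one entry, no pg_name
    print(build_comm_attrs(1024, pg_name="0"))  # pg_name entry included
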
11 changes: 10 additions & 1 deletion src/converter/pytorch_node.py
@@ -13,11 +13,13 @@ class PyTorchNodeType(Enum):
CPU_OP (int): Represents a CPU operation.
GPU_OP (int): Represents a GPU operation.
LABEL (int): Represents a non-operator node (e.g., labels).
METADATA (int): Represents a metadata node (e.g., process group initialization).
"""

CPU_OP = 1
GPU_OP = 2
LABEL = 3 # Non-operator nodes
METADATA = 4 # Metadata nodes


class PyTorchNode:
@@ -42,6 +44,7 @@ class PyTorchNode:
inter_thread_dep (Any): Inter-thread dependency of the node.
cat (Any): Category of the node.
stream (int): Stream associated with the node.
pg_name (str): Process group name for inter-GPU communication.
"""

SUPPORTED_VERSIONS = ["1.0.2-chakra.0.0.4", "1.0.3-chakra.0.0.4", "1.1.0-chakra.0.0.4"]
@@ -109,6 +112,10 @@ def _parse_data_1_0_3_chakra_0_0_4(self, node_data: Dict[str, Any]) -> None:
self.inter_thread_dep = node_data.get("inter_thread_dep")
self.cat = node_data.get("cat")
self.stream = node_data.get("stream", 0)
# In collective comms nodes, pg_name is in node_data if it exists.
# In SendRecv nodes, pg_name is in the attrs if it exists.
# Otherwise, pg_name is not present.
self.pg_name = node_data.get("pg_name", "")

for attr in node_data.get("attrs", []):
setattr(self, attr["name"], attr["value"])
@@ -120,7 +127,9 @@ def get_op_type(self) -> PyTorchNodeType:
Returns
PyTorchNodeType: The type of the PyTorch operation.
"""
if self.is_gpu_op():
if "process_group:init" in self.name:
return PyTorchNodeType.METADATA
elif self.is_gpu_op():
return PyTorchNodeType.GPU_OP
elif hasattr(self, "op_schema") or hasattr(self, "outputs"):
return PyTorchNodeType.CPU_OP
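
The new METADATA node type lets the converter carry process_group:init records, which describe communicator-group membership rather than executable work, into the Chakra trace; get_op_type checks the node name before the existing GPU/CPU/label checks. A reduced sketch of that classification order (simplified stand-in logic, not the class itself):

    from enum import Enum

    class PyTorchNodeType(Enum):
        CPU_OP = 1
        GPU_OP = 2
        LABEL = 3       # Non-operator nodes
        METADATA = 4    # Metadata nodes, e.g. process group initialization

    def classify(name: str, is_gpu_op: bool, has_op_schema: bool) -> PyTorchNodeType:
        if "process_group:init" in name:
            return PyTorchNodeType.METADATA
        if is_gpu_op:
            return PyTorchNodeType.GPU_OP
        if has_op_schema:
            return PyTorchNodeType.CPU_OP
        return PyTorchNodeType.LABEL

    assert classify("process_group:init", False, False) is PyTorchNodeType.METADATA  # illustrative name
    assert classify("aten::mm", False, True) is PyTorchNodeType.CPU_OP
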
6 changes: 6 additions & 0 deletions src/feeder/et_feeder_node.cpp
@@ -32,6 +32,8 @@ ETFeederNode::ETFeederNode(std::shared_ptr<ChakraProtoMsg::Node> node) {
this->comm_dst_ = static_cast<uint32_t>(attr.int32_val());
} else if (attr_name == "comm_tag") {
this->comm_tag_ = static_cast<uint32_t>(attr.int32_val());
} else if (attr_name == "pg_name") {
this->pg_name_ = static_cast<string>(attr.string_val());
} else {
this->other_attrs_.emplace(attr_name, attr);
}
@@ -138,3 +140,7 @@ uint32_t ETFeederNode::comm_dst() {
uint32_t ETFeederNode::comm_tag() {
return comm_tag_;
}

string ETFeederNode::pg_name() {
return pg_name_;
}
2 changes: 2 additions & 0 deletions src/feeder/et_feeder_node.h
@@ -38,6 +38,7 @@ class ETFeederNode {
uint32_t comm_src();
uint32_t comm_dst();
uint32_t comm_tag();
std::string pg_name();

private:
void assign_attr_val(
@@ -65,6 +66,7 @@ class ETFeederNode {
uint32_t comm_src_;
uint32_t comm_dst_;
uint32_t comm_tag_;
std::string pg_name_;
};

} // namespace Chakra
13 changes: 13 additions & 0 deletions src/trace_link/kineto_operator.py
@@ -25,6 +25,7 @@ class KinetoOperator:
stream (Optional[int]): CUDA stream identifier associated with the operator.
rf_id (Optional[int]): Record function identifier.
correlation (int): Identifier used to correlate CUDA runtime and GPU operations.
pg_name (Optional[str]): Process group name for the collective communication.
"""

def __init__(self, kineto_op: Dict[str, Any]) -> None:
@@ -51,6 +52,7 @@ def __init__(self, kineto_op: Dict[str, Any]) -> None:
self.stream: Optional[int] = kineto_op.get("args", {}).get("stream", None)
self.rf_id: Optional[int] = kineto_op.get("args", {}).get("Record function id", None)
self.correlation: int = kineto_op.get("args", {}).get("correlation", -1)
self.pg_name: Optional[str] = kineto_op.get("args", {}).get("Process Group Name", None)

def __repr__(self) -> str:
"""
@@ -153,3 +155,14 @@ def is_gpu_op(self) -> bool:
"""
gpu_categories = {"kernel", "gpu_memcpy"}
return self.category in gpu_categories

def is_inter_gpu_comms_op(self) -> bool:
"""
Check if the operator is an inter-GPU communication operator based on its name.
Both point-to-point send/receive primitives and collective communication primitives are considered.
Returns
bool: True if it is an inter-GPU communication operator, otherwise False.
"""
return "ncclDevKernel" in self.name
6 changes: 6 additions & 0 deletions src/trace_link/trace_linker.py
@@ -755,8 +755,14 @@ def process_dependent_gpu_ops(
"exclusive_dur": gpu_op.exclusive_dur,
"ts": gpu_op.timestamp,
"stream": gpu_op.stream,
**(
{"pg_name": gpu_op.pg_name}
if gpu_op.is_inter_gpu_comms_op() and gpu_op.pg_name is not None
else {}
),
}
)

updated_gpu_ops.append(new_gpu_op)

return updated_gpu_ops
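
In the linker, the same conditional pattern appears in dict form: pg_name is merged into the serialized GPU-op fields only for inter-GPU communication kernels that carry a group name. A minimal sketch with plain values standing in for the real gpu_op object:

    from typing import Optional

    def gpu_op_fields(stream: int, pg_name: Optional[str] = None, is_comms: bool = False) -> dict:
        return {
            "stream": stream,
            **({"pg_name": pg_name} if is_comms and pg_name is not None else {}),
        }

    print(gpu_op_fields(7))                              # {'stream': 7}
    print(gpu_op_fields(7, pg_name="0", is_comms=True))  # {'stream': 7, 'pg_name': '0'}
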
3 changes: 3 additions & 0 deletions tests/trace_link/test_trace_linker.py
@@ -469,6 +469,7 @@ def test_process_dependent_gpu_ops(trace_linker, orig_op_id, cpu_op, kineto_gpu_
gpu_op.inclusive_dur = gpu_op_data["inclusive_dur"]
gpu_op.exclusive_dur = gpu_op_data["exclusive_dur"]
gpu_op.stream = gpu_op_data["stream"]
gpu_op.pg_name = gpu_op_data.get("pg_name", None)
kineto_gpu_op_objects.append(gpu_op)

host_op_id_to_kineto_ops_map = {orig_op_id: kineto_gpu_op_objects}
@@ -497,6 +498,8 @@ def test_process_dependent_gpu_ops(trace_linker, orig_op_id, cpu_op, kineto_gpu_
assert updated_gpu_op["exclusive_dur"] == kineto_gpu_op_objects[i].exclusive_dur
assert updated_gpu_op["ts"] == kineto_gpu_op_objects[i].timestamp
assert updated_gpu_op["stream"] == kineto_gpu_op_objects[i].stream
if kineto_gpu_op_objects[i].is_inter_gpu_comms_op() and kineto_gpu_op_objects[i].pg_name is not None:
assert updated_gpu_op["pg_name"] == kineto_gpu_op_objects[i].pg_name


@patch("builtins.open", new_callable=MagicMock)
