Skip to content

Commit

Permalink
Merge pull request #54 from si-23/simplifications-2
Browse files Browse the repository at this point in the history
Simplifications 2
  • Loading branch information
si-23 authored Oct 31, 2024
2 parents 4b58e13 + d3225d6 commit 51d0dac
Show file tree
Hide file tree
Showing 8 changed files with 353 additions and 533 deletions.
57 changes: 23 additions & 34 deletions py_import_cycles/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@

from . import __version__
from .cycles import detect_cycles
from .files import get_outputs_file_paths, iter_python_files
from .files import get_outputs_file_paths, scan_project
from .graphs import make_graph
from .log import logger, setup_logging
from .modules import Module, ModuleFactory, NamespacePackage, PyModule, RegularPackage
from .visitors import visit_python_file
from .modules import PyFile, PyFileType
from .visitors import visit_py_file


def _parse_arguments() -> argparse.Namespace:
Expand Down Expand Up @@ -101,25 +101,26 @@ def main() -> int:
setup_logging(outputs_filepaths.log, args.debug)

logger.info("Get Python files")
python_files = iter_python_files(project_path, packages)
py_files = list(scan_project(project_path, packages))

logger.info("Visit Python files, get imports by module")
module_factory = ModuleFactory(project_path, packages)
logger.info("Visit Python files, get imports of py files")
py_files_by_name = {p.name: p for p in py_files}

imports_by_module = {
visited.module: visited.imports
for path in python_files
if (visited := visit_python_file(module_factory, path)) is not None
imports_by_py_file = {
py_file: imports
for py_file in py_files
if py_file.type is not PyFileType.NAMESPACE_PACKAGE
and (imports := visit_py_file(py_files_by_name, py_file))
}

if _debug():
logger.debug(
"Imports by module:\n%s",
"\n".join(_make_readable_imports_by_module(imports_by_module)),
"Imports of py files:\n%s",
"\n".join(_make_readable_imports_by_py_file(imports_by_py_file)),
)

logger.info("Detect import cycles with strategy %s", args.strategy)
unsorted_cycles = set(detect_cycles(args.strategy, imports_by_module))
unsorted_cycles = set(detect_cycles(args.strategy, imports_by_py_file))

logger.info("Sort import cycles")
sorted_cycles = sorted(unsorted_cycles, key=lambda t: (len(t), t[0].name))
Expand Down Expand Up @@ -148,31 +149,19 @@ def main() -> int:
# '----------------------------------------------------------------------'


def _show_module(module: Module) -> str:
match module:
case NamespacePackage():
return f"{module.name}/"
case RegularPackage():
return f"{module.name}.__init__"
case PyModule():
return f"{module.name}"
case _:
raise TypeError(module)


def _make_readable_imports_by_module(
imports_by_module: Mapping[Module, Sequence[Module]],
def _make_readable_imports_by_py_file(
imports_by_py_file: Mapping[PyFile, Sequence[PyFile]],
) -> Sequence[str]:
lines = []
for ibm, ms in imports_by_module.items():
for ibm, ms in imports_by_py_file.items():
if ms:
lines.append(f" {_show_module(ibm)} imports: {', '.join(_show_module(m) for m in ms)}")
lines.append(f" {str(ibm)} imports: {', '.join(str(m) for m in ms)}")
return lines


def _make_readable_cycles(
line_handler: Callable[[int, tuple[Module, ...]], Sequence[str]],
sorted_cycles: Sequence[tuple[Module, ...]],
line_handler: Callable[[int, tuple[PyFile, ...]], Sequence[str]],
sorted_cycles: Sequence[tuple[PyFile, ...]],
) -> Sequence[str]:
return [
line for nr, ic in enumerate(sorted_cycles, start=1) for line in line_handler(nr, ic) if ic
Expand All @@ -181,11 +170,11 @@ def _make_readable_cycles(

def _log_or_show_cycles(
verbose: bool,
sorted_cycles: Sequence[tuple[Module, ...]],
sorted_cycles: Sequence[tuple[PyFile, ...]],
) -> None:
if verbose:
for line in _make_readable_cycles(
lambda nr, ic: [f" Cycle {nr}:"] + [f" {_show_module(m)}" for m in ic],
lambda nr, ic: [f" Cycle {nr}:"] + [f" {str(m)}" for m in ic],
sorted_cycles,
):
sys.stderr.write(f"{line}\n")
Expand All @@ -195,7 +184,7 @@ def _log_or_show_cycles(
"Import cycles:\n%s",
"\n".join(
_make_readable_cycles(
lambda nr, ic: [f" Cycle {nr}: {' > '.join(_show_module(m) for m in ic)}"],
lambda nr, ic: [f" Cycle {nr}: {' > '.join(str(m) for m in ic)}"],
sorted_cycles,
)
),
Expand Down
6 changes: 3 additions & 3 deletions py_import_cycles/cycles.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@

from .dfs import depth_first_search
from .johnson import johnson
from .modules import Module
from .modules import PyFile
from .tarjan import strongly_connected_components


def detect_cycles(
strategy: Literal["dfs", "tarjan"],
graph: Mapping[Module, Sequence[Module]],
) -> Iterator[tuple[Module, ...]]:
graph: Mapping[PyFile, Sequence[PyFile]],
) -> Iterator[tuple[PyFile, ...]]:
if strategy == "dfs":
return depth_first_search(graph)
if strategy == "tarjan":
Expand Down
36 changes: 22 additions & 14 deletions py_import_cycles/files.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,37 @@
#!/usr/bin/env python3

import os
import time
from collections.abc import Iterator, Sequence
from dataclasses import dataclass
from pathlib import Path

from .log import logger
from .modules import PyFile

@dataclass(frozen=True, kw_only=True)
class PyFile:
package: Path
path: Path

def scan_project(project_path: Path, packages: Sequence[Path]) -> Iterator[PyFile]:
for package_path in [project_path / p for p in packages] if packages else [project_path]:
for root, _dirs, files in os.walk(package_path):
root_path = Path(root)

def iter_python_files(project_path: Path, packages: Sequence[Path]) -> Iterator[PyFile]:
if packages:
for pkg in packages:
yield from (
PyFile(package=project_path / pkg, path=p.resolve())
for p in (project_path / pkg).glob("**/*.py")
)
if root_path.name.startswith("."):
break

return
for file in files:
# Regular package or Python module
if (file_path := root_path / file).suffix == ".py":
try:
yield PyFile(package=package_path, path=file_path)
except ValueError as e:
logger.error("Cannot make py file from %s: %s", file_path, e)

for p in project_path.glob("**/*.py"):
yield PyFile(package=project_path, path=p.resolve())
if not (root_path / "__init__.py").exists():
# Namespace package
try:
yield PyFile(package=package_path, path=root_path)
except ValueError as e:
logger.error("Cannot make py file from %s: %s", root_path, e)


@dataclass(frozen=True, kw_only=True)
Expand Down
40 changes: 20 additions & 20 deletions py_import_cycles/graphs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
from graphviz import Digraph

from .log import logger
from .modules import Module, PyModule
from .modules import PyFile, PyFileType
from .type_defs import Comparable


class ImportEdge(NamedTuple):
title: str
from_module: Module
to_module: Module
from_py_file: PyFile
to_py_file: PyFile
edge_color: str


Expand All @@ -29,7 +29,7 @@ class ImportEdge(NamedTuple):
def make_graph(
filepath: Path,
opt_strategy: Literal["dfs", "tarjan"],
import_cycles: Sequence[tuple[Module, ...]],
import_cycles: Sequence[tuple[PyFile, ...]],
) -> None:
sys.stderr.write(f"Write graph data to {filepath}\n")

Expand All @@ -42,28 +42,28 @@ def make_graph(
with d.subgraph() as ds:
for edge in edges:
ds.node(
str(edge.from_module.name),
shape=_get_shape(edge.from_module),
str(edge.from_py_file.name),
shape=_get_shape(edge.from_py_file),
)

ds.node(
str(edge.to_module.name),
shape=_get_shape(edge.to_module),
str(edge.to_py_file.name),
shape=_get_shape(edge.to_py_file),
)

ds.attr("edge", color=edge.edge_color)

if edge.title:
ds.edge(str(edge.from_module.name), str(edge.to_module.name), edge.title)
ds.edge(str(edge.from_py_file.name), str(edge.to_py_file.name), edge.title)
else:
ds.edge(str(edge.from_module.name), str(edge.to_module.name))
ds.edge(str(edge.from_py_file.name), str(edge.to_py_file.name))

d.unflatten(stagger=50)
d.view()


def _get_shape(module: Module) -> str:
return "" if isinstance(module, PyModule) else "box"
def _get_shape(py_file: PyFile) -> str:
return "" if py_file.type is PyFileType.MODULE else "box"


def pairwise(iterable: Iterable[T]) -> Iterator[tuple[T, T]]:
Expand Down Expand Up @@ -107,15 +107,15 @@ def normalize(value: float, lower: float, higher: float) -> float:

def _make_edges(
opt_strategy: Literal["dfs", "tarjan"],
import_cycles: Sequence[tuple[Module, ...]],
import_cycles: Sequence[tuple[PyFile, ...]],
) -> Sequence[ImportEdge]:
if opt_strategy == "dfs":
return _make_dfs_import_edges(import_cycles)
return _make_only_cycles_edges(import_cycles)


def _make_dfs_import_edges(
cycles: Sequence[tuple[Module, ...]],
cycles: Sequence[tuple[PyFile, ...]],
) -> Sequence[ImportEdge]:
edges = badness(dedup_edges(cycles))
if not (edges_values := edges.values()):
Expand All @@ -140,7 +140,7 @@ def _make_dfs_import_edges(


def _make_only_cycles_edges(
import_cycles: Sequence[tuple[Module, ...]],
import_cycles: Sequence[tuple[PyFile, ...]],
) -> Sequence[ImportEdge]:
edges: set[ImportEdge] = set()
for nr, import_cycle in enumerate(import_cycles, start=1):
Expand All @@ -150,15 +150,15 @@ def _make_only_cycles_edges(
random.randint(50, 200),
)

start_module = import_cycle[0]
for next_module in import_cycle[1:]:
start_py_file = import_cycle[0]
for next_py_file in import_cycle[1:]:
edges.add(
ImportEdge(
f"{str(nr)} ({len(import_cycle) - 1})",
start_module,
next_module,
start_py_file,
next_py_file,
color,
)
)
start_module = next_module
start_py_file = next_py_file
return sorted(edges)
Loading

0 comments on commit 51d0dac

Please sign in to comment.