Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed untyped defs in cloudinit analyze #5823

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions cloudinit/analyze/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
import re
import sys
from datetime import datetime
from typing import IO
from typing import IO, List, Tuple, Dict, Union

from cloudinit.analyze import dump, show
from cloudinit.atomic_helper import json_dumps


def get_parser(parser=None):
def get_parser(
parser: argparse.ArgumentParser = None
) -> argparse.ArgumentParser:
if not parser:
parser = argparse.ArgumentParser(
prog="cloudinit-analyze",
Expand Down Expand Up @@ -113,7 +115,7 @@ def get_parser(parser=None):
return parser


def analyze_boot(name, args):
def analyze_boot(name: str, args: argparse.Namespace) -> int:
"""Report a list of how long different boot operations took.

For Example:
Expand Down Expand Up @@ -196,7 +198,7 @@ def analyze_boot(name, args):
return status_code


def analyze_blame(name, args):
def analyze_blame(name: str, args: argparse.Namespace) -> None:
"""Report a list of records sorted by largest time delta.

For example:
Expand All @@ -223,7 +225,7 @@ def analyze_blame(name, args):
clean_io(infh, outfh)


def analyze_show(name, args):
def analyze_show(name: str, args: argparse.Namespace) -> None:
"""Generate output records using the 'standard' format to printing events.

Example output follows:
Expand Down Expand Up @@ -260,22 +262,22 @@ def analyze_show(name, args):
clean_io(infh, outfh)


def analyze_dump(name, args):
def analyze_dump(name: str, args: argparse.Namespace) -> None:
"""Dump cloud-init events in json format"""
infh, outfh = configure_io(args)
outfh.write(json_dumps(_get_events(infh)) + "\n")
clean_io(infh, outfh)


def _get_events(infile):
def _get_events(infile: IO)-> List[Dict[str, Union[str, float]]]:
rawdata = None
events, rawdata = show.load_events_infile(infile)
if not events:
events, _ = dump.dump_events(rawdata=rawdata)
return events


def configure_io(args):
def configure_io(args: argparse.Namespace) -> Tuple[IO, IO]:
"""Common parsing and setup of input/output files"""
if args.infile == "-":
infh = sys.stdin
Expand Down
19 changes: 12 additions & 7 deletions cloudinit/analyze/dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import calendar
import sys
from datetime import datetime, timezone
from typing import Optional, List, Tuple, Dict, Union


from cloudinit import atomic_helper, subp, util

stage_to_description = {
stage_to_description: Dict[str, str] = {
"finished": "finished running cloud-init",
"init-local": "starting search for local datasources",
"init-network": "searching for network datasources",
Expand All @@ -27,7 +29,7 @@
DEFAULT_FMT = "%b %d %H:%M:%S %Y"


def parse_timestamp(timestampstr):
def parse_timestamp(timestampstr: str) -> float:
# default syslog time does not include the current year
months = [calendar.month_abbr[m] for m in range(1, 13)]
if timestampstr.split()[0] in months:
Expand All @@ -54,7 +56,7 @@ def parse_timestamp(timestampstr):
return float(timestamp)


def has_gnu_date():
def has_gnu_date() -> bool:
"""GNU date includes a string containing the word GNU in it in
help output. Posix date does not. Use this to indicate on Linux
systems without GNU date that the extended parsing is not
Expand All @@ -63,7 +65,7 @@ def has_gnu_date():
return "GNU" in subp.subp(["date", "--help"]).stdout


def parse_timestamp_from_date(timestampstr):
def parse_timestamp_from_date(timestampstr: str) -> float:
if not util.is_Linux() and subp.which("gdate"):
date = "gdate"
elif has_gnu_date():
Expand All @@ -77,7 +79,7 @@ def parse_timestamp_from_date(timestampstr):
)


def parse_ci_logline(line):
def parse_ci_logline(line: str) -> Optional[Dict[str, Union[str, float]]]:
# Stage Starts:
# Cloud-init v. 0.7.7 running 'init-local' at \
# Fri, 02 Sep 2016 19:28:07 +0000. Up 1.0 seconds.
Expand Down Expand Up @@ -163,7 +165,10 @@ def parse_ci_logline(line):
return event


def dump_events(cisource=None, rawdata=None):
def dump_events(
cisource: Optional[object] = None,
rawdata: Optional[str] = None
) -> Tuple[List[Dict[str, Union[str, float]]], List[str]]:
events = []
event = None
CI_EVENT_MATCHES = ["start:", "finish:", "Cloud-init v."]
Expand All @@ -189,7 +194,7 @@ def dump_events(cisource=None, rawdata=None):
return events, data


def main():
def main() -> str:
if len(sys.argv) > 1:
cisource = open(sys.argv[1])
else:
Expand Down
59 changes: 37 additions & 22 deletions cloudinit/analyze/show.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
from cloudinit import subp, util
from cloudinit.distros import uses_systemd

from typing import TextIO, Optional, List, Tuple, Dict, Union


# Example events:
# {
# "description": "executing late commands",
Expand All @@ -31,7 +34,7 @@
# "timestamp": 1461164249.1590767
# }

format_key = {
format_key: Dict[str, str] = {
"%d": "delta",
"%D": "description",
"%E": "elapsed",
Expand All @@ -51,7 +54,7 @@
TIMESTAMP_UNKNOWN = (FAIL_CODE, -1, -1, -1)


def format_record(msg, event):
def format_record(msg:str, event: Dict[str, Union[str, float]]) -> str:
for i, j in format_key.items():
if i in msg:
# ensure consistent formatting of time values
Expand All @@ -62,41 +65,48 @@ def format_record(msg, event):
return msg.format(**event)


def event_name(event):
def event_name(event: Dict[str, Union[str, float]]) -> Optional[str]:
if event:
return event.get("name")
return None


def event_type(event):
def event_type(event: Dict[str, Union[str, float]]) -> Optional[str]:
if event:
return event.get("event_type")
return None


def event_parent(event):
def event_parent(event: Dict[str, Union[str, float]]) -> Optional[str]:
if event:
return event_name(event).split("/")[0]
return None


def event_timestamp(event):
def event_timestamp(event: Dict[str, Union[str, float]]) -> float:
return float(event.get("timestamp"))


def event_datetime(event):
def event_datetime(event: Dict[str, Union[str, float]]) -> datetime.datetime:
return datetime.datetime.utcfromtimestamp(event_timestamp(event))


def delta_seconds(t1, t2):
def delta_seconds(t1: datetime.datetime, t2: datetime.datetime) -> float:
return (t2 - t1).total_seconds()


def event_duration(start, finish):
def event_duration(
start: Dict[str, Union[str, float]],
finish: Dict[str, Union[str, float]]
) -> float:
return delta_seconds(event_datetime(start), event_datetime(finish))


def event_record(start_time, start, finish):
def event_record(
start_time:datetime.datetime,
start: Dict[str, Union[str, float]],
finish: Dict[str, Union[str, float]]
) -> Dict[str, Union[str, float]]:
record = finish.copy()
record.update(
{
Expand All @@ -109,7 +119,7 @@ def event_record(start_time, start, finish):
return record


def total_time_record(total_time):
def total_time_record(total_time: float) -> str:
return "Total Time: %3.5f seconds\n" % total_time


Expand All @@ -118,7 +128,7 @@ class SystemctlReader:
Class for dealing with all systemctl subp calls in a consistent manner.
"""

def __init__(self, property, parameter=None):
def __init__(self, property: str, parameter: Optional[str] = None) -> None:
self.epoch = None
self.args = [subp.which("systemctl"), "show"]
if parameter:
Expand All @@ -129,7 +139,7 @@ def __init__(self, property, parameter=None):
# requested from the object
self.failure = self.subp()

def subp(self):
def subp(self) -> Optional[Union[str, Exception]]:
"""
Make a subp call based on set args and handle errors by setting
failure code
Expand All @@ -145,7 +155,7 @@ def subp(self):
except Exception as systemctl_fail:
return systemctl_fail

def parse_epoch_as_float(self):
def parse_epoch_as_float(self) -> float:
"""
If subp call succeeded, return the timestamp from subp as a float.

Expand All @@ -166,7 +176,7 @@ def parse_epoch_as_float(self):
return float(timestamp) / 1000000


def dist_check_timestamp():
def dist_check_timestamp() -> Tuple[str, float, float, float]:
"""
Determine which init system a particular linux distro is using.
Each init system (systemd, etc) has a different way of
Expand All @@ -188,7 +198,7 @@ def dist_check_timestamp():
return TIMESTAMP_UNKNOWN


def gather_timestamps_using_dmesg():
def gather_timestamps_using_dmesg() -> Tuple[str, float, float, float]:
"""
Gather timestamps that corresponds to kernel begin initialization,
kernel finish initialization using dmesg as opposed to systemctl
Expand Down Expand Up @@ -219,7 +229,7 @@ def gather_timestamps_using_dmesg():
return TIMESTAMP_UNKNOWN


def gather_timestamps_using_systemd():
def gather_timestamps_using_systemd() -> Tuple[str, float, float, float]:
"""
Gather timestamps that corresponds to kernel begin initialization,
kernel finish initialization. and cloud-init systemd unit activation
Expand Down Expand Up @@ -253,9 +263,9 @@ def gather_timestamps_using_systemd():


def generate_records(
events,
print_format="(%n) %d seconds in %I%D",
):
events: List[Dict[str, Union[str, float]]],
print_format: str ="(%n) %d seconds in %I%D",
) -> List[List[str]]:
"""
Take in raw events and create parent-child dependencies between events
in order to order events in chronological order.
Expand Down Expand Up @@ -326,7 +336,10 @@ def generate_records(
return boot_records


def show_events(events, print_format):
def show_events(
events: List[Dict[str, Union[str, float]]],
print_format: str
) -> List[List[str]]:
"""
A passthrough method that makes it easier to call generate_records()

Expand All @@ -339,7 +352,9 @@ def show_events(events, print_format):
return generate_records(events, print_format=print_format)


def load_events_infile(infile):
def load_events_infile(
infile: TextIO
) -> Tuple[Optional[List[Dict[str, Union[str, float]]]], str]:
"""
Takes in a log file, read it, and convert to json.

Expand Down
1 change: 1 addition & 0 deletions tools/.github-cla-signers
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
a-dubs
abdulganiyy
aciba90
acourdavAkamai
ader1990
Expand Down
Loading