Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor and mypy up ibm datasource #5509

Merged
merged 7 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 57 additions & 40 deletions cloudinit/sources/DataSourceIBMCloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import json
import logging
import os
from typing import Any, Callable, Dict, Optional, Tuple

from cloudinit import atomic_helper, sources, subp, util
from cloudinit.sources.helpers import openstack
Expand Down Expand Up @@ -176,7 +177,7 @@ def network_config(self):
# environment handles networking configuration. Not cloud-init.
return {"config": "disabled", "version": 1}
if self._network_config is None:
if self.network_json is not None:
if self.network_json not in (sources.UNSET, None):
LOG.debug("network config provided via network_json")
self._network_config = openstack.convert_net_json(
self.network_json, known_macs=None
Expand All @@ -186,22 +187,32 @@ def network_config(self):
return self._network_config


def _read_system_uuid():
def _read_system_uuid() -> Optional[str]:
"""
Read the system uuid.

:return: the system uuid or None if not available.
"""
uuid_path = "/sys/hypervisor/uuid"
if not os.path.isfile(uuid_path):
return None
return util.load_text_file(uuid_path).strip().lower()


def _is_xen():
"""
Return boolean indicating if this is a xen hypervisor.

:return: True if this is a xen hypervisor, False otherwise.
"""
return os.path.exists("/proc/xen")


def _is_ibm_provisioning(
prov_cfg="/root/provisioningConfiguration.cfg",
inst_log="/root/swinstall.log",
boot_ref="/proc/1/environ",
):
) -> bool:
"""Return boolean indicating if this boot is ibm provisioning boot."""
if os.path.exists(prov_cfg):
msg = "config '%s' exists." % prov_cfg
Expand Down Expand Up @@ -229,7 +240,7 @@ def _is_ibm_provisioning(
return result


def get_ibm_platform():
def get_ibm_platform() -> Tuple[Optional[str], Optional[str]]:
"""Return a tuple (Platform, path)

If this is Not IBM cloud, then the return value is (None, None).
Expand All @@ -242,7 +253,7 @@ def get_ibm_platform():
return not_found

# fslabels contains only the first entry with a given label.
fslabels = {}
fslabels: Dict[str, Dict] = {}
try:
devs = util.blkid()
except subp.ProcessExecutionError as e:
Expand Down Expand Up @@ -289,18 +300,18 @@ def get_ibm_platform():
return not_found


def read_md():
def read_md() -> Optional[Dict[str, Any]]:
"""Read data from IBM Cloud.

@return: None if not running on IBM Cloud.
:return: None if not running on IBM Cloud.
dictionary with guaranteed fields: metadata, version
and optional fields: userdata, vendordata, networkdata.
Also includes the system uuid from /sys/hypervisor/uuid."""
platform, path = get_ibm_platform()
if platform is None:
LOG.debug("This is not an IBMCloud platform.")
return None
elif platform in PROVISIONING:
elif platform in PROVISIONING or path is None:
LOG.debug("Cloud-init is disabled during provisioning: %s.", platform)
return None

Expand All @@ -325,81 +336,87 @@ def read_md():
return ret


def metadata_from_dir(source_dir):
def metadata_from_dir(source_dir: str) -> Dict[str, Any]:
"""Walk source_dir extracting standardized metadata.

Certain metadata keys are renamed to present a standardized set of metadata
keys.

This function has a lot in common with ConfigDriveReader.read_v2 but
there are a number of inconsistencies, such key renames and as only
presenting a 'latest' version which make it an unlikely candidate to share
there are a number of inconsistencies, such as key renames and only
presenting a 'latest' version, which make it an unlikely candidate to share
code.

@return: Dict containing translated metadata, userdata, vendordata,
:return: Dict containing translated metadata, userdata, vendordata,
networkdata as present.
"""

def opath(fname):
def opath(fname: str) -> str:
return os.path.join("openstack", "latest", fname)

def load_json_bytes(blob):
def load_json_bytes(blob: bytes) -> Dict[str, Any]:
a-dubs marked this conversation as resolved.
Show resolved Hide resolved
"""
Load JSON from a byte string.

This technically could return a list or a str, but we are only
assuming a dict here.

:param blob: The byte string to load JSON from.
:return: The loaded JSON object.
"""
a-dubs marked this conversation as resolved.
Show resolved Hide resolved
return json.loads(blob.decode("utf-8"))

def load_file(path: str, translator: Callable[[bytes], Any]) -> Any:
try:
raw = util.load_binary_file(path)
return translator(raw)
except IOError as e:
LOG.debug("Failed reading path '%s': %s", path, e)
return None
except Exception as e:
raise sources.BrokenMetadata(f"Failed decoding {path}: {e}")

files = [
# tuples of (results_name, path, translator)
("metadata_raw", opath("meta_data.json"), load_json_bytes),
("userdata", opath("user_data"), None),
("userdata", opath("user_data"), lambda x: x),
("vendordata", opath("vendor_data.json"), load_json_bytes),
("networkdata", opath("network_data.json"), load_json_bytes),
]

results = {}
for (name, path, transl) in files:
fpath = os.path.join(source_dir, path)
raw = None
try:
raw = util.load_binary_file(fpath)
except IOError as e:
LOG.debug("Failed reading path '%s': %s", fpath, e)

if raw is None or transl is None:
data = raw
else:
try:
data = transl(raw)
except Exception as e:
raise sources.BrokenMetadata(
"Failed decoding %s: %s" % (path, e)
)
results: Dict[str, Any] = {}

results[name] = data
for name, path, transl in files:
fpath = os.path.join(source_dir, path)
results[name] = load_file(fpath, transl)

if results.get("metadata_raw") is None:
if results["metadata_raw"] is None:
raise sources.BrokenMetadata(
"%s missing required file 'meta_data.json'" % source_dir
f"{source_dir} missing required file 'meta_data.json'",
)

results["metadata"] = {}

md_raw = results["metadata_raw"]
md = results["metadata"]

if "random_seed" in md_raw:
try:
md["random_seed"] = base64.b64decode(md_raw["random_seed"])
except (ValueError, TypeError) as e:
raise sources.BrokenMetadata(
"Badly formatted metadata random_seed entry: %s" % e
f"Badly formatted metadata random_seed entry: {e}"
)

renames = (
("public_keys", "public-keys"),
("hostname", "local-hostname"),
("uuid", "instance-id"),
)
for mdname, newname in renames:
if mdname in md_raw:
md[newname] = md_raw[mdname]

for old_key, new_key in renames:
if old_key in md_raw:
md[new_key] = md_raw[old_key]

return results

Expand Down
2 changes: 1 addition & 1 deletion cloudinit/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ def __init__(self, sys_cfg, distro: Distro, paths: Paths, ud_proc=None):
self.vendordata_raw = None
self.vendordata2_raw = None
self.metadata_address = None
self.network_json = UNSET
self.network_json: Optional[str] = UNSET
self.ec2_metadata = UNSET

self.ds_cfg = util.get_cfg_by_path(
Expand Down
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ module = [
"cloudinit.sources.DataSourceExoscale",
"cloudinit.sources.DataSourceGCE",
"cloudinit.sources.DataSourceHetzner",
"cloudinit.sources.DataSourceIBMCloud",
"cloudinit.sources.DataSourceMAAS",
"cloudinit.sources.DataSourceNoCloud",
"cloudinit.sources.DataSourceOVF",
Expand Down
6 changes: 6 additions & 0 deletions tests/unittests/sources/test_ibmcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ def test_template_live(self, m_platform, m_sysuuid):
)

ret = ibm.read_md()
if ret is None: # this is needed for mypy - ensures ret is not None
self.fail("read_md returned None unexpectedly")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TheRealFalcon is there a better way I should be doing this? I think this is pretty okay but just curious. thanks!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm tempted to disable mypy for tests, but for mypy complaining this looks good!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well now that you mention it, mypy is supposed to be disabled for tests. All I did in this PR was remove the line disabling mypy checks for the ibm datasource file, not the unit test file for it.

self.assertEqual(ibm.Platforms.TEMPLATE_LIVE_METADATA, ret["platform"])
self.assertEqual(tmpdir, ret["source"])
self.assertEqual(self.userdata, ret["userdata"])
Expand All @@ -298,6 +300,8 @@ def test_os_code_live(self, m_platform, m_sysuuid):
)

ret = ibm.read_md()
if ret is None: # this is needed for mypy - ensures ret is not None
self.fail("read_md returned None unexpectedly")
self.assertEqual(ibm.Platforms.OS_CODE, ret["platform"])
self.assertEqual(tmpdir, ret["source"])
self.assertEqual(self.userdata, ret["userdata"])
Expand All @@ -320,6 +324,8 @@ def test_os_code_live_no_userdata(self, m_platform, m_sysuuid):
)

ret = ibm.read_md()
if ret is None: # this is needed for mypy - ensures ret is not None
self.fail("read_md returned None unexpectedly")
self.assertEqual(ibm.Platforms.OS_CODE, ret["platform"])
self.assertEqual(tmpdir, ret["source"])
self.assertIsNone(ret["userdata"])
Expand Down
Loading