diff --git a/cloudinit/sources/DataSourceIBMCloud.py b/cloudinit/sources/DataSourceIBMCloud.py index 89edd79f7ea..9b3ce8d9f83 100644 --- a/cloudinit/sources/DataSourceIBMCloud.py +++ b/cloudinit/sources/DataSourceIBMCloud.py @@ -96,6 +96,7 @@ import json import logging import os +from typing import Any, Callable, Dict, Optional, Tuple from cloudinit import atomic_helper, sources, subp, util from cloudinit.sources.helpers import openstack @@ -176,7 +177,7 @@ def network_config(self): # environment handles networking configuration. Not cloud-init. return {"config": "disabled", "version": 1} if self._network_config is None: - if self.network_json is not None: + if self.network_json not in (sources.UNSET, None): LOG.debug("network config provided via network_json") self._network_config = openstack.convert_net_json( self.network_json, known_macs=None @@ -186,7 +187,12 @@ def network_config(self): return self._network_config -def _read_system_uuid(): +def _read_system_uuid() -> Optional[str]: + """ + Read the system uuid. + + :return: the system uuid or None if not available. + """ uuid_path = "/sys/hypervisor/uuid" if not os.path.isfile(uuid_path): return None @@ -194,6 +200,11 @@ def _read_system_uuid(): def _is_xen(): + """ + Return boolean indicating if this is a xen hypervisor. + + :return: True if this is a xen hypervisor, False otherwise. + """ return os.path.exists("/proc/xen") @@ -201,7 +212,7 @@ def _is_ibm_provisioning( prov_cfg="/root/provisioningConfiguration.cfg", inst_log="/root/swinstall.log", boot_ref="/proc/1/environ", -): +) -> bool: """Return boolean indicating if this boot is ibm provisioning boot.""" if os.path.exists(prov_cfg): msg = "config '%s' exists." % prov_cfg @@ -229,7 +240,7 @@ def _is_ibm_provisioning( return result -def get_ibm_platform(): +def get_ibm_platform() -> Tuple[Optional[str], Optional[str]]: """Return a tuple (Platform, path) If this is Not IBM cloud, then the return value is (None, None). @@ -242,7 +253,7 @@ def get_ibm_platform(): return not_found # fslabels contains only the first entry with a given label. - fslabels = {} + fslabels: Dict[str, Dict] = {} try: devs = util.blkid() except subp.ProcessExecutionError as e: @@ -289,10 +300,10 @@ def get_ibm_platform(): return not_found -def read_md(): +def read_md() -> Optional[Dict[str, Any]]: """Read data from IBM Cloud. - @return: None if not running on IBM Cloud. + :return: None if not running on IBM Cloud. dictionary with guaranteed fields: metadata, version and optional fields: userdata, vendordata, networkdata. Also includes the system uuid from /sys/hypervisor/uuid.""" @@ -300,7 +311,7 @@ def read_md(): if platform is None: LOG.debug("This is not an IBMCloud platform.") return None - elif platform in PROVISIONING: + elif platform in PROVISIONING or path is None: LOG.debug("Cloud-init is disabled during provisioning: %s.", platform) return None @@ -325,71 +336,76 @@ def read_md(): return ret -def metadata_from_dir(source_dir): +def metadata_from_dir(source_dir: str) -> Dict[str, Any]: """Walk source_dir extracting standardized metadata. Certain metadata keys are renamed to present a standardized set of metadata keys. This function has a lot in common with ConfigDriveReader.read_v2 but - there are a number of inconsistencies, such key renames and as only - presenting a 'latest' version which make it an unlikely candidate to share + there are a number of inconsistencies, such as key renames and only + presenting a 'latest' version, which make it an unlikely candidate to share code. - @return: Dict containing translated metadata, userdata, vendordata, + :return: Dict containing translated metadata, userdata, vendordata, networkdata as present. """ - def opath(fname): + def opath(fname: str) -> str: return os.path.join("openstack", "latest", fname) - def load_json_bytes(blob): + def load_json_bytes(blob: bytes) -> Dict[str, Any]: + """ + Load JSON from a byte string. + + This technically could return a list or a str, but we are only + assuming a dict here. + + :param blob: The byte string to load JSON from. + :return: The loaded JSON object. + """ return json.loads(blob.decode("utf-8")) + def load_file(path: str, translator: Callable[[bytes], Any]) -> Any: + try: + raw = util.load_binary_file(path) + return translator(raw) + except IOError as e: + LOG.debug("Failed reading path '%s': %s", path, e) + return None + except Exception as e: + raise sources.BrokenMetadata(f"Failed decoding {path}: {e}") + files = [ # tuples of (results_name, path, translator) ("metadata_raw", opath("meta_data.json"), load_json_bytes), - ("userdata", opath("user_data"), None), + ("userdata", opath("user_data"), lambda x: x), ("vendordata", opath("vendor_data.json"), load_json_bytes), ("networkdata", opath("network_data.json"), load_json_bytes), ] - results = {} - for (name, path, transl) in files: - fpath = os.path.join(source_dir, path) - raw = None - try: - raw = util.load_binary_file(fpath) - except IOError as e: - LOG.debug("Failed reading path '%s': %s", fpath, e) - - if raw is None or transl is None: - data = raw - else: - try: - data = transl(raw) - except Exception as e: - raise sources.BrokenMetadata( - "Failed decoding %s: %s" % (path, e) - ) + results: Dict[str, Any] = {} - results[name] = data + for name, path, transl in files: + fpath = os.path.join(source_dir, path) + results[name] = load_file(fpath, transl) - if results.get("metadata_raw") is None: + if results["metadata_raw"] is None: raise sources.BrokenMetadata( - "%s missing required file 'meta_data.json'" % source_dir + f"{source_dir} missing required file 'meta_data.json'", ) results["metadata"] = {} md_raw = results["metadata_raw"] md = results["metadata"] + if "random_seed" in md_raw: try: md["random_seed"] = base64.b64decode(md_raw["random_seed"]) except (ValueError, TypeError) as e: raise sources.BrokenMetadata( - "Badly formatted metadata random_seed entry: %s" % e + f"Badly formatted metadata random_seed entry: {e}" ) renames = ( @@ -397,9 +413,10 @@ def load_json_bytes(blob): ("hostname", "local-hostname"), ("uuid", "instance-id"), ) - for mdname, newname in renames: - if mdname in md_raw: - md[newname] = md_raw[mdname] + + for old_key, new_key in renames: + if old_key in md_raw: + md[new_key] = md_raw[old_key] return results diff --git a/cloudinit/sources/__init__.py b/cloudinit/sources/__init__.py index 27c37ee1e13..391131edc96 100644 --- a/cloudinit/sources/__init__.py +++ b/cloudinit/sources/__init__.py @@ -325,7 +325,7 @@ def __init__(self, sys_cfg, distro: Distro, paths: Paths, ud_proc=None): self.vendordata_raw = None self.vendordata2_raw = None self.metadata_address = None - self.network_json = UNSET + self.network_json: Optional[str] = UNSET self.ec2_metadata = UNSET self.ds_cfg = util.get_cfg_by_path( diff --git a/pyproject.toml b/pyproject.toml index d5578c1379b..df969290451 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -95,7 +95,6 @@ module = [ "cloudinit.sources.DataSourceExoscale", "cloudinit.sources.DataSourceGCE", "cloudinit.sources.DataSourceHetzner", - "cloudinit.sources.DataSourceIBMCloud", "cloudinit.sources.DataSourceMAAS", "cloudinit.sources.DataSourceNoCloud", "cloudinit.sources.DataSourceOVF", diff --git a/tests/unittests/sources/test_ibmcloud.py b/tests/unittests/sources/test_ibmcloud.py index bee486f4dd5..37c2594dce3 100644 --- a/tests/unittests/sources/test_ibmcloud.py +++ b/tests/unittests/sources/test_ibmcloud.py @@ -272,6 +272,8 @@ def test_template_live(self, m_platform, m_sysuuid): ) ret = ibm.read_md() + if ret is None: # this is needed for mypy - ensures ret is not None + self.fail("read_md returned None unexpectedly") self.assertEqual(ibm.Platforms.TEMPLATE_LIVE_METADATA, ret["platform"]) self.assertEqual(tmpdir, ret["source"]) self.assertEqual(self.userdata, ret["userdata"]) @@ -298,6 +300,8 @@ def test_os_code_live(self, m_platform, m_sysuuid): ) ret = ibm.read_md() + if ret is None: # this is needed for mypy - ensures ret is not None + self.fail("read_md returned None unexpectedly") self.assertEqual(ibm.Platforms.OS_CODE, ret["platform"]) self.assertEqual(tmpdir, ret["source"]) self.assertEqual(self.userdata, ret["userdata"]) @@ -320,6 +324,8 @@ def test_os_code_live_no_userdata(self, m_platform, m_sysuuid): ) ret = ibm.read_md() + if ret is None: # this is needed for mypy - ensures ret is not None + self.fail("read_md returned None unexpectedly") self.assertEqual(ibm.Platforms.OS_CODE, ret["platform"]) self.assertEqual(tmpdir, ret["source"]) self.assertIsNone(ret["userdata"])