Skip to content

Commit

Permalink
feat: Custom keys for apt archives
Browse files Browse the repository at this point in the history
Users with local Ubuntu archive mirrors and Landscape instances have
been unable to specify the corresponding gpg keys. This resulted in
errors such as "NO_PUBKEY" on commands such as "apt update"

This commit adds the functionality to read a user specified "key" field
in the primary and security sections of the apt declaration in a
cloud-init-config.yaml. The provided key can be raw or path based, and
is formatted appropriately. The function that provides this feature,
get_mirror_key was modelled after the existing function, get_mirror,
which obtains user specified archive URIs. The deb822 templates were
updated to reflect the new primary and security key parameters

If no primary key is declared, it defaults to the distro gpg key in
/usr/share/keyrings. If no security mirror is declared, the
corresponding key falls back on the primary key. This is to match the
existing behaviour where the security mirror falls back on the primary
mirror URI. Therefore, when no keys are specified, the behaviour
becomes indistinguishable from the current cloud-init implementation

Keys can also be specified using a keyid and optionally a keyserver.
When this is the case, the key is obtained and artificially added to
the parsed mirror config and treated as described above.

The commit has new unit tests in test_apt_configure_sources_list_v3.py

I also added my GitHub to the CLA signers doc in this commit
  • Loading branch information
bryanfraschetti committed Oct 17, 2024
1 parent 1fc063a commit 578b854
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 6 deletions.
100 changes: 98 additions & 2 deletions cloudinit/config/cc_apt_configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def apply_apt(cfg, cloud, gpg):

release = util.lsb_release()["codename"]
arch = util.get_dpkg_architecture()
mirrors = find_apt_mirror_info(cfg, cloud, arch=arch)
mirrors = find_apt_mirror_info(cfg, cloud, arch=arch, gpg=gpg)
LOG.debug("Apt Mirror info: %s", mirrors)

matcher = None
Expand Down Expand Up @@ -928,6 +928,35 @@ def search_for_mirror_dns(configured, mirrortype, cfg, cloud):
return mirror


def update_mirror_keys(pmirror_key, smirror_key, cloud):
"""Sets keys for primary and security mirrors and
returns default if no keys are defined
"""

# Fallback key if none provided
distro = getattr(cloud, "distro", None)
distro_name = getattr(distro, "name", None)
keypath = "/usr/share/keyrings/"
keysuffix = "-archive-keyring.gpg"
default = f"{keypath}{distro_name}{keysuffix}" if distro_name else None

if pmirror_key and smirror_key:
# Config defined primary and security sources
return {"PRIMARY_KEY": pmirror_key, "SECURITY_KEY": smirror_key}
elif pmirror_key and not smirror_key:
# Primary source was defined, but security was not
# Use primary source (and therefore key) for security repo
LOG.info("Using primary key for security mirror")
return {"PRIMARY_KEY": pmirror_key, "SECURITY_KEY": pmirror_key}

LOG.info("Setting default key for primary and security mirrors")
return {
# Primary key not defined, use default archives
"PRIMARY_KEY": default,
"SECURITY_KEY": default,
}


def update_mirror_info(pmirror, smirror, arch, cloud):
"""sets security mirror to primary if not defined.
returns defaults if no mirrors are defined"""
Expand Down Expand Up @@ -971,6 +1000,66 @@ def get_arch_mirrorconfig(cfg, mirrortype, arch):
return default


def format_security_key(key):
"""Ubuntu Deb822 template Signed-By expects the raw key to be
indented by two spaces on all lines except the first line, which
is agnostic to prefaced whitespace
This function formats raw keys accordingly. It also works for
formatting path based keys and keyservers since they pass through
unaffected
"""

lines = key.splitlines()
first_line = lines[0]
indented_lines = [" " + line for line in lines[1:]]
formatted_key = "\n".join([first_line] + indented_lines)

return formatted_key


def get_mirror_key(cfg, mirrortype, arch, cloud, gpg):
"""Obtain custom specified gpg key for apt repository
in config. Particularly useful for local/on-premise/landscape
servers which may be signed by custom keys
"""

LOG.debug("Checking %s mirror for provided key", mirrortype)

mcfg = get_arch_mirrorconfig(cfg, mirrortype, arch)
if mcfg is None:
LOG.debug("No %s configuration provided", mirrortype)
return None

LOG.debug("Configuration for %s mirror provided", mirrortype)

if "keyid" in mcfg and "key" not in mcfg:
LOG.debug("Key provided in the form of keyid")
keyserver = DEFAULT_KEYSERVER
if "keyserver" in mcfg:
keyserver = mcfg["keyserver"]
# Retrieve key from remote and store in mirror config
mcfg["key"] = gpg.getkeybyid(mcfg["keyid"], keyserver)

# Fallback key if none provided
distro = getattr(cloud, "distro", None)
distro_name = getattr(distro, "name", None)
keypath = "/usr/share/keyrings/"
keysuffix = "-archive-keyring.gpg"
default = f"{keypath}{distro_name}{keysuffix}" if distro_name else None

mirror_key = mcfg.get("key", None)
if mirror_key:
# Source and key are explicitly defined
LOG.info("Setting key for %s mirror", mirrortype)
mirror_key = format_security_key(mirror_key)
else:
# Source specified, but key not specified
LOG.info("Setting default key for %s mirror", mirrortype)
mirror_key = default

return mirror_key


def get_mirror(cfg, mirrortype, arch, cloud):
"""pass the three potential stages of mirror specification
returns None is neither of them found anything otherwise the first
Expand All @@ -997,7 +1086,7 @@ def get_mirror(cfg, mirrortype, arch, cloud):
return mirror


def find_apt_mirror_info(cfg, cloud, arch=None):
def find_apt_mirror_info(cfg, cloud, arch=None, gpg=None):
"""find_apt_mirror_info
find an apt_mirror given the cfg provided.
It can check for separate config of primary and security mirrors
Expand All @@ -1008,15 +1097,22 @@ def find_apt_mirror_info(cfg, cloud, arch=None):
if arch is None:
arch = util.get_dpkg_architecture()
LOG.debug("got arch for mirror selection: %s", arch)

pmirror = get_mirror(cfg, "primary", arch, cloud)
LOG.debug("got primary mirror: %s", pmirror)
pmirror_key = get_mirror_key(cfg, "primary", arch, cloud, gpg)

smirror = get_mirror(cfg, "security", arch, cloud)
LOG.debug("got security mirror: %s", smirror)
smirror_key = get_mirror_key(cfg, "security", arch, cloud, gpg)

mirror_info = update_mirror_info(pmirror, smirror, arch, cloud)
mirror_keys = update_mirror_keys(pmirror_key, smirror_key, cloud)

# less complex replacements use only MIRROR, derive from primary
mirror_info["MIRROR"] = mirror_info["PRIMARY"]
mirror_info["PRIMARY_KEY"] = mirror_keys["PRIMARY_KEY"]
mirror_info["SECURITY_KEY"] = mirror_keys["SECURITY_KEY"]

return mirror_info

Expand Down
4 changes: 2 additions & 2 deletions templates/sources.list.debian.deb822.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ Types: deb deb-src
URIs: {{mirror}}
Suites: {{codename}} {{codename}}-updates {{codename}}-backports
Components: main
Signed-By: /usr/share/keyrings/debian-archive-keyring.gpg
Signed-By: {{primary_key | default('/usr/share/keyrings/debian-archive-keyring.gpg', true)}}

## Major bug fix updates produced after the final release of the distribution.
Types: deb deb-src
URIs: {{security}}
Suites: {{codename}}{% if codename in ('buster', 'stretch') %}/updates{% else %}-security{% endif %}
Components: main
Signed-By: /usr/share/keyrings/debian-archive-keyring.gpg
Signed-By: {{security_key | default('/usr/share/keyrings/debian-archive-keyring.gpg', true)}}
4 changes: 2 additions & 2 deletions templates/sources.list.ubuntu.deb822.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ Types: deb
URIs: {{mirror}}
Suites: {{codename}} {{codename}}-updates {{codename}}-backports
Components: main universe restricted multiverse
Signed-By: /usr/share/keyrings/ubuntu-archive-keyring.gpg
Signed-By: {{primary_key | default('/usr/share/keyrings/ubuntu-archive-keyring.gpg', true)}}

## Ubuntu security updates. Aside from URIs and Suites,
## this should mirror your choices in the previous section.
Types: deb
URIs: {{security}}
Suites: {{codename}}-security
Components: main universe restricted multiverse
Signed-By: /usr/share/keyrings/ubuntu-archive-keyring.gpg
Signed-By: {{security_key | default('/usr/share/keyrings/ubuntu-archive-keyring.gpg', true)}}
105 changes: 105 additions & 0 deletions tests/unittests/config/test_apt_configure_sources_list_v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,39 @@
Components: main restricted
"""

EXAMPLE_CUSTOM_KEY_TMPL_DEB822 = """\
## template:jinja
# Generated by cloud-init
Types: deb deb-src
URIs: {{mirror}}
Suites: {{codename}} {{codename}}-updates
Components: main restricted
Signed-By: {{primary_key}}
# Security section
Types: deb deb-src
URIs: {{security}}
Suites: {{codename}}-security
Components: fakerel-security
Signed-By: {{security_key}}
"""

EXPECTED_PRIMARY_SRC_CONTENT_BASE = """\
# Generated by cloud-init
Types: deb deb-src
URIs: http://local.ubuntu.com/ubuntu/
Suites: fakerel fakerel-updates
Components: main restricted
"""

EXPECTED_SECURITY_SRC_CONTENT_BASE = """\
# Security section
Types: deb deb-src
URIs: http://local.ubuntu.com/ubuntu/
Suites: fakerel-security
Components: fakerel-security
"""


@pytest.mark.usefixtures("fake_filesystem")
class TestAptSourceConfigSourceList:
Expand Down Expand Up @@ -332,3 +365,75 @@ def test_apt_v3_srcl_custom_deb822_feature_aware(
sources_file = tmpdir.join(apt_file)
assert expected == sources_file.read()
assert 0o644 == stat.S_IMODE(sources_file.stat().mode)

@staticmethod
def custom_key_test_case_generator():
"""Helper function to generate cases of cloud distro,
and possible arrangements of provided prim+sec keys
"""
distros = ["ubuntu", "debian"]
# No need to test pm || sm = None since sources will
# default to standard ubuntu/debian archive
pm = "http://local.ubuntu.com/ubuntu/"
pmkeys = ["fakekey 4321", None]
sm = "http://local.ubuntu.com/ubuntu/"
smkeys = ["fakekey 1234", None]

case_list = [
(distro, pm, pmkey, sm, smkey)
for distro in distros
for pmkey in pmkeys
for smkey in smkeys
]

return tuple(case_list)

@pytest.mark.parametrize(
"distro,pm,pmkey,sm,smkey", custom_key_test_case_generator()
)
def test_apt_v3_source_list_psm_deb822_custom_keys(
self,
distro,
pm,
pmkey,
sm,
smkey,
mocker,
tmpdir,
):
"""test_apt_v3_source_list_psm_custom_keys - Test specifying
primary and security mirrors and keys
"""

self.deb822 = mocker.patch.object(
cc_apt_configure.features, "APT_DEB822_SOURCE_LIST_FILE", True
)

tmpl_file = f"/etc/cloud/templates/sources.list.{distro}.deb822.tmpl"
tmpl_content = EXAMPLE_CUSTOM_KEY_TMPL_DEB822
util.write_file(tmpl_file, tmpl_content)

cfg = {
"preserve_sources_list": False,
"primary": [{"arches": ["default"], "uri": pm, "key": pmkey}],
"security": [{"arches": ["default"], "uri": sm, "key": smkey}],
}

mycloud = get_cloud(distro)
cc_apt_configure.handle("test", {"apt": cfg}, mycloud, None)

apt_file = f"/etc/apt/sources.list.d/{distro}.sources"
sources_file = tmpdir.join(apt_file)

defaultKey = f"/usr/share/keyrings/{distro}-archive-keyring.gpg"

expected_pmkey = pmkey if pmkey else defaultKey
pm_signature = f"Signed-By: {expected_pmkey}"
expected_pm = f"{EXPECTED_PRIMARY_SRC_CONTENT_BASE}{pm_signature}"

expected_smkey = pmkey if not sm else (smkey if smkey else defaultKey)
sm_signature = f"Signed-By: {expected_smkey}"
expected_sm = f"{EXPECTED_SECURITY_SRC_CONTENT_BASE}{sm_signature}"

expected = f"{expected_pm}\n\n{expected_sm}\n"
assert expected == sources_file.read()
1 change: 1 addition & 0 deletions tools/.github-cla-signers
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ bipinbachhao
BirknerAlex
bmhughes
brianphaley
bryanfraschetti
BrinKe-dev
CalvoM
candlerb
Expand Down

0 comments on commit 578b854

Please sign in to comment.