Skip to content

pacman_key: support checking for expired and untrusted keys #9950

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelogs/fragments/9950-pacman_key-verify-key-validity.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
---
minor_changes:
- pacman_key - support verifying that keys are trusted and not expired (https://github.com/ansible-collections/community.general/issues/9949, https://github.com/ansible-collections/community.general/pull/9950).
167 changes: 108 additions & 59 deletions plugins/modules/pacman_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,16 @@
default: /etc/pacman.d/gnupg
state:
description:
- Ensures that the key is present (added) or absent (revoked).
- Ensures that the key is V(present) (added) or V(absent) (revoked).
default: present
choices: [absent, present]
type: str
ensure_trusted:
description:
- Ensure that the key is trusted (signed by the Pacman machine key and not expired).
type: bool
default: false
version_added: 10.7.0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That ship has sailed, please update to

Suggested change
version_added: 10.7.0
version_added: 11.0.0

"""

EXAMPLES = r"""
Expand Down Expand Up @@ -129,12 +135,55 @@
from ansible.module_utils.common.text.converters import to_native


class GpgListResult(object):
"""Wraps gpg --list-* output."""

def __init__(self, line):
self._parts = line.split(':')

@property
def kind(self):
return self._parts[0]

@property
def valid(self):
return self._parts[1]

@property
def is_fully_valid(self):
return self.valid == 'f'

@property
def key(self):
return self._parts[4]

@property
def user_id(self):
return self._parts[9]


def gpg_get_first(lines, kind, attr):
for line in lines:
glr = GpgListResult(line)
if glr.kind == kind:
return getattr(glr, attr)
Comment on lines +165 to +169
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And that would make this one:

Suggested change
def gpg_get_first(lines, kind, attr):
for line in lines:
glr = GpgListResult(line)
if glr.kind == kind:
return getattr(glr, attr)
def gpg_get_first(lines, kind, attr):
for glr in gpg_gather_all(lines):
if glr.kind == kind:
return getattr(glr, attr)



def gpg_gather_all(lines, kind, attr):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename it to parse_gpg_list?

result = []
for line in lines:
glr = GpgListResult(line)
if glr.kind == kind:
result.append(getattr(glr, attr))
return result
Comment on lines +172 to +178
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As suggested before, this could/should be simpler, as:

Suggested change
def gpg_gather_all(lines, kind, attr):
result = []
for line in lines:
glr = GpgListResult(line)
if glr.kind == kind:
result.append(getattr(glr, attr))
return result
def gpg_gather_all(lines):
return [GpgListResult(line) for line in lines]



class PacmanKey(object):
def __init__(self, module):
self.module = module
# obtain binary paths for gpg & pacman-key
self.gpg = module.get_bin_path('gpg', required=True)
self.pacman_key = module.get_bin_path('pacman-key', required=True)
self.gpg_binary = module.get_bin_path('gpg', required=True)
self.pacman_key_binary = module.get_bin_path('pacman-key', required=True)

# obtain module parameters
keyid = module.params['id']
Expand All @@ -146,47 +195,71 @@ def __init__(self, module):
force_update = module.params['force_update']
keyring = module.params['keyring']
state = module.params['state']
ensure_trusted = module.params['ensure_trusted']
self.keylength = 40

# sanitise key ID & check if key exists in the keyring
keyid = self.sanitise_keyid(keyid)
key_present = self.key_in_keyring(keyring, keyid)
key_validity = self.key_validity(keyring, keyid)
key_present = len(key_validity) > 0
key_valid = any(key_validity)

# check mode
if module.check_mode:
if state == "present":
if state == 'present':
changed = (key_present and force_update) or not key_present
if not changed and ensure_trusted:
changed = not (key_valid and self.key_is_trusted(keyring, keyid))
module.exit_json(changed=changed)
elif state == "absent":
if key_present:
module.exit_json(changed=True)
module.exit_json(changed=False)
if state == 'absent':
module.exit_json(changed=key_present)

if state == "present":
if key_present and not force_update:
if state == 'present':
trusted = key_valid and self.key_is_trusted(keyring, keyid)
if not force_update and key_present and (not ensure_trusted or trusted):
module.exit_json(changed=False)

changed = False
if data:
file = self.save_key(data)
self.add_key(keyring, file, keyid, verify)
module.exit_json(changed=True)
changed = True
elif file:
self.add_key(keyring, file, keyid, verify)
module.exit_json(changed=True)
changed = True
elif url:
data = self.fetch_key(url)
file = self.save_key(data)
self.add_key(keyring, file, keyid, verify)
module.exit_json(changed=True)
changed = True
elif keyserver:
self.recv_key(keyring, keyid, keyserver)
module.exit_json(changed=True)
elif state == "absent":
changed = True
if changed or (ensure_trusted and not trusted):
self.lsign_key(keyring=keyring, keyid=keyid)
changed = True
module.exit_json(changed=changed)
elif state == 'absent':
if key_present:
self.remove_key(keyring, keyid)
module.exit_json(changed=True)
module.exit_json(changed=False)

def gpg(self, args, keyring=None, **kwargs):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently **kwargs is here just to handle the different cases of check_rc. That being the case, I would suggest to drop the ** thing and make it explicit:

Suggested change
def gpg(self, args, keyring=None, **kwargs):
def gpg(self, args, keyring=None, check_rc=True):

or whichever default value you prefer, then adjust the calls to that method accordingly - I mean, the calls will work as they are right now, but you might want to remove the extra parameter for the default value (True in my suggestion).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function doesn't do anything with check_rc. IMHO **kwargs conveys that it's only passed through.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@russoz means that kwargs right now is only ever used for check_rc, so it's better to make this more explicit.

The same is true for pacman_key()'s **kwargs.

Copy link
Author

@AlD AlD May 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I got that. What I was trying to say was that both functions are just wrappers around gpg/pacman-key via module.run_command, and having the parameters of the wrapped function not named explicitly helps conveying which function (wrapper/wrapped) owns which set of parameters. I acknowledge that the benefit may be slim as the size of each set is currently small (2 and 1).

For some empirical evidence, the top answers in https://stackoverflow.com/questions/1415812/why-use-kwargs-in-python-what-are-some-real-world-advantages-over-using-named for example mention wrappers as a valid use case for usage of **kwargs, so I think the use here matches common expectations.

In addition to the semantics it e.g. enables IDEs that support it to show all parameters + any present doc strings supported by run_command on these functions, without having to duplicate them.

That said, if you feel strongly about it, happy to change it. Just wanted to clarify that it was a deliberate choice.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not that strong. I still think the readability would be improved by naming - or removing that parameter as it is not used - but happy to merge the PR as-is wrt this.

cmd = [self.gpg_binary]
if keyring:
cmd.append('--homedir={keyring}'.format(keyring=keyring))
cmd.extend(['--no-permission-warning', '--with-colons', '--quiet', '--batch', '--no-tty'])
return self.module.run_command(cmd + args, **kwargs)
Comment on lines +247 to +252
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might want to consider using CmdRunner. See https://docs.ansible.com/ansible/latest/collections/community/general/docsite/guide_cmdrunner.html for more details.


def pacman_key(self, args, keyring, **kwargs):
return self.module.run_command(
[self.pacman_key_binary, '--gpgdir', keyring] + args,
**kwargs)

def pacman_machine_key(self, keyring):
unused_rc, stdout, unused_stderr = self.gpg(['--list-secret-key'], keyring=keyring)
return gpg_get_first(stdout.splitlines(), 'sec', 'key')

def is_hexadecimal(self, string):
"""Check if a given string is valid hexadecimal"""
try:
Expand Down Expand Up @@ -216,14 +289,11 @@ def fetch_key(self, url):

def recv_key(self, keyring, keyid, keyserver):
"""Receives key via keyserver"""
cmd = [self.pacman_key, '--gpgdir', keyring, '--keyserver', keyserver, '--recv-keys', keyid]
self.module.run_command(cmd, check_rc=True)
self.lsign_key(keyring, keyid)
self.pacman_key(['--keyserver', keyserver, '--recv-keys', keyid], keyring=keyring, check_rc=True)

def lsign_key(self, keyring, keyid):
"""Locally sign key"""
cmd = [self.pacman_key, '--gpgdir', keyring]
self.module.run_command(cmd + ['--lsign-key', keyid], check_rc=True)
self.pacman_key(['--lsign-key', keyid], keyring=keyring, check_rc=True)

def save_key(self, data):
"Saves key data to a temporary file"
Expand All @@ -238,14 +308,11 @@ def add_key(self, keyring, keyfile, keyid, verify):
"""Add key to pacman's keyring"""
if verify:
self.verify_keyfile(keyfile, keyid)
cmd = [self.pacman_key, '--gpgdir', keyring, '--add', keyfile]
self.module.run_command(cmd, check_rc=True)
self.lsign_key(keyring, keyid)
self.pacman_key(['--add', keyfile], keyring=keyring, check_rc=True)

def remove_key(self, keyring, keyid):
"""Remove key from pacman's keyring"""
cmd = [self.pacman_key, '--gpgdir', keyring, '--delete', keyid]
self.module.run_command(cmd, check_rc=True)
self.pacman_key(['--delete', keyid], keyring=keyring, check_rc=True)

def verify_keyfile(self, keyfile, keyid):
"""Verify that keyfile matches the specified key ID"""
Expand All @@ -254,48 +321,29 @@ def verify_keyfile(self, keyfile, keyid):
elif keyid is None:
self.module.fail_json(msg="expected a key ID, got none")

rc, stdout, stderr = self.module.run_command(
[
self.gpg,
'--with-colons',
'--with-fingerprint',
'--batch',
'--no-tty',
'--show-keys',
keyfile
],
rc, stdout, stderr = self.gpg(
['--with-fingerprint', '--show-keys', keyfile],
check_rc=True,
)

extracted_keyid = None
for line in stdout.splitlines():
if line.startswith('fpr:'):
extracted_keyid = line.split(':')[9]
break

extracted_keyid = gpg_get_first(stdout.splitlines(), 'fpr', 'user_id')
if extracted_keyid != keyid:
self.module.fail_json(msg="key ID does not match. expected %s, got %s" % (keyid, extracted_keyid))

def key_in_keyring(self, keyring, keyid):
"Check if the key ID is in pacman's keyring"
rc, stdout, stderr = self.module.run_command(
[
self.gpg,
'--with-colons',
'--batch',
'--no-tty',
'--no-default-keyring',
'--keyring=%s/pubring.gpg' % keyring,
'--list-keys', keyid
],
check_rc=False,
)
def key_validity(self, keyring, keyid):
"Check if the key ID is in pacman's keyring and not expired"
rc, stdout, stderr = self.gpg(['--no-default-keyring', '--list-keys', keyid], keyring=keyring, check_rc=False)
if rc != 0:
if stderr.find("No public key") >= 0:
return False
return []
else:
self.module.fail_json(msg="gpg returned an error: %s" % stderr)
return True
return gpg_gather_all(stdout.splitlines(), 'uid', 'is_fully_valid')

def key_is_trusted(self, keyring, keyid):
"""Check if key is signed and not expired."""
unused_rc, stdout, unused_stderr = self.gpg(['--check-signatures', keyid], keyring=keyring)
return self.pacman_machine_key(keyring) in gpg_gather_all(stdout.splitlines(), 'sig', 'key')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function gpg_gather_all() does more than just gathering, it also filters. TBH, I think that function is doing more than it should and the code would be leaner if it only returned the list of GpgListResult objects. Then, the code here could be changed to:

Suggested change
return self.pacman_machine_key(keyring) in gpg_gather_all(stdout.splitlines(), 'sig', 'key')
return self.pacman_machine_key(keyring) in [x.key for x in gpg_gather_all(stdout.splitlines() if x.kind == 'sig')

I think that makes it easier for the reader to understand what is really going on.

Same applies to line 341 above.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thinking was that it gathers the key of all kind GPG results. I'm not really convinced that the list comprehension is easier to understand tbh.

Copy link
Collaborator

@russoz russoz May 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, list comprehensions some people love them, some people don't. You can guess my type ;-).

Anyways, I'm OK without the comprehension, but simple fact that you had to add more words to explain what the function does is a good sign that its name could be improved. Maybe gather_key_from_all() ? Your thinking makes sense for you who wrote this about now. Someone else, one year from now, will read this code out of the blue and they will have to go up and down the file, read that function, see what it does, just to figure out what took a couple of words to explain. So, the point is, do the explaining now, save everybody else (including yourself 1, 3 or 5 years from now) that time later on.



def main():
Expand All @@ -309,6 +357,7 @@ def main():
verify=dict(type='bool', default=True),
force_update=dict(type='bool', default=False),
keyring=dict(type='path', default='/etc/pacman.d/gnupg'),
ensure_trusted=dict(type='bool', default=False),
state=dict(type='str', default='present', choices=['absent', 'present']),
),
supports_check_mode=True,
Expand Down
Loading
Loading