From c9245b757c9dfeede4f14dbe012b5c1400779283 Mon Sep 17 00:00:00 2001 From: Jan Pazdziora Date: Sat, 19 Apr 2025 19:05:22 +0200 Subject: [PATCH 1/3] Add support for --challenge-deploy as an alternative to --acme-dir, to invoke a script to store the key authorization to the .well-known/acme-challenge/ location, possibly on a different machine. --- README.md | 33 ++++++++++++++++++++++++++++++++- acme_tiny.py | 43 ++++++++++++++++++++++++++----------------- 2 files changed, 58 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 40a4377a..8c64f034 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,32 @@ to be run on your server and have access to your private Let's Encrypt account key, I tried to make it as tiny as possible (currently less than 200 lines). The only prerequisites are python and openssl. +This fork of the https://github.com/diafygi/acme-tiny/ repo supports +`--challenge-deploy` as an alternative to the `--acme-dir` option, to run +a script to deploy the challenge. This makes it easier to separate privileges +needed to communicate with the ACME server (this script) from those needed +to publish the challenge material. + +On standard input on a single line, the deployment script gets +the domain, token, and key authorization strings, separated by spaces. +The rough equivalent of the + +``` + --acme-dir /var/www/acme-challenges/ +``` +behaviour can thus be achieved with +``` + --challenge-deploy 'read d t ka ; echo "$ka" > /var/www/acme-challenges/"$t"' +``` +But you probably want to use this version of the script for cases like +``` + --challenge-deploy /usr/local/bin/acme-challenge-deploy +``` +or +``` + --challenge-deploy 'ssh -i ~/.ssh/acme-deploy acme@web.example.com 2>&1' +``` + **PLEASE READ THE SOURCE CODE! YOU MUST TRUST IT WITH YOUR PRIVATE ACCOUNT KEY!** ## Donate @@ -120,6 +146,9 @@ and read your private account key and CSR. ``` # Run the script on your server python acme_tiny.py --account-key ./account.key --csr ./domain.csr --acme-dir /var/www/challenges/ > ./signed_chain.crt +# or +python acme_tiny.py --account-key ./account.key --csr ./domain.csr \ + --challenge-deploy 'read d t ka ; echo "$ka" > /var/www/acme-challenges/"$t"' > ./signed_chain.crt ``` ### Step 5: Install the certificate @@ -169,7 +198,9 @@ for example script). Example of a `renew_cert.sh`: ```sh #!/usr/bin/sh -python /path/to/acme_tiny.py --account-key /path/to/account.key --csr /path/to/domain.csr --acme-dir /var/www/challenges/ > /path/to/signed_chain.crt.tmp || exit +python /path/to/acme_tiny.py --account-key /path/to/account.key --csr /path/to/domain.csr \ + --challenge-deploy 'read d t ka ; echo "$ka" > /var/www/acme-challenges/"$t"' \ + > /path/to/signed_chain.crt.tmp || exit mv /path/to/signed_chain.crt.tmp /path/to/signed_chain.crt service nginx reload ``` diff --git a/acme_tiny.py b/acme_tiny.py index d992d02d..a2e98cbf 100755 --- a/acme_tiny.py +++ b/acme_tiny.py @@ -13,7 +13,7 @@ LOGGER.addHandler(logging.StreamHandler()) LOGGER.setLevel(logging.INFO) -def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None, check_port=None): +def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None, check_port=None, challenge_deploy=None): directory, acct_headers, alg, jwk = None, None, None, None # global variables # helper functions - base64 encode for jose spec @@ -21,8 +21,8 @@ def _b64(b): return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "") # helper function - run external commands - def _cmd(cmd_list, stdin=None, cmd_input=None, err_msg="Command Line Error"): - proc = subprocess.Popen(cmd_list, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + def _cmd(cmd_list, stdin=None, cmd_input=None, err_msg="Command Line Error", shell=False): + proc = subprocess.Popen(cmd_list, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=shell) out, err = proc.communicate(cmd_input) if proc.returncode != 0: raise IOError("{0}\n{1}".format(err_msg, err)) @@ -131,27 +131,33 @@ def _poll_until_not(url, pending_statuses, err_msg): continue log.info("Verifying {0}...".format(domain)) - # find the http-01 challenge and write the challenge file + # find the http-01 challenge challenge = [c for c in authorization['challenges'] if c['type'] == "http-01"][0] token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token']) keyauthorization = "{0}.{1}".format(token, thumbprint) - wellknown_path = os.path.join(acme_dir, token) - with open(wellknown_path, "w") as wellknown_file: - wellknown_file.write(keyauthorization) - - # check that the file is in place - try: - wellknown_url = "http://{0}{1}/.well-known/acme-challenge/{2}".format(domain, "" if check_port is None else ":{0}".format(check_port), token) - assert (disable_check or _do_request(wellknown_url)[0] == keyauthorization) - except (AssertionError, ValueError) as e: - raise ValueError("Wrote file to {0}, but couldn't download {1}: {2}".format(wellknown_path, wellknown_url, e)) + wellknown_path = os.path.join(acme_dir, token) if acme_dir else None + if wellknown_path: + with open(wellknown_path, "w") as wellknown_file: + wellknown_file.write(keyauthorization) + # check that the file is in place + try: + wellknown_url = "http://{0}{1}/.well-known/acme-challenge/{2}".format(domain, "" if check_port is None else ":{0}".format(check_port), token) + assert (disable_check or _do_request(wellknown_url)[0] == keyauthorization) + except (AssertionError, ValueError) as e: + raise ValueError("Wrote file to {0}, but couldn't download {1}: {2}".format(wellknown_path, wellknown_url, e)) + if challenge_deploy: + log.info("Running {0}...".format(challenge_deploy)) + out = _cmd(challenge_deploy, stdin=subprocess.PIPE, cmd_input="{0} {1} {2}\n".format(domain, token, keyauthorization).encode('utf8'), + shell=True, err_msg="Error storing key authorization for {0}".format(domain)) + log.info(("Finished with {0}" if out else "Finished.").format(out.decode('utf8'))) # say the challenge is done _send_signed_request(challenge['url'], {}, "Error submitting challenges: {0}".format(domain)) authorization = _poll_until_not(auth_url, ["pending"], "Error checking challenge status for {0}".format(domain)) if authorization['status'] != "valid": raise ValueError("Challenge did not pass for {0}: {1}".format(domain, authorization)) - os.remove(wellknown_path) + if wellknown_path: + os.remove(wellknown_path) log.info("{0} verified!".format(domain)) # finalize the order with the csr @@ -182,7 +188,8 @@ def main(argv=None): ) parser.add_argument("--account-key", required=True, help="path to your Let's Encrypt account private key") parser.add_argument("--csr", required=True, help="path to your certificate signing request") - parser.add_argument("--acme-dir", required=True, help="path to the .well-known/acme-challenge/ directory") + parser.add_argument("--acme-dir", help="path to the .well-known/acme-challenge/ directory") + parser.add_argument("--challenge-deploy", help="script which gets called to expose the key authorization in webserver") parser.add_argument("--quiet", action="store_const", const=logging.ERROR, help="suppress output except for errors") parser.add_argument("--disable-check", default=False, action="store_true", help="disable checking if the challenge file is hosted correctly before telling the CA") parser.add_argument("--directory-url", default=DEFAULT_DIRECTORY_URL, help="certificate authority directory url, default is Let's Encrypt") @@ -191,8 +198,10 @@ def main(argv=None): parser.add_argument("--check-port", metavar="PORT", default=None, help="what port to use when self-checking the challenge file, default is port 80") args = parser.parse_args(argv) + if not (args.acme_dir or args.challenge_deploy): + parser.error('Specify at least --acme-dir or --challenge-deploy') LOGGER.setLevel(args.quiet or LOGGER.level) - signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca, disable_check=args.disable_check, directory_url=args.directory_url, contact=args.contact, check_port=args.check_port) + signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca, disable_check=args.disable_check, directory_url=args.directory_url, contact=args.contact, check_port=args.check_port, challenge_deploy=args.challenge_deploy) sys.stdout.write(signed_crt) if __name__ == "__main__": # pragma: no cover From c13b4f5391b2ad27bdcbbe499066d7dc92fe7a2d Mon Sep 17 00:00:00 2001 From: Jan Pazdziora Date: Sat, 19 Apr 2025 20:54:15 +0200 Subject: [PATCH 2/3] Add support for --challenge-cleanup. --- README.md | 5 +++++ acme_tiny.py | 11 +++++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 8c64f034..d702f3af 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,11 @@ The rough equivalent of the behaviour can thus be achieved with ``` --challenge-deploy 'read d t ka ; echo "$ka" > /var/www/acme-challenges/"$t"' + --challenge-cleanup 'read d t ka ; rm -f /var/www/acme-challenges/"$t"' ``` +There is also a `--challenge-cleanup` counterpart, run with the same input +after the authorization passed and the challenge material is no longer needed. + But you probably want to use this version of the script for cases like ``` --challenge-deploy /usr/local/bin/acme-challenge-deploy @@ -33,6 +37,7 @@ But you probably want to use this version of the script for cases like or ``` --challenge-deploy 'ssh -i ~/.ssh/acme-deploy acme@web.example.com 2>&1' + --challenge-cleanup 'ssh -i ~/.ssh/acme-deploy acme@web.example.com cleanup 2>&1' ``` **PLEASE READ THE SOURCE CODE! YOU MUST TRUST IT WITH YOUR PRIVATE ACCOUNT KEY!** diff --git a/acme_tiny.py b/acme_tiny.py index a2e98cbf..3fc63a39 100755 --- a/acme_tiny.py +++ b/acme_tiny.py @@ -13,7 +13,7 @@ LOGGER.addHandler(logging.StreamHandler()) LOGGER.setLevel(logging.INFO) -def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None, check_port=None, challenge_deploy=None): +def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None, check_port=None, challenge_deploy=None, challenge_cleanup=None): directory, acct_headers, alg, jwk = None, None, None, None # global variables # helper functions - base64 encode for jose spec @@ -136,6 +136,7 @@ def _poll_until_not(url, pending_statuses, err_msg): token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token']) keyauthorization = "{0}.{1}".format(token, thumbprint) wellknown_path = os.path.join(acme_dir, token) if acme_dir else None + cmd_input = "{0} {1} {2}\n".format(domain, token, keyauthorization).encode('utf8') if wellknown_path: with open(wellknown_path, "w") as wellknown_file: wellknown_file.write(keyauthorization) @@ -147,8 +148,7 @@ def _poll_until_not(url, pending_statuses, err_msg): raise ValueError("Wrote file to {0}, but couldn't download {1}: {2}".format(wellknown_path, wellknown_url, e)) if challenge_deploy: log.info("Running {0}...".format(challenge_deploy)) - out = _cmd(challenge_deploy, stdin=subprocess.PIPE, cmd_input="{0} {1} {2}\n".format(domain, token, keyauthorization).encode('utf8'), - shell=True, err_msg="Error storing key authorization for {0}".format(domain)) + out = _cmd(challenge_deploy, stdin=subprocess.PIPE, cmd_input=cmd_input, shell=True, err_msg="Error storing key authorization for {0}".format(domain)) log.info(("Finished with {0}" if out else "Finished.").format(out.decode('utf8'))) # say the challenge is done @@ -158,6 +158,8 @@ def _poll_until_not(url, pending_statuses, err_msg): raise ValueError("Challenge did not pass for {0}: {1}".format(domain, authorization)) if wellknown_path: os.remove(wellknown_path) + if challenge_cleanup: + _cmd(challenge_cleanup, stdin=subprocess.PIPE, cmd_input=cmd_input, shell=True, err_msg="Error cleaning up the key authorization for {0}".format(domain)) log.info("{0} verified!".format(domain)) # finalize the order with the csr @@ -190,6 +192,7 @@ def main(argv=None): parser.add_argument("--csr", required=True, help="path to your certificate signing request") parser.add_argument("--acme-dir", help="path to the .well-known/acme-challenge/ directory") parser.add_argument("--challenge-deploy", help="script which gets called to expose the key authorization in webserver") + parser.add_argument("--challenge-cleanup", help="script to cleanup the exposed key authorization") parser.add_argument("--quiet", action="store_const", const=logging.ERROR, help="suppress output except for errors") parser.add_argument("--disable-check", default=False, action="store_true", help="disable checking if the challenge file is hosted correctly before telling the CA") parser.add_argument("--directory-url", default=DEFAULT_DIRECTORY_URL, help="certificate authority directory url, default is Let's Encrypt") @@ -201,7 +204,7 @@ def main(argv=None): if not (args.acme_dir or args.challenge_deploy): parser.error('Specify at least --acme-dir or --challenge-deploy') LOGGER.setLevel(args.quiet or LOGGER.level) - signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca, disable_check=args.disable_check, directory_url=args.directory_url, contact=args.contact, check_port=args.check_port, challenge_deploy=args.challenge_deploy) + signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca, disable_check=args.disable_check, directory_url=args.directory_url, contact=args.contact, check_port=args.check_port, challenge_deploy=args.challenge_deploy, challenge_cleanup=args.challenge_cleanup) sys.stdout.write(signed_crt) if __name__ == "__main__": # pragma: no cover From f4c928ccfbfc092019c2aa1ed4df7b3fdd70ae9e Mon Sep 17 00:00:00 2001 From: Jan Pazdziora Date: Sat, 19 Apr 2025 21:20:55 +0200 Subject: [PATCH 3/3] Add support for the DNS-01 challenge style. --- README.md | 5 +++++ acme_tiny.py | 11 +++++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index d702f3af..5d2db21f 100644 --- a/README.md +++ b/README.md @@ -36,10 +36,15 @@ But you probably want to use this version of the script for cases like ``` or ``` + --challenge-type dns-01 --challenge-deploy 'ssh -i ~/.ssh/acme-deploy acme@web.example.com 2>&1' --challenge-cleanup 'ssh -i ~/.ssh/acme-deploy acme@web.example.com cleanup 2>&1' ``` +The script also supports option `--challenge-type`, defaulting to `http-01` +and supporting `dns-01`, which formats the key authorization string passed +to the deploy and cleanup scripts in a way expected in the DNS TXT record. + **PLEASE READ THE SOURCE CODE! YOU MUST TRUST IT WITH YOUR PRIVATE ACCOUNT KEY!** ## Donate diff --git a/acme_tiny.py b/acme_tiny.py index 3fc63a39..2b21f763 100755 --- a/acme_tiny.py +++ b/acme_tiny.py @@ -13,7 +13,7 @@ LOGGER.addHandler(logging.StreamHandler()) LOGGER.setLevel(logging.INFO) -def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None, check_port=None, challenge_deploy=None, challenge_cleanup=None): +def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None, check_port=None, challenge_type='http-01', challenge_deploy=None, challenge_cleanup=None): directory, acct_headers, alg, jwk = None, None, None, None # global variables # helper functions - base64 encode for jose spec @@ -131,10 +131,12 @@ def _poll_until_not(url, pending_statuses, err_msg): continue log.info("Verifying {0}...".format(domain)) - # find the http-01 challenge - challenge = [c for c in authorization['challenges'] if c['type'] == "http-01"][0] + # find the requested challenge + challenge = [c for c in authorization['challenges'] if c['type'] == challenge_type][0] token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token']) keyauthorization = "{0}.{1}".format(token, thumbprint) + if challenge_type == 'dns-01': + keyauthorization = _b64(hashlib.sha256(keyauthorization.encode()).digest()) wellknown_path = os.path.join(acme_dir, token) if acme_dir else None cmd_input = "{0} {1} {2}\n".format(domain, token, keyauthorization).encode('utf8') if wellknown_path: @@ -191,6 +193,7 @@ def main(argv=None): parser.add_argument("--account-key", required=True, help="path to your Let's Encrypt account private key") parser.add_argument("--csr", required=True, help="path to your certificate signing request") parser.add_argument("--acme-dir", help="path to the .well-known/acme-challenge/ directory") + parser.add_argument("--challenge-type", default="http-01", help="http-01 (default) or dns-01, specifying the key authorization value format") parser.add_argument("--challenge-deploy", help="script which gets called to expose the key authorization in webserver") parser.add_argument("--challenge-cleanup", help="script to cleanup the exposed key authorization") parser.add_argument("--quiet", action="store_const", const=logging.ERROR, help="suppress output except for errors") @@ -204,7 +207,7 @@ def main(argv=None): if not (args.acme_dir or args.challenge_deploy): parser.error('Specify at least --acme-dir or --challenge-deploy') LOGGER.setLevel(args.quiet or LOGGER.level) - signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca, disable_check=args.disable_check, directory_url=args.directory_url, contact=args.contact, check_port=args.check_port, challenge_deploy=args.challenge_deploy, challenge_cleanup=args.challenge_cleanup) + signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca, disable_check=args.disable_check, directory_url=args.directory_url, contact=args.contact, check_port=args.check_port, challenge_deploy=args.challenge_deploy, challenge_cleanup=args.challenge_cleanup, challenge_type=args.challenge_type) sys.stdout.write(signed_crt) if __name__ == "__main__": # pragma: no cover