From 5198e0b5c02e0cde9c6922aaab2988181699bdbf Mon Sep 17 00:00:00 2001 From: Ryan Taylor <1686627+rptaylor@users.noreply.github.com> Date: Wed, 12 May 2021 17:47:36 -0700 Subject: [PATCH 01/21] try setuptools --- {python => kapel}/KAPEL.py | 0 {python => kapel}/KAPELConfig.py | 0 {python => kapel}/requirements.txt | 0 setup.py | 11 +++++++++++ 4 files changed, 11 insertions(+) rename {python => kapel}/KAPEL.py (100%) rename {python => kapel}/KAPELConfig.py (100%) rename {python => kapel}/requirements.txt (100%) create mode 100644 setup.py diff --git a/python/KAPEL.py b/kapel/KAPEL.py similarity index 100% rename from python/KAPEL.py rename to kapel/KAPEL.py diff --git a/python/KAPELConfig.py b/kapel/KAPELConfig.py similarity index 100% rename from python/KAPELConfig.py rename to kapel/KAPELConfig.py diff --git a/python/requirements.txt b/kapel/requirements.txt similarity index 100% rename from python/requirements.txt rename to kapel/requirements.txt diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..e2411fb --- /dev/null +++ b/setup.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python3 + +from setuptools import setup, find_packages +from os import environ + +environ['PIP_NO_WARN_SCRIPT_LOCATION'] = "0" + +setup(name='kapel', +# version='0.1', + packages = find_packages(), + ) From 5690d82566f4d1356b9b57b7b7950af5bc5d862b Mon Sep 17 00:00:00 2001 From: Ryan Taylor <1686627+rptaylor@users.noreply.github.com> Date: Wed, 12 May 2021 17:53:59 -0700 Subject: [PATCH 02/21] move requirements --- kapel/requirements.txt => requirements.txt | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename kapel/requirements.txt => requirements.txt (100%) diff --git a/kapel/requirements.txt b/requirements.txt similarity index 100% rename from kapel/requirements.txt rename to requirements.txt From 67deffcca7dbb24ece7669f0ef1f1b0ae679035a Mon Sep 17 00:00:00 2001 From: Ryan Taylor <1686627+rptaylor@users.noreply.github.com> Date: Wed, 12 May 2021 17:56:04 -0700 Subject: [PATCH 03/21] requirements --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index e2411fb..6a83b2d 100644 --- a/setup.py +++ b/setup.py @@ -8,4 +8,5 @@ setup(name='kapel', # version='0.1', packages = find_packages(), + install_requires=['environs', 'dirq', 'prometheus-api-client'] ) From 81a74e12e4f39600cde0dc4a9c9d38112c145739 Mon Sep 17 00:00:00 2001 From: Ryan Taylor <1686627+rptaylor@users.noreply.github.com> Date: Wed, 12 May 2021 17:58:12 -0700 Subject: [PATCH 04/21] update requirements --- requirements.txt | 9 --------- setup.py | 10 +++++++--- 2 files changed, 7 insertions(+), 12 deletions(-) delete mode 100644 requirements.txt diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 2c7ec0c..0000000 --- a/requirements.txt +++ /dev/null @@ -1,9 +0,0 @@ -# Useful for handling configuration -environs -# Useful for adding messages to outgoing queue -dirq -# For querying Prometheus -prometheus-api-client -# For Prometheus API queries - not needed anymore if we use API lib instead -#requests - diff --git a/setup.py b/setup.py index 6a83b2d..3da268d 100644 --- a/setup.py +++ b/setup.py @@ -7,6 +7,10 @@ setup(name='kapel', # version='0.1', - packages = find_packages(), - install_requires=['environs', 'dirq', 'prometheus-api-client'] - ) + packages = find_packages(), + install_requires=[ + 'environs', # for handling configuration + 'dirq', # for sending messages + 'prometheus-api-client' # for querying Prometheus + ] +) From c482243128df830b5667fd67a58dc470a32bee6d Mon Sep 17 00:00:00 2001 From: Ryan Taylor <1686627+rptaylor@users.noreply.github.com> Date: Wed, 12 May 2021 18:14:50 -0700 Subject: [PATCH 05/21] separate main function --- kapel/KAPEL.py | 21 +-------------------- kapel/KAPELConfig.py | 2 ++ kapel/__main__.py | 21 +++++++++++++++++++++ setup.py | 12 ++++++++---- 4 files changed, 32 insertions(+), 24 deletions(-) create mode 100644 kapel/__main__.py diff --git a/kapel/KAPEL.py b/kapel/KAPEL.py index 8947321..b0248bf 100644 --- a/kapel/KAPEL.py +++ b/kapel/KAPEL.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # This is a python script for container-native publishing of kubernetes job data for APEL accounting. # It follows container-native practices such as using environment variables for configuration @@ -201,22 +201,3 @@ def processPeriod(config, iYear, iMonth, iInstant, iRange): print(f'Processed {len(endtime)} records in {end - start} s, writing output to {config.output_path}/{outFile}:') print('--------------------------------\n' + outputMessage + '--------------------------------') -def main(envFile): - # TODO: need error handling if env file doesn't exist. See https://github.com/theskumar/python-dotenv/issues/297 - print('Starting KAPEL processor: ' + __file__) - cfg = KAPELConfig(envFile) - - periods = getTimePeriods(cfg.publishing_mode, startTime=cfg.query_start, endTime=cfg.query_end) - print('time periods:') - print(periods) - - for i in periods: - r = str(i['queryRangeSeconds']) + 's' - processPeriod(config=cfg, iYear=i['year'], iMonth=i['month'], iInstant=i['queryInstant'], iRange=r) - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Extract Kubernetes job accounting data from Prometheus and prepare it for APEL publishing.") - # This should be the only CLI argument, since all other config should be specified via env. - parser.add_argument("-e", "--env-file", default=None, help="name of file containing environment variables for configuration") - args = parser.parse_args() - main(args.env_file) diff --git a/kapel/KAPELConfig.py b/kapel/KAPELConfig.py index 727a3ae..d1f54ef 100644 --- a/kapel/KAPELConfig.py +++ b/kapel/KAPELConfig.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python3 + # Configuration module for KAPEL from environs import Env diff --git a/kapel/__main__.py b/kapel/__main__.py new file mode 100644 index 0000000..62266c2 --- /dev/null +++ b/kapel/__main__.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python3 + +def main(envFile): + # TODO: need error handling if env file doesn't exist. See https://github.com/theskumar/python-dotenv/issues/297 + print('Starting KAPEL processor: ' + __file__) + cfg = KAPELConfig(envFile) + + periods = getTimePeriods(cfg.publishing_mode, startTime=cfg.query_start, endTime=cfg.query_end) + print('time periods:') + print(periods) + + for i in periods: + r = str(i['queryRangeSeconds']) + 's' + processPeriod(config=cfg, iYear=i['year'], iMonth=i['month'], iInstant=i['queryInstant'], iRange=r) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Extract Kubernetes job accounting data from Prometheus and prepare it for APEL publishing.") + # This should be the only CLI argument, since all other config should be specified via env. + parser.add_argument("-e", "--env-file", default=None, help="name of file containing environment variables for configuration") + args = parser.parse_args() + main(args.env_file) diff --git a/setup.py b/setup.py index 3da268d..d844c3c 100644 --- a/setup.py +++ b/setup.py @@ -3,14 +3,18 @@ from setuptools import setup, find_packages from os import environ -environ['PIP_NO_WARN_SCRIPT_LOCATION'] = "0" - -setup(name='kapel', +setup( + name='kapel', # version='0.1', packages = find_packages(), install_requires=[ 'environs', # for handling configuration 'dirq', # for sending messages 'prometheus-api-client' # for querying Prometheus - ] + ], + entry_points={ + 'console_scripts': [ + 'kapel = kapel:main', + ], + }, ) From 15c0d180a47e0331e8cbc84b2e871780d80bc219 Mon Sep 17 00:00:00 2001 From: Ryan Taylor <1686627+rptaylor@users.noreply.github.com> Date: Wed, 12 May 2021 18:23:07 -0700 Subject: [PATCH 06/21] test --- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index d844c3c..7850f41 100644 --- a/setup.py +++ b/setup.py @@ -5,8 +5,8 @@ setup( name='kapel', -# version='0.1', - packages = find_packages(), + version='0.1', +# packages = find_packages(), install_requires=[ 'environs', # for handling configuration 'dirq', # for sending messages From 619a9845992c6ef084f56b1babbc4c446f2a17e3 Mon Sep 17 00:00:00 2001 From: Ryan Taylor <1686627+rptaylor@users.noreply.github.com> Date: Wed, 12 May 2021 18:25:03 -0700 Subject: [PATCH 07/21] test --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 7850f41..54174b1 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ setup( name='kapel', version='0.1', -# packages = find_packages(), + packages = 'kapel', install_requires=[ 'environs', # for handling configuration 'dirq', # for sending messages From 896a9cc7f63988ace642976c5b80b7beb0eeac73 Mon Sep 17 00:00:00 2001 From: Ryan Taylor <1686627+rptaylor@users.noreply.github.com> Date: Wed, 12 May 2021 18:26:55 -0700 Subject: [PATCH 08/21] fix --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 54174b1..8549d75 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ setup( name='kapel', version='0.1', - packages = 'kapel', + packages = ['kapel'], install_requires=[ 'environs', # for handling configuration 'dirq', # for sending messages From d2e15dba888ae9e86d1514a940ce0447c752823b Mon Sep 17 00:00:00 2001 From: Ryan Taylor <1686627+rptaylor@users.noreply.github.com> Date: Wed, 12 May 2021 18:37:24 -0700 Subject: [PATCH 09/21] renames --- kapel/__main__.py | 4 ++-- kapel/{KAPEL.py => kapel.py} | 4 ++-- kapel/{KAPELConfig.py => kapelConfig.py} | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) rename kapel/{KAPEL.py => kapel.py} (99%) rename kapel/{KAPELConfig.py => kapelConfig.py} (99%) diff --git a/kapel/__main__.py b/kapel/__main__.py index 62266c2..f6c441f 100644 --- a/kapel/__main__.py +++ b/kapel/__main__.py @@ -2,8 +2,8 @@ def main(envFile): # TODO: need error handling if env file doesn't exist. See https://github.com/theskumar/python-dotenv/issues/297 - print('Starting KAPEL processor: ' + __file__) - cfg = KAPELConfig(envFile) + print('Starting kapel processor: ' + __file__) + cfg = kapelConfig(envFile) periods = getTimePeriods(cfg.publishing_mode, startTime=cfg.query_start, endTime=cfg.query_end) print('time periods:') diff --git a/kapel/KAPEL.py b/kapel/kapel.py similarity index 99% rename from kapel/KAPEL.py rename to kapel/kapel.py index b0248bf..a38cb07 100644 --- a/kapel/KAPEL.py +++ b/kapel/kapel.py @@ -13,7 +13,7 @@ import datetime from dateutil.rrule import rrule, MONTHLY -from KAPELConfig import KAPELConfig +from kapelConfig import kapelConfig from prometheus_api_client import PrometheusConnect from dirq.QueueSimple import QueueSimple from timeit import default_timer as timer @@ -139,7 +139,7 @@ def rearrange(x): yield item['metric']['exported_pod'], float(item['value'][1]) # process a time period (do prom query, process data, write output) -# takes a KAPELConfig object and one element of output from getTimePeriods +# takes a kapelConfig object and one element of output from getTimePeriods def processPeriod(config, iYear, iMonth, iInstant, iRange): print(f'Processing year {iYear}, month {iMonth}, starting at {iInstant} and going back {iRange}.') diff --git a/kapel/KAPELConfig.py b/kapel/kapelConfig.py similarity index 99% rename from kapel/KAPELConfig.py rename to kapel/kapelConfig.py index d1f54ef..1dda852 100644 --- a/kapel/KAPELConfig.py +++ b/kapel/kapelConfig.py @@ -6,7 +6,7 @@ # Read config settings from environment variables (and a named env file in CWD if specified), # do input validation, and return a config object. Note, if a '.env' file exists in CWD it will be used by default. -class KAPELConfig: +class kapelConfig: def __init__(self, envFile=None): env = Env() # Read a .env file if one is specified, otherwise only environment variables will be used. From e00e21ce412b01cb7b9045878273625371b3468e Mon Sep 17 00:00:00 2001 From: Ryan Taylor <1686627+rptaylor@users.noreply.github.com> Date: Wed, 12 May 2021 18:38:04 -0700 Subject: [PATCH 10/21] bump --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 8549d75..8eb6e9c 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup( name='kapel', - version='0.1', + version='0.2', packages = ['kapel'], install_requires=[ 'environs', # for handling configuration From 997e17899a5df244930cbd5391339ebd39682f69 Mon Sep 17 00:00:00 2001 From: Ryan Taylor <1686627+rptaylor@users.noreply.github.com> Date: Wed, 12 May 2021 18:46:38 -0700 Subject: [PATCH 11/21] combine again --- kapel/__main__.py | 21 --------------------- kapel/kapel.py | 19 +++++++++++++++++++ 2 files changed, 19 insertions(+), 21 deletions(-) delete mode 100644 kapel/__main__.py diff --git a/kapel/__main__.py b/kapel/__main__.py deleted file mode 100644 index f6c441f..0000000 --- a/kapel/__main__.py +++ /dev/null @@ -1,21 +0,0 @@ -#!/usr/bin/env python3 - -def main(envFile): - # TODO: need error handling if env file doesn't exist. See https://github.com/theskumar/python-dotenv/issues/297 - print('Starting kapel processor: ' + __file__) - cfg = kapelConfig(envFile) - - periods = getTimePeriods(cfg.publishing_mode, startTime=cfg.query_start, endTime=cfg.query_end) - print('time periods:') - print(periods) - - for i in periods: - r = str(i['queryRangeSeconds']) + 's' - processPeriod(config=cfg, iYear=i['year'], iMonth=i['month'], iInstant=i['queryInstant'], iRange=r) - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Extract Kubernetes job accounting data from Prometheus and prepare it for APEL publishing.") - # This should be the only CLI argument, since all other config should be specified via env. - parser.add_argument("-e", "--env-file", default=None, help="name of file containing environment variables for configuration") - args = parser.parse_args() - main(args.env_file) diff --git a/kapel/kapel.py b/kapel/kapel.py index a38cb07..bc56dc9 100644 --- a/kapel/kapel.py +++ b/kapel/kapel.py @@ -201,3 +201,22 @@ def processPeriod(config, iYear, iMonth, iInstant, iRange): print(f'Processed {len(endtime)} records in {end - start} s, writing output to {config.output_path}/{outFile}:') print('--------------------------------\n' + outputMessage + '--------------------------------') +def main(envFile): + # TODO: need error handling if env file doesn't exist. See https://github.com/theskumar/python-dotenv/issues/297 + print('Starting kapel processor: ' + __file__) + cfg = kapelConfig(envFile) + + periods = getTimePeriods(cfg.publishing_mode, startTime=cfg.query_start, endTime=cfg.query_end) + print('time periods:') + print(periods) + + for i in periods: + r = str(i['queryRangeSeconds']) + 's' + processPeriod(config=cfg, iYear=i['year'], iMonth=i['month'], iInstant=i['queryInstant'], iRange=r) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Extract Kubernetes job accounting data from Prometheus and prepare it for APEL publishing.") + # This should be the only CLI argument, since all other config should be specified via env. + parser.add_argument("-e", "--env-file", default=None, help="name of file containing environment variables for configuration") + args = parser.parse_args() + main(args.env_file) From 0fde9c6e912a41286515c4595b338fd3e11f2cf8 Mon Sep 17 00:00:00 2001 From: Ryan Taylor <1686627+rptaylor@users.noreply.github.com> Date: Wed, 12 May 2021 18:47:01 -0700 Subject: [PATCH 12/21] bump --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 8eb6e9c..2507907 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup( name='kapel', - version='0.2', + version='0.3', packages = ['kapel'], install_requires=[ 'environs', # for handling configuration From 2e9dc9cab6f9c96d44e5b3ad9291ef72c9b58f1c Mon Sep 17 00:00:00 2001 From: Ryan Taylor <1686627+rptaylor@users.noreply.github.com> Date: Wed, 12 May 2021 18:54:56 -0700 Subject: [PATCH 13/21] try find --- setup.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 2507907..c4c49f3 100644 --- a/setup.py +++ b/setup.py @@ -5,8 +5,9 @@ setup( name='kapel', - version='0.3', - packages = ['kapel'], + version='0.4', +# packages=['kapel'], + packages=find_packages() install_requires=[ 'environs', # for handling configuration 'dirq', # for sending messages From b5536f0758231f3099abfa2f47bec905df118e88 Mon Sep 17 00:00:00 2001 From: Ryan Taylor <1686627+rptaylor@users.noreply.github.com> Date: Wed, 12 May 2021 18:55:21 -0700 Subject: [PATCH 14/21] fix --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index c4c49f3..4dd1823 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ name='kapel', version='0.4', # packages=['kapel'], - packages=find_packages() + packages=find_packages(), install_requires=[ 'environs', # for handling configuration 'dirq', # for sending messages From fb4bb051f1e3500c389fca9b6828ebb9a42152ff Mon Sep 17 00:00:00 2001 From: Ryan Taylor <1686627+rptaylor@users.noreply.github.com> Date: Wed, 12 May 2021 19:05:08 -0700 Subject: [PATCH 15/21] change --- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 4dd1823..9488762 100644 --- a/setup.py +++ b/setup.py @@ -6,8 +6,8 @@ setup( name='kapel', version='0.4', -# packages=['kapel'], - packages=find_packages(), + packages=['kapel'], +# packages=find_packages(), install_requires=[ 'environs', # for handling configuration 'dirq', # for sending messages From 60cea678f59fcdd6b703f789e9de4e8eafc8fc30 Mon Sep 17 00:00:00 2001 From: Ryan Taylor <1686627+rptaylor@users.noreply.github.com> Date: Wed, 12 May 2021 19:20:44 -0700 Subject: [PATCH 16/21] init --- kapel/__init__.py | 0 setup.py | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 kapel/__init__.py diff --git a/kapel/__init__.py b/kapel/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/setup.py b/setup.py index 9488762..e4b4039 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup( name='kapel', - version='0.4', + version='0.5', packages=['kapel'], # packages=find_packages(), install_requires=[ From 74e3207104cb9c7683b10280272ed6238f6719a9 Mon Sep 17 00:00:00 2001 From: Ryan Taylor <1686627+rptaylor@users.noreply.github.com> Date: Wed, 12 May 2021 19:26:56 -0700 Subject: [PATCH 17/21] test --- kapel/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kapel/__init__.py b/kapel/__init__.py index e69de29..13da48f 100644 --- a/kapel/__init__.py +++ b/kapel/__init__.py @@ -0,0 +1,2 @@ +import kapelConfig +import kapel From 047559c221da608e7c26192e5eeb1dbb64b3a6d3 Mon Sep 17 00:00:00 2001 From: Ryan Taylor <1686627+rptaylor@users.noreply.github.com> Date: Wed, 12 May 2021 19:31:43 -0700 Subject: [PATCH 18/21] revert --- kapel/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/kapel/__init__.py b/kapel/__init__.py index 13da48f..e69de29 100644 --- a/kapel/__init__.py +++ b/kapel/__init__.py @@ -1,2 +0,0 @@ -import kapelConfig -import kapel From 763a66b126ea7e7c117e96c0bd14a22c6d4dc765 Mon Sep 17 00:00:00 2001 From: Ryan Taylor <1686627+rptaylor@users.noreply.github.com> Date: Wed, 12 May 2021 19:44:32 -0700 Subject: [PATCH 19/21] use module name --- kapel/kapel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kapel/kapel.py b/kapel/kapel.py index bc56dc9..e81882e 100644 --- a/kapel/kapel.py +++ b/kapel/kapel.py @@ -13,7 +13,7 @@ import datetime from dateutil.rrule import rrule, MONTHLY -from kapelConfig import kapelConfig +from kapel import kapelConfig from prometheus_api_client import PrometheusConnect from dirq.QueueSimple import QueueSimple from timeit import default_timer as timer From a468cede74b8f63838777ef79fe5b5e568d6868f Mon Sep 17 00:00:00 2001 From: Ryan Taylor <1686627+rptaylor@users.noreply.github.com> Date: Wed, 12 May 2021 19:45:22 -0700 Subject: [PATCH 20/21] rename to init --- kapel/__init__.py | 222 ++++++++++++++++++++++++++++++++++++++++++++++ kapel/kapel.py | 222 ---------------------------------------------- setup.py | 2 +- 3 files changed, 223 insertions(+), 223 deletions(-) delete mode 100644 kapel/kapel.py diff --git a/kapel/__init__.py b/kapel/__init__.py index e69de29..e81882e 100644 --- a/kapel/__init__.py +++ b/kapel/__init__.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python3 + +# This is a python script for container-native publishing of kubernetes job data for APEL accounting. +# It follows container-native practices such as using environment variables for configuration +# (likely from a ConfigMap), writing log info to stdout, etc. +# This script would likely run as an init container for a pod +# (wherein the main container would run ssmsend) launched as a CronJob. + +# Requires python >= 3.6 for new f-strings + +import argparse +import dateutil.relativedelta +import datetime +from dateutil.rrule import rrule, MONTHLY + +from kapel import kapelConfig +from prometheus_api_client import PrometheusConnect +from dirq.QueueSimple import QueueSimple +from timeit import default_timer as timer + +# for debugging +#import code +#from memory_profiler import profile + +# Contains the PromQL queries +class QueryLogic: + def __init__(self, queryRange): + # Use a query that returns individual job records to get high granularity information, which can be processed into summary records as needed. + + # queryRange determines how far back to query. The query will cover the period from (t - queryRange) to t, + # where 't' is defined in the Prometheus connection parameters. + # See https://prometheus.io/docs/prometheus/latest/querying/basics/#range-vector-selectors + # Format: https://prometheus.io/docs/prometheus/latest/querying/basics/#time-durations + + # Select the 'max_over_time' of each metric in the query range. The metrics we're looking at are constants that will not vary over time, + # but selecting the max returns just one single point (instead of a list of multiple identical points) from each potentially non-aligned metric set. + # On the CPU core side, some pods don't share all the same labels: pods that haven't yet gotten scheduled to a node don't have the node label, + # so we filter out the rows where it's null with '{node != ""}' + + # Note: Prometheus renames some labels from KSM, e.g. from 'pod' to 'exported_pod', the latter of which is the label we're interested in. + # The 'instance' and 'pod' labels in Prometheus actually represent the IP:port and name, respectively, of the KSM pod that Prometheus got the metric from. + # When KSM is redeployed (or if the KSM deployment has > 1 pod), the 'instance' and 'pod' labels may have different values, + # which would cause a label mismatch, so we use without to exclude them. + # Also, use the 'max' aggregation operator (which only takes a scalar) on the result of max_over_time + # (which takes a range and returns a scalar), and as a result get the whole metric set. Finally, use group_left for many-to-one matching. + # https://prometheus.io/docs/prometheus/latest/querying/operators/#aggregation-operators + # https://prometheus.io/docs/prometheus/latest/querying/operators/#many-to-one-and-one-to-many-vector-matches + + # Note some of these queries return duplicate results - without (instance, pod) does not seem to work properly. + # Would be very bad but rearrange() overwrites duplicates + self.cputime = f'(max_over_time(kube_pod_completion_time[{queryRange}]) - max_over_time(kube_pod_start_time[{queryRange}])) * on (exported_pod) group_left() max without (instance, pod) (max_over_time(kube_pod_container_resource_requests_cpu_cores{{node != ""}}[{queryRange}]))' + self.endtime = f'max_over_time(kube_pod_completion_time[{queryRange}])' + self.starttime = f'max_over_time(kube_pod_start_time[{queryRange}])' + self.cores = f'max_over_time(kube_pod_container_resource_requests_cpu_cores{{node != ""}}[{queryRange}])' + +def summaryMessage(config, year, month, wallDuration, cpuDuration, numJobs): + output = ( + f'APEL-summary-job-message: v0.2\n' + f'Site: {config.site_name}\n' + f'Month: {month}\n' + f'Year: {year}\n' + f'VO: {config.vo_name}\n' + f'SubmitHost: {config.submit_host}\n' + f'InfrastructureType: {config.infrastructure_type}\n' + #f'InfrastructureDescription: {config.infrastructure_description}\n' + # si2k = HS06 * 250 + f'ServiceLevelType: si2k\n' + f'ServiceLevel: {config.benchmark_value * 250}\n' + f'WallDuration: {wallDuration}\n' + f'CpuDuration: {cpuDuration}\n' + f'NumberOfJobs: {numJobs}\n' + f'%%\n' + ) + return output + +# return a list, each item of which is a dict representing a time period, in the form of +# an instant (end of the period) and a number of seconds to go back from then to reach the start of the period. +# Auto mode: there will be 2 dicts in the list, one for this month and one for last month. +# Gap mode: there will be a dict for each month in the gap period, and start and end are required. +def getTimePeriods(mode, startTime=None, endTime=None): + if mode == 'auto': + # get current time of script execution, in ISO8601 and UTC, ignoring microseconds. + # This will be the singular reference time with respect to which we determine other times. + runTime = datetime.datetime.now(tz=datetime.timezone.utc).replace(microsecond=0) + startOfThisMonth = runTime.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + startOfLastMonth = startOfThisMonth + dateutil.relativedelta.relativedelta(months=-1) + return getGapTimePeriods(start=startOfLastMonth, end=runTime) + elif mode == 'gap': + return getGapTimePeriods(start=startTime, end=endTime) + else: + raise ValueError('Invalid mode') + +def getGapTimePeriods(start, end): + assert isinstance(start, datetime.datetime), "start is not type datetime.datetime" + assert isinstance(end, datetime.datetime), "end is not type datetime.datetime" + assert start < end, "start is not after end" + + # To avoid invalid dates (e.g. Feb 30) use the very beginning of the month to determine intervals + intervalStart = start.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + assert intervalStart <= start + # https://dateutil.readthedocs.io/en/stable/rrule.html + intervals = list(rrule(freq=MONTHLY, dtstart=intervalStart, until=end)) + assert len(intervals) >= 1 + + # Replace the 1st element of the list (which has day artificially set to 1) with the real start + intervals.pop(0) + intervals.insert(0, start) + # make sure end is after the last interval, then add it as the last + assert end > intervals[-1] + intervals.append(end) + assert len(intervals) >= 2 + # finally we have a list of intervals. Each item will be the start of a monthly publishing period, going until the next item. + + print('intervals:') + for i in intervals: + print(i.isoformat()) + + periods = [] + + for i, time in enumerate(intervals): + # process all except the last item, since last one has already been used as the end for the previous item + if i < len(intervals) - 1: + thisMonth = { + 'year': time.year, + 'month': time.month, + 'queryInstant': intervals[i + 1].isoformat(), + 'queryRangeSeconds': int((intervals[i + 1] - time).total_seconds()) + } + periods.append(thisMonth) + + return periods + +# Take a list of dicts from the prom query and construct a random-accessible dict +# (actually a list of tuples, so use dict() on the output) that can be referenced by the 'exported_pod' label as a key. +# Cast from string to float while we're at it. +# NB: this overwrites duplicate results if we get any from the prom query! +def rearrange(x): + for item in x: + yield item['metric']['exported_pod'], float(item['value'][1]) + +# process a time period (do prom query, process data, write output) +# takes a kapelConfig object and one element of output from getTimePeriods +def processPeriod(config, iYear, iMonth, iInstant, iRange): + + print(f'Processing year {iYear}, month {iMonth}, starting at {iInstant} and going back {iRange}.') + queries = QueryLogic(queryRange=iRange) + + # SSL generally not used for Prometheus access within a cluster + # Docs on instant query API: https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries + prom = PrometheusConnect(url=config.prometheus_server, disable_ssl=True) + prom_connect_params = {'time': iInstant, 'timeout': config.query_timeout} + + rawResults, results, resultLengths = {}, {}, [] + # iterate over each query (cputime, starttime, endtime, cores) producing rawResults['cputime'] etc. + for queryName, queryString in vars(queries).items(): + # Each of these rawResults is a list of dicts. Each dict in the list represents an individual data point, and contains: + # 'metric': a dict of one or more key-value pairs of labels, one of which is the pod name ('exported_pod'). + # 'value': a list in which the 0th element is the timestamp of the value, and 1th element is the actual value we're interested in. + print(f'Executing {queryName} query: {queryString}') + rawResults[queryName] = prom.custom_query(query=queryString, params=prom_connect_params) + results[queryName] = dict(rearrange(rawResults[queryName])) + resultLengths.append(len(results[queryName])) + print(f'Got {len(rawResults[queryName])} results from query, processed into {len(results[queryName])} items.') + del rawResults[queryName] + + cputime = results['cputime'] + endtime = results['endtime'] + starttime = results['starttime'] + cores = results['cores'] + + # Confirm the assumption that cputime (and endtime) should have the fewest entries, while starttime and cores may have additional ones + # corresponding to jobs that have started but not finished yet. We only want the (completed) jobs for which all values are available. + assert len(endtime) == min(resultLengths), "endtime should be the shortest list" + + summary_cputime, summary_walltime = 0, 0 + + #code.interact(local=locals()) + + start = timer() + for key in endtime: + # double check cputime calc of this job + delta = abs(cputime[key] - (endtime[key] - starttime[key])*cores[key]) + assert delta < 0.001, "cputime calculation is inaccurate" + # make sure walltime is positive + walltime = endtime[key] - starttime[key] + assert walltime > 0, "job end time is before start time" + # sum up cputime and walltime over all jobs + summary_cputime += cputime[key] + summary_walltime += walltime + + summary_cputime = round(summary_cputime) + summary_walltime = round(summary_walltime) + + print(f'total cputime: {summary_cputime}, total walltime: {summary_walltime}') + # Write output to the message queue on local filesystem + # https://dirq.readthedocs.io/en/latest/queuesimple.html#directory-structure + dirq = QueueSimple(str(config.output_path)) + outputMessage = summaryMessage(config, year=iYear, month=iMonth, wallDuration=summary_walltime, cpuDuration=summary_cputime, numJobs=len(endtime)) + outFile = dirq.add(outputMessage) + end = timer() + print(f'Processed {len(endtime)} records in {end - start} s, writing output to {config.output_path}/{outFile}:') + print('--------------------------------\n' + outputMessage + '--------------------------------') + +def main(envFile): + # TODO: need error handling if env file doesn't exist. See https://github.com/theskumar/python-dotenv/issues/297 + print('Starting kapel processor: ' + __file__) + cfg = kapelConfig(envFile) + + periods = getTimePeriods(cfg.publishing_mode, startTime=cfg.query_start, endTime=cfg.query_end) + print('time periods:') + print(periods) + + for i in periods: + r = str(i['queryRangeSeconds']) + 's' + processPeriod(config=cfg, iYear=i['year'], iMonth=i['month'], iInstant=i['queryInstant'], iRange=r) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Extract Kubernetes job accounting data from Prometheus and prepare it for APEL publishing.") + # This should be the only CLI argument, since all other config should be specified via env. + parser.add_argument("-e", "--env-file", default=None, help="name of file containing environment variables for configuration") + args = parser.parse_args() + main(args.env_file) diff --git a/kapel/kapel.py b/kapel/kapel.py deleted file mode 100644 index e81882e..0000000 --- a/kapel/kapel.py +++ /dev/null @@ -1,222 +0,0 @@ -#!/usr/bin/env python3 - -# This is a python script for container-native publishing of kubernetes job data for APEL accounting. -# It follows container-native practices such as using environment variables for configuration -# (likely from a ConfigMap), writing log info to stdout, etc. -# This script would likely run as an init container for a pod -# (wherein the main container would run ssmsend) launched as a CronJob. - -# Requires python >= 3.6 for new f-strings - -import argparse -import dateutil.relativedelta -import datetime -from dateutil.rrule import rrule, MONTHLY - -from kapel import kapelConfig -from prometheus_api_client import PrometheusConnect -from dirq.QueueSimple import QueueSimple -from timeit import default_timer as timer - -# for debugging -#import code -#from memory_profiler import profile - -# Contains the PromQL queries -class QueryLogic: - def __init__(self, queryRange): - # Use a query that returns individual job records to get high granularity information, which can be processed into summary records as needed. - - # queryRange determines how far back to query. The query will cover the period from (t - queryRange) to t, - # where 't' is defined in the Prometheus connection parameters. - # See https://prometheus.io/docs/prometheus/latest/querying/basics/#range-vector-selectors - # Format: https://prometheus.io/docs/prometheus/latest/querying/basics/#time-durations - - # Select the 'max_over_time' of each metric in the query range. The metrics we're looking at are constants that will not vary over time, - # but selecting the max returns just one single point (instead of a list of multiple identical points) from each potentially non-aligned metric set. - # On the CPU core side, some pods don't share all the same labels: pods that haven't yet gotten scheduled to a node don't have the node label, - # so we filter out the rows where it's null with '{node != ""}' - - # Note: Prometheus renames some labels from KSM, e.g. from 'pod' to 'exported_pod', the latter of which is the label we're interested in. - # The 'instance' and 'pod' labels in Prometheus actually represent the IP:port and name, respectively, of the KSM pod that Prometheus got the metric from. - # When KSM is redeployed (or if the KSM deployment has > 1 pod), the 'instance' and 'pod' labels may have different values, - # which would cause a label mismatch, so we use without to exclude them. - # Also, use the 'max' aggregation operator (which only takes a scalar) on the result of max_over_time - # (which takes a range and returns a scalar), and as a result get the whole metric set. Finally, use group_left for many-to-one matching. - # https://prometheus.io/docs/prometheus/latest/querying/operators/#aggregation-operators - # https://prometheus.io/docs/prometheus/latest/querying/operators/#many-to-one-and-one-to-many-vector-matches - - # Note some of these queries return duplicate results - without (instance, pod) does not seem to work properly. - # Would be very bad but rearrange() overwrites duplicates - self.cputime = f'(max_over_time(kube_pod_completion_time[{queryRange}]) - max_over_time(kube_pod_start_time[{queryRange}])) * on (exported_pod) group_left() max without (instance, pod) (max_over_time(kube_pod_container_resource_requests_cpu_cores{{node != ""}}[{queryRange}]))' - self.endtime = f'max_over_time(kube_pod_completion_time[{queryRange}])' - self.starttime = f'max_over_time(kube_pod_start_time[{queryRange}])' - self.cores = f'max_over_time(kube_pod_container_resource_requests_cpu_cores{{node != ""}}[{queryRange}])' - -def summaryMessage(config, year, month, wallDuration, cpuDuration, numJobs): - output = ( - f'APEL-summary-job-message: v0.2\n' - f'Site: {config.site_name}\n' - f'Month: {month}\n' - f'Year: {year}\n' - f'VO: {config.vo_name}\n' - f'SubmitHost: {config.submit_host}\n' - f'InfrastructureType: {config.infrastructure_type}\n' - #f'InfrastructureDescription: {config.infrastructure_description}\n' - # si2k = HS06 * 250 - f'ServiceLevelType: si2k\n' - f'ServiceLevel: {config.benchmark_value * 250}\n' - f'WallDuration: {wallDuration}\n' - f'CpuDuration: {cpuDuration}\n' - f'NumberOfJobs: {numJobs}\n' - f'%%\n' - ) - return output - -# return a list, each item of which is a dict representing a time period, in the form of -# an instant (end of the period) and a number of seconds to go back from then to reach the start of the period. -# Auto mode: there will be 2 dicts in the list, one for this month and one for last month. -# Gap mode: there will be a dict for each month in the gap period, and start and end are required. -def getTimePeriods(mode, startTime=None, endTime=None): - if mode == 'auto': - # get current time of script execution, in ISO8601 and UTC, ignoring microseconds. - # This will be the singular reference time with respect to which we determine other times. - runTime = datetime.datetime.now(tz=datetime.timezone.utc).replace(microsecond=0) - startOfThisMonth = runTime.replace(day=1, hour=0, minute=0, second=0, microsecond=0) - startOfLastMonth = startOfThisMonth + dateutil.relativedelta.relativedelta(months=-1) - return getGapTimePeriods(start=startOfLastMonth, end=runTime) - elif mode == 'gap': - return getGapTimePeriods(start=startTime, end=endTime) - else: - raise ValueError('Invalid mode') - -def getGapTimePeriods(start, end): - assert isinstance(start, datetime.datetime), "start is not type datetime.datetime" - assert isinstance(end, datetime.datetime), "end is not type datetime.datetime" - assert start < end, "start is not after end" - - # To avoid invalid dates (e.g. Feb 30) use the very beginning of the month to determine intervals - intervalStart = start.replace(day=1, hour=0, minute=0, second=0, microsecond=0) - assert intervalStart <= start - # https://dateutil.readthedocs.io/en/stable/rrule.html - intervals = list(rrule(freq=MONTHLY, dtstart=intervalStart, until=end)) - assert len(intervals) >= 1 - - # Replace the 1st element of the list (which has day artificially set to 1) with the real start - intervals.pop(0) - intervals.insert(0, start) - # make sure end is after the last interval, then add it as the last - assert end > intervals[-1] - intervals.append(end) - assert len(intervals) >= 2 - # finally we have a list of intervals. Each item will be the start of a monthly publishing period, going until the next item. - - print('intervals:') - for i in intervals: - print(i.isoformat()) - - periods = [] - - for i, time in enumerate(intervals): - # process all except the last item, since last one has already been used as the end for the previous item - if i < len(intervals) - 1: - thisMonth = { - 'year': time.year, - 'month': time.month, - 'queryInstant': intervals[i + 1].isoformat(), - 'queryRangeSeconds': int((intervals[i + 1] - time).total_seconds()) - } - periods.append(thisMonth) - - return periods - -# Take a list of dicts from the prom query and construct a random-accessible dict -# (actually a list of tuples, so use dict() on the output) that can be referenced by the 'exported_pod' label as a key. -# Cast from string to float while we're at it. -# NB: this overwrites duplicate results if we get any from the prom query! -def rearrange(x): - for item in x: - yield item['metric']['exported_pod'], float(item['value'][1]) - -# process a time period (do prom query, process data, write output) -# takes a kapelConfig object and one element of output from getTimePeriods -def processPeriod(config, iYear, iMonth, iInstant, iRange): - - print(f'Processing year {iYear}, month {iMonth}, starting at {iInstant} and going back {iRange}.') - queries = QueryLogic(queryRange=iRange) - - # SSL generally not used for Prometheus access within a cluster - # Docs on instant query API: https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries - prom = PrometheusConnect(url=config.prometheus_server, disable_ssl=True) - prom_connect_params = {'time': iInstant, 'timeout': config.query_timeout} - - rawResults, results, resultLengths = {}, {}, [] - # iterate over each query (cputime, starttime, endtime, cores) producing rawResults['cputime'] etc. - for queryName, queryString in vars(queries).items(): - # Each of these rawResults is a list of dicts. Each dict in the list represents an individual data point, and contains: - # 'metric': a dict of one or more key-value pairs of labels, one of which is the pod name ('exported_pod'). - # 'value': a list in which the 0th element is the timestamp of the value, and 1th element is the actual value we're interested in. - print(f'Executing {queryName} query: {queryString}') - rawResults[queryName] = prom.custom_query(query=queryString, params=prom_connect_params) - results[queryName] = dict(rearrange(rawResults[queryName])) - resultLengths.append(len(results[queryName])) - print(f'Got {len(rawResults[queryName])} results from query, processed into {len(results[queryName])} items.') - del rawResults[queryName] - - cputime = results['cputime'] - endtime = results['endtime'] - starttime = results['starttime'] - cores = results['cores'] - - # Confirm the assumption that cputime (and endtime) should have the fewest entries, while starttime and cores may have additional ones - # corresponding to jobs that have started but not finished yet. We only want the (completed) jobs for which all values are available. - assert len(endtime) == min(resultLengths), "endtime should be the shortest list" - - summary_cputime, summary_walltime = 0, 0 - - #code.interact(local=locals()) - - start = timer() - for key in endtime: - # double check cputime calc of this job - delta = abs(cputime[key] - (endtime[key] - starttime[key])*cores[key]) - assert delta < 0.001, "cputime calculation is inaccurate" - # make sure walltime is positive - walltime = endtime[key] - starttime[key] - assert walltime > 0, "job end time is before start time" - # sum up cputime and walltime over all jobs - summary_cputime += cputime[key] - summary_walltime += walltime - - summary_cputime = round(summary_cputime) - summary_walltime = round(summary_walltime) - - print(f'total cputime: {summary_cputime}, total walltime: {summary_walltime}') - # Write output to the message queue on local filesystem - # https://dirq.readthedocs.io/en/latest/queuesimple.html#directory-structure - dirq = QueueSimple(str(config.output_path)) - outputMessage = summaryMessage(config, year=iYear, month=iMonth, wallDuration=summary_walltime, cpuDuration=summary_cputime, numJobs=len(endtime)) - outFile = dirq.add(outputMessage) - end = timer() - print(f'Processed {len(endtime)} records in {end - start} s, writing output to {config.output_path}/{outFile}:') - print('--------------------------------\n' + outputMessage + '--------------------------------') - -def main(envFile): - # TODO: need error handling if env file doesn't exist. See https://github.com/theskumar/python-dotenv/issues/297 - print('Starting kapel processor: ' + __file__) - cfg = kapelConfig(envFile) - - periods = getTimePeriods(cfg.publishing_mode, startTime=cfg.query_start, endTime=cfg.query_end) - print('time periods:') - print(periods) - - for i in periods: - r = str(i['queryRangeSeconds']) + 's' - processPeriod(config=cfg, iYear=i['year'], iMonth=i['month'], iInstant=i['queryInstant'], iRange=r) - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Extract Kubernetes job accounting data from Prometheus and prepare it for APEL publishing.") - # This should be the only CLI argument, since all other config should be specified via env. - parser.add_argument("-e", "--env-file", default=None, help="name of file containing environment variables for configuration") - args = parser.parse_args() - main(args.env_file) diff --git a/setup.py b/setup.py index e4b4039..1907a25 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup( name='kapel', - version='0.5', + version='0.6', packages=['kapel'], # packages=find_packages(), install_requires=[ From 14173e7ecd138d0dcc1e4829f0c8f81d27169fb3 Mon Sep 17 00:00:00 2001 From: Ryan Taylor <1686627+rptaylor@users.noreply.github.com> Date: Wed, 12 May 2021 19:49:07 -0700 Subject: [PATCH 21/21] use class of module --- kapel/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kapel/__init__.py b/kapel/__init__.py index e81882e..15e95bd 100644 --- a/kapel/__init__.py +++ b/kapel/__init__.py @@ -204,7 +204,7 @@ def processPeriod(config, iYear, iMonth, iInstant, iRange): def main(envFile): # TODO: need error handling if env file doesn't exist. See https://github.com/theskumar/python-dotenv/issues/297 print('Starting kapel processor: ' + __file__) - cfg = kapelConfig(envFile) + cfg = kapel.kapelConfig.kapelConfig(envFile) periods = getTimePeriods(cfg.publishing_mode, startTime=cfg.query_start, endTime=cfg.query_end) print('time periods:')