
Commit c638afc

Authored by himdel and Claude Sonnet 4.5
AAP-62673: library: collect to dataframe (#327)
* library/collector: make copy_table return DataFrame

  drop the output_dir logic from copy_table; it no longer returns a file list, but a pandas DataFrame

* add dataframe_to_csv_files helper so that a collector can still be made to save to csv and go through the csvfilesplitter

* load_anonymized_rollup_data: expect a dataframe list instead of a file list

  but also accept one dataframe, and a mixed list of filenames and dataframes; this is probably going away completely once we move the logic to tasks

* copy_table: pd.read_sql needs sqlalchemy, so do the same manually with the psycopg connection instead

* EE: handle both t/f bools and True/False bools

* address cursor review - do not create empty csv files

  have dataframe_to_csv_file_list only create files for non-empty dataframes or dataframes with columns

* tests: update collector tests to mock copy_table returning DataFrames

  Mock copy_table to return pd.DataFrame instead of file paths, matching the new interface. Remove pointless assertions that only verified the mock return value length.

  Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* test_gather_jobs..: handle different data formats between expected csv and returned dataframe

  the postgres to dataframe conversion outputs timezones differently (+00 vs +00:00), bools (t vs True), and nullable-int fields (as float). DataFrame.convert_dtypes() in copy_table might be a solution, at about 40% perf cost, or knowing the types. IMO the collector is the place that knows the formats and might handle that best by itself; for now, just updating the test to expect the data it gets

---------

Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 456eb0f commit c638afc

26 files changed

Lines changed: 496 additions & 448 deletions
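The commit message notes that `pd.read_sql` requires sqlalchemy, so `copy_table` now builds the DataFrame from the psycopg connection directly. A minimal sketch of that approach (the body below is illustrative, not the repo's actual implementation; the demo runs against sqlite3 since both follow the DB-API):

```python
import sqlite3

import pandas as pd


def copy_table(db, query):
    # Sketch: run the query through a DB-API cursor and build the DataFrame
    # by hand, taking column names from cursor.description, instead of
    # pd.read_sql (which would pull in a sqlalchemy dependency).
    cursor = db.cursor()
    try:
        cursor.execute(query)
        columns = [desc[0] for desc in cursor.description]
        rows = cursor.fetchall()
    finally:
        cursor.close()
    return pd.DataFrame(rows, columns=columns)


# Demo against sqlite3; the collectors pass a psycopg connection instead.
db = sqlite3.connect(':memory:')
db.execute('CREATE TABLE main_executionenvironment (id INTEGER, name TEXT)')
db.execute("INSERT INTO main_executionenvironment VALUES (1, 'ee'), (2, 'custom')")
print(copy_table(db, 'SELECT id, name FROM main_executionenvironment ORDER BY id'))
```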

metrics_utility/anonymized_rollups/anonymized_rollups.py

Lines changed: 13 additions & 8 deletions
```diff
@@ -479,18 +479,23 @@ def compute_anonymized_rollup_from_raw_data(input_data, salt):
     return anonymized_rollup


-# loads data from tarballs located in base_path/data/year/month/day/*{collector_name}*.tar.gz
-# inside tarball is file named {collector_name}.csv
-# this goes to dataframe, then filter_function is applied to the dataframe
+# loads data from a list of dataframes
+# then filter_function is applied to the dataframe
 # all result dataframes are concatenated into one dataframe
-def load_anonymized_rollup_data(rollup_object: BaseAnonymizedRollup, file_list: []):
-    # file_list - list of csv files that needs to be read
+def load_anonymized_rollup_data(rollup_object: BaseAnonymizedRollup, dataframe_list):
+    # compat for one dataframe
+    if isinstance(dataframe_list, pd.DataFrame):
+        prepared_data = rollup_object.prepare(dataframe_list)
+        return rollup_object.merge(None, prepared_data)

     concat_data = None

-    for file in file_list:
-        df = pd.read_csv(file, encoding='utf-8')
-        prepared_data = rollup_object.prepare(df)
+    for dataframe in dataframe_list:
+        # compat for CSVs
+        if isinstance(dataframe, str):
+            dataframe = pd.read_csv(dataframe, encoding='utf-8')
+
+        prepared_data = rollup_object.prepare(dataframe)
         concat_data = rollup_object.merge(concat_data, prepared_data)

     return concat_data
```
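Both call styles the new signature accepts (a list of dataframes, or a single dataframe) can be exercised with a toy rollup object. `CountRollup` here is purely illustrative; the real rollups subclass `BaseAnonymizedRollup`:

```python
import pandas as pd


# Toy stand-in for a BaseAnonymizedRollup subclass (illustrative only):
# prepare() counts rows, merge() sums the counts.
class CountRollup:
    def prepare(self, df):
        return len(df)

    def merge(self, acc, prepared):
        return prepared if acc is None else acc + prepared


def load_anonymized_rollup_data(rollup_object, dataframe_list):
    # compat for one dataframe
    if isinstance(dataframe_list, pd.DataFrame):
        return rollup_object.merge(None, rollup_object.prepare(dataframe_list))

    concat_data = None
    for dataframe in dataframe_list:
        # compat for CSV paths
        if isinstance(dataframe, str):
            dataframe = pd.read_csv(dataframe, encoding='utf-8')
        concat_data = rollup_object.merge(concat_data, rollup_object.prepare(dataframe))
    return concat_data


df1 = pd.DataFrame({'a': [1, 2]})
df2 = pd.DataFrame({'a': [3]})
print(load_anonymized_rollup_data(CountRollup(), [df1, df2]))  # 3
print(load_anonymized_rollup_data(CountRollup(), df1))         # 2
```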

metrics_utility/anonymized_rollups/execution_environments_anonymized_rollup.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -26,7 +26,7 @@ def base(self, dataframe):
         }

         execution_environments_total = int(len(dataframe))
-        dataframe['managed'] = dataframe['managed'].map({'t': True, 'f': False})
+        dataframe['managed'] = dataframe['managed'].map({'t': True, 'f': False, True: True, False: False})
         execution_environments_default_total = int(dataframe['managed'].sum())
         execution_environments_custom_total = execution_environments_total - execution_environments_default_total
```
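The widened map handles both representations the EE collector can now produce: CSV-style PostgreSQL `'t'`/`'f'` strings and real Python bools from a psycopg-built DataFrame. A quick reproduction:

```python
import pandas as pd

# A column mixing postgres COPY-style bools ('t'/'f') with Python bools,
# as can happen now that copy_table returns DataFrames directly.
managed = pd.Series(['t', 'f', True, False])

# Mapping both key styles normalizes either representation.
normalized = managed.map({'t': True, 'f': False, True: True, False: False})
print(normalized.tolist())    # [True, False, True, False]
print(int(normalized.sum()))  # 2
```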

metrics_utility/library/README.md

Lines changed: 22 additions & 10 deletions
````diff
@@ -11,8 +11,8 @@ It provides an abstraction over collectors, packaging and storage, extraction, r

 Collector is python function which accepts params, gathers data, and returns it in one of the supported formats.

-It either returns a python dict, which gets serialized into JSON,
-or a list of filenames of temporary files it created.
+It either returns a python dict (for snapshot collectors like config),
+or a pandas DataFrame (for SQL-based collectors).

 It's exported decorated to wrap calls into BaseCollector subclass instances, so that param passing can happen separately from .gather().
 The wrapper ensures that any calls to `my_collector(db=connection).gather()` do the same thing as an undecorated `my_collector(db=connection)` - this is so that initialization can happen before db locks are acquired.
@@ -28,18 +28,30 @@ Currently supported:

 Controller collectors (in `metrics_utility.library.collectors.controller`):
 * `config(db, billing_provider_params).gather() -> Dict`
-* `execution_environments(db, [output_dir]).gather() -> [filenames]`
-* `job_host_summary(db, since, until, [output_dir]).gather() -> [filenames]`
-* `job_host_summary_service(db, since, until, [output_dir]).gather() -> [filenames]`
-* `main_host(db, [output_dir]).gather() -> [filenames]`
-* `main_indirectmanagednodeaudit(db, since, until, [output_dir]).gather() -> [filenames]`
-* `main_jobevent(db, since, until, [output_dir]).gather() -> [filenames]`
-* `main_jobevent_service(db, since, until, [output_dir]).gather() -> [filenames]`
-* `unified_jobs(db, since, until, [output_dir]).gather() -> [filenames]`
+* `execution_environments(db).gather() -> DataFrame`
+* `job_host_summary(db, since, until).gather() -> DataFrame`
+* `job_host_summary_service(db, since, until).gather() -> DataFrame`
+* `main_host(db).gather() -> DataFrame`
+* `main_host_daily(db, since, until).gather() -> DataFrame`
+* `main_indirectmanagednodeaudit(db, since, until).gather() -> DataFrame`
+* `main_jobevent(db, since, until).gather() -> DataFrame`
+* `main_jobevent_service(db, since, until).gather() -> DataFrame`
+* `unified_jobs(db, since, until).gather() -> DataFrame`

 Other collectors (in `metrics_utility.library.collectors.others`):
 * `total_workers_vcpu(cluster_name, metering_enabled, prometheus_url, ca_cert_path, token) -> Dict`

+For CLI usage or when CSV files are needed, use the `dataframe_to_csv_files()` helper from `metrics_utility.library.csv_utils`:
+
+```python
+from metrics_utility.library.csv_utils import dataframe_to_csv_files
+
+df = execution_environments(db=db).gather()
+csv_files = dataframe_to_csv_files(df, 'main_executionenvironment', '/tmp/output')
+# Returns: ['/tmp/output/main_executionenvironment_table.csv']
+# or ['.._split0.csv', '.._split1.csv', ...] for large datasets
+```
+

 #### Package
````
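The commit message says `dataframe_to_csv_files` must not create empty CSV files, writing only for non-empty dataframes or dataframes with columns. A hypothetical sketch of just that guard (the real helper also splits large dataframes into `_split*.csv` parts via the csvfilesplitter, which is omitted here):

```python
import os
import tempfile

import pandas as pd


def dataframe_to_csv_files(df, table_name, output_dir):
    # Hypothetical sketch of the non-empty guard described in the commit
    # message: skip file creation only when the dataframe has neither rows
    # nor columns. A dataframe with columns but no rows still gets a
    # header-only file.
    if df.empty and len(df.columns) == 0:
        return []
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, f'{table_name}_table.csv')
    df.to_csv(path, index=False)
    return [path]


out = tempfile.mkdtemp()
print(dataframe_to_csv_files(pd.DataFrame(), 'main_executionenvironment', out))  # []
print(dataframe_to_csv_files(pd.DataFrame({'id': [1]}), 'main_executionenvironment', out))
```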

metrics_utility/library/collectors/controller/config.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -96,10 +96,10 @@ def _get_install_type():
     return 'traditional'


+# FIXME: psycopg.sql
 def _get_controller_settings(db, keys):
     settings = {}
     with db.cursor() as cursor:
-        # FIXME: psycopg.sql ?
         in_sql = "'" + "', '".join(keys) + "'"
         cursor.execute(f'SELECT key, value FROM conf_setting WHERE key IN ({in_sql})')
         for key, value in cursor.fetchall():
```
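The FIXME about hand-quoting the IN list can be resolved with driver-side parameters instead of string interpolation. A sketch of one option, building one placeholder per key; shown against sqlite3 for runnability (psycopg would use `%s` placeholders, or `psycopg.sql` composition, and the `get_settings` name is illustrative):

```python
import sqlite3


def get_settings(db, keys):
    # Build one placeholder per key and let the driver do the quoting,
    # rather than interpolating values into the SQL string.
    placeholders = ', '.join(['?'] * len(keys))  # psycopg would use '%s'
    cursor = db.cursor()
    cursor.execute(
        f'SELECT key, value FROM conf_setting WHERE key IN ({placeholders})',
        list(keys),
    )
    return dict(cursor.fetchall())


db = sqlite3.connect(':memory:')
db.execute('CREATE TABLE conf_setting (key TEXT, value TEXT)')
db.executemany('INSERT INTO conf_setting VALUES (?, ?)',
               [('a', '1'), ('b', '2'), ('c', '3')])
print(get_settings(db, ['a', 'c']))
```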

metrics_utility/library/collectors/controller/credentials_service.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -2,7 +2,7 @@


 @collector
-def credentials_service(*, db=None, since=None, until=None, output_dir=None):
+def credentials_service(*, db=None, since=None, until=None):
     query = f"""
         SELECT
             main_credentialtype.name as credential_type,
@@ -19,4 +19,4 @@ def credentials_service(*, db=None, since=None, until=None, output_dir=None):
         ORDER BY main_unifiedjob.id ASC, main_credentialtype.name ASC
     """

-    return copy_table(db=db, table='credentials', query=query, output_dir=output_dir)
+    return copy_table(db=db, query=query)
```

metrics_utility/library/collectors/controller/execution_environments.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -2,7 +2,7 @@


 @collector
-def execution_environments(*, db=None, output_dir=None):
+def execution_environments(*, db=None):
     query = """
         SELECT
             id,
@@ -20,4 +20,4 @@ def execution_environments(*, db=None, output_dir=None):
         FROM main_executionenvironment
     """

-    return copy_table(db=db, table='main_executionenvironment', query=query, output_dir=output_dir)
+    return copy_table(db=db, query=query)
```

metrics_utility/library/collectors/controller/job_host_summary.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -2,7 +2,7 @@


 @collector
-def job_host_summary(*, db=None, since=None, until=None, output_dir=None):
+def job_host_summary(*, db=None, since=None, until=None):
     where = ' AND '.join(
         [
             f"main_jobhostsummary.modified >= '{since.isoformat()}'",
@@ -78,4 +78,4 @@ def job_host_summary(*, db=None, since=None, until=None, output_dir=None):
         ORDER BY main_jobhostsummary.modified ASC
     """

-    return copy_table(db=db, table='main_jobhostsummary', query=query, prepend_query=True, output_dir=output_dir)
+    return copy_table(db=db, query=query, prepend_query=True)
```

metrics_utility/library/collectors/controller/job_host_summary_service.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -2,7 +2,7 @@


 @collector
-def job_host_summary_service(*, db=None, since=None, until=None, output_dir=None):
+def job_host_summary_service(*, db=None, since=None, until=None):
     where = ' AND '.join(
         [
             f"mu.finished >= '{since.isoformat()}'",
@@ -85,4 +85,4 @@ def job_host_summary_service(*, db=None, since=None, until=None, output_dir=None
         ORDER BY mu.finished ASC
     """

-    return copy_table(db=db, table='main_jobhostsummary', query=query, prepend_query=True, output_dir=output_dir)
+    return copy_table(db=db, query=query, prepend_query=True)
```

metrics_utility/library/collectors/controller/main_host.py

Lines changed: 4 additions & 4 deletions
```diff
@@ -87,13 +87,13 @@ def _main_host_query(where):


 @collector
-def main_host(*, db=None, output_dir=None):
+def main_host(*, db=None):
     query = _main_host_query("enabled='t'")
-    return copy_table(db=db, table='main_host', query=query, prepend_query=True, output_dir=output_dir)
+    return copy_table(db=db, query=query, prepend_query=True)


 @collector
-def main_host_daily(*, db=None, since=None, until=None, output_dir=None):
+def main_host_daily(*, db=None, since=None, until=None):
     # prefer running with until=False, to not skip hosts that keep being modified

     where = f"""
@@ -102,4 +102,4 @@ def main_host_daily(*, db=None, since=None, until=None, output_dir=None):
         OR {date_where('main_host.modified', since, until)})
     """
     query = _main_host_query(where)
-    return copy_table(db=db, table='main_host_daily', query=query, prepend_query=True, output_dir=output_dir)
+    return copy_table(db=db, query=query, prepend_query=True)
```

metrics_utility/library/collectors/controller/main_indirectmanagednodeaudit.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -2,7 +2,7 @@


 @collector
-def main_indirectmanagednodeaudit(*, db=None, since=None, until=None, output_dir=None):
+def main_indirectmanagednodeaudit(*, db=None, since=None, until=None):
     where = ' AND '.join(
         [
             f"main_indirectmanagednodeaudit.created >= '{since.isoformat()}'",
@@ -40,4 +40,4 @@ def main_indirectmanagednodeaudit(*, db=None, since=None, until=None, output_dir
         ORDER BY main_indirectmanagednodeaudit.created ASC
     """

-    return copy_table(db=db, table='main_indirectmanagednodeaudit', query=query, output_dir=output_dir)
+    return copy_table(db=db, query=query)
```
