From 48e43cb7952217d1c72440d0cb9f258a0b175add Mon Sep 17 00:00:00 2001 From: Quan Pham Date: Wed, 1 Apr 2026 13:46:40 -0400 Subject: [PATCH 1/2] Track initialized and used columns in each processor During 2026-03 invoicing, a bug was found where the columns initialized by the New-PI credit processor (i.e `PI Balance` column), was being accessed by the PI-SU processor before it was initialized, causing an KeyError. To fix this, the codebase has been refactored to allow each processor to explicitly document which columns they initialize and use, defined in two new properties, `initializes_columns` and `operates_on_columns`. A helper function `_init_columns()` is added to initalize columns Unit test `tests/unit/processors/test_processor_list.py` is added to check each processor only uses columns that itself or previous processors initialized, and no column is initialized more than once Additionally, each column will now be encapsulated as a `InvoiceColumn` instance. `InvoiceColumn` contains the name, datatype, and default values for each column This will also enable stricter and clearer type enforcement for data entering and leaving the pipeline A new processor `ValidateInputColumnsProcessor` is added to check the input dataframe to the processing pipeline has prerequisite columns, and to cast to appropriate types The e2e test data has been updated to surface the bug that was found. It did not failed during the PR that introduced the bug [1] because the test data didn't have the right conditions to trigger the PI-SU processor Refactored unit tests to accomodate the new processor by adding a new base test class. [1] https://github.com/CCI-MOC/invoicing/pull/279 --- process_report/invoices/invoice.py | 100 ++++++++ process_report/loader.py | 7 + process_report/process_report.py | 46 ++-- .../processors/add_institution_processor.py | 6 + .../processors/bu_subsidy_processor.py | 14 +- .../processors/coldfront_fetch_processor.py | 11 +- .../processors/discount_processor.py | 2 +- process_report/processors/lenovo_processor.py | 7 + .../processors/new_pi_credit_processor.py | 22 +- .../processors/pi_su_credit_processor.py | 13 + .../processors/prepayment_processor.py | 29 ++- .../validate_billable_pi_processor.py | 10 + .../validate_cluster_name_processor.py | 2 + .../validate_input_column_processor.py | 37 +++ .../processors/validate_pi_alias_processor.py | 2 + process_report/tests/base.py | 52 +++- .../test_NERC OpenShift 2025-04.csv | 2 +- .../tests/e2e/test_data/test_pi.yaml | 4 +- .../e2e/test_data/test_prepay_debits.csv | 4 +- .../processors/test_bu_subsidy_processor.py | 9 +- .../test_coldfront_fetch_processor.py | 9 +- .../processors/test_discount_processor.py | 9 +- .../test_new_pi_credit_processor.py | 227 +++++++----------- .../processors/test_prepayment_processor.py | 2 +- .../unit/processors/test_processor_list.py | 26 ++ .../test_validate_billable_pi_processor.py | 2 +- .../test_validate_input_columns_processor.py | 53 ++++ process_report/tests/unit/test_util.py | 6 +- 28 files changed, 504 insertions(+), 209 deletions(-) create mode 100644 process_report/processors/validate_input_column_processor.py create mode 100644 process_report/tests/unit/processors/test_processor_list.py create mode 100644 process_report/tests/unit/processors/test_validate_input_columns_processor.py diff --git a/process_report/invoices/invoice.py b/process_report/invoices/invoice.py index 29ca5a3c..12fbb8b9 100644 --- a/process_report/invoices/invoice.py +++ b/process_report/invoices/invoice.py @@ -1,9 +1,32 @@ from dataclasses import dataclass +from typing import Any, Callable +from decimal import Decimal import pandas +import pyarrow +import logging import process_report.util as util +logger = logging.getLogger(__name__) + + +@dataclass +class InvoiceColumn: + name: str + dtype: Any + default_value: Any | None = None + default_initializer: Callable[[pandas.DataFrame], pandas.Series] | None = None + + +# Field type definitions +BALANCE_FIELD_TYPE = pandas.ArrowDtype(pyarrow.decimal128(21, 2)) +RATE_FIELD_TYPE = pandas.ArrowDtype(pyarrow.decimal128(21, 13)) +INTEGER_FIELD_TYPE = pandas.ArrowDtype(pyarrow.int64()) +STRING_FIELD_TYPE = pandas.StringDtype() +BOOL_FIELD_TYPE = pandas.BooleanDtype() + + ### PI file field names PI_PI_FIELD = "PI" PI_FIRST_MONTH = "First Invoice Month" @@ -65,11 +88,69 @@ IS_COURSE_FIELD = "Is Course" ### +### Initialized Column objects +INVOICE_DATE_COLUMN = InvoiceColumn(name=INVOICE_DATE_FIELD, dtype=STRING_FIELD_TYPE) +PROJECT_COLUMN = InvoiceColumn(name=PROJECT_FIELD, dtype=STRING_FIELD_TYPE) +PROJECT_ID_COLUMN = InvoiceColumn(name=PROJECT_ID_FIELD, dtype=STRING_FIELD_TYPE) +PI_COLUMN = InvoiceColumn(name=PI_FIELD, dtype=STRING_FIELD_TYPE) +INVOICE_EMAIL_COLUMN = InvoiceColumn(name=INVOICE_EMAIL_FIELD, dtype=STRING_FIELD_TYPE) +INVOICE_ADDRESS_COLUMN = InvoiceColumn( + name=INVOICE_ADDRESS_FIELD, dtype=STRING_FIELD_TYPE +) +INSTITUTION_COLUMN = InvoiceColumn(name=INSTITUTION_FIELD, dtype=STRING_FIELD_TYPE) +INSTITUTION_ID_COLUMN = InvoiceColumn( + name=INSTITUTION_ID_FIELD, dtype=STRING_FIELD_TYPE +) +GROUP_NAME_COLUMN = InvoiceColumn(name=GROUP_NAME_FIELD, dtype=STRING_FIELD_TYPE) +GROUP_INSTITUTION_COLUMN = InvoiceColumn( + name=GROUP_INSTITUTION_FIELD, dtype=STRING_FIELD_TYPE +) +GROUP_BALANCE_COLUMN = InvoiceColumn(name=GROUP_BALANCE_FIELD, dtype=BALANCE_FIELD_TYPE) +GROUP_BALANCE_USED_COLUMN = InvoiceColumn( + name=GROUP_BALANCE_USED_FIELD, dtype=BALANCE_FIELD_TYPE +) +SU_HOURS_COLUMN = InvoiceColumn(name=SU_HOURS_FIELD, dtype=INTEGER_FIELD_TYPE) +SU_TYPE_COLUMN = InvoiceColumn(name=SU_TYPE_FIELD, dtype=STRING_FIELD_TYPE) +SU_CHARGE_COLUMN = InvoiceColumn(name=SU_CHARGE_FIELD, dtype=BALANCE_FIELD_TYPE) +LENOVO_CHARGE_COLUMN = InvoiceColumn(name=LENOVO_CHARGE_FIELD, dtype=BALANCE_FIELD_TYPE) +RATE_COLUMN = InvoiceColumn( + name=RATE_FIELD, dtype=RATE_FIELD_TYPE +) # Using decimal to suppress scientific notation in export +COST_COLUMN = InvoiceColumn(name=COST_FIELD, dtype=BALANCE_FIELD_TYPE) +CREDIT_COLUMN = InvoiceColumn(name=CREDIT_FIELD, dtype=BALANCE_FIELD_TYPE) +CREDIT_CODE_COLUMN = InvoiceColumn(name=CREDIT_CODE_FIELD, dtype=STRING_FIELD_TYPE) +SUBSIDY_COLUMN = InvoiceColumn( + name=SUBSIDY_FIELD, dtype=BALANCE_FIELD_TYPE, default_value=Decimal(0) +) +BALANCE_COLUMN = InvoiceColumn( + name=BALANCE_FIELD, + dtype=BALANCE_FIELD_TYPE, + default_initializer=lambda df: df[COST_FIELD], +) +PI_BALANCE_COLUMN = InvoiceColumn( + name=PI_BALANCE_FIELD, + dtype=BALANCE_FIELD_TYPE, + default_initializer=lambda df: df[COST_FIELD], +) + +# Internally used fields +IS_BILLABLE_COLUMN = InvoiceColumn(name=IS_BILLABLE_FIELD, dtype=BOOL_FIELD_TYPE) +MISSING_PI_COLUMN = InvoiceColumn(name=MISSING_PI_FIELD, dtype=BOOL_FIELD_TYPE) +PROJECT_NAME_COLUMN = InvoiceColumn(name=PROJECT_NAME_FIELD, dtype=STRING_FIELD_TYPE) +GROUP_MANAGED_COLUMN = InvoiceColumn(name=GROUP_MANAGED_FIELD, dtype=BOOL_FIELD_TYPE) +CLUSTER_NAME_COLUMN = InvoiceColumn(name=CLUSTER_NAME_FIELD, dtype=STRING_FIELD_TYPE) +IS_COURSE_COLUMN = InvoiceColumn( + name=IS_COURSE_FIELD, dtype=BOOL_FIELD_TYPE, default_value=False +) +### + @dataclass class Invoice: export_columns_list = list() exported_columns_map = dict() + initializes_columns = tuple() + operates_on_columns = tuple() invoice_month: str data: pandas.DataFrame @@ -77,6 +158,7 @@ class Invoice: export_data = None def process(self): + self._init_columns() self._prepare() self._process() self._prepare_export() @@ -93,6 +175,24 @@ def output_s3_key(self) -> str: def output_s3_archive_key(self): return f"Invoices/{self.invoice_month}/Archive/{self.name} {self.invoice_month} {util.get_iso8601_time()}.csv" + def _init_columns(self): + """Initializes columns specified in `initializes_columns` and cast them to appropriate types + + If column already exists, only do casting + If no default value is given, column initialized to None + """ + for field in self.initializes_columns: + if field.name not in self.data.columns: + field_default = field.default_value + if field.default_initializer: + field_default = field.default_initializer(self.data) + self.data[field.name] = field_default + elif self.data.dtypes[field.name] != field.dtype: + logger.warning( + f"Column {field.name} has dtype {self.data.dtypes[field.name]} instead of expected {field.dtype}." + ) + self.data = self.data.astype({field.name: field.dtype}) + def _prepare(self): """Prepares the data for processing. diff --git a/process_report/loader.py b/process_report/loader.py index 0c5f959c..9e3c1ea6 100644 --- a/process_report/loader.py +++ b/process_report/loader.py @@ -112,6 +112,13 @@ def get_alias_map(self) -> dict: def load_dataframe(self, filepath: str) -> pandas.DataFrame: return pandas.read_csv(filepath) + @functools.lru_cache + def load_prepay_credits(self) -> pandas.DataFrame: + prepay_df = self.load_dataframe(invoice_settings.prepay_credits_filepath) + return prepay_df.astype( + {invoice.PREPAY_CREDIT_FIELD: invoice.BALANCE_FIELD_TYPE} + ) + @functools.lru_cache def _load_pi_config(self, filepath: str) -> list[dict]: with open(filepath) as file: diff --git a/process_report/process_report.py b/process_report/process_report.py index b3e62d93..fc263db6 100644 --- a/process_report/process_report.py +++ b/process_report/process_report.py @@ -3,7 +3,6 @@ import os import pandas -import pyarrow from process_report.settings import invoice_settings from process_report.loader import loader @@ -22,6 +21,7 @@ ) from process_report.processors import ( coldfront_fetch_processor, + validate_input_column_processor, validate_pi_alias_processor, add_institution_processor, lenovo_processor, @@ -33,6 +33,20 @@ validate_cluster_name_processor, ) +PROCESSING_ORDER = [ + validate_input_column_processor.ValidateInputColumnsProcessor, + validate_cluster_name_processor.ValidateClusterNameProcessor, + coldfront_fetch_processor.ColdfrontFetchProcessor, + validate_pi_alias_processor.ValidatePIAliasProcessor, + add_institution_processor.AddInstitutionProcessor, + lenovo_processor.LenovoProcessor, + validate_billable_pi_processor.ValidateBillablePIsProcessor, + pi_su_credit_processor.PISUCreditProcessor, + new_pi_credit_processor.NewPICreditProcessor, + bu_subsidy_processor.BUSubsidyProcessor, + prepayment_processor.PrepaymentProcessor, +] + PI_S3_FILEPATH = "PIs/PI.csv" ALIAS_S3_FILEPATH = "PIs/alias.csv" @@ -66,20 +80,7 @@ def main(): ### Preliminary processing processed_data = process_merged_dataframe( - invoice_month, - merged_dataframe, - [ - validate_cluster_name_processor.ValidateClusterNameProcessor, - coldfront_fetch_processor.ColdfrontFetchProcessor, - validate_pi_alias_processor.ValidatePIAliasProcessor, - add_institution_processor.AddInstitutionProcessor, - lenovo_processor.LenovoProcessor, - validate_billable_pi_processor.ValidateBillablePIsProcessor, - pi_su_credit_processor.PISUCreditProcessor, - new_pi_credit_processor.NewPICreditProcessor, - bu_subsidy_processor.BUSubsidyProcessor, - prepayment_processor.PrepaymentProcessor, - ], + invoice_month, merged_dataframe, PROCESSING_ORDER ) ### Export invoices @@ -109,8 +110,19 @@ def merge_csv(files): file, engine="pyarrow", dtype={ - invoice.COST_FIELD: pandas.ArrowDtype(pyarrow.decimal128(21, 2)), - invoice.RATE_FIELD: str, + invoice.INVOICE_DATE_FIELD: invoice.STRING_FIELD_TYPE, + invoice.PROJECT_FIELD: invoice.STRING_FIELD_TYPE, + invoice.PROJECT_ID_FIELD: invoice.STRING_FIELD_TYPE, + invoice.PI_FIELD: invoice.STRING_FIELD_TYPE, + invoice.CLUSTER_NAME_FIELD: invoice.STRING_FIELD_TYPE, + invoice.INVOICE_EMAIL_FIELD: invoice.STRING_FIELD_TYPE, + invoice.INVOICE_ADDRESS_FIELD: invoice.STRING_FIELD_TYPE, + invoice.INSTITUTION_FIELD: invoice.STRING_FIELD_TYPE, + invoice.INSTITUTION_ID_FIELD: invoice.STRING_FIELD_TYPE, + invoice.SU_HOURS_FIELD: invoice.INTEGER_FIELD_TYPE, + invoice.SU_TYPE_FIELD: invoice.STRING_FIELD_TYPE, + invoice.RATE_FIELD: invoice.RATE_FIELD_TYPE, + invoice.COST_FIELD: invoice.BALANCE_FIELD_TYPE, }, quotechar="|", ) diff --git a/process_report/processors/add_institution_processor.py b/process_report/processors/add_institution_processor.py index 42e17d21..f630c81f 100644 --- a/process_report/processors/add_institution_processor.py +++ b/process_report/processors/add_institution_processor.py @@ -14,6 +14,12 @@ @dataclass class AddInstitutionProcessor(processor.Processor): + operates_on_columns = ( + invoice.INSTITUTION_COLUMN, + invoice.PI_COLUMN, + invoice.PROJECT_COLUMN, + ) + def _add_institution(self): """Determine every PI's institution name, logging any PI whose institution cannot be determined This is performed by `get_institution_from_pi()`, which tries to match the PI's username to diff --git a/process_report/processors/bu_subsidy_processor.py b/process_report/processors/bu_subsidy_processor.py index f2e6c787..126e031c 100644 --- a/process_report/processors/bu_subsidy_processor.py +++ b/process_report/processors/bu_subsidy_processor.py @@ -1,5 +1,4 @@ from dataclasses import dataclass, field -from decimal import Decimal from process_report.loader import loader from process_report.invoices import invoice @@ -10,6 +9,18 @@ class BUSubsidyProcessor(discount_processor.DiscountProcessor): IS_DISCOUNT_BY_NERC = False + initializes_columns = (invoice.PROJECT_NAME_COLUMN, invoice.SUBSIDY_COLUMN) + operates_on_columns = ( + *initializes_columns, + invoice.PROJECT_COLUMN, + invoice.PI_COLUMN, + invoice.IS_BILLABLE_COLUMN, + invoice.MISSING_PI_COLUMN, + invoice.INSTITUTION_COLUMN, + invoice.PI_BALANCE_COLUMN, + invoice.BALANCE_COLUMN, + ) + subsidy_amount: int = field(default_factory=loader.get_bu_subsidy_amount) def _prepare(self): @@ -21,7 +32,6 @@ def get_project(row): return project_alloc[: project_alloc.rfind("-")] self.data[invoice.PROJECT_NAME_FIELD] = self.data.apply(get_project, axis=1) - self.data[invoice.SUBSIDY_FIELD] = Decimal(0) def _process(self): self.data = self._apply_subsidy(self.data, self.subsidy_amount) diff --git a/process_report/processors/coldfront_fetch_processor.py b/process_report/processors/coldfront_fetch_processor.py index e581b8c9..293ff5ed 100644 --- a/process_report/processors/coldfront_fetch_processor.py +++ b/process_report/processors/coldfront_fetch_processor.py @@ -34,6 +34,16 @@ class ColdfrontFetchProcessor(processor.Processor): ) coldfront_data_filepath: str = invoice_settings.coldfront_api_filepath + initializes_columns = (invoice.IS_COURSE_COLUMN,) + operates_on_columns = ( + *initializes_columns, + invoice.PROJECT_COLUMN, + invoice.PROJECT_ID_COLUMN, + invoice.CLUSTER_NAME_COLUMN, + invoice.PI_COLUMN, + invoice.INSTITUTION_ID_COLUMN, + ) + @functools.cached_property def coldfront_client(self): keycloak_url = os.environ.get("KEYCLOAK_URL", "https://keycloak.mss.mghpcc.org") @@ -143,7 +153,6 @@ def _validate_allocation_data(self, allocation_data): ) def _apply_allocation_data(self, allocation_data): - self.data[invoice.IS_COURSE_FIELD] = False for project_cluster_tuple, data in allocation_data.items(): project_id, cluster_name = project_cluster_tuple mask = (self.data[invoice.PROJECT_ID_FIELD] == project_id) & ( diff --git a/process_report/processors/discount_processor.py b/process_report/processors/discount_processor.py index 732e1f79..68c95e70 100644 --- a/process_report/processors/discount_processor.py +++ b/process_report/processors/discount_processor.py @@ -51,7 +51,7 @@ def apply_discount_on_project(remaining_discount_amount, project_i, project): remaining_project_balance = project[pi_balance_field] applied_discount = min(remaining_project_balance, remaining_discount_amount) - if invoice.at[project_i, discount_field] is None: + if pandas.isna(invoice.at[project_i, discount_field]): invoice.at[project_i, discount_field] = applied_discount else: invoice.at[project_i, discount_field] += applied_discount diff --git a/process_report/processors/lenovo_processor.py b/process_report/processors/lenovo_processor.py index 5b15e575..2259a3ee 100644 --- a/process_report/processors/lenovo_processor.py +++ b/process_report/processors/lenovo_processor.py @@ -9,6 +9,13 @@ class LenovoProcessor(processor.Processor): su_charge_info: dict = field(default_factory=loader.get_lenovo_su_charge_info) + initializes_columns = (invoice.SU_CHARGE_COLUMN, invoice.LENOVO_CHARGE_COLUMN) + operates_on_columns = ( + *initializes_columns, + invoice.SU_TYPE_COLUMN, + invoice.SU_HOURS_COLUMN, + ) + def _apply_su_charge(self, data): for su_name, su_charge in self.su_charge_info.items(): if su_name in data: diff --git a/process_report/processors/new_pi_credit_processor.py b/process_report/processors/new_pi_credit_processor.py index f1de23f9..94c1cdad 100644 --- a/process_report/processors/new_pi_credit_processor.py +++ b/process_report/processors/new_pi_credit_processor.py @@ -34,6 +34,19 @@ class NewPICreditProcessor(discount_processor.DiscountProcessor): ] IS_DISCOUNT_BY_NERC = True + operates_on_columns = ( + invoice.CREDIT_COLUMN, + invoice.CREDIT_CODE_COLUMN, + invoice.BALANCE_COLUMN, + invoice.PI_BALANCE_COLUMN, + invoice.SU_TYPE_COLUMN, + invoice.IS_BILLABLE_COLUMN, + invoice.MISSING_PI_COLUMN, + invoice.INSTITUTION_COLUMN, + invoice.COST_COLUMN, + invoice.PI_COLUMN, + ) + old_pi_filepath: str = field( default_factory=lambda: loader.get_remote_filepath( invoice_settings.pi_remote_filepath @@ -176,10 +189,7 @@ def _apply_credits_new_pi( credit_eligible_projects[invoice.PI_FIELD] == pi ] - if pi_age > 1: - for i, row in pi_projects.iterrows(): - data.at[i, invoice.BALANCE_FIELD] = row[invoice.COST_FIELD] - else: + if pi_age <= 1: if pi_age == 0: old_pi_df = self._upsert_pi_entry( old_pi_df, @@ -226,10 +236,6 @@ def _apply_credits_new_pi( return (data, old_pi_df) def _prepare(self): - self.data[invoice.CREDIT_FIELD] = None - self.data[invoice.CREDIT_CODE_FIELD] = None - self.data[invoice.PI_BALANCE_FIELD] = self.data[invoice.COST_FIELD] - self.data[invoice.BALANCE_FIELD] = self.data[invoice.COST_FIELD] self.old_pi_df = self._load_old_pis(self.old_pi_filepath) def _process(self): diff --git a/process_report/processors/pi_su_credit_processor.py b/process_report/processors/pi_su_credit_processor.py index 7e7a7029..be16ede2 100644 --- a/process_report/processors/pi_su_credit_processor.py +++ b/process_report/processors/pi_su_credit_processor.py @@ -25,6 +25,19 @@ class PISUCreditProcessor(discount_processor.DiscountProcessor): IS_DISCOUNT_BY_NERC = True PI_SU_CREDIT_CODE = "0005" + initializes_columns = ( + invoice.CREDIT_COLUMN, + invoice.CREDIT_CODE_COLUMN, + invoice.PI_BALANCE_COLUMN, + invoice.BALANCE_COLUMN, + ) + operates_on_columns = ( + *initializes_columns, + invoice.SU_TYPE_COLUMN, + invoice.PI_COLUMN, + invoice.COST_COLUMN, + ) + pi_su_mapping: dict[str, list[str]] = field( default_factory=loader.get_pi_non_billed_su_types ) diff --git a/process_report/processors/prepayment_processor.py b/process_report/processors/prepayment_processor.py index ba2d3a37..98974aff 100644 --- a/process_report/processors/prepayment_processor.py +++ b/process_report/processors/prepayment_processor.py @@ -20,14 +20,27 @@ class PrepaymentProcessor(discount_processor.DiscountProcessor): IS_DISCOUNT_BY_NERC = True PREPAY_DEBITS_S3_FILEPATH = "Prepay/prepay_debits.csv" + initializes_columns = ( + invoice.GROUP_NAME_COLUMN, + invoice.GROUP_INSTITUTION_COLUMN, + invoice.GROUP_MANAGED_COLUMN, + invoice.GROUP_BALANCE_COLUMN, + invoice.GROUP_BALANCE_USED_COLUMN, + ) + operates_on_columns = ( + *initializes_columns, + invoice.INVOICE_EMAIL_COLUMN, + invoice.PROJECT_NAME_COLUMN, + invoice.PI_BALANCE_COLUMN, + invoice.BALANCE_COLUMN, + ) + @property def PREPAY_DEBITS_S3_BACKUP_FILEPATH(self): return f"Prepay/Archive/prepay_debits {util.get_iso8601_time()}.csv" prepay_credits: pandas.DataFrame = field( - default_factory=lambda: loader.load_dataframe( - invoice_settings.prepay_credits_filepath - ) + default_factory=lambda: loader.load_prepay_credits() ) prepay_projects: pandas.DataFrame = field( default_factory=lambda: loader.load_dataframe( @@ -49,19 +62,15 @@ def PREPAY_DEBITS_S3_BACKUP_FILEPATH(self): @staticmethod def _load_prepay_debits(prepay_debits_filepath): try: - prepay_debits = pandas.read_csv(prepay_debits_filepath) + prepay_debits = pandas.read_csv(prepay_debits_filepath).astype( + {invoice.PREPAY_DEBIT_FIELD: invoice.BALANCE_FIELD_TYPE} + ) except FileNotFoundError: sys.exit("Applying prepayments failed. prepay debits file does not exist") return prepay_debits def _prepare(self): - self.data[invoice.GROUP_NAME_FIELD] = None - self.data[invoice.GROUP_INSTITUTION_FIELD] = None - self.data[invoice.GROUP_MANAGED_FIELD] = None - self.data[invoice.GROUP_BALANCE_FIELD] = None - self.data[invoice.GROUP_BALANCE_USED_FIELD] = None - self.prepay_debits = self._load_prepay_debits(self.prepay_debits_filepath) self.group_info_dict = self._get_prepay_group_dict() if self.upload_to_s3: diff --git a/process_report/processors/validate_billable_pi_processor.py b/process_report/processors/validate_billable_pi_processor.py index 94f62722..3806fb16 100644 --- a/process_report/processors/validate_billable_pi_processor.py +++ b/process_report/processors/validate_billable_pi_processor.py @@ -95,6 +95,16 @@ class ValidateBillablePIsProcessor(processor.Processor): - The project belongs in `NONBILLABLE_CLUSTERS` """ + initializes_columns = [invoice.IS_BILLABLE_COLUMN, invoice.MISSING_PI_COLUMN] + operates_on_columns = [ + *initializes_columns, + invoice.PI_COLUMN, + invoice.PROJECT_COLUMN, + invoice.CLUSTER_NAME_COLUMN, + invoice.IS_COURSE_COLUMN, + invoice.INSTITUTION_COLUMN, + ] + nonbillable_pis: list[str] = field(default_factory=loader.get_nonbillable_pis) nonbillable_projects: pandas.DataFrame = field( default_factory=loader.get_nonbillable_projects diff --git a/process_report/processors/validate_cluster_name_processor.py b/process_report/processors/validate_cluster_name_processor.py index 07a2e5e1..386b6081 100644 --- a/process_report/processors/validate_cluster_name_processor.py +++ b/process_report/processors/validate_cluster_name_processor.py @@ -12,6 +12,8 @@ class ValidateClusterNameProcessor(processor.Processor): "NERC-OCP-EDU": "academic", } + operates_on_columns = (invoice.CLUSTER_NAME_COLUMN,) + def _process(self): self.data[invoice.CLUSTER_NAME_FIELD] = self.data[ invoice.CLUSTER_NAME_FIELD diff --git a/process_report/processors/validate_input_column_processor.py b/process_report/processors/validate_input_column_processor.py new file mode 100644 index 00000000..edbbb07b --- /dev/null +++ b/process_report/processors/validate_input_column_processor.py @@ -0,0 +1,37 @@ +from dataclasses import dataclass + +from process_report.invoices import invoice +from process_report.processors import processor + + +@dataclass +class ValidateInputColumnsProcessor(processor.Processor): + initializes_columns = ( + invoice.INVOICE_DATE_COLUMN, + invoice.PROJECT_COLUMN, + invoice.PROJECT_ID_COLUMN, + invoice.PI_COLUMN, + invoice.CLUSTER_NAME_COLUMN, + invoice.INVOICE_EMAIL_COLUMN, + invoice.INVOICE_ADDRESS_COLUMN, + invoice.INSTITUTION_COLUMN, + invoice.INSTITUTION_ID_COLUMN, + invoice.SU_HOURS_COLUMN, + invoice.SU_TYPE_COLUMN, + invoice.RATE_COLUMN, + invoice.COST_COLUMN, + ) + + def process(self): + missing_columns = [ + column.name + for column in self.initializes_columns + if column.name not in self.data.columns + ] + if missing_columns: + raise ValueError( + f"Input dataframe is missing required columns: {', '.join(missing_columns)}. Stopping invoicing" + ) + + # Casts columns to appropriate types + self._init_columns() diff --git a/process_report/processors/validate_pi_alias_processor.py b/process_report/processors/validate_pi_alias_processor.py index 63434312..0eec53d0 100644 --- a/process_report/processors/validate_pi_alias_processor.py +++ b/process_report/processors/validate_pi_alias_processor.py @@ -9,6 +9,8 @@ class ValidatePIAliasProcessor(processor.Processor): alias_map: dict[str, list[str]] = field(default_factory=loader.get_alias_map) + operates_on_columns = (invoice.PI_COLUMN,) + def _validate_pi_aliases(self): for pi, pi_aliases in self.alias_map.items(): self.data.loc[ diff --git a/process_report/tests/base.py b/process_report/tests/base.py index 10d89006..78176a3e 100644 --- a/process_report/tests/base.py +++ b/process_report/tests/base.py @@ -3,8 +3,58 @@ from pathlib import Path from unittest import TestCase +import pandas +import pyarrow -class BaseTestCaseWithTempDir(TestCase): + +BALANCE_FIELD_TYPE = pandas.ArrowDtype(pyarrow.decimal128(21, 2)) +RATE_FIELD_TYPE = pandas.ArrowDtype(pyarrow.decimal128(21, 13)) +INTEGER_FIELD_TYPE = pandas.ArrowDtype(pyarrow.int64()) +STRING_FIELD_TYPE = pandas.StringDtype() +BOOL_FIELD_TYPE = pandas.BooleanDtype() + +FIELD_DTYPES = { + "Invoice Month": STRING_FIELD_TYPE, + "Project - Allocation": STRING_FIELD_TYPE, + "Project - Allocation ID": STRING_FIELD_TYPE, + "Manager (PI)": STRING_FIELD_TYPE, + "Invoice Email": STRING_FIELD_TYPE, + "Invoice Address": STRING_FIELD_TYPE, + "Institution": STRING_FIELD_TYPE, + "Institution - Specific Code": STRING_FIELD_TYPE, + "Prepaid Group Name": STRING_FIELD_TYPE, + "Prepaid Group Institution": STRING_FIELD_TYPE, + "Prepaid Group Balance": BALANCE_FIELD_TYPE, + "Prepaid Group Used": BALANCE_FIELD_TYPE, + "SU Hours (GBhr or SUhr)": INTEGER_FIELD_TYPE, + "SU Type": STRING_FIELD_TYPE, + "SU Charge": BALANCE_FIELD_TYPE, + "Charge": BALANCE_FIELD_TYPE, + "Rate": RATE_FIELD_TYPE, + "Cost": BALANCE_FIELD_TYPE, + "Credit": BALANCE_FIELD_TYPE, + "Credit Code": STRING_FIELD_TYPE, + "Subsidy": BALANCE_FIELD_TYPE, + "Balance": BALANCE_FIELD_TYPE, + "Is Billable": BOOL_FIELD_TYPE, + "Missing PI": BOOL_FIELD_TYPE, + "PI Balance": BALANCE_FIELD_TYPE, + "Project": STRING_FIELD_TYPE, + "MGHPCC Managed": BOOL_FIELD_TYPE, + "Cluster Name": STRING_FIELD_TYPE, + "Is Course": BOOL_FIELD_TYPE, +} + + +class BaseTestCase(TestCase): + def create_test_invoice(self, data_dict: dict): + present_cols = { + col: dtype for col, dtype in FIELD_DTYPES.items() if col in data_dict + } + return pandas.DataFrame(data_dict).astype(present_cols) + + +class BaseTestCaseWithTempDir(BaseTestCase): def setUp(self): self.tempdir = Path(tempfile.TemporaryDirectory(delete=False).name) diff --git a/process_report/tests/e2e/test_data/test_invoices/test_NERC OpenShift 2025-04.csv b/process_report/tests/e2e/test_data/test_invoices/test_NERC OpenShift 2025-04.csv index 7bdf6bc9..511480f5 100644 --- a/process_report/tests/e2e/test_data/test_invoices/test_NERC OpenShift 2025-04.csv +++ b/process_report/tests/e2e/test_data/test_invoices/test_NERC OpenShift 2025-04.csv @@ -2,5 +2,5 @@ Invoice Month,Project - Allocation,Project - Allocation ID,Manager (PI),Cluster 2024-01,P1ID,P1ID,,shift,,,,,280,OpenShift CPU,0.013,100.0 2024-01,P1ID,P1ID,,shift,,,,,280,OpenStack GPUA100SXM4,0.013,100 2024-01,P2ID,P2ID,,shift,,,,,280,OpenShift CPU,0.013,200 -2024-01,P3ID,P3ID,,shift,,,,,280,OpenShift CPU,0.013,300 +2024-01,P3ID,P3ID,,shift,,,,,280,Free CPU,0.013,300 2024-01,P9ID,P9ID,,shift,,,,,280,OpenShift CPU,0.013,3000 diff --git a/process_report/tests/e2e/test_data/test_pi.yaml b/process_report/tests/e2e/test_data/test_pi.yaml index 5cfeaee1..7d944c59 100644 --- a/process_report/tests/e2e/test_data/test_pi.yaml +++ b/process_report/tests/e2e/test_data/test_pi.yaml @@ -1,4 +1,4 @@ - username: PI9 -- username: PI10 +- username: pi2@harvard.edu non_billed_su_types: - - name: SU1 + - name: Free CPU diff --git a/process_report/tests/e2e/test_data/test_prepay_debits.csv b/process_report/tests/e2e/test_data/test_prepay_debits.csv index 94fed3cd..1c082816 100644 --- a/process_report/tests/e2e/test_data/test_prepay_debits.csv +++ b/process_report/tests/e2e/test_data/test_prepay_debits.csv @@ -1,3 +1,3 @@ Month,Group Name,Debit -2023-12,TestGroup1,200.0 -2023-11,TestGroup2,100.0 +2023-12,TestGroup1,200.00 +2023-11,TestGroup2,100.00 diff --git a/process_report/tests/unit/processors/test_bu_subsidy_processor.py b/process_report/tests/unit/processors/test_bu_subsidy_processor.py index 6761e209..757dc52a 100644 --- a/process_report/tests/unit/processors/test_bu_subsidy_processor.py +++ b/process_report/tests/unit/processors/test_bu_subsidy_processor.py @@ -1,10 +1,8 @@ -from unittest import TestCase -import pandas - from process_report.tests import util as test_utils +from process_report.tests.base import BaseTestCase -class TestBUSubsidyProcessor(TestCase): +class TestBUSubsidyProcessor(BaseTestCase): def _assert_result_invoice( self, subsidy_amount, @@ -48,7 +46,7 @@ def _get_test_invoice( if not missing_pi: missing_pi = [False for _ in range(len(pi))] - return pandas.DataFrame( + return self.create_test_invoice( { "Manager (PI)": pi, "Project - Allocation": project_names, @@ -57,6 +55,7 @@ def _get_test_invoice( "Institution": institution, "Is Billable": is_billable, "Missing PI": missing_pi, + "Subsidy": [0 for _ in range(len(pi))], } ) diff --git a/process_report/tests/unit/processors/test_coldfront_fetch_processor.py b/process_report/tests/unit/processors/test_coldfront_fetch_processor.py index 6f401662..13a61096 100644 --- a/process_report/tests/unit/processors/test_coldfront_fetch_processor.py +++ b/process_report/tests/unit/processors/test_coldfront_fetch_processor.py @@ -1,11 +1,12 @@ -from unittest import TestCase, mock +from unittest import mock import pandas import pytest from process_report.tests import util as test_utils +from process_report.tests.base import BaseTestCase -class TestColdfrontFetchProcessor(TestCase): +class TestColdfrontFetchProcessor(BaseTestCase): def _get_test_invoice( self, allocation_project_id, @@ -28,9 +29,9 @@ def _get_test_invoice( cluster_name = [""] * len(allocation_project_id) if not is_course: - is_course = [None] * len(allocation_project_id) + is_course = [False] * len(allocation_project_id) - return pandas.DataFrame( + return self.create_test_invoice( { "Manager (PI)": pi, "Project - Allocation": allocation_project_name, diff --git a/process_report/tests/unit/processors/test_discount_processor.py b/process_report/tests/unit/processors/test_discount_processor.py index 921fa5a9..36604604 100644 --- a/process_report/tests/unit/processors/test_discount_processor.py +++ b/process_report/tests/unit/processors/test_discount_processor.py @@ -1,12 +1,9 @@ -from unittest import TestCase - -import pandas - from process_report.invoices import invoice from process_report.processors.pi_su_credit_processor import PISUCreditProcessor +from process_report.tests.base import BaseTestCase -class TestDiscountProcessor(TestCase): +class TestDiscountProcessor(BaseTestCase): def _get_test_invoice( self, pi, @@ -26,7 +23,7 @@ def _get_test_invoice( if balance is None: balance = costs - return pandas.DataFrame( + return self.create_test_invoice( { invoice.PI_FIELD: pi, invoice.SU_TYPE_FIELD: su_type, diff --git a/process_report/tests/unit/processors/test_new_pi_credit_processor.py b/process_report/tests/unit/processors/test_new_pi_credit_processor.py index 969862e1..6df329ae 100644 --- a/process_report/tests/unit/processors/test_new_pi_credit_processor.py +++ b/process_report/tests/unit/processors/test_new_pi_credit_processor.py @@ -109,14 +109,19 @@ def _get_test_invoice( if not institution: institution = ["Foo University" for _ in range(len(pi))] - return pandas.DataFrame( + costs = [Decimal(cost) for cost in costs] + return self.create_test_invoice( { "Manager (PI)": pi, "Cost": [Decimal(cost) for cost in costs], + "Credit": None, + "Credit Code": None, "SU Type": su_type, "Is Billable": is_billable, "Missing PI": missing_pi, "Institution": institution, + "PI Balance": costs, + "Balance": costs, } ) @@ -139,19 +144,13 @@ def test_no_new_pi(self): ) test_old_pi_df.to_csv(test_old_pi_file, index=False) - answer_invoice = pandas.concat( - [ - test_invoice, - pandas.DataFrame( - { - "Credit": [None for _ in range(3)], - "Credit Code": [None for _ in range(3)], - "PI Balance": [100 for _ in range(3)], - "Balance": [100 for _ in range(3)], - } - ), - ], - axis=1, + answer_invoice = test_invoice.assign( + **{ + "Credit": [None for _ in range(3)], + "Credit Code": [None for _ in range(3)], + "PI Balance": [100 for _ in range(3)], + "Balance": [100 for _ in range(3)], + } ) answer_old_pi_df = test_old_pi_df.copy() @@ -182,19 +181,13 @@ def test_one_new_pi(self): ) test_old_pi_df.to_csv(test_old_pi_file, index=False) - answer_invoice = pandas.concat( - [ - test_invoice, - pandas.DataFrame( - { - "Credit": [100], - "Credit Code": ["0002"], - "PI Balance": [0], - "Balance": [0], - } - ), - ], - axis=1, + answer_invoice = test_invoice.assign( + **{ + "Credit": [100], + "Credit Code": ["0002"], + "PI Balance": [0], + "Balance": [0], + } ) answer_old_pi_df = pandas.DataFrame( @@ -218,19 +211,13 @@ def test_one_new_pi(self): # Two allocations, costs partially covered test_invoice = self._get_test_invoice(["PI", "PI"], [500, 1000]) - answer_invoice = pandas.concat( - [ - test_invoice, - pandas.DataFrame( - { - "Credit": [500, 500], - "Credit Code": ["0002", "0002"], - "PI Balance": [0, 500], - "Balance": [0, 500], - } - ), - ], - axis=1, + answer_invoice = test_invoice.assign( + **{ + "Credit": [500, 500], + "Credit Code": ["0002", "0002"], + "PI Balance": [0, 500], + "Balance": [0, 500], + } ) answer_old_pi_df = pandas.DataFrame( @@ -254,19 +241,13 @@ def test_one_new_pi(self): # Two allocations, costs completely covered test_invoice = self._get_test_invoice(["PI", "PI"], [500, 400]) - answer_invoice = pandas.concat( - [ - test_invoice, - pandas.DataFrame( - { - "Credit": [500, 400], - "Credit Code": ["0002", "0002"], - "PI Balance": [0, 0], - "Balance": [0, 0], - } - ), - ], - axis=1, + answer_invoice = test_invoice.assign( + **{ + "Credit": [500, 400], + "Credit Code": ["0002", "0002"], + "PI Balance": [0, 0], + "Balance": [0, 0], + } ) answer_old_pi_df = pandas.DataFrame( @@ -305,19 +286,13 @@ def test_one_month_pi(self): ) test_old_pi_df.to_csv(test_old_pi_file, index=False) - answer_invoice = pandas.concat( - [ - test_invoice, - pandas.DataFrame( - { - "Credit": [200], - "Credit Code": ["0002"], - "PI Balance": [0], - "Balance": [0], - } - ), - ], - axis=1, + answer_invoice = test_invoice.assign( + **{ + "Credit": [200], + "Credit Code": ["0002"], + "PI Balance": [0], + "Balance": [0], + } ) answer_old_pi_df = pandas.DataFrame( @@ -341,19 +316,13 @@ def test_one_month_pi(self): # Remaining credits partially covers costs test_invoice = self._get_test_invoice(["PI"], [600]) - answer_invoice = pandas.concat( - [ - test_invoice, - pandas.DataFrame( - { - "Credit": [500], - "Credit Code": ["0002"], - "PI Balance": [100], - "Balance": [100], - } - ), - ], - axis=1, + answer_invoice = test_invoice.assign( + **{ + "Credit": [500], + "Credit Code": ["0002"], + "PI Balance": [100], + "Balance": [100], + } ) answer_old_pi_df = pandas.DataFrame( @@ -392,19 +361,13 @@ def test_two_new_pi(self): ) test_old_pi_df.to_csv(test_old_pi_file, index=False) - answer_invoice = pandas.concat( - [ - test_invoice, - pandas.DataFrame( - { - "Credit": [500, None, 500], - "Credit Code": ["0002", None, "0002"], - "PI Balance": [300, 500, 0], - "Balance": [300, 500, 0], - } - ), - ], - axis=1, + answer_invoice = test_invoice.assign( + **{ + "Credit": [500, None, 500], + "Credit Code": ["0002", None, "0002"], + "PI Balance": [300, 500, 0], + "Balance": [300, 500, 0], + } ) answer_old_pi_df = pandas.DataFrame( @@ -443,19 +406,13 @@ def test_old_pi_file_overwritten(self): ) test_old_pi_df.to_csv(test_old_pi_file, index=False) - answer_invoice = pandas.concat( - [ - test_invoice, - pandas.DataFrame( - { - "Credit": [200, None], - "Credit Code": ["0002", None], - "PI Balance": [300, 500], - "Balance": [300, 500], - } - ), - ], - axis=1, + answer_invoice = test_invoice.assign( + **{ + "Credit": [200, None], + "Credit Code": ["0002", None], + "PI Balance": [300, 500], + "Balance": [300, 500], + } ) answer_old_pi_df = pandas.DataFrame( @@ -505,19 +462,13 @@ def test_excluded_su_types(self): ) test_old_pi_df.to_csv(test_old_pi_file, index=False) - answer_invoice = pandas.concat( - [ - test_invoice, - pandas.DataFrame( - { - "Credit": [600, None, 400, None], - "Credit Code": ["0002", None, "0002", None], - "PI Balance": [0, 600, 200, 600], - "Balance": [0, 600, 200, 600], - } - ), - ], - axis=1, + answer_invoice = test_invoice.assign( + **{ + "Credit": [600, None, 400, None], + "Credit Code": ["0002", None, "0002", None], + "PI Balance": [0, 600, 200, 600], + "Balance": [0, 600, 200, 600], + } ) # PI2 was not eligible for credit, so should only get 0 initial credits @@ -558,19 +509,13 @@ def test_ineligible_pi_existing_old_pi_entry(self): ) test_old_pi_df.to_csv(test_old_pi_file, index=False) - answer_invoice = pandas.concat( - [ - test_invoice, - pandas.DataFrame( - { - "Credit": [None], - "Credit Code": [None], - "PI Balance": [500], - "Balance": [500], - } - ), - ], - axis=1, + answer_invoice = test_invoice.assign( + **{ + "Credit": [None], + "Credit Code": [None], + "PI Balance": [500], + "Balance": [500], + } ) answer_old_pi_df = pandas.DataFrame( @@ -612,19 +557,13 @@ def test_newly_eligible_pi_existing_old_pi_entry(self): ) test_old_pi_df.to_csv(test_old_pi_file, index=False) - answer_invoice = pandas.concat( - [ - test_invoice, - pandas.DataFrame( - { - "Credit": [None], - "Credit Code": [None], - "PI Balance": [800], - "Balance": [800], - } - ), - ], - axis=1, + answer_invoice = test_invoice.assign( + **{ + "Credit": [None], + "Credit Code": [None], + "PI Balance": [800], + "Balance": [800], + } ) answer_old_pi_df = pandas.DataFrame( diff --git a/process_report/tests/unit/processors/test_prepayment_processor.py b/process_report/tests/unit/processors/test_prepayment_processor.py index cae01f57..bf5c496b 100644 --- a/process_report/tests/unit/processors/test_prepayment_processor.py +++ b/process_report/tests/unit/processors/test_prepayment_processor.py @@ -43,7 +43,7 @@ def _get_test_invoice(self, project_names, pi_balances, balances=None): if not balances: balances = pi_balances - return pandas.DataFrame( + return self.create_test_invoice( { "Project": project_names, "PI Balance": pi_balances, diff --git a/process_report/tests/unit/processors/test_processor_list.py b/process_report/tests/unit/processors/test_processor_list.py new file mode 100644 index 00000000..46b94402 --- /dev/null +++ b/process_report/tests/unit/processors/test_processor_list.py @@ -0,0 +1,26 @@ +from process_report.process_report import PROCESSING_ORDER +from process_report.tests.base import BaseTestCase + + +class TestProcessorList(BaseTestCase): + def test_processing_order_column_dependencies(self): + initialized_columns = set() + + for processor_class in PROCESSING_ORDER: + operates_on = getattr(processor_class, "operates_on_columns", []) + initializes = getattr(processor_class, "initializes_columns", []) + + # Check that no columns are initalized more than once + for column in initializes: + assert column.name not in initialized_columns, ( + f"Column '{column.name}' initialized by {processor_class.__name__} but already initialized by a previous processor" + ) + + for column in initializes: + initialized_columns.add(column.name) + + # Check that all operated on columns have been initialized by a previous processor + for column in operates_on: + assert column.name in initialized_columns, ( + f"Column '{column.name}' operated on by {processor_class.__name__} but not initialized by itself or any previous processor" + ) diff --git a/process_report/tests/unit/processors/test_validate_billable_pi_processor.py b/process_report/tests/unit/processors/test_validate_billable_pi_processor.py index 7c1cb083..79098ff9 100644 --- a/process_report/tests/unit/processors/test_validate_billable_pi_processor.py +++ b/process_report/tests/unit/processors/test_validate_billable_pi_processor.py @@ -66,7 +66,7 @@ def test_remove_nonbillables(self): ) validate_billable_pi_proc.process() output = validate_billable_pi_proc.data - assert output[output["Is Billable"]].equals(data.iloc[[3, 4, 5, 9]]) + assert output["Is Billable"].iloc[[3, 4, 5, 9]].all() def test_billable_override_marks_project_billable(self): test_data = pandas.DataFrame( diff --git a/process_report/tests/unit/processors/test_validate_input_columns_processor.py b/process_report/tests/unit/processors/test_validate_input_columns_processor.py new file mode 100644 index 00000000..7ac2256c --- /dev/null +++ b/process_report/tests/unit/processors/test_validate_input_columns_processor.py @@ -0,0 +1,53 @@ +import pandas + +from process_report.processors.validate_input_column_processor import ( + ValidateInputColumnsProcessor, +) +from process_report.tests.base import BaseTestCase + + +class TestValidateInputColumnsProcessor(BaseTestCase): + def test_process_succeeds_when_required_columns_exist_and_keeps_extra_columns(self): + invoice_month = "2025-01" + test_data_dict = { + "Invoice Month": [invoice_month], + "Project - Allocation": ["P1"], + "Project - Allocation ID": ["P1-ID"], + "Manager (PI)": ["pi1"], + "Cluster Name": ["cluster1"], + "Invoice Email": ["pi1@example.com"], + "Invoice Address": ["123 Main St"], + "Institution": ["Example University"], + "Institution - Specific Code": ["EX-001"], + "SU Hours (GBhr or SUhr)": [10], + "SU Type": ["Compute"], + "Rate": ["1.00"], + "Cost": [100.0], + "Extra Column": ["extra"], + } + + processor = ValidateInputColumnsProcessor( + invoice_month=invoice_month, data=pandas.DataFrame(test_data_dict) + ) + processor.process() + + output_data = processor.data + expected_data = self.create_test_invoice(test_data_dict) + assert output_data.equals(expected_data) + + def test_process_raises_error_when_required_columns_are_missing(self): + invoice_month = "2025-01" + test_data = pandas.DataFrame( + {"Invoice Month": [invoice_month], "Cost": [100.0]} + ) + + processor = ValidateInputColumnsProcessor( + invoice_month=invoice_month, data=test_data + ) + + expected_message = """Input dataframe is missing required columns: Project - Allocation, Project - Allocation ID, Manager (PI), + Cluster Name, Invoice Email, Invoice Address, Institution, Institution - Specific Code, SU Hours (GBhr or SUhr), + SU Type, Rate. Stopping invoicing""" + + with self.assertRaises(ValueError, msg=expected_message): + processor.process() diff --git a/process_report/tests/unit/test_util.py b/process_report/tests/unit/test_util.py index d7225b20..ecf30437 100644 --- a/process_report/tests/unit/test_util.py +++ b/process_report/tests/unit/test_util.py @@ -28,9 +28,9 @@ class TestMergeCSV(TestCase): def setUp(self): self.header = ["Cost", "Name", "Rate"] self.data = [ - [1, "Alice, Allison", 25], - [2, "Bob", 30], - [3, "Charlie", 28], + [1, "Alice, Allison", 25.0], + [2, "Bob", 30.0], + [3, "Charlie", 28.0], ] self.csv_files = [] From bfbb7607b997f91a4505cd14ed2832581ccd5d25 Mon Sep 17 00:00:00 2001 From: Quan Pham Date: Mon, 13 Apr 2026 14:56:04 -0400 Subject: [PATCH 2/2] Enable exporting monthly invoice to Iceberg Added new invoice `IcebergInvoice` to export invoice data to Iceberg tables The export process also includes a schema update step to allow updates to Iceberg table schema. New Iceberg integration test added to validate iceberg functionality E2E test updated to include iceberg exporting Both tests use a temporary sqlite catalog --- process_report/invoices/iceberg_invoice.py | 82 ++++++++++ process_report/loader.py | 6 + process_report/process_report.py | 2 + process_report/settings.py | 5 + process_report/tests/base.py | 10 +- process_report/tests/e2e/test_e2e_pipeline.py | 25 +++ .../tests/integration/test_iceberg.py | 148 ++++++++++++++++++ process_report/tests/test-requirements.txt | 2 + requirements.txt | 1 + 9 files changed, 277 insertions(+), 4 deletions(-) create mode 100644 process_report/invoices/iceberg_invoice.py create mode 100644 process_report/tests/integration/test_iceberg.py diff --git a/process_report/invoices/iceberg_invoice.py b/process_report/invoices/iceberg_invoice.py new file mode 100644 index 00000000..e20c6a8f --- /dev/null +++ b/process_report/invoices/iceberg_invoice.py @@ -0,0 +1,82 @@ +import logging +from dataclasses import dataclass, field + +from pyiceberg.table import Table +from pyiceberg.catalog import Catalog, load_catalog +import pyarrow + +import process_report.invoices.invoice as invoice +from process_report.loader import loader +from process_report.settings import invoice_settings + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + + +def get_iceberg_catalog(config: dict, catalog_name: str) -> Catalog: + return load_catalog(name=catalog_name, **config) + + +def get_iceberg_table(catalog: Catalog, table_path) -> Table: + return catalog.load_table(table_path) + + +@dataclass +class IcebergInvoice(invoice.Invoice): + export_columns_list = [ + invoice.INVOICE_DATE_FIELD, + invoice.PROJECT_FIELD, + invoice.PROJECT_ID_FIELD, + invoice.PI_FIELD, + invoice.CLUSTER_NAME_FIELD, + invoice.INVOICE_EMAIL_FIELD, + invoice.INVOICE_ADDRESS_FIELD, + invoice.INSTITUTION_FIELD, + invoice.INSTITUTION_ID_FIELD, + invoice.IS_BILLABLE_FIELD, + invoice.SU_HOURS_FIELD, + invoice.SU_TYPE_FIELD, + invoice.RATE_FIELD, + invoice.GROUP_NAME_FIELD, + invoice.GROUP_INSTITUTION_FIELD, + invoice.GROUP_BALANCE_FIELD, + invoice.COST_FIELD, + invoice.GROUP_BALANCE_USED_FIELD, + invoice.CREDIT_FIELD, + invoice.CREDIT_CODE_FIELD, + invoice.BALANCE_FIELD, + ] + + iceberg_catalog_name: str = invoice_settings.iceberg_catalog_name + iceberg_catalog_config: dict = field( + default_factory=lambda: loader.get_iceberg_config() + ) + iceberg_table_path: str = invoice_settings.iceberg_table_path + + def _prepare(self): + iceberg_catalog = get_iceberg_catalog( + self.iceberg_catalog_config, self.iceberg_catalog_name + ) + self.iceberg_table = get_iceberg_table(iceberg_catalog, self.iceberg_table_path) + self.export_data = self.data + + def export(self): + # Overrides base invoice export behavior + self._filter_columns() + + # Update table schema, only allows "possible" migrations (i.e raises on str -> Decimal) + # TODO (Quan) When we implement typing validation for dataframes, change this to raise errors + with self.iceberg_table.update_schema() as update_schema: + try: + update_schema.union_by_name( + pyarrow.Table.from_pandas(self.export_data).schema + ) + except ValueError as e: + logger.warning( + f"Dataframe contains columns not convertable to PyIceberg: {e}" + ) + + self.iceberg_table.append(pyarrow.Table.from_pandas(self.export_data)) + + def export_s3(self, s3_bucket): + return diff --git a/process_report/loader.py b/process_report/loader.py index 9e3c1ea6..172090fe 100644 --- a/process_report/loader.py +++ b/process_report/loader.py @@ -61,6 +61,12 @@ def get_remote_filepath(self, remote_filepath: str) -> str: return util.fetch_s3(remote_filepath) return remote_filepath + @functools.lru_cache + def get_iceberg_config(self) -> dict: + """Load an Iceberg catalog config from a YAML file.""" + with open(invoice_settings.iceberg_config_path, "r") as f: + return yaml.safe_load(f) + @functools.lru_cache def get_new_pi_credit_amount(self) -> Decimal: return invoice_settings.new_pi_credit_amount or get_rates_info().get_value_at( diff --git a/process_report/process_report.py b/process_report/process_report.py index fc263db6..e2a21fd1 100644 --- a/process_report/process_report.py +++ b/process_report/process_report.py @@ -18,6 +18,7 @@ MOCA_prepaid_invoice, prepay_credits_snapshot, ocp_test_invoice, + iceberg_invoice, ) from process_report.processors import ( coldfront_fetch_processor, @@ -97,6 +98,7 @@ def main(): MOCA_prepaid_invoice.MOCAPrepaidInvoice, prepay_credits_snapshot.PrepayCreditsSnapshot, ocp_test_invoice.OcpTestInvoice, + iceberg_invoice.IcebergInvoice, ], invoice_settings.upload_to_s3, ) diff --git a/process_report/settings.py b/process_report/settings.py index 7d057161..0e902085 100644 --- a/process_report/settings.py +++ b/process_report/settings.py @@ -11,6 +11,11 @@ class Settings(BaseSettings): keycloak_client_id: str | None = None keycloak_client_secret: str | None = None + # Iceberg config + iceberg_catalog_name: str | None = None + iceberg_config_path: str | None = None + iceberg_table_path: str | None = None + invoice_path_template: str = "Invoices/{invoice_month}/Service Invoices/" invoice_month: str = (datetime.datetime.today() - relativedelta(months=1)).strftime( "%Y-%m" diff --git a/process_report/tests/base.py b/process_report/tests/base.py index 78176a3e..5e277367 100644 --- a/process_report/tests/base.py +++ b/process_report/tests/base.py @@ -55,8 +55,10 @@ def create_test_invoice(self, data_dict: dict): class BaseTestCaseWithTempDir(BaseTestCase): - def setUp(self): - self.tempdir = Path(tempfile.TemporaryDirectory(delete=False).name) + @classmethod + def setUpClass(cls): + cls.tempdir = Path(tempfile.TemporaryDirectory(delete=False).name) - def tearDown(self): - shutil.rmtree(self.tempdir) + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tempdir) diff --git a/process_report/tests/e2e/test_e2e_pipeline.py b/process_report/tests/e2e/test_e2e_pipeline.py index 36cc7277..bd69bf27 100644 --- a/process_report/tests/e2e/test_e2e_pipeline.py +++ b/process_report/tests/e2e/test_e2e_pipeline.py @@ -5,6 +5,9 @@ import logging import subprocess from typing import Dict, List +import yaml + +from pyiceberg import schema, catalog logger = logging.getLogger(__name__) @@ -131,6 +134,28 @@ def _prepare_pipeline_execution( env.setdefault("CHROME_BIN_PATH", "/usr/bin/chromium") env["PYTHONPATH"] = str(project_root) + ":" + env.get("PYTHONPATH", "") + # Iceberg settings, init test namespace and table + env["iceberg_catalog_name"] = "test_catalog" + env["iceberg_config_path"] = workspace / "test_iceberg_config.yaml" + env["iceberg_table_path"] = "test_namespace.test_table" + + catalog_config = { + "type": "sql", + "warehouse": f"file://{workspace}", + "uri": f"sqlite:///{workspace / 'test_iceberg_catalog.db'}", + } + + with open(workspace / "test_iceberg_config.yaml", "w") as f: + yaml.dump(catalog_config, f) + + test_catalog = catalog.load_catalog(name="test_catalog", **catalog_config) + test_schema = schema.Schema( + schema.NestedField(1, "Invoice Month", schema.StringType()), + ) + + test_catalog.create_namespace_if_not_exists("test_namespace") + test_catalog.create_table_if_not_exists("test_namespace.test_table", test_schema) + return command, env diff --git a/process_report/tests/integration/test_iceberg.py b/process_report/tests/integration/test_iceberg.py new file mode 100644 index 00000000..add0cb88 --- /dev/null +++ b/process_report/tests/integration/test_iceberg.py @@ -0,0 +1,148 @@ +from pyiceberg import schema, catalog + +from process_report.invoices.iceberg_invoice import IcebergInvoice +from process_report.tests.base import BaseTestCaseWithTempDir + + +class TestIceberg(BaseTestCaseWithTempDir): + @classmethod + def setUpClass(cls): + super().setUpClass() + # Create in-memory catalog + cls.catalog_name = "catalog_foo" + cls.table_path = "namespace_foo.table_foo" + + config_dict = { + "type": "sql", + "warehouse": str(cls.tempdir), + "uri": f"sqlite:///{str(cls.tempdir)}/foo.db", + } + cls.catalog_config = config_dict + + # Initialize test schema that's used in setUp() + cls.catalog = catalog.load_catalog(name=cls.catalog_name, **config_dict) + cls.test_schema = schema.Schema( + schema.NestedField(1, "Invoice Month", schema.StringType()), + schema.NestedField(2, "Cost", schema.DecimalType(21, 2)), + schema.NestedField(3, "PI", schema.StringType()), + ) + + def setUp(self): + self.catalog.create_namespace_if_not_exists("namespace_foo") + self.catalog.create_table_if_not_exists(self.table_path, self.test_schema) + + def tearDown(self): + self.catalog.drop_table(self.table_path) + + def test_upload_one_dataframe(self): + # Create test dataframe matching table schema + test_df = self.create_test_invoice( + { + "Invoice Month": ["2024-01", "2024-01"], + "Cost": [100.0, 200.0], + "PI": ["PI1", "PI2"], + } + ) + + # Create IcebergInvoice instance + inv = IcebergInvoice( + invoice_month="2024-01", + data=test_df, + iceberg_catalog_name=self.catalog_name, + iceberg_catalog_config=self.catalog_config, + iceberg_table_path=self.table_path, + ) + # Ensure only test columns are filtered + inv.export_columns_list = ["Invoice Month", "Cost", "PI"] + inv.process() + inv.export() + + # Verify data was uploaded, and Iceberg cost column can be casted to Decimal + table = self.catalog.load_table(self.table_path) + uploaded_df = table.scan().to_pandas().astype(test_df.dtypes) + assert uploaded_df.equals(test_df) + + def test_upload_new_column(self): + # Create test dataframe with an extra column + test_df = self.create_test_invoice( + { + "Invoice Month": ["2024-02", "2024-02"], + "Cost": [150.0, 250.0], + "PI": ["PI3", "PI4"], + "extra_column": ["extra1", "extra2"], # New column + } + ) + + # Create IcebergInvoice instance + inv = IcebergInvoice( + invoice_month="2024-02", + data=test_df, + iceberg_catalog_name=self.catalog_name, + iceberg_catalog_config=self.catalog_config, + iceberg_table_path=self.table_path, + ) + inv.export_columns_list = ["Invoice Month", "Cost", "PI", "extra_column"] + inv.process() + inv.export() + + # Verify data was uploaded with new column (schema evolution) + table = self.catalog.load_table(self.table_path) + uploaded_df = table.scan().to_pandas().astype(test_df.dtypes) + assert uploaded_df.equals(test_df) + + def test_schema_evolution_with_existing_data(self): + # First, upload initial data without extra column + first_df = self.create_test_invoice( + { + "Invoice Month": ["2024-01", "2024-01"], + "Cost": [100.0, 200.0], + "PI": ["PI1", "PI2"], + } + ) + + inv = IcebergInvoice( + invoice_month="2024-01", + data=first_df, + iceberg_catalog_name=self.catalog_name, + iceberg_catalog_config=self.catalog_config, + iceberg_table_path=self.table_path, + ) + inv.export_columns_list = ["Invoice Month", "Cost", "PI"] + inv.process() + inv.export() + + # Now upload data with an extra column + second_df = self.create_test_invoice( + { + "Invoice Month": ["2024-02", "2024-02"], + "Cost": [150.0, 250.0], + "PI": ["PI3", "PI4"], + "extra_column": ["new1", "new2"], # New column + } + ) + + inv2 = IcebergInvoice( + invoice_month="2024-02", + data=second_df, + iceberg_catalog_name=self.catalog_name, + iceberg_catalog_config=self.catalog_config, + iceberg_table_path=self.table_path, + ) + inv2.export_columns_list = ["Invoice Month", "Cost", "PI", "extra_column"] + inv2.process() + inv2.export() + + table = self.catalog.load_table(self.table_path) + result_df = table.scan().to_pandas().astype(second_df.dtypes) + + # Verify the table has schema evolved with the new column + # Old rows should have None for the new column + expected_df = self.create_test_invoice( + { + "Invoice Month": ["2024-02", "2024-02", "2024-01", "2024-01"], + "Cost": [150.0, 250.0, 100.0, 200.0], + "PI": ["PI3", "PI4", "PI1", "PI2"], + "extra_column": ["new1", "new2", None, None], + } + ) + assert result_df.equals(expected_df) diff --git a/process_report/tests/test-requirements.txt b/process_report/tests/test-requirements.txt index 49ec960c..d70866da 100644 --- a/process_report/tests/test-requirements.txt +++ b/process_report/tests/test-requirements.txt @@ -1,2 +1,4 @@ pytest +pytest-env coverage +pyiceberg[sql-sqlite] diff --git a/requirements.txt b/requirements.txt index 805b39f9..1276b641 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ Jinja2 validators python-dateutil pydantic-settings +pyiceberg[pyarrow]