diff --git a/process_report/invoices/iceberg_invoice.py b/process_report/invoices/iceberg_invoice.py new file mode 100644 index 00000000..e20c6a8f --- /dev/null +++ b/process_report/invoices/iceberg_invoice.py @@ -0,0 +1,82 @@ +import logging +from dataclasses import dataclass, field + +from pyiceberg.table import Table +from pyiceberg.catalog import Catalog, load_catalog +import pyarrow + +import process_report.invoices.invoice as invoice +from process_report.loader import loader +from process_report.settings import invoice_settings + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + + +def get_iceberg_catalog(config: dict, catalog_name: str) -> Catalog: + return load_catalog(name=catalog_name, **config) + + +def get_iceberg_table(catalog: Catalog, table_path) -> Table: + return catalog.load_table(table_path) + + +@dataclass +class IcebergInvoice(invoice.Invoice): + export_columns_list = [ + invoice.INVOICE_DATE_FIELD, + invoice.PROJECT_FIELD, + invoice.PROJECT_ID_FIELD, + invoice.PI_FIELD, + invoice.CLUSTER_NAME_FIELD, + invoice.INVOICE_EMAIL_FIELD, + invoice.INVOICE_ADDRESS_FIELD, + invoice.INSTITUTION_FIELD, + invoice.INSTITUTION_ID_FIELD, + invoice.IS_BILLABLE_FIELD, + invoice.SU_HOURS_FIELD, + invoice.SU_TYPE_FIELD, + invoice.RATE_FIELD, + invoice.GROUP_NAME_FIELD, + invoice.GROUP_INSTITUTION_FIELD, + invoice.GROUP_BALANCE_FIELD, + invoice.COST_FIELD, + invoice.GROUP_BALANCE_USED_FIELD, + invoice.CREDIT_FIELD, + invoice.CREDIT_CODE_FIELD, + invoice.BALANCE_FIELD, + ] + + iceberg_catalog_name: str = invoice_settings.iceberg_catalog_name + iceberg_catalog_config: dict = field( + default_factory=lambda: loader.get_iceberg_config() + ) + iceberg_table_path: str = invoice_settings.iceberg_table_path + + def _prepare(self): + iceberg_catalog = get_iceberg_catalog( + self.iceberg_catalog_config, self.iceberg_catalog_name + ) + self.iceberg_table = get_iceberg_table(iceberg_catalog, self.iceberg_table_path) + self.export_data = self.data + + def export(self): + # Overrides base invoice export behavior + self._filter_columns() + + # Update table schema, only allows "possible" migrations (i.e raises on str -> Decimal) + # TODO (Quan) When we implement typing validation for dataframes, change this to raise errors + with self.iceberg_table.update_schema() as update_schema: + try: + update_schema.union_by_name( + pyarrow.Table.from_pandas(self.export_data).schema + ) + except ValueError as e: + logger.warning( + f"Dataframe contains columns not convertable to PyIceberg: {e}" + ) + + self.iceberg_table.append(pyarrow.Table.from_pandas(self.export_data)) + + def export_s3(self, s3_bucket): + return diff --git a/process_report/invoices/invoice.py b/process_report/invoices/invoice.py index 29ca5a3c..12fbb8b9 100644 --- a/process_report/invoices/invoice.py +++ b/process_report/invoices/invoice.py @@ -1,9 +1,32 @@ from dataclasses import dataclass +from typing import Any, Callable +from decimal import Decimal import pandas +import pyarrow +import logging import process_report.util as util +logger = logging.getLogger(__name__) + + +@dataclass +class InvoiceColumn: + name: str + dtype: Any + default_value: Any | None = None + default_initializer: Callable[[pandas.DataFrame], pandas.Series] | None = None + + +# Field type definitions +BALANCE_FIELD_TYPE = pandas.ArrowDtype(pyarrow.decimal128(21, 2)) +RATE_FIELD_TYPE = pandas.ArrowDtype(pyarrow.decimal128(21, 13)) +INTEGER_FIELD_TYPE = pandas.ArrowDtype(pyarrow.int64()) +STRING_FIELD_TYPE = pandas.StringDtype() +BOOL_FIELD_TYPE = pandas.BooleanDtype() + + ### PI file field names PI_PI_FIELD = "PI" PI_FIRST_MONTH = "First Invoice Month" @@ -65,11 +88,69 @@ IS_COURSE_FIELD = "Is Course" ### +### Initialized Column objects +INVOICE_DATE_COLUMN = InvoiceColumn(name=INVOICE_DATE_FIELD, dtype=STRING_FIELD_TYPE) +PROJECT_COLUMN = InvoiceColumn(name=PROJECT_FIELD, dtype=STRING_FIELD_TYPE) +PROJECT_ID_COLUMN = InvoiceColumn(name=PROJECT_ID_FIELD, dtype=STRING_FIELD_TYPE) +PI_COLUMN = InvoiceColumn(name=PI_FIELD, dtype=STRING_FIELD_TYPE) +INVOICE_EMAIL_COLUMN = InvoiceColumn(name=INVOICE_EMAIL_FIELD, dtype=STRING_FIELD_TYPE) +INVOICE_ADDRESS_COLUMN = InvoiceColumn( + name=INVOICE_ADDRESS_FIELD, dtype=STRING_FIELD_TYPE +) +INSTITUTION_COLUMN = InvoiceColumn(name=INSTITUTION_FIELD, dtype=STRING_FIELD_TYPE) +INSTITUTION_ID_COLUMN = InvoiceColumn( + name=INSTITUTION_ID_FIELD, dtype=STRING_FIELD_TYPE +) +GROUP_NAME_COLUMN = InvoiceColumn(name=GROUP_NAME_FIELD, dtype=STRING_FIELD_TYPE) +GROUP_INSTITUTION_COLUMN = InvoiceColumn( + name=GROUP_INSTITUTION_FIELD, dtype=STRING_FIELD_TYPE +) +GROUP_BALANCE_COLUMN = InvoiceColumn(name=GROUP_BALANCE_FIELD, dtype=BALANCE_FIELD_TYPE) +GROUP_BALANCE_USED_COLUMN = InvoiceColumn( + name=GROUP_BALANCE_USED_FIELD, dtype=BALANCE_FIELD_TYPE +) +SU_HOURS_COLUMN = InvoiceColumn(name=SU_HOURS_FIELD, dtype=INTEGER_FIELD_TYPE) +SU_TYPE_COLUMN = InvoiceColumn(name=SU_TYPE_FIELD, dtype=STRING_FIELD_TYPE) +SU_CHARGE_COLUMN = InvoiceColumn(name=SU_CHARGE_FIELD, dtype=BALANCE_FIELD_TYPE) +LENOVO_CHARGE_COLUMN = InvoiceColumn(name=LENOVO_CHARGE_FIELD, dtype=BALANCE_FIELD_TYPE) +RATE_COLUMN = InvoiceColumn( + name=RATE_FIELD, dtype=RATE_FIELD_TYPE +) # Using decimal to suppress scientific notation in export +COST_COLUMN = InvoiceColumn(name=COST_FIELD, dtype=BALANCE_FIELD_TYPE) +CREDIT_COLUMN = InvoiceColumn(name=CREDIT_FIELD, dtype=BALANCE_FIELD_TYPE) +CREDIT_CODE_COLUMN = InvoiceColumn(name=CREDIT_CODE_FIELD, dtype=STRING_FIELD_TYPE) +SUBSIDY_COLUMN = InvoiceColumn( + name=SUBSIDY_FIELD, dtype=BALANCE_FIELD_TYPE, default_value=Decimal(0) +) +BALANCE_COLUMN = InvoiceColumn( + name=BALANCE_FIELD, + dtype=BALANCE_FIELD_TYPE, + default_initializer=lambda df: df[COST_FIELD], +) +PI_BALANCE_COLUMN = InvoiceColumn( + name=PI_BALANCE_FIELD, + dtype=BALANCE_FIELD_TYPE, + default_initializer=lambda df: df[COST_FIELD], +) + +# Internally used fields +IS_BILLABLE_COLUMN = InvoiceColumn(name=IS_BILLABLE_FIELD, dtype=BOOL_FIELD_TYPE) +MISSING_PI_COLUMN = InvoiceColumn(name=MISSING_PI_FIELD, dtype=BOOL_FIELD_TYPE) +PROJECT_NAME_COLUMN = InvoiceColumn(name=PROJECT_NAME_FIELD, dtype=STRING_FIELD_TYPE) +GROUP_MANAGED_COLUMN = InvoiceColumn(name=GROUP_MANAGED_FIELD, dtype=BOOL_FIELD_TYPE) +CLUSTER_NAME_COLUMN = InvoiceColumn(name=CLUSTER_NAME_FIELD, dtype=STRING_FIELD_TYPE) +IS_COURSE_COLUMN = InvoiceColumn( + name=IS_COURSE_FIELD, dtype=BOOL_FIELD_TYPE, default_value=False +) +### + @dataclass class Invoice: export_columns_list = list() exported_columns_map = dict() + initializes_columns = tuple() + operates_on_columns = tuple() invoice_month: str data: pandas.DataFrame @@ -77,6 +158,7 @@ class Invoice: export_data = None def process(self): + self._init_columns() self._prepare() self._process() self._prepare_export() @@ -93,6 +175,24 @@ def output_s3_key(self) -> str: def output_s3_archive_key(self): return f"Invoices/{self.invoice_month}/Archive/{self.name} {self.invoice_month} {util.get_iso8601_time()}.csv" + def _init_columns(self): + """Initializes columns specified in `initializes_columns` and cast them to appropriate types + + If column already exists, only do casting + If no default value is given, column initialized to None + """ + for field in self.initializes_columns: + if field.name not in self.data.columns: + field_default = field.default_value + if field.default_initializer: + field_default = field.default_initializer(self.data) + self.data[field.name] = field_default + elif self.data.dtypes[field.name] != field.dtype: + logger.warning( + f"Column {field.name} has dtype {self.data.dtypes[field.name]} instead of expected {field.dtype}." + ) + self.data = self.data.astype({field.name: field.dtype}) + def _prepare(self): """Prepares the data for processing. diff --git a/process_report/loader.py b/process_report/loader.py index 0c5f959c..172090fe 100644 --- a/process_report/loader.py +++ b/process_report/loader.py @@ -61,6 +61,12 @@ def get_remote_filepath(self, remote_filepath: str) -> str: return util.fetch_s3(remote_filepath) return remote_filepath + @functools.lru_cache + def get_iceberg_config(self) -> dict: + """Load an Iceberg catalog config from a YAML file.""" + with open(invoice_settings.iceberg_config_path, "r") as f: + return yaml.safe_load(f) + @functools.lru_cache def get_new_pi_credit_amount(self) -> Decimal: return invoice_settings.new_pi_credit_amount or get_rates_info().get_value_at( @@ -112,6 +118,13 @@ def get_alias_map(self) -> dict: def load_dataframe(self, filepath: str) -> pandas.DataFrame: return pandas.read_csv(filepath) + @functools.lru_cache + def load_prepay_credits(self) -> pandas.DataFrame: + prepay_df = self.load_dataframe(invoice_settings.prepay_credits_filepath) + return prepay_df.astype( + {invoice.PREPAY_CREDIT_FIELD: invoice.BALANCE_FIELD_TYPE} + ) + @functools.lru_cache def _load_pi_config(self, filepath: str) -> list[dict]: with open(filepath) as file: diff --git a/process_report/process_report.py b/process_report/process_report.py index b3e62d93..e2a21fd1 100644 --- a/process_report/process_report.py +++ b/process_report/process_report.py @@ -3,7 +3,6 @@ import os import pandas -import pyarrow from process_report.settings import invoice_settings from process_report.loader import loader @@ -19,9 +18,11 @@ MOCA_prepaid_invoice, prepay_credits_snapshot, ocp_test_invoice, + iceberg_invoice, ) from process_report.processors import ( coldfront_fetch_processor, + validate_input_column_processor, validate_pi_alias_processor, add_institution_processor, lenovo_processor, @@ -33,6 +34,20 @@ validate_cluster_name_processor, ) +PROCESSING_ORDER = [ + validate_input_column_processor.ValidateInputColumnsProcessor, + validate_cluster_name_processor.ValidateClusterNameProcessor, + coldfront_fetch_processor.ColdfrontFetchProcessor, + validate_pi_alias_processor.ValidatePIAliasProcessor, + add_institution_processor.AddInstitutionProcessor, + lenovo_processor.LenovoProcessor, + validate_billable_pi_processor.ValidateBillablePIsProcessor, + pi_su_credit_processor.PISUCreditProcessor, + new_pi_credit_processor.NewPICreditProcessor, + bu_subsidy_processor.BUSubsidyProcessor, + prepayment_processor.PrepaymentProcessor, +] + PI_S3_FILEPATH = "PIs/PI.csv" ALIAS_S3_FILEPATH = "PIs/alias.csv" @@ -66,20 +81,7 @@ def main(): ### Preliminary processing processed_data = process_merged_dataframe( - invoice_month, - merged_dataframe, - [ - validate_cluster_name_processor.ValidateClusterNameProcessor, - coldfront_fetch_processor.ColdfrontFetchProcessor, - validate_pi_alias_processor.ValidatePIAliasProcessor, - add_institution_processor.AddInstitutionProcessor, - lenovo_processor.LenovoProcessor, - validate_billable_pi_processor.ValidateBillablePIsProcessor, - pi_su_credit_processor.PISUCreditProcessor, - new_pi_credit_processor.NewPICreditProcessor, - bu_subsidy_processor.BUSubsidyProcessor, - prepayment_processor.PrepaymentProcessor, - ], + invoice_month, merged_dataframe, PROCESSING_ORDER ) ### Export invoices @@ -96,6 +98,7 @@ def main(): MOCA_prepaid_invoice.MOCAPrepaidInvoice, prepay_credits_snapshot.PrepayCreditsSnapshot, ocp_test_invoice.OcpTestInvoice, + iceberg_invoice.IcebergInvoice, ], invoice_settings.upload_to_s3, ) @@ -109,8 +112,19 @@ def merge_csv(files): file, engine="pyarrow", dtype={ - invoice.COST_FIELD: pandas.ArrowDtype(pyarrow.decimal128(21, 2)), - invoice.RATE_FIELD: str, + invoice.INVOICE_DATE_FIELD: invoice.STRING_FIELD_TYPE, + invoice.PROJECT_FIELD: invoice.STRING_FIELD_TYPE, + invoice.PROJECT_ID_FIELD: invoice.STRING_FIELD_TYPE, + invoice.PI_FIELD: invoice.STRING_FIELD_TYPE, + invoice.CLUSTER_NAME_FIELD: invoice.STRING_FIELD_TYPE, + invoice.INVOICE_EMAIL_FIELD: invoice.STRING_FIELD_TYPE, + invoice.INVOICE_ADDRESS_FIELD: invoice.STRING_FIELD_TYPE, + invoice.INSTITUTION_FIELD: invoice.STRING_FIELD_TYPE, + invoice.INSTITUTION_ID_FIELD: invoice.STRING_FIELD_TYPE, + invoice.SU_HOURS_FIELD: invoice.INTEGER_FIELD_TYPE, + invoice.SU_TYPE_FIELD: invoice.STRING_FIELD_TYPE, + invoice.RATE_FIELD: invoice.RATE_FIELD_TYPE, + invoice.COST_FIELD: invoice.BALANCE_FIELD_TYPE, }, quotechar="|", ) diff --git a/process_report/processors/add_institution_processor.py b/process_report/processors/add_institution_processor.py index 42e17d21..f630c81f 100644 --- a/process_report/processors/add_institution_processor.py +++ b/process_report/processors/add_institution_processor.py @@ -14,6 +14,12 @@ @dataclass class AddInstitutionProcessor(processor.Processor): + operates_on_columns = ( + invoice.INSTITUTION_COLUMN, + invoice.PI_COLUMN, + invoice.PROJECT_COLUMN, + ) + def _add_institution(self): """Determine every PI's institution name, logging any PI whose institution cannot be determined This is performed by `get_institution_from_pi()`, which tries to match the PI's username to diff --git a/process_report/processors/bu_subsidy_processor.py b/process_report/processors/bu_subsidy_processor.py index f2e6c787..126e031c 100644 --- a/process_report/processors/bu_subsidy_processor.py +++ b/process_report/processors/bu_subsidy_processor.py @@ -1,5 +1,4 @@ from dataclasses import dataclass, field -from decimal import Decimal from process_report.loader import loader from process_report.invoices import invoice @@ -10,6 +9,18 @@ class BUSubsidyProcessor(discount_processor.DiscountProcessor): IS_DISCOUNT_BY_NERC = False + initializes_columns = (invoice.PROJECT_NAME_COLUMN, invoice.SUBSIDY_COLUMN) + operates_on_columns = ( + *initializes_columns, + invoice.PROJECT_COLUMN, + invoice.PI_COLUMN, + invoice.IS_BILLABLE_COLUMN, + invoice.MISSING_PI_COLUMN, + invoice.INSTITUTION_COLUMN, + invoice.PI_BALANCE_COLUMN, + invoice.BALANCE_COLUMN, + ) + subsidy_amount: int = field(default_factory=loader.get_bu_subsidy_amount) def _prepare(self): @@ -21,7 +32,6 @@ def get_project(row): return project_alloc[: project_alloc.rfind("-")] self.data[invoice.PROJECT_NAME_FIELD] = self.data.apply(get_project, axis=1) - self.data[invoice.SUBSIDY_FIELD] = Decimal(0) def _process(self): self.data = self._apply_subsidy(self.data, self.subsidy_amount) diff --git a/process_report/processors/coldfront_fetch_processor.py b/process_report/processors/coldfront_fetch_processor.py index e581b8c9..293ff5ed 100644 --- a/process_report/processors/coldfront_fetch_processor.py +++ b/process_report/processors/coldfront_fetch_processor.py @@ -34,6 +34,16 @@ class ColdfrontFetchProcessor(processor.Processor): ) coldfront_data_filepath: str = invoice_settings.coldfront_api_filepath + initializes_columns = (invoice.IS_COURSE_COLUMN,) + operates_on_columns = ( + *initializes_columns, + invoice.PROJECT_COLUMN, + invoice.PROJECT_ID_COLUMN, + invoice.CLUSTER_NAME_COLUMN, + invoice.PI_COLUMN, + invoice.INSTITUTION_ID_COLUMN, + ) + @functools.cached_property def coldfront_client(self): keycloak_url = os.environ.get("KEYCLOAK_URL", "https://keycloak.mss.mghpcc.org") @@ -143,7 +153,6 @@ def _validate_allocation_data(self, allocation_data): ) def _apply_allocation_data(self, allocation_data): - self.data[invoice.IS_COURSE_FIELD] = False for project_cluster_tuple, data in allocation_data.items(): project_id, cluster_name = project_cluster_tuple mask = (self.data[invoice.PROJECT_ID_FIELD] == project_id) & ( diff --git a/process_report/processors/discount_processor.py b/process_report/processors/discount_processor.py index 732e1f79..68c95e70 100644 --- a/process_report/processors/discount_processor.py +++ b/process_report/processors/discount_processor.py @@ -51,7 +51,7 @@ def apply_discount_on_project(remaining_discount_amount, project_i, project): remaining_project_balance = project[pi_balance_field] applied_discount = min(remaining_project_balance, remaining_discount_amount) - if invoice.at[project_i, discount_field] is None: + if pandas.isna(invoice.at[project_i, discount_field]): invoice.at[project_i, discount_field] = applied_discount else: invoice.at[project_i, discount_field] += applied_discount diff --git a/process_report/processors/lenovo_processor.py b/process_report/processors/lenovo_processor.py index 5b15e575..2259a3ee 100644 --- a/process_report/processors/lenovo_processor.py +++ b/process_report/processors/lenovo_processor.py @@ -9,6 +9,13 @@ class LenovoProcessor(processor.Processor): su_charge_info: dict = field(default_factory=loader.get_lenovo_su_charge_info) + initializes_columns = (invoice.SU_CHARGE_COLUMN, invoice.LENOVO_CHARGE_COLUMN) + operates_on_columns = ( + *initializes_columns, + invoice.SU_TYPE_COLUMN, + invoice.SU_HOURS_COLUMN, + ) + def _apply_su_charge(self, data): for su_name, su_charge in self.su_charge_info.items(): if su_name in data: diff --git a/process_report/processors/new_pi_credit_processor.py b/process_report/processors/new_pi_credit_processor.py index f1de23f9..94c1cdad 100644 --- a/process_report/processors/new_pi_credit_processor.py +++ b/process_report/processors/new_pi_credit_processor.py @@ -34,6 +34,19 @@ class NewPICreditProcessor(discount_processor.DiscountProcessor): ] IS_DISCOUNT_BY_NERC = True + operates_on_columns = ( + invoice.CREDIT_COLUMN, + invoice.CREDIT_CODE_COLUMN, + invoice.BALANCE_COLUMN, + invoice.PI_BALANCE_COLUMN, + invoice.SU_TYPE_COLUMN, + invoice.IS_BILLABLE_COLUMN, + invoice.MISSING_PI_COLUMN, + invoice.INSTITUTION_COLUMN, + invoice.COST_COLUMN, + invoice.PI_COLUMN, + ) + old_pi_filepath: str = field( default_factory=lambda: loader.get_remote_filepath( invoice_settings.pi_remote_filepath @@ -176,10 +189,7 @@ def _apply_credits_new_pi( credit_eligible_projects[invoice.PI_FIELD] == pi ] - if pi_age > 1: - for i, row in pi_projects.iterrows(): - data.at[i, invoice.BALANCE_FIELD] = row[invoice.COST_FIELD] - else: + if pi_age <= 1: if pi_age == 0: old_pi_df = self._upsert_pi_entry( old_pi_df, @@ -226,10 +236,6 @@ def _apply_credits_new_pi( return (data, old_pi_df) def _prepare(self): - self.data[invoice.CREDIT_FIELD] = None - self.data[invoice.CREDIT_CODE_FIELD] = None - self.data[invoice.PI_BALANCE_FIELD] = self.data[invoice.COST_FIELD] - self.data[invoice.BALANCE_FIELD] = self.data[invoice.COST_FIELD] self.old_pi_df = self._load_old_pis(self.old_pi_filepath) def _process(self): diff --git a/process_report/processors/pi_su_credit_processor.py b/process_report/processors/pi_su_credit_processor.py index 7e7a7029..be16ede2 100644 --- a/process_report/processors/pi_su_credit_processor.py +++ b/process_report/processors/pi_su_credit_processor.py @@ -25,6 +25,19 @@ class PISUCreditProcessor(discount_processor.DiscountProcessor): IS_DISCOUNT_BY_NERC = True PI_SU_CREDIT_CODE = "0005" + initializes_columns = ( + invoice.CREDIT_COLUMN, + invoice.CREDIT_CODE_COLUMN, + invoice.PI_BALANCE_COLUMN, + invoice.BALANCE_COLUMN, + ) + operates_on_columns = ( + *initializes_columns, + invoice.SU_TYPE_COLUMN, + invoice.PI_COLUMN, + invoice.COST_COLUMN, + ) + pi_su_mapping: dict[str, list[str]] = field( default_factory=loader.get_pi_non_billed_su_types ) diff --git a/process_report/processors/prepayment_processor.py b/process_report/processors/prepayment_processor.py index ba2d3a37..98974aff 100644 --- a/process_report/processors/prepayment_processor.py +++ b/process_report/processors/prepayment_processor.py @@ -20,14 +20,27 @@ class PrepaymentProcessor(discount_processor.DiscountProcessor): IS_DISCOUNT_BY_NERC = True PREPAY_DEBITS_S3_FILEPATH = "Prepay/prepay_debits.csv" + initializes_columns = ( + invoice.GROUP_NAME_COLUMN, + invoice.GROUP_INSTITUTION_COLUMN, + invoice.GROUP_MANAGED_COLUMN, + invoice.GROUP_BALANCE_COLUMN, + invoice.GROUP_BALANCE_USED_COLUMN, + ) + operates_on_columns = ( + *initializes_columns, + invoice.INVOICE_EMAIL_COLUMN, + invoice.PROJECT_NAME_COLUMN, + invoice.PI_BALANCE_COLUMN, + invoice.BALANCE_COLUMN, + ) + @property def PREPAY_DEBITS_S3_BACKUP_FILEPATH(self): return f"Prepay/Archive/prepay_debits {util.get_iso8601_time()}.csv" prepay_credits: pandas.DataFrame = field( - default_factory=lambda: loader.load_dataframe( - invoice_settings.prepay_credits_filepath - ) + default_factory=lambda: loader.load_prepay_credits() ) prepay_projects: pandas.DataFrame = field( default_factory=lambda: loader.load_dataframe( @@ -49,19 +62,15 @@ def PREPAY_DEBITS_S3_BACKUP_FILEPATH(self): @staticmethod def _load_prepay_debits(prepay_debits_filepath): try: - prepay_debits = pandas.read_csv(prepay_debits_filepath) + prepay_debits = pandas.read_csv(prepay_debits_filepath).astype( + {invoice.PREPAY_DEBIT_FIELD: invoice.BALANCE_FIELD_TYPE} + ) except FileNotFoundError: sys.exit("Applying prepayments failed. prepay debits file does not exist") return prepay_debits def _prepare(self): - self.data[invoice.GROUP_NAME_FIELD] = None - self.data[invoice.GROUP_INSTITUTION_FIELD] = None - self.data[invoice.GROUP_MANAGED_FIELD] = None - self.data[invoice.GROUP_BALANCE_FIELD] = None - self.data[invoice.GROUP_BALANCE_USED_FIELD] = None - self.prepay_debits = self._load_prepay_debits(self.prepay_debits_filepath) self.group_info_dict = self._get_prepay_group_dict() if self.upload_to_s3: diff --git a/process_report/processors/validate_billable_pi_processor.py b/process_report/processors/validate_billable_pi_processor.py index 94f62722..3806fb16 100644 --- a/process_report/processors/validate_billable_pi_processor.py +++ b/process_report/processors/validate_billable_pi_processor.py @@ -95,6 +95,16 @@ class ValidateBillablePIsProcessor(processor.Processor): - The project belongs in `NONBILLABLE_CLUSTERS` """ + initializes_columns = [invoice.IS_BILLABLE_COLUMN, invoice.MISSING_PI_COLUMN] + operates_on_columns = [ + *initializes_columns, + invoice.PI_COLUMN, + invoice.PROJECT_COLUMN, + invoice.CLUSTER_NAME_COLUMN, + invoice.IS_COURSE_COLUMN, + invoice.INSTITUTION_COLUMN, + ] + nonbillable_pis: list[str] = field(default_factory=loader.get_nonbillable_pis) nonbillable_projects: pandas.DataFrame = field( default_factory=loader.get_nonbillable_projects diff --git a/process_report/processors/validate_cluster_name_processor.py b/process_report/processors/validate_cluster_name_processor.py index 07a2e5e1..386b6081 100644 --- a/process_report/processors/validate_cluster_name_processor.py +++ b/process_report/processors/validate_cluster_name_processor.py @@ -12,6 +12,8 @@ class ValidateClusterNameProcessor(processor.Processor): "NERC-OCP-EDU": "academic", } + operates_on_columns = (invoice.CLUSTER_NAME_COLUMN,) + def _process(self): self.data[invoice.CLUSTER_NAME_FIELD] = self.data[ invoice.CLUSTER_NAME_FIELD diff --git a/process_report/processors/validate_input_column_processor.py b/process_report/processors/validate_input_column_processor.py new file mode 100644 index 00000000..edbbb07b --- /dev/null +++ b/process_report/processors/validate_input_column_processor.py @@ -0,0 +1,37 @@ +from dataclasses import dataclass + +from process_report.invoices import invoice +from process_report.processors import processor + + +@dataclass +class ValidateInputColumnsProcessor(processor.Processor): + initializes_columns = ( + invoice.INVOICE_DATE_COLUMN, + invoice.PROJECT_COLUMN, + invoice.PROJECT_ID_COLUMN, + invoice.PI_COLUMN, + invoice.CLUSTER_NAME_COLUMN, + invoice.INVOICE_EMAIL_COLUMN, + invoice.INVOICE_ADDRESS_COLUMN, + invoice.INSTITUTION_COLUMN, + invoice.INSTITUTION_ID_COLUMN, + invoice.SU_HOURS_COLUMN, + invoice.SU_TYPE_COLUMN, + invoice.RATE_COLUMN, + invoice.COST_COLUMN, + ) + + def process(self): + missing_columns = [ + column.name + for column in self.initializes_columns + if column.name not in self.data.columns + ] + if missing_columns: + raise ValueError( + f"Input dataframe is missing required columns: {', '.join(missing_columns)}. Stopping invoicing" + ) + + # Casts columns to appropriate types + self._init_columns() diff --git a/process_report/processors/validate_pi_alias_processor.py b/process_report/processors/validate_pi_alias_processor.py index 63434312..0eec53d0 100644 --- a/process_report/processors/validate_pi_alias_processor.py +++ b/process_report/processors/validate_pi_alias_processor.py @@ -9,6 +9,8 @@ class ValidatePIAliasProcessor(processor.Processor): alias_map: dict[str, list[str]] = field(default_factory=loader.get_alias_map) + operates_on_columns = (invoice.PI_COLUMN,) + def _validate_pi_aliases(self): for pi, pi_aliases in self.alias_map.items(): self.data.loc[ diff --git a/process_report/settings.py b/process_report/settings.py index 7d057161..0e902085 100644 --- a/process_report/settings.py +++ b/process_report/settings.py @@ -11,6 +11,11 @@ class Settings(BaseSettings): keycloak_client_id: str | None = None keycloak_client_secret: str | None = None + # Iceberg config + iceberg_catalog_name: str | None = None + iceberg_config_path: str | None = None + iceberg_table_path: str | None = None + invoice_path_template: str = "Invoices/{invoice_month}/Service Invoices/" invoice_month: str = (datetime.datetime.today() - relativedelta(months=1)).strftime( "%Y-%m" diff --git a/process_report/tests/base.py b/process_report/tests/base.py index 10d89006..5e277367 100644 --- a/process_report/tests/base.py +++ b/process_report/tests/base.py @@ -3,10 +3,62 @@ from pathlib import Path from unittest import TestCase +import pandas +import pyarrow -class BaseTestCaseWithTempDir(TestCase): - def setUp(self): - self.tempdir = Path(tempfile.TemporaryDirectory(delete=False).name) - def tearDown(self): - shutil.rmtree(self.tempdir) +BALANCE_FIELD_TYPE = pandas.ArrowDtype(pyarrow.decimal128(21, 2)) +RATE_FIELD_TYPE = pandas.ArrowDtype(pyarrow.decimal128(21, 13)) +INTEGER_FIELD_TYPE = pandas.ArrowDtype(pyarrow.int64()) +STRING_FIELD_TYPE = pandas.StringDtype() +BOOL_FIELD_TYPE = pandas.BooleanDtype() + +FIELD_DTYPES = { + "Invoice Month": STRING_FIELD_TYPE, + "Project - Allocation": STRING_FIELD_TYPE, + "Project - Allocation ID": STRING_FIELD_TYPE, + "Manager (PI)": STRING_FIELD_TYPE, + "Invoice Email": STRING_FIELD_TYPE, + "Invoice Address": STRING_FIELD_TYPE, + "Institution": STRING_FIELD_TYPE, + "Institution - Specific Code": STRING_FIELD_TYPE, + "Prepaid Group Name": STRING_FIELD_TYPE, + "Prepaid Group Institution": STRING_FIELD_TYPE, + "Prepaid Group Balance": BALANCE_FIELD_TYPE, + "Prepaid Group Used": BALANCE_FIELD_TYPE, + "SU Hours (GBhr or SUhr)": INTEGER_FIELD_TYPE, + "SU Type": STRING_FIELD_TYPE, + "SU Charge": BALANCE_FIELD_TYPE, + "Charge": BALANCE_FIELD_TYPE, + "Rate": RATE_FIELD_TYPE, + "Cost": BALANCE_FIELD_TYPE, + "Credit": BALANCE_FIELD_TYPE, + "Credit Code": STRING_FIELD_TYPE, + "Subsidy": BALANCE_FIELD_TYPE, + "Balance": BALANCE_FIELD_TYPE, + "Is Billable": BOOL_FIELD_TYPE, + "Missing PI": BOOL_FIELD_TYPE, + "PI Balance": BALANCE_FIELD_TYPE, + "Project": STRING_FIELD_TYPE, + "MGHPCC Managed": BOOL_FIELD_TYPE, + "Cluster Name": STRING_FIELD_TYPE, + "Is Course": BOOL_FIELD_TYPE, +} + + +class BaseTestCase(TestCase): + def create_test_invoice(self, data_dict: dict): + present_cols = { + col: dtype for col, dtype in FIELD_DTYPES.items() if col in data_dict + } + return pandas.DataFrame(data_dict).astype(present_cols) + + +class BaseTestCaseWithTempDir(BaseTestCase): + @classmethod + def setUpClass(cls): + cls.tempdir = Path(tempfile.TemporaryDirectory(delete=False).name) + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tempdir) diff --git a/process_report/tests/e2e/test_data/test_invoices/test_NERC OpenShift 2025-04.csv b/process_report/tests/e2e/test_data/test_invoices/test_NERC OpenShift 2025-04.csv index 7bdf6bc9..511480f5 100644 --- a/process_report/tests/e2e/test_data/test_invoices/test_NERC OpenShift 2025-04.csv +++ b/process_report/tests/e2e/test_data/test_invoices/test_NERC OpenShift 2025-04.csv @@ -2,5 +2,5 @@ Invoice Month,Project - Allocation,Project - Allocation ID,Manager (PI),Cluster 2024-01,P1ID,P1ID,,shift,,,,,280,OpenShift CPU,0.013,100.0 2024-01,P1ID,P1ID,,shift,,,,,280,OpenStack GPUA100SXM4,0.013,100 2024-01,P2ID,P2ID,,shift,,,,,280,OpenShift CPU,0.013,200 -2024-01,P3ID,P3ID,,shift,,,,,280,OpenShift CPU,0.013,300 +2024-01,P3ID,P3ID,,shift,,,,,280,Free CPU,0.013,300 2024-01,P9ID,P9ID,,shift,,,,,280,OpenShift CPU,0.013,3000 diff --git a/process_report/tests/e2e/test_data/test_pi.yaml b/process_report/tests/e2e/test_data/test_pi.yaml index 5cfeaee1..7d944c59 100644 --- a/process_report/tests/e2e/test_data/test_pi.yaml +++ b/process_report/tests/e2e/test_data/test_pi.yaml @@ -1,4 +1,4 @@ - username: PI9 -- username: PI10 +- username: pi2@harvard.edu non_billed_su_types: - - name: SU1 + - name: Free CPU diff --git a/process_report/tests/e2e/test_data/test_prepay_debits.csv b/process_report/tests/e2e/test_data/test_prepay_debits.csv index 94fed3cd..1c082816 100644 --- a/process_report/tests/e2e/test_data/test_prepay_debits.csv +++ b/process_report/tests/e2e/test_data/test_prepay_debits.csv @@ -1,3 +1,3 @@ Month,Group Name,Debit -2023-12,TestGroup1,200.0 -2023-11,TestGroup2,100.0 +2023-12,TestGroup1,200.00 +2023-11,TestGroup2,100.00 diff --git a/process_report/tests/e2e/test_e2e_pipeline.py b/process_report/tests/e2e/test_e2e_pipeline.py index 36cc7277..bd69bf27 100644 --- a/process_report/tests/e2e/test_e2e_pipeline.py +++ b/process_report/tests/e2e/test_e2e_pipeline.py @@ -5,6 +5,9 @@ import logging import subprocess from typing import Dict, List +import yaml + +from pyiceberg import schema, catalog logger = logging.getLogger(__name__) @@ -131,6 +134,28 @@ def _prepare_pipeline_execution( env.setdefault("CHROME_BIN_PATH", "/usr/bin/chromium") env["PYTHONPATH"] = str(project_root) + ":" + env.get("PYTHONPATH", "") + # Iceberg settings, init test namespace and table + env["iceberg_catalog_name"] = "test_catalog" + env["iceberg_config_path"] = workspace / "test_iceberg_config.yaml" + env["iceberg_table_path"] = "test_namespace.test_table" + + catalog_config = { + "type": "sql", + "warehouse": f"file://{workspace}", + "uri": f"sqlite:///{workspace / 'test_iceberg_catalog.db'}", + } + + with open(workspace / "test_iceberg_config.yaml", "w") as f: + yaml.dump(catalog_config, f) + + test_catalog = catalog.load_catalog(name="test_catalog", **catalog_config) + test_schema = schema.Schema( + schema.NestedField(1, "Invoice Month", schema.StringType()), + ) + + test_catalog.create_namespace_if_not_exists("test_namespace") + test_catalog.create_table_if_not_exists("test_namespace.test_table", test_schema) + return command, env diff --git a/process_report/tests/integration/test_iceberg.py b/process_report/tests/integration/test_iceberg.py new file mode 100644 index 00000000..add0cb88 --- /dev/null +++ b/process_report/tests/integration/test_iceberg.py @@ -0,0 +1,148 @@ +from pyiceberg import schema, catalog + +from process_report.invoices.iceberg_invoice import IcebergInvoice +from process_report.tests.base import BaseTestCaseWithTempDir + + +class TestIceberg(BaseTestCaseWithTempDir): + @classmethod + def setUpClass(cls): + super().setUpClass() + # Create in-memory catalog + cls.catalog_name = "catalog_foo" + cls.table_path = "namespace_foo.table_foo" + + config_dict = { + "type": "sql", + "warehouse": str(cls.tempdir), + "uri": f"sqlite:///{str(cls.tempdir)}/foo.db", + } + cls.catalog_config = config_dict + + # Initialize test schema that's used in setUp() + cls.catalog = catalog.load_catalog(name=cls.catalog_name, **config_dict) + cls.test_schema = schema.Schema( + schema.NestedField(1, "Invoice Month", schema.StringType()), + schema.NestedField(2, "Cost", schema.DecimalType(21, 2)), + schema.NestedField(3, "PI", schema.StringType()), + ) + + def setUp(self): + self.catalog.create_namespace_if_not_exists("namespace_foo") + self.catalog.create_table_if_not_exists(self.table_path, self.test_schema) + + def tearDown(self): + self.catalog.drop_table(self.table_path) + + def test_upload_one_dataframe(self): + # Create test dataframe matching table schema + test_df = self.create_test_invoice( + { + "Invoice Month": ["2024-01", "2024-01"], + "Cost": [100.0, 200.0], + "PI": ["PI1", "PI2"], + } + ) + + # Create IcebergInvoice instance + inv = IcebergInvoice( + invoice_month="2024-01", + data=test_df, + iceberg_catalog_name=self.catalog_name, + iceberg_catalog_config=self.catalog_config, + iceberg_table_path=self.table_path, + ) + # Ensure only test columns are filtered + inv.export_columns_list = ["Invoice Month", "Cost", "PI"] + inv.process() + inv.export() + + # Verify data was uploaded, and Iceberg cost column can be casted to Decimal + table = self.catalog.load_table(self.table_path) + uploaded_df = table.scan().to_pandas().astype(test_df.dtypes) + assert uploaded_df.equals(test_df) + + def test_upload_new_column(self): + # Create test dataframe with an extra column + test_df = self.create_test_invoice( + { + "Invoice Month": ["2024-02", "2024-02"], + "Cost": [150.0, 250.0], + "PI": ["PI3", "PI4"], + "extra_column": ["extra1", "extra2"], # New column + } + ) + + # Create IcebergInvoice instance + inv = IcebergInvoice( + invoice_month="2024-02", + data=test_df, + iceberg_catalog_name=self.catalog_name, + iceberg_catalog_config=self.catalog_config, + iceberg_table_path=self.table_path, + ) + inv.export_columns_list = ["Invoice Month", "Cost", "PI", "extra_column"] + inv.process() + inv.export() + + # Verify data was uploaded with new column (schema evolution) + table = self.catalog.load_table(self.table_path) + uploaded_df = table.scan().to_pandas().astype(test_df.dtypes) + assert uploaded_df.equals(test_df) + + def test_schema_evolution_with_existing_data(self): + # First, upload initial data without extra column + first_df = self.create_test_invoice( + { + "Invoice Month": ["2024-01", "2024-01"], + "Cost": [100.0, 200.0], + "PI": ["PI1", "PI2"], + } + ) + + inv = IcebergInvoice( + invoice_month="2024-01", + data=first_df, + iceberg_catalog_name=self.catalog_name, + iceberg_catalog_config=self.catalog_config, + iceberg_table_path=self.table_path, + ) + inv.export_columns_list = ["Invoice Month", "Cost", "PI"] + inv.process() + inv.export() + + # Now upload data with an extra column + second_df = self.create_test_invoice( + { + "Invoice Month": ["2024-02", "2024-02"], + "Cost": [150.0, 250.0], + "PI": ["PI3", "PI4"], + "extra_column": ["new1", "new2"], # New column + } + ) + + inv2 = IcebergInvoice( + invoice_month="2024-02", + data=second_df, + iceberg_catalog_name=self.catalog_name, + iceberg_catalog_config=self.catalog_config, + iceberg_table_path=self.table_path, + ) + inv2.export_columns_list = ["Invoice Month", "Cost", "PI", "extra_column"] + inv2.process() + inv2.export() + + table = self.catalog.load_table(self.table_path) + result_df = table.scan().to_pandas().astype(second_df.dtypes) + + # Verify the table has schema evolved with the new column + # Old rows should have None for the new column + expected_df = self.create_test_invoice( + { + "Invoice Month": ["2024-02", "2024-02", "2024-01", "2024-01"], + "Cost": [150.0, 250.0, 100.0, 200.0], + "PI": ["PI3", "PI4", "PI1", "PI2"], + "extra_column": ["new1", "new2", None, None], + } + ) + assert result_df.equals(expected_df) diff --git a/process_report/tests/test-requirements.txt b/process_report/tests/test-requirements.txt index 49ec960c..d70866da 100644 --- a/process_report/tests/test-requirements.txt +++ b/process_report/tests/test-requirements.txt @@ -1,2 +1,4 @@ pytest +pytest-env coverage +pyiceberg[sql-sqlite] diff --git a/process_report/tests/unit/processors/test_bu_subsidy_processor.py b/process_report/tests/unit/processors/test_bu_subsidy_processor.py index 6761e209..757dc52a 100644 --- a/process_report/tests/unit/processors/test_bu_subsidy_processor.py +++ b/process_report/tests/unit/processors/test_bu_subsidy_processor.py @@ -1,10 +1,8 @@ -from unittest import TestCase -import pandas - from process_report.tests import util as test_utils +from process_report.tests.base import BaseTestCase -class TestBUSubsidyProcessor(TestCase): +class TestBUSubsidyProcessor(BaseTestCase): def _assert_result_invoice( self, subsidy_amount, @@ -48,7 +46,7 @@ def _get_test_invoice( if not missing_pi: missing_pi = [False for _ in range(len(pi))] - return pandas.DataFrame( + return self.create_test_invoice( { "Manager (PI)": pi, "Project - Allocation": project_names, @@ -57,6 +55,7 @@ def _get_test_invoice( "Institution": institution, "Is Billable": is_billable, "Missing PI": missing_pi, + "Subsidy": [0 for _ in range(len(pi))], } ) diff --git a/process_report/tests/unit/processors/test_coldfront_fetch_processor.py b/process_report/tests/unit/processors/test_coldfront_fetch_processor.py index 6f401662..13a61096 100644 --- a/process_report/tests/unit/processors/test_coldfront_fetch_processor.py +++ b/process_report/tests/unit/processors/test_coldfront_fetch_processor.py @@ -1,11 +1,12 @@ -from unittest import TestCase, mock +from unittest import mock import pandas import pytest from process_report.tests import util as test_utils +from process_report.tests.base import BaseTestCase -class TestColdfrontFetchProcessor(TestCase): +class TestColdfrontFetchProcessor(BaseTestCase): def _get_test_invoice( self, allocation_project_id, @@ -28,9 +29,9 @@ def _get_test_invoice( cluster_name = [""] * len(allocation_project_id) if not is_course: - is_course = [None] * len(allocation_project_id) + is_course = [False] * len(allocation_project_id) - return pandas.DataFrame( + return self.create_test_invoice( { "Manager (PI)": pi, "Project - Allocation": allocation_project_name, diff --git a/process_report/tests/unit/processors/test_discount_processor.py b/process_report/tests/unit/processors/test_discount_processor.py index 921fa5a9..36604604 100644 --- a/process_report/tests/unit/processors/test_discount_processor.py +++ b/process_report/tests/unit/processors/test_discount_processor.py @@ -1,12 +1,9 @@ -from unittest import TestCase - -import pandas - from process_report.invoices import invoice from process_report.processors.pi_su_credit_processor import PISUCreditProcessor +from process_report.tests.base import BaseTestCase -class TestDiscountProcessor(TestCase): +class TestDiscountProcessor(BaseTestCase): def _get_test_invoice( self, pi, @@ -26,7 +23,7 @@ def _get_test_invoice( if balance is None: balance = costs - return pandas.DataFrame( + return self.create_test_invoice( { invoice.PI_FIELD: pi, invoice.SU_TYPE_FIELD: su_type, diff --git a/process_report/tests/unit/processors/test_new_pi_credit_processor.py b/process_report/tests/unit/processors/test_new_pi_credit_processor.py index 969862e1..6df329ae 100644 --- a/process_report/tests/unit/processors/test_new_pi_credit_processor.py +++ b/process_report/tests/unit/processors/test_new_pi_credit_processor.py @@ -109,14 +109,19 @@ def _get_test_invoice( if not institution: institution = ["Foo University" for _ in range(len(pi))] - return pandas.DataFrame( + costs = [Decimal(cost) for cost in costs] + return self.create_test_invoice( { "Manager (PI)": pi, "Cost": [Decimal(cost) for cost in costs], + "Credit": None, + "Credit Code": None, "SU Type": su_type, "Is Billable": is_billable, "Missing PI": missing_pi, "Institution": institution, + "PI Balance": costs, + "Balance": costs, } ) @@ -139,19 +144,13 @@ def test_no_new_pi(self): ) test_old_pi_df.to_csv(test_old_pi_file, index=False) - answer_invoice = pandas.concat( - [ - test_invoice, - pandas.DataFrame( - { - "Credit": [None for _ in range(3)], - "Credit Code": [None for _ in range(3)], - "PI Balance": [100 for _ in range(3)], - "Balance": [100 for _ in range(3)], - } - ), - ], - axis=1, + answer_invoice = test_invoice.assign( + **{ + "Credit": [None for _ in range(3)], + "Credit Code": [None for _ in range(3)], + "PI Balance": [100 for _ in range(3)], + "Balance": [100 for _ in range(3)], + } ) answer_old_pi_df = test_old_pi_df.copy() @@ -182,19 +181,13 @@ def test_one_new_pi(self): ) test_old_pi_df.to_csv(test_old_pi_file, index=False) - answer_invoice = pandas.concat( - [ - test_invoice, - pandas.DataFrame( - { - "Credit": [100], - "Credit Code": ["0002"], - "PI Balance": [0], - "Balance": [0], - } - ), - ], - axis=1, + answer_invoice = test_invoice.assign( + **{ + "Credit": [100], + "Credit Code": ["0002"], + "PI Balance": [0], + "Balance": [0], + } ) answer_old_pi_df = pandas.DataFrame( @@ -218,19 +211,13 @@ def test_one_new_pi(self): # Two allocations, costs partially covered test_invoice = self._get_test_invoice(["PI", "PI"], [500, 1000]) - answer_invoice = pandas.concat( - [ - test_invoice, - pandas.DataFrame( - { - "Credit": [500, 500], - "Credit Code": ["0002", "0002"], - "PI Balance": [0, 500], - "Balance": [0, 500], - } - ), - ], - axis=1, + answer_invoice = test_invoice.assign( + **{ + "Credit": [500, 500], + "Credit Code": ["0002", "0002"], + "PI Balance": [0, 500], + "Balance": [0, 500], + } ) answer_old_pi_df = pandas.DataFrame( @@ -254,19 +241,13 @@ def test_one_new_pi(self): # Two allocations, costs completely covered test_invoice = self._get_test_invoice(["PI", "PI"], [500, 400]) - answer_invoice = pandas.concat( - [ - test_invoice, - pandas.DataFrame( - { - "Credit": [500, 400], - "Credit Code": ["0002", "0002"], - "PI Balance": [0, 0], - "Balance": [0, 0], - } - ), - ], - axis=1, + answer_invoice = test_invoice.assign( + **{ + "Credit": [500, 400], + "Credit Code": ["0002", "0002"], + "PI Balance": [0, 0], + "Balance": [0, 0], + } ) answer_old_pi_df = pandas.DataFrame( @@ -305,19 +286,13 @@ def test_one_month_pi(self): ) test_old_pi_df.to_csv(test_old_pi_file, index=False) - answer_invoice = pandas.concat( - [ - test_invoice, - pandas.DataFrame( - { - "Credit": [200], - "Credit Code": ["0002"], - "PI Balance": [0], - "Balance": [0], - } - ), - ], - axis=1, + answer_invoice = test_invoice.assign( + **{ + "Credit": [200], + "Credit Code": ["0002"], + "PI Balance": [0], + "Balance": [0], + } ) answer_old_pi_df = pandas.DataFrame( @@ -341,19 +316,13 @@ def test_one_month_pi(self): # Remaining credits partially covers costs test_invoice = self._get_test_invoice(["PI"], [600]) - answer_invoice = pandas.concat( - [ - test_invoice, - pandas.DataFrame( - { - "Credit": [500], - "Credit Code": ["0002"], - "PI Balance": [100], - "Balance": [100], - } - ), - ], - axis=1, + answer_invoice = test_invoice.assign( + **{ + "Credit": [500], + "Credit Code": ["0002"], + "PI Balance": [100], + "Balance": [100], + } ) answer_old_pi_df = pandas.DataFrame( @@ -392,19 +361,13 @@ def test_two_new_pi(self): ) test_old_pi_df.to_csv(test_old_pi_file, index=False) - answer_invoice = pandas.concat( - [ - test_invoice, - pandas.DataFrame( - { - "Credit": [500, None, 500], - "Credit Code": ["0002", None, "0002"], - "PI Balance": [300, 500, 0], - "Balance": [300, 500, 0], - } - ), - ], - axis=1, + answer_invoice = test_invoice.assign( + **{ + "Credit": [500, None, 500], + "Credit Code": ["0002", None, "0002"], + "PI Balance": [300, 500, 0], + "Balance": [300, 500, 0], + } ) answer_old_pi_df = pandas.DataFrame( @@ -443,19 +406,13 @@ def test_old_pi_file_overwritten(self): ) test_old_pi_df.to_csv(test_old_pi_file, index=False) - answer_invoice = pandas.concat( - [ - test_invoice, - pandas.DataFrame( - { - "Credit": [200, None], - "Credit Code": ["0002", None], - "PI Balance": [300, 500], - "Balance": [300, 500], - } - ), - ], - axis=1, + answer_invoice = test_invoice.assign( + **{ + "Credit": [200, None], + "Credit Code": ["0002", None], + "PI Balance": [300, 500], + "Balance": [300, 500], + } ) answer_old_pi_df = pandas.DataFrame( @@ -505,19 +462,13 @@ def test_excluded_su_types(self): ) test_old_pi_df.to_csv(test_old_pi_file, index=False) - answer_invoice = pandas.concat( - [ - test_invoice, - pandas.DataFrame( - { - "Credit": [600, None, 400, None], - "Credit Code": ["0002", None, "0002", None], - "PI Balance": [0, 600, 200, 600], - "Balance": [0, 600, 200, 600], - } - ), - ], - axis=1, + answer_invoice = test_invoice.assign( + **{ + "Credit": [600, None, 400, None], + "Credit Code": ["0002", None, "0002", None], + "PI Balance": [0, 600, 200, 600], + "Balance": [0, 600, 200, 600], + } ) # PI2 was not eligible for credit, so should only get 0 initial credits @@ -558,19 +509,13 @@ def test_ineligible_pi_existing_old_pi_entry(self): ) test_old_pi_df.to_csv(test_old_pi_file, index=False) - answer_invoice = pandas.concat( - [ - test_invoice, - pandas.DataFrame( - { - "Credit": [None], - "Credit Code": [None], - "PI Balance": [500], - "Balance": [500], - } - ), - ], - axis=1, + answer_invoice = test_invoice.assign( + **{ + "Credit": [None], + "Credit Code": [None], + "PI Balance": [500], + "Balance": [500], + } ) answer_old_pi_df = pandas.DataFrame( @@ -612,19 +557,13 @@ def test_newly_eligible_pi_existing_old_pi_entry(self): ) test_old_pi_df.to_csv(test_old_pi_file, index=False) - answer_invoice = pandas.concat( - [ - test_invoice, - pandas.DataFrame( - { - "Credit": [None], - "Credit Code": [None], - "PI Balance": [800], - "Balance": [800], - } - ), - ], - axis=1, + answer_invoice = test_invoice.assign( + **{ + "Credit": [None], + "Credit Code": [None], + "PI Balance": [800], + "Balance": [800], + } ) answer_old_pi_df = pandas.DataFrame( diff --git a/process_report/tests/unit/processors/test_prepayment_processor.py b/process_report/tests/unit/processors/test_prepayment_processor.py index cae01f57..bf5c496b 100644 --- a/process_report/tests/unit/processors/test_prepayment_processor.py +++ b/process_report/tests/unit/processors/test_prepayment_processor.py @@ -43,7 +43,7 @@ def _get_test_invoice(self, project_names, pi_balances, balances=None): if not balances: balances = pi_balances - return pandas.DataFrame( + return self.create_test_invoice( { "Project": project_names, "PI Balance": pi_balances, diff --git a/process_report/tests/unit/processors/test_processor_list.py b/process_report/tests/unit/processors/test_processor_list.py new file mode 100644 index 00000000..46b94402 --- /dev/null +++ b/process_report/tests/unit/processors/test_processor_list.py @@ -0,0 +1,26 @@ +from process_report.process_report import PROCESSING_ORDER +from process_report.tests.base import BaseTestCase + + +class TestProcessorList(BaseTestCase): + def test_processing_order_column_dependencies(self): + initialized_columns = set() + + for processor_class in PROCESSING_ORDER: + operates_on = getattr(processor_class, "operates_on_columns", []) + initializes = getattr(processor_class, "initializes_columns", []) + + # Check that no columns are initalized more than once + for column in initializes: + assert column.name not in initialized_columns, ( + f"Column '{column.name}' initialized by {processor_class.__name__} but already initialized by a previous processor" + ) + + for column in initializes: + initialized_columns.add(column.name) + + # Check that all operated on columns have been initialized by a previous processor + for column in operates_on: + assert column.name in initialized_columns, ( + f"Column '{column.name}' operated on by {processor_class.__name__} but not initialized by itself or any previous processor" + ) diff --git a/process_report/tests/unit/processors/test_validate_billable_pi_processor.py b/process_report/tests/unit/processors/test_validate_billable_pi_processor.py index 7c1cb083..79098ff9 100644 --- a/process_report/tests/unit/processors/test_validate_billable_pi_processor.py +++ b/process_report/tests/unit/processors/test_validate_billable_pi_processor.py @@ -66,7 +66,7 @@ def test_remove_nonbillables(self): ) validate_billable_pi_proc.process() output = validate_billable_pi_proc.data - assert output[output["Is Billable"]].equals(data.iloc[[3, 4, 5, 9]]) + assert output["Is Billable"].iloc[[3, 4, 5, 9]].all() def test_billable_override_marks_project_billable(self): test_data = pandas.DataFrame( diff --git a/process_report/tests/unit/processors/test_validate_input_columns_processor.py b/process_report/tests/unit/processors/test_validate_input_columns_processor.py new file mode 100644 index 00000000..7ac2256c --- /dev/null +++ b/process_report/tests/unit/processors/test_validate_input_columns_processor.py @@ -0,0 +1,53 @@ +import pandas + +from process_report.processors.validate_input_column_processor import ( + ValidateInputColumnsProcessor, +) +from process_report.tests.base import BaseTestCase + + +class TestValidateInputColumnsProcessor(BaseTestCase): + def test_process_succeeds_when_required_columns_exist_and_keeps_extra_columns(self): + invoice_month = "2025-01" + test_data_dict = { + "Invoice Month": [invoice_month], + "Project - Allocation": ["P1"], + "Project - Allocation ID": ["P1-ID"], + "Manager (PI)": ["pi1"], + "Cluster Name": ["cluster1"], + "Invoice Email": ["pi1@example.com"], + "Invoice Address": ["123 Main St"], + "Institution": ["Example University"], + "Institution - Specific Code": ["EX-001"], + "SU Hours (GBhr or SUhr)": [10], + "SU Type": ["Compute"], + "Rate": ["1.00"], + "Cost": [100.0], + "Extra Column": ["extra"], + } + + processor = ValidateInputColumnsProcessor( + invoice_month=invoice_month, data=pandas.DataFrame(test_data_dict) + ) + processor.process() + + output_data = processor.data + expected_data = self.create_test_invoice(test_data_dict) + assert output_data.equals(expected_data) + + def test_process_raises_error_when_required_columns_are_missing(self): + invoice_month = "2025-01" + test_data = pandas.DataFrame( + {"Invoice Month": [invoice_month], "Cost": [100.0]} + ) + + processor = ValidateInputColumnsProcessor( + invoice_month=invoice_month, data=test_data + ) + + expected_message = """Input dataframe is missing required columns: Project - Allocation, Project - Allocation ID, Manager (PI), + Cluster Name, Invoice Email, Invoice Address, Institution, Institution - Specific Code, SU Hours (GBhr or SUhr), + SU Type, Rate. Stopping invoicing""" + + with self.assertRaises(ValueError, msg=expected_message): + processor.process() diff --git a/process_report/tests/unit/test_util.py b/process_report/tests/unit/test_util.py index d7225b20..ecf30437 100644 --- a/process_report/tests/unit/test_util.py +++ b/process_report/tests/unit/test_util.py @@ -28,9 +28,9 @@ class TestMergeCSV(TestCase): def setUp(self): self.header = ["Cost", "Name", "Rate"] self.data = [ - [1, "Alice, Allison", 25], - [2, "Bob", 30], - [3, "Charlie", 28], + [1, "Alice, Allison", 25.0], + [2, "Bob", 30.0], + [3, "Charlie", 28.0], ] self.csv_files = [] diff --git a/requirements.txt b/requirements.txt index 805b39f9..1276b641 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ Jinja2 validators python-dateutil pydantic-settings +pyiceberg[pyarrow]