Skip to content

Commit 8698a77

Browse files
committed
Track initialized and used columns in each processor
During 2026-03 invoicing, a bug was found where the columns initialized by the New-PI credit processor (i.e `PI Balance` column), was being accessed by the PI-SU processor before it was initialized, causing an KeyError. To fix this, the codebase has been refactored to allow each processor to explicitly document which columns they initialize and use, defined in two new properties, `initializes_columns` and `operates_on_columns`. A helper function `_init_columns()` is added to initalize columns Unit test `tests/unit/processors/test_processor_list.py` is added to check each processor only uses columns that itself or previous processors initialized, and no column is initialized more than once Additionally, each column will now be encapsulated as a `InvoiceColumn` instance. `InvoiceColumn` contains the name, datatype, and default values for each column This will also enable stricter and clearer type enforcement for data entering and leaving the pipeline A new processor `ValidateInputColumnsProcessor` is added to check the input dataframe to the processing pipeline has prerequisite columns, and to cast to appropriate types The e2e test data has been updated to surface the bug that was found. It did not failed during the PR that introduced the bug [1] because the test data didn't have the right conditions to trigger the PI-SU processor Refactored unit tests to accomodate the new processor by adding a new base test class. [1] CCI-MOC#279
1 parent 72b6e76 commit 8698a77

28 files changed

Lines changed: 504 additions & 209 deletions

process_report/invoices/invoice.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,32 @@
11
from dataclasses import dataclass
2+
from typing import Any, Callable
3+
from decimal import Decimal
24
import pandas
5+
import pyarrow
6+
import logging
37

48
import process_report.util as util
59

610

11+
logger = logging.getLogger(__name__)
12+
13+
14+
@dataclass
15+
class InvoiceColumn:
16+
name: str
17+
dtype: Any
18+
default_value: Any | None = None
19+
default_initializer: Callable[[pandas.DataFrame], pandas.Series] | None = None
20+
21+
22+
# Field type definitions
23+
BALANCE_FIELD_TYPE = pandas.ArrowDtype(pyarrow.decimal128(21, 2))
24+
RATE_FIELD_TYPE = pandas.ArrowDtype(pyarrow.decimal128(21, 13))
25+
INTEGER_FIELD_TYPE = pandas.ArrowDtype(pyarrow.int64())
26+
STRING_FIELD_TYPE = pandas.StringDtype()
27+
BOOL_FIELD_TYPE = pandas.BooleanDtype()
28+
29+
730
### PI file field names
831
PI_PI_FIELD = "PI"
932
PI_FIRST_MONTH = "First Invoice Month"
@@ -65,18 +88,77 @@
6588
IS_COURSE_FIELD = "Is Course"
6689
###
6790

91+
### Initialized Column objects
92+
INVOICE_DATE_COLUMN = InvoiceColumn(name=INVOICE_DATE_FIELD, dtype=STRING_FIELD_TYPE)
93+
PROJECT_COLUMN = InvoiceColumn(name=PROJECT_FIELD, dtype=STRING_FIELD_TYPE)
94+
PROJECT_ID_COLUMN = InvoiceColumn(name=PROJECT_ID_FIELD, dtype=STRING_FIELD_TYPE)
95+
PI_COLUMN = InvoiceColumn(name=PI_FIELD, dtype=STRING_FIELD_TYPE)
96+
INVOICE_EMAIL_COLUMN = InvoiceColumn(name=INVOICE_EMAIL_FIELD, dtype=STRING_FIELD_TYPE)
97+
INVOICE_ADDRESS_COLUMN = InvoiceColumn(
98+
name=INVOICE_ADDRESS_FIELD, dtype=STRING_FIELD_TYPE
99+
)
100+
INSTITUTION_COLUMN = InvoiceColumn(name=INSTITUTION_FIELD, dtype=STRING_FIELD_TYPE)
101+
INSTITUTION_ID_COLUMN = InvoiceColumn(
102+
name=INSTITUTION_ID_FIELD, dtype=STRING_FIELD_TYPE
103+
)
104+
GROUP_NAME_COLUMN = InvoiceColumn(name=GROUP_NAME_FIELD, dtype=STRING_FIELD_TYPE)
105+
GROUP_INSTITUTION_COLUMN = InvoiceColumn(
106+
name=GROUP_INSTITUTION_FIELD, dtype=STRING_FIELD_TYPE
107+
)
108+
GROUP_BALANCE_COLUMN = InvoiceColumn(name=GROUP_BALANCE_FIELD, dtype=BALANCE_FIELD_TYPE)
109+
GROUP_BALANCE_USED_COLUMN = InvoiceColumn(
110+
name=GROUP_BALANCE_USED_FIELD, dtype=BALANCE_FIELD_TYPE
111+
)
112+
SU_HOURS_COLUMN = InvoiceColumn(name=SU_HOURS_FIELD, dtype=INTEGER_FIELD_TYPE)
113+
SU_TYPE_COLUMN = InvoiceColumn(name=SU_TYPE_FIELD, dtype=STRING_FIELD_TYPE)
114+
SU_CHARGE_COLUMN = InvoiceColumn(name=SU_CHARGE_FIELD, dtype=BALANCE_FIELD_TYPE)
115+
LENOVO_CHARGE_COLUMN = InvoiceColumn(name=LENOVO_CHARGE_FIELD, dtype=BALANCE_FIELD_TYPE)
116+
RATE_COLUMN = InvoiceColumn(
117+
name=RATE_FIELD, dtype=RATE_FIELD_TYPE
118+
) # Using decimal to suppress scientific notation in export
119+
COST_COLUMN = InvoiceColumn(name=COST_FIELD, dtype=BALANCE_FIELD_TYPE)
120+
CREDIT_COLUMN = InvoiceColumn(name=CREDIT_FIELD, dtype=BALANCE_FIELD_TYPE)
121+
CREDIT_CODE_COLUMN = InvoiceColumn(name=CREDIT_CODE_FIELD, dtype=STRING_FIELD_TYPE)
122+
SUBSIDY_COLUMN = InvoiceColumn(
123+
name=SUBSIDY_FIELD, dtype=BALANCE_FIELD_TYPE, default_value=Decimal(0)
124+
)
125+
BALANCE_COLUMN = InvoiceColumn(
126+
name=BALANCE_FIELD,
127+
dtype=BALANCE_FIELD_TYPE,
128+
default_initializer=lambda df: df[COST_FIELD],
129+
)
130+
PI_BALANCE_COLUMN = InvoiceColumn(
131+
name=PI_BALANCE_FIELD,
132+
dtype=BALANCE_FIELD_TYPE,
133+
default_initializer=lambda df: df[COST_FIELD],
134+
)
135+
136+
# Internally used fields
137+
IS_BILLABLE_COLUMN = InvoiceColumn(name=IS_BILLABLE_FIELD, dtype=BOOL_FIELD_TYPE)
138+
MISSING_PI_COLUMN = InvoiceColumn(name=MISSING_PI_FIELD, dtype=BOOL_FIELD_TYPE)
139+
PROJECT_NAME_COLUMN = InvoiceColumn(name=PROJECT_NAME_FIELD, dtype=STRING_FIELD_TYPE)
140+
GROUP_MANAGED_COLUMN = InvoiceColumn(name=GROUP_MANAGED_FIELD, dtype=BOOL_FIELD_TYPE)
141+
CLUSTER_NAME_COLUMN = InvoiceColumn(name=CLUSTER_NAME_FIELD, dtype=STRING_FIELD_TYPE)
142+
IS_COURSE_COLUMN = InvoiceColumn(
143+
name=IS_COURSE_FIELD, dtype=BOOL_FIELD_TYPE, default_value=False
144+
)
145+
###
146+
68147

69148
@dataclass
70149
class Invoice:
71150
export_columns_list = list()
72151
exported_columns_map = dict()
152+
initializes_columns = tuple()
153+
operates_on_columns = tuple()
73154

74155
invoice_month: str
75156
data: pandas.DataFrame
76157
name: str = ""
77158
export_data = None
78159

79160
def process(self):
161+
self._init_columns()
80162
self._prepare()
81163
self._process()
82164
self._prepare_export()
@@ -93,6 +175,24 @@ def output_s3_key(self) -> str:
93175
def output_s3_archive_key(self):
94176
return f"Invoices/{self.invoice_month}/Archive/{self.name} {self.invoice_month} {util.get_iso8601_time()}.csv"
95177

178+
def _init_columns(self):
179+
"""Initializes columns specified in `initializes_columns` and cast them to appropriate types
180+
181+
If column already exists, only do casting
182+
If no default value is given, column initialized to None
183+
"""
184+
for field in self.initializes_columns:
185+
if field.name not in self.data.columns:
186+
field_default = field.default_value
187+
if field.default_initializer:
188+
field_default = field.default_initializer(self.data)
189+
self.data[field.name] = field_default
190+
elif self.data.dtypes[field.name] != field.dtype:
191+
logger.warning(
192+
f"Column {field.name} has dtype {self.data.dtypes[field.name]} instead of expected {field.dtype}."
193+
)
194+
self.data = self.data.astype({field.name: field.dtype})
195+
96196
def _prepare(self):
97197
"""Prepares the data for processing.
98198

process_report/loader.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,13 @@ def get_alias_map(self) -> dict:
112112
def load_dataframe(self, filepath: str) -> pandas.DataFrame:
113113
return pandas.read_csv(filepath)
114114

115+
@functools.lru_cache
116+
def load_prepay_credits(self) -> pandas.DataFrame:
117+
prepay_df = self.load_dataframe(invoice_settings.prepay_credits_filepath)
118+
return prepay_df.astype(
119+
{invoice.PREPAY_CREDIT_FIELD: invoice.BALANCE_FIELD_TYPE}
120+
)
121+
115122
@functools.lru_cache
116123
def _load_pi_config(self, filepath: str) -> list[dict]:
117124
with open(filepath) as file:

process_report/process_report.py

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import os
44

55
import pandas
6-
import pyarrow
76

87
from process_report.settings import invoice_settings
98
from process_report.loader import loader
@@ -22,6 +21,7 @@
2221
)
2322
from process_report.processors import (
2423
coldfront_fetch_processor,
24+
validate_input_column_processor,
2525
validate_pi_alias_processor,
2626
add_institution_processor,
2727
lenovo_processor,
@@ -33,6 +33,20 @@
3333
validate_cluster_name_processor,
3434
)
3535

36+
PROCESSING_ORDER = [
37+
validate_input_column_processor.ValidateInputColumnsProcessor,
38+
validate_cluster_name_processor.ValidateClusterNameProcessor,
39+
coldfront_fetch_processor.ColdfrontFetchProcessor,
40+
validate_pi_alias_processor.ValidatePIAliasProcessor,
41+
add_institution_processor.AddInstitutionProcessor,
42+
lenovo_processor.LenovoProcessor,
43+
validate_billable_pi_processor.ValidateBillablePIsProcessor,
44+
pi_su_credit_processor.PISUCreditProcessor,
45+
new_pi_credit_processor.NewPICreditProcessor,
46+
bu_subsidy_processor.BUSubsidyProcessor,
47+
prepayment_processor.PrepaymentProcessor,
48+
]
49+
3650

3751
PI_S3_FILEPATH = "PIs/PI.csv"
3852
ALIAS_S3_FILEPATH = "PIs/alias.csv"
@@ -66,20 +80,7 @@ def main():
6680

6781
### Preliminary processing
6882
processed_data = process_merged_dataframe(
69-
invoice_month,
70-
merged_dataframe,
71-
[
72-
validate_cluster_name_processor.ValidateClusterNameProcessor,
73-
coldfront_fetch_processor.ColdfrontFetchProcessor,
74-
validate_pi_alias_processor.ValidatePIAliasProcessor,
75-
add_institution_processor.AddInstitutionProcessor,
76-
lenovo_processor.LenovoProcessor,
77-
validate_billable_pi_processor.ValidateBillablePIsProcessor,
78-
pi_su_credit_processor.PISUCreditProcessor,
79-
new_pi_credit_processor.NewPICreditProcessor,
80-
bu_subsidy_processor.BUSubsidyProcessor,
81-
prepayment_processor.PrepaymentProcessor,
82-
],
83+
invoice_month, merged_dataframe, PROCESSING_ORDER
8384
)
8485

8586
### Export invoices
@@ -109,8 +110,19 @@ def merge_csv(files):
109110
file,
110111
engine="pyarrow",
111112
dtype={
112-
invoice.COST_FIELD: pandas.ArrowDtype(pyarrow.decimal128(21, 2)),
113-
invoice.RATE_FIELD: str,
113+
invoice.INVOICE_DATE_FIELD: invoice.STRING_FIELD_TYPE,
114+
invoice.PROJECT_FIELD: invoice.STRING_FIELD_TYPE,
115+
invoice.PROJECT_ID_FIELD: invoice.STRING_FIELD_TYPE,
116+
invoice.PI_FIELD: invoice.STRING_FIELD_TYPE,
117+
invoice.CLUSTER_NAME_FIELD: invoice.STRING_FIELD_TYPE,
118+
invoice.INVOICE_EMAIL_FIELD: invoice.STRING_FIELD_TYPE,
119+
invoice.INVOICE_ADDRESS_FIELD: invoice.STRING_FIELD_TYPE,
120+
invoice.INSTITUTION_FIELD: invoice.STRING_FIELD_TYPE,
121+
invoice.INSTITUTION_ID_FIELD: invoice.STRING_FIELD_TYPE,
122+
invoice.SU_HOURS_FIELD: invoice.INTEGER_FIELD_TYPE,
123+
invoice.SU_TYPE_FIELD: invoice.STRING_FIELD_TYPE,
124+
invoice.RATE_FIELD: invoice.RATE_FIELD_TYPE,
125+
invoice.COST_FIELD: invoice.BALANCE_FIELD_TYPE,
114126
},
115127
quotechar="|",
116128
)

process_report/processors/add_institution_processor.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414

1515
@dataclass
1616
class AddInstitutionProcessor(processor.Processor):
17+
operates_on_columns = (
18+
invoice.INSTITUTION_COLUMN,
19+
invoice.PI_COLUMN,
20+
invoice.PROJECT_COLUMN,
21+
)
22+
1723
def _add_institution(self):
1824
"""Determine every PI's institution name, logging any PI whose institution cannot be determined
1925
This is performed by `get_institution_from_pi()`, which tries to match the PI's username to

process_report/processors/bu_subsidy_processor.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
from dataclasses import dataclass, field
2-
from decimal import Decimal
32

43
from process_report.loader import loader
54
from process_report.invoices import invoice
@@ -10,6 +9,18 @@
109
class BUSubsidyProcessor(discount_processor.DiscountProcessor):
1110
IS_DISCOUNT_BY_NERC = False
1211

12+
initializes_columns = (invoice.PROJECT_NAME_COLUMN, invoice.SUBSIDY_COLUMN)
13+
operates_on_columns = (
14+
*initializes_columns,
15+
invoice.PROJECT_COLUMN,
16+
invoice.PI_COLUMN,
17+
invoice.IS_BILLABLE_COLUMN,
18+
invoice.MISSING_PI_COLUMN,
19+
invoice.INSTITUTION_COLUMN,
20+
invoice.PI_BALANCE_COLUMN,
21+
invoice.BALANCE_COLUMN,
22+
)
23+
1324
subsidy_amount: int = field(default_factory=loader.get_bu_subsidy_amount)
1425

1526
def _prepare(self):
@@ -21,7 +32,6 @@ def get_project(row):
2132
return project_alloc[: project_alloc.rfind("-")]
2233

2334
self.data[invoice.PROJECT_NAME_FIELD] = self.data.apply(get_project, axis=1)
24-
self.data[invoice.SUBSIDY_FIELD] = Decimal(0)
2535

2636
def _process(self):
2737
self.data = self._apply_subsidy(self.data, self.subsidy_amount)

process_report/processors/coldfront_fetch_processor.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,16 @@ class ColdfrontFetchProcessor(processor.Processor):
3434
)
3535
coldfront_data_filepath: str = invoice_settings.coldfront_api_filepath
3636

37+
initializes_columns = (invoice.IS_COURSE_COLUMN,)
38+
operates_on_columns = (
39+
*initializes_columns,
40+
invoice.PROJECT_COLUMN,
41+
invoice.PROJECT_ID_COLUMN,
42+
invoice.CLUSTER_NAME_COLUMN,
43+
invoice.PI_COLUMN,
44+
invoice.INSTITUTION_ID_COLUMN,
45+
)
46+
3747
@functools.cached_property
3848
def coldfront_client(self):
3949
keycloak_url = os.environ.get("KEYCLOAK_URL", "https://keycloak.mss.mghpcc.org")
@@ -143,7 +153,6 @@ def _validate_allocation_data(self, allocation_data):
143153
)
144154

145155
def _apply_allocation_data(self, allocation_data):
146-
self.data[invoice.IS_COURSE_FIELD] = False
147156
for project_cluster_tuple, data in allocation_data.items():
148157
project_id, cluster_name = project_cluster_tuple
149158
mask = (self.data[invoice.PROJECT_ID_FIELD] == project_id) & (

process_report/processors/discount_processor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def apply_discount_on_project(remaining_discount_amount, project_i, project):
5151
remaining_project_balance = project[pi_balance_field]
5252
applied_discount = min(remaining_project_balance, remaining_discount_amount)
5353

54-
if invoice.at[project_i, discount_field] is None:
54+
if pandas.isna(invoice.at[project_i, discount_field]):
5555
invoice.at[project_i, discount_field] = applied_discount
5656
else:
5757
invoice.at[project_i, discount_field] += applied_discount

process_report/processors/lenovo_processor.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,13 @@
99
class LenovoProcessor(processor.Processor):
1010
su_charge_info: dict = field(default_factory=loader.get_lenovo_su_charge_info)
1111

12+
initializes_columns = (invoice.SU_CHARGE_COLUMN, invoice.LENOVO_CHARGE_COLUMN)
13+
operates_on_columns = (
14+
*initializes_columns,
15+
invoice.SU_TYPE_COLUMN,
16+
invoice.SU_HOURS_COLUMN,
17+
)
18+
1219
def _apply_su_charge(self, data):
1320
for su_name, su_charge in self.su_charge_info.items():
1421
if su_name in data:

process_report/processors/new_pi_credit_processor.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,19 @@ class NewPICreditProcessor(discount_processor.DiscountProcessor):
3434
]
3535
IS_DISCOUNT_BY_NERC = True
3636

37+
operates_on_columns = (
38+
invoice.CREDIT_COLUMN,
39+
invoice.CREDIT_CODE_COLUMN,
40+
invoice.BALANCE_COLUMN,
41+
invoice.PI_BALANCE_COLUMN,
42+
invoice.SU_TYPE_COLUMN,
43+
invoice.IS_BILLABLE_COLUMN,
44+
invoice.MISSING_PI_COLUMN,
45+
invoice.INSTITUTION_COLUMN,
46+
invoice.COST_COLUMN,
47+
invoice.PI_COLUMN,
48+
)
49+
3750
old_pi_filepath: str = field(
3851
default_factory=lambda: loader.get_remote_filepath(
3952
invoice_settings.pi_remote_filepath
@@ -176,10 +189,7 @@ def _apply_credits_new_pi(
176189
credit_eligible_projects[invoice.PI_FIELD] == pi
177190
]
178191

179-
if pi_age > 1:
180-
for i, row in pi_projects.iterrows():
181-
data.at[i, invoice.BALANCE_FIELD] = row[invoice.COST_FIELD]
182-
else:
192+
if pi_age <= 1:
183193
if pi_age == 0:
184194
old_pi_df = self._upsert_pi_entry(
185195
old_pi_df,
@@ -226,10 +236,6 @@ def _apply_credits_new_pi(
226236
return (data, old_pi_df)
227237

228238
def _prepare(self):
229-
self.data[invoice.CREDIT_FIELD] = None
230-
self.data[invoice.CREDIT_CODE_FIELD] = None
231-
self.data[invoice.PI_BALANCE_FIELD] = self.data[invoice.COST_FIELD]
232-
self.data[invoice.BALANCE_FIELD] = self.data[invoice.COST_FIELD]
233239
self.old_pi_df = self._load_old_pis(self.old_pi_filepath)
234240

235241
def _process(self):

process_report/processors/pi_su_credit_processor.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,19 @@ class PISUCreditProcessor(discount_processor.DiscountProcessor):
2525
IS_DISCOUNT_BY_NERC = True
2626
PI_SU_CREDIT_CODE = "0005"
2727

28+
initializes_columns = (
29+
invoice.CREDIT_COLUMN,
30+
invoice.CREDIT_CODE_COLUMN,
31+
invoice.PI_BALANCE_COLUMN,
32+
invoice.BALANCE_COLUMN,
33+
)
34+
operates_on_columns = (
35+
*initializes_columns,
36+
invoice.SU_TYPE_COLUMN,
37+
invoice.PI_COLUMN,
38+
invoice.COST_COLUMN,
39+
)
40+
2841
pi_su_mapping: dict[str, list[str]] = field(
2942
default_factory=loader.get_pi_non_billed_su_types
3043
)

0 commit comments

Comments
 (0)