Skip to content

Commit fee4c23

Browse files
committed
Track initialized and used columns in each processor
During 2026-03 invoicing, a bug was found where the columns initialized by the New-PI credit processor (i.e `PI Balance` column), was being accessed by the PI-SU processor before it was initialized, causing an KeyError. To fix this, the codebase has been refactored to allow each processor to explicitly document which columns they initialize and use, defined in two new properties, `initializes_columns` and `operates_on_columns`. A helper function `_init_columns()` is added to initalize columns Unit test `tests/unit/processors/test_processor_list.py` is added to check each processor only uses columns that itself or previous processors initialized, and no column is initialized more than once Additionally, each column will now be encapsulated as a `InvoiceColumn` instance. `InvoiceColumn` contains the name, datatype, and default values for each column This will also enable stricter and clearer type enforcement for data entering and leaving the pipeline A new processor `ValidateInputColumnsProcessor` is added to check the input dataframe to the processing pipeline has prerequisite columns, and to cast to appropriate types The e2e test data has been updated to surface the bug that was found. It did not failed during the PR that introduced the bug [1] because the test data didn't have the right conditions to trigger the PI-SU processor Refactored unit tests to accomodate the new processor by adding a new base test class. [1] #279
1 parent 72b6e76 commit fee4c23

27 files changed

Lines changed: 490 additions & 203 deletions

process_report/invoices/invoice.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,28 @@
11
from dataclasses import dataclass
2+
from typing import Any, Callable
3+
from decimal import Decimal
24
import pandas
5+
import pyarrow
36

47
import process_report.util as util
58

69

10+
@dataclass
11+
class InvoiceColumn:
12+
name: str
13+
dtype: Any
14+
default_value: Any | None = None
15+
default_initializer: Callable[[pandas.DataFrame], pandas.Series] | None = None
16+
17+
18+
# Field type definitions
19+
BALANCE_FIELD_TYPE = pandas.ArrowDtype(pyarrow.decimal128(21, 2))
20+
RATE_FIELD_TYPE = pandas.ArrowDtype(pyarrow.decimal128(21, 13))
21+
INTEGER_FIELD_TYPE = pandas.ArrowDtype(pyarrow.int64())
22+
STRING_FIELD_TYPE = pandas.StringDtype()
23+
BOOL_FIELD_TYPE = pandas.BooleanDtype()
24+
25+
726
### PI file field names
827
PI_PI_FIELD = "PI"
928
PI_FIRST_MONTH = "First Invoice Month"
@@ -65,18 +84,77 @@
6584
IS_COURSE_FIELD = "Is Course"
6685
###
6786

87+
### Initialized Column objects
88+
INVOICE_DATE_COLUMN = InvoiceColumn(name=INVOICE_DATE_FIELD, dtype=STRING_FIELD_TYPE)
89+
PROJECT_COLUMN = InvoiceColumn(name=PROJECT_FIELD, dtype=STRING_FIELD_TYPE)
90+
PROJECT_ID_COLUMN = InvoiceColumn(name=PROJECT_ID_FIELD, dtype=STRING_FIELD_TYPE)
91+
PI_COLUMN = InvoiceColumn(name=PI_FIELD, dtype=STRING_FIELD_TYPE)
92+
INVOICE_EMAIL_COLUMN = InvoiceColumn(name=INVOICE_EMAIL_FIELD, dtype=STRING_FIELD_TYPE)
93+
INVOICE_ADDRESS_COLUMN = InvoiceColumn(
94+
name=INVOICE_ADDRESS_FIELD, dtype=STRING_FIELD_TYPE
95+
)
96+
INSTITUTION_COLUMN = InvoiceColumn(name=INSTITUTION_FIELD, dtype=STRING_FIELD_TYPE)
97+
INSTITUTION_ID_COLUMN = InvoiceColumn(
98+
name=INSTITUTION_ID_FIELD, dtype=STRING_FIELD_TYPE
99+
)
100+
GROUP_NAME_COLUMN = InvoiceColumn(name=GROUP_NAME_FIELD, dtype=STRING_FIELD_TYPE)
101+
GROUP_INSTITUTION_COLUMN = InvoiceColumn(
102+
name=GROUP_INSTITUTION_FIELD, dtype=STRING_FIELD_TYPE
103+
)
104+
GROUP_BALANCE_COLUMN = InvoiceColumn(name=GROUP_BALANCE_FIELD, dtype=BALANCE_FIELD_TYPE)
105+
GROUP_BALANCE_USED_COLUMN = InvoiceColumn(
106+
name=GROUP_BALANCE_USED_FIELD, dtype=BALANCE_FIELD_TYPE
107+
)
108+
SU_HOURS_COLUMN = InvoiceColumn(name=SU_HOURS_FIELD, dtype=INTEGER_FIELD_TYPE)
109+
SU_TYPE_COLUMN = InvoiceColumn(name=SU_TYPE_FIELD, dtype=STRING_FIELD_TYPE)
110+
SU_CHARGE_COLUMN = InvoiceColumn(name=SU_CHARGE_FIELD, dtype=BALANCE_FIELD_TYPE)
111+
LENOVO_CHARGE_COLUMN = InvoiceColumn(name=LENOVO_CHARGE_FIELD, dtype=BALANCE_FIELD_TYPE)
112+
RATE_COLUMN = InvoiceColumn(
113+
name=RATE_FIELD, dtype=RATE_FIELD_TYPE
114+
) # Using decimal to suppress scientific notation in export
115+
COST_COLUMN = InvoiceColumn(name=COST_FIELD, dtype=BALANCE_FIELD_TYPE)
116+
CREDIT_COLUMN = InvoiceColumn(name=CREDIT_FIELD, dtype=BALANCE_FIELD_TYPE)
117+
CREDIT_CODE_COLUMN = InvoiceColumn(name=CREDIT_CODE_FIELD, dtype=STRING_FIELD_TYPE)
118+
SUBSIDY_COLUMN = InvoiceColumn(
119+
name=SUBSIDY_FIELD, dtype=BALANCE_FIELD_TYPE, default_value=Decimal(0)
120+
)
121+
BALANCE_COLUMN = InvoiceColumn(
122+
name=BALANCE_FIELD,
123+
dtype=BALANCE_FIELD_TYPE,
124+
default_initializer=lambda df: df[COST_FIELD],
125+
)
126+
PI_BALANCE_COLUMN = InvoiceColumn(
127+
name=PI_BALANCE_FIELD,
128+
dtype=BALANCE_FIELD_TYPE,
129+
default_initializer=lambda df: df[COST_FIELD],
130+
)
131+
132+
# Internally used fields
133+
IS_BILLABLE_COLUMN = InvoiceColumn(name=IS_BILLABLE_FIELD, dtype=BOOL_FIELD_TYPE)
134+
MISSING_PI_COLUMN = InvoiceColumn(name=MISSING_PI_FIELD, dtype=BOOL_FIELD_TYPE)
135+
PROJECT_NAME_COLUMN = InvoiceColumn(name=PROJECT_NAME_FIELD, dtype=STRING_FIELD_TYPE)
136+
GROUP_MANAGED_COLUMN = InvoiceColumn(name=GROUP_MANAGED_FIELD, dtype=BOOL_FIELD_TYPE)
137+
CLUSTER_NAME_COLUMN = InvoiceColumn(name=CLUSTER_NAME_FIELD, dtype=STRING_FIELD_TYPE)
138+
IS_COURSE_COLUMN = InvoiceColumn(
139+
name=IS_COURSE_FIELD, dtype=BOOL_FIELD_TYPE, default_value=False
140+
)
141+
###
142+
68143

69144
@dataclass
70145
class Invoice:
71146
export_columns_list = list()
72147
exported_columns_map = dict()
148+
initializes_columns = tuple()
149+
operates_on_columns = tuple()
73150

74151
invoice_month: str
75152
data: pandas.DataFrame
76153
name: str = ""
77154
export_data = None
78155

79156
def process(self):
157+
self._init_columns()
80158
self._prepare()
81159
self._process()
82160
self._prepare_export()
@@ -93,6 +171,20 @@ def output_s3_key(self) -> str:
93171
def output_s3_archive_key(self):
94172
return f"Invoices/{self.invoice_month}/Archive/{self.name} {self.invoice_month} {util.get_iso8601_time()}.csv"
95173

174+
def _init_columns(self):
175+
"""Initializes columns specified in `initializes_columns` and cast them to appropriate types
176+
177+
If column already exists, only do casting
178+
If no default value is given, column initialized to None
179+
"""
180+
for field in self.initializes_columns:
181+
if field.name not in self.data.columns:
182+
field_default = field.default_value
183+
if field.default_initializer:
184+
field_default = field.default_initializer(self.data)
185+
self.data[field.name] = field_default
186+
self.data = self.data.astype({field.name: field.dtype})
187+
96188
def _prepare(self):
97189
"""Prepares the data for processing.
98190

process_report/loader.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,13 @@ def get_alias_map(self) -> dict:
112112
def load_dataframe(self, filepath: str) -> pandas.DataFrame:
113113
return pandas.read_csv(filepath)
114114

115+
@functools.lru_cache
116+
def load_prepay_credits(self) -> pandas.DataFrame:
117+
prepay_df = self.load_dataframe(invoice_settings.prepay_credits_filepath)
118+
return prepay_df.astype(
119+
{invoice.PREPAY_CREDIT_FIELD: invoice.BALANCE_FIELD_TYPE}
120+
)
121+
115122
@functools.lru_cache
116123
def _load_pi_config(self, filepath: str) -> list[dict]:
117124
with open(filepath) as file:

process_report/process_report.py

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
)
2323
from process_report.processors import (
2424
coldfront_fetch_processor,
25+
validate_input_column_processor,
2526
validate_pi_alias_processor,
2627
add_institution_processor,
2728
lenovo_processor,
@@ -33,6 +34,20 @@
3334
validate_cluster_name_processor,
3435
)
3536

37+
PROCESSING_ORDER = [
38+
validate_input_column_processor.ValidateInputColumnsProcessor,
39+
validate_cluster_name_processor.ValidateClusterNameProcessor,
40+
coldfront_fetch_processor.ColdfrontFetchProcessor,
41+
validate_pi_alias_processor.ValidatePIAliasProcessor,
42+
add_institution_processor.AddInstitutionProcessor,
43+
lenovo_processor.LenovoProcessor,
44+
validate_billable_pi_processor.ValidateBillablePIsProcessor,
45+
pi_su_credit_processor.PISUCreditProcessor,
46+
new_pi_credit_processor.NewPICreditProcessor,
47+
bu_subsidy_processor.BUSubsidyProcessor,
48+
prepayment_processor.PrepaymentProcessor,
49+
]
50+
3651

3752
PI_S3_FILEPATH = "PIs/PI.csv"
3853
ALIAS_S3_FILEPATH = "PIs/alias.csv"
@@ -66,20 +81,7 @@ def main():
6681

6782
### Preliminary processing
6883
processed_data = process_merged_dataframe(
69-
invoice_month,
70-
merged_dataframe,
71-
[
72-
validate_cluster_name_processor.ValidateClusterNameProcessor,
73-
coldfront_fetch_processor.ColdfrontFetchProcessor,
74-
validate_pi_alias_processor.ValidatePIAliasProcessor,
75-
add_institution_processor.AddInstitutionProcessor,
76-
lenovo_processor.LenovoProcessor,
77-
validate_billable_pi_processor.ValidateBillablePIsProcessor,
78-
pi_su_credit_processor.PISUCreditProcessor,
79-
new_pi_credit_processor.NewPICreditProcessor,
80-
bu_subsidy_processor.BUSubsidyProcessor,
81-
prepayment_processor.PrepaymentProcessor,
82-
],
84+
invoice_month, merged_dataframe, PROCESSING_ORDER
8385
)
8486

8587
### Export invoices

process_report/processors/add_institution_processor.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414

1515
@dataclass
1616
class AddInstitutionProcessor(processor.Processor):
17+
operates_on_columns = (
18+
invoice.INSTITUTION_COLUMN,
19+
invoice.PI_COLUMN,
20+
invoice.PROJECT_COLUMN,
21+
)
22+
1723
def _add_institution(self):
1824
"""Determine every PI's institution name, logging any PI whose institution cannot be determined
1925
This is performed by `get_institution_from_pi()`, which tries to match the PI's username to

process_report/processors/bu_subsidy_processor.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
from dataclasses import dataclass, field
2-
from decimal import Decimal
32

43
from process_report.loader import loader
54
from process_report.invoices import invoice
@@ -10,6 +9,19 @@
109
class BUSubsidyProcessor(discount_processor.DiscountProcessor):
1110
IS_DISCOUNT_BY_NERC = False
1211

12+
initializes_columns = (invoice.PROJECT_NAME_COLUMN, invoice.SUBSIDY_COLUMN)
13+
operates_on_columns = (
14+
invoice.PROJECT_COLUMN,
15+
invoice.PROJECT_NAME_COLUMN,
16+
invoice.PI_COLUMN,
17+
invoice.IS_BILLABLE_COLUMN,
18+
invoice.MISSING_PI_COLUMN,
19+
invoice.INSTITUTION_COLUMN,
20+
invoice.PI_BALANCE_COLUMN,
21+
invoice.SUBSIDY_COLUMN,
22+
invoice.BALANCE_COLUMN,
23+
)
24+
1325
subsidy_amount: int = field(default_factory=loader.get_bu_subsidy_amount)
1426

1527
def _prepare(self):
@@ -21,7 +33,6 @@ def get_project(row):
2133
return project_alloc[: project_alloc.rfind("-")]
2234

2335
self.data[invoice.PROJECT_NAME_FIELD] = self.data.apply(get_project, axis=1)
24-
self.data[invoice.SUBSIDY_FIELD] = Decimal(0)
2536

2637
def _process(self):
2738
self.data = self._apply_subsidy(self.data, self.subsidy_amount)

process_report/processors/coldfront_fetch_processor.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,16 @@ class ColdfrontFetchProcessor(processor.Processor):
3434
)
3535
coldfront_data_filepath: str = invoice_settings.coldfront_api_filepath
3636

37+
initializes_columns = (invoice.IS_COURSE_COLUMN,)
38+
operates_on_columns = (
39+
invoice.PROJECT_COLUMN,
40+
invoice.PROJECT_ID_COLUMN,
41+
invoice.CLUSTER_NAME_COLUMN,
42+
invoice.PI_COLUMN,
43+
invoice.INSTITUTION_ID_COLUMN,
44+
invoice.IS_COURSE_COLUMN,
45+
)
46+
3747
@functools.cached_property
3848
def coldfront_client(self):
3949
keycloak_url = os.environ.get("KEYCLOAK_URL", "https://keycloak.mss.mghpcc.org")
@@ -143,7 +153,6 @@ def _validate_allocation_data(self, allocation_data):
143153
)
144154

145155
def _apply_allocation_data(self, allocation_data):
146-
self.data[invoice.IS_COURSE_FIELD] = False
147156
for project_cluster_tuple, data in allocation_data.items():
148157
project_id, cluster_name = project_cluster_tuple
149158
mask = (self.data[invoice.PROJECT_ID_FIELD] == project_id) & (

process_report/processors/discount_processor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def apply_discount_on_project(remaining_discount_amount, project_i, project):
5151
remaining_project_balance = project[pi_balance_field]
5252
applied_discount = min(remaining_project_balance, remaining_discount_amount)
5353

54-
if invoice.at[project_i, discount_field] is None:
54+
if pandas.isna(invoice.at[project_i, discount_field]):
5555
invoice.at[project_i, discount_field] = applied_discount
5656
else:
5757
invoice.at[project_i, discount_field] += applied_discount

process_report/processors/lenovo_processor.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,14 @@
99
class LenovoProcessor(processor.Processor):
1010
su_charge_info: dict = field(default_factory=loader.get_lenovo_su_charge_info)
1111

12+
initializes_columns = (invoice.SU_CHARGE_COLUMN, invoice.LENOVO_CHARGE_COLUMN)
13+
operates_on_columns = (
14+
invoice.SU_TYPE_COLUMN,
15+
invoice.SU_HOURS_COLUMN,
16+
invoice.SU_CHARGE_COLUMN,
17+
invoice.LENOVO_CHARGE_COLUMN,
18+
)
19+
1220
def _apply_su_charge(self, data):
1321
for su_name, su_charge in self.su_charge_info.items():
1422
if su_name in data:

process_report/processors/new_pi_credit_processor.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,19 @@ class NewPICreditProcessor(discount_processor.DiscountProcessor):
3434
]
3535
IS_DISCOUNT_BY_NERC = True
3636

37+
operates_on_columns = (
38+
invoice.CREDIT_COLUMN,
39+
invoice.CREDIT_CODE_COLUMN,
40+
invoice.BALANCE_COLUMN,
41+
invoice.PI_BALANCE_COLUMN,
42+
invoice.SU_TYPE_COLUMN,
43+
invoice.IS_BILLABLE_COLUMN,
44+
invoice.MISSING_PI_COLUMN,
45+
invoice.INSTITUTION_COLUMN,
46+
invoice.COST_COLUMN,
47+
invoice.PI_COLUMN,
48+
)
49+
3750
old_pi_filepath: str = field(
3851
default_factory=lambda: loader.get_remote_filepath(
3952
invoice_settings.pi_remote_filepath
@@ -176,10 +189,7 @@ def _apply_credits_new_pi(
176189
credit_eligible_projects[invoice.PI_FIELD] == pi
177190
]
178191

179-
if pi_age > 1:
180-
for i, row in pi_projects.iterrows():
181-
data.at[i, invoice.BALANCE_FIELD] = row[invoice.COST_FIELD]
182-
else:
192+
if pi_age <= 1:
183193
if pi_age == 0:
184194
old_pi_df = self._upsert_pi_entry(
185195
old_pi_df,
@@ -226,10 +236,6 @@ def _apply_credits_new_pi(
226236
return (data, old_pi_df)
227237

228238
def _prepare(self):
229-
self.data[invoice.CREDIT_FIELD] = None
230-
self.data[invoice.CREDIT_CODE_FIELD] = None
231-
self.data[invoice.PI_BALANCE_FIELD] = self.data[invoice.COST_FIELD]
232-
self.data[invoice.BALANCE_FIELD] = self.data[invoice.COST_FIELD]
233239
self.old_pi_df = self._load_old_pis(self.old_pi_filepath)
234240

235241
def _process(self):

process_report/processors/pi_su_credit_processor.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,22 @@ class PISUCreditProcessor(discount_processor.DiscountProcessor):
2525
IS_DISCOUNT_BY_NERC = True
2626
PI_SU_CREDIT_CODE = "0005"
2727

28+
initializes_columns = (
29+
invoice.CREDIT_COLUMN,
30+
invoice.CREDIT_CODE_COLUMN,
31+
invoice.PI_BALANCE_COLUMN,
32+
invoice.BALANCE_COLUMN,
33+
)
34+
operates_on_columns = (
35+
invoice.SU_TYPE_COLUMN,
36+
invoice.PI_COLUMN,
37+
invoice.COST_COLUMN,
38+
invoice.CREDIT_COLUMN,
39+
invoice.CREDIT_CODE_COLUMN,
40+
invoice.PI_BALANCE_COLUMN,
41+
invoice.BALANCE_COLUMN,
42+
)
43+
2844
pi_su_mapping: dict[str, list[str]] = field(
2945
default_factory=loader.get_pi_non_billed_su_types
3046
)

0 commit comments

Comments
 (0)