Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions process_report/invoices/invoice.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,32 @@
from dataclasses import dataclass
from typing import Any, Callable
from decimal import Decimal
import pandas
import pyarrow
import logging

import process_report.util as util


logger = logging.getLogger(__name__)


@dataclass
class InvoiceColumn:
name: str
dtype: Any
default_value: Any | None = None
default_initializer: Callable[[pandas.DataFrame], pandas.Series] | None = None


# Field type definitions
BALANCE_FIELD_TYPE = pandas.ArrowDtype(pyarrow.decimal128(21, 2))
Comment thread
naved001 marked this conversation as resolved.
RATE_FIELD_TYPE = pandas.ArrowDtype(pyarrow.decimal128(21, 13))
INTEGER_FIELD_TYPE = pandas.ArrowDtype(pyarrow.int64())
STRING_FIELD_TYPE = pandas.StringDtype()
BOOL_FIELD_TYPE = pandas.BooleanDtype()


### PI file field names
PI_PI_FIELD = "PI"
PI_FIRST_MONTH = "First Invoice Month"
Expand Down Expand Up @@ -65,18 +88,77 @@
IS_COURSE_FIELD = "Is Course"
###

### Initialized Column objects
INVOICE_DATE_COLUMN = InvoiceColumn(name=INVOICE_DATE_FIELD, dtype=STRING_FIELD_TYPE)
Comment thread
knikolla marked this conversation as resolved.
PROJECT_COLUMN = InvoiceColumn(name=PROJECT_FIELD, dtype=STRING_FIELD_TYPE)
PROJECT_ID_COLUMN = InvoiceColumn(name=PROJECT_ID_FIELD, dtype=STRING_FIELD_TYPE)
PI_COLUMN = InvoiceColumn(name=PI_FIELD, dtype=STRING_FIELD_TYPE)
INVOICE_EMAIL_COLUMN = InvoiceColumn(name=INVOICE_EMAIL_FIELD, dtype=STRING_FIELD_TYPE)
INVOICE_ADDRESS_COLUMN = InvoiceColumn(
name=INVOICE_ADDRESS_FIELD, dtype=STRING_FIELD_TYPE
)
INSTITUTION_COLUMN = InvoiceColumn(name=INSTITUTION_FIELD, dtype=STRING_FIELD_TYPE)
INSTITUTION_ID_COLUMN = InvoiceColumn(
name=INSTITUTION_ID_FIELD, dtype=STRING_FIELD_TYPE
)
GROUP_NAME_COLUMN = InvoiceColumn(name=GROUP_NAME_FIELD, dtype=STRING_FIELD_TYPE)
GROUP_INSTITUTION_COLUMN = InvoiceColumn(
name=GROUP_INSTITUTION_FIELD, dtype=STRING_FIELD_TYPE
)
GROUP_BALANCE_COLUMN = InvoiceColumn(name=GROUP_BALANCE_FIELD, dtype=BALANCE_FIELD_TYPE)
GROUP_BALANCE_USED_COLUMN = InvoiceColumn(
name=GROUP_BALANCE_USED_FIELD, dtype=BALANCE_FIELD_TYPE
)
SU_HOURS_COLUMN = InvoiceColumn(name=SU_HOURS_FIELD, dtype=INTEGER_FIELD_TYPE)
SU_TYPE_COLUMN = InvoiceColumn(name=SU_TYPE_FIELD, dtype=STRING_FIELD_TYPE)
SU_CHARGE_COLUMN = InvoiceColumn(name=SU_CHARGE_FIELD, dtype=BALANCE_FIELD_TYPE)
LENOVO_CHARGE_COLUMN = InvoiceColumn(name=LENOVO_CHARGE_FIELD, dtype=BALANCE_FIELD_TYPE)
RATE_COLUMN = InvoiceColumn(
name=RATE_FIELD, dtype=RATE_FIELD_TYPE
) # Using decimal to suppress scientific notation in export
COST_COLUMN = InvoiceColumn(name=COST_FIELD, dtype=BALANCE_FIELD_TYPE)
CREDIT_COLUMN = InvoiceColumn(name=CREDIT_FIELD, dtype=BALANCE_FIELD_TYPE)
CREDIT_CODE_COLUMN = InvoiceColumn(name=CREDIT_CODE_FIELD, dtype=STRING_FIELD_TYPE)
SUBSIDY_COLUMN = InvoiceColumn(
name=SUBSIDY_FIELD, dtype=BALANCE_FIELD_TYPE, default_value=Decimal(0)
)
BALANCE_COLUMN = InvoiceColumn(
name=BALANCE_FIELD,
dtype=BALANCE_FIELD_TYPE,
default_initializer=lambda df: df[COST_FIELD],
)
PI_BALANCE_COLUMN = InvoiceColumn(
name=PI_BALANCE_FIELD,
dtype=BALANCE_FIELD_TYPE,
default_initializer=lambda df: df[COST_FIELD],
)

# Internally used fields
IS_BILLABLE_COLUMN = InvoiceColumn(name=IS_BILLABLE_FIELD, dtype=BOOL_FIELD_TYPE)
MISSING_PI_COLUMN = InvoiceColumn(name=MISSING_PI_FIELD, dtype=BOOL_FIELD_TYPE)
PROJECT_NAME_COLUMN = InvoiceColumn(name=PROJECT_NAME_FIELD, dtype=STRING_FIELD_TYPE)
GROUP_MANAGED_COLUMN = InvoiceColumn(name=GROUP_MANAGED_FIELD, dtype=BOOL_FIELD_TYPE)
CLUSTER_NAME_COLUMN = InvoiceColumn(name=CLUSTER_NAME_FIELD, dtype=STRING_FIELD_TYPE)
IS_COURSE_COLUMN = InvoiceColumn(
name=IS_COURSE_FIELD, dtype=BOOL_FIELD_TYPE, default_value=False
)
###


@dataclass
class Invoice:
export_columns_list = list()
exported_columns_map = dict()
initializes_columns = tuple()
operates_on_columns = tuple()

invoice_month: str
data: pandas.DataFrame
name: str = ""
export_data = None

def process(self):
self._init_columns()
self._prepare()
self._process()
self._prepare_export()
Expand All @@ -93,6 +175,24 @@ def output_s3_key(self) -> str:
def output_s3_archive_key(self):
return f"Invoices/{self.invoice_month}/Archive/{self.name} {self.invoice_month} {util.get_iso8601_time()}.csv"

def _init_columns(self):
"""Initializes columns specified in `initializes_columns` and cast them to appropriate types

If column already exists, only do casting
Comment thread
knikolla marked this conversation as resolved.
If no default value is given, column initialized to None
"""
for field in self.initializes_columns:
if field.name not in self.data.columns:
field_default = field.default_value
if field.default_initializer:
field_default = field.default_initializer(self.data)
self.data[field.name] = field_default
elif self.data.dtypes[field.name] != field.dtype:
logger.warning(
f"Column {field.name} has dtype {self.data.dtypes[field.name]} instead of expected {field.dtype}."
)
self.data = self.data.astype({field.name: field.dtype})

def _prepare(self):
"""Prepares the data for processing.

Expand Down
7 changes: 7 additions & 0 deletions process_report/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ def get_alias_map(self) -> dict:
def load_dataframe(self, filepath: str) -> pandas.DataFrame:
return pandas.read_csv(filepath)

@functools.lru_cache
def load_prepay_credits(self) -> pandas.DataFrame:
prepay_df = self.load_dataframe(invoice_settings.prepay_credits_filepath)
return prepay_df.astype(
{invoice.PREPAY_CREDIT_FIELD: invoice.BALANCE_FIELD_TYPE}
)

@functools.lru_cache
def _load_pi_config(self, filepath: str) -> list[dict]:
with open(filepath) as file:
Expand Down
46 changes: 29 additions & 17 deletions process_report/process_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import os

import pandas
import pyarrow

from process_report.settings import invoice_settings
from process_report.loader import loader
Expand All @@ -22,6 +21,7 @@
)
from process_report.processors import (
coldfront_fetch_processor,
validate_input_column_processor,
validate_pi_alias_processor,
add_institution_processor,
lenovo_processor,
Expand All @@ -33,6 +33,20 @@
validate_cluster_name_processor,
)

PROCESSING_ORDER = [
validate_input_column_processor.ValidateInputColumnsProcessor,
validate_cluster_name_processor.ValidateClusterNameProcessor,
coldfront_fetch_processor.ColdfrontFetchProcessor,
validate_pi_alias_processor.ValidatePIAliasProcessor,
add_institution_processor.AddInstitutionProcessor,
lenovo_processor.LenovoProcessor,
validate_billable_pi_processor.ValidateBillablePIsProcessor,
pi_su_credit_processor.PISUCreditProcessor,
new_pi_credit_processor.NewPICreditProcessor,
bu_subsidy_processor.BUSubsidyProcessor,
prepayment_processor.PrepaymentProcessor,
]


PI_S3_FILEPATH = "PIs/PI.csv"
ALIAS_S3_FILEPATH = "PIs/alias.csv"
Expand Down Expand Up @@ -66,20 +80,7 @@ def main():

### Preliminary processing
processed_data = process_merged_dataframe(
invoice_month,
merged_dataframe,
[
validate_cluster_name_processor.ValidateClusterNameProcessor,
coldfront_fetch_processor.ColdfrontFetchProcessor,
validate_pi_alias_processor.ValidatePIAliasProcessor,
add_institution_processor.AddInstitutionProcessor,
lenovo_processor.LenovoProcessor,
validate_billable_pi_processor.ValidateBillablePIsProcessor,
pi_su_credit_processor.PISUCreditProcessor,
new_pi_credit_processor.NewPICreditProcessor,
bu_subsidy_processor.BUSubsidyProcessor,
prepayment_processor.PrepaymentProcessor,
],
invoice_month, merged_dataframe, PROCESSING_ORDER
)

### Export invoices
Expand Down Expand Up @@ -109,8 +110,19 @@ def merge_csv(files):
file,
engine="pyarrow",
dtype={
invoice.COST_FIELD: pandas.ArrowDtype(pyarrow.decimal128(21, 2)),
invoice.RATE_FIELD: str,
invoice.INVOICE_DATE_COLUMN.name: invoice.INVOICE_DATE_COLUMN.dtype,
invoice.PROJECT_COLUMN.name: invoice.PROJECT_COLUMN.dtype,
invoice.PROJECT_ID_COLUMN.name: invoice.PROJECT_ID_COLUMN.dtype,
invoice.PI_COLUMN.name: invoice.PI_COLUMN.dtype,
invoice.CLUSTER_NAME_COLUMN.name: invoice.CLUSTER_NAME_COLUMN.dtype,
invoice.INVOICE_EMAIL_COLUMN.name: invoice.INVOICE_EMAIL_COLUMN.dtype,
invoice.INVOICE_ADDRESS_COLUMN.name: invoice.INVOICE_ADDRESS_COLUMN.dtype,
invoice.INSTITUTION_COLUMN.name: invoice.INSTITUTION_COLUMN.dtype,
invoice.INSTITUTION_ID_COLUMN.name: invoice.INSTITUTION_ID_COLUMN.dtype,
invoice.SU_HOURS_COLUMN.name: invoice.SU_HOURS_COLUMN.dtype,
invoice.SU_TYPE_COLUMN.name: invoice.SU_TYPE_COLUMN.dtype,
invoice.RATE_COLUMN.name: invoice.RATE_COLUMN.dtype,
invoice.COST_COLUMN.name: invoice.COST_COLUMN.dtype,
},
quotechar="|",
)
Expand Down
6 changes: 6 additions & 0 deletions process_report/processors/add_institution_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@

@dataclass
class AddInstitutionProcessor(processor.Processor):
operates_on_columns = (
invoice.INSTITUTION_COLUMN,
invoice.PI_COLUMN,
invoice.PROJECT_COLUMN,
)

def _add_institution(self):
"""Determine every PI's institution name, logging any PI whose institution cannot be determined
This is performed by `get_institution_from_pi()`, which tries to match the PI's username to
Expand Down
14 changes: 12 additions & 2 deletions process_report/processors/bu_subsidy_processor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from dataclasses import dataclass, field
from decimal import Decimal

from process_report.loader import loader
from process_report.invoices import invoice
Expand All @@ -10,6 +9,18 @@
class BUSubsidyProcessor(discount_processor.DiscountProcessor):
IS_DISCOUNT_BY_NERC = False

initializes_columns = (invoice.PROJECT_NAME_COLUMN, invoice.SUBSIDY_COLUMN)
operates_on_columns = (
*initializes_columns,
invoice.PROJECT_COLUMN,
invoice.PI_COLUMN,
invoice.IS_BILLABLE_COLUMN,
invoice.MISSING_PI_COLUMN,
invoice.INSTITUTION_COLUMN,
invoice.PI_BALANCE_COLUMN,
invoice.BALANCE_COLUMN,
)

subsidy_amount: int = field(default_factory=loader.get_bu_subsidy_amount)

def _prepare(self):
Expand All @@ -21,7 +32,6 @@ def get_project(row):
return project_alloc[: project_alloc.rfind("-")]

self.data[invoice.PROJECT_NAME_FIELD] = self.data.apply(get_project, axis=1)
self.data[invoice.SUBSIDY_FIELD] = Decimal(0)

def _process(self):
self.data = self._apply_subsidy(self.data, self.subsidy_amount)
Expand Down
11 changes: 10 additions & 1 deletion process_report/processors/coldfront_fetch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ class ColdfrontFetchProcessor(processor.Processor):
)
coldfront_data_filepath: str = invoice_settings.coldfront_api_filepath

initializes_columns = (invoice.IS_COURSE_COLUMN,)
operates_on_columns = (
*initializes_columns,
invoice.PROJECT_COLUMN,
invoice.PROJECT_ID_COLUMN,
invoice.CLUSTER_NAME_COLUMN,
invoice.PI_COLUMN,
invoice.INSTITUTION_ID_COLUMN,
)

@functools.cached_property
def coldfront_client(self):
keycloak_url = os.environ.get("KEYCLOAK_URL", "https://keycloak.mss.mghpcc.org")
Expand Down Expand Up @@ -143,7 +153,6 @@ def _validate_allocation_data(self, allocation_data):
)

def _apply_allocation_data(self, allocation_data):
self.data[invoice.IS_COURSE_FIELD] = False
for project_cluster_tuple, data in allocation_data.items():
project_id, cluster_name = project_cluster_tuple
mask = (self.data[invoice.PROJECT_ID_FIELD] == project_id) & (
Expand Down
2 changes: 1 addition & 1 deletion process_report/processors/discount_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def apply_discount_on_project(remaining_discount_amount, project_i, project):
remaining_project_balance = project[pi_balance_field]
applied_discount = min(remaining_project_balance, remaining_discount_amount)

if invoice.at[project_i, discount_field] is None:
if pandas.isna(invoice.at[project_i, discount_field]):
invoice.at[project_i, discount_field] = applied_discount
else:
invoice.at[project_i, discount_field] += applied_discount
Expand Down
7 changes: 7 additions & 0 deletions process_report/processors/lenovo_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@
class LenovoProcessor(processor.Processor):
su_charge_info: dict = field(default_factory=loader.get_lenovo_su_charge_info)

initializes_columns = (invoice.SU_CHARGE_COLUMN, invoice.LENOVO_CHARGE_COLUMN)
operates_on_columns = (
*initializes_columns,
invoice.SU_TYPE_COLUMN,
invoice.SU_HOURS_COLUMN,
)

def _apply_su_charge(self, data):
for su_name, su_charge in self.su_charge_info.items():
if su_name in data:
Expand Down
22 changes: 14 additions & 8 deletions process_report/processors/new_pi_credit_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,19 @@ class NewPICreditProcessor(discount_processor.DiscountProcessor):
]
IS_DISCOUNT_BY_NERC = True

operates_on_columns = (
invoice.CREDIT_COLUMN,
invoice.CREDIT_CODE_COLUMN,
invoice.BALANCE_COLUMN,
invoice.PI_BALANCE_COLUMN,
invoice.SU_TYPE_COLUMN,
invoice.IS_BILLABLE_COLUMN,
invoice.MISSING_PI_COLUMN,
invoice.INSTITUTION_COLUMN,
invoice.COST_COLUMN,
invoice.PI_COLUMN,
)

old_pi_filepath: str = field(
default_factory=lambda: loader.get_remote_filepath(
invoice_settings.pi_remote_filepath
Expand Down Expand Up @@ -176,10 +189,7 @@ def _apply_credits_new_pi(
credit_eligible_projects[invoice.PI_FIELD] == pi
]

if pi_age > 1:
for i, row in pi_projects.iterrows():
data.at[i, invoice.BALANCE_FIELD] = row[invoice.COST_FIELD]
else:
if pi_age <= 1:
if pi_age == 0:
old_pi_df = self._upsert_pi_entry(
old_pi_df,
Expand Down Expand Up @@ -226,10 +236,6 @@ def _apply_credits_new_pi(
return (data, old_pi_df)

def _prepare(self):
self.data[invoice.CREDIT_FIELD] = None
self.data[invoice.CREDIT_CODE_FIELD] = None
self.data[invoice.PI_BALANCE_FIELD] = self.data[invoice.COST_FIELD]
self.data[invoice.BALANCE_FIELD] = self.data[invoice.COST_FIELD]
self.old_pi_df = self._load_old_pis(self.old_pi_filepath)

def _process(self):
Expand Down
13 changes: 13 additions & 0 deletions process_report/processors/pi_su_credit_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,19 @@ class PISUCreditProcessor(discount_processor.DiscountProcessor):
IS_DISCOUNT_BY_NERC = True
PI_SU_CREDIT_CODE = "0005"

initializes_columns = (
invoice.CREDIT_COLUMN,
invoice.CREDIT_CODE_COLUMN,
invoice.PI_BALANCE_COLUMN,
invoice.BALANCE_COLUMN,
)
operates_on_columns = (
*initializes_columns,
invoice.SU_TYPE_COLUMN,
invoice.PI_COLUMN,
invoice.COST_COLUMN,
)

pi_su_mapping: dict[str, list[str]] = field(
default_factory=loader.get_pi_non_billed_su_types
)
Expand Down
Loading
Loading