Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions process_report/invoices/iceberg_invoice.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import logging
from dataclasses import dataclass, field

from pyiceberg.table import Table
from pyiceberg.catalog import Catalog, load_catalog
import pyarrow

import process_report.invoices.invoice as invoice
from process_report.loader import loader
from process_report.settings import invoice_settings

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# NOTE(review): this configures the root logger as an import side effect of this
# module; confirm that is intended rather than leaving it to the entry point.
logging.basicConfig(level=logging.INFO)


def get_iceberg_catalog(config: dict, catalog_name: str) -> Catalog:
    """Load the PyIceberg catalog named `catalog_name`.

    Every entry of `config` is forwarded to `load_catalog` as a keyword argument.
    """
    catalog = load_catalog(name=catalog_name, **config)
    return catalog


def get_iceberg_table(catalog: Catalog, table_path) -> Table:
    """Return the table that `catalog` loads for the identifier `table_path`."""
    table = catalog.load_table(table_path)
    return table


@dataclass
class IcebergInvoice(invoice.Invoice):
    """Invoice whose export appends its rows to an Iceberg table.

    The catalog name, catalog configuration and table path default to the
    values provided by `invoice_settings` and `loader`.
    """

    # Columns kept in the exported data
    export_columns_list = [
        invoice.INVOICE_DATE_FIELD,
        invoice.PROJECT_FIELD,
        invoice.PROJECT_ID_FIELD,
        invoice.PI_FIELD,
        invoice.CLUSTER_NAME_FIELD,
        invoice.INVOICE_EMAIL_FIELD,
        invoice.INVOICE_ADDRESS_FIELD,
        invoice.INSTITUTION_FIELD,
        invoice.INSTITUTION_ID_FIELD,
        invoice.IS_BILLABLE_FIELD,
        invoice.SU_HOURS_FIELD,
        invoice.SU_TYPE_FIELD,
        invoice.RATE_FIELD,
        invoice.GROUP_NAME_FIELD,
        invoice.GROUP_INSTITUTION_FIELD,
        invoice.GROUP_BALANCE_FIELD,
        invoice.COST_FIELD,
        invoice.GROUP_BALANCE_USED_FIELD,
        invoice.CREDIT_FIELD,
        invoice.CREDIT_CODE_FIELD,
        invoice.BALANCE_FIELD,
    ]

    iceberg_catalog_name: str = invoice_settings.iceberg_catalog_name
    # Resolved per instance through the loader rather than at class definition
    iceberg_catalog_config: dict = field(
        default_factory=lambda: loader.get_iceberg_config()
    )
    iceberg_table_path: str = invoice_settings.iceberg_table_path

    def _prepare(self):
        """Load the target Iceberg table and stage the full dataframe for export."""
        iceberg_catalog = get_iceberg_catalog(
            self.iceberg_catalog_config, self.iceberg_catalog_name
        )
        self.iceberg_table = get_iceberg_table(iceberg_catalog, self.iceberg_table_path)
        self.export_data = self.data

    def export(self):
        """Append the export data to the Iceberg table.

        Overrides base invoice export behavior. The table schema is first
        merged by name with the schema of the export data; if that merge raises
        `ValueError`, a warning is logged and the append is still attempted.
        """
        self._filter_columns()

        # Convert once: the same Arrow table supplies both the schema used for
        # the schema update and the rows that are appended.
        export_table = pyarrow.Table.from_pandas(self.export_data)

        # Update table schema, only allows "possible" migrations (e.g. raises on str -> Decimal)
        # TODO (Quan) When we implement typing validation for dataframes, change this to raise errors
        with self.iceberg_table.update_schema() as update_schema:
            try:
                update_schema.union_by_name(export_table.schema)
            except ValueError as e:
                logger.warning(
                    f"Dataframe contains columns not convertable to PyIceberg: {e}"
                )

        self.iceberg_table.append(export_table)

    def export_s3(self, s3_bucket):
        """Do nothing; this invoice is not uploaded to S3."""
        return
100 changes: 100 additions & 0 deletions process_report/invoices/invoice.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,32 @@
from dataclasses import dataclass
from typing import Any, Callable
from decimal import Decimal
import pandas
import pyarrow
import logging

import process_report.util as util


# Module-level logger named after this module.
logger = logging.getLogger(__name__)


@dataclass
class InvoiceColumn:
name: str
dtype: Any
default_value: Any | None = None
default_initializer: Callable[[pandas.DataFrame], pandas.Series] | None = None


# Field type definitions
# Shared pandas extension dtypes referenced by the InvoiceColumn objects in this module.
# Arrow-backed fixed-point decimal: precision 21, scale 2.
BALANCE_FIELD_TYPE = pandas.ArrowDtype(pyarrow.decimal128(21, 2))
# Arrow-backed fixed-point decimal: precision 21, scale 13.
RATE_FIELD_TYPE = pandas.ArrowDtype(pyarrow.decimal128(21, 13))
# Arrow-backed 64-bit integer.
INTEGER_FIELD_TYPE = pandas.ArrowDtype(pyarrow.int64())
# pandas string and boolean extension dtypes.
STRING_FIELD_TYPE = pandas.StringDtype()
BOOL_FIELD_TYPE = pandas.BooleanDtype()


### PI file field names
PI_PI_FIELD = "PI"
PI_FIRST_MONTH = "First Invoice Month"
Expand Down Expand Up @@ -65,18 +88,77 @@
IS_COURSE_FIELD = "Is Course"
###

### Initialized Column objects
# Each InvoiceColumn pairs a field-name constant with the dtype the column is
# expected to have, plus an optional default used when the column is created.
INVOICE_DATE_COLUMN = InvoiceColumn(name=INVOICE_DATE_FIELD, dtype=STRING_FIELD_TYPE)
PROJECT_COLUMN = InvoiceColumn(name=PROJECT_FIELD, dtype=STRING_FIELD_TYPE)
PROJECT_ID_COLUMN = InvoiceColumn(name=PROJECT_ID_FIELD, dtype=STRING_FIELD_TYPE)
PI_COLUMN = InvoiceColumn(name=PI_FIELD, dtype=STRING_FIELD_TYPE)
INVOICE_EMAIL_COLUMN = InvoiceColumn(name=INVOICE_EMAIL_FIELD, dtype=STRING_FIELD_TYPE)
INVOICE_ADDRESS_COLUMN = InvoiceColumn(
    name=INVOICE_ADDRESS_FIELD, dtype=STRING_FIELD_TYPE
)
INSTITUTION_COLUMN = InvoiceColumn(name=INSTITUTION_FIELD, dtype=STRING_FIELD_TYPE)
INSTITUTION_ID_COLUMN = InvoiceColumn(
    name=INSTITUTION_ID_FIELD, dtype=STRING_FIELD_TYPE
)
GROUP_NAME_COLUMN = InvoiceColumn(name=GROUP_NAME_FIELD, dtype=STRING_FIELD_TYPE)
GROUP_INSTITUTION_COLUMN = InvoiceColumn(
    name=GROUP_INSTITUTION_FIELD, dtype=STRING_FIELD_TYPE
)
GROUP_BALANCE_COLUMN = InvoiceColumn(name=GROUP_BALANCE_FIELD, dtype=BALANCE_FIELD_TYPE)
GROUP_BALANCE_USED_COLUMN = InvoiceColumn(
    name=GROUP_BALANCE_USED_FIELD, dtype=BALANCE_FIELD_TYPE
)
SU_HOURS_COLUMN = InvoiceColumn(name=SU_HOURS_FIELD, dtype=INTEGER_FIELD_TYPE)
SU_TYPE_COLUMN = InvoiceColumn(name=SU_TYPE_FIELD, dtype=STRING_FIELD_TYPE)
SU_CHARGE_COLUMN = InvoiceColumn(name=SU_CHARGE_FIELD, dtype=BALANCE_FIELD_TYPE)
LENOVO_CHARGE_COLUMN = InvoiceColumn(name=LENOVO_CHARGE_FIELD, dtype=BALANCE_FIELD_TYPE)
RATE_COLUMN = InvoiceColumn(
    name=RATE_FIELD, dtype=RATE_FIELD_TYPE
)  # Using decimal to suppress scientific notation in export
COST_COLUMN = InvoiceColumn(name=COST_FIELD, dtype=BALANCE_FIELD_TYPE)
CREDIT_COLUMN = InvoiceColumn(name=CREDIT_FIELD, dtype=BALANCE_FIELD_TYPE)
CREDIT_CODE_COLUMN = InvoiceColumn(name=CREDIT_CODE_FIELD, dtype=STRING_FIELD_TYPE)
# Subsidy defaults to a zero Decimal when the column has to be created.
SUBSIDY_COLUMN = InvoiceColumn(
    name=SUBSIDY_FIELD, dtype=BALANCE_FIELD_TYPE, default_value=Decimal(0)
)
# Both balance columns are initialized from the dataframe's cost column.
BALANCE_COLUMN = InvoiceColumn(
    name=BALANCE_FIELD,
    dtype=BALANCE_FIELD_TYPE,
    default_initializer=lambda df: df[COST_FIELD],
)
PI_BALANCE_COLUMN = InvoiceColumn(
    name=PI_BALANCE_FIELD,
    dtype=BALANCE_FIELD_TYPE,
    default_initializer=lambda df: df[COST_FIELD],
)

# Internally used fields
IS_BILLABLE_COLUMN = InvoiceColumn(name=IS_BILLABLE_FIELD, dtype=BOOL_FIELD_TYPE)
MISSING_PI_COLUMN = InvoiceColumn(name=MISSING_PI_FIELD, dtype=BOOL_FIELD_TYPE)
PROJECT_NAME_COLUMN = InvoiceColumn(name=PROJECT_NAME_FIELD, dtype=STRING_FIELD_TYPE)
GROUP_MANAGED_COLUMN = InvoiceColumn(name=GROUP_MANAGED_FIELD, dtype=BOOL_FIELD_TYPE)
CLUSTER_NAME_COLUMN = InvoiceColumn(name=CLUSTER_NAME_FIELD, dtype=STRING_FIELD_TYPE)
# "Is Course" defaults to False when the column has to be created.
IS_COURSE_COLUMN = InvoiceColumn(
    name=IS_COURSE_FIELD, dtype=BOOL_FIELD_TYPE, default_value=False
)
###


@dataclass
class Invoice:
export_columns_list = list()
exported_columns_map = dict()
initializes_columns = tuple()
operates_on_columns = tuple()

invoice_month: str
data: pandas.DataFrame
name: str = ""
export_data = None

def process(self):
self._init_columns()
self._prepare()
self._process()
self._prepare_export()
Expand All @@ -93,6 +175,24 @@ def output_s3_key(self) -> str:
def output_s3_archive_key(self):
    """Build the S3 archive key for this invoice: month folder, name, month and an ISO 8601 timestamp."""
    month = self.invoice_month
    timestamp = util.get_iso8601_time()
    return f"Invoices/{month}/Archive/{self.name} {month} {timestamp}.csv"

def _init_columns(self):
    """Create any missing columns listed in `initializes_columns` and cast each to its dtype.

    A missing column is filled from its `default_initializer` (called with the
    current dataframe) when one is set, otherwise from `default_value`, which
    is None unless specified. A column that already exists is left as is apart
    from the cast; a warning is logged first if its dtype differs from the
    expected one.
    """
    for column in self.initializes_columns:
        col_name = column.name
        if col_name in self.data.columns:
            if self.data.dtypes[col_name] != column.dtype:
                logger.warning(
                    f"Column {col_name} has dtype {self.data.dtypes[col_name]} instead of expected {column.dtype}."
                )
        else:
            if column.default_initializer:
                initial_values = column.default_initializer(self.data)
            else:
                initial_values = column.default_value
            self.data[col_name] = initial_values

        # Cast on every iteration, whether the column was just created or not
        self.data = self.data.astype({col_name: column.dtype})

def _prepare(self):
"""Prepares the data for processing.

Expand Down
13 changes: 13 additions & 0 deletions process_report/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ def get_remote_filepath(self, remote_filepath: str) -> str:
return util.fetch_s3(remote_filepath)
return remote_filepath

@functools.lru_cache
def get_iceberg_config(self) -> dict:
    """Load an Iceberg catalog config from a YAML file.

    The file path comes from `invoice_settings.iceberg_config_path`. The
    result is cached, so repeated calls return the same object.

    Returns:
        The parsed YAML document, or an empty dict when the file is empty.
    """
    # Read as UTF-8 explicitly so parsing does not depend on the platform locale
    with open(invoice_settings.iceberg_config_path, "r", encoding="utf-8") as f:
        # safe_load yields None for an empty document; fall back to {} so the
        # declared dict return type holds and callers can unpack it with **
        return yaml.safe_load(f) or {}

@functools.lru_cache
def get_new_pi_credit_amount(self) -> Decimal:
return invoice_settings.new_pi_credit_amount or get_rates_info().get_value_at(
Expand Down Expand Up @@ -112,6 +118,13 @@ def get_alias_map(self) -> dict:
def load_dataframe(self, filepath: str) -> pandas.DataFrame:
    """Read the CSV file at `filepath` into a DataFrame with pandas' default options."""
    dataframe = pandas.read_csv(filepath)
    return dataframe

@functools.lru_cache
def load_prepay_credits(self) -> pandas.DataFrame:
    """Load the prepay credits CSV and cast its credit column to the balance dtype.

    The file path comes from `invoice_settings.prepay_credits_filepath`; the
    result is cached.
    """
    raw_credits = self.load_dataframe(invoice_settings.prepay_credits_filepath)
    credit_dtypes = {invoice.PREPAY_CREDIT_FIELD: invoice.BALANCE_FIELD_TYPE}
    return raw_credits.astype(credit_dtypes)

@functools.lru_cache
def _load_pi_config(self, filepath: str) -> list[dict]:
with open(filepath) as file:
Expand Down
48 changes: 31 additions & 17 deletions process_report/process_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import os

import pandas
import pyarrow

from process_report.settings import invoice_settings
from process_report.loader import loader
Expand All @@ -19,9 +18,11 @@
MOCA_prepaid_invoice,
prepay_credits_snapshot,
ocp_test_invoice,
iceberg_invoice,
)
from process_report.processors import (
coldfront_fetch_processor,
validate_input_column_processor,
validate_pi_alias_processor,
add_institution_processor,
lenovo_processor,
Expand All @@ -33,6 +34,20 @@
validate_cluster_name_processor,
)

# Processor classes handed to `process_merged_dataframe` in `main`.
# NOTE(review): presumably they are applied in list order, so input validation
# comes first and later processors can depend on earlier ones; confirm against
# `process_merged_dataframe`.
PROCESSING_ORDER = [
    validate_input_column_processor.ValidateInputColumnsProcessor,
    validate_cluster_name_processor.ValidateClusterNameProcessor,
    coldfront_fetch_processor.ColdfrontFetchProcessor,
    validate_pi_alias_processor.ValidatePIAliasProcessor,
    add_institution_processor.AddInstitutionProcessor,
    lenovo_processor.LenovoProcessor,
    validate_billable_pi_processor.ValidateBillablePIsProcessor,
    pi_su_credit_processor.PISUCreditProcessor,
    new_pi_credit_processor.NewPICreditProcessor,
    bu_subsidy_processor.BUSubsidyProcessor,
    prepayment_processor.PrepaymentProcessor,
]


PI_S3_FILEPATH = "PIs/PI.csv"
ALIAS_S3_FILEPATH = "PIs/alias.csv"
Expand Down Expand Up @@ -66,20 +81,7 @@ def main():

### Preliminary processing
processed_data = process_merged_dataframe(
invoice_month,
merged_dataframe,
[
validate_cluster_name_processor.ValidateClusterNameProcessor,
coldfront_fetch_processor.ColdfrontFetchProcessor,
validate_pi_alias_processor.ValidatePIAliasProcessor,
add_institution_processor.AddInstitutionProcessor,
lenovo_processor.LenovoProcessor,
validate_billable_pi_processor.ValidateBillablePIsProcessor,
pi_su_credit_processor.PISUCreditProcessor,
new_pi_credit_processor.NewPICreditProcessor,
bu_subsidy_processor.BUSubsidyProcessor,
prepayment_processor.PrepaymentProcessor,
],
invoice_month, merged_dataframe, PROCESSING_ORDER
)

### Export invoices
Expand All @@ -96,6 +98,7 @@ def main():
MOCA_prepaid_invoice.MOCAPrepaidInvoice,
prepay_credits_snapshot.PrepayCreditsSnapshot,
ocp_test_invoice.OcpTestInvoice,
iceberg_invoice.IcebergInvoice,
],
invoice_settings.upload_to_s3,
)
Expand All @@ -109,8 +112,19 @@ def merge_csv(files):
file,
engine="pyarrow",
dtype={
invoice.COST_FIELD: pandas.ArrowDtype(pyarrow.decimal128(21, 2)),
invoice.RATE_FIELD: str,
invoice.INVOICE_DATE_FIELD: invoice.STRING_FIELD_TYPE,
invoice.PROJECT_FIELD: invoice.STRING_FIELD_TYPE,
invoice.PROJECT_ID_FIELD: invoice.STRING_FIELD_TYPE,
invoice.PI_FIELD: invoice.STRING_FIELD_TYPE,
invoice.CLUSTER_NAME_FIELD: invoice.STRING_FIELD_TYPE,
invoice.INVOICE_EMAIL_FIELD: invoice.STRING_FIELD_TYPE,
invoice.INVOICE_ADDRESS_FIELD: invoice.STRING_FIELD_TYPE,
invoice.INSTITUTION_FIELD: invoice.STRING_FIELD_TYPE,
invoice.INSTITUTION_ID_FIELD: invoice.STRING_FIELD_TYPE,
invoice.SU_HOURS_FIELD: invoice.INTEGER_FIELD_TYPE,
invoice.SU_TYPE_FIELD: invoice.STRING_FIELD_TYPE,
invoice.RATE_FIELD: invoice.RATE_FIELD_TYPE,
invoice.COST_FIELD: invoice.BALANCE_FIELD_TYPE,
},
quotechar="|",
)
Expand Down
6 changes: 6 additions & 0 deletions process_report/processors/add_institution_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@

@dataclass
class AddInstitutionProcessor(processor.Processor):
operates_on_columns = (
invoice.INSTITUTION_COLUMN,
invoice.PI_COLUMN,
invoice.PROJECT_COLUMN,
)

def _add_institution(self):
"""Determine every PI's institution name, logging any PI whose institution cannot be determined
This is performed by `get_institution_from_pi()`, which tries to match the PI's username to
Expand Down
14 changes: 12 additions & 2 deletions process_report/processors/bu_subsidy_processor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from dataclasses import dataclass, field
from decimal import Decimal

from process_report.loader import loader
from process_report.invoices import invoice
Expand All @@ -10,6 +9,18 @@
class BUSubsidyProcessor(discount_processor.DiscountProcessor):
IS_DISCOUNT_BY_NERC = False

initializes_columns = (invoice.PROJECT_NAME_COLUMN, invoice.SUBSIDY_COLUMN)
operates_on_columns = (
*initializes_columns,
invoice.PROJECT_COLUMN,
invoice.PI_COLUMN,
invoice.IS_BILLABLE_COLUMN,
invoice.MISSING_PI_COLUMN,
invoice.INSTITUTION_COLUMN,
invoice.PI_BALANCE_COLUMN,
invoice.BALANCE_COLUMN,
)

subsidy_amount: int = field(default_factory=loader.get_bu_subsidy_amount)

def _prepare(self):
Expand All @@ -21,7 +32,6 @@ def get_project(row):
return project_alloc[: project_alloc.rfind("-")]

self.data[invoice.PROJECT_NAME_FIELD] = self.data.apply(get_project, axis=1)
self.data[invoice.SUBSIDY_FIELD] = Decimal(0)

def _process(self):
self.data = self._apply_subsidy(self.data, self.subsidy_amount)
Expand Down
Loading
Loading