Skip to content

Commit bfe5338

Browse files
committed
Enable exporting monthly invoice to Iceberg
Added new invoice `IcebergInvoice` to export invoice data to Iceberg tables The export process also includes a schema update step to allow updates to Iceberg table schema. New Iceberg integration test added to validate iceberg functionality E2E test updated to include iceberg exporting Both tests use a temporary sqlite catalog
1 parent 48e43cb commit bfe5338

9 files changed

Lines changed: 274 additions & 4 deletions

File tree

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import logging
2+
from dataclasses import dataclass, field
3+
4+
from pyiceberg.table import Table
5+
from pyiceberg.catalog import Catalog, load_catalog
6+
import pyarrow
7+
8+
import process_report.invoices.invoice as invoice
9+
from process_report.loader import loader
10+
from process_report.settings import invoice_settings
11+
12+
logger = logging.getLogger(__name__)
13+
logging.basicConfig(level=logging.INFO)
14+
15+
16+
def get_iceberg_catalog(config: dict, catalog_name: str) -> Catalog:
17+
return load_catalog(name=catalog_name, **config)
18+
19+
20+
def get_iceberg_table(catalog: Catalog, table_path) -> Table:
21+
return catalog.load_table(table_path)
22+
23+
24+
@dataclass
25+
class IcebergInvoice(invoice.Invoice):
26+
export_columns_list = [
27+
invoice.INVOICE_DATE_FIELD,
28+
invoice.PROJECT_FIELD,
29+
invoice.PROJECT_ID_FIELD,
30+
invoice.PI_FIELD,
31+
invoice.CLUSTER_NAME_FIELD,
32+
invoice.INVOICE_EMAIL_FIELD,
33+
invoice.INVOICE_ADDRESS_FIELD,
34+
invoice.INSTITUTION_FIELD,
35+
invoice.INSTITUTION_ID_FIELD,
36+
invoice.IS_BILLABLE_FIELD,
37+
invoice.SU_HOURS_FIELD,
38+
invoice.SU_TYPE_FIELD,
39+
invoice.RATE_FIELD,
40+
invoice.GROUP_NAME_FIELD,
41+
invoice.GROUP_INSTITUTION_FIELD,
42+
invoice.GROUP_BALANCE_FIELD,
43+
invoice.COST_FIELD,
44+
invoice.GROUP_BALANCE_USED_FIELD,
45+
invoice.CREDIT_FIELD,
46+
invoice.CREDIT_CODE_FIELD,
47+
invoice.BALANCE_FIELD,
48+
]
49+
50+
iceberg_catalog_name: str = invoice_settings.iceberg_catalog_name
51+
iceberg_catalog_config: dict = field(
52+
default_factory=lambda: loader.get_iceberg_config()
53+
)
54+
iceberg_table_path: str = invoice_settings.iceberg_table_path
55+
56+
def _prepare(self):
57+
iceberg_catalog = get_iceberg_catalog(
58+
self.iceberg_catalog_config, self.iceberg_catalog_name
59+
)
60+
self.iceberg_table = get_iceberg_table(iceberg_catalog, self.iceberg_table_path)
61+
self.export_data = self.data
62+
63+
def export(self):
64+
# Overrides base invoice export behavior
65+
self._filter_columns()
66+
67+
# Update table schema, only allows "possible" migrations (i.e raises on str -> Decimal)
68+
# TODO (Quan) When we implement typing validation for dataframes, change this to raise errors
69+
with self.iceberg_table.update_schema() as update_schema:
70+
try:
71+
update_schema.union_by_name(
72+
pyarrow.Table.from_pandas(self.export_data).schema
73+
)
74+
except ValueError as e:
75+
logger.warning(
76+
f"Dataframe contains columns not convertable to PyIceberg: {e}"
77+
)
78+
79+
self.iceberg_table.append(pyarrow.Table.from_pandas(self.export_data))
80+
81+
def export_s3(self, s3_bucket):
82+
return

process_report/loader.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@ def get_remote_filepath(self, remote_filepath: str) -> str:
6161
return util.fetch_s3(remote_filepath)
6262
return remote_filepath
6363

64+
@functools.lru_cache
65+
def get_iceberg_config(self) -> dict:
66+
"""Load an Iceberg catalog config from a YAML file."""
67+
with open(invoice_settings.iceberg_config_path, "r") as f:
68+
return yaml.safe_load(f)
69+
6470
@functools.lru_cache
6571
def get_new_pi_credit_amount(self) -> Decimal:
6672
return invoice_settings.new_pi_credit_amount or get_rates_info().get_value_at(

process_report/process_report.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
MOCA_prepaid_invoice,
1919
prepay_credits_snapshot,
2020
ocp_test_invoice,
21+
iceberg_invoice,
2122
)
2223
from process_report.processors import (
2324
coldfront_fetch_processor,
@@ -97,6 +98,7 @@ def main():
9798
MOCA_prepaid_invoice.MOCAPrepaidInvoice,
9899
prepay_credits_snapshot.PrepayCreditsSnapshot,
99100
ocp_test_invoice.OcpTestInvoice,
101+
iceberg_invoice.IcebergInvoice,
100102
],
101103
invoice_settings.upload_to_s3,
102104
)

process_report/settings.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ class Settings(BaseSettings):
1111
keycloak_client_id: str | None = None
1212
keycloak_client_secret: str | None = None
1313

14+
# Iceberg config
15+
iceberg_catalog_name: str | None = None
16+
iceberg_config_path: str | None = None
17+
iceberg_table_path: str | None = None
18+
1419
invoice_path_template: str = "Invoices/{invoice_month}/Service Invoices/"
1520
invoice_month: str = (datetime.datetime.today() - relativedelta(months=1)).strftime(
1621
"%Y-%m"

process_report/tests/base.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,10 @@ def create_test_invoice(self, data_dict: dict):
5555

5656

5757
class BaseTestCaseWithTempDir(BaseTestCase):
58-
def setUp(self):
59-
self.tempdir = Path(tempfile.TemporaryDirectory(delete=False).name)
58+
@classmethod
59+
def setUpClass(cls):
60+
cls.tempdir = Path(tempfile.TemporaryDirectory(delete=False).name)
6061

61-
def tearDown(self):
62-
shutil.rmtree(self.tempdir)
62+
@classmethod
63+
def tearDownClass(cls):
64+
shutil.rmtree(cls.tempdir)

process_report/tests/e2e/test_e2e_pipeline.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
import logging
66
import subprocess
77
from typing import Dict, List
8+
import yaml
9+
10+
from pyiceberg import schema, catalog
811

912
logger = logging.getLogger(__name__)
1013

@@ -131,6 +134,28 @@ def _prepare_pipeline_execution(
131134
env.setdefault("CHROME_BIN_PATH", "/usr/bin/chromium")
132135
env["PYTHONPATH"] = str(project_root) + ":" + env.get("PYTHONPATH", "")
133136

137+
# Iceberg settings, init test namespace and table
138+
env["iceberg_catalog_name"] = "test_catalog"
139+
env["iceberg_config_path"] = workspace / "test_iceberg_config.yaml"
140+
env["iceberg_table_path"] = "test_namespace.test_table"
141+
142+
catalog_config = {
143+
"type": "sql",
144+
"warehouse": f"file://{workspace}",
145+
"uri": f"sqlite:///{workspace / 'test_iceberg_catalog.db'}",
146+
}
147+
148+
with open(workspace / "test_iceberg_config.yaml", "w") as f:
149+
yaml.dump(catalog_config, f)
150+
151+
test_catalog = catalog.load_catalog(name="test_catalog", **catalog_config)
152+
test_schema = schema.Schema(
153+
schema.NestedField(1, "Invoice Month", schema.StringType()),
154+
)
155+
156+
test_catalog.create_namespace_if_not_exists("test_namespace")
157+
test_catalog.create_table_if_not_exists("test_namespace.test_table", test_schema)
158+
134159
return command, env
135160

136161

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
import pandas
2+
import pyarrow
3+
from pyiceberg import schema, catalog
4+
5+
from process_report.invoices.iceberg_invoice import IcebergInvoice
6+
from process_report.tests.base import BaseTestCaseWithTempDir
7+
8+
9+
class TestIceberg(BaseTestCaseWithTempDir):
10+
@classmethod
11+
def setUpClass(cls):
12+
super().setUpClass()
13+
# Create in-memory catalog
14+
cls.catalog_name = "catalog_foo"
15+
cls.table_path = "namespace_foo.table_foo"
16+
17+
config_dict = {
18+
"type": "sql",
19+
"warehouse": str(cls.tempdir),
20+
"uri": f"sqlite:///{str(cls.tempdir)}/foo.db",
21+
}
22+
cls.catalog_config = config_dict
23+
24+
# Initialize test schema that's used in setUp()
25+
cls.catalog = catalog.load_catalog(name=cls.catalog_name, **config_dict)
26+
cls.test_schema = schema.Schema(
27+
schema.NestedField(1, "Invoice Month", schema.StringType()),
28+
schema.NestedField(2, "Cost", schema.DecimalType(21, 2)),
29+
schema.NestedField(3, "PI", schema.StringType()),
30+
)
31+
32+
def setUp(self):
33+
self.catalog.create_namespace_if_not_exists("namespace_foo")
34+
self.catalog.create_table_if_not_exists(self.table_path, self.test_schema)
35+
36+
def tearDown(self):
37+
self.catalog.drop_table(self.table_path)
38+
39+
def test_upload_one_dataframe(self):
40+
# Create test dataframe matching table schema
41+
test_df = pandas.DataFrame(
42+
{
43+
"Invoice Month": ["2024-01", "2024-01"],
44+
"Cost": [100.0, 200.0],
45+
"PI": ["PI1", "PI2"],
46+
},
47+
).astype({"Cost": pandas.ArrowDtype(pyarrow.decimal128(21, 2))})
48+
49+
# Create IcebergInvoice instance
50+
inv = IcebergInvoice(
51+
invoice_month="2024-01",
52+
data=test_df,
53+
iceberg_catalog_name=self.catalog_name,
54+
iceberg_catalog_config=self.catalog_config,
55+
iceberg_table_path=self.table_path,
56+
)
57+
inv.process()
58+
inv.export()
59+
60+
# Verify data was uploaded, and Iceberg cost column can be casted to Decimal
61+
table = self.catalog.load_table(self.table_path)
62+
uploaded_df = table.scan().to_pandas().astype(test_df.dtypes)
63+
assert uploaded_df.equals(test_df)
64+
65+
def test_upload_new_column(self):
66+
# Create test dataframe with an extra column
67+
test_df = pandas.DataFrame(
68+
{
69+
"Invoice Month": ["2024-02", "2024-02"],
70+
"Cost": [150.0, 250.0],
71+
"PI": ["PI3", "PI4"],
72+
"extra_column": ["extra1", "extra2"], # New column
73+
}
74+
).astype({"Cost": pandas.ArrowDtype(pyarrow.decimal128(21, 2))})
75+
76+
# Create IcebergInvoice instance
77+
inv = IcebergInvoice(
78+
invoice_month="2024-02",
79+
data=test_df,
80+
iceberg_catalog_name=self.catalog_name,
81+
iceberg_catalog_config=self.catalog_config,
82+
iceberg_table_path=self.table_path,
83+
)
84+
inv.process()
85+
inv.export()
86+
87+
# Verify data was uploaded with new column (schema evolution)
88+
table = self.catalog.load_table(self.table_path)
89+
uploaded_df = table.scan().to_pandas().astype(test_df.dtypes)
90+
assert uploaded_df.equals(test_df)
91+
92+
def test_schema_evolution_with_existing_data(self):
93+
# First, upload initial data without extra column
94+
first_df = pandas.DataFrame(
95+
{
96+
"Invoice Month": ["2024-01", "2024-01"],
97+
"Cost": [100.0, 200.0],
98+
"PI": ["PI1", "PI2"],
99+
}
100+
).astype({"Cost": pandas.ArrowDtype(pyarrow.decimal128(21, 2))})
101+
102+
inv = IcebergInvoice(
103+
invoice_month="2024-01",
104+
data=first_df,
105+
iceberg_catalog_name=self.catalog_name,
106+
iceberg_catalog_config=self.catalog_config,
107+
iceberg_table_path=self.table_path,
108+
)
109+
inv.process()
110+
inv.export()
111+
112+
# Now upload data with an extra column
113+
second_df = pandas.DataFrame(
114+
{
115+
"Invoice Month": ["2024-02", "2024-02"],
116+
"Cost": [150.0, 250.0],
117+
"PI": ["PI3", "PI4"],
118+
"extra_column": ["new1", "new2"], # New column
119+
}
120+
).astype({"Cost": pandas.ArrowDtype(pyarrow.decimal128(21, 2))})
121+
122+
inv2 = IcebergInvoice(
123+
invoice_month="2024-02",
124+
data=second_df,
125+
iceberg_catalog_name=self.catalog_name,
126+
iceberg_catalog_config=self.catalog_config,
127+
iceberg_table_path=self.table_path,
128+
)
129+
inv2.process()
130+
inv2.export()
131+
132+
table = self.catalog.load_table(self.table_path)
133+
result_df = table.scan().to_pandas().astype(second_df.dtypes)
134+
135+
# Verify the table has schema evolved with the new column
136+
# Old rows should have None for the new column
137+
expected_df = pandas.DataFrame(
138+
{
139+
"Invoice Month": ["2024-02", "2024-02", "2024-01", "2024-01"],
140+
"Cost": [150.0, 250.0, 100.0, 200.0],
141+
"PI": ["PI3", "PI4", "PI1", "PI2"],
142+
"extra_column": ["new1", "new2", None, None],
143+
}
144+
).astype({"Cost": pandas.ArrowDtype(pyarrow.decimal128(21, 2))})
145+
assert result_df.equals(expected_df)
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
pytest
2+
pytest-env
23
coverage
4+
pyiceberg[sql-sqlite]

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ Jinja2
66
validators
77
python-dateutil
88
pydantic-settings
9+
pyiceberg[pyarrow]

0 commit comments

Comments
 (0)