Liberia #37

Draft: wants to merge 32 commits into base `main`.

Commits (32):
- `264f1e5` README update including instructions to access unity catalog and spec… (Feb 5, 2025)
- `2feb128` Creating microdata extraction and transformation (Feb 5, 2025)
- `3c125c7` adding liberia to cross_country_aggregate (Feb 10, 2025)
- `07bc5c9` First pass edits (Feb 12, 2025)
- `6f2afda` Bug fixes with .eq and final col names (Feb 12, 2025)
- `5281fe8` getting regions without prefix code (Feb 12, 2025)
- `6d4d726` Review of fuc_sub (Feb 19, 2025)
- `74d4edf` Econ verification (Feb 19, 2025)
- `12f8552` logic error fixes (Mar 11, 2025)
- `d335b61` reformatting sections for readability. Implementing end to end (Mar 11, 2025)
- `7c7c21c` renaming variables for clarity (Mar 11, 2025)
- `d004867` reformatting for readability. calculating basic wages (Mar 11, 2025)
- `06ec50e` Merge branch 'main' into liberia (elysenko, Mar 12, 2025)
- `7bed88a` moving sub functional logic below functional logic (Mar 12, 2025)
- `fb37401` swap econ and sub econ (Mar 12, 2025)
- `c2530f8` removing unspecified TODO (Mar 12, 2025)
- `220ba90` Rearranging comments to point out where the admin and geo swap occurs… (Mar 12, 2025)
- `6765328` adding `is_foreign` tag (Mar 12, 2025)
- `8429fe4` removing outdated comment. removing blank line. Updating allowances f… (Mar 12, 2025)
- `33e54af` dropping empty cell (Mar 12, 2025)
- `b103534` inheriting column filters in sub econ and sub func from econ and func… (Mar 13, 2025)
- `62bd885` renaming filter for clarity (Mar 13, 2025)
- `6572233` casting admin1 as geo1 (Mar 13, 2025)
- `4f0d0a1` begin updates for revised notebook (Apr 2, 2025)
- `2877ccc` refactoring after changing format (Apr 9, 2025)
- `e4c74a1` updating interest on debt to reflect new formula (Apr 10, 2025)
- `e7ae4cd` Merge remote-tracking branch 'origin/main' into liberia (Apr 16, 2025)
- `c7c6189` year filter for years < 2024 (Apr 18, 2025)
- `ef61299` filtering out 2024 and formatting (Apr 18, 2025)
- `24c1cd8` removing nested when because it creates null values for econ (Apr 18, 2025)
- `8b7d2d5` Applying End Year filter (Apr 20, 2025)
- `f1551d0` cleaning code and renaming wrong econ name ('Social Benefits') - 'b' … (Apr 20, 2025)

28 changes: 28 additions & 0 deletions Liberia/LBR_extract_microdata_excel_to_csv.py
@@ -0,0 +1,28 @@
# Databricks notebook source
# MAGIC %run ../utils

# COMMAND ----------

import pandas as pd
import numpy as np
from tqdm import tqdm  # also pulled in via ../utils; imported here so the notebook is self-contained

COUNTRY = 'Liberia'

# Helpers such as prepare_microdata_csv_dir, input_excel_filename,
# is_named_column, and normalize_cell come from ../utils (see %run above).
microdata_csv_dir = prepare_microdata_csv_dir(COUNTRY)
print(microdata_csv_dir)
filename = input_excel_filename(COUNTRY)
disaggregated_data_sheets = ['Data']

for sheet in tqdm(disaggregated_data_sheets):
    csv_file_path = f'{microdata_csv_dir}/{sheet}.csv'
    print(f'Reading sheet {sheet!r} from: {filename}')
    df = pd.read_excel(filename, sheet_name=sheet, header=0)

    # Keep only properly named columns and strip whitespace from the headers.
    header = [col_name for col_name in df.columns if is_named_column(col_name)]
    df = df[header]
    df.columns = [col.strip() for col in header]

    # Normalize cell values, drop fully empty rows, and write out the CSV.
    df = df.applymap(normalize_cell)
    df = df.dropna(how='all')
    df.to_csv(csv_file_path, index=False, encoding='utf-8')
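
The helpers used above live in the shared `../utils` notebook. A minimal sketch of what the two row/column helpers might look like, assuming pandas surfaces blank Excel headers as `Unnamed: N` placeholders; these are illustrative stand-ins, not the actual shared utilities:

```python
import numpy as np

def is_named_column(col_name) -> bool:
    # Hypothetical: pandas labels blank Excel headers as 'Unnamed: N'.
    return isinstance(col_name, str) and not col_name.startswith('Unnamed')

def normalize_cell(value):
    # Hypothetical: trim strings and turn empty cells into NaN so that
    # dropna(how='all') can discard fully empty rows.
    if isinstance(value, str):
        value = value.strip()
        return np.nan if value == '' else value
    return value
```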
227 changes: 227 additions & 0 deletions Liberia/LBR_transform_load_dlt.py
@@ -0,0 +1,227 @@
# Databricks notebook source
import dlt
import unicodedata
from pyspark.sql.functions import (
    substring, col, lit, when, udf, trim, regexp_replace, initcap, concat, lower, create_map, coalesce
)

TOP_DIR = "/Volumes/prd_mega/sboost4/vboost4"
INPUT_DIR = f"{TOP_DIR}/Documents/input/Countries"
WORKSPACE_DIR = f"{TOP_DIR}/Workspace"
COUNTRY = 'Liberia'
COUNTRY_MICRODATA_DIR = f'{WORKSPACE_DIR}/microdata_csv/{COUNTRY}'

CSV_READ_OPTIONS = {
    "header": "true",
    "multiline": "true",
    "quote": '"',
    "escape": '"',
}

@dlt.table(name='lbr_boost_bronze')
def boost_bronze():
    # Load the extracted CSV into the bronze layer as-is.
    return (spark.read
        .format("csv")
        .options(**CSV_READ_OPTIONS)
        .option("inferSchema", "true")
        .load(f'{COUNTRY_MICRODATA_DIR}/Data.csv'))

@dlt.table(name='lbr_boost_silver')
def boost_silver():
    df = dlt.read('lbr_boost_bronze')

    # --- Dropping unwanted columns ---
    # The raw 'year' column is superseded by 'year.1', which is renamed below.
    bad_cols = ['year']
    for bad_col in bad_cols:
        if bad_col in df.columns:
            df = df.drop(bad_col)

    # --- Column renaming mapping ---
    rename_mappings = {
        'year.1': 'year',
        'econ0': 'Econ0',
        'econ1': 'Econ1',
        'econ2': 'Econ2',
        'econ3': 'Econ3',
        'econ4': 'Econ4',
        'geo1': 'Geo1',
        'geo2': 'Geo2',
        'admin1': 'Admin1',
        'admin2': 'Admin2',
        'admin3': 'Admin3',
        'func1': 'Func1',
        'func214': 'Func3',
        'Func226': 'Func2',
        'budget_class': 'budget',
        'WAGES': 'wages',
        'FUND25': 'fund',
    }
    for old_name, new_name in rename_mappings.items():
        df = df.withColumnRenamed(old_name, new_name)

    # Replace nulls with empty strings so the startswith() filters below
    # never evaluate against null.
    columns_to_clean = [
        'econ0', 'econ1', 'econ2', 'econ3', 'econ4',
        'geo1', 'geo2', 'fund', 'wages',
    ]
    for column in columns_to_clean:
        df = df.withColumn(column, coalesce(col(column).cast("string"), lit("")))

    # --- Global Filters ---
    # Filtering out unwanted values at the total expenditures level
    df = df.filter((col('Econ0') != '4 Liabilities') & (col('Econ1') != '32 Financial assets'))

    # Used often below, so defined once to limit repetition
    not_dept = ~col('Admin2').startswith('10401')

    # --- Admin and Geo Data Adjustments ---
    # Admin and geo data appear to be swapped in the source, so the admin
    # columns are derived from Geo1/Admin1 here.
    df = df.withColumn(
        'admin0', when(col('Geo1').startswith('00'), 'Central').otherwise('Regional')
    ).withColumn(
        'admin1', when(col('Geo1').startswith('00'), 'Central Scope')
        .otherwise(regexp_replace(col('Geo1'), r'^\d+\s+', ''))
    ).withColumn(
        'admin2', regexp_replace(col('Admin1'), r'^\d+\s+', '')
    )

    # --- Functional Classifications ---
    # Each Func1 prefix maps to [label, use_recurrent_condition]; when the flag
    # is True the label only applies to recurrent spending (Econ0 prefix '2'),
    # otherwise it applies to everything outside the 10401 department.
    func_mapping = {
        "02": ["Defence", False],
        "03": ["Public order and safety", False],
        "04": ["Economic affairs", False],
        "05": ["Environmental protection", True],
        "06": ["Housing and community amenities", False],
        "07": ["Health", True],
        "08": ["Recreation, culture and religion", False],
        "09": ["Education", True],
        "10": ["Social protection", False],
    }

    func_filter = None
    for prefix, (label, use_recurrent_condition) in func_mapping.items():
        base_condition = col('Func1').startswith(prefix)
        if use_recurrent_condition:
            condition = base_condition & col('Econ0').startswith('2')
        else:
            condition = base_condition & not_dept
        func_filter = when(condition, label) if func_filter is None else func_filter.when(condition, label)

    func_filter = func_filter.otherwise("General public services")
    df = df.withColumn("func", func_filter)

    # --- Sub-Functional Classifications ---
    df = df.withColumn(
        'func_sub', when((col("func") == "Public order and safety") & (col('Func2').startswith('033')), "judiciary")
        .when((col("func") == "Public order and safety") & (~col('Func2').startswith('033')), "public safety")
        .when(not_dept & col('Func2').startswith('042'), 'agriculture')
        .when(col('Func2').startswith('045'), 'transport')
        .when(not_dept & col('Func3').startswith('0451'), 'roads')
        .when(col('Admin1').startswith('429'), 'air transport')
        .when(not_dept & col('Func2').startswith('043'), 'energy')
        .when(col('Admin1').startswith('418'), 'telecoms')
        .when(col('Func2').startswith('07 ') | col('Func2').startswith('074'), 'primary and secondary health')
        .when(col('Func2').startswith('073'), 'tertiary and quaternary health')
    )

    # --- Econ and sub-econ reused filters ---
    pensions_filter = col('Econ0').startswith('2') & col('Econ2').startswith('271')
    social_assistance_filter = col('Func1').startswith('10') & col('Econ0').startswith('2')  # 2013 - ...
    allowances_filter = col('Econ2').startswith('211') & not_dept
    wage_filter = col('Econ1').startswith('21')

    # --- Sub-Economic Classifications ---
    df = df.withColumn(
        'econ_sub', when(social_assistance_filter, 'social assistance')
        .when(wage_filter & allowances_filter, 'allowances')
        .when(wage_filter & ~allowances_filter, 'basic wages')
        .when(pensions_filter, 'pensions')
        .when(not_dept & col('Econ2').startswith('212'), 'social benefits (pension contributions)')
        .when(col('Econ3').startswith('2213') | col('Econ3').startswith('2218'), 'basic services')
        .when(col('Econ3').startswith('2215'), 'recurrent maintenance')
    )

    # --- Economic Classifications ---
    df = df.withColumn(
        'econ',
        when(col('Econ2').startswith('2') & col('Econ1').startswith('21'),
             'Wage bill')
        .when(col('budget').startswith('4') & not_dept & ~col('Econ1').startswith('21'),
              'Capital expenditures')
        .when(col('Econ1').startswith('25'),
              'Subsidies')
        .when(col('Econ1').startswith('22') & col('budget').startswith('1'),
              'Goods and services')
        .when((col('year').cast('integer') < 2018) & col('Econ1').startswith('24'),
              'Interest on debt')
        .when((col('year').cast('integer') >= 2018) & col('Econ4').startswith('423104'),
              'Interest on debt')
        .when((col('Econ1').startswith('13') | col('Econ1').startswith('26'))
              & ~col('Func1').startswith('10')
              & col('budget').startswith('1')
              & not_dept,
              'Other grants and transfers')
        # 'Social benefits' was getting assigned where econ_sub is null, so
        # guard explicitly on non-null econ_sub values here.
        .when(col('econ_sub').isin('social assistance', 'pensions') & col('econ_sub').isNotNull(),
              'Social benefits')
        .otherwise('Other expenses')
    )

    # --- Foreign Classification ---
    df = df.withColumn('is_foreign', col('fund') == 'Foreign')

    # --- Geo ---
    df = df.withColumn('geo1', lower(col('admin1')))

    return df

@dlt.table(name='lbr_boost_gold')
def boost_gold():
    return (dlt.read('lbr_boost_silver')
        .withColumn('country_name', lit(COUNTRY))
        .filter(col('year') > 2008)
        .filter(col('year') < 2024)
        .filter(col('actual').isNotNull())
        .select('country_name',
                col('year').cast('integer'),
                col('approved'),
                col('actual').alias('executed'),
                col('revised'),
                'admin0',
                'admin1',
                'admin2',
                'econ_sub',
                'econ',
                'func_sub',
                'func',
                'is_foreign',
                'geo1'))
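
Once the DLT pipeline has run, the published gold table can be sanity-checked from a regular notebook. A quick validation sketch, assuming the pipeline targets the `boost_intermediate` schema with `prd_mega` as the catalog (per the README instructions below):

```python
# Sanity checks on the published gold table; run in a notebook, not inside DLT.
gold = spark.table('prd_mega.boost_intermediate.lbr_boost_gold')

# The year filters in boost_gold should leave only 2009-2023.
gold.selectExpr('min(year) AS min_year', 'max(year) AS max_year').show()

# Every row gets an econ tag; 'Other expenses' is the catch-all bucket.
gold.groupBy('econ').count().orderBy('count', ascending=False).show(truncate=False)
```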

2 changes: 1 addition & 1 deletion README.md
@@ -19,7 +19,7 @@ BOOST data is country-specific — for instance, some ministries of finance have
1. Create a new folder named using the country name.
2. Write and test the country-specific ETL code. You may organize the code as follows:
- A notebook for extracting the BOOST/raw data from the Excel sheet(s) (e.g. [ALB_extract_microdata_excel_to_csv](./Albania/ALB_extract_microdata_excel_to_csv.py)). The resulting CSV files can then be loaded directly by a subsequent Delta Live Table (DLT) script (described in the next point). If the raw data is already in a [format supported by DLT load](https://docs.databricks.com/en/delta-live-tables/load.html), such as CSV, you may skip this step/notebook altogether. To test the code, run the notebook in Databricks with the assigned project compute cluster (e.g. DAP-PROD-CLUSTER).
- A DLT notebook for cleaning and transforming the BOOST/raw data. The resulting data should have each line item tagged with a predefined set of tags in a consistent manner (e.g. [ALB_transform_load_dlt](./Albania/ALB_transform_load_dlt.py)). To validate the DLT code and verify the resulting table schema, run the notebook with the default cluster as you would a normal notebook. To populate the tables, create a DLT pipeline with `boost_intermediate` as the target schema name. Reference an existing country's DLT pipeline settings for other configurations.
- A DLT notebook for cleaning and transforming the BOOST/raw data. The resulting data should have each line item tagged with a predefined set of tags in a consistent manner (e.g. [ALB_transform_load_dlt](./Albania/ALB_transform_load_dlt.py)). To validate the DLT code and verify the resulting table schema, run the notebook with the default cluster as you would a normal notebook. To populate the tables, create a DLT pipeline in Unity Catalog, with `prd_mega` as the default catalog and `boost_intermediate` as the target schema name. Reference an existing country's DLT pipeline settings for other configurations.
- Be sure to follow the naming convention (code folder & file names, table names, pipeline names, etc.) referencing existing countries. When a country code is needed, use the [3-letter ISO 3166 country codes](https://en.wikipedia.org/wiki/ISO_3166-1_alpha-3).
3. Add the new country's ETL pipeline steps to the "BOOST Harmonize" job: Workflows > Jobs > BOOST Harmonize > Tasks > + Add task
- Add the extraction step using Type: Notebook, and Source: git. Use the default cluster as the compute.
3 changes: 2 additions & 1 deletion cross_country_aggregate_dlt.py
@@ -9,7 +9,8 @@
boost_intermediate_schema = 'boost_intermediate'

# Adding a new country requires adding the country here
country_codes = ['moz', 'pry', 'ken', 'pak', 'bfa', 'col', 'cod', 'nga', 'tun', 'btn', 'bgd', 'alb', 'ury', "zaf", 'chl', 'gha']

country_codes = ['moz', 'pry', 'ken', 'pak', 'bfa', 'col', 'cod', 'nga', 'tun', 'btn', 'bgd', 'alb', 'ury', "zaf", 'chl', 'lbr', 'gha']

schema = StructType([
StructField("country_name", StringType(), True, {'comment': 'The name of the country for which the budget data is recorded (e.g., "Kenya", "Brazil").'}),
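
For context, the `country_codes` list drives the aggregation over each country's gold table. A hypothetical sketch of how such a union typically looks; the actual logic in `cross_country_aggregate_dlt.py` may differ:

```python
from functools import reduce
import dlt

@dlt.table(name='boost_gold_aggregate')
def boost_gold_aggregate():
    # Hypothetical: read each country's gold table and stack them by column name.
    frames = [dlt.read(f'{code}_boost_gold') for code in country_codes]
    return reduce(lambda a, b: a.unionByName(b, allowMissingColumns=True), frames)
```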
8 changes: 8 additions & 0 deletions quality/transform_load_dlt.py
@@ -15,6 +15,8 @@
"escape": '"',
}

end_year = 2024

# COMMAND ----------

@dlt.table(name=f'quality_cci_bronze')
@@ -51,6 +53,7 @@ def quality_total_silver():
valueColumnName="amount"
)
.filter(F.col('amount').isNotNull())
.filter(F.col('year') < end_year)
)

# COMMAND ----------
@@ -106,6 +109,7 @@ def quality_functional_silver():
valueColumnName="amount"
)
.filter(F.col('amount').isNotNull())
.filter(F.col('year') < end_year)
)

# COMMAND ----------
@@ -156,6 +160,7 @@ def quality_economic_silver():
valueColumnName="amount"
)
.filter(F.col('amount').isNotNull())
.filter(F.col('year') < end_year)
)

# COMMAND ----------
@@ -186,6 +191,7 @@ def quality_judiciary_silver():
valueColumnName="amount"
)
.filter(F.col('amount').isNotNull())
.filter(F.col('year') < end_year)
)

# COMMAND ----------
@@ -218,6 +224,7 @@ def quality_total_subnat_silver():
valueColumnName="amount"
)
.filter(F.col('amount').isNotNull())
.filter(F.col('year') < end_year)
)

# COMMAND ----------
@@ -249,6 +256,7 @@ def quality_total_foreign_silver():
valueColumnName="amount"
)
.filter(F.col('amount').isNotNull())
.filter(F.col('year') < end_year)
)

# COMMAND ----------
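
Each quality silver table above ends with the same two filters. A condensed sketch of the shared tail, assuming `F` is the `pyspark.sql.functions` alias used in this file; factoring it into a helper like this is one option, not what the file currently does:

```python
from pyspark.sql import functions as F

end_year = 2024  # exclude the in-progress fiscal year from quality checks

def apply_quality_filters(df):
    # Shared tail applied after each unpivot: drop null amounts and the open year.
    return (df
        .filter(F.col('amount').isNotNull())
        .filter(F.col('year') < end_year))
```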