Skip to content

Commit 83d6ec0

Browse files
Add 21 sovereign debt analysis functions across 6 new modules with tests
Co-authored-by: zachessesjohnson <168567202+zachessesjohnson@users.noreply.github.com>
1 parent 53b45fa commit 83d6ec0

File tree

13 files changed

+1361
-0
lines changed

13 files changed

+1361
-0
lines changed

sovereign_debt_xl/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,9 @@
33
from .forecasting import * # noqa: F401,F403
44
from .modeling import * # noqa: F401,F403
55
from .utils import * # noqa: F401,F403
6+
from .fiscal import * # noqa: F401,F403
7+
from .credit_risk import * # noqa: F401,F403
8+
from .yield_curve import * # noqa: F401,F403
9+
from .reserves import * # noqa: F401,F403
10+
from .stress import * # noqa: F401,F403
11+
from .amortization import * # noqa: F401,F403

sovereign_debt_xl/amortization.py

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
from __future__ import annotations
2+
3+
from typing import Any
4+
5+
from pyxll import xl_func
6+
7+
from ._coerce import safe_err, to_1d_floats, to_2d_list
8+
9+
10+
@xl_func(
11+
"object[][] bonds_list: object[][]",
12+
name="SOV_AMORTIZATION_PROFILE",
13+
)
14+
def amortization_profile(bonds_list: Any) -> list[list[Any]] | str:
15+
"""Build a redemption wall from a list of [maturity_year, face_value] bond rows.
16+
17+
An optional header row (detected when the first cell is a string) is skipped.
18+
A concentration flag of "HIGH" is set for any year whose redemption represents
19+
25 % or more of the total outstanding amount.
20+
"""
21+
try:
22+
grid = to_2d_list(bonds_list)
23+
if not grid:
24+
return safe_err(ValueError("bonds_list is empty"))
25+
start = 1 if isinstance(grid[0][0], str) else 0
26+
profile: dict[int, float] = {}
27+
for row in grid[start:]:
28+
if len(row) < 2:
29+
continue
30+
try:
31+
yr = int(float(row[0]))
32+
fv = float(row[1])
33+
except Exception:
34+
continue
35+
profile[yr] = profile.get(yr, 0.0) + fv
36+
if not profile:
37+
return safe_err(ValueError("No valid bond rows found"))
38+
total = sum(profile.values())
39+
out: list[list[Any]] = [["year", "redemption", "concentration_flag"]]
40+
for yr in sorted(profile.keys()):
41+
redemption = profile[yr]
42+
flag = "HIGH" if redemption > 0.25 * total else ""
43+
out.append([yr, round(redemption, 4), flag])
44+
return out
45+
except Exception as e:
46+
return safe_err(e)
47+
48+
49+
@xl_func(
50+
"object[][] bonds_outstanding: float",
51+
name="SOV_WEIGHTED_AVG_MATURITY",
52+
)
53+
def weighted_avg_maturity(bonds_outstanding: Any) -> float | str:
54+
"""Weighted average maturity of a debt portfolio.
55+
56+
bonds_outstanding: list of [maturity_years, face_value] rows.
57+
An optional header row is skipped when the first cell is a string.
58+
"""
59+
try:
60+
grid = to_2d_list(bonds_outstanding)
61+
if not grid:
62+
return safe_err(ValueError("bonds_outstanding is empty"))
63+
start = 1 if isinstance(grid[0][0], str) else 0
64+
wam_sum = 0.0
65+
total_value = 0.0
66+
for row in grid[start:]:
67+
if len(row) < 2:
68+
continue
69+
try:
70+
mat = float(row[0])
71+
fv = float(row[1])
72+
except Exception:
73+
continue
74+
wam_sum += mat * fv
75+
total_value += fv
76+
if total_value == 0:
77+
return safe_err(ValueError("Total face value is zero"))
78+
return round(wam_sum / total_value, 4)
79+
except Exception as e:
80+
return safe_err(e)
81+
82+
83+
@xl_func(
84+
"float[] amortization_schedule, float projected_deficit, int year: float",
85+
name="SOV_GROSS_FINANCING_NEED",
86+
)
87+
def gross_financing_need(
88+
amortization_schedule: Any,
89+
projected_deficit: float,
90+
year: int,
91+
) -> float | str:
92+
"""Gross financing need for a given year.
93+
94+
GFN = amortization due in that year + projected fiscal deficit.
95+
year is 1-based (1 = first element of amortization_schedule).
96+
"""
97+
try:
98+
sched = to_1d_floats(amortization_schedule)
99+
if not sched:
100+
return safe_err(ValueError("amortization_schedule is empty"))
101+
if year < 1 or year > len(sched):
102+
return safe_err(ValueError(f"year must be between 1 and {len(sched)}"))
103+
return round(sched[year - 1] + float(projected_deficit), 6)
104+
except Exception as e:
105+
return safe_err(e)

sovereign_debt_xl/credit_risk.py

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
from __future__ import annotations
2+
3+
import math
4+
from typing import Any
5+
6+
import numpy as np
7+
import statsmodels.api as sm
8+
from pyxll import xl_func
9+
from scipy import stats
10+
11+
from ._coerce import safe_err, to_1d_floats
12+
13+
14+
@xl_func(
15+
"float debt_face_value, float asset_value, float asset_volatility,"
16+
" float risk_free_rate, float maturity: object[][]",
17+
name="SOV_MERTON_DEFAULT_PROB",
18+
)
19+
def merton_sovereign_default_prob(
20+
debt_face_value: float,
21+
asset_value: float,
22+
asset_volatility: float,
23+
risk_free_rate: float,
24+
maturity: float,
25+
) -> list[list[Any]] | str:
26+
"""Merton structural model adapted for sovereigns.
27+
28+
Returns d1, d2 (distance-to-default), and the risk-neutral default probability.
29+
"""
30+
try:
31+
F = float(debt_face_value)
32+
V = float(asset_value)
33+
sigma = float(asset_volatility)
34+
r = float(risk_free_rate)
35+
T = float(maturity)
36+
if V <= 0 or sigma <= 0 or T <= 0 or F <= 0:
37+
return safe_err(ValueError("All inputs must be positive"))
38+
d1 = (math.log(V / F) + (r + 0.5 * sigma**2) * T) / (sigma * math.sqrt(T))
39+
d2 = d1 - sigma * math.sqrt(T)
40+
default_prob = float(stats.norm.cdf(-d2))
41+
return [
42+
["metric", "value"],
43+
["d1", round(d1, 6)],
44+
["d2_distance_to_default", round(d2, 6)],
45+
["default_probability", round(default_prob, 6)],
46+
]
47+
except Exception as e:
48+
return safe_err(e)
49+
50+
51+
@xl_func(
52+
"float cds_spread_bps, float recovery_rate, float tenor_years: object[][]",
53+
name="SOV_CDS_DEFAULT_PROB",
54+
)
55+
def cds_implied_default_prob(
56+
cds_spread_bps: float,
57+
recovery_rate: float,
58+
tenor_years: float,
59+
) -> list[list[Any]] | str:
60+
"""Back out cumulative and annualized risk-neutral default probability from CDS spread.
61+
62+
Uses the standard hazard-rate approximation: lambda = spread / LGD.
63+
"""
64+
try:
65+
s = float(cds_spread_bps) / 10_000.0 # bps → decimal
66+
R = float(recovery_rate)
67+
T = float(tenor_years)
68+
if T <= 0:
69+
return safe_err(ValueError("tenor_years must be > 0"))
70+
if not (0.0 <= R < 1.0):
71+
return safe_err(ValueError("recovery_rate must be in [0, 1)"))
72+
lgd = 1.0 - R
73+
hazard = s / lgd
74+
cum_pd = 1.0 - math.exp(-hazard * T)
75+
annual_pd = 1.0 - math.exp(-hazard)
76+
return [
77+
["metric", "value"],
78+
["hazard_rate", round(hazard, 6)],
79+
["cumulative_pd", round(cum_pd, 6)],
80+
["annual_pd", round(annual_pd, 6)],
81+
]
82+
except Exception as e:
83+
return safe_err(e)
84+
85+
86+
@xl_func(
87+
"float current_account_gdp, float reserves_imports, float debt_gdp,"
88+
" float gdp_growth, float inflation: object[][]",
89+
name="SOV_ZSCORE_SOVEREIGN",
90+
)
91+
def zscore_sovereign(
92+
current_account_gdp: float,
93+
reserves_imports: float,
94+
debt_gdp: float,
95+
gdp_growth: float,
96+
inflation: float,
97+
) -> list[list[Any]] | str:
98+
"""Composite early-warning scoring model inspired by Reinhart/Rogoff indicators.
99+
100+
Standardises each indicator against approximate historical benchmarks and
101+
returns a composite z-score plus its percentile rank.
102+
"""
103+
try:
104+
# (name, value, direction) — direction=+1 means higher value → more risk
105+
indicators = [
106+
("current_account_gdp", float(current_account_gdp), -1.0),
107+
("reserves_imports", float(reserves_imports), -1.0),
108+
("debt_gdp", float(debt_gdp), +1.0),
109+
("gdp_growth", float(gdp_growth), -1.0),
110+
("inflation", float(inflation), +1.0),
111+
]
112+
# Approximate historical (mean, std) benchmarks for standardisation
113+
benchmarks = {
114+
"current_account_gdp": (-0.03, 0.05),
115+
"reserves_imports": (4.0, 3.0),
116+
"debt_gdp": (0.60, 0.30),
117+
"gdp_growth": (0.03, 0.03),
118+
"inflation": (0.05, 0.08),
119+
}
120+
z_components: list[float] = []
121+
for name, val, direction in indicators:
122+
mean, std = benchmarks[name]
123+
z = (val - mean) / std * direction
124+
z_components.append(z)
125+
composite_z = float(np.mean(z_components))
126+
percentile = float(stats.norm.cdf(composite_z) * 100.0)
127+
out: list[list[Any]] = [["metric", "value"]]
128+
for (name, _, _), z in zip(indicators, z_components):
129+
out.append([f"z_{name}", round(z, 4)])
130+
out.append(["composite_zscore", round(composite_z, 4)])
131+
out.append(["percentile_rank", round(percentile, 2)])
132+
return out
133+
except Exception as e:
134+
return safe_err(e)
135+
136+
137+
@xl_func(
138+
"float[] embi_spread, float[] us_vix, float[] us_10y,"
139+
" float[] commodity_index, float[] country_fundamentals: object[][]",
140+
name="SOV_SPREAD_DECOMPOSITION",
141+
)
142+
def spread_decomposition(
143+
embi_spread: Any,
144+
us_vix: Any,
145+
us_10y: Any,
146+
commodity_index: Any,
147+
country_fundamentals: Any,
148+
) -> list[list[Any]] | str:
149+
"""Decompose sovereign spreads into global risk factors vs. idiosyncratic fundamentals.
150+
151+
Runs OLS of EMBI spread on (VIX, US 10y, commodity index) as global factors and
152+
a country-fundamentals composite as the idiosyncratic factor. Returns coefficients,
153+
p-values, R², and the share of fitted variance attributable to each group.
154+
"""
155+
try:
156+
y = np.array(to_1d_floats(embi_spread), dtype=float)
157+
vix = np.array(to_1d_floats(us_vix), dtype=float)
158+
usy = np.array(to_1d_floats(us_10y), dtype=float)
159+
comm = np.array(to_1d_floats(commodity_index), dtype=float)
160+
fund = np.array(to_1d_floats(country_fundamentals), dtype=float)
161+
n = len(y)
162+
if n < 5:
163+
return safe_err(ValueError("Need at least 5 observations"))
164+
for arr, lbl in [
165+
(vix, "us_vix"),
166+
(usy, "us_10y"),
167+
(comm, "commodity_index"),
168+
(fund, "country_fundamentals"),
169+
]:
170+
if len(arr) != n:
171+
return safe_err(ValueError(f"{lbl} must have the same length as embi_spread"))
172+
X = np.column_stack([vix, usy, comm, fund])
173+
X = sm.add_constant(X)
174+
model = sm.OLS(y, X).fit()
175+
params = model.params.tolist()
176+
pvals = model.pvalues.tolist()
177+
# Share of fitted variance from global (cols 1-3) vs. idiosyncratic (col 4)
178+
global_fitted = X[:, 1:4] @ np.array(params[1:4])
179+
idio_fitted = X[:, 4] * params[4]
180+
global_ss = float(np.var(global_fitted))
181+
idio_ss = float(np.var(idio_fitted))
182+
denom = global_ss + idio_ss if (global_ss + idio_ss) > 0 else 1.0
183+
labels = ["const", "us_vix", "us_10y", "commodity_index", "country_fundamentals"]
184+
out: list[list[Any]] = [["term", "coef", "pvalue"]]
185+
for lbl, coef, pval in zip(labels, params, pvals):
186+
out.append([lbl, round(float(coef), 6), round(float(pval), 6)])
187+
out.append(["R2", round(float(model.rsquared), 6), float("nan")])
188+
out.append(["global_share", round(global_ss / denom, 4), float("nan")])
189+
out.append(["idiosyncratic_share", round(idio_ss / denom, 4), float("nan")])
190+
return out
191+
except Exception as e:
192+
return safe_err(e)

0 commit comments

Comments
 (0)