from __future__ import annotations

from typing import Any

import numpy as np
import statsmodels.api as sm
from pyxll import xl_func

from ._coerce import safe_err, to_1d_floats, to_2d_list


@xl_func(
    "float[] target_country_spreads, float[] source_country_spreads,"
    " float[] global_factor_series, int window_days: object[][]",
    name="SOV_CONTAGION_BETA",
)
def sovereign_contagion_beta(
    target_country_spreads: Any,
    source_country_spreads: Any,
    global_factor_series: Any,
    window_days: int,
) -> list[list[Any]] | str:
    """Rolling regression beta isolating bilateral contagion after controlling for a global factor.

    For each rolling window of size window_days, regresses target spread changes
    on source spread changes and global factor changes. Returns the time series of
    bilateral betas (the coefficient on source) and summary statistics.
    """
    try:
        target = np.array(to_1d_floats(target_country_spreads), dtype=float)
        source = np.array(to_1d_floats(source_country_spreads), dtype=float)
        global_f = np.array(to_1d_floats(global_factor_series), dtype=float)
        n = len(target)
        if n < 10:
            return safe_err(ValueError("Need at least 10 observations"))
        if len(source) != n or len(global_f) != n:
            return safe_err(ValueError("All series must have the same length"))
        if window_days < 5 or window_days >= n:
            return safe_err(ValueError(f"window_days must be in [5, {n - 1}]"))
        d_target = np.diff(target)
        d_source = np.diff(source)
        d_global = np.diff(global_f)
        m = len(d_target)
        betas: list[float] = []
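        # Model fitted per window (as implied by the OLS design matrix below):
        #   d_target_t = alpha + beta * d_source_t + gamma * d_global_t + eps_t
        # where beta, the coefficient on d_source, is the bilateral contagion
        # measure net of the common global factor.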
        for i in range(window_days - 1, m):
            start = i - window_days + 1
            y = d_target[start : i + 1]
            X = np.column_stack([d_source[start : i + 1], d_global[start : i + 1]])
            X = sm.add_constant(X)
            try:
                res = sm.OLS(y, X).fit()
                betas.append(float(res.params[1]))
            except Exception:
                betas.append(float("nan"))
        betas_arr = np.array(betas, dtype=float)
        valid = betas_arr[~np.isnan(betas_arr)]
        out: list[list[Any]] = [["metric", "value"]]
        out.append(["n_windows", len(betas)])
        out.append(["mean_beta", round(float(np.mean(valid)), 4) if len(valid) else float("nan")])
        out.append(["median_beta", round(float(np.median(valid)), 4) if len(valid) else float("nan")])
        out.append(["beta_std", round(float(np.std(valid, ddof=1)), 4) if len(valid) > 1 else float("nan")])
        out.append(["", ""])
        out.append(["window_index", "beta"])
        for i, b in enumerate(betas):
            out.append([i + window_days, round(b, 4) if not np.isnan(b) else float("nan")])
        return out
    except Exception as e:
        return safe_err(e)

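# Illustrative worksheet call (ranges and window length are hypothetical):
#   =SOV_CONTAGION_BETA(A2:A253, B2:B253, C2:C253, 60)
# returns the summary block followed by the per-window beta series as a 2-D array.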

@xl_func(
    "float[] spread_series_a, float[] spread_series_b, int window: object[][]",
    name="SOV_DCC_GARCH_CORR",
)
def dcc_garch_correlation(
    spread_series_a: Any,
    spread_series_b: Any,
    window: int,
) -> list[list[Any]] | str:
    """Dynamic conditional correlation between two sovereign spread series.

    Uses a rolling-window DCC approximation: standardise each series by its
    rolling EWMA volatility, then compute the rolling correlation of the
    standardised residuals. This is a tractable proxy for a full DCC-GARCH
    fit, suitable for Excel use.
    """
    try:
        a = np.array(to_1d_floats(spread_series_a), dtype=float)
        b = np.array(to_1d_floats(spread_series_b), dtype=float)
        n = len(a)
        if n < 10:
            return safe_err(ValueError("Need at least 10 observations"))
        if len(b) != n:
            return safe_err(ValueError("Both series must have the same length"))
        if window < 5 or window >= n:
            return safe_err(ValueError(f"window must be in [5, {n - 1}]"))

        # EWMA variance (decay = 0.94, RiskMetrics style)
        decay = 0.94
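        # Recursion: var_t = decay * var_{t-1} + (1 - decay) * dx_t ** 2,
        # seeded with the first squared change.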
        da = np.diff(a)
        db = np.diff(b)
        m = len(da)
        var_a = np.zeros(m)
        var_b = np.zeros(m)
        var_a[0] = da[0] ** 2
        var_b[0] = db[0] ** 2
        for t in range(1, m):
            var_a[t] = decay * var_a[t - 1] + (1 - decay) * da[t] ** 2
            var_b[t] = decay * var_b[t - 1] + (1 - decay) * db[t] ** 2
        std_a = np.sqrt(np.maximum(var_a, 1e-12))
        std_b = np.sqrt(np.maximum(var_b, 1e-12))
        ea = da / std_a
        eb = db / std_b

        # Rolling correlation of standardised residuals
        corrs: list[float] = []
        for i in range(window - 1, m):
            s = i - window + 1
            ca, cb = ea[s : i + 1], eb[s : i + 1]
            if np.std(ca) == 0 or np.std(cb) == 0:
                corrs.append(float("nan"))
            else:
                corrs.append(float(np.corrcoef(ca, cb)[0, 1]))

        valid = [c for c in corrs if not np.isnan(c)]
        out: list[list[Any]] = [["metric", "value"]]
        out.append(["mean_dcc", round(float(np.mean(valid)), 4) if valid else float("nan")])
        out.append(["min_dcc", round(float(np.min(valid)), 4) if valid else float("nan")])
        out.append(["max_dcc", round(float(np.max(valid)), 4) if valid else float("nan")])
        out.append(["", ""])
        out.append(["window_index", "dcc"])
        for i, c in enumerate(corrs):
            out.append([i + window, round(c, 4) if not np.isnan(c) else float("nan")])
        return out
    except Exception as e:
        return safe_err(e)

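# Illustrative worksheet call (ranges and window are hypothetical):
#   =SOV_DCC_GARCH_CORR(A2:A253, B2:B253, 20)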

@xl_func(
    "float[][] spread_matrix, int lags, float significance_level: object[][]",
    name="SOV_GRANGER_CAUSALITY",
)
def granger_causality_spreads(
    spread_matrix: Any,
    lags: int,
    significance_level: float,
) -> list[list[Any]] | str:
    """Pairwise Granger causality across sovereign spreads.

    spread_matrix: columns are countries (T × N), each column is one spread series.
    Returns an N × N adjacency matrix where entry [i, j] = 1 if country i
    Granger-causes country j at the given significance level.
    """
    try:
        from statsmodels.tsa.stattools import grangercausalitytests

        grid = to_2d_list(spread_matrix)
        if not grid or not grid[0]:
            return safe_err(ValueError("spread_matrix is empty"))
        # Treat a string in the first cell as a header row and skip it
        start = 1 if isinstance(grid[0][0], str) else 0
        data_rows = grid[start:]
        if len(data_rows) < 10:
            return safe_err(ValueError("Need at least 10 data rows"))
        try:
            arr = np.array([[float(v) for v in row] for row in data_rows], dtype=float)
        except Exception:
            return safe_err(ValueError("All cells in spread_matrix must be numeric"))
        T, N = arr.shape
        if N < 2:
            return safe_err(ValueError("Need at least 2 country columns"))
        if lags < 1 or lags >= T // 2:
            return safe_err(ValueError(f"lags must be in [1, {T // 2 - 1}]"))
        if not (0.0 < significance_level < 1.0):
            return safe_err(ValueError("significance_level must be in (0, 1)"))

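        # statsmodels convention: grangercausalitytests(x, ...) tests whether
        # the series in x's second column Granger-causes the series in its
        # first column, hence columns are passed as [caused, cause] below.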
        adj: list[list[int | str | float]] = []
        header: list[Any] = ["causes\\is_caused_by"] + [f"c{j + 1}" for j in range(N)]
        adj.append(header)
        for i in range(N):
            row_vals: list[Any] = [f"c{i + 1}"]
            for j in range(N):
                if i == j:
                    row_vals.append(0)
                    continue
                test_data = arr[:, [j, i]]  # [caused, cause]
                try:
                    result = grangercausalitytests(test_data, maxlag=lags, verbose=False)
                    # Use F-test p-value at the selected lag
                    pval = result[lags][0]["ssr_ftest"][1]
                    row_vals.append(1 if float(pval) < significance_level else 0)
                except Exception:
                    row_vals.append(float("nan"))
            adj.append(row_vals)
        return adj
    except Exception as e:
        return safe_err(e)

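# Illustrative worksheet call (range and parameters are hypothetical):
#   =SOV_GRANGER_CAUSALITY(A1:E253, 5, 0.05)
# returns a 0/1 adjacency matrix; rows are causes, columns are effects.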

@xl_func(
    "float[][] bilateral_trade_flows, float[] gdp_values: object[][]",
    name="SOV_TRADE_LINKAGE_MATRIX",
)
def trade_linkage_matrix(
    bilateral_trade_flows: Any,
    gdp_values: Any,
) -> list[list[Any]] | str:
    """Trade-weighted exposure matrix normalised by GDP.

    bilateral_trade_flows: N × N matrix of bilateral trade (rows = exporters, cols = importers).
    gdp_values: length-N vector of GDP values (same currency as trade flows).

    Returns an N × N matrix where entry [i, j] = trade_{i→j} / GDP_j,
    showing country j's real-economy exposure to stress in country i.
    """
    try:
        grid = to_2d_list(bilateral_trade_flows)
        if not grid or not grid[0]:
            return safe_err(ValueError("bilateral_trade_flows is empty"))
        # Treat a string in the first cell as a header row and skip it
        start = 1 if isinstance(grid[0][0], str) else 0
        data_rows = grid[start:]
        if not data_rows:
            return safe_err(ValueError("bilateral_trade_flows has no data rows"))
        try:
            flows = np.array([[float(v) for v in row] for row in data_rows], dtype=float)
        except Exception:
            return safe_err(ValueError("All cells in bilateral_trade_flows must be numeric"))
        N, M = flows.shape
        if N != M:
            return safe_err(ValueError("bilateral_trade_flows must be a square N × N matrix"))
        gdp = np.array(to_1d_floats(gdp_values), dtype=float)
        if len(gdp) != N:
            return safe_err(ValueError("gdp_values must have N elements matching the matrix dimension"))
        if np.any(gdp == 0):
            return safe_err(ValueError("All GDP values must be non-zero"))
        # exposure[i, j] = flows[i, j] / gdp[j]
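        # Worked example (hypothetical figures): flows[i, j] = 50 and
        # gdp[j] = 1000 give exposure[i, j] = 0.05, i.e. imports from
        # country i equal 5% of country j's GDP.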
        exposure = flows / gdp[np.newaxis, :]
        out: list[list[Any]] = [["exporter\\importer"] + [f"c{j + 1}" for j in range(N)]]
        for i in range(N):
            out.append([f"c{i + 1}"] + [round(float(exposure[i, j]), 6) for j in range(N)])
        return out
    except Exception as e:
        return safe_err(e)
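
# Illustrative worksheet call (ranges are hypothetical; the trade range is
# N × N and the GDP range has N cells):
#   =SOV_TRADE_LINKAGE_MATRIX(A1:E5, G1:G5)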