Skip to content

Commit

Permalink
Test modifications following the last push and initial changes for mu…
Browse files Browse the repository at this point in the history
…ltisection
  • Loading branch information
claudioAglieriR committed Oct 19, 2024
1 parent 52f15b9 commit 4ac7587
Show file tree
Hide file tree
Showing 7 changed files with 263 additions and 191 deletions.
8 changes: 8 additions & 0 deletions fypy/model/FourierModel.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,11 @@ def chf(self, T: float, xi: Union[float, np.ndarray]) -> Union[float, np.ndarray
"""
raise NotImplementedError

def inhomogeneous_chf(self, T: np.ndarray, xi: Union[float, np.ndarray], thetas: list[np.ndarray]) -> Union[float, np.ndarray]:
"""
Time-inhomogeneous characteristic function
:param T: float, time to maturity
:param xi: np.ndarray or float, points in frequency domain
"""
raise NotImplementedError

58 changes: 57 additions & 1 deletion fypy/model/levy/LevyModel.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
from fypy.termstructures.ForwardCurve import ForwardCurve
from fypy.termstructures.DiscountCurve import DiscountCurve
from fypy.model.FourierModel import FourierModel
from typing import Union
from typing import Union, Optional, Dict
import numpy as np
from contextlib import contextmanager


class LevyModel(FourierModel, ABC):
Expand Down Expand Up @@ -41,6 +42,8 @@ def symbol(self, xi: Union[float, np.ndarray]):
"""
raise NotImplementedError



@abstractmethod
def convexity_correction(self) -> float:
"""
Expand All @@ -59,3 +62,56 @@ def set_params(self, params: np.ndarray):

def get_params(self) -> np.ndarray:
return self._params


@contextmanager
def temporary_params(self, new_params: np.ndarray):
"""
This function temporarily sets the model's parameters (`_params`) to `new_params` for the duration of the
context, and then automatically restores the original parameters after exiting the context.
This method could be used for inhomogeneous Levy models, where the model's parameters must be changed frequently.
:param new_params: np.ndarray, new set of parameters to use temporarily.
"""
# Store the original parameters before modifying them
original_params = self.get_params()
try:
# Set the new parameters
self.set_params(new_params)
yield
finally:
# Restore the original parameters after the context is done
self.set_params(original_params)

def frozen_chf(self, xi: float, frozen_params: Dict[float,list]) -> complex:
"""
:param xi: np.ndarray or float, points in the frequency domain
:param thetas: list of np.ndarray, each array represents a set of parameters
:param T: np.ndarray, array of time points T_j corresponding to each set of parameters.
:return: np.ndarray or float, the frozen characteristic function.
"""
frozen_params = dict(sorted(frozen_params.items()))

frozen_factor = 1.0
T_previous = 0

for T, params in frozen_params.items():
delta_T = T - T_previous
T_previous = T

with self.temporary_params(params):
frozen_factor *= np.exp(delta_T * self.symbol(xi))

return frozen_factor

def inhomogeneous_chf(self, T: float, xi: Union[float, np.ndarray], frozen_params: Dict[float, list] = None) -> complex:
"""
Time-inhomogeneous characteristic function
:param T: float, time to maturity
:param xi: np.ndarray or float, points in frequency domain
:param frozen_params: optional dict, parameters for the frozen characteristic function
"""

frozen_params = frozen_params or {}

return self.chf(T=T, xi=xi) * self.frozen_chf(xi=xi, frozen_params=frozen_params) if frozen_params else self.chf( T=T, xi=xi)
153 changes: 62 additions & 91 deletions fypy/pricing/fourier/ProjEuropeanPricer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,121 +4,92 @@
from fypy.model.levy.LevyModel import FourierModel
from fypy.pricing.fourier.ProjPricer import ProjPricer, Impl, CubicImpl, LinearImpl, HaarImpl


class ProjEuropeanPricer(ProjPricer):
def __init__(self, model: FourierModel, N: int = 2 ** 9, L: float = 10., order: int = 3,
alpha_override: float = np.nan):
"""
Price European options using the Frame Projection (PROJ) method of Kirkby (2015)
Ref: JL Kirkby, SIAM Journal on Financial Mathematics, 6 (1), 713-747
:param model: Fourier model
:param N: int (power of 2), number of basis coefficients (increase to increase accuracy)
:param L: float, controls gridwidth of density. A value of L = 10~14 works well... For Black-Scholes,
L = 6 is fine, for heavy tailed processes such as CGMY, may want a larger value to get very high accuracy
:param order: int, the Spline order: 0 = Haar, 1 = Linear, 2 = Quadratic, 3 = Cubic
Note: Cubic is preferred, the others are provided for research purposes. Only 1 and 3 are currently coded
:param alpha_override: float, if supplied, this overrides the rule using L to determine the gridwidth,
allows you to use your own rule to set grid if desired
"""
super().__init__(model, N, L, order, alpha_override)

self._efficient_multi_strike = [1]

if order not in (0, 1, 3):
raise NotImplementedError("Only cubic, linear and Haar implemented so far")

def price_strikes_fill(self,
T: float,
K: np.ndarray,
is_calls: np.ndarray,
output: np.ndarray):
def price_strikes_fill(self, T: float, K: np.ndarray, is_calls: np.ndarray, output: np.ndarray):
"""
Price a set of set of strikes (at same time to maturity, ie one slice of a surface)
Override this method if given a more efficient implementation for multiple strikes.
:param T: float, time to maturity of options
:param K: np.array, strikes of options
:param is_calls: np.ndarray[bool], indicators of if strikes are calls (true) or puts (false)
:param output: np.ndarray[float], the output to fill in with prices, must be same size as K and is_calls
:return: None, this method fills in the output array, make sure its sized properly first
Price a set of strikes (at same time to maturity)
"""
S0 = self._model.spot()
lws_vec = np.log(K / S0)
max_lws = np.log(np.max(K) / S0)
lws_vec = np.log(K / self._model.spot())
max_lws = np.log(np.max(K) / self._model.spot())

cumulants = self._model.cumulants(T)
alph = cumulants.get_truncation_heuristic(L=self._L) \
if np.isnan(self._alpha_override) else self._alpha_override

# Ensure that grid is wide enough to cover the strike
alph = cumulants.get_truncation_heuristic(L=self._L) if np.isnan(self._alpha_override) else self._alpha_override
alph = max(alph, 1.15 * max(np.abs(lws_vec)) + cumulants.c1)

dx = 2 * alph / (self._N - 1)
a = 1. / dx
lam = cumulants.c1 - (self._N / 2 - 1) * dx

max_n_bar = self.get_nbar(a=a, lws=max_lws, lam=lam)

if self._order == 0:
impl = HaarImpl(N=self._N, dx=dx, model=self._model, T=T, max_n_bar=max_n_bar)

elif self._order == 1:
impl = LinearImpl(N=self._N, dx=dx, model=self._model, T=T, max_n_bar=max_n_bar)

else:
impl = CubicImpl(N=self._N, dx=dx, model=self._model, T=T, max_n_bar=max_n_bar)

disc = self._model.discountCurve(T)
fwd = self._model.forwardCurve(T)
cons3 = impl.cons() * disc / self._N
grid = {
'dx': 2 * alph / (self._N - 1),
'a': 1. / (2 * alph / (self._N - 1)),
'lam': cumulants.c1 - (self._N / 2 - 1) * (2 * alph / (self._N - 1)),
'cons3': None, # Verrà popolato successivamente
'max_nbar': self.get_nbar(a=1. / (2 * alph / (self._N - 1)), lws=max_lws, lam=cumulants.c1 - (self._N / 2 - 1) * (2 * alph / (self._N - 1)))
}

# ==============
# Price Strikes
# ==============
impl = self._get_implementation(self._order, T, grid['max_nbar'], grid['dx'])

def price_aligned_grid(index, strike):
lws = lws_vec[index]
nbar = self.get_nbar(a=a, lws=lws, lam=lam)
xmin = lws - (nbar - 1) * dx
grid['cons3'] = impl.cons() * self._model.discountCurve(T) / self._N

beta = ProjEuropeanPricer._beta_computation(impl=impl, xmin=xmin)
coeffs = impl.coefficients(nbar=nbar, W=strike, S0=S0, xmin=xmin)
option = {
'disc': self._model.discountCurve(T),
'fwd': self._model.forwardCurve(T),
'lws_vec': lws_vec,
'is_calls': is_calls,
'max_lws': max_lws,
'K': K
}

# price the put
price = cons3 * np.dot(beta[:len(coeffs)], coeffs)
if is_calls[index]: # price using put-call parity
price += (fwd - strike) * disc
self.price_computation(grid, option, impl, output)

output[index] = max(0, price)

# Prices method adapted to multi-strike
# with Quadrature Adjustment for Grid Misalignment
def price_misaligned_grid(index, strike):
closest_nbar = self.get_nbar(a=a, lws=lws_vec[index], lam=xmin)
rho = lws_vec[index] - (xmin + (closest_nbar - 1) * dx)

coeffs = impl.coefficients(nbar=closest_nbar, W=strike, S0=S0, xmin=xmin, rho=rho,
misaligned_grid=True)
def price_computation(self, grid: dict, option: dict, impl: Impl, output: np.ndarray):
"""
Compute prices for multiple strikes, handling both aligned and misaligned grids.
"""

# price the put
price = cons3 * np.dot(beta[:len(coeffs)], coeffs)
if is_calls[index]: # price using put-call parity
price += (fwd - strike) * disc
if len(option['K']) > 1 and self._order in self._efficient_multi_strike:
xmin = option['max_lws'] - (grid['max_nbar'] - 1) * grid['dx']
option['beta'] = ProjEuropeanPricer._beta_computation(impl=impl, xmin=xmin)
price_vectorized = np.vectorize(self.price_misaligned_grid, excluded=['grid', 'option', 'impl', 'output'])
else:
price_vectorized = np.vectorize(self.price_aligned_grid, excluded=['grid', 'option', 'impl', 'output'])

output[index] = max(0, price)
price_vectorized(np.arange(0, len(option['K'])), option['K'], grid=grid, option=option, impl=impl, output=output)

# Prices computation
def price_aligned_grid(self, index, strike, grid: dict, option: dict, impl: Impl, output: np.ndarray):
"""
Price computation for aligned grid.
"""
lws = option['lws_vec'][index]
nbar = self.get_nbar(a=grid['a'], lws=lws, lam=grid['lam'])
xmin = lws - (nbar - 1) * grid['dx']
beta = ProjEuropeanPricer._beta_computation(impl=impl, xmin=xmin)
coeffs = impl.coefficients(nbar=nbar, W=strike, S0=self._model.spot(), xmin=xmin)
price = grid['cons3'] * np.dot(beta[:len(coeffs)], coeffs)
if option['is_calls'][index]:
price += (option['fwd'] - strike) * option['disc']
output[index] = max(0, price)

def price_misaligned_grid(self, index, strike, grid: dict, option: dict, impl: Impl, output: np.ndarray):
"""
Price computation for misaligned grid.
"""
lws = option['lws_vec'][index]
closest_nbar = self.get_nbar(a=grid['a'], lws=lws, lam=grid['lam'])
xmin = lws - (closest_nbar - 1) * grid['dx']
rho = lws - (xmin + (closest_nbar - 1) * grid['dx'])
beta = ProjEuropeanPricer._beta_computation(impl=impl, xmin=xmin)
coeffs = impl.coefficients(nbar=closest_nbar, W=strike, S0=self._model.spot(), xmin=xmin, rho=rho, misaligned_grid=True)
price = grid['cons3'] * np.dot(beta[:len(coeffs)], coeffs)
if option['is_calls'][index]:
price += (option['fwd'] - strike) * option['disc']
output[index] = max(0, price)

if len(K) > 1 and self._order in self._efficient_multi_strike:
xmin = max_lws - (max_n_bar - 1) * dx
beta = ProjEuropeanPricer._beta_computation(impl=impl, xmin=xmin)
price_vectorized = np.vectorize(price_misaligned_grid)
price_vectorized(np.arange(0, len(K)), K)
else:
price_vectorized = np.vectorize(price_aligned_grid)
price_vectorized(np.arange(0, len(K)), K)

@staticmethod
def _beta_computation(impl: Impl = None, xmin: float = None):
Expand Down
17 changes: 14 additions & 3 deletions fypy/pricing/fourier/ProjPricer.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ def copy_original_arrays(*arrays):
def _beta_computation(self):
raise NotImplementedError

def _get_implementation(self, order:int, T:float, max_n_bar:int, dx:float):
"""
Returns the appropriate implementation based on the spline order.
"""
if order == 0:
return HaarImpl(N=self._N, dx=dx, model=self._model, T=T, max_n_bar=max_n_bar)
elif order == 1:
return LinearImpl(N=self._N, dx=dx, model=self._model, T=T, max_n_bar=max_n_bar)
else:
return CubicImpl(N=self._N, dx=dx, model=self._model, T=T, max_n_bar=max_n_bar)


# ===================================
# Private
Expand Down Expand Up @@ -123,7 +134,7 @@ def integrand(self, xmin: float) -> np.ndarray:
raise NotImplementedError

@abstractmethod
def coefficients(self, nbar: int, W: float, S0: float, xmin: float) -> np.ndarray:
def coefficients(self, nbar: int, W: float, S0: float, xmin: float, rho:float=None, misaligned_grid:bool=False) -> np.ndarray:
raise NotImplementedError

@abstractmethod
Expand Down Expand Up @@ -164,7 +175,7 @@ def num_coeffs(self, nbar: int) -> int:
return nbar + 1

def coefficients(self,
nbar: int, W: float, S0: float, xmin: float) -> np.ndarray:
nbar: int, W: float, S0: float, xmin: float, rho:float=None, misaligned_grid:bool=False) -> np.ndarray:
self.G[nbar] = W * self.g1
self.G[nbar - 1] = W * self.g2
self.G[nbar - 2] = W * self.g3
Expand Down Expand Up @@ -341,7 +352,7 @@ def integrand(self, xmin: float) -> np.ndarray:
grand[0] = 1 / self.cons()
return grand

def coefficients(self, nbar: int, W: float, S0: float, xmin: float) -> np.ndarray:
def coefficients(self, nbar: int, W: float, S0: float, xmin: float, rho:float=None, misaligned_grid:bool=False) -> np.ndarray:
a = self.a
dx = 1 / a
self.G[nbar - 1] = W * (.5 - a * (1 - np.exp(-.5 * dx)))
Expand Down
Loading

0 comments on commit 4ac7587

Please sign in to comment.