Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,6 @@ _build/
# PyCharm
#########
.idea/
.vscode/launch.json
.vscode/settings.json
.env
174 changes: 174 additions & 0 deletions pygam/distributions.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,10 +644,184 @@ def sample(self, mu):
return np.random.wald(mean=mu, scale=self.scale, size=None)


class TweedieDist(Distribution):
"""
Tweedie Distribution
"""

def __init__(self, power, scale=None):
if 0 < power < 1:
raise ValueError(
"Power parameter p must not be between 0 and 1 (non-inclusive) for the Tweedie distribution."
)
super(TweedieDist, self).__init__(name='tweedie', scale=scale)
self.power = power

def log_pdf(self, y, mu, weights=None):
"""
Computes the log of the PDF (or PMF for y=0) of the Tweedie distribution.

Parameters
----------
y : array-like of length n
Target values.
mu : array-like of length n
Expected values.
weights : array-like shape (n,) or None, default: None
Sample weights. If None, defaults to array of ones.

Returns
-------
log_pdf : np.array of length n
"""
y = np.asarray(y, dtype=np.float64)
mu = np.asarray(mu, dtype=np.float64)

if np.any(y < 0) or np.any(mu <= 0):
raise ValueError(
"Negative values detected in y or non-positive values in mu."
)

if weights is None:
weights = np.ones_like(mu)

phi = self.scale # dispersion parameter
p = self.power # power parameter

# Initialize log_pdf array
log_pdf_values = np.zeros_like(y, dtype=np.float64)

# Indices where y > 0
idx_positive = y > 0
# Indices where y == 0
idx_zero = y == 0

# For y == 0
if np.any(idx_zero):
mu_zero = mu[idx_zero]
# Compute log probability mass at zero
log_p_zero = -(mu_zero ** (2 - p)) / ((2 - p) * phi)
log_pdf_values[idx_zero] = log_p_zero

# For y > 0
if np.any(idx_positive):
y_pos = y[idx_positive]
mu_pos = mu[idx_positive]

# Compute terms carefully to avoid numerical issues
term1 = (y_pos * mu_pos ** (1 - p)) / (1 - p)
term2 = mu_pos ** (2 - p) / (2 - p)
term3 = y_pos ** (2 - p) / (2 - p)

log_pdf = (1 / phi) * (term1 - term2 - term3)

# Adjust with log(y) term and constants if necessary
# Since constants cancel out in likelihood ratios, they are often omitted
log_pdf_values[idx_positive] = log_pdf

return log_pdf_values

@divide_weights
def V(self, mu):
return mu**self.power

@multiply_weights
def deviance(self, y, mu, scaled=True):
"""
Computes the deviance for the Tweedie distribution.

Parameters
----------
y : array-like of length n
Target values.
mu : array-like of length n
Expected values.
scaled : boolean, default: True
Whether to divide the deviance by the distribution scale.

Returns
-------
deviance : np.array of length n
"""
if np.any(y < 0) or np.any(mu < 0):
raise ValueError("Negative values detected in y or mu.")
p = self.power
if p < 0:
dev = 2 * (
np.power(np.maximum(y, 0), 2 - p) / ((1 - p) * (2 - p))
- y * np.power(mu, 1 - p) / (1 - p)
+ np.power(mu, 2 - p) / (2 - p)
)
elif p == 0:
dev = (y - mu) ** 2
elif p == 1:
dev = 2 * (y * np.log(y / mu) - (y - mu))
elif p == 2:
dev = 2 * (np.log(mu / y) + (y / mu) - 1)
else:
dev = 2 * (
np.power(y, 2 - p) / ((1 - p) * (2 - p))
- y * np.power(mu, 1 - p) / (1 - p)
+ np.power(mu, 2 - p) / (2 - p)
)
if scaled:
dev /= self.scale
return dev

def sample(self, mu):
"""
Generates random samples from the Tweedie distribution.

Parameters
----------
mu : array-like of shape n_samples
Expected values.

Returns
-------
random_samples : np.array of same shape as mu
"""
p = self.power
phi = self.scale

if p <= 1:
raise ValueError(
"Power parameter p must be > 1 for the Tweedie distribution."
)
elif 1 < p < 2:
# Compound Poisson-Gamma
lambda_ = mu ** (2 - p) / (phi * (2 - p))
num_events = np.random.poisson(lambda_)
# Gamma parameters
a = (2 - p) / (p - 1)
s = phi * (p - 1) * mu ** (p - 1)
samples = np.zeros_like(mu)
positive_indices = num_events > 0
samples[positive_indices] = np.array(
[
np.sum(np.random.gamma(shape=a, scale=s[i], size=n))
for i, n in enumerate(num_events[positive_indices])
]
)

return samples
elif p == 2:
# Corrected Gamma distribution parameters
shape = 1 / phi
scale = phi * mu
return np.random.gamma(shape=shape, scale=scale)

else:
raise NotImplementedError(
"Sampling not implemented for p > 2 in TweedieDist."
)


DISTRIBUTIONS = {
"normal": NormalDist,
"poisson": PoissonDist,
"binomial": BinomialDist,
"gamma": GammaDist,
"inv_gauss": InvGaussDist,
"tweedie": TweedieDist,
}
13 changes: 12 additions & 1 deletion pygam/pygam.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,14 @@ def __init__(
self.terms = TermList(terms) if isinstance(terms, Term) else terms
self.fit_intercept = fit_intercept

# Handle additional parameters for specific distributions
if distribution == 'tweedie':
self.power = kwargs.pop('power', None)
if self.power is None:
raise ValueError("Tweedie distribution requires a 'power' parameter.")
else:
self.power = None

for k, v in kwargs.items():
if k not in self._plural:
raise TypeError(f"__init__() got an unexpected keyword argument {k}")
Expand Down Expand Up @@ -261,7 +269,10 @@ def _validate_params(self):
):
raise ValueError(f"unsupported distribution {self.distribution}")
if self.distribution in DISTRIBUTIONS:
self.distribution = DISTRIBUTIONS[self.distribution]()
if self.distribution == 'tweedie':
self.distribution = DISTRIBUTIONS[self.distribution](power=self.power)
else:
self.distribution = DISTRIBUTIONS[self.distribution]()

# link
if not ((self.link in LINKS) or isinstance(self.link, Link)):
Expand Down
1 change: 1 addition & 0 deletions pygam/tests/test_penalties.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def test_single_spline_penalty():
monotonic_ and convexity_ should be 0.
"""
coef = np.array(1.0)

assert np.all(derivative(1, coef).toarray() == 0.0)
assert np.all(l2(1, coef).toarray() == 1.0)
assert np.all(monotonic_inc(1, coef).toarray() == 0.0)
Expand Down
133 changes: 133 additions & 0 deletions pygam/tests/test_tweedie.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import pytest
import numpy as np
from pygam.distributions import TweedieDist
from pygam import GAM, s


@pytest.fixture
def tweedie_dist():
return TweedieDist(power=1.5, scale=1.0)


def test_log_pdf(tweedie_dist):
mu = np.array([1.0, 2.0, 3.0])
y = np.array([1.5, 2.5, 3.5])
weights = np.array([1.0, 1.0, 1.0])
log_pdf = tweedie_dist.log_pdf(y, mu, weights)
assert log_pdf.shape == y.shape
assert np.all(np.isfinite(log_pdf)), "Log PDF contains non-finite values."


def test_deviance(tweedie_dist):
mu = np.array([1.0, 2.0, 3.0])
y = np.array([1.5, 2.5, 3.5])
deviance = tweedie_dist.deviance(y, mu, scaled=True)
assert deviance.shape == y.shape
assert np.all(deviance >= 0), "Deviance contains negative values."


def test_sample(tweedie_dist):
mu = np.array([1.0, 2.0, 3.0])
# Generate 1000 samples for each mu
samples = np.array([tweedie_dist.sample(mu) for _ in range(1000)])
sample_mean = np.mean(samples)
expected_mean = np.mean(mu)
# Adjust the tolerance based on the variance
tolerance = 0.1 * expected_mean
assert (
abs(sample_mean - expected_mean) < tolerance
), "Sample mean is not within the expected range."


def test_invalid_power():
with pytest.raises(ValueError):
TweedieDist(power=0.5, scale=1.0) # Power less than 1 is invalid


def test_not_implemented_power():
dist = TweedieDist(power=3.0, scale=1.0)
mu = np.array([1.0, 2.0, 3.0])
with pytest.raises(NotImplementedError):
dist.sample(mu)


def test_gam_tweedie_fit():
# Generate synthetic data
np.random.seed(0)
n_samples = 100
X = np.linspace(0, 10, n_samples).reshape(-1, 1)
true_coef = 2.0
# Generate target variable following a Tweedie distribution
# For simplicity, using Gamma distribution as a proxy when power=2
y = X.flatten() * true_coef + np.random.gamma(shape=2.0, scale=1.0, size=n_samples)

# Initialize and fit GAM with Tweedie distribution
gam = GAM(terms=s(0), distribution='tweedie', power=1.5, fit_intercept=True)
gam.fit(X, y)

# Predict and check the shape of predictions
y_pred = gam.predict(X)
assert y_pred.shape == y.shape, "Predictions shape mismatch."

# Check that predictions are finite
assert np.all(np.isfinite(y_pred)), "Predictions contain non-finite values."

# Optionally, check if the mean of predictions is close to the mean of y
sample_mean = np.mean(y_pred)
expected_mean = np.mean(y)
assert (
abs(sample_mean - expected_mean) < 1.0
), "Sample mean is not within the expected range."


def test_variance_function(tweedie_dist):
mu = np.array([1.0, 2.0, 3.0])
variance = tweedie_dist.V(mu)
expected_variance = mu**tweedie_dist.power
assert np.allclose(
variance, expected_variance
), "Variance function V(mu) is incorrect."


def test_zero_targets(tweedie_dist):
mu = np.array([1.0, 2.0, 3.0])
y = np.array([0.0, 0.0, 0.0])
log_pdf = tweedie_dist.log_pdf(y, mu)
assert log_pdf.shape == y.shape
assert np.all(
np.isfinite(log_pdf)
), "Log PDF with zero targets contains non-finite values."


def test_negative_inputs(tweedie_dist):
mu = np.array([-1.0, 2.0, 3.0])
y = np.array([1.0, -2.0, 3.0])
with pytest.raises(ValueError):
tweedie_dist.log_pdf(y, mu)
with pytest.raises(ValueError):
tweedie_dist.deviance(y, mu)


def test_sample_with_zero_mu(tweedie_dist):
mu = np.array([0.0, 0.0, 0.0])
samples = tweedie_dist.sample(mu)
assert np.all(samples == 0), "Samples with zero mu should be zeros."


def test_boundary_power_values():
mu = np.array([1.0, 2.0, 3.0])
y = np.array([1.0, 2.0, 3.0])

# Power approaching 1
tweedie_dist = TweedieDist(power=1.0001, scale=1.0)
log_pdf = tweedie_dist.log_pdf(y, mu)
assert np.all(
np.isfinite(log_pdf)
), "Log PDF near power=1 contains non-finite values."

# Power approaching 2
tweedie_dist = TweedieDist(power=1.9999, scale=1.0)
log_pdf = tweedie_dist.log_pdf(y, mu)
assert np.all(
np.isfinite(log_pdf)
), "Log PDF near power=2 contains non-finite values."