[QUESTION] create dataset to fit using a stride between consecutive samples equal to output_chunk_length #2621
Description
Hello everyone, and thank you for this awesome library.
I'm currently working with weather data, for which I receive a forecasted series every day at the same hour.
In the current state of my implementation, I'm using a mix of past and future covariates to train the model.
From my understanding, by default darts shifts each new training sample by one time step from the previous one. A different behaviour can be achieved by creating a custom class (inheriting from GenericShiftedDataset), which is then wrapped in another custom class inheriting from MixedCovariatesSequentialDataset.
My goal is to create a dataset in which each sample (i.e. a sequence of input_chunk_length + output_chunk_length time steps) is shifted from the previous one by a stride of output_chunk_length time steps. I want to evaluate how the model performs when trained the same way I expect to call it at inference time, i.e. once a day, instead of the default training approach with stride=1.
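To make the desired behaviour concrete, here is a minimal plain-numpy sketch of the strided windowing I have in mind (everything here is illustrative only, nothing comes from darts):

import numpy as np

input_chunk_length, output_chunk_length = 120, 24
stride = output_chunk_length  # consecutive samples start output_chunk_length steps apart

y = np.arange(500)  # dummy univariate series
window = input_chunk_length + output_chunk_length
starts = range(0, len(y) - window + 1, stride)
samples = [
    (y[s : s + input_chunk_length], y[s + input_chunk_length : s + window])
    for s in starts
]
# samples[0]: input covers t=0..119, output covers t=120..143
# samples[1]: input covers t=24..143, output covers t=144..167, and so on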
I attempted a very basic implementation, following the example in #2421, but unfortunately it is not working at all and I'm not confident enough to fly solo:
horizon = 24
history = 120

target_series = TimeSeries.from_dataframe(df, value_cols=target_f, freq='h')
past_covariates_series = TimeSeries.from_dataframe(df, value_cols=past_cc, freq='h')
future_covariates_series = TimeSeries.from_dataframe(df, value_cols=future_cc, freq='h')
sample_weight = target_series.with_values(sample_weight)

train_target = target_series.slice(per_train.start_date, per_train.end_date)
val_target = target_series.slice(per_val.start_date, per_val.end_date - pd.Timedelta(hours=horizon))
train_past_covariates = past_covariates_series.slice(per_train.start_date, per_train.end_date)
val_past_covariates = past_covariates_series.slice(per_val.start_date, per_val.end_date - pd.Timedelta(hours=horizon))
train_future_covariates = future_covariates_series.slice(per_train.start_date, per_train.end_date + pd.Timedelta(hours=horizon))
val_future_covariates = future_covariates_series.slice(per_val.start_date, per_val.end_date)
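As a quick sanity check on the slicing above (a sketch relying only on TimeSeries.end_time()): the future covariates should extend horizon steps beyond the target, and the past covariates should cover the whole target period.

assert train_future_covariates.end_time() == train_target.end_time() + pd.Timedelta(hours=horizon)
assert train_past_covariates.end_time() >= train_target.end_time()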
from typing import Optional, Sequence, Tuple, Union

import numpy as np

from darts import TimeSeries
from darts.logging import get_logger
from darts.utils.data import MixedCovariatesSequentialDataset
from darts.utils.data.shifted_dataset import GenericShiftedDataset
from darts.utils.data.utils import CovariateType

logger = get_logger(__name__)


class CustomGSD_stride(GenericShiftedDataset):
    def __init__(
        self,
        target_series: Union[TimeSeries, Sequence[TimeSeries]],
        covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        input_chunk_length: int = 12,
        output_chunk_length: int = 1,
        shift_covariates: bool = False,
        max_samples_per_ts: Optional[int] = None,
        covariate_type: CovariateType = CovariateType.NONE,
        use_static_covariates: bool = True,
        sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None,
    ):
        # We want to produce samples with a stride equal to output_chunk_length:
        # each subsequent sample is separated from the previous one by output_chunk_length steps.
        shift = output_chunk_length
        super().__init__(
            target_series=target_series,
            covariates=covariates,
            input_chunk_length=input_chunk_length,
            output_chunk_length=output_chunk_length,
            shift=shift,
            shift_covariates=shift_covariates,
            max_samples_per_ts=max_samples_per_ts,
            covariate_type=covariate_type,
            use_static_covariates=use_static_covariates,
            sample_weight=sample_weight,
        )
        # If max_samples_per_ts is None, compute it.
        # n_samples_in_ts = total number of samples possible with stride=1
        if self.max_samples_per_ts is None:
            if isinstance(target_series, TimeSeries):
                series_list = [target_series]
            else:
                series_list = target_series
            # For each series: n_samples_in_ts = len(ts) - (input + output) + 1
            n_samples_candidates = [
                len(ts) - (self.input_chunk_length + self.output_chunk_length) + 1
                for ts in series_list
            ]
            # Each sample jumps forward by `shift` steps, so divide by shift
            max_samples_per_ts_list = [ns // self.shift for ns in n_samples_candidates if ns > 0]
            # Take the minimum across all target series (if multiple)
            if len(max_samples_per_ts_list) == 0:
                raise ValueError("No valid samples can be formed from the provided series.")
            self.max_samples_per_ts = min(max_samples_per_ts_list)
    def __getitem__(
        self, idx
    ) -> Tuple[
        np.ndarray,
        Optional[np.ndarray],
        Optional[np.ndarray],
        Optional[np.ndarray],
        np.ndarray,
    ]:
        target_idx = idx // self.max_samples_per_ts
        within_ts_idx = idx % self.max_samples_per_ts
        target_series = self.target_series[target_idx]
        target_vals = target_series.random_component_values(copy=False)

        n_samples_in_ts = len(target_vals) - self.size_of_both_chunks + 1
        if n_samples_in_ts < 1:
            raise ValueError(
                "The series is too short to form even one sample. "
                f"Required at least {max(self.input_chunk_length, self.shift + self.output_chunk_length)} steps."
            )

        # The first sample ends at (input_chunk_length + output_chunk_length - 1);
        # each subsequent sample moves forward by `shift` steps.
        end_of_output_idx = (
            self.input_chunk_length + self.output_chunk_length - 1
        ) + within_ts_idx * self.shift

        covariate_series = self.covariates[target_idx] if self.covariates is not None else None
        sample_weight_series = self.sample_weight[target_idx] if self.sample_weight is not None else None

        (
            past_start,
            past_end,
            future_start,
            future_end,
            covariate_start,
            covariate_end,
            sample_weight_start,
            sample_weight_end,
        ) = self._memory_indexer(
            target_idx=target_idx,
            target_series=target_series,
            shift=self.shift,
            input_chunk_length=self.input_chunk_length,
            output_chunk_length=self.output_chunk_length,
            end_of_output_idx=end_of_output_idx,
            covariate_series=covariate_series,
            covariate_type=self.main_covariate_type,
            sample_weight_series=sample_weight_series,
        )
        # Extract target segments
        future_target = target_vals[future_start:future_end]
        past_target = target_vals[past_start:past_end]

        # Extract covariates
        covariate = None
        if self.covariates is not None:
            if covariate_end > len(covariate_series) or covariate_start < 0:
                raise ValueError(
                    f"The dataset contains {self.main_covariate_type.value} covariates "
                    f"that don't extend far enough. (index {idx}-th sample)"
                )
            covariate = covariate_series.random_component_values(copy=False)[covariate_start:covariate_end]
            expected_len = self.output_chunk_length if self.shift_covariates else self.input_chunk_length
            if len(covariate) != expected_len:
                raise ValueError(
                    f"The dataset contains {self.main_covariate_type.value} covariates "
                    f"with incorrect length. Expected {expected_len}, got {len(covariate)}."
                )

        # Extract sample weights
        sample_weight = None
        if self.sample_weight is not None:
            if sample_weight_end > len(sample_weight_series) or sample_weight_start < 0:
                raise ValueError("Sample weights are not long enough.")
            sample_weight = sample_weight_series.random_component_values(copy=False)[sample_weight_start:sample_weight_end]
            if len(sample_weight) != self.output_chunk_length:
                raise ValueError(
                    f"Sample weights should match output_chunk_length. "
                    f"Expected {self.output_chunk_length}, got {len(sample_weight)}."
                )

        # Static covariates
        static_covariate = target_series.static_covariates_values(copy=False) if self.use_static_covariates else None

        return past_target, covariate, static_covariate, sample_weight, future_target
class CustomMCSD_stride(MixedCovariatesSequentialDataset):
    def __init__(
        self,
        target_series: Union[TimeSeries, Sequence[TimeSeries]],
        past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
        input_chunk_length: int = 12,
        output_chunk_length: int = 1,
        max_samples_per_ts: Optional[int] = None,
        use_static_covariates: bool = True,
        sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None,
    ):
        # Past covariates dataset
        self.ds_past = CustomGSD_stride(
            target_series=target_series,
            covariates=past_covariates,
            input_chunk_length=input_chunk_length,
            output_chunk_length=output_chunk_length,
            shift_covariates=False,
            max_samples_per_ts=max_samples_per_ts,
            covariate_type=CovariateType.PAST,
            use_static_covariates=use_static_covariates,
            sample_weight=sample_weight,
        )
        # Historic future covariates dataset
        self.ds_historic_future = CustomGSD_stride(
            target_series=target_series,
            covariates=future_covariates,
            input_chunk_length=input_chunk_length,
            output_chunk_length=output_chunk_length,
            shift_covariates=False,
            max_samples_per_ts=max_samples_per_ts,
            covariate_type=CovariateType.HISTORIC_FUTURE,
            use_static_covariates=use_static_covariates,
            sample_weight=sample_weight,
        )
        # Future covariates dataset
        self.ds_future = CustomGSD_stride(
            target_series=target_series,
            covariates=future_covariates,
            input_chunk_length=input_chunk_length,
            output_chunk_length=output_chunk_length,
            shift_covariates=True,
            max_samples_per_ts=max_samples_per_ts,
            covariate_type=CovariateType.FUTURE,
            use_static_covariates=use_static_covariates,
        )

    def __getitem__(
        self, idx
    ) -> Tuple[
        np.ndarray,
        Optional[np.ndarray],
        Optional[np.ndarray],
        Optional[np.ndarray],
        Optional[np.ndarray],
        Optional[np.ndarray],
        np.ndarray,
    ]:
        past_target, past_covariate, static_covariate, sample_weight, future_target = self.ds_past[idx]
        _, historic_future_covariate, _, _, _ = self.ds_historic_future[idx]
        _, future_covariate, _, _, _ = self.ds_future[idx]
        return (
            past_target,
            past_covariate,
            historic_future_covariate,
            future_covariate,
            static_covariate,
            sample_weight,
            future_target,
        )
Then I create the dataset and fit the model from it:
ds_train = CustomMCSD_stride(
    target_series=[train_target],
    past_covariates=[train_past_covariates],
    future_covariates=[train_future_covariates],
    input_chunk_length=history,
    output_chunk_length=horizon,
    max_samples_per_ts=None,
    use_static_covariates=False,
    sample_weight=sample_weight,
)
import torch
from pytorch_lightning.callbacks import EarlyStopping

from darts.models import TSMixerModel

my_stopper = EarlyStopping(
    monitor="val_loss",
    patience=5,
    min_delta=1e-5,
    mode="min",
)
pl_trainer_kwargs = {
    "callbacks": [my_stopper],
    "gradient_clip_val": 1,
    # "logger": logger,
}
optimizer_cls = torch.optim.Adam
optimizer_kwargs = {"lr": 1e-5}

# learning rate scheduler
lr_scheduler_cls = torch.optim.lr_scheduler.ExponentialLR
lr_scheduler_kwargs = {"gamma": 0.999}
model = TSMixerModel(
    model_name="tsmixer",
    input_chunk_length=history,
    output_chunk_length=horizon,
    optimizer_cls=optimizer_cls,
    optimizer_kwargs=optimizer_kwargs,
    lr_scheduler_cls=lr_scheduler_cls,
    lr_scheduler_kwargs=lr_scheduler_kwargs,
    activation="ReLU",
    hidden_size=64,
    ff_size=64,
    n_epochs=50,
    random_state=40,
    log_tensorboard=False,
    pl_trainer_kwargs=pl_trainer_kwargs,  # pass the trainer kwargs to the model
)
# model.trainer_params = pl_trainer_kwargs
model.fit_from_dataset(ds_train, verbose=True)
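(Side note: since the early stopper monitors val_loss, I assume I would eventually also need to build a validation dataset the same way and pass it through the val_dataset argument of fit_from_dataset; a sketch:)

ds_val = CustomMCSD_stride(
    target_series=[val_target],
    past_covariates=[val_past_covariates],
    future_covariates=[val_future_covariates],
    input_chunk_length=history,
    output_chunk_length=horizon,
    max_samples_per_ts=None,
    use_static_covariates=False,
)
model.fit_from_dataset(ds_train, val_dataset=ds_val, verbose=True)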
The error I'm getting when fitting is "ValueError: The dataset contains past covariates that don't extend far enough. (index 13128-th sample)". Please note that the time series creation and the train/validation split work perfectly with the default training approach (i.e. calling the model's fit() method instead of using the custom dataset).
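For reference, the back-of-envelope sample counts I would expect (a sketch of the arithmetic, reusing the variables above): with stride=1 there should be len(train_target) - (history + horizon) + 1 samples, and with a stride of horizon roughly a factor horizon fewer, so a failing index as large as 13128 makes me wonder whether the dataset length is still the stride=1 one.

n_stride1 = len(train_target) - (history + horizon) + 1
n_strided = (len(train_target) - (history + horizon)) // horizon + 1
print(n_stride1, n_strided, len(ds_train))  # the failing index 13128 should be < len(ds_train)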
Any help or insight would be much appreciated; I'm a bit lost right now.
Thanks again for your work!