import warnings
from copy import deepcopy
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Union
import numpy as np
import pandas as pd
from joblib import Parallel
from joblib import delayed
from sklearn.base import RegressorMixin
from sklearn.linear_model import LinearRegression
from typing_extensions import Literal
from etna.datasets import TSDataset
from etna.distributions import BaseDistribution
from etna.ensembles.mixins import EnsembleMixin
from etna.ensembles.mixins import SaveEnsembleMixin
from etna.loggers import tslogger
from etna.metrics import MAE
from etna.pipeline.base import BasePipeline
[docs]class StackingEnsemble(EnsembleMixin, SaveEnsembleMixin, BasePipeline):
"""StackingEnsemble is a pipeline that forecast future using the metamodel to combine the forecasts of the base models.
Examples
--------
>>> from etna.datasets import generate_ar_df
>>> from etna.datasets import TSDataset
>>> from etna.ensembles import VotingEnsemble
>>> from etna.models import NaiveModel
>>> from etna.models import MovingAverageModel
>>> from etna.pipeline import Pipeline
>>> import pandas as pd
>>> pd.options.display.float_format = '{:,.2f}'.format
>>> df = generate_ar_df(periods=100, start_time="2021-06-01", ar_coef=[0.8], n_segments=3)
>>> df_ts_format = TSDataset.to_dataset(df)
>>> ts = TSDataset(df_ts_format, "D")
>>> ma_pipeline = Pipeline(model=MovingAverageModel(window=5), transforms=[], horizon=7)
>>> naive_pipeline = Pipeline(model=NaiveModel(lag=10), transforms=[], horizon=7)
>>> ensemble = StackingEnsemble(pipelines=[ma_pipeline, naive_pipeline])
>>> _ = ensemble.fit(ts=ts)
>>> forecast = ensemble.forecast()
>>> forecast[:,:,"target"]
segment segment_0 segment_1 segment_2
feature target target target
timestamp
2021-09-09 0.70 1.47 0.20
2021-09-10 0.62 1.53 0.26
2021-09-11 0.50 1.78 0.36
2021-09-12 0.37 1.88 0.21
2021-09-13 0.46 1.87 0.25
2021-09-14 0.44 1.49 0.21
2021-09-15 0.36 1.56 0.30
"""
def __init__(
self,
pipelines: List[BasePipeline],
final_model: Optional[RegressorMixin] = None,
n_folds: int = 3,
features_to_use: Union[None, Literal["all"], List[str]] = None,
n_jobs: int = 1,
joblib_params: Optional[Dict[str, Any]] = None,
):
"""Init StackingEnsemble.
Parameters
----------
pipelines:
List of pipelines that should be used in ensemble.
final_model:
Regression model with fit/predict interface which will be used to combine the base estimators.
n_folds:
Number of folds to use in the backtest. Backtest is not used for model evaluation but for prediction.
features_to_use:
Features except the forecasts of the base models to use in the ``final_model``.
n_jobs:
Number of jobs to run in parallel.
joblib_params:
Additional parameters for :py:class:`joblib.Parallel`.
Raises
------
ValueError:
If the number of the pipelines is less than 2 or pipelines have different horizons.
"""
self._validate_pipeline_number(pipelines=pipelines)
self.pipelines = pipelines
self.final_model = LinearRegression(positive=True) if final_model is None else final_model
self._validate_backtest_n_folds(n_folds)
self.n_folds = n_folds
self.features_to_use = features_to_use
self.filtered_features_for_final_model: Union[None, Set[str]] = None
self.n_jobs = n_jobs
if joblib_params is None:
self.joblib_params = dict(verbose=11, backend="multiprocessing", mmap_mode="c")
else:
self.joblib_params = joblib_params
super().__init__(horizon=self._get_horizon(pipelines=pipelines))
def _filter_features_to_use(self, forecasts: List[TSDataset]) -> Union[None, Set[str]]:
"""Return all the features from ``features_to_use`` which can be obtained from base models' forecasts."""
features_df = pd.concat([forecast.df for forecast in forecasts], axis=1)
available_features = set(features_df.columns.get_level_values("feature")) - {"fold_number"}
features_to_use = self.features_to_use
if features_to_use is None:
return None
elif features_to_use == "all":
return available_features - {"target"}
elif isinstance(features_to_use, list):
features_to_use_unique = set(features_to_use)
if len(features_to_use_unique) == 0:
return None
elif features_to_use_unique.issubset(available_features):
return features_to_use_unique
else:
unavailable_features = features_to_use_unique - available_features
warnings.warn(f"Features {unavailable_features} are not found and will be dropped!")
return features_to_use_unique.intersection(available_features)
else:
warnings.warn(
"Feature list is passed in the wrong format."
"Only the base models' forecasts will be used for the final forecast."
)
return None
def _backtest_pipeline(self, pipeline: BasePipeline, ts: TSDataset) -> TSDataset:
"""Get forecasts from backtest for given pipeline."""
with tslogger.disable():
_, forecasts, _ = pipeline.backtest(ts=ts, metrics=[MAE()], n_folds=self.n_folds)
forecasts = TSDataset(df=forecasts, freq=ts.freq)
return forecasts
[docs] def fit(self, ts: TSDataset) -> "StackingEnsemble":
"""Fit the ensemble.
Parameters
----------
ts:
TSDataset to fit ensemble.
Returns
-------
self:
Fitted ensemble.
"""
self.ts = ts
# Get forecasts from base models on backtest to fit the final model on
forecasts = Parallel(n_jobs=self.n_jobs, **self.joblib_params)(
delayed(self._backtest_pipeline)(pipeline=pipeline, ts=deepcopy(ts)) for pipeline in self.pipelines
)
# Fit the final model
self.filtered_features_for_final_model = self._filter_features_to_use(forecasts)
x, y = self._make_features(ts=self.ts, forecasts=forecasts, train=True)
self.final_model.fit(x, y)
# Fit the base models
self.pipelines = Parallel(n_jobs=self.n_jobs, **self.joblib_params)(
delayed(self._fit_pipeline)(pipeline=pipeline, ts=deepcopy(ts)) for pipeline in self.pipelines
)
return self
def _make_features(
self, ts: TSDataset, forecasts: List[TSDataset], train: bool = False
) -> Tuple[pd.DataFrame, Optional[pd.Series]]:
"""Prepare features for the ``final_model``."""
# Stack targets from the forecasts
targets = [
forecast[:, :, "target"].rename({"target": f"regressor_target_{i}"}, level="feature", axis=1)
for i, forecast in enumerate(forecasts)
]
targets = pd.concat(targets, axis=1)
# Get features from filtered_features_for_final_model
features = pd.DataFrame()
if self.filtered_features_for_final_model is not None:
features_in_forecasts = [
list(
set(forecast.columns.get_level_values("feature")).intersection(
self.filtered_features_for_final_model
)
)
for forecast in forecasts
]
features = pd.concat(
[forecast[:, :, features_in_forecasts[i]] for i, forecast in enumerate(forecasts)], axis=1
)
features = features.loc[:, ~features.columns.duplicated()]
features_df = pd.concat([features, targets], axis=1)
# Flatten the features to fit the sklearn interface
x = pd.concat([features_df.loc[:, segment] for segment in ts.segments], axis=0)
if train:
y = pd.concat(
[ts[forecasts[0].index.min() : forecasts[0].index.max(), segment, "target"] for segment in ts.segments],
axis=0,
)
return x, y
else:
return x, None
def _process_forecasts(self, ts: TSDataset, forecasts: List[TSDataset]) -> TSDataset:
x, _ = self._make_features(ts=ts, forecasts=forecasts, train=False)
y = self.final_model.predict(x)
num_segments = len(forecasts[0].segments)
y = y.reshape(num_segments, -1).T
num_timestamps = y.shape[0]
# Format the forecast into TSDataset
segment_col = [segment for segment in ts.segments for _ in range(num_timestamps)]
x.loc[:, "segment"] = segment_col
x.loc[:, "timestamp"] = x.index.values
df_exog = TSDataset.to_dataset(x)
df = forecasts[0][:, :, "target"].copy()
df.loc[pd.IndexSlice[:], pd.IndexSlice[:, "target"]] = np.NAN
result = TSDataset(df=df, freq=ts.freq, df_exog=df_exog)
result.loc[pd.IndexSlice[:], pd.IndexSlice[:, "target"]] = y
return result
def _forecast(self, ts: TSDataset, return_components: bool) -> TSDataset:
"""Make predictions.
Compute the combination of pipelines' forecasts using ``final_model``
"""
if return_components:
raise NotImplementedError("Adding target components is not currently implemented!")
forecasts = Parallel(n_jobs=self.n_jobs, **self.joblib_params)(
delayed(self._forecast_pipeline)(pipeline=pipeline, ts=ts) for pipeline in self.pipelines
)
forecast = self._process_forecasts(ts=ts, forecasts=forecasts)
return forecast
def _predict(
self,
ts: TSDataset,
start_timestamp: pd.Timestamp,
end_timestamp: pd.Timestamp,
prediction_interval: bool,
quantiles: Sequence[float],
return_components: bool,
) -> TSDataset:
if prediction_interval:
raise NotImplementedError(f"Ensemble {self.__class__.__name__} doesn't support prediction intervals!")
if return_components:
raise NotImplementedError("Adding target components is not currently implemented!")
predictions = Parallel(n_jobs=self.n_jobs, **self.joblib_params)(
delayed(self._predict_pipeline)(
ts=ts, pipeline=pipeline, start_timestamp=start_timestamp, end_timestamp=end_timestamp
)
for pipeline in self.pipelines
)
prediction = self._process_forecasts(ts=ts, forecasts=predictions)
return prediction
[docs] def params_to_tune(self) -> Dict[str, BaseDistribution]:
"""Get hyperparameter grid to tune.
Not implemented for this class.
Returns
-------
:
Grid with hyperparameters.
"""
raise NotImplementedError(f"{self.__class__.__name__} doesn't support this method!")