Source code for etna.transforms.decomposition.change_points_based.detrend
from typing import Dict
from typing import Optional
import numpy as np
import pandas as pd
from ruptures.detection import Binseg
from sklearn.linear_model import LinearRegression
from etna.distributions import BaseDistribution
from etna.distributions import CategoricalDistribution
from etna.distributions import IntDistribution
from etna.transforms.decomposition.change_points_based.base import ReversibleChangePointsTransform
from etna.transforms.decomposition.change_points_based.base import _OneSegmentChangePointsTransform
from etna.transforms.decomposition.change_points_based.change_points_models import BaseChangePointsModelAdapter
from etna.transforms.decomposition.change_points_based.change_points_models import RupturesChangePointsModel
from etna.transforms.decomposition.change_points_based.per_interval_models import PerIntervalModel
from etna.transforms.decomposition.change_points_based.per_interval_models import SklearnRegressionPerIntervalModel
from etna.transforms.utils import match_target_quantiles
class _OneSegmentChangePointsTrendTransform(_OneSegmentChangePointsTransform):
    """Single-series transform that removes the per-interval trend from ``in_column``."""

    @staticmethod
    def _get_features(series: pd.Series) -> np.ndarray:
        """Turn the timestamp index of ``series`` into regression features.

        Parameters
        ----------
        series:
            series whose index entries expose a ``timestamp()`` method

        Returns
        -------
        :
            Array built from one single-element row per index entry,
            each row holding the result of ``timestamp()`` for that entry.
        """
        rows = []
        for moment in series.index:
            rows.append([moment.timestamp()])
        return np.array(rows)

    def _apply_transformation(self, df: pd.DataFrame, transformed_series: pd.Series) -> pd.DataFrame:
        """Subtract ``transformed_series`` from ``in_column`` of ``df``.

        The dataframe is modified in place and the same object is returned.
        """
        column = self.in_column
        df.loc[:, column] -= transformed_series
        return df

    def _apply_inverse_transformation(self, df: pd.DataFrame, transformed_series: pd.Series) -> pd.DataFrame:
        """Add ``transformed_series`` back to ``in_column`` of ``df``.

        When ``in_column`` is ``"target"``, the same series is also added to every
        column reported by ``match_target_quantiles`` for the columns of ``df``.
        The dataframe is modified in place and the same object is returned.
        """
        column = self.in_column
        df.loc[:, column] += transformed_series

        # Quantile columns are shifted only together with the target itself.
        if column != "target":
            return df

        for quantile_column in match_target_quantiles(set(df.columns)):
            df.loc[:, quantile_column] += transformed_series
        return df
class ChangePointsTrendTransform(ReversibleChangePointsTransform):
    """Transform that detrends each change-point interval of a series.

    The only difference from
    :py:class:`~etna.transforms.decomposition.change_points_based.level.ChangePointsLevelTransform`
    is the default values of ``change_points_model`` and ``per_interval_model``.

    Each segment is split into intervals by ``change_points_model``.
    A separate ``per_interval_model`` is then fitted on every interval,
    and the values it predicts are subtracted from that interval.
    The fitted function can be linear, mean, median, etc.
    Look at the signature to find out which models can be used.

    Warning
    -------
    This transform can suffer from look-ahead bias. For transforming data at some timestamp
    it uses information from the whole train part.
    """

    # Defaults are created once at class level and used whenever the caller passes ``None``.
    _default_change_points_model = RupturesChangePointsModel(
        change_points_model=Binseg(model="ar"),
        n_bkps=5,
    )
    _default_per_interval_model = SklearnRegressionPerIntervalModel(model=LinearRegression())

    def __init__(
        self,
        in_column: str,
        change_points_model: Optional[BaseChangePointsModelAdapter] = None,
        per_interval_model: Optional[PerIntervalModel] = None,
    ):
        """Init ChangePointsTrendTransform.

        Parameters
        ----------
        in_column:
            name of column to apply transform to
        change_points_model:
            model to get trend change points,
            by default :py:class:`ruptures.detection.Binseg` in a wrapper with ``n_bkps=5`` is used
        per_interval_model:
            model to process intervals of segment,
            by default :py:class:`sklearn.linear_model.LinearRegression` in a wrapper is used
        """
        # Fall back to the class-level defaults for omitted models.
        if change_points_model is None:
            change_points_model = self._default_change_points_model
        if per_interval_model is None:
            per_interval_model = self._default_per_interval_model

        self.in_column = in_column
        self.change_points_model = change_points_model
        self.per_interval_model = per_interval_model

        one_segment_transform = _OneSegmentChangePointsTrendTransform(
            in_column=in_column,
            change_points_model=change_points_model,
            per_interval_model=per_interval_model,
        )
        super().__init__(transform=one_segment_transform, required_features=[in_column])

    @property
    def _is_change_points_model_default(self) -> bool:
        """Whether the dict form of ``change_points_model`` equals that of the default model."""
        # it can't see the difference between Binseg(model="ar") and Binseg(model="l1")
        current_config = self.change_points_model.to_dict()
        default_config = self._default_change_points_model.to_dict()
        return current_config == default_config

    def params_to_tune(self) -> Dict[str, BaseDistribution]:
        """Get default grid for tuning hyperparameters.

        If ``self.change_points_model`` is equal to default then this grid tunes parameters:
        ``change_points_model.change_points_model.model``, ``change_points_model.n_bkps``.
        Other parameters are expected to be set by the user.

        Returns
        -------
        :
            Grid to tune; empty when a non-default ``change_points_model`` is used.
        """
        if not self._is_change_points_model_default:
            return {}

        cost_models = CategoricalDistribution(
            ["l1", "l2", "normal", "rbf", "cosine", "linear", "clinear", "ar", "mahalanobis", "rank"]
        )
        n_breakpoints = IntDistribution(low=5, high=30)
        return {
            "change_points_model.change_points_model.model": cost_models,
            "change_points_model.n_bkps": n_breakpoints,
        }