Source code for etna.transforms.math.differencing

from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Union
from typing import cast

import numpy as np
import pandas as pd

from etna.datasets import TSDataset
from etna.distributions import BaseDistribution
from etna.distributions import IntDistribution
from etna.transforms.base import ReversibleTransform
from etna.transforms.utils import check_new_segments
from etna.transforms.utils import match_target_quantiles


class _SingleDifferencingTransform(ReversibleTransform):
    """Calculate a time series differences of order 1.

    During ``fit`` this transform can work with NaNs at the beginning of the segment,
    but fails when meets NaN inside the segment.
    During ``transform`` and ``inverse_transform`` there is no special treatment of NaNs.

    Notes
    -----
    To understand how transform works we recommend:
    `Stationarity and Differencing <https://otexts.com/fpp2/stationarity.html>`_
    """

    def __init__(
        self,
        in_column: str,
        period: int = 1,
        inplace: bool = True,
        out_column: Optional[str] = None,
    ):
        """Create instance of _SingleDifferencingTransform.

        Parameters
        ----------
        in_column:
            name of processed column
        period:
            number of steps back to calculate the difference with, it should be >= 1
        inplace:

            * if True, apply transformation inplace to in_column,

            * if False, add transformed column to dataset

        out_column:

            * if set, name of added column, the final name will be '{out_column}';

            * if isn't set, name will be based on ``self.__repr__()``

        Raises
        ------
        ValueError:
            if period is not integer >= 1
        """
        super().__init__(required_features=[in_column])
        self.in_column = in_column

        if not isinstance(period, int) or period < 1:
            raise ValueError("Period should be at least 1")
        self.period = period

        self.inplace = inplace
        self.out_column = out_column

        # state filled in by ``_fit`` / ``fit``
        self._train_timestamp: Optional[pd.DatetimeIndex] = None
        self._train_init_dict: Optional[Dict[str, pd.Series]] = None
        self._test_init_df: Optional[pd.DataFrame] = None
        self.in_column_regressor: Optional[bool] = None

    def _get_column_name(self) -> str:
        """Return the name of the column that holds the transformed values."""
        if self.inplace:
            return self.in_column
        if self.out_column is None:
            return self.__repr__()
        return self.out_column

    def get_regressors_info(self) -> List[str]:
        """Return the list with regressors created by the transform."""
        if self.in_column_regressor is None:
            raise ValueError("Fit the transform to get the correct regressors info!")
        if self.inplace:
            return []
        return [self._get_column_name()] if self.in_column_regressor else []

    def fit(self, ts: TSDataset) -> "_SingleDifferencingTransform":
        """Fit the transform."""
        self.in_column_regressor = self.in_column in ts.regressors
        super().fit(ts)
        return self

    def _fit(self, df: pd.DataFrame) -> "_SingleDifferencingTransform":
        """Fit the transform.

        Parameters
        ----------
        df:
            dataframe with data.

        Returns
        -------
        result: _SingleDifferencingTransform

        Raises
        ------
        ValueError:
            if NaNs are present inside the segment
        """
        segments = sorted(set(df.columns.get_level_values("segment")))
        fit_df = df.loc[:, pd.IndexSlice[segments, self.in_column]].copy()

        train_init_dict = {}
        for current_segment in segments:
            cur_series = fit_df.loc[:, pd.IndexSlice[current_segment, self.in_column]]
            # drop the leading NaNs, everything after the first valid value must be filled
            cur_series = cur_series.loc[cur_series.first_valid_index() :]

            if cur_series.isna().sum() > 0:
                raise ValueError("There should be no NaNs inside the segments")

            # first ``period`` valid values are needed to reconstruct the train
            train_init_dict[current_segment] = cur_series.iloc[: self.period]

        self._train_init_dict = train_init_dict
        self._train_timestamp = fit_df.index
        # last ``period`` rows are needed to reconstruct the test; copy to own the data
        self._test_init_df = fit_df.iloc[-self.period :, :].copy()
        # make multiindex levels consistent
        self._test_init_df.columns = self._test_init_df.columns.remove_unused_levels()
        return self

    def _transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Make a differencing transformation.

        Parameters
        ----------
        df:
            dataframe with data to transform. If ``inplace=True`` it is modified and returned.

        Returns
        -------
        result:
            transformed dataframe
        """
        segments = sorted(set(df.columns.get_level_values("segment")))
        columns_slice = pd.IndexSlice[segments, self.in_column]

        # ``diff`` works column-wise and returns a new frame, so ``df`` is not touched here
        transformed = df.loc[:, columns_slice].diff(periods=self.period)

        if self.inplace:
            result_df = df
            result_df.loc[:, columns_slice] = transformed
        else:
            column_name = self._get_column_name()
            transformed.columns = pd.MultiIndex.from_product([segments, [column_name]])
            result_df = pd.concat((df, transformed), axis=1)
            result_df = result_df.sort_index(axis=1)
        return result_df

    def _make_inv_diff(self, to_transform: Union[pd.DataFrame, pd.Series]) -> Union[pd.DataFrame, pd.Series]:
        """Make inverse difference transform.

        The argument is modified in place and also returned.
        """
        # each of the ``period`` interleaved subsequences is restored by its own cumulative sum
        for i in range(self.period):
            to_transform.iloc[i :: self.period] = to_transform.iloc[i :: self.period].cumsum()
        return to_transform

    def _reconstruct_train(self, df: pd.DataFrame, columns_to_inverse: Set[str]) -> pd.DataFrame:
        """Reconstruct the train in ``inverse_transform``."""
        segments = sorted(set(df.columns.get_level_values("segment")))
        result_df = df.copy()

        # impute values for reconstruction and run reconstruction
        for current_segment in segments:
            init_segment = self._train_init_dict[current_segment]  # type: ignore
            for column in columns_to_inverse:
                # explicit copy: the series is edited below and written back afterwards
                cur_series = result_df.loc[:, pd.IndexSlice[current_segment, column]].copy()
                cur_series.loc[init_segment.index] = init_segment.values
                cur_series = self._make_inv_diff(cur_series)
                result_df.loc[:, pd.IndexSlice[current_segment, column]] = cur_series
        return result_df

    def _reconstruct_test(self, df: pd.DataFrame, columns_to_inverse: Set[str]) -> pd.DataFrame:
        """Reconstruct the test in ``inverse_transform``.

        Raises
        ------
        ValueError:
            if the test doesn't start right after the train
        """
        segments = sorted(set(df.columns.get_level_values("segment")))
        result_df = df.copy()

        # check that test is right after the train
        last_train_timestamp = self._test_init_df.index.max()  # type: ignore
        candidates = pd.date_range(
            start=last_train_timestamp,
            periods=2,
            freq=pd.infer_freq(self._train_timestamp),
        )
        # drop the left endpoint; done by hand because the ``closed`` argument
        # of ``date_range`` isn't available in every pandas version
        expected_min_test_timestamp = candidates[candidates > last_train_timestamp][0]
        if expected_min_test_timestamp != df.index.min():
            raise ValueError("Test should go after the train without gaps")

        # we can reconstruct the values by concatenating saved fit values before test values
        for column in columns_to_inverse:
            to_transform = df.loc[:, pd.IndexSlice[segments, column]].copy()
            init_df = self._test_init_df.copy()  # type: ignore
            # rename the feature level so that saved values line up with the processed column
            init_df.columns = init_df.columns.set_levels([column], level="feature")
            init_df = init_df[segments]
            to_transform = pd.concat([init_df, to_transform])

            # run reconstruction and save the result
            to_transform = self._make_inv_diff(to_transform)
            result_df.loc[:, pd.IndexSlice[segments, column]] = to_transform.loc[result_df.index]
        return result_df

    def _fit_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Fit and transform dataframe.

        Parameters
        ----------
        df:
            Dataframe to transform.

        Returns
        -------
        :
            Transformed dataframe.
        """
        return self._fit(df=df)._transform(df=df)

    def _inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply inverse transformation to DataFrame.

        Parameters
        ----------
        df:
            DataFrame to apply inverse transform.

        Returns
        -------
        result:
            transformed DataFrame.

        Raises
        ------
        ValueError:
            if inverse transform is applied not to full train nor to test that goes after train
        ValueError:
            if inverse transform is applied to test that goes after train with gap
        """
        # we assume this to be fitted
        self._train_timestamp = cast(pd.DatetimeIndex, self._train_timestamp)

        # nothing to restore: the original column wasn't changed
        if not self.inplace:
            return df

        columns_to_inverse = {self.in_column}

        # if we are working with in_column="target" then there can be quantiles to inverse too
        if self.in_column == "target":
            columns_to_inverse.update(match_target_quantiles(set(df.columns.get_level_values("feature"))))

        # determine if we are working with train or test
        if self._train_timestamp.shape[0] == df.index.shape[0] and np.all(self._train_timestamp == df.index):
            # we are on the train
            result_df = self._reconstruct_train(df, columns_to_inverse)
        elif df.index.min() > self._train_timestamp.max():
            # we are on the test
            result_df = self._reconstruct_test(df, columns_to_inverse)
        else:
            raise ValueError("Inverse transform can be applied only to full train or test that should be in the future")

        return result_df
class DifferencingTransform(ReversibleTransform):
    """Calculate a time series differences.

    During ``fit`` this transform can work with NaNs at the beginning of the segment,
    but fails when meets NaN inside the segment.
    During ``transform`` and ``inverse_transform`` there is no special treatment of NaNs.

    Notes
    -----
    To understand how transform works we recommend:
    `Stationarity and Differencing <https://otexts.com/fpp2/stationarity.html>`_
    """

    def __init__(
        self,
        in_column: str,
        period: int = 1,
        order: int = 1,
        inplace: bool = True,
        out_column: Optional[str] = None,
    ):
        """Create instance of DifferencingTransform.

        Parameters
        ----------
        in_column:
            name of processed column
        period:
            number of steps back to calculate the difference with, it should be >= 1
        order:
            number of differences to make, it should be >= 1
        inplace:

            * if True, apply transformation inplace to in_column,

            * if False, add transformed column to dataset

        out_column:

            * if set, name of added column, the final name will be '{out_column}';

            * if isn't set, name will be based on ``self.__repr__()``

        Raises
        ------
        ValueError:
            if period is not integer >= 1
        ValueError:
            if order is not integer >= 1
        """
        super().__init__(required_features=[in_column])
        self.in_column = in_column

        if not (isinstance(period, int) and period >= 1):
            raise ValueError("Period should be at least 1")
        self.period = period

        if not (isinstance(order, int) and order >= 1):
            raise ValueError("Order should be at least 1")
        self.order = order

        self.inplace = inplace
        self.out_column = out_column

        # the chain of single differences: one per order
        target_column = self._get_column_name()
        # the head of the chain behaves like this transform, but with an already resolved out_column
        head = _SingleDifferencingTransform(
            in_column=self.in_column, period=self.period, inplace=self.inplace, out_column=target_column
        )
        # the rest of the chain differentiates the produced column inplace
        tail = [
            _SingleDifferencingTransform(in_column=target_column, period=self.period, inplace=True)
            for _ in range(self.order - 1)
        ]
        self._differencing_transforms: List[_SingleDifferencingTransform] = [head, *tail]

        self._fit_segments: Optional[List[str]] = None
        self.in_column_regressor: Optional[bool] = None

    def _get_column_name(self) -> str:
        """Return the name of the column that holds the transformed values."""
        if self.inplace:
            return self.in_column
        return self.__repr__() if self.out_column is None else self.out_column

    def get_regressors_info(self) -> List[str]:
        """Return the list with regressors created by the transform."""
        if self.in_column_regressor is None:
            raise ValueError("Fit the transform to get the correct regressors info!")
        # a new regressor appears only if a new column is added and the source column is a regressor
        if self.inplace or not self.in_column_regressor:
            return []
        return [self._get_column_name()]

    def fit(self, ts: TSDataset) -> "DifferencingTransform":
        """Fit the transform."""
        self.in_column_regressor = self.in_column in ts.regressors
        super().fit(ts)
        return self

    def _fit(self, df: pd.DataFrame) -> "DifferencingTransform":
        """Fit the transform.

        Parameters
        ----------
        df:
            dataframe with data.

        Returns
        -------
        result: DifferencingTransform

        Raises
        ------
        ValueError:
            if NaNs are present inside the segment
        """
        # each step is fitted on the output of the previous one,
        # because transforms of high order may need columns created by transforms of lower order
        current_df = df
        for single_transform in self._differencing_transforms:
            current_df = single_transform._fit_transform(current_df)

        segment_level = df.columns.get_level_values("segment")
        self._fit_segments = segment_level.unique().tolist()
        return self

    def _check_is_fitted(self):
        """Raise ``ValueError`` if the transform hasn't been fitted yet."""
        if self._fit_segments is None:
            raise ValueError("Transform is not fitted!")

    def _transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Make a differencing transformation.

        Parameters
        ----------
        df:
            dataframe with data to transform.

        Returns
        -------
        result:
            transformed dataframe

        Raises
        ------
        ValueError:
            if transform isn't fitted
        NotImplementedError:
            if there are segments that weren't present during training
        """
        self._check_is_fitted()

        segment_level = df.columns.get_level_values("segment")
        segments = segment_level.unique().tolist()
        if self.inplace:
            check_new_segments(transform_segments=segments, fit_segments=self._fit_segments)

        # apply the single differences from the lowest order to the highest
        current_df = df
        for single_transform in self._differencing_transforms:
            current_df = single_transform._transform(current_df)
        return current_df

    def _inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply inverse transformation to DataFrame.

        Parameters
        ----------
        df:
            DataFrame to apply inverse transform.

        Returns
        -------
        result:
            transformed DataFrame.

        Raises
        ------
        ValueError:
            if transform isn't fitted
        NotImplementedError:
            if there are segments that weren't present during training
        ValueError:
            if inverse transform is applied not to full train nor to test that goes after train
        ValueError:
            if inverse transform is applied to test that goes after train with gap
        """
        self._check_is_fitted()
        if not self.inplace:
            return df

        segment_level = df.columns.get_level_values("segment")
        segments = segment_level.unique().tolist()
        check_new_segments(transform_segments=segments, fit_segments=self._fit_segments)

        # undo the single differences from the highest order to the lowest
        current_df = df
        for single_transform in reversed(self._differencing_transforms):
            current_df = single_transform._inverse_transform(current_df)
        return current_df

    def params_to_tune(self) -> Dict[str, BaseDistribution]:
        """Get default grid for tuning hyperparameters.

        This grid tunes ``order`` parameter. Other parameters are expected to be set by the user.

        Returns
        -------
        :
            Grid to tune.
        """
        grid: Dict[str, BaseDistribution] = {"order": IntDistribution(low=1, high=2)}
        return grid