Unverified Commit 7487829a authored by Kashif Rasul, committed by GitHub

Added support for multivariate independent emission heads (#19453)

* Added support for multivariate independent emission heads

* fix typo

* rename distr_cls

* scale is a vector for multivariate

* set affine transform event_dim

* fix typo

* added variable

* added beta in the config

* set beta

* remove beta-nll option in nll
parent a5da6f18
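The gist of the change: when `input_size > 1`, each emission head still emits one set of univariate parameters per target dimension, and the resulting distribution is wrapped in `torch.distributions.Independent` so the last dimension is treated as the event dimension. A minimal sketch of that idea (illustrative only, not the code in this diff):

```python
# Sketch: a batch of univariate Normals reinterpreted as an independent multivariate emission.
import torch
from torch.distributions import Independent, Normal

batch, input_size = 4, 3
loc = torch.zeros(batch, input_size)
scale = torch.ones(batch, input_size)

univariate = Normal(loc, scale)            # batch_shape=(4, 3), event_shape=()
multivariate = Independent(univariate, 1)  # batch_shape=(4,),   event_shape=(3,)

target = torch.randn(batch, input_size)
print(univariate.log_prob(target).shape)    # torch.Size([4, 3]) -> one value per dimension
print(multivariate.log_prob(target).shape)  # torch.Size([4])    -> summed over the target dims
```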
@@ -55,7 +55,7 @@ class TimeSeriesTransformerConfig(PretrainedConfig):
            distributions it is the negative log likelihood (nll) - which currently is the only supported one.
        input_size (`int`, *optional*, defaults to 1):
            The size of the target variable which by default is 1 for univariate targets. Would be > 1 in case of
-            multivarate targets.
+            multivariate targets.
        scaling (`bool`, *optional* defaults to `True`):
            Whether to scale the input targets.
        lags_sequence (`list[int]`, *optional*, defaults to `[1, 2, 3, 4, 5, 6, 7]`):
@@ -225,5 +225,5 @@ class TimeSeriesTransformerConfig(PretrainedConfig):
            + self.num_dynamic_real_features
            + self.num_time_features
            + max(1, self.num_static_real_features)  # there is at least one dummy static real feature
-            + 1  # the log(scale)
+            + self.input_size  # the log(scale)
        )
@@ -24,6 +24,7 @@ from torch import nn
from torch.distributions import (
    AffineTransform,
    Distribution,
+    Independent,
    NegativeBinomial,
    Normal,
    StudentT,
@@ -49,11 +50,11 @@ TIME_SERIES_TRANSFORMER_PRETRAINED_MODEL_ARCHIVE_LIST = [
class AffineTransformed(TransformedDistribution):
-    def __init__(self, base_distribution: Distribution, loc=None, scale=None):
+    def __init__(self, base_distribution: Distribution, loc=None, scale=None, event_dim=0):
        self.scale = 1.0 if scale is None else scale
        self.loc = 0.0 if loc is None else loc

-        super().__init__(base_distribution, [AffineTransform(self.loc, self.scale)])
+        super().__init__(base_distribution, [AffineTransform(loc=self.loc, scale=self.scale, event_dim=event_dim)])
@property
def mean(self):
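For context on the new `event_dim` argument above: `AffineTransform(loc=..., scale=..., event_dim=1)` declares that the (vector-valued) loc and scale act on the event dimension of the multivariate base distribution, so the transformed distribution keeps `event_shape == (input_size,)` and yields one log-probability per series. A rough illustration with assumed shapes:

```python
import torch
from torch.distributions import AffineTransform, Independent, Normal, TransformedDistribution

batch, input_size = 4, 3
base = Independent(Normal(torch.zeros(batch, input_size), torch.ones(batch, input_size)), 1)

loc = torch.zeros(batch, input_size)
scale = torch.full((batch, input_size), 2.0)  # per-dimension scale vector

scaled = TransformedDistribution(base, [AffineTransform(loc=loc, scale=scale, event_dim=1)])
print(scaled.event_shape)                                     # torch.Size([3])
print(scaled.log_prob(torch.randn(batch, input_size)).shape)  # torch.Size([4])
```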
@@ -79,11 +80,7 @@ class AffineTransformed(TransformedDistribution):
class ParameterProjection(nn.Module):
    def __init__(
-        self,
-        in_features: int,
-        args_dim: Dict[str, int],
-        domain_map: Callable[..., Tuple[torch.Tensor]],
-        **kwargs,
+        self, in_features: int, args_dim: Dict[str, int], domain_map: Callable[..., Tuple[torch.Tensor]], **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.args_dim = args_dim
@@ -106,15 +103,19 @@ class LambdaLayer(nn.Module):
class DistributionOutput:
-    distr_cls: type
+    distribution_class: type
    in_features: int
    args_dim: Dict[str, int]

-    def __init__(self) -> None:
-        pass
+    def __init__(self, dim: int = 1) -> None:
+        self.dim = dim
+        self.args_dim = {k: dim * self.args_dim[k] for k in self.args_dim}

    def _base_distribution(self, distr_args):
-        return self.distr_cls(*distr_args)
+        if self.dim == 1:
+            return self.distribution_class(*distr_args)
+        else:
+            return Independent(self.distribution_class(*distr_args), 1)

    def distribution(
        self,
@@ -126,14 +127,14 @@ class DistributionOutput:
        if loc is None and scale is None:
            return distr
        else:
-            return AffineTransformed(distr, loc=loc, scale=scale)
+            return AffineTransformed(distr, loc=loc, scale=scale, event_dim=self.event_dim)

    @property
    def event_shape(self) -> Tuple:
        r"""
        Shape of each individual event contemplated by the distributions that this object constructs.
        """
-        raise NotImplementedError()
+        return () if self.dim == 1 else (self.dim,)
@property
def event_dim(self) -> int:
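The pattern shared by all the heads below: the constructor takes a `dim`, multiplies each `args_dim` entry by it (so the parameter projection emits `dim` values per distribution argument), and `event_shape` switches from `()` to `(dim,)`. A standalone sketch of that pattern with simplified names (not the library classes):

```python
from typing import Dict

import torch
from torch.distributions import Independent, Normal


class NormalHead:
    # Mirrors the DistributionOutput / NormalOutput pattern: one column per argument and target dimension.
    args_dim: Dict[str, int] = {"loc": 1, "scale": 1}

    def __init__(self, dim: int = 1) -> None:
        self.dim = dim
        self.args_dim = {k: dim * v for k, v in self.args_dim.items()}

    @property
    def event_shape(self):
        return () if self.dim == 1 else (self.dim,)

    def base_distribution(self, loc: torch.Tensor, scale: torch.Tensor):
        dist = Normal(loc, scale)
        return dist if self.dim == 1 else Independent(dist, 1)


head = NormalHead(dim=3)
print(head.args_dim)     # {'loc': 3, 'scale': 3}
print(head.event_shape)  # (3,)
```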
@@ -180,7 +181,7 @@ class DistributionOutput:
class StudentTOutput(DistributionOutput):
    args_dim: Dict[str, int] = {"df": 1, "loc": 1, "scale": 1}
-    distr_cls: type = StudentT
+    distribution_class: type = StudentT

    @classmethod
    def domain_map(cls, df: torch.Tensor, loc: torch.Tensor, scale: torch.Tensor):
@@ -188,28 +189,20 @@ class StudentTOutput(DistributionOutput):
        df = 2.0 + cls.squareplus(df)
        return df.squeeze(-1), loc.squeeze(-1), scale.squeeze(-1)

-    @property
-    def event_shape(self) -> Tuple:
-        return ()


class NormalOutput(DistributionOutput):
    args_dim: Dict[str, int] = {"loc": 1, "scale": 1}
-    distr_cls: type = Normal
+    distribution_class: type = Normal

    @classmethod
    def domain_map(cls, loc: torch.Tensor, scale: torch.Tensor):
        scale = cls.squareplus(scale)
        return loc.squeeze(-1), scale.squeeze(-1)

-    @property
-    def event_shape(self) -> Tuple:
-        return ()


class NegativeBinomialOutput(DistributionOutput):
    args_dim: Dict[str, int] = {"total_count": 1, "logits": 1}
-    distr_cls: type = NegativeBinomial
+    distribution_class: type = NegativeBinomial

    @classmethod
    def domain_map(cls, total_count: torch.Tensor, logits: torch.Tensor):
@@ -218,27 +211,24 @@ class NegativeBinomialOutput(DistributionOutput):
    def _base_distribution(self, distr_args) -> Distribution:
        total_count, logits = distr_args
-        return self.distr_cls(total_count=total_count, logits=logits)
+        if self.dim == 1:
+            return self.distribution_class(total_count=total_count, logits=logits)
+        else:
+            return Independent(self.distribution_class(total_count=total_count, logits=logits), 1)

    # Overwrites the parent class method. We cannot scale using the affine
    # transformation since negative binomial should return integers. Instead
    # we scale the parameters.
    def distribution(
-        self,
-        distr_args,
-        loc: Optional[torch.Tensor] = None,
-        scale: Optional[torch.Tensor] = None,
+        self, distr_args, loc: Optional[torch.Tensor] = None, scale: Optional[torch.Tensor] = None
    ) -> Distribution:
        total_count, logits = distr_args

        if scale is not None:
            # See scaling property of Gamma.
            logits += scale.log()

-        return NegativeBinomial(total_count=total_count, logits=logits)
-
-    @property
-    def event_shape(self) -> Tuple:
-        return ()
+        return self._base_distribution((total_count, logits))
class FeatureEmbedder(nn.Module):
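On the comment about scaling in the hunk above: the negative-binomial head cannot be rescaled with an affine transform because its support must stay on the integers, so the scale is folded into the logits instead. In `torch.distributions.NegativeBinomial` the mean is `total_count * exp(logits)`, so adding `log(scale)` to the logits multiplies the mean by `scale`. A quick check:

```python
import torch
from torch.distributions import NegativeBinomial

total_count = torch.tensor([5.0])
logits = torch.tensor([0.3])
scale = torch.tensor([10.0])

base = NegativeBinomial(total_count=total_count, logits=logits)
scaled = NegativeBinomial(total_count=total_count, logits=logits + scale.log())

print(scaled.mean / base.mean)  # tensor([10.]) -- the mean is scaled, the support stays integer-valued
```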
@@ -366,23 +356,11 @@ def weighted_average(input_tensor: torch.Tensor, weights: Optional[torch.Tensor]
class NegativeLogLikelihood:
    """
-    Computes the negative log likelihood loss.
-    Args:
-        beta (`float`):
-            Float in range (0, 1). The beta parameter from the paper: "On the Pitfalls of Heteroscedastic Uncertainty
-            Estimation with Probabilistic Neural Networks" by [Seitzer et al.
-            2022](https://openreview.net/forum?id=aPOpXlnV1T).
+    Computes the negative log likelihood loss from input distribution with respect to target.
    """

-    beta: float = 0.0

    def __call__(self, input: torch.distributions.Distribution, target: torch.Tensor) -> torch.Tensor:
-        nll = -input.log_prob(target)
-        if self.beta > 0.0:
-            variance = input.variance
-            nll = nll * (variance.detach() ** self.beta)
-        return nll
+        return -input.log_prob(target)
# Copied from transformers.models.bart.modeling_bart._make_causal_mask
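Back to the `NegativeLogLikelihood` change above: with the beta weighting removed, the loss object is now just the plain negative log likelihood of the target under the emitted distribution, for example:

```python
import torch
from torch.distributions import Normal

dist = Normal(loc=torch.zeros(4), scale=torch.ones(4))
target = torch.tensor([0.0, 0.5, -1.0, 2.0])

nll = -dist.log_prob(target)  # what NegativeLogLikelihood.__call__ now returns
print(nll.shape)              # torch.Size([4]); reduced later by weighted_average
```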
@@ -1552,15 +1530,14 @@ class TimeSeriesTransformerModel(TimeSeriesTransformerPreTrainedModel):
        # embeddings
        embedded_cat = self.embedder(static_categorical_features)
-        static_feat = torch.cat(
-            (embedded_cat, static_real_features, scale.log()),
-            dim=1,
-        )
+        # static features
+        log_scale = scale.log() if self.config.input_size == 1 else scale.squeeze(1).log()
+        static_feat = torch.cat((embedded_cat, static_real_features, log_scale), dim=1)
        expanded_static_feat = static_feat.unsqueeze(1).expand(-1, time_feat.shape[1], -1)

+        # all features
        features = torch.cat((expanded_static_feat, time_feat), dim=-1)

-        # sequence = torch.cat((prior_input, inputs), dim=1)
lagged_sequence = self.get_lagged_subsequences(sequence=inputs, subsequences_length=subsequences_length)
lags_shape = lagged_sequence.shape
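The `squeeze(1)` above reflects that, for a multivariate target, the scaler presumably returns a scale of shape `(batch, 1, input_size)` rather than `(batch, 1)`, while the static-feature concatenation needs a 2-D tensor. Assumed shapes for illustration:

```python
import torch

batch, input_size = 4, 3
scale = torch.rand(batch, 1, input_size) + 0.5  # per-dimension scale from the scaler (assumed shape)
embedded_cat = torch.randn(batch, 2)
static_real_features = torch.randn(batch, 1)

log_scale = scale.squeeze(1).log()              # (batch, input_size)
static_feat = torch.cat((embedded_cat, static_real_features, log_scale), dim=1)
print(static_feat.shape)                        # torch.Size([4, 6])
```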
@@ -1713,11 +1690,11 @@ class TimeSeriesTransformerForPrediction(TimeSeriesTransformerPreTrainedModel):
        super().__init__(config)
        self.model = TimeSeriesTransformerModel(config)

        if config.distribution_output == "student_t":
-            self.distribution_output = StudentTOutput()
+            self.distribution_output = StudentTOutput(dim=config.input_size)
        elif config.distribution_output == "normal":
-            self.distribution_output = NormalOutput()
+            self.distribution_output = NormalOutput(dim=config.input_size)
        elif config.distribution_output == "negative_binomial":
-            self.distribution_output = NegativeBinomialOutput()
+            self.distribution_output = NegativeBinomialOutput(dim=config.input_size)
else:
raise ValueError(f"Unknown distribution output {config.distribution_output}")
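With the heads now parameterized by `config.input_size`, a multivariate model should be configurable along these lines (illustrative values; only a minimal set of settings shown):

```python
from transformers import TimeSeriesTransformerConfig, TimeSeriesTransformerForPrediction

config = TimeSeriesTransformerConfig(
    prediction_length=24,
    input_size=3,                     # 3-variate target
    distribution_output="student_t",  # independent Student-T emission per target dimension
)
model = TimeSeriesTransformerForPrediction(config)
print(model.distribution_output.event_shape)  # (3,)
```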
@@ -1867,7 +1844,7 @@ class TimeSeriesTransformerForPrediction(TimeSeriesTransformerPreTrainedModel):
            if len(self.target_shape) == 0:
                loss_weights = future_observed_mask
            else:
-                loss_weights = future_observed_mask.min(dim=-1, keepdim=False)
+                loss_weights, _ = future_observed_mask.min(dim=-1, keepdim=False)
prediction_loss = weighted_average(loss, weights=loss_weights)
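The small fix above is needed because `Tensor.min(dim=...)` returns a `(values, indices)` named tuple rather than a tensor, so the observed mask has to be unpacked before it can be used as loss weights:

```python
import torch

future_observed_mask = torch.tensor([[1.0, 1.0], [1.0, 0.0]])
loss_weights, _ = future_observed_mask.min(dim=-1, keepdim=False)
print(loss_weights)  # tensor([1., 0.]) -- a step is weighted only if all target dimensions are observed
```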