#!/usr/bin/env python3
from typing import List, Optional, Union
import torch
from jaxtyping import Float
from torch import Tensor
from linear_operator.operators._linear_operator import IndexType, LinearOperator
from linear_operator.operators.dense_linear_operator import DenseLinearOperator, to_linear_operator
from linear_operator.operators.matmul_linear_operator import MatmulLinearOperator
from linear_operator.utils.broadcasting import _pad_with_singletons
from linear_operator.utils.getitem import _equal_indices, _noop_index
from linear_operator.utils.memoize import cached
[docs]
class RootLinearOperator(LinearOperator):
def __init__(self, root):
root = to_linear_operator(root)
super().__init__(root)
self.root = root
def _diagonal(self: Float[LinearOperator, "... M N"]) -> Float[torch.Tensor, "... N"]:
if isinstance(self.root, DenseLinearOperator):
return (self.root.tensor**2).sum(-1)
else:
return super()._diagonal()
def _expand_batch(
self: Float[LinearOperator, "... M N"], batch_shape: Union[torch.Size, List[int]]
) -> Float[LinearOperator, "... M N"]:
if len(batch_shape) == 0:
return self
return self.__class__(self.root._expand_batch(batch_shape))
def _get_indices(self, row_index: IndexType, col_index: IndexType, *batch_indices: IndexType) -> torch.Tensor:
row_index = row_index.unsqueeze(-1)
col_index = col_index.unsqueeze(-1)
batch_indices = tuple(batch_index.unsqueeze(-1) for batch_index in batch_indices)
inner_index = torch.arange(0, self.root.size(-1), device=self.device)
inner_index = _pad_with_singletons(inner_index, row_index.dim() - 1, 0)
left_tensor = self.root._get_indices(row_index, inner_index, *batch_indices)
if torch.equal(row_index, col_index):
res = left_tensor.pow(2).sum(-1)
else:
right_tensor = self.root._get_indices(col_index, inner_index, *batch_indices)
res = (left_tensor * right_tensor).sum(-1)
return res
def _getitem(self, row_index: IndexType, col_index: IndexType, *batch_indices: IndexType) -> LinearOperator:
# Make sure we're not generating more memory with our "efficient" method
if torch.is_tensor(row_index) and torch.is_tensor(col_index):
num_indices = row_index.numel()
if num_indices > self.matrix_shape.numel():
return to_linear_operator(self.to_dense())._getitem(row_index, col_index, *batch_indices)
left_tensor = self.root._getitem(row_index, _noop_index, *batch_indices)
if _equal_indices(row_index, col_index):
res = self.__class__(left_tensor)
else:
right_tensor = self.root._getitem(col_index, _noop_index, *batch_indices)
res = MatmulLinearOperator(left_tensor, right_tensor.mT)
return res
def _matmul(
self: Float[LinearOperator, "*batch M N"],
rhs: Union[Float[torch.Tensor, "*batch2 N C"], Float[torch.Tensor, "*batch2 N"]],
) -> Union[Float[torch.Tensor, "... M C"], Float[torch.Tensor, "... M"]]:
return self.root._matmul(self.root._t_matmul(rhs))
def _mul_constant(
self: Float[LinearOperator, "*batch M N"], other: Union[float, torch.Tensor]
) -> Float[LinearOperator, "*batch M N"]:
if (other > 0).all():
res = self.__class__(self.root._mul_constant(other.sqrt()))
else:
res = super()._mul_constant(other)
return res
def _t_matmul(
self: Float[LinearOperator, "*batch M N"],
rhs: Union[Float[Tensor, "*batch2 M P"], Float[LinearOperator, "*batch2 M P"]],
) -> Union[Float[LinearOperator, "... N P"], Float[Tensor, "... N P"]]:
# Matrix is symmetric
return self._matmul(rhs)
def add_low_rank(
self: Float[LinearOperator, "*batch N N"],
low_rank_mat: Union[Float[Tensor, "... N _"], Float[LinearOperator, "... N _"]],
root_decomp_method: Optional[str] = None,
root_inv_decomp_method: Optional[str] = None,
generate_roots: Optional[bool] = True,
**root_decomp_kwargs,
) -> Float[LinearOperator, "*batch N N"]:
return super().add_low_rank(low_rank_mat, root_inv_decomp_method=root_inv_decomp_method)
def root_decomposition(
self: Float[LinearOperator, "*batch N N"], method: Optional[str] = None
) -> Float[LinearOperator, "*batch N N"]:
return self
def _root_decomposition(
self: Float[LinearOperator, "... N N"]
) -> Union[Float[torch.Tensor, "... N N"], Float[LinearOperator, "... N N"]]:
return self.root
def _root_decomposition_size(self) -> int:
return self.root.size(-1)
def _size(self) -> torch.Size:
return torch.Size((*self.root.batch_shape, self.root.size(-2), self.root.size(-2)))
def _transpose_nonbatch(self: Float[LinearOperator, "*batch M N"]) -> Float[LinearOperator, "*batch N M"]:
return self
@cached
def to_dense(self: Float[LinearOperator, "*batch M N"]) -> Float[Tensor, "*batch M N"]:
eval_root = self.root.to_dense()
return torch.matmul(eval_root, eval_root.mT)