Coverage for src/cvxcla/operators/gram.py: 100%
61 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-27 06:45 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-27 06:45 +0000
1"""Data-matrix-backed sample covariance.
3``GramCovariance`` implements the :class:`~cvxcla.operators._core.QuadraticForm`
4protocol straight from the ``(T, n)`` returns matrix, storing only the centered
5data (``O(T n)`` memory) and never forming the ``n x n`` covariance. An optional
6ridge restores positive definiteness in the short-sample (``T < n``) regime,
7with the free-block solve done by Woodbury in ``T``-space.
8"""
10from __future__ import annotations
12from dataclasses import dataclass
13from functools import cached_property
14from typing import cast
16import numpy as np
17from numpy.typing import NDArray
20@dataclass(frozen=True)
21class GramCovariance:
22 """Sample covariance backed by the data matrix, never forming ``Sigma``.
24 A sample covariance *is* a (scaled, centered) Gram matrix:
25 ``Sigma = X_c^T X_c / (T - 1)`` where ``X_c`` is the column-centered
26 ``(T, n)`` returns matrix. This backend stores only ``X_c`` (``O(T n)``
27 memory) and implements the ``QuadraticForm`` protocol straight from it, so
28 the ``n x n`` covariance is never materialised: ``matvec`` is two thin
29 products ``X_c^T (X_c v)``, and ``solve_free`` works on the free columns of
30 ``X_c`` alone.
32 An optional ridge ``delta >= 0`` represents ``Sigma = X_c^T X_c / (T - 1) +
33 delta I``. It matters because ``X_c^T X_c`` has rank at most ``T - 1``: when
34 there are fewer observations than assets (``T <= n``) the raw covariance is
35 singular and the free block becomes unsolvable once the free set outgrows the
36 data rank (the degeneracy the CLA detects via ``rcond_free``). A positive
37 ridge restores positive definiteness, and the free-block solve is then done by
38 the Woodbury identity in the ``T``-dimensional observation space
39 (``O(n_F T^2 + T^3)``) whenever that is cheaper than the ``n_F x n_F`` normal
40 equations -- i.e. this is a factor model whose factors are the observations.
42 When to use:
43 This is a *memory* play, not an unconditional speed-up. Its win is the
44 short-sample regime ``T < n`` (a sample covariance from far fewer periods
45 than assets), where it never forms the ``n x n`` matrix and the
46 ridge + Woodbury solve works in the small ``T``-space. When ``T >= n`` and
47 the dense covariance fits in memory, ``DenseCovariance`` is faster per
48 turning point: it slices a *precomputed* ``Sigma_FF`` once, whereas this
49 backend re-forms ``X_cF^T X_cF`` (``O(n_F^2 T)``) at every solve. The same
50 caution applies to ``FactorCovariance`` with many factors: a
51 diagonal-plus-low-rank solve only pays off while the rank stays well below
52 ``n`` -- at full rank the structured route is slower than the plain dense
53 matrix.
55 Note:
56 The covariance is built from the *centered* data; the raw Gram
57 ``X^T X`` is the second-moment matrix, which differs by the rank-one mean
58 correction ``T x_bar x_bar^T``. This backend centers for you.
60 Attributes:
61 returns: The ``(T, n)`` matrix of observations (e.g. asset returns).
62 ridge: A non-negative diagonal loading added to the covariance.
64 Examples:
65 >>> import numpy as np
66 >>> rng = np.random.default_rng(0)
67 >>> returns = rng.standard_normal((50, 4))
68 >>> gram = GramCovariance(returns)
69 >>> bool(np.allclose(gram.matvec(np.ones(4)), np.cov(returns, rowvar=False) @ np.ones(4)))
70 True
71 """
73 returns: NDArray[np.float64]
74 ridge: float = 0.0
76 def __post_init__(self) -> None:
77 """Validate that ``returns`` is a ``(T, n)`` matrix with ``T >= 2`` and ``ridge >= 0``.
79 Raises:
80 ValueError: If ``returns`` is not two-dimensional with at least two
81 rows, or if ``ridge`` is negative.
82 """
83 if self.returns.ndim != 2 or self.returns.shape[0] < 2:
84 msg = f"returns must be a (T, n) matrix with T >= 2 observations, got shape {self.returns.shape}"
85 raise ValueError(msg)
86 if self.ridge < 0.0:
87 msg = f"ridge must be non-negative, got {self.ridge}"
88 raise ValueError(msg)
90 @cached_property
91 def _centered(self) -> NDArray[np.float64]:
92 """The column-centered data matrix ``X_c`` of shape ``(T, n)``."""
93 return cast("NDArray[np.float64]", self.returns - self.returns.mean(axis=0, keepdims=True))
95 @property
96 def _t(self) -> int:
97 """Number of observations ``T``."""
98 return int(self.returns.shape[0])
100 @property
101 def _scale(self) -> float:
102 """The sample-covariance scale ``1 / (T - 1)`` (``ddof = 1``, matching ``np.cov``)."""
103 return 1.0 / (self._t - 1)
105 @property
106 def n(self) -> int:
107 """Number of assets (the dimension of the covariance)."""
108 return int(self.returns.shape[1])
110 def matvec(self, x: NDArray[np.float64]) -> NDArray[np.float64]:
111 """Compute ``Sigma @ x`` as ``scale * X_c^T (X_c @ x) + ridge * x``.
113 Args:
114 x: Vector of shape ``(n,)`` or matrix of shape ``(n, r)``.
116 Returns:
117 ``Sigma @ x`` with the same trailing shape as ``x``, in ``O(T n)`` per
118 column and without forming the ``n x n`` covariance.
119 """
120 xc = self._centered
121 product = self._scale * (xc.T @ (xc @ x))
122 return product + self.ridge * x
124 def cross(self, free: NDArray[np.bool_], x: NDArray[np.float64]) -> NDArray[np.float64]:
125 """Compute the free-to-blocked cross product ``Sigma[free][:, ~free] @ x[~free]``.
127 The ridge is diagonal, so it contributes nothing to this off-diagonal
128 block; the result is ``scale * X_cF^T (X_cB @ x_B)``.
130 Args:
131 free: Boolean mask of shape ``(n,)`` selecting the free assets.
132 x: Full-length vector of shape ``(n,)``; only the blocked entries
133 ``x[~free]`` enter the product.
135 Returns:
136 Vector of shape ``(n_free,)``.
137 """
138 blocked = ~free
139 xc_free = self._centered[:, free]
140 xc_blocked = self._centered[:, blocked]
141 return self._scale * (xc_free.T @ (xc_blocked @ x[blocked]))
143 def solve_free(self, free: NDArray[np.bool_], rhs: NDArray[np.float64]) -> NDArray[np.float64]:
144 """Solve ``Sigma[free][:, free] @ y = rhs`` from the free columns of ``X_c``.
146 Without a ridge the free block ``scale * X_cF^T X_cF`` is formed and solved
147 directly (it is singular, and the solve unreliable, once the free set
148 outgrows the data rank -- which ``rcond_free`` reports so the CLA can
149 refuse). With a positive ridge the block is positive definite and, when
150 there are more free assets than observations, the solve is done by the
151 Woodbury identity in ``T``-space instead of inverting an ``n_F x n_F``
152 matrix.
154 Args:
155 free: Boolean mask of shape ``(n,)`` selecting the free assets.
156 rhs: Right-hand side of shape ``(n_free,)`` or ``(n_free, r)``.
158 Returns:
159 The solution ``y`` with the same shape as ``rhs``.
160 """
161 xc_free = self._centered[:, free]
162 n_free = xc_free.shape[1]
164 if self.ridge > 0.0 and self._t < n_free:
165 # Woodbury in the T-dimensional observation space:
166 # (ridge I + W^T W)^{-1} = (1/ridge) I - (1/ridge^2) W^T (I + W W^T / ridge)^{-1} W,
167 # with W = sqrt(scale) X_cF (shape (T, n_free)).
168 w = np.sqrt(self._scale) * xc_free
169 correction_system = np.eye(self._t) + (w @ w.T) / self.ridge
170 correction = w.T @ np.linalg.solve(correction_system, w @ rhs)
171 return cast("NDArray[np.float64]", (rhs - correction / self.ridge) / self.ridge)
173 block = self._scale * (xc_free.T @ xc_free)
174 if self.ridge > 0.0:
175 block = block + self.ridge * np.eye(n_free)
176 return cast("NDArray[np.float64]", np.linalg.solve(block, rhs))
178 def rcond_free(self, free: NDArray[np.bool_]) -> float:
179 """Reciprocal condition number of the free block, from the SVD of ``X_cF``.
181 The eigenvalues of ``scale * X_cF^T X_cF + ridge I`` are
182 ``scale * sigma_i^2 + ridge`` for the singular values ``sigma_i`` of
183 ``X_cF``, plus extra ``ridge`` eigenvalues when there are more free assets
184 than the rank of ``X_cF``. This reads the conditioning off the SVD of the
185 ``(T, n_free)`` data block without forming the ``n_F x n_F`` matrix, so a
186 rank-deficient (``T``-limited) free set correctly drives the result toward
187 zero.
189 Args:
190 free: Boolean mask of shape ``(n,)`` selecting the free assets.
192 Returns:
193 The reciprocal 2-norm condition number of the free block, in ``[0, 1]``.
194 """
195 n_free = int(np.count_nonzero(free))
196 if n_free == 0:
197 return 1.0
198 singular_values = np.linalg.svd(self._centered[:, free], compute_uv=False)
199 eig = self._scale * singular_values**2
200 lam_max = float(eig[0]) + self.ridge
201 if lam_max <= 0.0:
202 return 0.0
203 lam_min = self.ridge if n_free > singular_values.shape[0] else float(eig[-1]) + self.ridge
204 return max(lam_min, 0.0) / lam_max