Coverage for src/cvxcla/operators/gram.py: 100%

61 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-27 06:45 +0000

1"""Data-matrix-backed sample covariance. 

2 

3``GramCovariance`` implements the :class:`~cvxcla.operators._core.QuadraticForm` 

4protocol straight from the ``(T, n)`` returns matrix, storing only the centered 

5data (``O(T n)`` memory) and never forming the ``n x n`` covariance. An optional 

6ridge restores positive definiteness in the short-sample (``T < n``) regime, 

7with the free-block solve done by Woodbury in ``T``-space. 

8""" 

9 

10from __future__ import annotations 

11 

12from dataclasses import dataclass 

13from functools import cached_property 

14from typing import cast 

15 

16import numpy as np 

17from numpy.typing import NDArray 

18 

19 

20@dataclass(frozen=True) 

21class GramCovariance: 

22 """Sample covariance backed by the data matrix, never forming ``Sigma``. 

23 

24 A sample covariance *is* a (scaled, centered) Gram matrix: 

25 ``Sigma = X_c^T X_c / (T - 1)`` where ``X_c`` is the column-centered 

26 ``(T, n)`` returns matrix. This backend stores only ``X_c`` (``O(T n)`` 

27 memory) and implements the ``QuadraticForm`` protocol straight from it, so 

28 the ``n x n`` covariance is never materialised: ``matvec`` is two thin 

29 products ``X_c^T (X_c v)``, and ``solve_free`` works on the free columns of 

30 ``X_c`` alone. 

31 

32 An optional ridge ``delta >= 0`` represents ``Sigma = X_c^T X_c / (T - 1) + 

33 delta I``. It matters because ``X_c^T X_c`` has rank at most ``T - 1``: when 

34 there are fewer observations than assets (``T <= n``) the raw covariance is 

35 singular and the free block becomes unsolvable once the free set outgrows the 

36 data rank (the degeneracy the CLA detects via ``rcond_free``). A positive 

37 ridge restores positive definiteness, and the free-block solve is then done by 

38 the Woodbury identity in the ``T``-dimensional observation space 

39 (``O(n_F T^2 + T^3)``) whenever that is cheaper than the ``n_F x n_F`` normal 

40 equations -- i.e. this is a factor model whose factors are the observations. 

41 

42 When to use: 

43 This is a *memory* play, not an unconditional speed-up. Its win is the 

44 short-sample regime ``T < n`` (a sample covariance from far fewer periods 

45 than assets), where it never forms the ``n x n`` matrix and the 

46 ridge + Woodbury solve works in the small ``T``-space. When ``T >= n`` and 

47 the dense covariance fits in memory, ``DenseCovariance`` is faster per 

48 turning point: it slices a *precomputed* ``Sigma_FF`` once, whereas this 

49 backend re-forms ``X_cF^T X_cF`` (``O(n_F^2 T)``) at every solve. The same 

50 caution applies to ``FactorCovariance`` with many factors: a 

51 diagonal-plus-low-rank solve only pays off while the rank stays well below 

52 ``n`` -- at full rank the structured route is slower than the plain dense 

53 matrix. 

54 

55 Note: 

56 The covariance is built from the *centered* data; the raw Gram 

57 ``X^T X`` is the second-moment matrix, which differs by the rank-one mean 

58 correction ``T x_bar x_bar^T``. This backend centers for you. 

59 

60 Attributes: 

61 returns: The ``(T, n)`` matrix of observations (e.g. asset returns). 

62 ridge: A non-negative diagonal loading added to the covariance. 

63 

64 Examples: 

65 >>> import numpy as np 

66 >>> rng = np.random.default_rng(0) 

67 >>> returns = rng.standard_normal((50, 4)) 

68 >>> gram = GramCovariance(returns) 

69 >>> bool(np.allclose(gram.matvec(np.ones(4)), np.cov(returns, rowvar=False) @ np.ones(4))) 

70 True 

71 """ 

72 

73 returns: NDArray[np.float64] 

74 ridge: float = 0.0 

75 

76 def __post_init__(self) -> None: 

77 """Validate that ``returns`` is a ``(T, n)`` matrix with ``T >= 2`` and ``ridge >= 0``. 

78 

79 Raises: 

80 ValueError: If ``returns`` is not two-dimensional with at least two 

81 rows, or if ``ridge`` is negative. 

82 """ 

83 if self.returns.ndim != 2 or self.returns.shape[0] < 2: 

84 msg = f"returns must be a (T, n) matrix with T >= 2 observations, got shape {self.returns.shape}" 

85 raise ValueError(msg) 

86 if self.ridge < 0.0: 

87 msg = f"ridge must be non-negative, got {self.ridge}" 

88 raise ValueError(msg) 

89 

90 @cached_property 

91 def _centered(self) -> NDArray[np.float64]: 

92 """The column-centered data matrix ``X_c`` of shape ``(T, n)``.""" 

93 return cast("NDArray[np.float64]", self.returns - self.returns.mean(axis=0, keepdims=True)) 

94 

95 @property 

96 def _t(self) -> int: 

97 """Number of observations ``T``.""" 

98 return int(self.returns.shape[0]) 

99 

100 @property 

101 def _scale(self) -> float: 

102 """The sample-covariance scale ``1 / (T - 1)`` (``ddof = 1``, matching ``np.cov``).""" 

103 return 1.0 / (self._t - 1) 

104 

105 @property 

106 def n(self) -> int: 

107 """Number of assets (the dimension of the covariance).""" 

108 return int(self.returns.shape[1]) 

109 

110 def matvec(self, x: NDArray[np.float64]) -> NDArray[np.float64]: 

111 """Compute ``Sigma @ x`` as ``scale * X_c^T (X_c @ x) + ridge * x``. 

112 

113 Args: 

114 x: Vector of shape ``(n,)`` or matrix of shape ``(n, r)``. 

115 

116 Returns: 

117 ``Sigma @ x`` with the same trailing shape as ``x``, in ``O(T n)`` per 

118 column and without forming the ``n x n`` covariance. 

119 """ 

120 xc = self._centered 

121 product = self._scale * (xc.T @ (xc @ x)) 

122 return product + self.ridge * x 

123 

124 def cross(self, free: NDArray[np.bool_], x: NDArray[np.float64]) -> NDArray[np.float64]: 

125 """Compute the free-to-blocked cross product ``Sigma[free][:, ~free] @ x[~free]``. 

126 

127 The ridge is diagonal, so it contributes nothing to this off-diagonal 

128 block; the result is ``scale * X_cF^T (X_cB @ x_B)``. 

129 

130 Args: 

131 free: Boolean mask of shape ``(n,)`` selecting the free assets. 

132 x: Full-length vector of shape ``(n,)``; only the blocked entries 

133 ``x[~free]`` enter the product. 

134 

135 Returns: 

136 Vector of shape ``(n_free,)``. 

137 """ 

138 blocked = ~free 

139 xc_free = self._centered[:, free] 

140 xc_blocked = self._centered[:, blocked] 

141 return self._scale * (xc_free.T @ (xc_blocked @ x[blocked])) 

142 

143 def solve_free(self, free: NDArray[np.bool_], rhs: NDArray[np.float64]) -> NDArray[np.float64]: 

144 """Solve ``Sigma[free][:, free] @ y = rhs`` from the free columns of ``X_c``. 

145 

146 Without a ridge the free block ``scale * X_cF^T X_cF`` is formed and solved 

147 directly (it is singular, and the solve unreliable, once the free set 

148 outgrows the data rank -- which ``rcond_free`` reports so the CLA can 

149 refuse). With a positive ridge the block is positive definite and, when 

150 there are more free assets than observations, the solve is done by the 

151 Woodbury identity in ``T``-space instead of inverting an ``n_F x n_F`` 

152 matrix. 

153 

154 Args: 

155 free: Boolean mask of shape ``(n,)`` selecting the free assets. 

156 rhs: Right-hand side of shape ``(n_free,)`` or ``(n_free, r)``. 

157 

158 Returns: 

159 The solution ``y`` with the same shape as ``rhs``. 

160 """ 

161 xc_free = self._centered[:, free] 

162 n_free = xc_free.shape[1] 

163 

164 if self.ridge > 0.0 and self._t < n_free: 

165 # Woodbury in the T-dimensional observation space: 

166 # (ridge I + W^T W)^{-1} = (1/ridge) I - (1/ridge^2) W^T (I + W W^T / ridge)^{-1} W, 

167 # with W = sqrt(scale) X_cF (shape (T, n_free)). 

168 w = np.sqrt(self._scale) * xc_free 

169 correction_system = np.eye(self._t) + (w @ w.T) / self.ridge 

170 correction = w.T @ np.linalg.solve(correction_system, w @ rhs) 

171 return cast("NDArray[np.float64]", (rhs - correction / self.ridge) / self.ridge) 

172 

173 block = self._scale * (xc_free.T @ xc_free) 

174 if self.ridge > 0.0: 

175 block = block + self.ridge * np.eye(n_free) 

176 return cast("NDArray[np.float64]", np.linalg.solve(block, rhs)) 

177 

178 def rcond_free(self, free: NDArray[np.bool_]) -> float: 

179 """Reciprocal condition number of the free block, from the SVD of ``X_cF``. 

180 

181 The eigenvalues of ``scale * X_cF^T X_cF + ridge I`` are 

182 ``scale * sigma_i^2 + ridge`` for the singular values ``sigma_i`` of 

183 ``X_cF``, plus extra ``ridge`` eigenvalues when there are more free assets 

184 than the rank of ``X_cF``. This reads the conditioning off the SVD of the 

185 ``(T, n_free)`` data block without forming the ``n_F x n_F`` matrix, so a 

186 rank-deficient (``T``-limited) free set correctly drives the result toward 

187 zero. 

188 

189 Args: 

190 free: Boolean mask of shape ``(n,)`` selecting the free assets. 

191 

192 Returns: 

193 The reciprocal 2-norm condition number of the free block, in ``[0, 1]``. 

194 """ 

195 n_free = int(np.count_nonzero(free)) 

196 if n_free == 0: 

197 return 1.0 

198 singular_values = np.linalg.svd(self._centered[:, free], compute_uv=False) 

199 eig = self._scale * singular_values**2 

200 lam_max = float(eig[0]) + self.ridge 

201 if lam_max <= 0.0: 

202 return 0.0 

203 lam_min = self.ridge if n_free > singular_values.shape[0] else float(eig[-1]) + self.ridge 

204 return max(lam_min, 0.0) / lam_max