Coverage for src/cvxcla/operators/dense.py: 100%

109 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-27 06:45 +0000

1"""Dense covariance backends. 

2 

3``DenseCovariance`` is the reference implementation of the 

4:class:`~cvxcla.operators._core.QuadraticForm` protocol, wrapping an explicit 

5``n x n`` matrix. ``IncrementalDenseCovariance`` is a drop-in alternative that 

6maintains the explicit free-block inverse across turning points with rank-one 

7updates, trading numerical robustness for speed on dense problems. 

8""" 

9 

10from __future__ import annotations 

11 

12from dataclasses import dataclass 

13from typing import cast 

14 

15import numpy as np 

16from numpy.typing import NDArray 

17from scipy.linalg import cho_factor, cho_solve # type: ignore[import-untyped] 

18from scipy.linalg.lapack import get_lapack_funcs # type: ignore[import-untyped] 

19 

20from ._core import _RCOND_ESTIMATE_MARGIN, _rcond_symmetric 

21 

22 

23@dataclass(frozen=True) 

24class DenseCovariance: 

25 """Dense reference implementation of the ``CovarianceOperator`` protocol. 

26 

27 Wraps an explicit symmetric positive (semi-)definite covariance matrix and 

28 implements the protocol operations with plain ``numpy`` calls, reproducing 

29 exactly what the CLA does with a raw matrix. 

30 

31 Attributes: 

32 matrix: The covariance matrix of shape ``(n, n)``. 

33 

34 Examples: 

35 >>> import numpy as np 

36 >>> cov = DenseCovariance(np.eye(2)) 

37 >>> cov.n 

38 2 

39 >>> cov.matvec(np.array([1.0, 2.0])) 

40 array([1., 2.]) 

41 """ 

42 

43 matrix: NDArray[np.float64] 

44 

45 def __post_init__(self) -> None: 

46 """Validate that the wrapped matrix is a square, symmetric 2d array. 

47 

48 Raises: 

49 ValueError: If the matrix is not two-dimensional and square, or 

50 not symmetric to numerical tolerance. 

51 """ 

52 if self.matrix.ndim != 2 or self.matrix.shape[0] != self.matrix.shape[1]: 

53 msg = f"Covariance must be a square matrix, got shape {self.matrix.shape}" 

54 raise ValueError(msg) 

55 if not np.allclose(self.matrix, self.matrix.T): 

56 msg = "Covariance must be symmetric" 

57 raise ValueError(msg) 

58 

59 @property 

60 def n(self) -> int: 

61 """Number of assets (the dimension of the covariance).""" 

62 return int(self.matrix.shape[0]) 

63 

64 def matvec(self, x: NDArray[np.float64]) -> NDArray[np.float64]: 

65 """Compute the matrix-vector product ``Sigma @ x``. 

66 

67 Args: 

68 x: Vector of shape ``(n,)`` or matrix of shape ``(n, r)``. 

69 

70 Returns: 

71 ``Sigma @ x`` with the same trailing shape as ``x``. 

72 """ 

73 return self.matrix @ x 

74 

75 def solve_free(self, free: NDArray[np.bool_], rhs: NDArray[np.float64]) -> NDArray[np.float64]: 

76 """Solve the free-block system ``Sigma[free][:, free] @ y = rhs``. 

77 

78 Args: 

79 free: Boolean mask of shape ``(n,)`` selecting the free assets. 

80 rhs: Right-hand side of shape ``(n_free,)`` or ``(n_free, r)``. 

81 

82 Returns: 

83 The solution ``y`` with the same shape as ``rhs``. 

84 """ 

85 block = self.matrix[np.ix_(free, free)] 

86 try: 

87 # The free block of a covariance/Gram matrix is symmetric positive 

88 # definite, so Cholesky is ~1.5x faster than a general LU solve. 

89 return cast("NDArray[np.float64]", cho_solve(cho_factor(block, overwrite_a=True, check_finite=False), rhs)) 

90 except np.linalg.LinAlgError: 

91 # Not numerically positive definite (rank-deficient / indefinite): 

92 # fall back to the general solve, which the degeneracy guards handle. 

93 return cast("NDArray[np.float64]", np.linalg.solve(self.matrix[np.ix_(free, free)], rhs)) 

94 

95 def cross(self, free: NDArray[np.bool_], x: NDArray[np.float64]) -> NDArray[np.float64]: 

96 """Compute the free-to-blocked cross product ``Sigma[free][:, ~free] @ x[~free]``. 

97 

98 Args: 

99 free: Boolean mask of shape ``(n,)`` selecting the free assets. 

100 x: Full-length vector of shape ``(n,)``; only the blocked entries 

101 ``x[~free]`` enter the product. 

102 

103 Returns: 

104 Vector of shape ``(n_free,)``. 

105 """ 

106 blocked = ~free 

107 return self.matrix[np.ix_(free, blocked)] @ x[blocked] 

108 

109 def rcond_free(self, free: NDArray[np.bool_]) -> float: 

110 """Reciprocal condition number of the free block; see the protocol.""" 

111 return _rcond_symmetric(self.matrix[np.ix_(free, free)]) 

112 

113 def rcond_floor_cleared(self, floor: float) -> bool: 

114 """Fast up-front conditioning test: is the whole matrix's rcond at least ``floor``? 

115 

116 An optional optimisation hook (not part of the :class:`QuadraticForm` 

117 protocol, so backends need not provide it): the CLA calls it once, when 

118 present, to decide whether the per-turning-point conditioning guard can be 

119 skipped, and otherwise falls back to the exact ``rcond_free`` on the full 

120 mask. The boolean is identical to ``rcond_free(<all free>) >= floor``. 

121 

122 Settles the common well-conditioned case with a Cholesky factorisation 

123 plus a LAPACK 1-norm condition estimate (``?pocon``) -- ``O(n^3)`` for the 

124 factor but only ``O(n^2)`` for the estimate -- instead of the full 

125 eigendecomposition the exact :meth:`rcond_free` would run. A 

126 non-positive-definite matrix (Cholesky fails) is below any positive floor; 

127 an estimate within :data:`_RCOND_ESTIMATE_MARGIN` of the floor defers to 

128 the exact symmetric rcond, so the answer matches ``rcond_free(all) >= floor``. 

129 """ 

130 try: 

131 cho, _lower = cho_factor(self.matrix, check_finite=False) 

132 except np.linalg.LinAlgError: 

133 return False # not numerically positive definite: below any positive floor 

134 (pocon,) = get_lapack_funcs(("pocon",), (self.matrix,)) 

135 anorm = float(np.linalg.norm(self.matrix, 1)) 

136 rcond_estimate = float(pocon(cho, anorm)[0]) 

137 if rcond_estimate >= floor * _RCOND_ESTIMATE_MARGIN: 

138 return True 

139 return _rcond_symmetric(self.matrix) >= floor 

140 

141 

142class IncrementalDenseCovariance: 

143 """Dense backend that maintains ``Sigma_FF^{-1}`` across turning points. 

144 

145 A drop-in alternative to ``DenseCovariance`` for the dense case. Both wrap an 

146 explicit covariance and return identical results; they differ only in how the 

147 free-block solve is computed. ``DenseCovariance`` factorises ``Sigma_FF`` from 

148 scratch at every turning point (``O(n_F^3)`` each). The CLA changes the free 

149 set by exactly one asset between consecutive turning points, so this backend 

150 instead *maintains* the explicit inverse ``Sigma_FF^{-1}`` and updates it with 

151 a rank-one bordered (asset enters) or deletion (asset leaves) formula 

152 (``O(n_F^2)`` each). Empirically this is roughly ``2x`` faster on dense 

153 problems of a few hundred assets (see ``experiments/inverse_cla.py``). 

154 

155 Two caveats motivate this being opt-in rather than the default: 

156 

157 - **Numerics.** A maintained inverse accumulates floating-point error over the 

158 ``O(n)`` rank-one updates of a trace, whereas a fresh solve is clean each 

159 step. On well-conditioned, positive-definite problems the difference is 

160 negligible, but for ill-conditioned or near-degenerate inputs the plain 

161 ``DenseCovariance`` (or a ``FactorCovariance``) is the safer choice. As a 

162 guard, any non-positive or non-finite pivot triggers a full refactorisation 

163 of that step, and any free-set change that is not a single-asset flip 

164 (e.g. the first solve of a trace) is computed from scratch. 

165 - **Scope.** The win is dense-only. The structured ``FactorCovariance`` never 

166 forms ``Sigma_FF`` --- it solves through Woodbury --- and is already faster 

167 than a maintained dense inverse once the universe is large. 

168 

169 Attributes: 

170 matrix: The covariance matrix of shape ``(n, n)``. 

171 

172 Examples: 

173 >>> import numpy as np 

174 >>> cov = IncrementalDenseCovariance(np.eye(3)) 

175 >>> free = np.array([True, True, False]) 

176 >>> rhs = np.array([1.0, 2.0]) 

177 >>> np.allclose(cov.solve_free(free, rhs), rhs) 

178 True 

179 """ 

180 

181 def __init__(self, matrix: NDArray[np.float64]) -> None: 

182 """Wrap ``matrix`` and validate it is square and symmetric. 

183 

184 Args: 

185 matrix: The covariance matrix of shape ``(n, n)``. 

186 

187 Raises: 

188 ValueError: If the matrix is not a square, symmetric 2d array. 

189 """ 

190 self._dense = DenseCovariance(matrix) 

191 self.matrix = self._dense.matrix 

192 # Cache of the maintained inverse and the free indices it is aligned to, 

193 # both in ascending index order. ``None`` until the first solve. 

194 self._free_idx: NDArray[np.intp] | None = None 

195 self._inv: NDArray[np.float64] | None = None 

196 

197 @property 

198 def n(self) -> int: 

199 """Number of assets (the dimension of the covariance).""" 

200 return self._dense.n 

201 

202 def matvec(self, x: NDArray[np.float64]) -> NDArray[np.float64]: 

203 """Compute the matrix-vector product ``Sigma @ x`` (delegates to dense).""" 

204 return self._dense.matvec(x) 

205 

206 def cross(self, free: NDArray[np.bool_], x: NDArray[np.float64]) -> NDArray[np.float64]: 

207 """Compute the free-to-blocked cross product (delegates to dense).""" 

208 return self._dense.cross(free, x) 

209 

210 def rcond_free(self, free: NDArray[np.bool_]) -> float: 

211 """Reciprocal condition number of the free block (delegates to dense).""" 

212 return self._dense.rcond_free(free) 

213 

214 def rcond_floor_cleared(self, floor: float) -> bool: 

215 """Fast up-front conditioning test (delegates to the wrapped dense matrix).""" 

216 return self._dense.rcond_floor_cleared(floor) 

217 

218 def solve_free(self, free: NDArray[np.bool_], rhs: NDArray[np.float64]) -> NDArray[np.float64]: 

219 """Solve ``Sigma[free][:, free] @ y = rhs`` using the maintained inverse. 

220 

221 Updates the cached ``Sigma_FF^{-1}`` for the single-asset change since the 

222 previous call (or recomputes it from scratch when the change is not a lone 

223 flip or a pivot is degenerate), then returns ``Sigma_FF^{-1} @ rhs``. 

224 

225 Args: 

226 free: Boolean mask of shape ``(n,)`` selecting the free assets. 

227 rhs: Right-hand side of shape ``(n_free,)`` or ``(n_free, r)``. 

228 

229 Returns: 

230 The solution ``y`` with the same shape as ``rhs``. 

231 """ 

232 cur = np.flatnonzero(free) 

233 inv = self._inverse_for(cur) 

234 self._free_idx, self._inv = cur, inv 

235 return inv @ rhs 

236 

237 def _inverse_for(self, cur: NDArray[np.intp]) -> NDArray[np.float64]: 

238 """Return ``Sigma[cur][:, cur]^{-1}``, updating the cache incrementally.""" 

239 prev, prev_inv = self._free_idx, self._inv 

240 if prev is None or prev_inv is None: 

241 return self._refactor(cur) 

242 

243 added = np.setdiff1d(cur, prev, assume_unique=True) 

244 removed = np.setdiff1d(prev, cur, assume_unique=True) 

245 if added.size == 1 and removed.size == 0: 

246 updated = self._insert(prev, prev_inv, int(added[0]), cur) 

247 elif removed.size == 1 and added.size == 0: 

248 updated = self._delete(prev, prev_inv, int(removed[0])) 

249 else: 

250 updated = None # not a single-asset flip; fall back 

251 

252 return updated if updated is not None else self._refactor(cur) 

253 

254 def _refactor(self, cur: NDArray[np.intp]) -> NDArray[np.float64]: 

255 """Invert ``Sigma[cur][:, cur]`` from scratch.""" 

256 if cur.size == 0: 

257 return np.zeros((0, 0)) 

258 return cast("NDArray[np.float64]", np.linalg.inv(self.matrix[np.ix_(cur, cur)])) 

259 

260 def _insert( 

261 self, 

262 prev: NDArray[np.intp], 

263 prev_inv: NDArray[np.float64], 

264 asset: int, 

265 cur: NDArray[np.intp], 

266 ) -> NDArray[np.float64] | None: 

267 """Rank-one bordered update for one asset entering the free set. 

268 

269 Returns the inverse aligned to ascending ``cur`` order, or ``None`` if the 

270 Schur pivot is non-positive/non-finite (caller refactorises instead). 

271 """ 

272 c = self.matrix[prev, asset] 

273 v = prev_inv @ c 

274 schur = float(self.matrix[asset, asset] - c @ v) 

275 if not np.isfinite(schur) or schur <= 0.0: 

276 return None 

277 k = prev.shape[0] 

278 aug = np.empty((k + 1, k + 1)) 

279 aug[:k, :k] = prev_inv + np.outer(v, v) / schur 

280 aug[:k, k] = -v / schur 

281 aug[k, :k] = -v / schur 

282 aug[k, k] = 1.0 / schur 

283 # ``aug`` is ordered [prev..., asset]; permute to ascending ``cur`` order. 

284 perm = np.empty(k + 1, dtype=np.intp) 

285 is_new = cur == asset 

286 perm[is_new] = k 

287 perm[~is_new] = np.searchsorted(prev, cur[~is_new]) 

288 return cast("NDArray[np.float64]", aug[np.ix_(perm, perm)]) 

289 

290 def _delete( 

291 self, 

292 prev: NDArray[np.intp], 

293 prev_inv: NDArray[np.float64], 

294 asset: int, 

295 ) -> NDArray[np.float64] | None: 

296 """Rank-one deletion update for one asset leaving the free set. 

297 

298 Returns the inverse aligned to the remaining (ascending) order, or ``None`` 

299 if the pivot is non-positive/non-finite (caller refactorises instead). 

300 """ 

301 p = int(np.searchsorted(prev, asset)) 

302 pivot = float(prev_inv[p, p]) 

303 if not np.isfinite(pivot) or pivot <= 0.0: 

304 return None 

305 mask = np.ones(prev.shape[0], dtype=bool) 

306 mask[p] = False 

307 col = prev_inv[mask, p] 

308 return cast("NDArray[np.float64]", prev_inv[np.ix_(mask, mask)] - np.outer(col, col) / pivot)