Coverage for src/cvxcla/operators/dense.py: 100%
109 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-27 06:45 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-27 06:45 +0000
1"""Dense covariance backends.
3``DenseCovariance`` is the reference implementation of the
4:class:`~cvxcla.operators._core.QuadraticForm` protocol, wrapping an explicit
5``n x n`` matrix. ``IncrementalDenseCovariance`` is a drop-in alternative that
6maintains the explicit free-block inverse across turning points with rank-one
7updates, trading numerical robustness for speed on dense problems.
8"""
10from __future__ import annotations
12from dataclasses import dataclass
13from typing import cast
15import numpy as np
16from numpy.typing import NDArray
17from scipy.linalg import cho_factor, cho_solve # type: ignore[import-untyped]
18from scipy.linalg.lapack import get_lapack_funcs # type: ignore[import-untyped]
20from ._core import _RCOND_ESTIMATE_MARGIN, _rcond_symmetric
23@dataclass(frozen=True)
24class DenseCovariance:
25 """Dense reference implementation of the ``CovarianceOperator`` protocol.
27 Wraps an explicit symmetric positive (semi-)definite covariance matrix and
28 implements the protocol operations with plain ``numpy`` calls, reproducing
29 exactly what the CLA does with a raw matrix.
31 Attributes:
32 matrix: The covariance matrix of shape ``(n, n)``.
34 Examples:
35 >>> import numpy as np
36 >>> cov = DenseCovariance(np.eye(2))
37 >>> cov.n
38 2
39 >>> cov.matvec(np.array([1.0, 2.0]))
40 array([1., 2.])
41 """
43 matrix: NDArray[np.float64]
45 def __post_init__(self) -> None:
46 """Validate that the wrapped matrix is a square, symmetric 2d array.
48 Raises:
49 ValueError: If the matrix is not two-dimensional and square, or
50 not symmetric to numerical tolerance.
51 """
52 if self.matrix.ndim != 2 or self.matrix.shape[0] != self.matrix.shape[1]:
53 msg = f"Covariance must be a square matrix, got shape {self.matrix.shape}"
54 raise ValueError(msg)
55 if not np.allclose(self.matrix, self.matrix.T):
56 msg = "Covariance must be symmetric"
57 raise ValueError(msg)
59 @property
60 def n(self) -> int:
61 """Number of assets (the dimension of the covariance)."""
62 return int(self.matrix.shape[0])
64 def matvec(self, x: NDArray[np.float64]) -> NDArray[np.float64]:
65 """Compute the matrix-vector product ``Sigma @ x``.
67 Args:
68 x: Vector of shape ``(n,)`` or matrix of shape ``(n, r)``.
70 Returns:
71 ``Sigma @ x`` with the same trailing shape as ``x``.
72 """
73 return self.matrix @ x
75 def solve_free(self, free: NDArray[np.bool_], rhs: NDArray[np.float64]) -> NDArray[np.float64]:
76 """Solve the free-block system ``Sigma[free][:, free] @ y = rhs``.
78 Args:
79 free: Boolean mask of shape ``(n,)`` selecting the free assets.
80 rhs: Right-hand side of shape ``(n_free,)`` or ``(n_free, r)``.
82 Returns:
83 The solution ``y`` with the same shape as ``rhs``.
84 """
85 block = self.matrix[np.ix_(free, free)]
86 try:
87 # The free block of a covariance/Gram matrix is symmetric positive
88 # definite, so Cholesky is ~1.5x faster than a general LU solve.
89 return cast("NDArray[np.float64]", cho_solve(cho_factor(block, overwrite_a=True, check_finite=False), rhs))
90 except np.linalg.LinAlgError:
91 # Not numerically positive definite (rank-deficient / indefinite):
92 # fall back to the general solve, which the degeneracy guards handle.
93 return cast("NDArray[np.float64]", np.linalg.solve(self.matrix[np.ix_(free, free)], rhs))
95 def cross(self, free: NDArray[np.bool_], x: NDArray[np.float64]) -> NDArray[np.float64]:
96 """Compute the free-to-blocked cross product ``Sigma[free][:, ~free] @ x[~free]``.
98 Args:
99 free: Boolean mask of shape ``(n,)`` selecting the free assets.
100 x: Full-length vector of shape ``(n,)``; only the blocked entries
101 ``x[~free]`` enter the product.
103 Returns:
104 Vector of shape ``(n_free,)``.
105 """
106 blocked = ~free
107 return self.matrix[np.ix_(free, blocked)] @ x[blocked]
109 def rcond_free(self, free: NDArray[np.bool_]) -> float:
110 """Reciprocal condition number of the free block; see the protocol."""
111 return _rcond_symmetric(self.matrix[np.ix_(free, free)])
113 def rcond_floor_cleared(self, floor: float) -> bool:
114 """Fast up-front conditioning test: is the whole matrix's rcond at least ``floor``?
116 An optional optimisation hook (not part of the :class:`QuadraticForm`
117 protocol, so backends need not provide it): the CLA calls it once, when
118 present, to decide whether the per-turning-point conditioning guard can be
119 skipped, and otherwise falls back to the exact ``rcond_free`` on the full
120 mask. The boolean is identical to ``rcond_free(<all free>) >= floor``.
122 Settles the common well-conditioned case with a Cholesky factorisation
123 plus a LAPACK 1-norm condition estimate (``?pocon``) -- ``O(n^3)`` for the
124 factor but only ``O(n^2)`` for the estimate -- instead of the full
125 eigendecomposition the exact :meth:`rcond_free` would run. A
126 non-positive-definite matrix (Cholesky fails) is below any positive floor;
127 an estimate within :data:`_RCOND_ESTIMATE_MARGIN` of the floor defers to
128 the exact symmetric rcond, so the answer matches ``rcond_free(all) >= floor``.
129 """
130 try:
131 cho, _lower = cho_factor(self.matrix, check_finite=False)
132 except np.linalg.LinAlgError:
133 return False # not numerically positive definite: below any positive floor
134 (pocon,) = get_lapack_funcs(("pocon",), (self.matrix,))
135 anorm = float(np.linalg.norm(self.matrix, 1))
136 rcond_estimate = float(pocon(cho, anorm)[0])
137 if rcond_estimate >= floor * _RCOND_ESTIMATE_MARGIN:
138 return True
139 return _rcond_symmetric(self.matrix) >= floor
142class IncrementalDenseCovariance:
143 """Dense backend that maintains ``Sigma_FF^{-1}`` across turning points.
145 A drop-in alternative to ``DenseCovariance`` for the dense case. Both wrap an
146 explicit covariance and return identical results; they differ only in how the
147 free-block solve is computed. ``DenseCovariance`` factorises ``Sigma_FF`` from
148 scratch at every turning point (``O(n_F^3)`` each). The CLA changes the free
149 set by exactly one asset between consecutive turning points, so this backend
150 instead *maintains* the explicit inverse ``Sigma_FF^{-1}`` and updates it with
151 a rank-one bordered (asset enters) or deletion (asset leaves) formula
152 (``O(n_F^2)`` each). Empirically this is roughly ``2x`` faster on dense
153 problems of a few hundred assets (see ``experiments/inverse_cla.py``).
155 Two caveats motivate this being opt-in rather than the default:
157 - **Numerics.** A maintained inverse accumulates floating-point error over the
158 ``O(n)`` rank-one updates of a trace, whereas a fresh solve is clean each
159 step. On well-conditioned, positive-definite problems the difference is
160 negligible, but for ill-conditioned or near-degenerate inputs the plain
161 ``DenseCovariance`` (or a ``FactorCovariance``) is the safer choice. As a
162 guard, any non-positive or non-finite pivot triggers a full refactorisation
163 of that step, and any free-set change that is not a single-asset flip
164 (e.g. the first solve of a trace) is computed from scratch.
165 - **Scope.** The win is dense-only. The structured ``FactorCovariance`` never
166 forms ``Sigma_FF`` --- it solves through Woodbury --- and is already faster
167 than a maintained dense inverse once the universe is large.
169 Attributes:
170 matrix: The covariance matrix of shape ``(n, n)``.
172 Examples:
173 >>> import numpy as np
174 >>> cov = IncrementalDenseCovariance(np.eye(3))
175 >>> free = np.array([True, True, False])
176 >>> rhs = np.array([1.0, 2.0])
177 >>> np.allclose(cov.solve_free(free, rhs), rhs)
178 True
179 """
181 def __init__(self, matrix: NDArray[np.float64]) -> None:
182 """Wrap ``matrix`` and validate it is square and symmetric.
184 Args:
185 matrix: The covariance matrix of shape ``(n, n)``.
187 Raises:
188 ValueError: If the matrix is not a square, symmetric 2d array.
189 """
190 self._dense = DenseCovariance(matrix)
191 self.matrix = self._dense.matrix
192 # Cache of the maintained inverse and the free indices it is aligned to,
193 # both in ascending index order. ``None`` until the first solve.
194 self._free_idx: NDArray[np.intp] | None = None
195 self._inv: NDArray[np.float64] | None = None
197 @property
198 def n(self) -> int:
199 """Number of assets (the dimension of the covariance)."""
200 return self._dense.n
202 def matvec(self, x: NDArray[np.float64]) -> NDArray[np.float64]:
203 """Compute the matrix-vector product ``Sigma @ x`` (delegates to dense)."""
204 return self._dense.matvec(x)
206 def cross(self, free: NDArray[np.bool_], x: NDArray[np.float64]) -> NDArray[np.float64]:
207 """Compute the free-to-blocked cross product (delegates to dense)."""
208 return self._dense.cross(free, x)
210 def rcond_free(self, free: NDArray[np.bool_]) -> float:
211 """Reciprocal condition number of the free block (delegates to dense)."""
212 return self._dense.rcond_free(free)
214 def rcond_floor_cleared(self, floor: float) -> bool:
215 """Fast up-front conditioning test (delegates to the wrapped dense matrix)."""
216 return self._dense.rcond_floor_cleared(floor)
218 def solve_free(self, free: NDArray[np.bool_], rhs: NDArray[np.float64]) -> NDArray[np.float64]:
219 """Solve ``Sigma[free][:, free] @ y = rhs`` using the maintained inverse.
221 Updates the cached ``Sigma_FF^{-1}`` for the single-asset change since the
222 previous call (or recomputes it from scratch when the change is not a lone
223 flip or a pivot is degenerate), then returns ``Sigma_FF^{-1} @ rhs``.
225 Args:
226 free: Boolean mask of shape ``(n,)`` selecting the free assets.
227 rhs: Right-hand side of shape ``(n_free,)`` or ``(n_free, r)``.
229 Returns:
230 The solution ``y`` with the same shape as ``rhs``.
231 """
232 cur = np.flatnonzero(free)
233 inv = self._inverse_for(cur)
234 self._free_idx, self._inv = cur, inv
235 return inv @ rhs
237 def _inverse_for(self, cur: NDArray[np.intp]) -> NDArray[np.float64]:
238 """Return ``Sigma[cur][:, cur]^{-1}``, updating the cache incrementally."""
239 prev, prev_inv = self._free_idx, self._inv
240 if prev is None or prev_inv is None:
241 return self._refactor(cur)
243 added = np.setdiff1d(cur, prev, assume_unique=True)
244 removed = np.setdiff1d(prev, cur, assume_unique=True)
245 if added.size == 1 and removed.size == 0:
246 updated = self._insert(prev, prev_inv, int(added[0]), cur)
247 elif removed.size == 1 and added.size == 0:
248 updated = self._delete(prev, prev_inv, int(removed[0]))
249 else:
250 updated = None # not a single-asset flip; fall back
252 return updated if updated is not None else self._refactor(cur)
254 def _refactor(self, cur: NDArray[np.intp]) -> NDArray[np.float64]:
255 """Invert ``Sigma[cur][:, cur]`` from scratch."""
256 if cur.size == 0:
257 return np.zeros((0, 0))
258 return cast("NDArray[np.float64]", np.linalg.inv(self.matrix[np.ix_(cur, cur)]))
260 def _insert(
261 self,
262 prev: NDArray[np.intp],
263 prev_inv: NDArray[np.float64],
264 asset: int,
265 cur: NDArray[np.intp],
266 ) -> NDArray[np.float64] | None:
267 """Rank-one bordered update for one asset entering the free set.
269 Returns the inverse aligned to ascending ``cur`` order, or ``None`` if the
270 Schur pivot is non-positive/non-finite (caller refactorises instead).
271 """
272 c = self.matrix[prev, asset]
273 v = prev_inv @ c
274 schur = float(self.matrix[asset, asset] - c @ v)
275 if not np.isfinite(schur) or schur <= 0.0:
276 return None
277 k = prev.shape[0]
278 aug = np.empty((k + 1, k + 1))
279 aug[:k, :k] = prev_inv + np.outer(v, v) / schur
280 aug[:k, k] = -v / schur
281 aug[k, :k] = -v / schur
282 aug[k, k] = 1.0 / schur
283 # ``aug`` is ordered [prev..., asset]; permute to ascending ``cur`` order.
284 perm = np.empty(k + 1, dtype=np.intp)
285 is_new = cur == asset
286 perm[is_new] = k
287 perm[~is_new] = np.searchsorted(prev, cur[~is_new])
288 return cast("NDArray[np.float64]", aug[np.ix_(perm, perm)])
290 def _delete(
291 self,
292 prev: NDArray[np.intp],
293 prev_inv: NDArray[np.float64],
294 asset: int,
295 ) -> NDArray[np.float64] | None:
296 """Rank-one deletion update for one asset leaving the free set.
298 Returns the inverse aligned to the remaining (ascending) order, or ``None``
299 if the pivot is non-positive/non-finite (caller refactorises instead).
300 """
301 p = int(np.searchsorted(prev, asset))
302 pivot = float(prev_inv[p, p])
303 if not np.isfinite(pivot) or pivot <= 0.0:
304 return None
305 mask = np.ones(prev.shape[0], dtype=bool)
306 mask[p] = False
307 col = prev_inv[mask, p]
308 return cast("NDArray[np.float64]", prev_inv[np.ix_(mask, mask)] - np.outer(col, col) / pivot)