Coverage for src/cvxcla/lasso.py: 100%
198 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-27 06:45 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-27 06:45 +0000
1"""LASSO / LARS regularisation path as a parametric active-set problem.
3This module shows that the Critical Line Algorithm's machinery is not specific to
4portfolios: the *same* ``cvxcla.pathtracer.trace`` loop, the *same*
5``QuadraticForm`` operator, and the *same* Bland event selection trace the LASSO
6homotopy. Only the problem-specific glue (the segment solve and what an event
7means) differs.
9The LASSO solves, for a response ``y`` and design matrix ``X``,
11 minimize 1/2 ||y - X beta||^2 + lam ||beta||_1
13and its minimiser ``beta(lam)`` is continuous and piecewise linear in the penalty
14``lam``. On a segment where the active set ``A`` (the support) and the signs
15``s_A`` are fixed,
17 beta_A(lam) = (X_A^T X_A)^{-1} (X_A^T y - lam s_A) = alpha_A - lam * beta_slope_A
18 correlation(lam) = X^T (y - X beta(lam)) = p + lam * q
20with ``|correlation_j| <= lam`` off the support and ``correlation_j = lam s_j`` on
21it. The role played by the covariance ``Sigma`` and mean ``mu`` in the CLA is
22played here by the Gram matrix ``H = X^T X`` (wrapped in ``DenseCovariance``) and
23the vector ``X^T y``.
25**Constraints.** Like the CLA, the path tracer admits general linear inequality
26constraints ``G beta <= h``. An active row enters the reduced KKT system exactly as
27in the CLA (the bordered Schur complement of ``cla.py``), and the generalised
28correlation that drives the enter/leave events carries the active-row multipliers,
29``correlation(lam) = X^T y - H beta(lam) - G_S^T eta(lam)``. The constrained path is
30still piecewise linear (a quadratic loss under a polyhedral penalty *and* polyhedral
31constraints; cf. Rosset and Zhu). We require ``h > 0`` so the path can start from
32``beta = 0`` with every row slack -- the same first vertex as the unconstrained
33LASSO. (Equality constraints, or ``h`` with a zero entry, need a feasibility seed
34analogous to the CLA's linear-programming first vertex, and are left to future work.)
36Event families, mirroring the CLA's "move to / leave a bound":
38* **leave** -- an active coefficient reaches zero: ``lam = alpha_j / beta_slope_j``.
39* **enter** -- an inactive (generalised) correlation reaches ``+/-lam``.
40* **activate** -- a slack inequality row's residual ``G_r beta - h_r`` reaches zero.
41* **release** -- an active row's multiplier ``eta_r`` reaches zero.
42"""
44from __future__ import annotations
46from dataclasses import dataclass, field
47from functools import cached_property
48from itertools import pairwise
49from typing import TYPE_CHECKING, NamedTuple, cast
51import numpy as np
52from numpy.typing import NDArray
54from .operators import DenseCovariance, GramCovariance, QuadraticForm, bordered_solve
55from .pathtracer import InequalityConstrained, trace
57if TYPE_CHECKING:
58 from .builder import LassoBuilder
61class _LassoState(NamedTuple):
62 """The support, sign pattern, active inequality rows, and current penalty.
64 ``lam`` is the penalty at the segment's upper end. The event scan uses it to
65 require *strict* progress to a smaller penalty: right after a coefficient
66 enters it sits at zero, so its leave event lies at the current ``lam``; without
67 the strict window the shared selector would re-fire it and the walk would cycle
68 between entering and leaving the same coordinate.
69 """
71 active: NDArray[np.bool_]
72 signs: NDArray[np.float64]
73 rows_active: NDArray[np.bool_]
74 lam: float
77class _LassoSegment(NamedTuple):
78 """The affine path ``beta(lam) = alpha - lam * beta_slope`` and its multipliers.
80 ``eta_alpha``/``eta_slope`` give the active-row multiplier path
81 ``eta(lam) = eta_alpha + lam * eta_slope`` (length ``p``, nonzero only on active
82 rows); ``p``/``q`` give the generalised correlation ``p + lam * q``.
83 """
85 alpha: NDArray[np.float64]
86 beta_slope: NDArray[np.float64]
87 p: NDArray[np.float64]
88 q: NDArray[np.float64]
89 eta_alpha: NDArray[np.float64]
90 eta_slope: NDArray[np.float64]
93@dataclass(frozen=True)
94class Breakpoint:
95 """A vertex of the piecewise-linear LASSO path.
97 Attributes:
98 lam: The penalty value at this breakpoint.
99 beta: The coefficient vector ``beta(lam)``.
100 active: Boolean mask of the support (non-zero coefficients) on the
101 segment leaving this breakpoint towards smaller ``lam``.
102 """
104 lam: float
105 beta: NDArray[np.float64]
106 active: NDArray[np.bool_]
109@dataclass
110class Lasso(InequalityConstrained):
111 """The LASSO regularisation path, traced as a parametric active-set problem.
113 Constructing a ``Lasso`` traces the entire path from ``lam_max`` (where
114 ``beta = 0``) down to ``lam = 0`` (the least-squares fit on the final support,
115 subject to any active constraints), storing the breakpoints in ``path``. The
116 walk is driven by the same ``cvxcla.pathtracer.trace`` loop as the Critical Line
117 Algorithm.
119 Optional linear inequality constraints ``G beta <= h`` (with ``h > 0``) are
120 traced through the same bordered solve as the CLA's ``G w <= h`` rows.
122 The quadratic form may be given either as a dense design ``(x, y)`` (the usual
123 case, ``H = X^T X``) or, via :meth:`from_operator`, as a ``QuadraticForm``
124 operator with the linear term ``X^T y``. The operator route lets a structured
125 form, a diagonal-plus-low-rank factor model or a kernel, drive the path in
126 ``O(nk)`` per step without forming the ``n x n`` Gram matrix, exactly as on the
127 portfolio side.
129 Attributes:
130 x: Design matrix of shape ``(m, n)`` (``None`` in operator mode).
131 y: Response vector of shape ``(m,)`` (``None`` in operator mode).
132 g: Optional inequality matrix ``(p, n)`` of ``G beta <= h``; ``None`` means
133 the plain LASSO.
134 h: Optional inequality right-hand side ``(p,)``; must be strictly positive.
135 tol: Tolerance for event selection and the validity window.
136 path: The discovered breakpoints, populated on construction.
137 quad_form: Optional ``QuadraticForm`` operator ``H`` (operator mode).
138 linear: Optional linear term ``X^T y`` of shape ``(n,)`` (operator mode).
139 """
141 x: NDArray[np.float64] | None = None
142 y: NDArray[np.float64] | None = None
143 g: NDArray[np.float64] | None = None
144 h: NDArray[np.float64] | None = None
145 nonneg: bool = False # pragma: no mutate
146 gram: bool = False # pragma: no mutate
147 tol: float = 1e-9 # pragma: no mutate
148 path: list[Breakpoint] = field(default_factory=list)
149 quad_form: QuadraticForm | None = None # pragma: no mutate
150 linear: NDArray[np.float64] | None = None # pragma: no mutate
152 def __post_init__(self) -> None:
153 """Validate shapes and trace the full LASSO path.
155 Raises:
156 ValueError: If ``x`` is not 2d, ``y``'s length does not match ``x``, the
157 constraint shapes are inconsistent, or any ``h`` entry is not
158 strictly positive (which would make ``beta = 0`` infeasible).
159 """
160 operator_mode = self.quad_form is not None or self.linear is not None
161 if operator_mode:
162 if self.quad_form is None or self.linear is None:
163 msg = "quad_form and linear (X^T y) must be provided together"
164 raise ValueError(msg)
165 if self.x is not None or self.y is not None:
166 msg = "supply either a design (x, y) or an operator (quad_form, linear), not both"
167 raise ValueError(msg)
168 self.linear = np.asarray(self.linear, dtype=np.float64)
169 if self.linear.ndim != 1:
170 msg = f"linear must be the 1d vector X^T y, got shape {self.linear.shape}"
171 raise ValueError(msg)
172 else:
173 if self.x is None or self.y is None:
174 msg = "provide a design (x, y) or an operator (quad_form, linear)"
175 raise ValueError(msg)
176 if self.x.ndim != 2:
177 msg = f"x must be a 2d design matrix, got shape {self.x.shape}"
178 raise ValueError(msg)
179 if self.y.shape != (self.x.shape[0],):
180 msg = f"y must have shape ({self.x.shape[0]},), got {self.y.shape}"
181 raise ValueError(msg)
182 if self.g is not None or self.h is not None:
183 if self.g is None or self.h is None:
184 msg = "g and h must be provided together"
185 raise ValueError(msg)
186 if self.g.shape != (self.h.shape[0], self.dimension):
187 msg = f"g must have shape ({self.h.shape[0]}, {self.dimension}), got {self.g.shape}"
188 raise ValueError(msg)
189 if np.any(self.h <= self.tol):
190 msg = "h must be strictly positive so beta = 0 is feasible (equality/zero-h needs a feasibility seed)"
191 raise ValueError(msg)
192 trace(self)
194 @classmethod
195 def problem(cls, x: NDArray[np.float64], y: NDArray[np.float64]) -> LassoBuilder:
196 """Start a fluent :class:`cvxcla.builder.LassoBuilder` for a LASSO path.
198 The LASSO counterpart of :meth:`cvxcla.cla.CLA.problem`: chain
199 ``.inequality(G, h)`` and finish with ``.trace()``. The builder maps onto the
200 constructor arguments and adds no modelling power.
202 Args:
203 x: Design matrix of shape ``(m, n)``.
204 y: Response vector of shape ``(m,)``.
206 Returns:
207 A :class:`cvxcla.builder.LassoBuilder`.
208 """
209 from .builder import LassoBuilder
211 return LassoBuilder(x, y)
213 @classmethod
214 def from_operator(
215 cls,
216 quad: QuadraticForm,
217 xty: NDArray[np.float64],
218 *,
219 g: NDArray[np.float64] | None = None,
220 h: NDArray[np.float64] | None = None,
221 nonneg: bool = False,
222 tol: float = 1e-9,
223 ) -> Lasso:
224 """Trace a LASSO path with the quadratic form supplied as an operator.
226 The regression counterpart of :class:`cvxcla.cla.CLA` accepting a
227 ``QuadraticForm`` covariance. Instead of a dense design ``X``, pass the Gram
228 operator ``H`` (anything implementing :class:`QuadraticForm`, for example a
229 :class:`cvxcla.operators.FactorCovariance` or a kernel) together with the
230 linear term ``X^T y``. The homotopy reaches ``H`` only through ``matvec`` and
231 ``solve_free``, so a diagonal-plus-low-rank factor model or a kernel traces
232 the path in ``O(nk)`` per step without ever forming the ``n x n`` matrix,
233 exactly as on the portfolio side. For the path to coincide with the
234 design-matrix LASSO one needs ``H = X^T X`` and ``xty = X^T y``
235 (Theorem 1); any positive-semidefinite operator whose free blocks are
236 positive definite traces a well-defined path.
238 Args:
239 quad: The quadratic form ``H`` as a :class:`QuadraticForm` operator.
240 xty: The linear term ``X^T y`` of shape ``(n,)``.
241 g: Optional inequality matrix of ``G beta <= h``.
242 h: Optional inequality right-hand side; entries must be strictly positive.
243 nonneg: Restrict the path to ``beta >= 0``.
244 tol: Tolerance for event selection and the validity window.
246 Returns:
247 A traced :class:`Lasso` whose ``path`` holds the breakpoints.
248 """
249 return cls(
250 quad_form=quad,
251 linear=np.asarray(xty, dtype=np.float64),
252 g=g,
253 h=h,
254 nonneg=nonneg,
255 tol=tol,
256 )
258 @cached_property
259 def quad(self) -> QuadraticForm:
260 """The Gram matrix ``X^T X`` as a ``QuadraticForm`` backend (cached: ``X`` is fixed).
262 With ``gram=True`` the data-matrix backend is used instead of forming the
263 ``n x n`` Gram: it solves through the Woodbury identity in the
264 ``m``-dimensional observation space and never materialises an ``n x n``
265 matrix, the win in the high-dimensional ``p >> n`` regime (more features than
266 observations). ``GramCovariance`` represents ``X_c^T X_c / (m-1)``, so scaling
267 the data by ``sqrt(m-1)`` recovers ``X^T X`` exactly for a **centred** design
268 (the standard LASSO convention; pass a column-centred ``x``).
269 """
270 if self.quad_form is not None:
271 return self.quad_form
272 # Not operator mode, so __post_init__ guarantees a design matrix.
273 x = cast("NDArray[np.float64]", self.x)
274 if self.gram:
275 m = x.shape[0]
276 return GramCovariance(x * np.sqrt(m - 1.0))
277 return DenseCovariance(x.T @ x)
279 @cached_property
280 def xty(self) -> NDArray[np.float64]:
281 """The linear data ``X^T y`` (the analogue of the CLA's expected returns; cached)."""
282 if self.linear is not None:
283 return self.linear
284 # Not operator mode, so __post_init__ guarantees a design (x, y).
285 x = cast("NDArray[np.float64]", self.x)
286 y = cast("NDArray[np.float64]", self.y)
287 return x.T @ y
289 @property
290 def dimension(self) -> int:
291 """Number of features ``n`` (the problem dimension for the path tracer)."""
292 if self.x is not None:
293 return int(self.x.shape[1])
294 return int(self.xty.shape[0])
296 @property
297 def lam_max(self) -> float:
298 """The smallest penalty at which ``beta = 0`` is optimal: ``||X^T y||_inf``.
300 With ``h > 0`` every inequality row is slack at ``beta = 0`` (zero
301 multiplier), so the unconstrained threshold is unchanged.
302 """
303 return float(np.max(np.abs(self.xty)))
305 def begin(self) -> tuple[float, _LassoState]:
306 """Record the all-zero solution at the start penalty and enter the first coordinate.
308 For the plain or inequality-constrained LASSO the start is
309 ``lam_max = ||X^T y||_inf`` and the most-correlated coordinate enters with its
310 sign. Under the non-negative restriction ``beta >= 0`` the l1 penalty becomes
311 the linear term ``lam * 1^T beta``, only positive correlations can enter, so
312 the start is ``lam_max = max_j (X^T y)_j`` and the coordinate enters with sign
313 ``+``. When no coordinate can enter (e.g. every correlation is non-positive
314 under ``beta >= 0``), ``beta = 0`` is optimal for all ``lambda`` and the path
315 is the single point.
316 """
317 n = self.dimension
318 xty = self.xty
319 rows_active = np.zeros(self.g_matrix.shape[0], dtype=bool)
320 if self.nonneg:
321 lam_max = float(np.max(xty)) if n else 0.0
322 j0, s0 = int(np.argmax(xty)), 1.0
323 else:
324 lam_max = self.lam_max
325 j0 = int(np.argmax(np.abs(xty)))
326 s0 = float(np.sign(xty[j0]))
328 self.path.append(Breakpoint(max(lam_max, 0.0), np.zeros(n), np.zeros(n, dtype=bool)))
329 active = np.zeros(n, dtype=bool)
330 signs = np.zeros(n)
331 if lam_max > self.tol:
332 active[j0] = True
333 signs[j0] = s0
334 return max(lam_max, 0.0), _LassoState(active, signs, rows_active, max(lam_max, 0.0))
336 def segment(self, state: _LassoState) -> _LassoSegment:
337 """Solve the affine segment for the current support, signs, and active rows.
339 With no active rows this is the plain LASSO solve against the Gram
340 submatrix. With active rows it is the bordered Schur solve of the CLA: the
341 active rows ``G_S`` enter the reduced KKT system as extra equality rows.
342 """
343 n = self.dimension
344 active, signs, rows_active = state.active, state.signs, state.rows_active
345 xty = self.xty
346 alpha = np.zeros(n)
347 beta_slope = np.zeros(n)
348 eta_alpha = np.zeros(self.g_matrix.shape[0])
349 eta_slope = np.zeros(self.g_matrix.shape[0])
351 xty_s = xty[active]
352 signs_s = signs[active]
353 if not np.any(active):
354 # Empty support (e.g. the non-negative path when no correlation is
355 # positive): beta = 0, correlation = X^T y, and there is nothing to solve.
356 return _LassoSegment(alpha, beta_slope, xty.copy(), np.zeros(n), eta_alpha, eta_slope)
358 # The active rows G_RS act as equality rows in the reduced KKT system, exactly
359 # the CLA's bordered Schur solve (operators.bordered_solve). With no active rows
360 # (|R| = 0) this reduces to the plain LASSO solve beta_S(lam) = H_SS^{-1}(xty_S -
361 # lam s_S). The slope's constraint right-hand side is zero, and the slope
362 # multiplier nu carries the opposite sign convention to eta(lam) (beta = alpha -
363 # lam beta_slope here, vs w = r_alpha + lam r_beta in the CLA), hence the flip.
364 g_rs = self.g_matrix[np.ix_(rows_active, active)] # |R| x |S|
365 h_r = self.h_vector[rows_active]
366 x_const, x_slope, eta_a, nu_slope = bordered_solve(
367 self.quad, active, g_rs, xty_s, signs_s, h_r, np.zeros(g_rs.shape[0])
368 )
369 alpha[active] = x_const
370 beta_slope[active] = x_slope
371 eta_alpha[rows_active] = eta_a
372 eta_slope[rows_active] = -nu_slope
374 # Generalised correlation c(lam) = xty - H beta(lam) - G_R^T eta(lam) = p + lam q.
375 g_r = self.g_matrix[rows_active]
376 eta_a_r = eta_alpha[rows_active]
377 eta_s_r = eta_slope[rows_active]
378 p = xty - self.quad.matvec(alpha) - g_r.T @ eta_a_r
379 q = self.quad.matvec(beta_slope) - g_r.T @ eta_s_r
380 return _LassoSegment(alpha, beta_slope, p, q, eta_alpha, eta_slope)
382 def event_matrix(self, state: _LassoState, segment: _LassoSegment) -> NDArray[np.float64]:
383 """Return the ``(n + p, 4)`` matrix of candidate critical lambdas.
385 Rows ``0..n-1`` are coordinate events (col 0 leave, col 1 enter ``+``, col 2
386 enter ``-``); rows ``n..n+p-1`` are inequality-row events (col 0 activate, col
387 1 release). Entries are ``-inf`` where the event cannot occur, and only
388 events strictly inside ``(tol, lam - tol)`` are kept.
389 """
390 n = self.dimension
391 rows = self.g_matrix.shape[0]
392 active = state.active
393 inactive = ~active
394 alpha, beta_slope, p, q, eta_alpha, eta_slope = segment
395 rows_active = state.rows_active
397 l_mat = np.full((n + rows, 4), -np.inf)
399 # leave: alpha_j - lam beta_slope_j = 0
400 leaves = active & (np.abs(beta_slope) > self.tol) # pragma: no mutate
401 l_mat[:n][leaves, 0] = alpha[leaves] / beta_slope[leaves]
403 # enter (+): p_j + lam q_j = +lam -> lam = p_j / (1 - q_j)
404 denom_pos = 1.0 - q
405 enters_pos = inactive & (np.abs(denom_pos) > self.tol) # pragma: no mutate
406 l_mat[:n][enters_pos, 1] = p[enters_pos] / denom_pos[enters_pos]
408 # enter (-): p_j + lam q_j = -lam -> lam = -p_j / (1 + q_j). Disabled under the
409 # non-negative restriction beta >= 0, where only positive entries are allowed.
410 if not self.nonneg:
411 denom_neg = 1.0 + q
412 enters_neg = inactive & (np.abs(denom_neg) > self.tol) # pragma: no mutate
413 l_mat[:n][enters_neg, 2] = -p[enters_neg] / denom_neg[enters_neg]
415 if rows:
416 g_mat = self.g_matrix
417 slope_row = g_mat @ beta_slope # d/d(-lam) of the row value
418 level_row = g_mat @ alpha - self.h_vector # G_r alpha - h_r
419 # activate: the row value G_r beta(lam) = level + h_r - lam slope rises to
420 # the cap h_r as lam decreases when its slope d(value)/d(-lam) = slope_row
421 # is positive; the crossing is lam = (G_r alpha - h_r) / (G_r beta_slope).
422 inactive_rows = ~rows_active & (slope_row > self.tol) # pragma: no mutate
423 l_mat[n:][inactive_rows, 0] = level_row[inactive_rows] / slope_row[inactive_rows]
424 # release: eta_r(lam) = eta_alpha + lam eta_slope -> 0 from eta > 0, i.e. eta_slope > 0.
425 releasing = rows_active & (eta_slope > self.tol) # pragma: no mutate
426 l_mat[n:][releasing, 1] = -eta_alpha[releasing] / eta_slope[releasing]
428 # Keep only events that make strict progress to a smaller, positive penalty.
429 l_mat[(l_mat <= self.tol) | (l_mat >= state.lam - self.tol)] = -np.inf
430 return l_mat
432 def step(self, state: _LassoState, segment: _LassoSegment, sec: int, direction: int, lam: float) -> _LassoState:
433 """Record the breakpoint at ``lam`` after flipping coordinate or row ``sec``.
435 For a coordinate (``sec < n``): direction 0 removes it from the support, 1/2
436 add it with sign ``+1``/``-1``. For an inequality row (``sec >= n``):
437 direction 0 activates the row, 1 releases it. The path is continuous across
438 the flip, so the recorded coefficients are the old segment at ``lam``.
439 """
440 n = self.dimension
441 active = state.active.copy()
442 signs = state.signs.copy()
443 rows_active = state.rows_active.copy()
444 if sec < n:
445 if direction == 0:
446 active[sec] = False
447 signs[sec] = 0.0
448 else:
449 active[sec] = True
450 signs[sec] = 1.0 if direction == 1 else -1.0
451 else:
452 rows_active[sec - n] = direction == 0
454 beta = segment.alpha - lam * segment.beta_slope
455 self.path.append(Breakpoint(lam, beta, active.copy()))
456 return _LassoState(active, signs, rows_active, lam)
458 def finish(self, state: _LassoState, segment: _LassoSegment) -> None:
459 """Record the ``lam = 0`` endpoint: the least-squares fit on the final support."""
460 self.path.append(Breakpoint(0.0, segment.alpha.copy(), state.active.copy()))
462 def solution(self, lam: float) -> NDArray[np.float64]:
463 """Evaluate the piecewise-linear path at penalty ``lam``.
465 Args:
466 lam: The penalty value at which to evaluate ``beta``.
468 Returns:
469 The coefficient vector ``beta(lam)``, by linear interpolation between
470 the bracketing breakpoints (clamped to the path's endpoints).
471 """
472 ordered = sorted(self.path, key=lambda bp: bp.lam)
473 if lam <= ordered[0].lam:
474 return ordered[0].beta
475 if lam >= ordered[-1].lam:
476 return ordered[-1].beta
477 for lo, hi in pairwise(ordered):
478 if lo.lam <= lam <= hi.lam:
479 weight = (lam - lo.lam) / (hi.lam - lo.lam)
480 return (1.0 - weight) * lo.beta + weight * hi.beta
481 msg = "lam lies within the path range but no bracketing segment was found" # pragma: no cover
482 raise AssertionError(msg) # pragma: no cover