Coverage for src/cvxcla/lasso.py: 100%

198 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-27 06:45 +0000

1"""LASSO / LARS regularisation path as a parametric active-set problem. 

2 

3This module shows that the Critical Line Algorithm's machinery is not specific to 

4portfolios: the *same* ``cvxcla.pathtracer.trace`` loop, the *same* 

5``QuadraticForm`` operator, and the *same* Bland event selection trace the LASSO 

6homotopy. Only the problem-specific glue (the segment solve and what an event 

7means) differs. 

8 

9The LASSO solves, for a response ``y`` and design matrix ``X``, 

10 

11 minimize 1/2 ||y - X beta||^2 + lam ||beta||_1 

12 

13and its minimiser ``beta(lam)`` is continuous and piecewise linear in the penalty 

14``lam``. On a segment where the active set ``A`` (the support) and the signs 

15``s_A`` are fixed, 

16 

17 beta_A(lam) = (X_A^T X_A)^{-1} (X_A^T y - lam s_A) = alpha_A - lam * beta_slope_A 

18 correlation(lam) = X^T (y - X beta(lam)) = p + lam * q 

19 

20with ``|correlation_j| <= lam`` off the support and ``correlation_j = lam s_j`` on 

21it. The role played by the covariance ``Sigma`` and mean ``mu`` in the CLA is 

22played here by the Gram matrix ``H = X^T X`` (wrapped in ``DenseCovariance``) and 

23the vector ``X^T y``. 

24 

25**Constraints.** Like the CLA, the path tracer admits general linear inequality 

26constraints ``G beta <= h``. An active row enters the reduced KKT system exactly as 

27in the CLA (the bordered Schur complement of ``cla.py``), and the generalised 

28correlation that drives the enter/leave events carries the active-row multipliers, 

29``correlation(lam) = X^T y - H beta(lam) - G_S^T eta(lam)``. The constrained path is 

30still piecewise linear (a quadratic loss under a polyhedral penalty *and* polyhedral 

31constraints; cf. Rosset and Zhu). We require ``h > 0`` so the path can start from 

32``beta = 0`` with every row slack -- the same first vertex as the unconstrained 

33LASSO. (Equality constraints, or ``h`` with a zero entry, need a feasibility seed 

34analogous to the CLA's linear-programming first vertex, and are left to future work.) 

35 

36Event families, mirroring the CLA's "move to / leave a bound": 

37 

38* **leave** -- an active coefficient reaches zero: ``lam = alpha_j / beta_slope_j``. 

39* **enter** -- an inactive (generalised) correlation reaches ``+/-lam``. 

40* **activate** -- a slack inequality row's residual ``G_r beta - h_r`` reaches zero. 

41* **release** -- an active row's multiplier ``eta_r`` reaches zero. 

42""" 

43 

44from __future__ import annotations 

45 

46from dataclasses import dataclass, field 

47from functools import cached_property 

48from itertools import pairwise 

49from typing import TYPE_CHECKING, NamedTuple, cast 

50 

51import numpy as np 

52from numpy.typing import NDArray 

53 

54from .operators import DenseCovariance, GramCovariance, QuadraticForm, bordered_solve 

55from .pathtracer import InequalityConstrained, trace 

56 

57if TYPE_CHECKING: 

58 from .builder import LassoBuilder 

59 

60 

61class _LassoState(NamedTuple): 

62 """The support, sign pattern, active inequality rows, and current penalty. 

63 

64 ``lam`` is the penalty at the segment's upper end. The event scan uses it to 

65 require *strict* progress to a smaller penalty: right after a coefficient 

66 enters it sits at zero, so its leave event lies at the current ``lam``; without 

67 the strict window the shared selector would re-fire it and the walk would cycle 

68 between entering and leaving the same coordinate. 

69 """ 

70 

71 active: NDArray[np.bool_] 

72 signs: NDArray[np.float64] 

73 rows_active: NDArray[np.bool_] 

74 lam: float 

75 

76 

77class _LassoSegment(NamedTuple): 

78 """The affine path ``beta(lam) = alpha - lam * beta_slope`` and its multipliers. 

79 

80 ``eta_alpha``/``eta_slope`` give the active-row multiplier path 

81 ``eta(lam) = eta_alpha + lam * eta_slope`` (length ``p``, nonzero only on active 

82 rows); ``p``/``q`` give the generalised correlation ``p + lam * q``. 

83 """ 

84 

85 alpha: NDArray[np.float64] 

86 beta_slope: NDArray[np.float64] 

87 p: NDArray[np.float64] 

88 q: NDArray[np.float64] 

89 eta_alpha: NDArray[np.float64] 

90 eta_slope: NDArray[np.float64] 

91 

92 

93@dataclass(frozen=True) 

94class Breakpoint: 

95 """A vertex of the piecewise-linear LASSO path. 

96 

97 Attributes: 

98 lam: The penalty value at this breakpoint. 

99 beta: The coefficient vector ``beta(lam)``. 

100 active: Boolean mask of the support (non-zero coefficients) on the 

101 segment leaving this breakpoint towards smaller ``lam``. 

102 """ 

103 

104 lam: float 

105 beta: NDArray[np.float64] 

106 active: NDArray[np.bool_] 

107 

108 

109@dataclass 

110class Lasso(InequalityConstrained): 

111 """The LASSO regularisation path, traced as a parametric active-set problem. 

112 

113 Constructing a ``Lasso`` traces the entire path from ``lam_max`` (where 

114 ``beta = 0``) down to ``lam = 0`` (the least-squares fit on the final support, 

115 subject to any active constraints), storing the breakpoints in ``path``. The 

116 walk is driven by the same ``cvxcla.pathtracer.trace`` loop as the Critical Line 

117 Algorithm. 

118 

119 Optional linear inequality constraints ``G beta <= h`` (with ``h > 0``) are 

120 traced through the same bordered solve as the CLA's ``G w <= h`` rows. 

121 

122 The quadratic form may be given either as a dense design ``(x, y)`` (the usual 

123 case, ``H = X^T X``) or, via :meth:`from_operator`, as a ``QuadraticForm`` 

124 operator with the linear term ``X^T y``. The operator route lets a structured 

125 form, a diagonal-plus-low-rank factor model or a kernel, drive the path in 

126 ``O(nk)`` per step without forming the ``n x n`` Gram matrix, exactly as on the 

127 portfolio side. 

128 

129 Attributes: 

130 x: Design matrix of shape ``(m, n)`` (``None`` in operator mode). 

131 y: Response vector of shape ``(m,)`` (``None`` in operator mode). 

132 g: Optional inequality matrix ``(p, n)`` of ``G beta <= h``; ``None`` means 

133 the plain LASSO. 

134 h: Optional inequality right-hand side ``(p,)``; must be strictly positive. 

135 tol: Tolerance for event selection and the validity window. 

136 path: The discovered breakpoints, populated on construction. 

137 quad_form: Optional ``QuadraticForm`` operator ``H`` (operator mode). 

138 linear: Optional linear term ``X^T y`` of shape ``(n,)`` (operator mode). 

139 """ 

140 

141 x: NDArray[np.float64] | None = None 

142 y: NDArray[np.float64] | None = None 

143 g: NDArray[np.float64] | None = None 

144 h: NDArray[np.float64] | None = None 

145 nonneg: bool = False # pragma: no mutate 

146 gram: bool = False # pragma: no mutate 

147 tol: float = 1e-9 # pragma: no mutate 

148 path: list[Breakpoint] = field(default_factory=list) 

149 quad_form: QuadraticForm | None = None # pragma: no mutate 

150 linear: NDArray[np.float64] | None = None # pragma: no mutate 

151 

152 def __post_init__(self) -> None: 

153 """Validate shapes and trace the full LASSO path. 

154 

155 Raises: 

156 ValueError: If ``x`` is not 2d, ``y``'s length does not match ``x``, the 

157 constraint shapes are inconsistent, or any ``h`` entry is not 

158 strictly positive (which would make ``beta = 0`` infeasible). 

159 """ 

160 operator_mode = self.quad_form is not None or self.linear is not None 

161 if operator_mode: 

162 if self.quad_form is None or self.linear is None: 

163 msg = "quad_form and linear (X^T y) must be provided together" 

164 raise ValueError(msg) 

165 if self.x is not None or self.y is not None: 

166 msg = "supply either a design (x, y) or an operator (quad_form, linear), not both" 

167 raise ValueError(msg) 

168 self.linear = np.asarray(self.linear, dtype=np.float64) 

169 if self.linear.ndim != 1: 

170 msg = f"linear must be the 1d vector X^T y, got shape {self.linear.shape}" 

171 raise ValueError(msg) 

172 else: 

173 if self.x is None or self.y is None: 

174 msg = "provide a design (x, y) or an operator (quad_form, linear)" 

175 raise ValueError(msg) 

176 if self.x.ndim != 2: 

177 msg = f"x must be a 2d design matrix, got shape {self.x.shape}" 

178 raise ValueError(msg) 

179 if self.y.shape != (self.x.shape[0],): 

180 msg = f"y must have shape ({self.x.shape[0]},), got {self.y.shape}" 

181 raise ValueError(msg) 

182 if self.g is not None or self.h is not None: 

183 if self.g is None or self.h is None: 

184 msg = "g and h must be provided together" 

185 raise ValueError(msg) 

186 if self.g.shape != (self.h.shape[0], self.dimension): 

187 msg = f"g must have shape ({self.h.shape[0]}, {self.dimension}), got {self.g.shape}" 

188 raise ValueError(msg) 

189 if np.any(self.h <= self.tol): 

190 msg = "h must be strictly positive so beta = 0 is feasible (equality/zero-h needs a feasibility seed)" 

191 raise ValueError(msg) 

192 trace(self) 

193 

194 @classmethod 

195 def problem(cls, x: NDArray[np.float64], y: NDArray[np.float64]) -> LassoBuilder: 

196 """Start a fluent :class:`cvxcla.builder.LassoBuilder` for a LASSO path. 

197 

198 The LASSO counterpart of :meth:`cvxcla.cla.CLA.problem`: chain 

199 ``.inequality(G, h)`` and finish with ``.trace()``. The builder maps onto the 

200 constructor arguments and adds no modelling power. 

201 

202 Args: 

203 x: Design matrix of shape ``(m, n)``. 

204 y: Response vector of shape ``(m,)``. 

205 

206 Returns: 

207 A :class:`cvxcla.builder.LassoBuilder`. 

208 """ 

209 from .builder import LassoBuilder 

210 

211 return LassoBuilder(x, y) 

212 

213 @classmethod 

214 def from_operator( 

215 cls, 

216 quad: QuadraticForm, 

217 xty: NDArray[np.float64], 

218 *, 

219 g: NDArray[np.float64] | None = None, 

220 h: NDArray[np.float64] | None = None, 

221 nonneg: bool = False, 

222 tol: float = 1e-9, 

223 ) -> Lasso: 

224 """Trace a LASSO path with the quadratic form supplied as an operator. 

225 

226 The regression counterpart of :class:`cvxcla.cla.CLA` accepting a 

227 ``QuadraticForm`` covariance. Instead of a dense design ``X``, pass the Gram 

228 operator ``H`` (anything implementing :class:`QuadraticForm`, for example a 

229 :class:`cvxcla.operators.FactorCovariance` or a kernel) together with the 

230 linear term ``X^T y``. The homotopy reaches ``H`` only through ``matvec`` and 

231 ``solve_free``, so a diagonal-plus-low-rank factor model or a kernel traces 

232 the path in ``O(nk)`` per step without ever forming the ``n x n`` matrix, 

233 exactly as on the portfolio side. For the path to coincide with the 

234 design-matrix LASSO one needs ``H = X^T X`` and ``xty = X^T y`` 

235 (Theorem 1); any positive-semidefinite operator whose free blocks are 

236 positive definite traces a well-defined path. 

237 

238 Args: 

239 quad: The quadratic form ``H`` as a :class:`QuadraticForm` operator. 

240 xty: The linear term ``X^T y`` of shape ``(n,)``. 

241 g: Optional inequality matrix of ``G beta <= h``. 

242 h: Optional inequality right-hand side; entries must be strictly positive. 

243 nonneg: Restrict the path to ``beta >= 0``. 

244 tol: Tolerance for event selection and the validity window. 

245 

246 Returns: 

247 A traced :class:`Lasso` whose ``path`` holds the breakpoints. 

248 """ 

249 return cls( 

250 quad_form=quad, 

251 linear=np.asarray(xty, dtype=np.float64), 

252 g=g, 

253 h=h, 

254 nonneg=nonneg, 

255 tol=tol, 

256 ) 

257 

258 @cached_property 

259 def quad(self) -> QuadraticForm: 

260 """The Gram matrix ``X^T X`` as a ``QuadraticForm`` backend (cached: ``X`` is fixed). 

261 

262 With ``gram=True`` the data-matrix backend is used instead of forming the 

263 ``n x n`` Gram: it solves through the Woodbury identity in the 

264 ``m``-dimensional observation space and never materialises an ``n x n`` 

265 matrix, the win in the high-dimensional ``p >> n`` regime (more features than 

266 observations). ``GramCovariance`` represents ``X_c^T X_c / (m-1)``, so scaling 

267 the data by ``sqrt(m-1)`` recovers ``X^T X`` exactly for a **centred** design 

268 (the standard LASSO convention; pass a column-centred ``x``). 

269 """ 

270 if self.quad_form is not None: 

271 return self.quad_form 

272 # Not operator mode, so __post_init__ guarantees a design matrix. 

273 x = cast("NDArray[np.float64]", self.x) 

274 if self.gram: 

275 m = x.shape[0] 

276 return GramCovariance(x * np.sqrt(m - 1.0)) 

277 return DenseCovariance(x.T @ x) 

278 

279 @cached_property 

280 def xty(self) -> NDArray[np.float64]: 

281 """The linear data ``X^T y`` (the analogue of the CLA's expected returns; cached).""" 

282 if self.linear is not None: 

283 return self.linear 

284 # Not operator mode, so __post_init__ guarantees a design (x, y). 

285 x = cast("NDArray[np.float64]", self.x) 

286 y = cast("NDArray[np.float64]", self.y) 

287 return x.T @ y 

288 

289 @property 

290 def dimension(self) -> int: 

291 """Number of features ``n`` (the problem dimension for the path tracer).""" 

292 if self.x is not None: 

293 return int(self.x.shape[1]) 

294 return int(self.xty.shape[0]) 

295 

296 @property 

297 def lam_max(self) -> float: 

298 """The smallest penalty at which ``beta = 0`` is optimal: ``||X^T y||_inf``. 

299 

300 With ``h > 0`` every inequality row is slack at ``beta = 0`` (zero 

301 multiplier), so the unconstrained threshold is unchanged. 

302 """ 

303 return float(np.max(np.abs(self.xty))) 

304 

305 def begin(self) -> tuple[float, _LassoState]: 

306 """Record the all-zero solution at the start penalty and enter the first coordinate. 

307 

308 For the plain or inequality-constrained LASSO the start is 

309 ``lam_max = ||X^T y||_inf`` and the most-correlated coordinate enters with its 

310 sign. Under the non-negative restriction ``beta >= 0`` the l1 penalty becomes 

311 the linear term ``lam * 1^T beta``, only positive correlations can enter, so 

312 the start is ``lam_max = max_j (X^T y)_j`` and the coordinate enters with sign 

313 ``+``. When no coordinate can enter (e.g. every correlation is non-positive 

314 under ``beta >= 0``), ``beta = 0`` is optimal for all ``lambda`` and the path 

315 is the single point. 

316 """ 

317 n = self.dimension 

318 xty = self.xty 

319 rows_active = np.zeros(self.g_matrix.shape[0], dtype=bool) 

320 if self.nonneg: 

321 lam_max = float(np.max(xty)) if n else 0.0 

322 j0, s0 = int(np.argmax(xty)), 1.0 

323 else: 

324 lam_max = self.lam_max 

325 j0 = int(np.argmax(np.abs(xty))) 

326 s0 = float(np.sign(xty[j0])) 

327 

328 self.path.append(Breakpoint(max(lam_max, 0.0), np.zeros(n), np.zeros(n, dtype=bool))) 

329 active = np.zeros(n, dtype=bool) 

330 signs = np.zeros(n) 

331 if lam_max > self.tol: 

332 active[j0] = True 

333 signs[j0] = s0 

334 return max(lam_max, 0.0), _LassoState(active, signs, rows_active, max(lam_max, 0.0)) 

335 

336 def segment(self, state: _LassoState) -> _LassoSegment: 

337 """Solve the affine segment for the current support, signs, and active rows. 

338 

339 With no active rows this is the plain LASSO solve against the Gram 

340 submatrix. With active rows it is the bordered Schur solve of the CLA: the 

341 active rows ``G_S`` enter the reduced KKT system as extra equality rows. 

342 """ 

343 n = self.dimension 

344 active, signs, rows_active = state.active, state.signs, state.rows_active 

345 xty = self.xty 

346 alpha = np.zeros(n) 

347 beta_slope = np.zeros(n) 

348 eta_alpha = np.zeros(self.g_matrix.shape[0]) 

349 eta_slope = np.zeros(self.g_matrix.shape[0]) 

350 

351 xty_s = xty[active] 

352 signs_s = signs[active] 

353 if not np.any(active): 

354 # Empty support (e.g. the non-negative path when no correlation is 

355 # positive): beta = 0, correlation = X^T y, and there is nothing to solve. 

356 return _LassoSegment(alpha, beta_slope, xty.copy(), np.zeros(n), eta_alpha, eta_slope) 

357 

358 # The active rows G_RS act as equality rows in the reduced KKT system, exactly 

359 # the CLA's bordered Schur solve (operators.bordered_solve). With no active rows 

360 # (|R| = 0) this reduces to the plain LASSO solve beta_S(lam) = H_SS^{-1}(xty_S - 

361 # lam s_S). The slope's constraint right-hand side is zero, and the slope 

362 # multiplier nu carries the opposite sign convention to eta(lam) (beta = alpha - 

363 # lam beta_slope here, vs w = r_alpha + lam r_beta in the CLA), hence the flip. 

364 g_rs = self.g_matrix[np.ix_(rows_active, active)] # |R| x |S| 

365 h_r = self.h_vector[rows_active] 

366 x_const, x_slope, eta_a, nu_slope = bordered_solve( 

367 self.quad, active, g_rs, xty_s, signs_s, h_r, np.zeros(g_rs.shape[0]) 

368 ) 

369 alpha[active] = x_const 

370 beta_slope[active] = x_slope 

371 eta_alpha[rows_active] = eta_a 

372 eta_slope[rows_active] = -nu_slope 

373 

374 # Generalised correlation c(lam) = xty - H beta(lam) - G_R^T eta(lam) = p + lam q. 

375 g_r = self.g_matrix[rows_active] 

376 eta_a_r = eta_alpha[rows_active] 

377 eta_s_r = eta_slope[rows_active] 

378 p = xty - self.quad.matvec(alpha) - g_r.T @ eta_a_r 

379 q = self.quad.matvec(beta_slope) - g_r.T @ eta_s_r 

380 return _LassoSegment(alpha, beta_slope, p, q, eta_alpha, eta_slope) 

381 

382 def event_matrix(self, state: _LassoState, segment: _LassoSegment) -> NDArray[np.float64]: 

383 """Return the ``(n + p, 4)`` matrix of candidate critical lambdas. 

384 

385 Rows ``0..n-1`` are coordinate events (col 0 leave, col 1 enter ``+``, col 2 

386 enter ``-``); rows ``n..n+p-1`` are inequality-row events (col 0 activate, col 

387 1 release). Entries are ``-inf`` where the event cannot occur, and only 

388 events strictly inside ``(tol, lam - tol)`` are kept. 

389 """ 

390 n = self.dimension 

391 rows = self.g_matrix.shape[0] 

392 active = state.active 

393 inactive = ~active 

394 alpha, beta_slope, p, q, eta_alpha, eta_slope = segment 

395 rows_active = state.rows_active 

396 

397 l_mat = np.full((n + rows, 4), -np.inf) 

398 

399 # leave: alpha_j - lam beta_slope_j = 0 

400 leaves = active & (np.abs(beta_slope) > self.tol) # pragma: no mutate 

401 l_mat[:n][leaves, 0] = alpha[leaves] / beta_slope[leaves] 

402 

403 # enter (+): p_j + lam q_j = +lam -> lam = p_j / (1 - q_j) 

404 denom_pos = 1.0 - q 

405 enters_pos = inactive & (np.abs(denom_pos) > self.tol) # pragma: no mutate 

406 l_mat[:n][enters_pos, 1] = p[enters_pos] / denom_pos[enters_pos] 

407 

408 # enter (-): p_j + lam q_j = -lam -> lam = -p_j / (1 + q_j). Disabled under the 

409 # non-negative restriction beta >= 0, where only positive entries are allowed. 

410 if not self.nonneg: 

411 denom_neg = 1.0 + q 

412 enters_neg = inactive & (np.abs(denom_neg) > self.tol) # pragma: no mutate 

413 l_mat[:n][enters_neg, 2] = -p[enters_neg] / denom_neg[enters_neg] 

414 

415 if rows: 

416 g_mat = self.g_matrix 

417 slope_row = g_mat @ beta_slope # d/d(-lam) of the row value 

418 level_row = g_mat @ alpha - self.h_vector # G_r alpha - h_r 

419 # activate: the row value G_r beta(lam) = level + h_r - lam slope rises to 

420 # the cap h_r as lam decreases when its slope d(value)/d(-lam) = slope_row 

421 # is positive; the crossing is lam = (G_r alpha - h_r) / (G_r beta_slope). 

422 inactive_rows = ~rows_active & (slope_row > self.tol) # pragma: no mutate 

423 l_mat[n:][inactive_rows, 0] = level_row[inactive_rows] / slope_row[inactive_rows] 

424 # release: eta_r(lam) = eta_alpha + lam eta_slope -> 0 from eta > 0, i.e. eta_slope > 0. 

425 releasing = rows_active & (eta_slope > self.tol) # pragma: no mutate 

426 l_mat[n:][releasing, 1] = -eta_alpha[releasing] / eta_slope[releasing] 

427 

428 # Keep only events that make strict progress to a smaller, positive penalty. 

429 l_mat[(l_mat <= self.tol) | (l_mat >= state.lam - self.tol)] = -np.inf 

430 return l_mat 

431 

432 def step(self, state: _LassoState, segment: _LassoSegment, sec: int, direction: int, lam: float) -> _LassoState: 

433 """Record the breakpoint at ``lam`` after flipping coordinate or row ``sec``. 

434 

435 For a coordinate (``sec < n``): direction 0 removes it from the support, 1/2 

436 add it with sign ``+1``/``-1``. For an inequality row (``sec >= n``): 

437 direction 0 activates the row, 1 releases it. The path is continuous across 

438 the flip, so the recorded coefficients are the old segment at ``lam``. 

439 """ 

440 n = self.dimension 

441 active = state.active.copy() 

442 signs = state.signs.copy() 

443 rows_active = state.rows_active.copy() 

444 if sec < n: 

445 if direction == 0: 

446 active[sec] = False 

447 signs[sec] = 0.0 

448 else: 

449 active[sec] = True 

450 signs[sec] = 1.0 if direction == 1 else -1.0 

451 else: 

452 rows_active[sec - n] = direction == 0 

453 

454 beta = segment.alpha - lam * segment.beta_slope 

455 self.path.append(Breakpoint(lam, beta, active.copy())) 

456 return _LassoState(active, signs, rows_active, lam) 

457 

458 def finish(self, state: _LassoState, segment: _LassoSegment) -> None: 

459 """Record the ``lam = 0`` endpoint: the least-squares fit on the final support.""" 

460 self.path.append(Breakpoint(0.0, segment.alpha.copy(), state.active.copy())) 

461 

462 def solution(self, lam: float) -> NDArray[np.float64]: 

463 """Evaluate the piecewise-linear path at penalty ``lam``. 

464 

465 Args: 

466 lam: The penalty value at which to evaluate ``beta``. 

467 

468 Returns: 

469 The coefficient vector ``beta(lam)``, by linear interpolation between 

470 the bracketing breakpoints (clamped to the path's endpoints). 

471 """ 

472 ordered = sorted(self.path, key=lambda bp: bp.lam) 

473 if lam <= ordered[0].lam: 

474 return ordered[0].beta 

475 if lam >= ordered[-1].lam: 

476 return ordered[-1].beta 

477 for lo, hi in pairwise(ordered): 

478 if lo.lam <= lam <= hi.lam: 

479 weight = (lam - lo.lam) / (hi.lam - lo.lam) 

480 return (1.0 - weight) * lo.beta + weight * hi.beta 

481 msg = "lam lies within the path range but no bracketing segment was found" # pragma: no cover 

482 raise AssertionError(msg) # pragma: no cover