Coverage for src/cvxcla/pathtracer.py: 100%
61 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-27 06:45 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-27 06:45 +0000
1"""Generic parametric active-set path tracer.
3The Critical Line Algorithm is one instance of a broader scheme: the optimiser of
4a convex quadratic program is followed as a single scalar parameter ``lambda``
5varies, the solution path is piecewise linear, and it is traced exactly by
6stepping from one breakpoint to the next while maintaining a *coordinate* active
7set. The same scheme drives least angle regression (LARS) and the LASSO homotopy.
9This module factors out the part that is identical across those problems:
11* ``select_next_event`` -- the simplex-style "smallest positive ratio to the next
12 event" rule with a Bland lowest-index tie-break for anti-cycling. It works on an
13 ``(n, k)`` matrix of candidate critical ``lambda`` values, where the row is the
14 coordinate and the column is the (problem-defined) event type.
15* ``trace`` -- the control loop: build the first vertex, solve the current affine
16 segment, scan events, pick the next one, flip one coordinate's activity, repeat.
18Everything problem-specific (the segment solve, what an event *means*, how a vertex
19is recorded) lives behind the ``ParametricProblem`` protocol. ``CLA`` (portfolio
20frontier) and ``Lasso`` (regression path) are two implementations; both are driven
21by the same ``trace`` here.
22"""
24from __future__ import annotations
26from functools import cached_property
27from typing import Any, Protocol, runtime_checkable
29import numpy as np
30from numpy.typing import NDArray
33@runtime_checkable
34class ParametricProblem(Protocol):
35 """The problem-specific contract the generic ``trace`` loop drives.
37 A ``State`` describes the current vertex (which coordinates are free/blocked
38 and their values); a ``Segment`` describes the affine piece valid until the
39 next event. Both are opaque to the tracer: it only passes them back to the
40 problem. Implementations record vertices as they are discovered (the tracer
41 returns nothing).
42 """
44 @property
45 def tol(self) -> float:
46 """Tolerance for the event-selection tie-break and validity window."""
47 ... # pragma: no cover
49 @property
50 def dimension(self) -> int:
51 """Number of coordinates ``n`` (sets the iteration safety cap)."""
52 ... # pragma: no cover
54 def begin(self) -> tuple[float, Any]:
55 """Record the first vertex and return ``(lambda_start, initial_state)``."""
56 ... # pragma: no cover
58 def segment(self, state: Any) -> Any:
59 """Solve the affine segment valid at ``state`` (the reduced KKT system)."""
60 ... # pragma: no cover
62 def event_matrix(self, state: Any, segment: Any) -> NDArray[np.float64]:
63 """Return the ``(n, k)`` matrix of candidate critical ``lambda`` values.
65 Entry ``(i, j)`` is the ``lambda`` at which coordinate ``i`` would fire
66 event type ``j`` along ``segment``; ``-inf`` means "no such event".
67 """
68 ... # pragma: no cover
70 def step(self, state: Any, segment: Any, sec: int, direction: int, lam: float) -> Any:
71 """Record the vertex at ``lam`` after flipping coordinate ``sec``'s activity.
73 ``direction`` is the winning event-matrix column; the implementation maps
74 it to the new activity (and, where relevant, the sign) of ``sec``. Returns
75 the next ``State``.
76 """
77 ... # pragma: no cover
79 def finish(self, state: Any, segment: Any) -> None:
80 """Record the final ``lambda = 0`` vertex (the segment's intercept)."""
81 ... # pragma: no cover
84class InequalityConstrained:
85 """Mixin: normalise optional ``G x <= h`` inequality rows for a parametric problem.
87 Both the CLA (``G w <= h`` exposure caps) and the LASSO (``G beta <= h``) carry an
88 optional inequality system through the same bordered machinery, so normalising the
89 raw ``g``/``h`` inputs -- and counting the resulting path-length coordinates -- lives
90 here once rather than being duplicated in each class. A concrete problem supplies the
91 ``g``/``h`` inputs (``None`` when there are no inequality rows) and a ``dimension``.
92 """
94 g: NDArray[np.float64] | None
95 h: NDArray[np.float64] | None
97 @property
98 def dimension(self) -> int:
99 """Number of coordinates ``n``; provided by the concrete problem class."""
100 raise NotImplementedError # pragma: no cover
102 @cached_property
103 def g_matrix(self) -> NDArray[np.float64]:
104 """Inequality matrix ``G`` of ``G x <= h`` as a ``(p, n)`` float array.
106 ``None`` is normalised to an empty ``(0, n)`` matrix, so the inequality
107 machinery is a no-op and the trace reduces exactly to the unconstrained
108 problem. This is the single point where the inequality input is normalised.
109 """
110 if self.g is None:
111 return np.zeros((0, self.dimension))
112 return np.atleast_2d(np.asarray(self.g, dtype=np.float64))
114 @cached_property
115 def h_vector(self) -> NDArray[np.float64]:
116 """Inequality right-hand side ``h`` of ``G x <= h`` as a ``(p,)`` float array (empty if none)."""
117 if self.h is None:
118 return np.zeros(0)
119 return np.atleast_1d(np.asarray(self.h, dtype=np.float64))
121 @property
122 def event_dimension(self) -> int:
123 """Coordinate count for the tracer's path-length safety cap: ``n`` + ``p`` rows.
125 Each turning point fixes the activity of exactly one coordinate -- a box bound
126 (or LASSO coefficient) or an inequality row of ``G x <= h`` -- so the iteration
127 cap scales with ``n + p`` rather than ``n`` alone. With no inequality rows this
128 is just ``n``.
129 """
130 return self.dimension + int(self.g_matrix.shape[0])
133def select_next_event(l_mat: NDArray[np.float64], lam: float, tol: float) -> tuple[int, int, float] | None:
134 """Pick the next breakpoint from the event matrix, or ``None`` to stop.
136 The current segment is valid only for ``lambda`` at or below the current value
137 (the path is traced with non-increasing ``lambda``), so ratios above it are
138 spurious crossings outside the segment and are discarded; if none remain the
139 trace is complete. Among events tied for the largest valid ratio, a Bland-style
140 lowest-``(coordinate, event type)`` rule makes the choice deterministic and
141 prevents cycling on degenerate problems.
143 Two events are "the same" breakpoint only when their ``lambda`` values agree to
144 floating-point scale, so the validity window and the tie-break use a tolerance
145 *relative* to the ``lambda`` magnitude. A fixed absolute tolerance mis-ranks the
146 densely spaced breakpoints of large paths: their spacing shrinks with the problem
147 size, so an absolute window eventually merges genuinely distinct events, picks the
148 wrong one by the tie-break, and lets ``lambda`` step backwards. The caller's
149 ``tol`` is treated as an upper bound on this relative rate; event ordering needs a
150 roundoff-scale window regardless of the coarser ``tol`` used elsewhere (e.g. to
151 classify a weight as sitting on a bound). Finally, the chosen ``lambda`` is clamped
152 below the current value, so roundoff in the segment solve can never make a
153 breakpoint appear above it.
155 Args:
156 l_mat: The ``(n, k)`` matrix of candidate critical ``lambda`` values.
157 lam: The current (upper) ``lambda`` bound on valid events.
158 tol: Upper bound on the relative event-ordering tolerance.
160 Returns:
161 ``(sec, direction, lam_next)`` for the chosen event, or ``None`` if no
162 valid event remains.
163 """
164 l_mat = l_mat.copy() # do not mutate the caller's matrix
165 rate = min(tol, 1e-10) # roundoff-scale relative window; tol is only an upper bound
166 l_mat[l_mat > lam + rate * max(1.0, abs(lam))] = -np.inf # pragma: no mutate
168 lam_max = np.max(l_mat)
169 if lam_max < 0: # pragma: no mutate
170 return None
172 tied = np.argwhere(l_mat >= lam_max - rate * max(1.0, abs(lam_max))) # pragma: no mutate
173 sec, direction = tied[0]
174 # lambda is non-increasing along the path: clamp away any roundoff overshoot.
175 return int(sec), int(direction), float(min(lam, l_mat[sec, direction]))
178def trace(problem: ParametricProblem) -> None:
179 """Trace the full piecewise-linear solution path of ``problem``.
181 Mirrors the turning-point loop of the Critical Line Algorithm, but driven
182 entirely through the ``ParametricProblem`` interface so the same control flow
183 serves any coordinate-active-set parametric quadratic program.
185 Args:
186 problem: The problem to trace; its ``begin``/``segment``/``event_matrix``/
187 ``step``/``finish`` hooks record the discovered vertices.
189 Raises:
190 RuntimeError: If the event loop fails to terminate within the safety cap
191 (each step fixes the activity of at least one coordinate, so a correct
192 trace runs ``O(n)`` times; vastly exceeding this signals cycling).
193 """
194 lam, state = problem.begin()
196 # Safety bound: far above any valid path length; exceeding it means the event
197 # loop failed to terminate, so we fail loudly rather than hang. Each step fixes
198 # the activity of one coordinate -- a box bound (n of them) or an inequality row
199 # (p of them) -- so the bound scales with the total n + p, not n alone; a problem
200 # that exposes only ``dimension`` (no inequality rows) falls back to n.
201 path_coords = getattr(problem, "event_dimension", problem.dimension) # pragma: no mutate
202 max_iterations = 100 * (path_coords + 1) # pragma: no mutate
203 iterations = 0 # pragma: no mutate
205 while True: # pragma: no mutate
206 iterations += 1 # pragma: no mutate
207 if iterations > max_iterations: # pragma: no mutate
208 msg = "path tracer failed to converge: too many iterations" # pragma: no mutate
209 raise RuntimeError(msg) # pragma: no mutate
211 segment = problem.segment(state)
212 event = select_next_event(problem.event_matrix(state, segment), lam, problem.tol)
213 if event is None:
214 problem.finish(state, segment)
215 return
217 sec, direction, lam = event
218 state = problem.step(state, segment, sec, direction, lam)