Coverage for src/cvxcla/pathtracer.py: 100%

61 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-27 06:45 +0000

1"""Generic parametric active-set path tracer. 

2 

3The Critical Line Algorithm is one instance of a broader scheme: the optimiser of 

4a convex quadratic program is followed as a single scalar parameter ``lambda`` 

5varies, the solution path is piecewise linear, and it is traced exactly by 

6stepping from one breakpoint to the next while maintaining a *coordinate* active 

7set. The same scheme drives least angle regression (LARS) and the LASSO homotopy. 

8 

9This module factors out the part that is identical across those problems: 

10 

11* ``select_next_event`` -- the simplex-style "smallest positive ratio to the next 

12 event" rule with a Bland lowest-index tie-break for anti-cycling. It works on an 

13 ``(n, k)`` matrix of candidate critical ``lambda`` values, where the row is the 

14 coordinate and the column is the (problem-defined) event type. 

15* ``trace`` -- the control loop: build the first vertex, solve the current affine 

16 segment, scan events, pick the next one, flip one coordinate's activity, repeat. 

17 

18Everything problem-specific (the segment solve, what an event *means*, how a vertex 

19is recorded) lives behind the ``ParametricProblem`` protocol. ``CLA`` (portfolio 

20frontier) and ``Lasso`` (regression path) are two implementations; both are driven 

21by the same ``trace`` here. 

22""" 

23 

24from __future__ import annotations 

25 

26from functools import cached_property 

27from typing import Any, Protocol, runtime_checkable 

28 

29import numpy as np 

30from numpy.typing import NDArray 

31 

32 

33@runtime_checkable 

34class ParametricProblem(Protocol): 

35 """The problem-specific contract the generic ``trace`` loop drives. 

36 

37 A ``State`` describes the current vertex (which coordinates are free/blocked 

38 and their values); a ``Segment`` describes the affine piece valid until the 

39 next event. Both are opaque to the tracer: it only passes them back to the 

40 problem. Implementations record vertices as they are discovered (the tracer 

41 returns nothing). 

42 """ 

43 

44 @property 

45 def tol(self) -> float: 

46 """Tolerance for the event-selection tie-break and validity window.""" 

47 ... # pragma: no cover 

48 

49 @property 

50 def dimension(self) -> int: 

51 """Number of coordinates ``n`` (sets the iteration safety cap).""" 

52 ... # pragma: no cover 

53 

54 def begin(self) -> tuple[float, Any]: 

55 """Record the first vertex and return ``(lambda_start, initial_state)``.""" 

56 ... # pragma: no cover 

57 

58 def segment(self, state: Any) -> Any: 

59 """Solve the affine segment valid at ``state`` (the reduced KKT system).""" 

60 ... # pragma: no cover 

61 

62 def event_matrix(self, state: Any, segment: Any) -> NDArray[np.float64]: 

63 """Return the ``(n, k)`` matrix of candidate critical ``lambda`` values. 

64 

65 Entry ``(i, j)`` is the ``lambda`` at which coordinate ``i`` would fire 

66 event type ``j`` along ``segment``; ``-inf`` means "no such event". 

67 """ 

68 ... # pragma: no cover 

69 

70 def step(self, state: Any, segment: Any, sec: int, direction: int, lam: float) -> Any: 

71 """Record the vertex at ``lam`` after flipping coordinate ``sec``'s activity. 

72 

73 ``direction`` is the winning event-matrix column; the implementation maps 

74 it to the new activity (and, where relevant, the sign) of ``sec``. Returns 

75 the next ``State``. 

76 """ 

77 ... # pragma: no cover 

78 

79 def finish(self, state: Any, segment: Any) -> None: 

80 """Record the final ``lambda = 0`` vertex (the segment's intercept).""" 

81 ... # pragma: no cover 

82 

83 

84class InequalityConstrained: 

85 """Mixin: normalise optional ``G x <= h`` inequality rows for a parametric problem. 

86 

87 Both the CLA (``G w <= h`` exposure caps) and the LASSO (``G beta <= h``) carry an 

88 optional inequality system through the same bordered machinery, so normalising the 

89 raw ``g``/``h`` inputs -- and counting the resulting path-length coordinates -- lives 

90 here once rather than being duplicated in each class. A concrete problem supplies the 

91 ``g``/``h`` inputs (``None`` when there are no inequality rows) and a ``dimension``. 

92 """ 

93 

94 g: NDArray[np.float64] | None 

95 h: NDArray[np.float64] | None 

96 

97 @property 

98 def dimension(self) -> int: 

99 """Number of coordinates ``n``; provided by the concrete problem class.""" 

100 raise NotImplementedError # pragma: no cover 

101 

102 @cached_property 

103 def g_matrix(self) -> NDArray[np.float64]: 

104 """Inequality matrix ``G`` of ``G x <= h`` as a ``(p, n)`` float array. 

105 

106 ``None`` is normalised to an empty ``(0, n)`` matrix, so the inequality 

107 machinery is a no-op and the trace reduces exactly to the unconstrained 

108 problem. This is the single point where the inequality input is normalised. 

109 """ 

110 if self.g is None: 

111 return np.zeros((0, self.dimension)) 

112 return np.atleast_2d(np.asarray(self.g, dtype=np.float64)) 

113 

114 @cached_property 

115 def h_vector(self) -> NDArray[np.float64]: 

116 """Inequality right-hand side ``h`` of ``G x <= h`` as a ``(p,)`` float array (empty if none).""" 

117 if self.h is None: 

118 return np.zeros(0) 

119 return np.atleast_1d(np.asarray(self.h, dtype=np.float64)) 

120 

121 @property 

122 def event_dimension(self) -> int: 

123 """Coordinate count for the tracer's path-length safety cap: ``n`` + ``p`` rows. 

124 

125 Each turning point fixes the activity of exactly one coordinate -- a box bound 

126 (or LASSO coefficient) or an inequality row of ``G x <= h`` -- so the iteration 

127 cap scales with ``n + p`` rather than ``n`` alone. With no inequality rows this 

128 is just ``n``. 

129 """ 

130 return self.dimension + int(self.g_matrix.shape[0]) 

131 

132 

133def select_next_event(l_mat: NDArray[np.float64], lam: float, tol: float) -> tuple[int, int, float] | None: 

134 """Pick the next breakpoint from the event matrix, or ``None`` to stop. 

135 

136 The current segment is valid only for ``lambda`` at or below the current value 

137 (the path is traced with non-increasing ``lambda``), so ratios above it are 

138 spurious crossings outside the segment and are discarded; if none remain the 

139 trace is complete. Among events tied for the largest valid ratio, a Bland-style 

140 lowest-``(coordinate, event type)`` rule makes the choice deterministic and 

141 prevents cycling on degenerate problems. 

142 

143 Two events are "the same" breakpoint only when their ``lambda`` values agree to 

144 floating-point scale, so the validity window and the tie-break use a tolerance 

145 *relative* to the ``lambda`` magnitude. A fixed absolute tolerance mis-ranks the 

146 densely spaced breakpoints of large paths: their spacing shrinks with the problem 

147 size, so an absolute window eventually merges genuinely distinct events, picks the 

148 wrong one by the tie-break, and lets ``lambda`` step backwards. The caller's 

149 ``tol`` is treated as an upper bound on this relative rate; event ordering needs a 

150 roundoff-scale window regardless of the coarser ``tol`` used elsewhere (e.g. to 

151 classify a weight as sitting on a bound). Finally, the chosen ``lambda`` is clamped 

152 below the current value, so roundoff in the segment solve can never make a 

153 breakpoint appear above it. 

154 

155 Args: 

156 l_mat: The ``(n, k)`` matrix of candidate critical ``lambda`` values. 

157 lam: The current (upper) ``lambda`` bound on valid events. 

158 tol: Upper bound on the relative event-ordering tolerance. 

159 

160 Returns: 

161 ``(sec, direction, lam_next)`` for the chosen event, or ``None`` if no 

162 valid event remains. 

163 """ 

164 l_mat = l_mat.copy() # do not mutate the caller's matrix 

165 rate = min(tol, 1e-10) # roundoff-scale relative window; tol is only an upper bound 

166 l_mat[l_mat > lam + rate * max(1.0, abs(lam))] = -np.inf # pragma: no mutate 

167 

168 lam_max = np.max(l_mat) 

169 if lam_max < 0: # pragma: no mutate 

170 return None 

171 

172 tied = np.argwhere(l_mat >= lam_max - rate * max(1.0, abs(lam_max))) # pragma: no mutate 

173 sec, direction = tied[0] 

174 # lambda is non-increasing along the path: clamp away any roundoff overshoot. 

175 return int(sec), int(direction), float(min(lam, l_mat[sec, direction])) 

176 

177 

178def trace(problem: ParametricProblem) -> None: 

179 """Trace the full piecewise-linear solution path of ``problem``. 

180 

181 Mirrors the turning-point loop of the Critical Line Algorithm, but driven 

182 entirely through the ``ParametricProblem`` interface so the same control flow 

183 serves any coordinate-active-set parametric quadratic program. 

184 

185 Args: 

186 problem: The problem to trace; its ``begin``/``segment``/``event_matrix``/ 

187 ``step``/``finish`` hooks record the discovered vertices. 

188 

189 Raises: 

190 RuntimeError: If the event loop fails to terminate within the safety cap 

191 (each step fixes the activity of at least one coordinate, so a correct 

192 trace runs ``O(n)`` times; vastly exceeding this signals cycling). 

193 """ 

194 lam, state = problem.begin() 

195 

196 # Safety bound: far above any valid path length; exceeding it means the event 

197 # loop failed to terminate, so we fail loudly rather than hang. Each step fixes 

198 # the activity of one coordinate -- a box bound (n of them) or an inequality row 

199 # (p of them) -- so the bound scales with the total n + p, not n alone; a problem 

200 # that exposes only ``dimension`` (no inequality rows) falls back to n. 

201 path_coords = getattr(problem, "event_dimension", problem.dimension) # pragma: no mutate 

202 max_iterations = 100 * (path_coords + 1) # pragma: no mutate 

203 iterations = 0 # pragma: no mutate 

204 

205 while True: # pragma: no mutate 

206 iterations += 1 # pragma: no mutate 

207 if iterations > max_iterations: # pragma: no mutate 

208 msg = "path tracer failed to converge: too many iterations" # pragma: no mutate 

209 raise RuntimeError(msg) # pragma: no mutate 

210 

211 segment = problem.segment(state) 

212 event = select_next_event(problem.event_matrix(state, segment), lam, problem.tol) 

213 if event is None: 

214 problem.finish(state, segment) 

215 return 

216 

217 sec, direction, lam = event 

218 state = problem.step(state, segment, sec, direction, lam)