Coverage for src / cvxcla / cla.py: 100%

98 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-03 01:26 +0000

1# Copyright 2023 Stanford University Convex Optimization Group 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Markowitz implementation of the Critical Line Algorithm. 

15 

16This module provides the CLA class, which implements the Critical Line Algorithm 

17as described by Harry Markowitz and colleagues. The algorithm computes the entire 

18efficient frontier by finding all turning points, which are the points where the 

19set of assets at their bounds changes. 

20""" 

21 

22import logging 

23from dataclasses import dataclass, field 

24from functools import cached_property 

25 

26import numpy as np 

27from numpy.typing import NDArray 

28 

29from .first import init_algo 

30from .types import Frontier, FrontierPoint, TurningPoint 

31 

32 

@dataclass(frozen=True)
class CLA:
    """Critical Line Algorithm implementation based on Markowitz's approach.

    This class implements the Critical Line Algorithm as described by Harry Markowitz
    and colleagues. It computes the entire efficient frontier by finding all turning
    points, which are the points where the set of assets at their bounds changes.

    The algorithm starts with the first turning point (the portfolio with the highest
    expected return) and then iteratively computes the next turning point with a lower
    expected return until it reaches the minimum variance portfolio.

    Attributes:
        mean: Vector of expected returns for each asset.
        covariance: Covariance matrix of asset returns.
        lower_bounds: Vector of lower bounds for asset weights.
        upper_bounds: Vector of upper bounds for asset weights.
        a: Matrix for linear equality constraints (Ax = b).
        b: Vector for linear equality constraints (Ax = b).
        turning_points: List of turning points on the efficient frontier.
        tol: Tolerance for numerical calculations.
        logger: Logger instance for logging information and errors.

    """

    mean: NDArray[np.float64]
    covariance: NDArray[np.float64]
    lower_bounds: NDArray[np.float64]
    upper_bounds: NDArray[np.float64]
    a: NDArray[np.float64]
    b: NDArray[np.float64]
    turning_points: list[TurningPoint] = field(default_factory=list)
    tol: float = 1e-5
    logger: logging.Logger = field(default_factory=lambda: logging.getLogger(__name__))

    @cached_property
    def proj(self) -> np.ndarray:
        """Construct the projection matrix used in computing Lagrange multipliers.

        P is formed by horizontally stacking the covariance matrix and the transpose
        of the equality constraint matrix A. It is used to compute:
        - gamma = P @ alpha
        - delta = P @ beta - mean

        This step helps identify which constraints are becoming active or inactive.
        """
        return np.block([self.covariance, self.a.T])

    @cached_property
    def kkt(self) -> np.ndarray:
        """Construct the Karush-Kuhn-Tucker (KKT) system matrix.

        The KKT matrix is built by augmenting the covariance matrix with the
        equality constraints. It forms the linear system:
            [Σ  Aᵗ]
            [A  0 ]
        which we solve to get the optimal portfolio weights (alpha) and the
        Lagrange multipliers (lambda) corresponding to the constraints.

        This matrix is symmetric but not necessarily positive definite.
        """
        m = self.a.shape[0]
        return np.block([[self.covariance, self.a.T], [self.a, np.zeros((m, m))]])

    def __post_init__(self) -> None:
        """Initialize the CLA object and compute the efficient frontier.

        This method is automatically called after initialization. It computes
        the entire efficient frontier by finding all turning points, starting
        from the first turning point (highest expected return) and iteratively
        computing the next turning point with a lower expected return until
        it reaches the minimum variance portfolio.

        The algorithm uses a block matrix approach to solve the system of equations
        that determine the turning points.

        Raises:
            RuntimeError: If all variables are blocked, which would make the
                system of equations singular.

        """
        m = self.a.shape[0]
        ns = len(self.mean)

        # Compute and store the first turning point
        self._append(self._first_turning_point())

        lam = np.inf

        while lam > 0:
            last = self.turning_points[-1]

            # --- Identify active set ---
            blocked = ~last.free
            if np.all(blocked):
                msg = "All variables cannot be blocked"
                raise RuntimeError(msg)

            at_upper = blocked & np.isclose(last.weights, self.upper_bounds)
            at_lower = blocked & np.isclose(last.weights, self.lower_bounds)

            _out = at_upper | at_lower
            _in = ~_out

            # --- Construct RHS for KKT system ---
            # Blocked assets are pinned to whichever bound they sit on.
            fixed_weights = np.zeros(ns)
            fixed_weights[at_upper] = self.upper_bounds[at_upper]
            fixed_weights[at_lower] = self.lower_bounds[at_lower]

            # Blocked assets contribute no expected-return gradient.
            adjusted_mean = self.mean.copy()
            adjusted_mean[_out] = 0.0

            # The m Lagrange-multiplier variables are always free.
            free_mask = np.concatenate([_in, np.ones(m, dtype=bool)])
            rhs_alpha = np.concatenate([fixed_weights, self.b])
            rhs_beta = np.concatenate([adjusted_mean, np.zeros(m)])
            rhs = np.column_stack([rhs_alpha, rhs_beta])

            # --- Solve KKT system ---
            alpha, beta = CLA._solve(self.kkt, rhs, free_mask)

            # --- Compute Lagrange multipliers and directional derivatives ---
            gamma = self.proj @ alpha
            delta = self.proj @ beta - self.mean

            # --- Compute event ratios ---
            # Each column of l_mat holds candidate lambda values for one event
            # type; -inf marks "event cannot occur" so argmax ignores it.
            l_mat = np.full((ns, 4), -np.inf)
            r_alpha, r_beta = alpha[:ns], beta[:ns]
            tol = self.tol

            # Hoist each boolean mask once instead of recomputing it per operand.
            hit_upper = _in & (r_beta < -tol)  # free asset drifting to its upper bound
            hit_lower = _in & (r_beta > +tol)  # free asset drifting to its lower bound
            leave_upper = at_upper & (delta < -tol)  # blocked-at-upper asset freeing
            leave_lower = at_lower & (delta > +tol)  # blocked-at-lower asset freeing

            l_mat[hit_upper, 0] = (self.upper_bounds[hit_upper] - r_alpha[hit_upper]) / r_beta[hit_upper]
            l_mat[hit_lower, 1] = (self.lower_bounds[hit_lower] - r_alpha[hit_lower]) / r_beta[hit_lower]
            l_mat[leave_upper, 2] = -gamma[leave_upper] / delta[leave_upper]
            l_mat[leave_lower, 3] = -gamma[leave_lower] / delta[leave_lower]

            # --- Determine next event ---
            if np.max(l_mat) < 0:
                break

            secchg, dirchg = np.unravel_index(np.argmax(l_mat), l_mat.shape)
            lam = l_mat[secchg, dirchg]

            # --- Update free set ---
            free = last.free.copy()
            free[secchg] = dirchg >= 2  # boundary → IN if dirchg in {2, 3}

            # --- Compute new turning point ---
            new_weights = r_alpha + lam * r_beta
            self._append(TurningPoint(lamb=lam, weights=new_weights, free=free))

        # Final point at lambda = 0 (the minimum-variance portfolio)
        self._append(TurningPoint(lamb=0, weights=r_alpha, free=last.free))

    @staticmethod
    def _solve(a: NDArray[np.float64], b: np.ndarray, _in: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
        """Solve the system A x = b with some variables fixed.

        Args:
            a: Coefficient matrix of shape (n, n).
            b: Right-hand side matrix of shape (n, 2).
            _in: Boolean array of shape (n,) indicating which variables are free.

        Returns:
            A tuple (alpha, beta) of solutions for the two RHS vectors.

        """
        _out = ~_in
        n = a.shape[1]
        x = np.zeros((n, 2))

        x[_out] = b[_out]  # Set fixed variables
        # Reduce the system to the free variables, moving the fixed-variable
        # contribution to the right-hand side.
        reduced_a = a[_in][:, _in]
        reduced_b = b[_in] - a[_in][:, _out] @ x[_out]

        x[_in] = np.linalg.solve(reduced_a, reduced_b)

        return x[:, 0], x[:, 1]

    def __len__(self) -> int:
        """Get the number of turning points in the efficient frontier.

        Returns:
            The number of turning points currently stored in the object.

        """
        return len(self.turning_points)

    def _first_turning_point(self) -> TurningPoint:
        """Calculate the first turning point on the efficient frontier.

        This method uses the init_algo function to find the first turning point
        based on the mean returns and the bounds on asset weights.

        Returns:
            A TurningPoint object representing the first point on the efficient frontier.

        """
        return init_algo(
            mean=self.mean,
            lower_bounds=self.lower_bounds,
            upper_bounds=self.upper_bounds,
        )

    def _append(self, tp: TurningPoint, tol: float | None = None) -> None:
        """Append a turning point to the list of turning points.

        This method validates that the turning point satisfies the constraints
        before adding it to the list.

        Args:
            tp: The turning point to append.
            tol: Tolerance for constraint validation. If None, uses the class's tol attribute.

        Raises:
            ValueError: If the turning point violates any constraints.

        """
        # `is None` check so that an explicit tol=0.0 is honored rather than
        # silently replaced by self.tol.
        tol = self.tol if tol is None else tol

        if not np.all(tp.weights >= (self.lower_bounds - tol)):
            msg = "Weights below lower bounds"
            raise ValueError(msg)
        if not np.all(tp.weights <= (self.upper_bounds + tol)):
            msg = "Weights above upper bounds"
            raise ValueError(msg)
        if not np.allclose(np.sum(tp.weights), 1.0):
            msg = "Weights do not sum to 1"
            raise ValueError(msg)

        self.turning_points.append(tp)

    @property
    def frontier(self) -> Frontier:
        """Get the efficient frontier constructed from the turning points.

        This property creates a Frontier object from the list of turning points,
        which can be used to analyze the risk-return characteristics of the
        efficient portfolios.

        Returns:
            A Frontier object representing the efficient frontier.

        """
        return Frontier(
            covariance=self.covariance,
            mean=self.mean,
            frontier=[FrontierPoint(point.weights) for point in self.turning_points],
        )