Coverage for src/cvxcla/cla.py: 99%

90 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-19 05:48 +0000

1# Copyright 2023 Stanford University Convex Optimization Group 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Markowitz implementation of the Critical Line Algorithm. 

15 

16This module provides the CLA class, which implements the Critical Line Algorithm 

17as described by Harry Markowitz and colleagues. The algorithm computes the entire 

18efficient frontier by finding all turning points, which are the points where the 

19set of assets at their bounds changes. 

20""" 

21 

22import logging 

23from dataclasses import dataclass, field 

24from functools import cached_property 

25 

26import numpy as np 

27from numpy.typing import NDArray 

28 

29from .first import init_algo 

30from .types import Frontier, FrontierPoint, TurningPoint 

31 

32 

33@dataclass(frozen=True) 

34class CLA: 

35 """Critical Line Algorithm implementation based on Markowitz's approach. 

36 

37 This class implements the Critical Line Algorithm as described by Harry Markowitz 

38 and colleagues. It computes the entire efficient frontier by finding all turning 

39 points, which are the points where the set of assets at their bounds changes. 

40 

41 The algorithm starts with the first turning point (the portfolio with the highest 

42 expected return) and then iteratively computes the next turning point with a lower 

43 expected return until it reaches the minimum variance portfolio. 

44 

45 Attributes: 

46 mean: Vector of expected returns for each asset. 

47 covariance: Covariance matrix of asset returns. 

48 lower_bounds: Vector of lower bounds for asset weights. 

49 upper_bounds: Vector of upper bounds for asset weights. 

50 a: Matrix for linear equality constraints (Ax = b). 

51 b: Vector for linear equality constraints (Ax = b). 

52 turning_points: List of turning points on the efficient frontier. 

53 tol: Tolerance for numerical calculations. 

54 logger: Logger instance for logging information and errors. 

55 

56 """ 

57 

58 mean: NDArray[np.float64] 

59 covariance: NDArray[np.float64] 

60 lower_bounds: NDArray[np.float64] 

61 upper_bounds: NDArray[np.float64] 

62 a: NDArray[np.float64] 

63 b: NDArray[np.float64] 

64 turning_points: list[TurningPoint] = field(default_factory=list) 

65 tol: float = 1e-5 

66 logger: logging.Logger = logging.getLogger(__name__) 

67 

68 @cached_property 

69 def proj(self): 

70 """Construct the projection matrix used in computing Lagrange multipliers. 

71 

72 P is formed by horizontally stacking the covariance matrix and the transpose 

73 of the equality constraint matrix A. It is used to compute: 

74 - gamma = P @ alpha 

75 - delta = P @ beta - mean 

76 

77 This step helps identify which constraints are becoming active or inactive. 

78 """ 

79 return np.block([self.covariance, self.a.T]) 

80 

81 @cached_property 

82 def kkt(self): 

83 """Construct the Karush-Kuhn-Tucker (KKT) system matrix. 

84 

85 The KKT matrix is built by augmenting the covariance matrix with the 

86 equality constraints. It forms the linear system: 

87 [Σ Aᵗ] 

88 [A 0 ] 

89 which we solve to get the optimal portfolio weights (alpha) and the 

90 Lagrange multipliers (lambda) corresponding to the constraints. 

91 

92 This matrix is symmetric but not necessarily positive definite. 

93 """ 

94 m = self.a.shape[0] 

95 return np.block([[self.covariance, self.a.T], [self.a, np.zeros((m, m))]]) 

96 

97 def __post_init__(self): 

98 """Initialize the CLA object and compute the efficient frontier. 

99 

100 This method is automatically called after initialization. It computes 

101 the entire efficient frontier by finding all turning points, starting 

102 from the first turning point (highest expected return) and iteratively 

103 computing the next turning point with a lower expected return until 

104 it reaches the minimum variance portfolio. 

105 

106 The algorithm uses a block matrix approach to solve the system of equations 

107 that determine the turning points. 

108 

109 Raises: 

110 AssertionError: If all variables are blocked, which would make the 

111 system of equations singular. 

112 

113 """ 

114 m = self.a.shape[0] 

115 ns = len(self.mean) 

116 

117 # Compute and store the first turning point 

118 self._append(self._first_turning_point()) 

119 

120 lam = np.inf 

121 

122 while lam > 0: 

123 last = self.turning_points[-1] 

124 

125 # --- Identify active set --- 

126 blocked = ~last.free 

127 assert not np.all(blocked), "All variables cannot be blocked" 

128 

129 at_upper = blocked & np.isclose(last.weights, self.upper_bounds) 

130 at_lower = blocked & np.isclose(last.weights, self.lower_bounds) 

131 

132 _out = at_upper | at_lower 

133 _in = ~_out 

134 

135 # --- Construct RHS for KKT system --- 

136 fixed_weights = np.zeros(ns) 

137 fixed_weights[at_upper] = self.upper_bounds[at_upper] 

138 fixed_weights[at_lower] = self.lower_bounds[at_lower] 

139 

140 adjusted_mean = self.mean.copy() 

141 adjusted_mean[_out] = 0.0 

142 

143 free_mask = np.concatenate([_in, np.ones(m, dtype=bool)]) 

144 rhs_alpha = np.concatenate([fixed_weights, self.b]) 

145 rhs_beta = np.concatenate([adjusted_mean, np.zeros(m)]) 

146 rhs = np.column_stack([rhs_alpha, rhs_beta]) 

147 

148 # --- Solve KKT system --- 

149 alpha, beta = CLA._solve(self.kkt, rhs, free_mask) 

150 

151 # --- Compute Lagrange multipliers and directional derivatives --- 

152 gamma = self.proj @ alpha 

153 delta = self.proj @ beta - self.mean 

154 

155 # --- Compute event ratios --- 

156 l_mat = np.full((ns, 4), -np.inf) 

157 r_alpha, r_beta = alpha[:ns], beta[:ns] 

158 tol = self.tol 

159 

160 l_mat[_in & (r_beta < -tol), 0] = ( 

161 self.upper_bounds[_in & (r_beta < -tol)] - r_alpha[_in & (r_beta < -tol)] 

162 ) / r_beta[_in & (r_beta < -tol)] 

163 l_mat[_in & (r_beta > +tol), 1] = ( 

164 self.lower_bounds[_in & (r_beta > +tol)] - r_alpha[_in & (r_beta > +tol)] 

165 ) / r_beta[_in & (r_beta > +tol)] 

166 l_mat[at_upper & (delta < -tol), 2] = -gamma[at_upper & (delta < -tol)] / delta[at_upper & (delta < -tol)] 

167 l_mat[at_lower & (delta > +tol), 3] = -gamma[at_lower & (delta > +tol)] / delta[at_lower & (delta > +tol)] 

168 

169 # --- Determine next event --- 

170 if np.max(l_mat) < 0: 

171 break 

172 

173 secchg, dirchg = np.unravel_index(np.argmax(l_mat), l_mat.shape) 

174 lam = l_mat[secchg, dirchg] 

175 

176 # --- Update free set --- 

177 free = last.free.copy() 

178 free[secchg] = dirchg >= 2 # boundary → IN if dirchg in {2, 3} 

179 

180 # --- Compute new turning point --- 

181 new_weights = r_alpha + lam * r_beta 

182 self._append(TurningPoint(lamb=lam, weights=new_weights, free=free)) 

183 

184 # Final point at lambda = 0 

185 self._append(TurningPoint(lamb=0, weights=r_alpha, free=last.free)) 

186 

187 @staticmethod 

188 def _solve(a: NDArray[np.float64], b: np.ndarray, _in: np.ndarray) -> tuple[np.ndarray, np.ndarray]: 

189 """Solve the system A x = b with some variables fixed. 

190 

191 Args: 

192 a: Coefficient matrix of shape (n, n). 

193 b: Right-hand side matrix of shape (n, 2). 

194 _in: Boolean array of shape (n,) indicating which variables are free. 

195 

196 Returns: 

197 A tuple (alpha, beta) of solutions for the two RHS vectors. 

198 

199 """ 

200 _out = ~_in 

201 n = a.shape[1] 

202 x = np.zeros((n, 2)) 

203 

204 x[_out] = b[_out] # Set fixed variables 

205 reduced_a = a[_in][:, _in] 

206 reduced_b = b[_in] - a[_in][:, _out] @ x[_out] 

207 

208 x[_in] = np.linalg.solve(reduced_a, reduced_b) 

209 

210 return x[:, 0], x[:, 1] 

211 

212 def __len__(self) -> int: 

213 """Get the number of turning points in the efficient frontier. 

214 

215 Returns: 

216 The number of turning points currently stored in the object. 

217 

218 """ 

219 return len(self.turning_points) 

220 

221 def _first_turning_point(self) -> TurningPoint: 

222 """Calculate the first turning point on the efficient frontier. 

223 

224 This method uses the init_algo function to find the first turning point 

225 based on the mean returns and the bounds on asset weights. 

226 

227 Returns: 

228 A TurningPoint object representing the first point on the efficient frontier. 

229 

230 """ 

231 first = init_algo( 

232 mean=self.mean, 

233 lower_bounds=self.lower_bounds, 

234 upper_bounds=self.upper_bounds, 

235 ) 

236 return first 

237 

238 def _append(self, tp: TurningPoint, tol: float | None = None) -> None: 

239 """Append a turning point to the list of turning points. 

240 

241 This method validates that the turning point satisfies the constraints 

242 before adding it to the list. 

243 

244 Args: 

245 tp: The turning point to append. 

246 tol: Tolerance for constraint validation. If None, uses the class's tol attribute. 

247 

248 Raises: 

249 AssertionError: If the turning point violates any constraints. 

250 

251 """ 

252 tol = tol or self.tol 

253 

254 assert np.all(tp.weights >= (self.lower_bounds - tol)), f"{(tp.weights + tol) - self.lower_bounds}" 

255 assert np.all(tp.weights <= (self.upper_bounds + tol)), f"{(self.upper_bounds + tol) - tp.weights}" 

256 assert np.allclose(np.sum(tp.weights), 1.0), f"{np.sum(tp.weights)}" 

257 

258 self.turning_points.append(tp) 

259 

260 @property 

261 def frontier(self) -> Frontier: 

262 """Get the efficient frontier constructed from the turning points. 

263 

264 This property creates a Frontier object from the list of turning points, 

265 which can be used to analyze the risk-return characteristics of the 

266 efficient portfolios. 

267 

268 Returns: 

269 A Frontier object representing the efficient frontier. 

270 

271 """ 

272 return Frontier( 

273 covariance=self.covariance, 

274 mean=self.mean, 

275 frontier=[FrontierPoint(point.weights) for point in self.turning_points], 

276 )