Coverage for src / cvxcla / cla.py: 100%
98 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-03 01:26 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-03 01:26 +0000
1# Copyright 2023 Stanford University Convex Optimization Group
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Markowitz implementation of the Critical Line Algorithm.
16This module provides the CLA class, which implements the Critical Line Algorithm
17as described by Harry Markowitz and colleagues. The algorithm computes the entire
18efficient frontier by finding all turning points, which are the points where the
19set of assets at their bounds changes.
20"""
22import logging
23from dataclasses import dataclass, field
24from functools import cached_property
26import numpy as np
27from numpy.typing import NDArray
29from .first import init_algo
30from .types import Frontier, FrontierPoint, TurningPoint
33@dataclass(frozen=True)
34class CLA:
35 """Critical Line Algorithm implementation based on Markowitz's approach.
37 This class implements the Critical Line Algorithm as described by Harry Markowitz
38 and colleagues. It computes the entire efficient frontier by finding all turning
39 points, which are the points where the set of assets at their bounds changes.
41 The algorithm starts with the first turning point (the portfolio with the highest
42 expected return) and then iteratively computes the next turning point with a lower
43 expected return until it reaches the minimum variance portfolio.
45 Attributes:
46 mean: Vector of expected returns for each asset.
47 covariance: Covariance matrix of asset returns.
48 lower_bounds: Vector of lower bounds for asset weights.
49 upper_bounds: Vector of upper bounds for asset weights.
50 a: Matrix for linear equality constraints (Ax = b).
51 b: Vector for linear equality constraints (Ax = b).
52 turning_points: List of turning points on the efficient frontier.
53 tol: Tolerance for numerical calculations.
54 logger: Logger instance for logging information and errors.
56 """
58 mean: NDArray[np.float64]
59 covariance: NDArray[np.float64]
60 lower_bounds: NDArray[np.float64]
61 upper_bounds: NDArray[np.float64]
62 a: NDArray[np.float64]
63 b: NDArray[np.float64]
64 turning_points: list[TurningPoint] = field(default_factory=list)
65 tol: float = 1e-5
66 logger: logging.Logger = field(default_factory=lambda: logging.getLogger(__name__))
68 @cached_property
69 def proj(self) -> np.ndarray:
70 """Construct the projection matrix used in computing Lagrange multipliers.
72 P is formed by horizontally stacking the covariance matrix and the transpose
73 of the equality constraint matrix A. It is used to compute:
74 - gamma = P @ alpha
75 - delta = P @ beta - mean
77 This step helps identify which constraints are becoming active or inactive.
78 """
79 return np.block([self.covariance, self.a.T])
81 @cached_property
82 def kkt(self) -> np.ndarray:
83 """Construct the Karush-Kuhn-Tucker (KKT) system matrix.
85 The KKT matrix is built by augmenting the covariance matrix with the
86 equality constraints. It forms the linear system:
87 [Σ Aᵗ]
88 [A 0 ]
89 which we solve to get the optimal portfolio weights (alpha) and the
90 Lagrange multipliers (lambda) corresponding to the constraints.
92 This matrix is symmetric but not necessarily positive definite.
93 """
94 m = self.a.shape[0]
95 return np.block([[self.covariance, self.a.T], [self.a, np.zeros((m, m))]])
97 def __post_init__(self) -> None:
98 """Initialize the CLA object and compute the efficient frontier.
100 This method is automatically called after initialization. It computes
101 the entire efficient frontier by finding all turning points, starting
102 from the first turning point (highest expected return) and iteratively
103 computing the next turning point with a lower expected return until
104 it reaches the minimum variance portfolio.
106 The algorithm uses a block matrix approach to solve the system of equations
107 that determine the turning points.
109 Raises:
110 AssertionError: If all variables are blocked, which would make the
111 system of equations singular.
113 """
114 m = self.a.shape[0]
115 ns = len(self.mean)
117 # Compute and store the first turning point
118 self._append(self._first_turning_point())
120 lam = np.inf
122 while lam > 0:
123 last = self.turning_points[-1]
125 # --- Identify active set ---
126 blocked = ~last.free
127 if np.all(blocked):
128 msg = "All variables cannot be blocked"
129 raise RuntimeError(msg)
131 at_upper = blocked & np.isclose(last.weights, self.upper_bounds)
132 at_lower = blocked & np.isclose(last.weights, self.lower_bounds)
134 _out = at_upper | at_lower
135 _in = ~_out
137 # --- Construct RHS for KKT system ---
138 fixed_weights = np.zeros(ns)
139 fixed_weights[at_upper] = self.upper_bounds[at_upper]
140 fixed_weights[at_lower] = self.lower_bounds[at_lower]
142 adjusted_mean = self.mean.copy()
143 adjusted_mean[_out] = 0.0
145 free_mask = np.concatenate([_in, np.ones(m, dtype=bool)])
146 rhs_alpha = np.concatenate([fixed_weights, self.b])
147 rhs_beta = np.concatenate([adjusted_mean, np.zeros(m)])
148 rhs = np.column_stack([rhs_alpha, rhs_beta])
150 # --- Solve KKT system ---
151 alpha, beta = CLA._solve(self.kkt, rhs, free_mask)
153 # --- Compute Lagrange multipliers and directional derivatives ---
154 gamma = self.proj @ alpha
155 delta = self.proj @ beta - self.mean
157 # --- Compute event ratios ---
158 l_mat = np.full((ns, 4), -np.inf)
159 r_alpha, r_beta = alpha[:ns], beta[:ns]
160 tol = self.tol
162 l_mat[_in & (r_beta < -tol), 0] = (
163 self.upper_bounds[_in & (r_beta < -tol)] - r_alpha[_in & (r_beta < -tol)]
164 ) / r_beta[_in & (r_beta < -tol)]
165 l_mat[_in & (r_beta > +tol), 1] = (
166 self.lower_bounds[_in & (r_beta > +tol)] - r_alpha[_in & (r_beta > +tol)]
167 ) / r_beta[_in & (r_beta > +tol)]
168 l_mat[at_upper & (delta < -tol), 2] = -gamma[at_upper & (delta < -tol)] / delta[at_upper & (delta < -tol)]
169 l_mat[at_lower & (delta > +tol), 3] = -gamma[at_lower & (delta > +tol)] / delta[at_lower & (delta > +tol)]
171 # --- Determine next event ---
172 if np.max(l_mat) < 0:
173 break
175 secchg, dirchg = np.unravel_index(np.argmax(l_mat), l_mat.shape)
176 lam = l_mat[secchg, dirchg]
178 # --- Update free set ---
179 free = last.free.copy()
180 free[secchg] = dirchg >= 2 # boundary → IN if dirchg in {2, 3}
182 # --- Compute new turning point ---
183 new_weights = r_alpha + lam * r_beta
184 self._append(TurningPoint(lamb=lam, weights=new_weights, free=free))
186 # Final point at lambda = 0
187 self._append(TurningPoint(lamb=0, weights=r_alpha, free=last.free))
189 @staticmethod
190 def _solve(a: NDArray[np.float64], b: np.ndarray, _in: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
191 """Solve the system A x = b with some variables fixed.
193 Args:
194 a: Coefficient matrix of shape (n, n).
195 b: Right-hand side matrix of shape (n, 2).
196 _in: Boolean array of shape (n,) indicating which variables are free.
198 Returns:
199 A tuple (alpha, beta) of solutions for the two RHS vectors.
201 """
202 _out = ~_in
203 n = a.shape[1]
204 x = np.zeros((n, 2))
206 x[_out] = b[_out] # Set fixed variables
207 reduced_a = a[_in][:, _in]
208 reduced_b = b[_in] - a[_in][:, _out] @ x[_out]
210 x[_in] = np.linalg.solve(reduced_a, reduced_b)
212 return x[:, 0], x[:, 1]
214 def __len__(self) -> int:
215 """Get the number of turning points in the efficient frontier.
217 Returns:
218 The number of turning points currently stored in the object.
220 """
221 return len(self.turning_points)
223 def _first_turning_point(self) -> TurningPoint:
224 """Calculate the first turning point on the efficient frontier.
226 This method uses the init_algo function to find the first turning point
227 based on the mean returns and the bounds on asset weights.
229 Returns:
230 A TurningPoint object representing the first point on the efficient frontier.
232 """
233 first = init_algo(
234 mean=self.mean,
235 lower_bounds=self.lower_bounds,
236 upper_bounds=self.upper_bounds,
237 )
238 return first
240 def _append(self, tp: TurningPoint, tol: float | None = None) -> None:
241 """Append a turning point to the list of turning points.
243 This method validates that the turning point satisfies the constraints
244 before adding it to the list.
246 Args:
247 tp: The turning point to append.
248 tol: Tolerance for constraint validation. If None, uses the class's tol attribute.
250 Raises:
251 AssertionError: If the turning point violates any constraints.
253 """
254 tol = tol or self.tol
256 if not np.all(tp.weights >= (self.lower_bounds - tol)):
257 msg = "Weights below lower bounds"
258 raise ValueError(msg)
259 if not np.all(tp.weights <= (self.upper_bounds + tol)):
260 msg = "Weights above upper bounds"
261 raise ValueError(msg)
262 if not np.allclose(np.sum(tp.weights), 1.0):
263 msg = "Weights do not sum to 1"
264 raise ValueError(msg)
266 self.turning_points.append(tp)
268 @property
269 def frontier(self) -> Frontier:
270 """Get the efficient frontier constructed from the turning points.
272 This property creates a Frontier object from the list of turning points,
273 which can be used to analyze the risk-return characteristics of the
274 efficient portfolios.
276 Returns:
277 A Frontier object representing the efficient frontier.
279 """
280 return Frontier(
281 covariance=self.covariance,
282 mean=self.mean,
283 frontier=[FrontierPoint(point.weights) for point in self.turning_points],
284 )