Coverage for src/cvxcla/cla.py: 99%
90 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-19 05:48 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-19 05:48 +0000
1# Copyright 2023 Stanford University Convex Optimization Group
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Markowitz implementation of the Critical Line Algorithm.
16This module provides the CLA class, which implements the Critical Line Algorithm
17as described by Harry Markowitz and colleagues. The algorithm computes the entire
18efficient frontier by finding all turning points, which are the points where the
19set of assets at their bounds changes.
20"""
22import logging
23from dataclasses import dataclass, field
24from functools import cached_property
26import numpy as np
27from numpy.typing import NDArray
29from .first import init_algo
30from .types import Frontier, FrontierPoint, TurningPoint
33@dataclass(frozen=True)
34class CLA:
35 """Critical Line Algorithm implementation based on Markowitz's approach.
37 This class implements the Critical Line Algorithm as described by Harry Markowitz
38 and colleagues. It computes the entire efficient frontier by finding all turning
39 points, which are the points where the set of assets at their bounds changes.
41 The algorithm starts with the first turning point (the portfolio with the highest
42 expected return) and then iteratively computes the next turning point with a lower
43 expected return until it reaches the minimum variance portfolio.
45 Attributes:
46 mean: Vector of expected returns for each asset.
47 covariance: Covariance matrix of asset returns.
48 lower_bounds: Vector of lower bounds for asset weights.
49 upper_bounds: Vector of upper bounds for asset weights.
50 a: Matrix for linear equality constraints (Ax = b).
51 b: Vector for linear equality constraints (Ax = b).
52 turning_points: List of turning points on the efficient frontier.
53 tol: Tolerance for numerical calculations.
54 logger: Logger instance for logging information and errors.
56 """
58 mean: NDArray[np.float64]
59 covariance: NDArray[np.float64]
60 lower_bounds: NDArray[np.float64]
61 upper_bounds: NDArray[np.float64]
62 a: NDArray[np.float64]
63 b: NDArray[np.float64]
64 turning_points: list[TurningPoint] = field(default_factory=list)
65 tol: float = 1e-5
66 logger: logging.Logger = logging.getLogger(__name__)
68 @cached_property
69 def proj(self):
70 """Construct the projection matrix used in computing Lagrange multipliers.
72 P is formed by horizontally stacking the covariance matrix and the transpose
73 of the equality constraint matrix A. It is used to compute:
74 - gamma = P @ alpha
75 - delta = P @ beta - mean
77 This step helps identify which constraints are becoming active or inactive.
78 """
79 return np.block([self.covariance, self.a.T])
81 @cached_property
82 def kkt(self):
83 """Construct the Karush-Kuhn-Tucker (KKT) system matrix.
85 The KKT matrix is built by augmenting the covariance matrix with the
86 equality constraints. It forms the linear system:
87 [Σ Aᵗ]
88 [A 0 ]
89 which we solve to get the optimal portfolio weights (alpha) and the
90 Lagrange multipliers (lambda) corresponding to the constraints.
92 This matrix is symmetric but not necessarily positive definite.
93 """
94 m = self.a.shape[0]
95 return np.block([[self.covariance, self.a.T], [self.a, np.zeros((m, m))]])
97 def __post_init__(self):
98 """Initialize the CLA object and compute the efficient frontier.
100 This method is automatically called after initialization. It computes
101 the entire efficient frontier by finding all turning points, starting
102 from the first turning point (highest expected return) and iteratively
103 computing the next turning point with a lower expected return until
104 it reaches the minimum variance portfolio.
106 The algorithm uses a block matrix approach to solve the system of equations
107 that determine the turning points.
109 Raises:
110 AssertionError: If all variables are blocked, which would make the
111 system of equations singular.
113 """
114 m = self.a.shape[0]
115 ns = len(self.mean)
117 # Compute and store the first turning point
118 self._append(self._first_turning_point())
120 lam = np.inf
122 while lam > 0:
123 last = self.turning_points[-1]
125 # --- Identify active set ---
126 blocked = ~last.free
127 assert not np.all(blocked), "All variables cannot be blocked"
129 at_upper = blocked & np.isclose(last.weights, self.upper_bounds)
130 at_lower = blocked & np.isclose(last.weights, self.lower_bounds)
132 _out = at_upper | at_lower
133 _in = ~_out
135 # --- Construct RHS for KKT system ---
136 fixed_weights = np.zeros(ns)
137 fixed_weights[at_upper] = self.upper_bounds[at_upper]
138 fixed_weights[at_lower] = self.lower_bounds[at_lower]
140 adjusted_mean = self.mean.copy()
141 adjusted_mean[_out] = 0.0
143 free_mask = np.concatenate([_in, np.ones(m, dtype=bool)])
144 rhs_alpha = np.concatenate([fixed_weights, self.b])
145 rhs_beta = np.concatenate([adjusted_mean, np.zeros(m)])
146 rhs = np.column_stack([rhs_alpha, rhs_beta])
148 # --- Solve KKT system ---
149 alpha, beta = CLA._solve(self.kkt, rhs, free_mask)
151 # --- Compute Lagrange multipliers and directional derivatives ---
152 gamma = self.proj @ alpha
153 delta = self.proj @ beta - self.mean
155 # --- Compute event ratios ---
156 l_mat = np.full((ns, 4), -np.inf)
157 r_alpha, r_beta = alpha[:ns], beta[:ns]
158 tol = self.tol
160 l_mat[_in & (r_beta < -tol), 0] = (
161 self.upper_bounds[_in & (r_beta < -tol)] - r_alpha[_in & (r_beta < -tol)]
162 ) / r_beta[_in & (r_beta < -tol)]
163 l_mat[_in & (r_beta > +tol), 1] = (
164 self.lower_bounds[_in & (r_beta > +tol)] - r_alpha[_in & (r_beta > +tol)]
165 ) / r_beta[_in & (r_beta > +tol)]
166 l_mat[at_upper & (delta < -tol), 2] = -gamma[at_upper & (delta < -tol)] / delta[at_upper & (delta < -tol)]
167 l_mat[at_lower & (delta > +tol), 3] = -gamma[at_lower & (delta > +tol)] / delta[at_lower & (delta > +tol)]
169 # --- Determine next event ---
170 if np.max(l_mat) < 0:
171 break
173 secchg, dirchg = np.unravel_index(np.argmax(l_mat), l_mat.shape)
174 lam = l_mat[secchg, dirchg]
176 # --- Update free set ---
177 free = last.free.copy()
178 free[secchg] = dirchg >= 2 # boundary → IN if dirchg in {2, 3}
180 # --- Compute new turning point ---
181 new_weights = r_alpha + lam * r_beta
182 self._append(TurningPoint(lamb=lam, weights=new_weights, free=free))
184 # Final point at lambda = 0
185 self._append(TurningPoint(lamb=0, weights=r_alpha, free=last.free))
187 @staticmethod
188 def _solve(a: NDArray[np.float64], b: np.ndarray, _in: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
189 """Solve the system A x = b with some variables fixed.
191 Args:
192 a: Coefficient matrix of shape (n, n).
193 b: Right-hand side matrix of shape (n, 2).
194 _in: Boolean array of shape (n,) indicating which variables are free.
196 Returns:
197 A tuple (alpha, beta) of solutions for the two RHS vectors.
199 """
200 _out = ~_in
201 n = a.shape[1]
202 x = np.zeros((n, 2))
204 x[_out] = b[_out] # Set fixed variables
205 reduced_a = a[_in][:, _in]
206 reduced_b = b[_in] - a[_in][:, _out] @ x[_out]
208 x[_in] = np.linalg.solve(reduced_a, reduced_b)
210 return x[:, 0], x[:, 1]
212 def __len__(self) -> int:
213 """Get the number of turning points in the efficient frontier.
215 Returns:
216 The number of turning points currently stored in the object.
218 """
219 return len(self.turning_points)
221 def _first_turning_point(self) -> TurningPoint:
222 """Calculate the first turning point on the efficient frontier.
224 This method uses the init_algo function to find the first turning point
225 based on the mean returns and the bounds on asset weights.
227 Returns:
228 A TurningPoint object representing the first point on the efficient frontier.
230 """
231 first = init_algo(
232 mean=self.mean,
233 lower_bounds=self.lower_bounds,
234 upper_bounds=self.upper_bounds,
235 )
236 return first
238 def _append(self, tp: TurningPoint, tol: float | None = None) -> None:
239 """Append a turning point to the list of turning points.
241 This method validates that the turning point satisfies the constraints
242 before adding it to the list.
244 Args:
245 tp: The turning point to append.
246 tol: Tolerance for constraint validation. If None, uses the class's tol attribute.
248 Raises:
249 AssertionError: If the turning point violates any constraints.
251 """
252 tol = tol or self.tol
254 assert np.all(tp.weights >= (self.lower_bounds - tol)), f"{(tp.weights + tol) - self.lower_bounds}"
255 assert np.all(tp.weights <= (self.upper_bounds + tol)), f"{(self.upper_bounds + tol) - tp.weights}"
256 assert np.allclose(np.sum(tp.weights), 1.0), f"{np.sum(tp.weights)}"
258 self.turning_points.append(tp)
260 @property
261 def frontier(self) -> Frontier:
262 """Get the efficient frontier constructed from the turning points.
264 This property creates a Frontier object from the list of turning points,
265 which can be used to analyze the risk-return characteristics of the
266 efficient portfolios.
268 Returns:
269 A Frontier object representing the efficient frontier.
271 """
272 return Frontier(
273 covariance=self.covariance,
274 mean=self.mean,
275 frontier=[FrontierPoint(point.weights) for point in self.turning_points],
276 )