Source code for dccp.problem

"""DCCP solver implementation for Disciplined Convex-Concave Programming."""

import logging
from dataclasses import dataclass, field
from typing import Any

import cvxpy as cp
import numpy as np
from cvxpy.constraints.zero import Equality

from .constraint import convexify_constr
from .initialization import initialize
from .objective import convexify_obj
from .utils import DCCPSettings, NonDCCPError, is_dccp

logger = logging.getLogger("dccp")
logger.setLevel(logging.INFO)


[docs] @dataclass class DCCPIter: """Store results of a DCCP iteration.""" prob: cp.Problem tau: cp.Parameter = field(default_factory=lambda: cp.Parameter(value=0.005)) vars_slack: list[cp.Variable] = field(default_factory=list) k: int = 0 cost: float = np.inf @property def slack(self) -> float: """Get the maximum slack variable value.""" if not self.vars_slack: return 0.0 slack_values = [ np.max(s.value) if s.value is not None else 0.0 for s in self.vars_slack ] return max(slack_values, default=0.0) @property def slack_sum(self) -> float: """Sum of all slack elements.""" return sum( float(np.sum(s.value)) if s.value is not None else 0.0 for s in self.vars_slack ) @property def cost_no_slack(self) -> float: """Objective value minus τ * sum(slack).""" if self.prob.objective.value is None: return np.inf obj_val = float(np.asarray(self.prob.objective.value).item()) tau_val = float(np.asarray(self.tau.value).item()) return obj_val - tau_val * self.slack_sum
[docs] def solve(self, **kwargs: Any) -> float | None: """Solve the DCCP sub-problem.""" self.k += 1 result = self.prob.solve(**kwargs) if isinstance(result, (int, float, np.floating)): self.cost = float(result) return self.cost return None
[docs] class DCCP: """Implementation of the DCCP (Disciplined Convex-Concave Programming) algorithm. The DCCP class provides a systematic approach to solving nonconvex optimization problems that satisfy the DCCP rules. It uses an iterative convex-concave procedure (CCP) to find approximate solutions. Parameters ---------- prob : cp.Problem A CVXPY Problem that satisfies DCCP rules but not DCP rules. settings : DCCPSettings, optional Configuration settings for the DCCP algorithm. If None, default settings are used. **kwargs Additional keyword arguments passed to the underlying solver. Attributes ---------- is_maximization : bool True if the original problem is a maximization problem. prob_in : cp.Problem The original input problem. solve_args : dict Solver arguments passed to subproblems. conf : DCCPSettings Algorithm configuration settings. tau : cp.Parameter Penalty parameter for constraint violations. iter : DCCPIter Current iteration state and results. Raises ------ NonDCCPError If the problem is DCP compliant (should use standard solvers) or if the problem doesn't satisfy DCCP rules. """ def __init__( self, prob: cp.Problem, *, settings: DCCPSettings | None = None, **kwargs: Any, ) -> None: """Initialize the DCCP solver. Parameters ---------- prob : cp.Problem A CVXPY Problem that satisfies DCCP rules. settings : DCCPSettings, optional Configuration settings for the algorithm. **kwargs Additional solver arguments. """ if settings is None: settings = DCCPSettings() if prob.is_dcp(): msg = "The problem is DCP compliant, solve it with a DCP solver." raise NonDCCPError(msg) if settings.verify_dccp and not is_dccp(prob): msg = "Problem is not DCCP." raise NonDCCPError(msg) # store the problem and settings self.is_maximization = isinstance(prob.objective, cp.Maximize) self.prob_in = prob self.solve_args = kwargs.copy() self.conf = settings # slack loss weight tau self.tau = cp.Parameter(nonneg=True, value=settings.tau_ini, name="tau") # construction of DCCP sub-problem init_kwargs = {} if self.conf.k_ccp is not None and self.conf.k_ccp > 1: init_kwargs["random"] = True if self.conf.seed is not None: init_kwargs["seed"] = self.conf.seed init_kwargs["solver"] = kwargs.get("solver") initialize(prob, **init_kwargs) self.iter = DCCPIter(prob=prob, tau=self.tau) self._prev_var_values = {} self._store_previous_values()
[docs] def _apply_damping(self) -> None: """Apply damping to variable values using previous iteration values.""" for var in self.prob_in.variables(): if var.value is not None and var in self._prev_var_values: prev_val = self._prev_var_values[var] cur_val = var.value.copy() damped_step = 0.8 * cur_val + 0.2 * prev_val var.value = damped_step logger.debug( "Suggested value for %s: %s, previous value: %s, damped value: %s", var.name(), cur_val, prev_val, damped_step, )
[docs] def _store_previous_values(self) -> None: """Store current variable values for damping.""" for var in self.prob_in.variables(): if var.value is not None: val = var.value self._prev_var_values[var] = val.copy() if hasattr(val, "copy") else val
[docs] def _construct_subproblem(self) -> None: """Construct the DCCP sub-problem.""" prob = self.prob_in # split non-affine equality constraints constr: list[cp.Constraint] = [] for constraint in prob.constraints: if isinstance(constraint, Equality) and not constraint.is_dcp(): constr.append(constraint.args[0] <= constraint.args[1]) constr.append(constraint.args[0] >= constraint.args[1]) else: constr.append(constraint) # each non-dcp constraint needs a slack variable var_slack: list[cp.Variable] = [] # convexify objective with damping if needed obj = convexify_obj(prob.objective) if not prob.objective.is_dcp(): k_damp = 0 while obj is None and k_damp < self.conf.max_iter_damp: self._apply_damping() obj = convexify_obj(prob.objective) k_damp += 1 if obj is None: msg = ( "Damping did not yield a convexified objective after " f"{self.conf.max_iter_damp} iterations." ) raise NonDCCPError(msg) # add objective domain constraints into problem constraints constr.extend(list(prob.objective.expr.domain)) # build new linearized, convexified constraints new_constr: list[cp.Constraint] = [] for c in constr: if c.is_dcp(): new_constr.append(c) continue # add slack variable for non-convex constraint v_slack = cp.Variable(c.shape, name=f"slack_{c.id}", nonneg=True) var_slack.append(v_slack) # convexify the constraint with damping if needed c_conv = convexify_constr(c) while c_conv is None: self._apply_damping() c_conv = convexify_constr(c) new_constr.extend(list(c_conv.domain)) new_constr.append(c_conv.constr.expr <= v_slack) # build new problem if var_slack: slack_sum = cp.sum([cp.sum(s) for s in var_slack]) cost = obj.expr + self.tau * slack_sum # type: ignore # noqa: PGH003 else: cost = obj.expr # type: ignore # noqa: PGH003 new_prob = cp.Problem(cp.Minimize(cost), new_constr) # Update the existing iter object instead of creating a new one self.iter.prob = new_prob self.iter.vars_slack = var_slack # store previous variable values for damping self._store_previous_values()
[docs] def _solve(self) -> float: """Solve the DCCP problem.""" converged = False prev_cost = np.inf prev_cost_no_slack = np.inf # run DCCP iterations until convergence or max iterations while not (converged or self.iter.k > self.conf.max_iter): self._construct_subproblem() new_cost = self.iter.solve(**self.solve_args) new_cost_no_slack = self.iter.cost_no_slack slack = self.iter.slack # check all convergence criteria if ( new_cost is not None and np.abs(prev_cost - new_cost) <= self.conf.ep and np.abs(prev_cost_no_slack - new_cost_no_slack) <= self.conf.ep and slack <= self.conf.max_slack ): converged = True # update previous values prev_cost = new_cost if new_cost is not None else prev_cost prev_cost_no_slack = ( new_cost_no_slack if new_cost_no_slack is not None else prev_cost_no_slack ) # update tau for the next iteration if self.iter.tau.value is not None: tau_mul = float(self.iter.tau.value) * self.conf.mu self.iter.tau.value = min(self.conf.tau_max, tau_mul) logger.debug( "Iteration %d: cost=%s, cost_no_slack=%s, slack=%s, tau=%s", self.iter.k, self.iter.cost, self.iter.cost_no_slack, self.iter.slack, self.iter.tau.value, ) # terminate with infeasibility if not converged after max iterations self.prob_in._status = cp.INFEASIBLE # noqa: SLF001 # write the solution back to the original problem if converged: self.prob_in._status = cp.OPTIMAL # noqa: SLF001 for var in self.prob_in.variables(): var.value = self.iter.prob.var_dict[var.name()].value return self.iter.cost return self.iter.cost if converged else np.inf
[docs] def __call__(self) -> float: """Solve a problem using the Disciplined Convex-Concave Procedure.""" return self._solve() * (-1 if self.is_maximization else 1)
[docs] def solve_multi_init(self, num_inits: int) -> float: """Solve with multiple random initializations and return the best result.""" if num_inits <= 1: return self() best_cost = np.inf best_var_values = {} best_status = cp.INFEASIBLE for _ in range(num_inits): # store original variable values orig_values = {} for var in self.prob_in.variables(): orig_values[var] = var.value.copy() if var.value is not None else None # reset and solve with new random initialization initialize(self.prob_in, random=True) self.iter.k = 0 self.iter.cost = np.inf self._prev_var_values = {} self._store_previous_values() try: cost = self._solve() if self.prob_in.status == cp.OPTIMAL and cost < best_cost: best_cost = cost best_status = self.prob_in.status best_var_values = { var: var.value.copy() if var.value is not None else None for var in self.prob_in.variables() } except (NonDCCPError, RuntimeError): continue # restore original values for next iteration for var in self.prob_in.variables(): var.value = orig_values[var] # set the best solution self.prob_in._status = best_status # noqa: SLF001 for var in self.prob_in.variables(): if var in best_var_values: var.value = best_var_values[var] return best_cost * (-1 if self.is_maximization else 1)
[docs] def dccp( # noqa: PLR0913 prob: cp.Problem, *, max_iter: int = 100, tau_ini: float = 0.005, mu: float = 1.2, tau_max: float = 1e8, k_ccp: int = 1, max_slack: float = 1e-3, ep: float = 1e-5, seed: int | None = None, verify_dccp: bool = True, **kwargs: Any, ) -> float: """Run the DCCP algorithm on the given problem. This is the main entry point for solving DCCP problems. It creates a DCCP solver instance with the specified settings and solves the problem. Parameters ---------- prob : cp.Problem A CVXPY Problem that satisfies DCCP rules. max_iter : int, default=100 Maximum number of iterations in the CCP algorithm. tau_ini : float, default=0.005 Initial value for tau parameter (trades off constraints vs objective). mu : float, default=1.2 Rate at which tau increases during the algorithm. tau_max : float, default=1e8 Upper bound for tau parameter. k_ccp : int, default=1 Number of random restarts for the CCP algorithm. max_slack : float, default=1e-3 Maximum slack variable value for convergence. ep : float, default=1e-5 Convergence tolerance for objective value changes. seed : int, optional Random seed for reproducible results. verify_dccp : bool, default=True Whether to verify DCCP compliance before solving. **kwargs Additional keyword arguments passed to the underlying solver. Returns ------- float The optimal objective value (or best found value if not converged). Raises ------ NonDCCPError If the problem doesn't satisfy DCCP rules or is already DCP compliant. See Also -------- DCCP : The main DCCP solver class for more advanced usage. DCCPSettings : Configuration object for algorithm parameters. Examples -------- >>> import cvxpy as cp >>> import dccp >>> x = cp.Variable(2) >>> problem = cp.Problem(cp.Maximize(cp.norm(x, 2)), [cp.norm(x, 1) <= 1]) >>> result = dccp.dccp(problem, max_iter=50, k_ccp=3) """ logger.debug("Running DCCP with solver=%s, kwargs=%s", kwargs.get("solver"), kwargs) dccp_solver = DCCP( prob, settings=DCCPSettings( max_iter=max_iter, tau_ini=tau_ini, mu=mu, tau_max=tau_max, k_ccp=k_ccp, max_slack=max_slack, ep=ep, seed=seed, verify_dccp=verify_dccp, ), **kwargs, ) if k_ccp > 1: return dccp_solver.solve_multi_init(k_ccp) return dccp_solver()