Source code for beluga.continuation.continuation

import abc
import copy
import time

import numpy as np
import logging
import inspect
import itertools
import functools
import sys

from tqdm import tqdm

from beluga.symbolic import Problem
from beluga.symbolic.data_classes.components_structures import getattr_from_list
from beluga.utils.logging import logger, fit_string


def gamma_norm(const1, const2):
    norm = sum(abs(const2 - const1))
    return norm


[docs]class ContinuationList(list): def __init__(self): list.__init__(self) # Create list of available strategies clsmembers = inspect.getmembers(sys.modules[__name__], inspect.isclass) self.strategy_list = { obj.strategy_name: obj for (name, obj) in clsmembers if hasattr(obj, 'strategy_name') }
[docs] def add_step(self, strategy='manual', *args, **kwargs): """ Adds a continuation step with specified strategy Returns the strategy object to further chain calls into it """ # Create object if strategy is specified as a string if isinstance(strategy, str): if strategy in self.strategy_list: strategy_obj = self.strategy_list[strategy](*args, **kwargs) else: logging.error('Invalid strategy name') raise ValueError('Continuation strategy : '+strategy+' not found.') else: strategy_obj = strategy self.append(strategy_obj) return strategy_obj
[docs]class ContinuationVariable(object): """ Class containing information for a continuation variable. """ def __init__(self, name, target): self.name = name self.target = target self.value = np.nan self.steps = [] def __str__(self): return self.name + ' -> ' + '{:.3g}'.format(self.target)
class ContinuationStrategy(abc.ABC): def __init__(self, *args, **kwargs): self.solution_reference = None self.gammas = [] self.vars = {} self.ctr = None self.bvp = None self.constant_names = [] def __str__(self): return str(self.vars) def __iter__(self): """Define class as being iterable""" return self def __next__(self): return self.next() def __len__(self): return self.num_cases() def add_gamma(self, gamma): self.gammas.append(copy.deepcopy(gamma)) def get_closest_gamma(self, const): norms = [] for traj in self.gammas: if traj.converged is False: norms.append(9e99) else: norms.append(gamma_norm(traj.const, const)) return np.argmin(norms) def init(self, gamma, bvp): self.bvp = bvp # for var_name in self.var_iterator(): # if var_name not in [str(s['symbol']) for s in self.prob.get_constants()]: # raise ValueError('Variable ' + var_name + ' not found in boundary value problem') gamma_in = copy.deepcopy(gamma) gamma_in.converged = False self.gammas = [gamma_in] self.constant_names = getattr_from_list(bvp.constants, 'name') def next(self, ignore_last_step=False): if len(self.gammas) == 0: raise ValueError('No boundary value problem associated with this object') if not ignore_last_step and len(self.gammas) != 1 and not self.gammas[-1].converged: logging.error('The last step did not converge!') raise RuntimeError('Solution diverged! Stopping.') if self.ctr >= self.num_cases(): raise StopIteration # Update auxiliary variables using previously calculated step sizes total_change = 0.0 const0 = copy.deepcopy(self.gammas[-1].const) for var_name in self.vars: jj = self.constant_names.index(var_name) const0[jj] = self.vars[var_name].steps[self.ctr] total_change += abs(self.vars[var_name].steps[self.ctr]) i = int(self.get_closest_gamma(const0)) gamma_guess = copy.deepcopy(self.gammas[i]) gamma_guess.const = const0 self.ctr += 1 logger.debug('CHOOSE\ttraj #' + str(i) + ' as guess.') return gamma_guess def var_iterator(self): for var_name in self.vars.keys(): yield var_name def num_cases(self): pass
[docs]class ManualStrategy(ContinuationStrategy): """ Class defining the manual continuation strategy. """ # A unique short name to select this class strategy_name = 'manual' def __init__(self, num_cases=1, _vars=None): # TODO change vars to other variable name to avoid overwriting super(ManualStrategy, self).__init__() self._num_cases = num_cases self._spacing = 'linear' if _vars is None: self.vars = {} else: self.vars = _vars # dictionary of values self.ctr = 0 # iteration counter self.last_sol = None self.const = functools.partial(self.set, param_type='const') self.orig_num_cases = num_cases self.constant = self.const def __str__(self): return ', '.join(str(continuationVariable) for continuationVariable in self.vars.values())
[docs] def reset(self): """Resets the internal step counter to zero""" self.ctr = 0 self.last_sol = None
[docs] def clear(self): """Clears all the previously set continuation variables""" self.vars = {} self.reset()
# TODO: Change to store only step-size and use yield def init(self, sol, bvp): super(ManualStrategy, self).init(sol, bvp) # Iterate through all types of variables for var_name in self.var_iterator(): if var_name not in self.constant_names: raise ValueError('Variable ' + var_name + ' not found in boundary value problem') # Set current value of each continuation variable jj = self.constant_names.index(var_name) self.vars[var_name].value = sol.const[jj] # Calculate update steps for continuation process if self._spacing == 'linear': self.vars[var_name].steps = np.linspace(self.vars[var_name].value, self.vars[var_name].target, self._num_cases) elif self._spacing == 'log': self.vars[var_name].steps = np.logspace(np.log10(self.vars[var_name].value), np.log10(self.vars[var_name].target), self._num_cases)
[docs] def set(self, name, target, param_type): """ Sets the target value for the specified parameter """ # Create continuation variable object self.vars[name] = ContinuationVariable(name, target) return self
def num_cases(self, num_cases=None, spacing='linear'): if num_cases is None: return self._num_cases if self.ctr > 0: raise RuntimeError('Cannot set num_cases during iteration') self._num_cases = num_cases self.orig_num_cases = num_cases self._spacing = spacing return self
[docs]class BisectionStrategy(ManualStrategy): """ Defines the bisection continuation strategy. """ strategy_name = 'bisection' def __init__(self, initial_num_cases=5, max_divisions=10, num_divisions=2): super(BisectionStrategy, self).__init__(num_cases=initial_num_cases) self.last_sol = None self.num_divisions = num_divisions self.max_divisions = max_divisions self.division_ctr = 0 self.orig_num_cases = initial_num_cases
[docs] def next(self, ignore_last_step=False): """Generator class to create BVPs for the continuation step iterations last_converged: Specfies if the previous continuation step converged """ # If it is the first step or if previous step converged # continue with usual behavior if self.ctr == 0 or self.gammas[-1].converged: # Reset division counter self.division_ctr = 0 return super(BisectionStrategy, self).next() # If first step didn't converge, the process fails if self.ctr == 1: logging.error('Initial guess should converge for automated continuation to work.') raise RuntimeError('Initial guess does not converge.') # return self.gammas[-1] if self.division_ctr > self.max_divisions: logging.error('Solution does not without exceeding max_divisions : '+str(self.max_divisions)) raise RuntimeError('Exceeded max_divisions') # If previous step did not converge, move back a half step for var_name in self.var_iterator(): old_steps = self.vars[var_name].steps if self._spacing == 'linear': new_steps = np.linspace(old_steps[self.ctr-2], old_steps[self.ctr-1], self.num_divisions+1) elif self._spacing == 'log': new_steps = np.logspace(np.log10(old_steps[self.ctr-2]), np.log10(old_steps[self.ctr-1]), self.num_divisions+1) else: raise ValueError('Invalid spacing type') # Insert new steps self.vars[var_name].steps = np.insert( self.vars[var_name].steps, self.ctr-1, new_steps[1:-1] # Ignore first element as it is repeated ) # Move the counter back self.ctr = self.ctr - 1 # Increment total number of steps self._num_cases = self._num_cases + self.num_divisions-1 logger.debug('Increasing number of cases to '+str(self._num_cases)+'\n') self.division_ctr = self.division_ctr + 1 return super(BisectionStrategy, self).next(True)
class ProductStrategy(ContinuationStrategy): """ Defines the bisection continuation strategy. """ strategy_name = 'productspace' def __init__(self, num_subdivisions=1, sol=None): ContinuationStrategy.__init__(self) self.sol = sol self.sols = [] self._num_cases = None self._num_subdivisions = num_subdivisions self.division_ctr = 0 self.vars = {} # dictionary of values self.ctr = 0 # iteration counter self.const = functools.partial(self.set, param_type='const') self.constant = self.const self.last_sol = None self.orig_num_cases = None def __str__(self): return str(self.vars) def next(self, ignore_last_step=False): if self.ctr == 0 or self.gammas[-1].converged: # Reset division counter self.division_ctr = 0 return super(ProductStrategy, self).next(False) if self.ctr == 1: logging.error('Initial guess should converge for automated continuation to work.') raise RuntimeError('Initial guess does not converge.') return super(ProductStrategy, self).next(True) def reset(self): """Resets the internal step counter to zero""" self.ctr = 0 self.last_sol = None def set(self, name, target, param_type): """ Sets the target value for the specified parameter """ self.vars[name] = ContinuationVariable(name, target) return self def init(self, sol, bvp): super(ProductStrategy, self).init(sol, bvp) num_vars = len(self.vars) self.num_cases(self.num_subdivisions() ** num_vars) for var_name in self.vars.keys(): jj = getattr_from_list(bvp.constants, 'name').index(var_name) self.vars[var_name].value = sol.const[jj] ls_set = [np.linspace(self.vars[var_name].value, self.vars[var_name].target, self._num_subdivisions) for var_name in self.vars.keys()] for val in itertools.product(*ls_set): ii = 0 for var_name in self.vars.keys(): self.vars[var_name].steps.append(val[ii]) ii += 1 def num_cases(self, num_cases=None): if num_cases is None: return self._num_cases else: if self.ctr > 0: raise RuntimeError('Cannot set num_cases during iteration') self._num_cases = num_cases self.orig_num_cases = num_cases return self def num_subdivisions(self, num_subdivisions=None): if num_subdivisions is None: return self._num_subdivisions else: if self.ctr > 0: raise RuntimeError('Cannot set num_cases during iteration') self._num_subdivisions = num_subdivisions return self class SparseProductStrategy(ContinuationStrategy): """ Defines the bisection continuation strategy. """ strategy_name = 'sparse_product' def __init__(self, num_subdivisions=((1,), (0,)), sol=None): ContinuationStrategy.__init__(self) self.sol = sol self.sols = [] self._num_cases = None self._num_subdivisions = num_subdivisions self.division_ctr = 0 self.vars = {} # dictionary of values self.ctr = 0 # iteration counter self.const = functools.partial(self.set, param_type='const') self.constant = self.const self.last_sol = None self.orig_num_cases = None self.case_list = [] def __str__(self): return str(self.vars) def next(self, ignore_last_step=False): if self.ctr == 0 or self.gammas[-1].converged: # Reset division counter self.division_ctr = 0 return super(SparseProductStrategy, self).next(False) if self.ctr == 1: logging.error('Initial guess should converge for automated continuation to work.') raise RuntimeError('Initial guess does not converge.') return super(SparseProductStrategy, self).next(True) def reset(self): """Resets the internal step counter to zero""" self.ctr = 0 self.last_sol = None def set(self, name, target, param_type): """ Sets the target value for the specified parameter """ self.vars[name] = ContinuationVariable(name, target) return self def init(self, sol, bvp): super(SparseProductStrategy, self).init(sol, bvp) lower_bounds = [] upper_bounds = [] num_vars = len(self.vars) for var_name in self.vars.keys(): jj = getattr_from_list(bvp.constants, 'name').index(var_name) self.vars[var_name].value = sol.const[jj] lower_bounds.append(sol.const[jj]) upper_bounds.append(self.vars[var_name].target) major_num_steps = self._num_subdivisions[0] minor_num_steps = self._num_subdivisions[1] step_list = [lower_bounds] major_point_cloud = [lower_bounds] minor_point_cloud = [] for n in range(num_vars): new_maj_pnts = [] maj_step_series = np.linspace(lower_bounds[n], upper_bounds[n], major_num_steps[n]) for maj_point in major_point_cloud: maj_pnt_line = [] for maj_step in maj_step_series: new_pnt = list(maj_point) new_pnt[n] = maj_step maj_pnt_line.append(tuple(new_pnt)) for maj_pnt_0, maj_pnt_1 in zip(maj_pnt_line[:-1], maj_pnt_line[1:]): min_step_series = np.linspace(maj_pnt_0[n], maj_pnt_1[n], minor_num_steps[n]+1, endpoint=False)[1:] for min_step in min_step_series: new_point = list(maj_pnt_0) new_point[n] = min_step step_list.append(tuple(new_point)) minor_point_cloud.append(tuple(new_point)) step_list.append(maj_pnt_1) new_maj_pnts += maj_pnt_line[1:] major_point_cloud += new_maj_pnts self.num_cases(len(step_list)) for step in step_list: for n, var_name in enumerate(self.vars.keys()): self.vars[var_name].steps.append(step[n]) def num_cases(self, num_cases=None): if num_cases is None: return self._num_cases else: if self.ctr > 0: raise RuntimeError('Cannot set num_cases during iteration') self._num_cases = num_cases self.orig_num_cases = num_cases return self def num_subdivisions(self, num_subdivisions=None): if num_subdivisions is None: return self._num_subdivisions else: if self.ctr > 0: raise RuntimeError('Cannot set num_cases during iteration') self._num_subdivisions = num_subdivisions return self def run_continuation_set(bvp_algorithm_, steps, sol_guess, bvp: Problem, pool, autoscale): """ Runs a continuation set for the BVP problem. :param bvp_algorithm_: BVP algorithm to be used. :param steps: The steps in a continuation set. :param sol_guess: Initial guess for the first problem in steps. :param bvp: The compiled boundary-value problem to solve. :param pool: A processing pool, if available. :param autoscale: Whether or not scaling is used. :return: A set of solutions for the steps. """ # Loop through all the continuation steps solution_set = [] functional_problem = bvp.functional_problem # Initialize scaling scale = functional_problem.scale_sol compute_factors = functional_problem.compute_scale_factors # Load the derivative function into the prob algorithm bvp_algorithm_.set_derivative_function(functional_problem.deriv_func) bvp_algorithm_.set_derivative_jacobian(functional_problem.deriv_func_jac) bvp_algorithm_.set_quadrature_function(functional_problem.quad_func) bvp_algorithm_.set_boundarycondition_function(functional_problem.bc_func) bvp_algorithm_.set_boundarycondition_jacobian(functional_problem.bc_func_jac) bvp_algorithm_.set_inequality_constraint_function(functional_problem.ineq_constraints) if steps is None: logger.info('Solving OCP...') time0 = time.time() if autoscale: scale_factors = compute_factors(sol_guess) sol_guess = scale(sol_guess, scale_factors) else: scale_factors = None opt = bvp_algorithm_.solve(sol_guess, pool=pool) sol = opt['sol'] if autoscale: sol = scale(sol, scale_factors, inv=True) solution_set = [[sol]] if sol.converged: elapsed_time = time.time() - time0 logger.debug('Problem converged in %0.4f seconds\n' % elapsed_time) else: logger.debug('Problem failed to converge!\n') else: for step_idx, step in enumerate(steps): logger.debug('\nRunning Continuation Step #{} ({})'.format(step_idx+1, step)+' : ') # logger.debug('Number of Iterations\t\tMax BC Residual\t\tTime to Solution') solution_set.append([]) # Assign solution from last continuation set step.reset() step.init(sol_guess, bvp) try: log_level = logger.display_level_ except AttributeError: log_level = logger.getEffectiveLevel() step_len = len(step) continuation_progress = tqdm( step, disable=(not (logging.INFO <= log_level < logging.WARN)), file=sys.stdout, desc='Cont. #{} ({})'.format(step_idx + 1, fit_string(str(step), 30)), ascii=True, unit='trajectories') for sol_guess in continuation_progress: continuation_progress.total = len(step) if step_len != continuation_progress.total: step_len = continuation_progress.total continuation_progress.refresh() logger.debug('START \tIter {:d}/{:d}'.format(step.ctr, step.num_cases())) time0 = time.time() if autoscale: scale_factors = compute_factors(sol_guess) sol_guess = scale(sol_guess, scale_factors) else: scale_factors = None opt = bvp_algorithm_.solve(sol_guess, pool=pool) sol = opt['sol'] if autoscale: sol = scale(sol, scale_factors, inv=True) ya = sol.y[0, :] yb = sol.y[-1, :] dp = sol.dynamical_parameters ndp = sol.nondynamical_parameters k = sol.const if sol.q.size > 0: qa = sol.q[0, :] qb = sol.q[-1, :] bc_residuals_unscaled = bvp_algorithm_.boundarycondition_function(ya, qa, yb, qb, dp, ndp, k) else: bc_residuals_unscaled = bvp_algorithm_.boundarycondition_function(ya, yb, dp, ndp, k) step.add_gamma(sol) """ The following line is overwritten by the looping variable UNLESS it is the final iteration. It is required when chaining continuation strategies together. DO NOT DELETE! """ sol_guess = copy.deepcopy(sol) elapsed_time = time.time() - time0 logger.debug( 'STOP \tIter {:d}/{:d}\tBVP Iters {:d}\tBC Res {:13.8E}\tTime {:13.8f}' .format(step.ctr, step.num_cases(), opt['niter'], max(bc_residuals_unscaled), elapsed_time)) solution_set[step_idx].append(copy.deepcopy(sol)) if not sol.converged: logger.debug('Iteration %d/%d failed to converge!\n' % (step.ctr, step.num_cases())) return solution_set