import sys
import warnings
import copy
import time
import pathos
import logging
import numpy as np
from beluga.utils.logging import logger
from beluga.release import __splash__
from beluga.numeric.data_classes.Trajectory import Trajectory
from beluga.utils import save, init_logging
from beluga.symbolic.data_classes.components_structures import getattr_from_list
from beluga.symbolic.data_classes import make_direct_method, make_indirect_method, make_postprocessor, make_preprocessor
from beluga.continuation import run_continuation_set, match_constants_to_states
[docs]def add_logger(display_level=logging.INFO, file_level=logging.DEBUG, filename='beluga.log'):
"""
Attaches a logger to beluga's main process.
:keyword display_level: The level at which logging is displayed to stdout.
:keyword file_level: The level at which logging is written to the output file.
:keyword filename: Name of the log file. Default is `beluga.log`.
:return: None
.. seealso::
logging.FileHandler
"""
# Suppress warnings
# warnings.filterwarnings("ignore")
#
# # logfile options for logging.FileHandler
# config = {'filename': 'beluga.log', 'mode': 'w'}
# config.update(kwargs)
init_logging(display_level=display_level, file_level=file_level, filename=filename)
def solve(
autoscale=True,
bvp=None,
bvp_algorithm=None,
guess_generator=None,
initial_helper=False,
method='traditional',
n_cpus=1,
ocp=None,
ocp_map=None,
ocp_map_inverse=None,
optim_options=None,
steps=None,
save_sols=True):
"""
Solves the OCP using specified method
+------------------------+-----------------+---------------------------------------+
| Valid kwargs | Default Value | Valid Values |
+========================+=================+=======================================+
| autoscale | True | bool |
+------------------------+-----------------+---------------------------------------+
| prob | None | codegen'd BVPs |
+------------------------+-----------------+---------------------------------------+
| bvp_algorithm | None | prob algorithm |
+------------------------+-----------------+---------------------------------------+
| guess_generator | None | guess generator |
+------------------------+-----------------+---------------------------------------+
| initial_helper | False | bool |
+------------------------+-----------------+---------------------------------------+
| method | 'traditional' | string |
+------------------------+-----------------+---------------------------------------+
| n_cpus | 1 | integer |
+------------------------+-----------------+---------------------------------------+
| ocp | None | :math:`\\Sigma` |
+------------------------+-----------------+---------------------------------------+
| ocp_map | None | :math:`\\gamma \rightarrow \\gamma` |
+------------------------+-----------------+---------------------------------------+
| ocp_map_inverse | None | :math:`\\gamma \rightarrow \\gamma` |
+------------------------+-----------------+---------------------------------------+
| optim_options | None | dict() |
+------------------------+-----------------+---------------------------------------+
| steps | None | continuation_strategy |
+------------------------+-----------------+---------------------------------------+
| save | False | bool, str |
+------------------------+-----------------+---------------------------------------+
"""
if optim_options is None:
optim_options = {}
# Display useful info about the environment to debug logger.
logger.debug('\n'+__splash__+'\n')
from beluga import __version__ as beluga_version
from llvmlite import __version__ as llvmlite_version
from numba import __version__ as numba_version
from numpy import __version__ as numpy_version
from scipy import __version__ as scipy_version
from sympy.release import __version__ as sympy_version
logger.debug('beluga:\t\t' + str(beluga_version))
logger.debug('llvmlite:\t' + str(llvmlite_version))
logger.debug('numba:\t\t' + str(numba_version))
logger.debug('numpy:\t\t' + str(numpy_version))
logger.debug('python:\t\t'
+ str(sys.version_info[0]) + '.' + str(sys.version_info[1]) + '.' + str(sys.version_info[2]))
logger.debug('scipy:\t\t' + str(scipy_version))
logger.debug('sympy:\t\t' + str(sympy_version) + '\n\n')
"""
Error checking
"""
if n_cpus < 1:
raise ValueError('Number of cpus must be greater than 1.')
if n_cpus > 1:
pool = pathos.multiprocessing.Pool(processes=n_cpus)
else:
pool = None
if ocp is None:
raise NotImplementedError('\"ocp\" must be defined.')
"""
Main code
"""
# f_ocp = compile_direct(ocp)
logger.debug('Using ' + str(n_cpus) + '/' + str(pathos.multiprocessing.cpu_count()) + ' CPUs. ')
if bvp is None:
preprocessor = make_preprocessor()
processed_ocp = preprocessor(copy.deepcopy(ocp))
if method.lower() in ['indirect', 'traditional', 'brysonho']:
method = 'traditional'
if method.lower() in ['traditional', 'diffyg']:
RFfunctor = make_indirect_method(copy.deepcopy(processed_ocp), method=method, **optim_options)
prob = RFfunctor(copy.deepcopy(processed_ocp))
elif method == 'direct':
functor = make_direct_method(copy.deepcopy(processed_ocp), **optim_options)
prob = functor(copy.deepcopy(processed_ocp))
else:
raise NotImplementedError
postprocessor = make_postprocessor()
bvp = postprocessor(copy.deepcopy(prob))
logger.debug('Resulting BVP problem:')
logger.debug(bvp.__repr__())
ocp_map = bvp.map_sol
ocp_map_inverse = bvp.inv_map_sol
else:
if ocp_map is None or ocp_map_inverse is None:
raise ValueError('BVP problem must have an associated \'ocp_map\' and \'ocp_map_inverse\'')
solinit = Trajectory()
solinit.const = np.array(getattr_from_list(bvp.constants, 'default_val'))
solinit = guess_generator.generate(bvp.functional_problem, solinit, ocp_map, ocp_map_inverse)
if initial_helper:
sol_ocp = copy.deepcopy(solinit)
sol_ocp = match_constants_to_states(ocp, ocp_map_inverse(sol_ocp))
solinit.const = sol_ocp.const
if bvp.functional_problem.compute_u is not None:
u = np.array([bvp.functional_problem.compute_u(solinit.y[0], solinit.dynamical_parameters, solinit.const)])
for ii in range(len(solinit.t) - 1):
u = np.vstack(
(u, bvp.functional_problem.compute_u(solinit.y[ii + 1], solinit.dynamical_parameters, solinit.const)))
solinit.u = u
"""
Main continuation process
"""
time0 = time.time()
continuation_set = run_continuation_set(bvp_algorithm, steps, solinit, bvp, pool, autoscale)
total_time = time.time() - time0
logger.info('Continuation process completed in %0.4f seconds.\n' % total_time)
bvp_algorithm.close()
"""
Post processing and output
"""
out = postprocess_continuations(continuation_set, ocp_map_inverse)
if pool is not None:
pool.close()
if save_sols or (isinstance(save_sols, str)):
if isinstance(save_sols, str):
filename = save_sols
else:
filename = 'data.beluga'
save(out, ocp, bvp, filename=filename)
return out
def postprocess_continuations(continuation_set, ocp_map_inverse):
"""
Post processes the data after the continuation process has run.
:param continuation_set: The set of all continuation processes.
:param ocp: The compiled OCP.
:param prob: The compiled BVP.
:param ocp_map_inverse: A mapping converting BVP solutions to OCP solutions.
:return: Set of trajectories to be returned to the user.
"""
out = []
# Calculate the control time-history for each trajectory
for cont_num, continuation_step in enumerate(continuation_set):
tempset = []
for sol_num, sol in enumerate(continuation_step):
tempset.append(ocp_map_inverse(sol))
out.append(tempset)
return out