Source code for ezff

"""This module provide general functions for EZFF"""

# General imports
import os
import sys
import numpy as np
import math
import random
from datetime import datetime
import xtal
from functools import partial

# EZFF imports
from .ffio import *
from .errors import *

# Optimizer imports
import nevergrad as ng
import mobopt
from pymoo.core.problem import Problem as pymoo_Problem
from pymoo.core.individual import Individual as pymoo_Individual
from pymoo.core.population import Population as pymoo_Population
from pymoo.algorithms.moo.nsga2 import NSGA2 as pymoo_NSGA2
from pymoo.algorithms.moo.nsga3 import NSGA3 as pymoo_NSGA3
from pymoo.algorithms.moo.unsga3 import UNSGA3 as pymoo_UNSGA3
from pymoo.algorithms.moo.ctaea import CTAEA as pymoo_CTAEA
from pymoo.algorithms.moo.sms import SMSEMOA as pymoo_SMSEMOA
from pymoo.util.ref_dirs import get_reference_directions as pymoo_get_reference_directions
from pymoo.algorithms.moo.rvea import RVEA as pymoo_RVEA
from pymoo.termination.max_eval import MaximumFunctionCallTermination
from pymoo.algorithms.soo.nonconvex.es import ES as pymoo_ES
from pymoo.algorithms.soo.nonconvex.nelder import NelderMead as pymoo_NelderMead
from pymoo.algorithms.soo.nonconvex.cmaes import CMAES as pymoo_CMAES
import platypus

# Parallel processing imports
from schwimmbad import MultiPool as sch_MultiPool
from schwimmbad import MPIPool as sch_MPIPool
from mpi4py import MPI
import multiprocessing



__version__ = '0.9.5' # Update setup.py if version changes


[docs]class FFParam(object): """ Class for EZFF Forcefield Parametrization """
[docs] def __init__(self, error_function=None, num_errors=None): """ :param num_errors: Number of errors to be minimized for forcefield optimization :type num_errors: int :param error_function: User-defined function that takes-in a dictionary of variable-value pairs and outputs a list of computed errors :type error_function: function """ self.error_function = error_function self.num_errors = num_errors self.relative_weights = np.array([1.0 for i in range(num_errors)]) self.variables = [] self.errors = [] self.total_epochs = 0
[docs] def read_variable_bounds(self, filename): """Read permissible lower and upper bounds for decision variables used in forcefields optimization :param filename: Name of text file listing bounds for each decision variable that must be optimized :type filename: str """ self.variable_bounds = ffio.read_variable_bounds(filename) self.num_variables = len(self.variable_bounds.keys()) self.variable_names = [key for key in self.variable_bounds.keys()]
[docs] def read_forcefield_template(self, template_filename): """Read-in the forcefield template. The template is constructed from a functional forcefield file by replacing all optimizable numerical values with variable names enclosed within dual angled brackets << and >>. :param template_filename: Name of the forcefield template file to be read-in :type template_filename: str """ self.forcefield_template = ffio.read_forcefield_template(template_filename)
[docs] def set_algorithm(self, algo_string, population_size = None): """ Set optimization algorithm. Initialize interfaces to external optimizers and return the algorithm object :param algo_string: Type of algorithm to parameterize the forcefield :type algo_string: str :param population_size: Number of candidate forcefields evaluated every epoch :type algo_string: int """ self.population_size = population_size self.algo_string = algo_string ng_algos = ['NGOPT_SO', 'TWOPOINTSDE_SO','PORTFOLIODISCRETEONEPLUSONE_SO','ONEPLUSONE_SO','CMA_SO','TBPSA_SO', 'PSO_SO', 'SCRHAMMERSLEYSEARCHPLUSMIDDLEPOINT_SO', 'RANDOMSEARCH_SO'] mobopt_algos = ['MOBO'] pymoo_algos = ['NSGA2_MO_PYMOO', 'NSGA3_MO_PYMOO', 'UNSGA3_MO_PYMOO', 'CTAEA_MO_PYMOO', 'SMSEMOA_MO_PYMOO', 'RVEA_MO_PYMOO', 'ES_SO_PYMOO', 'NELDERMEAD_SO_PYMOO', 'CMAES_SO_PYMOO'] platypus_algos = ['NSGA2_MO_PLATYPUS', 'NSGA3_MO_PLATYPUS', 'GDE3_MO_PLATYPUS'] if algo_string.upper() in ng_algos: self.algo_framework = 'nevergrad' ng_variable_dict = ng.p.Dict() for variable in self.variable_bounds.keys(): ng_variable_dict[variable] = ng.p.Scalar(lower = self.variable_bounds[variable][0], upper = self.variable_bounds[variable][1]) if algo_string.upper() == 'NGOPT_SO': self.algorithm = ng.optimizers.NGOpt(parametrization=ng_variable_dict, budget=self.population_size, num_workers=2) elif algo_string.upper() == 'TWOPOINTSDE_SO': self.algorithm = ng.optimizers.TwoPointsDE(parametrization=ng_variable_dict, budget=self.population_size, num_workers=2) elif algo_string.upper() == 'PORTFOLIODISCRETEONEPLUSONE_SO': self.algorithm = ng.optimizers.PortfolioDiscreteOnePlusOne_SO(parametrization=ng_variable_dict, budget=self.population_size, num_workers=2) elif algo_string.upper() == 'ONEPLUSONE_SO': self.algorithm = ng.optimizers.OnePlusOne(parametrization=ng_variable_dict, budget=self.population_size, num_workers=2) elif algo_string.upper() == 'CMA_SO': self.algorithm = ng.optimizers.CMA(parametrization=ng_variable_dict, budget=self.population_size, num_workers=2) elif algo_string.upper() == 'TBPSA_SO': self.algorithm = ng.optimizers.TBPSA(parametrization=ng_variable_dict, budget=self.population_size, num_workers=2) elif algo_string.upper() == 'PSO_SO': self.algorithm = ng.optimizers.PSO(parametrization=ng_variable_dict, budget=self.population_size, num_workers=2) elif algo_string.upper() == 'SCRHAMMERSLEYSEARCHPLUSMIDDLEPOINT_SO': self.algorithm = ng.optimizers.ScrHammersleySearchPlusMiddlePoint(parametrization=ng_variable_dict, budget=self.population_size, num_workers=2) elif algo_string.upper() == 'RANDOMSEARCH_SO': self.algorithm = ng.optimizers.RandomSearch(parametrization=ng_variable_dict, budget=self.population_size, num_workers=2) elif algo_string.upper() in mobopt_algos: self.algo_framework = 'mobopt' var_mins = [] var_maxs = [] for variable in self.variable_bounds.keys(): var_mins.append(self.variable_bounds[variable][0]) var_maxs.append(self.variable_bounds[variable][1]) self.mobopt_variable_bounds = np.vstack((var_mins, var_maxs)).T self.algorithm = mobopt.MOBayesianOpt(target = self.error_function, NObj = self.num_errors, pbounds = self.mobopt_variable_bounds) elif algo_string.upper() in pymoo_algos: self.algo_framework = 'pymoo' var_mins = [] var_maxs = [] for variable in self.variable_bounds.keys(): var_mins.append(self.variable_bounds[variable][0]) var_maxs.append(self.variable_bounds[variable][1]) self.pymoo_problem = pymoo_Problem(n_var = self.num_variables, n_obj = self.num_errors, n_constr = 0, xl = var_mins, xu = var_maxs) self.normalize_errors() initial_population = [] for varid, var in enumerate(self.variables): evaledsoln = pymoo_Individual() evaledsoln._X = var evaledsoln._F = self.normalized_errors[varid] initial_population.append(evaledsoln) if algo_string.upper() == 'NSGA2_MO_PYMOO': if initial_population == []: self.algorithm = pymoo_NSGA2(self.population_size) else: initial_population = pymoo_Population(initial_population) self.algorithm = pymoo_NSGA2(self.population_size, sampling = initial_population) self.algorithm.setup(self.pymoo_problem, seed = np.random.randint(100000), verbose = False) elif algo_string.upper() == 'NSGA3_MO_PYMOO': # Identify number of reference points min_points = [math.comb(self.num_errors + ref_pts - 1, ref_pts) for ref_pts in range(25)] num_reference_points = np.sum(np.array(min_points) < self.population_size) reference_directions = pymoo_get_reference_directions("das-dennis", self.num_errors, n_partitions=num_reference_points) if initial_population == []: self.algorithm = pymoo_NSGA3(pop_size=self.population_size, ref_dirs=reference_directions) else: initial_population = pymoo_Population(initial_population) self.algorithm = pymoo_NSGA3(pop_size=self.population_size, ref_dirs=reference_directions, sampling = initial_population) self.algorithm.setup(self.pymoo_problem, seed = np.random.randint(100000), verbose = False) elif algo_string.upper() == 'UNSGA3_MO_PYMOO': # Identify number of reference points min_points = [math.comb(self.num_errors + ref_pts - 1, ref_pts) for ref_pts in range(25)] num_reference_points = np.sum(np.array(min_points) < self.population_size) reference_directions = pymoo_get_reference_directions("das-dennis", self.num_errors, n_partitions=num_reference_points) if initial_population == []: self.algorithm = pymoo_UNSGA3(pop_size=self.population_size, ref_dirs=reference_directions) else: initial_population = pymoo_Population(initial_population) self.algorithm = pymoo_UNSGA3(pop_size=self.population_size, ref_dirs=reference_directions, sampling = initial_population) self.algorithm.setup(self.pymoo_problem, seed = np.random.randint(100000), verbose = False) elif algo_string.upper() == 'CTAEA_MO_PYMOO': # Identify number of reference points min_points = [math.comb(self.num_errors + ref_pts - 1, ref_pts) for ref_pts in range(25)] num_reference_points = np.sum(np.array(min_points) < self.population_size) reference_directions = pymoo_get_reference_directions("das-dennis", self.num_errors, n_partitions=num_reference_points) if initial_population == []: self.algorithm = pymoo_CTAEA(ref_dirs=reference_directions) else: initial_population = pymoo_Population(initial_population) self.algorithm = pymoo_CTAEA(ref_dirs=reference_directions, sampling = initial_population) self.algorithm.setup(self.pymoo_problem, seed = np.random.randint(100000), verbose = False) elif algo_string.upper() == 'SMSEMOA_MO_PYMOO': if initial_population == []: self.algorithm = pymoo_SMSEMOA(self.population_size) else: initial_population = pymoo_Population(initial_population) self.algorithm = pymoo_SMSEMOA(self.population_size, sampling = initial_population) self.algorithm.setup(self.pymoo_problem, seed = np.random.randint(100000), verbose = False) elif algo_string.upper() == 'RVEA_MO_PYMOO': # Identify number of reference points min_points = [math.comb(self.num_errors + ref_pts - 1, ref_pts) for ref_pts in range(25)] num_reference_points = np.sum(np.array(min_points) < self.population_size) reference_directions = pymoo_get_reference_directions("das-dennis", self.num_errors, n_partitions=num_reference_points) if initial_population == []: self.algorithm = pymoo_RVEA(ref_dirs=reference_directions, pop_size = self.population_size) termination = MaximumFunctionCallTermination(5000) self.algorithm.termination = termination else: initial_population = pymoo_Population(initial_population) self.algorithm = pymoo_RVEA(ref_dirs=reference_directions, sampling = initial_population, pop_size = self.population_size) termination = MaximumFunctionCallTermination(5000) self.algorithm.termination = termination self.algorithm.setup(self.pymoo_problem, seed = np.random.randint(100000), verbose = False) elif algo_string.upper() == 'ES_SO_PYMOO': if initial_population == []: self.algorithm = pymoo_ES(n_offspring = self.population_size, pop_size = self.population_size) else: initial_population = pymoo_Population(initial_population) self.algorithm = pymoo_ES(n_offspring = self.population_size, pop_size = self.population_size, sampling = initial_population) self.algorithm.setup(self.pymoo_problem, seed = np.random.randint(100000), verbose = False) elif algo_string.upper() == 'NELDERMEAD_SO_PYMOO': if initial_population == []: self.algorithm = pymoo_NelderMead() else: initial_population = pymoo_Population(initial_population) self.algorithm = pymoo_NelderMead() self.algorithm.pop = initial_population self.algorithm.setup(self.pymoo_problem, seed = np.random.randint(100000), verbose = False) elif algo_string.upper() == 'CMAES_SO_PYMOO': x0_search = np.mean(self.variables, axis=0) if initial_population == []: self.algorithm = pymoo_CMAES(x0 = x0_search) else: initial_population = pymoo_Population(initial_population) self.algorithm = pymoo_CMAES(x0 = x0_search) self.algorithm.pop = initial_population self.algorithm.setup(self.pymoo_problem, seed = np.random.randint(100000), verbose = False) elif algo_string.upper() in platypus_algos: self.algo_framework = 'platypus' var_mins = [] var_maxs = [] for variable in self.variable_bounds.keys(): var_mins.append(self.variable_bounds[variable][0]) var_maxs.append(self.variable_bounds[variable][1]) self.platypus_problem = platypus.Problem(self.num_variables, self.num_errors) self.platypus_problem.types = [platypus.Real(var_mins[i], var_maxs[i]) for i in range(self.num_variables)] self.normalize_errors() initial_population = [] for varid, var in enumerate(self.variables): evaledsoln = platypus.Solution(self.platypus_problem) evaledsoln.variables[:] = list(var) evaledsoln.objectives[:] = list(self.normalized_errors[varid]) initial_population.append(evaledsoln) if algo_string.upper() == 'NSGA2_MO_PLATYPUS': self.algorithm = platypus.NSGAII(self.platypus_problem, self.population_size) self.algorithm.variator = platypus.default_variator(self.platypus_problem) if len(initial_population) > 0: self.algorithm.population = initial_population elif algo_string.upper() == 'NSGA3_MO_PLATYPUS': # Identify number of reference points min_points = [math.comb(self.num_errors + ref_pts - 1, ref_pts) for ref_pts in range(200)] num_reference_points = np.sum(np.array(min_points) < self.population_size) self.algorithm = platypus.NSGAIII(self.platypus_problem, divisions_outer = num_reference_points) self.algorithm.variator = platypus.default_variator(self.platypus_problem) if len(initial_population) > 0: self.algorithm.population = initial_population if algo_string.upper() == 'GDE3_MO_PLATYPUS': self.algorithm = platypus.GDE3(self.platypus_problem, population_size = self.population_size) if len(initial_population) > 0: self.algorithm.population = initial_population
[docs] def ask(self): """ Ask the optimization algorithm the next candidate variables for evaluation """ new_variables = [] if self.algo_framework == 'nevergrad': for i in range(self.population_size): newvar = self.algorithm.ask() new_var_as_list = np.array([newvar.value[self.variable_names[i]] for i in range(len(self.variable_names))]) new_variables.append(new_var_as_list) return new_variables elif self.algo_framework == 'mobopt': q = 0.5 prob = 0.1 for pointID in range(len(self.variables)): self.algorithm.space.add_observation(np.array(self.variables[pointID]), np.array(self.normalized_errors[pointID])) for i in range(self.num_errors): yy = self.algorithm.space.f[:, i] self.algorithm.GP[i].fit(self.algorithm.space.x, yy) pop, logbook, front = mobopt._NSGA2.NSGAII(self.algorithm.NObj, self.algorithm._MOBayesianOpt__ObjectiveGP, self.algorithm.pbounds, MU=self.population_size*2) Population = np.asarray(pop) IndexF, FatorF = self.algorithm._MOBayesianOpt__LargestOfLeast(front, self.algorithm.space.f) IndexPop, FatorPop = self.algorithm._MOBayesianOpt__LargestOfLeast(Population, self.algorithm.space.x) Fator = q * FatorF + (1-q) * FatorPop sorted_ids = np.argsort(Fator) for i in range(self.population_size): Index_try = int(np.argwhere(sorted_ids == np.max(sorted_ids)-i)) self.algorithm.x_try = Population[Index_try] if self.algorithm.space.RS.uniform() < prob: if self.algorithm.NParam > 1: ii = self.algorithm.space.RS.randint(low=0, high=self.algorithm.NParam - 1) else: ii = 0 self.algorithm.x_try[ii] = self.algorithm.space.RS.uniform(low=self.algorithm.pbounds[ii][0],high=self.algorithm.pbounds[ii][1]) new_variables.append(self.algorithm.x_try) return new_variables elif self.algo_framework == 'pymoo': while (len(new_variables) < self.population_size): newXs = self.algorithm.ask() for newX in newXs: new_variables.append(newX.X) return new_variables elif self.algo_framework == 'platypus': if self.algo_string.upper() == 'NSGA2_MO_PLATYPUS': platypus.nondominated_sort(self.algorithm.population) self.algorithm.population = platypus.nondominated_truncate(self.algorithm.population, self.population_size) for i in range(self.population_size): parents = self.algorithm.selector.select(self.algorithm.variator.arity, self.algorithm.population) single_offspring = self.algorithm.variator.evolve(parents) new_variables.append(single_offspring[0].variables[:]) new_variables.append(single_offspring[1].variables[:]) new_variables = random.sample(new_variables, self.population_size) elif self.algo_string.upper() == 'NSGA3_MO_PLATYPUS': platypus.nondominated_sort(self.algorithm.population) self.algorithm.population = self.algorithm._reference_point_truncate(self.algorithm.population, self.population_size) for i in range(self.population_size): parents = self.algorithm.selector.select(self.algorithm.variator.arity, self.algorithm.population) single_offspring = self.algorithm.variator.evolve(parents) new_variables.append(single_offspring[0].variables[:]) new_variables.append(single_offspring[1].variables[:]) new_variables = random.sample(new_variables, self.population_size) elif self.algo_string.upper() == 'GDE3_MO_PLATYPUS': self.algorithm.population = self.algorithm.survival(self.algorithm.population) for i in range(self.population_size): parents = self.algorithm.select(i, self.algorithm.variator.arity) single_offspring = self.algorithm.variator.evolve(parents) new_variables.append(single_offspring[0].variables[:]) # new_variables.append(single_offspring[1].variables[:]) # new_variables = random.sample(list(set(new_variables)), self.population_size) return new_variables
[docs] def parameterize(self, num_epochs = None, pool = None): """ The optimize function provides a uniform wrapper to solve the EZFF problem using the algorithm(s) provided. :param num_epochs: Number of epochs to perform the optimization for. If multiple algorithms are specified, one iteration value should be provided for each algorithm :type num_epochs: int :param pool: Multiprocessing or MPI Pool for forcefield parameterization :type pool: Multiprocessing or MPI Pool object """ self.pool = pool if self.algo_framework == 'nevergrad': for epoch in range(num_epochs): self.total_epochs += 1 self.normalize_errors() self.set_algorithm(algo_string = self.algo_string, population_size = self.population_size) for computed_id, computed in enumerate(self.variables): computed_value = {k:v for (k,v) in zip(self.variable_names,computed)} self.algorithm.suggest(computed_value) asked_suggestion = self.algorithm.ask() self.algorithm.tell(asked_suggestion, self.normalized_errors[computed_id]) new_variables = self.ask() new_errors = [] if self.pool is None: for variable in new_variables: variable_dict = dict(zip(self.variable_names, variable)) error = self.error_function(variable_dict, self.forcefield_template) new_errors.append(error) else: if self.pool_type == 'mpi': if not self.pool.is_master(): pool.wait() variable_dict_list = [dict(zip(self.variable_names, variable)) for variable in new_variables] new_errors = self.pool.map(partial(self.error_function, template = self.forcefield_template), variable_dict_list) for variable_id, variable in enumerate(new_variables): self.variables.append(variable) self.errors.append(new_errors[variable_id]) self._write_out_forcefields() elif self.algo_framework == 'mobopt': for epoch in range(num_epochs): self.total_epochs += 1 self.normalize_errors() self.set_algorithm(algo_string = self.algo_string, population_size = self.population_size) new_variables = self.ask() new_errors = [] if self.pool is None: for variable in new_variables: variable_dict = dict(zip(self.variable_names, variable)) error = self.error_function(variable_dict, self.forcefield_template) new_errors.append(error) else: if self.pool_type == 'mpi': if not self.pool.is_master(): pool.wait() variable_dict_list = [dict(zip(self.variable_names, variable)) for variable in new_variables] new_errors = self.pool.map(partial(self.error_function, template = self.forcefield_template), variable_dict_list) for variable_id, variable in enumerate(new_variables): self.variables.append(variable) self.errors.append(new_errors[variable_id]) self._write_out_forcefields() elif self.algo_framework == 'pymoo': for epoch in range(num_epochs): self.total_epochs += 1 self.normalize_errors() self.set_algorithm(algo_string = self.algo_string, population_size = self.population_size) new_variables = self.ask() new_errors = [] if self.pool is None: for variable in new_variables: variable_dict = dict(zip(self.variable_names, variable)) error = self.error_function(variable_dict, self.forcefield_template) new_errors.append(error) else: if self.pool_type == 'mpi': if not self.pool.is_master(): pool.wait() variable_dict_list = [dict(zip(self.variable_names, variable)) for variable in new_variables] new_errors = self.pool.map(partial(self.error_function, self.forcefield_template), variable_dict_list) for variable_id, variable in enumerate(new_variables): self.variables.append(variable) self.errors.append(new_errors[variable_id]) self._write_out_forcefields() elif self.algo_framework == 'platypus': for epoch in range(num_epochs): self.total_epochs += 1 self.normalize_errors() self.set_algorithm(algo_string = self.algo_string, population_size = self.population_size) new_variables = self.ask() new_errors = [] if self.pool is None: for variable in new_variables: variable_dict = dict(zip(self.variable_names, variable)) error = self.error_function(variable_dict, self.forcefield_template) new_errors.append(error) # for variable_id, variable in enumerate(new_variables): # self.variables.append(variable) # self.errors.append(new_errors[variable_id]) else: if self.pool_type == 'mpi': if not self.pool.is_master(): pool.wait() variable_dict_list = [dict(zip(self.variable_names, variable)) for variable in new_variables] new_errors = self.pool.map(partial(self.error_function, template = self.forcefield_template), variable_dict_list) # for variable_id, variable in enumerate(new_variables): # self.variables.append(variable) # self.errors.append(new_errors[variable_id]) for variable_id, variable in enumerate(new_variables): self.variables.append(variable) self.errors.append(new_errors[variable_id]) self._write_out_forcefields()
def _write_out_forcefields(self): print('Epoch: '+ str(self.total_epochs)) if not (self.algo_string.upper() == 'RANDOMSEARCH_SO'): # Make output files/directories outdir = 'results/' + str(self.total_epochs) if not os.path.isdir(outdir): os.makedirs(outdir) varfilename = outdir + '/variables' errfilename = outdir + '/errors' varfile = open(varfilename, 'w') errfile = open(errfilename, 'w') reco_vars, reco_errs = self.get_best_recommendation() if self.algo_framework == 'nevergrad' and self.num_errors == 1: reco_vars = [[reco_vars[key] for key in self.variable_names]] reco_errs = [[reco_errs]] for reco_id, reco_var in enumerate(reco_vars): varfile.write(' '.join([str(variable) for variable in reco_var])) varfile.write('\n') if self.algo_framework == 'mobopt': errfile.write(' '.join([str(0.0 - error) for error in reco_errs[reco_id]])) else: errfile.write(' '.join([str(error) for error in reco_errs[reco_id]])) errfile.write('\n') varfile.close() errfile.close() all_evaluated_filename = outdir + '/all_evaluated_' self.save_evaluated(all_evaluated_filename) if self.total_epochs % 5 == 0: if not os.path.isdir(outdir+'/forcefields'): os.makedirs(outdir+'/forcefields') for reco_id, reco_var in enumerate(reco_vars): #for sol_index, solution in enumerate(unique(nondominated(algorithm_for_this_stage.result))): ff_name = outdir + '/forcefields/FF_' + str(reco_id+1) parameters_dict = dict(zip(self.variable_names, reco_var)) generate_forcefield(self.forcefield_template, parameters_dict, outfile=ff_name)
[docs] def get_best_recommendation(self): """ Return the best variables evaluated so far """ best_variables = None best_errors = None if self.algo_framework == 'nevergrad': best_recommendation = self.algorithm.provide_recommendation() best_variables = best_recommendation.value best_errors = best_recommendation.loss elif self.algo_framework == 'mobopt': best_errors, best_variables = self.algorithm.space.ParetoSet() elif self.algo_framework == 'pymoo': self.normalize_errors() initial_population = [] for varid, var in enumerate(self.variables): evaledsoln = pymoo_Individual() evaledsoln._X = var evaledsoln._F = self.normalized_errors[varid] initial_population.append(evaledsoln) initial_population = pymoo_Population(initial_population) if '_SO_' in self.algo_string: self.algorithm.pop = initial_population self.algorithm._set_optimum() else: self.algorithm.tell(infills = initial_population) best_recommendation = self.algorithm.result() best_variables = best_recommendation.X best_errors = best_recommendation.F elif self.algo_framework == 'platypus': platypus.nondominated_sort(self.algorithm.population) recommendation = platypus.unique(platypus.nondominated(self.algorithm.population)) best_variables = [] best_errors = [] recommendation = list(set(recommendation)) # Remove duplicates for single_reco in recommendation: best_variables.append(single_reco.variables[:]) best_errors.append(single_reco.objectives[:]) return [best_variables, best_errors]
[docs] def generate_pool(self, pool_type): """ Return a parallel pool object :param pool_type: Type of parallel pool :type pool_type: str """ if pool_type == 'multi': self.pool_type = 'multi' return sch_MultiPool() elif pool_type == 'mpi': self.pool_type = 'mpi' return sch_MPIPool()
[docs] def save_evaluated(self,filename): """ Save all variables evaluated so far as a zipped numpy array :param filename: File to which variables are saved :type filename: str """ timestamp = datetime.now().strftime("%y%m%d_%H%M%S") filename = filename+timestamp+'.npz' np.savez(filename, variables=np.array(self.variables), errors=np.array(self.errors))
[docs] def load_evaluated(self,filename): """ Load all variables evaluated from a zipped numpy array :param filename: File to load variables from :type filename: str """ fileobj = np.load(filename) vars = fileobj['variables'] errs = fileobj['errors'] for varid, var in enumerate(vars): self.variables.append(var) self.errors.append(errs[varid])
[docs] def normalize_errors(self, scale = 2.0): self.normalized_errors = [] if len(self.errors) > 0: maximums = np.nanmax(self.errors, axis=0) maximums[np.isnan(maximums)] = 100.0 for var in self.errors: if np.any(np.isnan(var)): self.normalized_errors.append(maximums * scale) else: self.normalized_errors.append(var) self.normalized_errors = np.array(self.normalized_errors) if self.algo_framework == 'mobopt': self.normalized_errors = 0.0 - self.normalized_errors # MOBOPT will only attempt to maximize the error function
[docs]def get_pool_rank(): """ Return the rank of the current process in a parallel setting """ try: myrank = multiprocessing.current_process()._identity[0] return myrank # return rank of multiprocessing process, if available except: return MPI.COMM_WORLD.Get_rank() # return rank of MPI process, or 0 if there is no MPI