# -*- coding: utf-8 -*-
import os
from concurrent.futures import ProcessPoolExecutor, Executor, Future
from copy import deepcopy
import numpy as np
from simplepso.logging import setup_logger
# set OMP_NUM_THREADS to 1 to prevent multi-processing to expand no each core
os.environ['OMP_NUM_THREADS'] = "1"
class Particle(object):
"""
Particle to be used in the Swarm
"""
def __init__(self, pos, fitness=None, smin=None, smax=None):
self.pos = pos
self.fitness = fitness
self.smin = smin
self.smax = smax
self.__best = None
@property
def best(self):
return self.__best
@best.setter
def best(self, new_best):
self.__best = Particle(
deepcopy(new_best.pos), deepcopy(new_best.fitness),
deepcopy(new_best.smin), deepcopy(new_best.smax)
)
# Used for printing below
header = '{:<10}' + "\t".join(['{:>12}'] * 5)
stats_header = header.format('iteration', 'best', 'mean', 'min', 'max', 'std')
stats_output = '{:<10}' + '\t'.join(['{:>12.3f}'] * 5)
[docs]class PSO(object):
"""
Simple interface to run particle swarm optimization
It can optimize parameters for a function using two run methods.
run() : cost function gets a parameter vector and returns a scalar. Can
be used with any callable function.
run_ssa() : cost function gets multiple trajectories of a PySB model
and returns a scalar. These trajectories are provided as
a pandas.DataFrame. PySB dependent function.
"""
[docs] def __init__(self, start=None, save_sampled=False, verbose=False,
shrink_steps=True):
"""
Parameters
----------
start : list
Starting position
save_sampled : bool
Save each particles position and fitness over all iterations.
verbose : bool
shrink_steps : bool
Shrink max distance traveled by each particle as the number of
iterations increases
"""
self.save_sampled = save_sampled
if start is not None:
self.set_start_position(start)
else:
self.start = None
"""Starting position"""
self.size = None
self.verbose = verbose
self.best = None
"""Best Particle of population"""
self.max_speed = None
self.min_speed = None
self.lb = None
self.ub = None
self.bounds_set = False
self.range = 2
self.all_history = None
"""History of all particles positions over all iterations. Only saved
if save_sampled=True"""
self.all_fitness = None
"""History of all particles finesses over all iterations. Only saved
if save_sampled=True"""
self.population = []
"""Population of particles"""
self.population_fitness = []
"""Fitness values of population of particles"""
self.values = []
"""Fitness values of the best particle for each iteration"""
self.history = []
"""Best particle for each iteration"""
self.update_w = shrink_steps
self._is_setup = False
self.log = setup_logger(verbose)
fi = 2.05 + 2.05 # value from kennedy paper
self.w = 2.0 / np.abs(2.0 - fi - np.sqrt(np.power(fi, 2) - 4 * fi))
[docs] def get_best_value(self):
""" Returns the best fitness value of the population """
return self.best.fitness
[docs] def get_history(self):
""" Returns the history of the run"""
return self.history
def _generate(self):
""" Creates Particles and sets their speed """
part = Particle(np.random.uniform(self.lb, self.ub, self.size))
part.speed = np.random.uniform(self.min_speed, self.max_speed,
self.size)
part.smin = self.min_speed
part.smax = self.max_speed
return part
def _setup_pso(self):
""" Sets up everything for PSO. Only does once """
if self.max_speed is None or self.min_speed is None:
self.set_speed()
if not self.bounds_set:
self.set_bounds()
if self.start is None:
raise Exception("Must provide a starting position\n"
"Use PSO.set_start_position() \n")
self._is_setup = True
def _update_connected(self):
""" Update the population of particles"""
for part in self.population:
if part.best is None or part.best.fitness > part.fitness:
part.best = part
# part.best = deepcopy(part)
# part.best.fitness = deepcopy(part.fitness)
if self.best is None or self.best.fitness > part.fitness:
self.best = Particle(
deepcopy(part.pos),
deepcopy(part.fitness),
deepcopy(part.smin),
deepcopy(part.smax)
)
# self.best = deepcopy(part)
# self.best.fitness = deepcopy(part.fitness)
def _update_particle_position(self, part):
""" Updates an individual particles position """
phi1 = 2.05
phi2 = 2.05
v_u1 = np.random.uniform(0, 1, self.size) * phi1 * (
part.best.pos - part.pos)
v_u2 = np.random.uniform(0, 1, self.size) * phi2 * (
self.best.pos - part.pos)
part.speed = self.w * (part.speed + v_u1 + v_u2)
np.place(part.speed, part.speed < part.smin, part.smin)
np.place(part.speed, part.speed > part.smax, part.smax)
part.pos += part.speed
for i, pos in enumerate(part.pos):
if pos < self.lb[i]:
part.pos[i] = self.lb[i]
elif pos > self.ub[i]:
part.pos[i] = self.ub[i]
[docs] def return_ranked_populations(self):
""" Returns a ranked population of the particles
Returns
-------
np.array, np.array
"""
positions = []
finesses = []
for n, part in enumerate(self.population):
finesses.append(part.best.fitness)
positions.append(part.best.pos)
positions = np.array(positions)
finesses = np.array(finesses)
idx = np.argsort(finesses)
return finesses[idx], positions[idx]
[docs] def set_start_position(self, position):
"""
Set the starting position for the population of particles.
Parameters
----------
position : array
"""
self.start = np.array(position)
self.size = len(position)
[docs] def set_speed(self, speed_min=-10000, speed_max=10000):
""" Sets the max and min speed of the particles.
This is usually a fraction of the bounds set with `set_bounds`. So if
one sets the bound to be +/- 1 order of magnitude, you can set the
speed to be -.1 and .1, allow the particles to update in 1/10 the
parameter space. This keeps particles near their local position rather
than jumping across parameter space quickly.
Parameters
----------
speed_min : float
speed_max : float
"""
self.min_speed = speed_min
self.max_speed = speed_max
[docs] def set_bounds(self, parameter_range=None, lower=None, upper=None):
""" Set the search space bounds that the parameters may search.
This can be a single float, in which the particles can search plus or
minus the starting values around this. It can also be an array that
must be the same length of the starting position.
Parameters
----------
parameter_range : float
If provided parameter_range, the range will be set by the starting
position +/- this value. To set each parameter manually, use
`lower` and `upper` args
lower : array
Lower bounds for parameters
upper : array
Upper bounds for parameters
"""
if self.start is None:
raise Exception("Must provide starting array: %r" % self.start)
if parameter_range is None and upper is None and lower is None:
raise Exception('Need to provide parameter ranges')
if parameter_range is None:
if self.range is None:
raise Exception("Please provide parameter range")
parameter_range = self.range
if lower is None:
lower = self.start - parameter_range
else:
if not self.size == len(lower):
raise Exception("Size of bounds must equal size of particle")
if upper is None:
upper = self.start + parameter_range
else:
if not self.size == len(upper):
raise Exception("Size of bounds must equal size of particle")
self.lb = lower
self.ub = upper
self.bounds_set = True
[docs] def run(self, num_particles, num_iterations, cost_function=None,
num_processors=1, save_samples=False, stop_threshold=1e-5,
max_iter_no_improv=None):
"""
Run optimization
Parameters
----------
num_particles : int
Number of particles in population, ~20 is a good starting place
num_iterations : int
Number of iterations to perform.
cost_function : callable function
Takes a parameter set and returns a scalar (particles fitness)
num_processors : int
Number of processors to run on. If using scipy, note that you may
need to set OMP_NUM_THREADS=1 to prevent each process from using
more than one CPU.
save_samples : bool
Save ALL positions of particles over time, can require large memory
if num_particles, num_iterations, and len(parameters) is large.
stop_threshold : float
Standard deviation of the particles’ cost function at which the
optimization is stopped
max_iter_no_improv: int
Maximum steps allowed without improvement before the optimization
stops.
"""
if self._is_setup:
pass
else:
self._setup_pso()
if not callable(cost_function):
raise TypeError("Provide a callable cost function")
history = np.zeros((num_iterations, len(self.start)))
if self.save_sampled or save_samples:
self.all_history = np.zeros(
(num_iterations, num_particles, len(self.start)))
self.all_fitness = np.zeros((num_iterations, num_particles))
values = np.zeros(num_iterations)
self.population = [self._generate() for _ in range(num_particles)]
self.population_fitness = np.zeros(len(self.population))
if max_iter_no_improv is None:
max_iter_no_improv = np.inf
iter_without_improvement = 0
best_fitness = np.inf
with SerialExecutor() if num_processors == 1 else \
ProcessPoolExecutor(max_workers=num_processors) as executor:
for g in range(num_iterations):
if self.update_w:
self.w = (num_iterations - g + 1.) / num_iterations
self.population_fitness = np.array(
list(executor.map(
cost_function,
[p.pos for p in self.population],
)
)
)
for ind, fit in zip(self.population, self.population_fitness):
ind.fitness = fit
self._update_connected()
for part in self.population:
self._update_particle_position(part)
values[g] = self.best.fitness
history[g] = self.best.pos
if self.save_sampled or save_samples:
curr_fit, curr_pop = self.return_ranked_populations()
self.all_history[g, :, :] = curr_pop
self.all_fitness[g, :] = curr_fit
if self.verbose:
self.print_stats(g + 1, fitness=self.population_fitness)
if self.population_fitness.std() < stop_threshold:
self.log.info("Stopping. STD < stop_threshold.")
break
if self.best.fitness < best_fitness:
best_fitness = self.best.fitness
iter_without_improvement = 0
else:
iter_without_improvement += 1
if iter_without_improvement > max_iter_no_improv:
self.log.info("Stopping. max_iter_no_improv reached.")
break
self.values = np.array(values[:g])
self.history = np.array(history[:g, :])
def _calc_fitness_from_array(self, traj, num_sim, cost_function):
index_names = traj.index.names
traj.reset_index(inplace=True)
for i, part in enumerate(self.population):
start = i * num_sim
end = (i + 1) * num_sim
tmp_results = traj.loc[
traj.simulation.isin(list(range(start, end)))]
tmp_results.set_index(index_names, inplace=True)
error = cost_function(tmp_results)
part.fitness = error
self.population_fitness[i] = error
def _get_parameters_from_population(self, num_sims, total_sims, n_params):
""" Create param_array for GPU based simulators """
rate_vals = np.zeros((total_sims, n_params))
# create parameters for each particle, creates blocks per num_sims
for i, part in enumerate(self.population):
start = i * num_sims
end = (i + 1) * num_sims
rate_vals[start:end, :] = part.pos
return rate_vals
[docs] def run_ssa(self, model, num_particles, num_iterations, num_sim,
cost_function, simulator, save_samples=False,
stop_threshold=0):
"""
Run PSO for a stochastic simulator
Parameters
----------
model : pysb.Model
num_particles : int
Number of particles in the swarm.
num_iterations : int
Number of iterations to perform
num_sim : int
Number of SSA simulations to run for each particle.
cost_function : function
Takes a pandas dataframe of PySB trajectories created by running
multiple SSA simulations. Function must return a scalar.
simulator : pysb.Simulator
PySB simulator (CudaSSASimulator or OpenclSSASimulator)
save_samples : bool
stop_threshold : float
"""
if self._is_setup:
pass
else:
self._setup_pso()
if simulator is None:
raise ValueError(
"Must provide an SSA simulator to use this method")
history = []
values = []
self.population = [self._generate() for _ in range(num_particles)]
self.population_fitness = np.zeros((len(self.population)))
total_sims = num_particles * num_sim
rate_p = model.parameters_rules()
rate_mask = np.array([p in rate_p for p in model.parameters])
nominal_values = np.array([p.value for p in model.parameters])
log10_original_values = np.log10(nominal_values[rate_mask])
all_param_vals = np.repeat([nominal_values], total_sims, axis=0)
for g in range(0, num_iterations):
if self.update_w:
self.w = (num_iterations - g) / num_iterations
rate_values = self._get_parameters_from_population(
num_sim, total_sims, len(log10_original_values)
)
# convert back from log10 space
all_param_vals[:, rate_mask] = 10 ** rate_values
# reset initials and param_values so simulator won't try to
# duplicate any
simulator.initials = None
simulator.param_values = None
traj = simulator.run(param_values=all_param_vals).dataframe
self._calc_fitness_from_array(traj, num_sim, cost_function)
self._update_connected()
for part in self.population:
self._update_particle_position(part)
values.append(self.best.fitness)
history.append(deepcopy(self.best.pos))
if self.save_sampled or save_samples:
curr_fit, curr_pop = self.return_ranked_populations()
self.all_history[g, :, :] = curr_pop
self.all_fitness[g, :] = curr_fit
if self.verbose:
self.print_stats(g + 1, fitness=self.population_fitness)
if self.population_fitness.std() < stop_threshold:
self.log.info("Stopping criteria reached.")
break
self.values = np.array(values)
self.history = np.array(history)
[docs] def print_stats(self, iteration, fitness):
if iteration == 1:
self.log.info(stats_header)
self.log.info(
stats_output.format(iteration, self.best.fitness, fitness.mean(),
fitness.min(), fitness.max(), fitness.std())
)
class SerialExecutor(Executor):
""" Execute tasks in serial (immediately on submission)
Code originally from PySB.
"""
def submit(self, fn, *args, **kwargs):
f = Future()
try:
result = fn(*args, **kwargs)
except BaseException as e:
f.set_exception(e)
else:
f.set_result(result)
return f