'''
ParetoDiscovery multi-objective solver.
'''

import numpy as np
from scipy.optimize import minimize
from scipy.linalg import null_space
from pymoo.model.algorithm import Algorithm
from pymoo.model.duplicate import DefaultDuplicateElimination
from pymoo.model.individual import Individual
from pymoo.model.initialization import Initialization
from pymoo.optimize import minimize as minimize_ea
from multiprocess import Process, Queue, cpu_count

from autooed.utils.sampling import lhs
from autooed.utils.pareto import find_pareto_front
from autooed.mobo.solver.base import Solver
from autooed.mobo.solver.pareto_discovery.buffer import get_buffer
from autooed.mobo.solver.pareto_discovery.utils import propose_next_batch, propose_next_batch_without_label, get_sample_num_from_families


def _local_optimization(x, y, f, eval_func, bounds, constr_func, delta_s):
    '''
    Local optimization of generated stochastic samples by minimizing distance to the target, see section 6.2.3.
    Input:
        x: a design sample, shape = (n_var,)
        y: performance of x, shape = (n_obj,)
        f: relative performance to the buffer origin, shape = (n_obj,)
        eval_func: problem's evaluation function
        bounds: problem's lower and upper bounds, shape = (2, n_var)
        constr_func: problem's constraint evaluation function
        delta_s: scaling factor for choosing reference point in local optimization, see section 6.2.3
    Output:
        x_opt: locally optimized sample x
    '''
    # choose reference point z
    f_norm = np.linalg.norm(f)
    s = 2.0 * f / np.sum(f) - 1 - f / f_norm
    s /= np.linalg.norm(s)
    z = y + s * delta_s * np.linalg.norm(f)

    # optimization objective, see eq(4)
    def fun(x):
        fx = eval_func(x, return_values_of=['F'])
        return np.linalg.norm(fx - z)

    # constraint function
    if constr_func is not None:
        def fun_constr(x):
            return -constr_func(x)

    # jacobian of the objective
    dy = eval_func(x, return_values_of=['dF'])
    if dy is None:
        jac = None
    else:
        def jac(x):
            fx, dfx = eval_func(x, return_values_of=['F', 'dF'])
            return ((fx - z) / np.linalg.norm(fx - z)) @ dfx
    
    # do optimization
    if constr_func is None:
        res = minimize(fun, x, method='L-BFGS-B', jac=jac, bounds=np.array(bounds).T)
    else:
        res = minimize(fun, x, method='SLSQP', jac=jac, bounds=np.array(bounds).T, constraints=({'type': 'ineq', 'fun': fun_constr},))
    x_opt = res.x
    return x_opt
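

# A minimal usage sketch of _local_optimization (illustrative only, not called by the solver).
# It assumes a toy unconstrained bi-objective problem (f1 = ||x||^2, f2 = ||x - 1||^2) whose
# evaluation function mimics the `return_values_of` interface used above; the buffer origin
# value is also an assumption made just for this example.
def _example_local_optimization():
    def toy_eval(x, return_values_of=('F',)):
        x = np.atleast_1d(np.asarray(x, dtype=float))
        values = {
            'F': np.array([np.sum(x ** 2), np.sum((x - 1.0) ** 2)]),  # objective values, shape (n_obj,)
            'dF': np.vstack([2.0 * x, 2.0 * (x - 1.0)]),              # jacobian, shape (n_obj, n_var)
        }
        out = [values[key] for key in return_values_of]
        return out[0] if len(out) == 1 else out

    x = np.array([0.9, 0.1])                # a design sample
    y = toy_eval(x)                         # its performance
    origin = np.array([-1e-2, -1e-2])       # assumed buffer origin (slightly below the ideal point)
    f = y - origin                          # relative performance to the origin
    bounds = [np.zeros(2), np.ones(2)]      # lower and upper bounds, shape (2, n_var)
    x_opt = _local_optimization(x, y, f, toy_eval, bounds, constr_func=None, delta_s=0.3)
    return x_opt  # expected to move towards the Pareto set {x : x1 == x2, 0 <= x1 <= 1}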


def _get_kkt_dual_variables(F, G, DF, DG):
    '''
    Optimizing for dual variables alpha and beta in KKT conditions, see section 4.2, proposition 4.5.
    Input:
        Given a design sample,
        F: performance value, shape = (n_obj,)
        G: active constraints, shape = (n_active_const,)
        DF: jacobian matrix of performance, shape = (n_obj, n_var)
        DG: jacobian matrix of active constraints, shape = (n_active_const, n_var)
        where n_var = D, n_obj = d, n_active_const = K' in the original paper
    Output:
        alpha_opt, beta_opt: optimized dual variables
    '''
    # NOTE: use min-norm solution for solving alpha then determine beta instead?
    n_obj = len(F)
    n_active_const = len(G) if G is not None else 0

    '''
    Optimization formulation:
        To optimize the last line of (2) in section 4.2, we recast it as a quadratic optimization problem:
        find x such that Ax = 0 --> min_x ||Ax||^2
        where x means [alpha, beta] and A means [DF, DG].
        Constraints: alpha >= 0, beta >= 0, sum(alpha) = 1.
        NOTE: we currently ignore the constraint beta * G = 0 because G is always 0 with only box constraints, but adding that constraint results in a poor optimization solution (?)
    '''
    if n_active_const > 0: # when there are active constraints

        def fun(x, n_obj=n_obj, DF=DF, DG=DG):
            alpha, beta = x[:n_obj], x[n_obj:]
            objective = alpha @ DF + beta @ DG
            return 0.5 * objective @ objective

        def jac(x, n_obj=n_obj, DF=DF, DG=DG):
            alpha, beta = x[:n_obj], x[n_obj:]
            objective = alpha @ DF + beta @ DG
            return np.vstack([DF, DG]) @ objective

        const = {'type': 'eq', 
            'fun': lambda x, n_obj=n_obj: np.sum(x[:n_obj]) - 1.0, 
            'jac': lambda x, n_obj=n_obj: np.concatenate([np.ones(n_obj), np.zeros_like(x[n_obj:])])}
    
    else: # when there's no active constraint
        
        def fun(x, DF=DF):
            objective = x @ DF
            return 0.5 * objective @ objective

        def jac(x, DF=DF):
            objective = x @ DF
            return DF @ objective

        const = {'type': 'eq', 
                'fun': lambda x: np.sum(x) - 1.0, 
                'jac': np.ones_like}

    # specify different bounds for alpha and beta
    bounds = np.array([[0.0, np.inf]] * (n_obj + n_active_const))
    
    # NOTE: we use random values to initialize alpha for now; considering the location of F might give a more accurate initialization
    alpha_init = np.random.random(len(F))
    alpha_init /= np.sum(alpha_init)
    beta_init = np.zeros(n_active_const) # zero initialization for beta
    x_init = np.concatenate([alpha_init, beta_init])

    # do optimization using SLSQP
    res = minimize(fun, x_init, method='SLSQP', jac=jac, bounds=bounds, constraints=const)
    x_opt = res.x
    alpha_opt, beta_opt = x_opt[:n_obj], x_opt[n_obj:]
    return alpha_opt, beta_opt
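

# A small sanity-check sketch for _get_kkt_dual_variables (illustrative only, not called by the solver).
# With two objectives whose gradients point in exactly opposite directions and no active constraints,
# the optimized KKT weights should balance them, i.e. alpha is expected to come out close to [0.5, 0.5].
def _example_kkt_dual_variables():
    F = np.array([1.0, 1.0])                  # objective values (only their count matters here)
    DF = np.array([[1.0, 0.0], [-1.0, 0.0]])  # opposing objective gradients, shape (n_obj, n_var)
    alpha, beta = _get_kkt_dual_variables(F, None, DF, None)  # no active constraints
    return alpha, beta                        # alpha ~= [0.5, 0.5], beta is empty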


def _get_active_box_const(x, bounds):
    '''
    Getting the indices of active box constraints.
    Input:
        x: a design sample, shape = (n_var,)
        bounds: problem's lower and upper bounds, shape = (2, n_var)
    Output:
        active_idx: indices of all active constraints
        upper_active_idx: indices of upper active constraints
        lower_active_idx: indices of lower active constraints
    '''
    eps = 1e-8 # epsilon value to determine 'active'
    upper_active = bounds[1] - x < eps
    lower_active = x - bounds[0] < eps
    active = np.logical_or(upper_active, lower_active)
    active_idx, upper_active_idx, lower_active_idx = np.where(active)[0], np.where(upper_active)[0], np.where(lower_active)[0]
    return active_idx, upper_active_idx, lower_active_idx
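

# A tiny illustrative sketch for _get_active_box_const with assumed values (not called by the solver).
# With bounds [0, 1]^2 and x = [1.0, 0.5], only the first variable sits on its upper bound, so
# active_idx == upper_active_idx == [0] and lower_active_idx is empty.
def _example_active_box_const():
    bounds = np.array([[0.0, 0.0], [1.0, 1.0]])
    x = np.array([1.0, 0.5])
    return _get_active_box_const(x, bounds)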


def _get_box_const_value_jacobian_hessian(x, bounds):
    '''
    Getting the value, jacobian and hessian of active box constraints.
    Input:
        x: a design sample, shape = (n_var,)
        bounds: problem's lower and upper bounds, shape = (2, n_var)
    Output:
        G: value of active box constraints (always 0), shape = (n_active_const,)
        DG: jacobian matrix of active box constraints (1/-1 at active locations, otherwise 0), shape = (n_active_const, n_var)
        HG: hessian matrix of active box constraints (always 0), shape = (n_active_const, n_var, n_var)
    '''
    # get indices of active constraints
    active_idx, upper_active_idx, _ = _get_active_box_const(x, bounds)
    n_active_const, n_var = len(active_idx), len(x)

    if n_active_const > 0:
        G = np.zeros(n_active_const)
        DG = np.zeros((n_active_const, n_var))
        for i, idx in enumerate(active_idx):
            constraint = np.zeros(n_var)
            if idx in upper_active_idx:
                constraint[idx] = 1 # upper active
            else:
                constraint[idx] = -1 # lower active
            DG[i] = constraint
        HG = np.zeros((n_active_const, n_var, n_var))
        return G, DG, HG
    else:
        # no active constraints
        return None, None, None


def _get_optimization_directions(x_opt, eval_func, bounds):
    '''
    Getting the directions to explore local pareto manifold.
    Input:
        x_opt: locally optimized design sample, shape = (n_var,)
        eval_func: problem's evaluation function
        bounds: problem's lower and upper bounds, shape = (2, n_var)
    Output:
        directions: local exploration directions for alpha, beta and x (design sample)
    '''
    # evaluate the value, jacobian and hessian of performance
    F, DF, HF = eval_func(x_opt, return_values_of=['F', 'dF', 'hF'])
    
    # evaluate the value, jacobian and hessian of box constraint (NOTE: assume no other types of constraints)
    G, DG, HG = _get_box_const_value_jacobian_hessian(x_opt, bounds)
    
    # KKT dual variables optimization
    alpha, beta = _get_kkt_dual_variables(F, G, DF, DG)

    n_obj, n_var, n_active_const = len(F), len(x_opt), len(G) if G is not None else 0

    # compute H in eq(3) (NOTE: the two forms below are equivalent for box constraint since HG = 0)
    if n_active_const > 0:
        H = HF.T @ alpha + HG.T @ beta
    else:
        H = HF.T @ alpha

    # compute exploration directions (unnormalized) by taking the null space of image in eq(3)
    # TODO: this part is mainly copied from Adriana's implementation, to be checked
    # NOTE: seems useless to solve for d_alpha and d_beta, maybe need to consider all possible situations in null_space computation
    alpha_const = np.concatenate([np.ones(n_obj), np.zeros(n_active_const + n_var)])
    if n_active_const > 0:
        comp_slack_const = np.column_stack([np.zeros((n_active_const, n_obj + n_active_const)), DG])
        DxHx = np.vstack([alpha_const, comp_slack_const, np.column_stack([DF.T, DG.T, H])])
    else:
        DxHx = np.vstack([alpha_const, np.column_stack([DF.T, H])])
    directions = null_space(DxHx)

    # eliminate numerical error
    eps = 1e-8
    directions[np.abs(directions) < eps] = 0.0
    return directions
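

# An illustrative sketch of _get_optimization_directions on an assumed toy problem (not called by the solver).
# For f1 = ||x||^2, f2 = ||x - 1||^2 in 2D the Pareto set is the segment x1 == x2, so at an interior
# Pareto point the design-space block of the null space should be (approximately) parallel to (1, 1).
def _example_optimization_directions():
    def toy_eval(x, return_values_of=('F',)):
        x = np.asarray(x, dtype=float)
        values = {
            'F': np.array([np.sum(x ** 2), np.sum((x - 1.0) ** 2)]),
            'dF': np.vstack([2.0 * x, 2.0 * (x - 1.0)]),
            'hF': np.array([2.0 * np.eye(len(x)), 2.0 * np.eye(len(x))]),  # hessians, shape (n_obj, n_var, n_var)
        }
        out = [values[key] for key in return_values_of]
        return out[0] if len(out) == 1 else out

    x_opt = np.array([0.5, 0.5])                  # an interior point on the Pareto set
    bounds = np.array([[0.0, 0.0], [1.0, 1.0]])   # box bounds, shape (2, n_var)
    directions = _get_optimization_directions(x_opt, toy_eval, bounds)
    d_x = directions[-2:]                         # design-space block of the null space basis
    return d_x                                    # expected to be (roughly) parallel to (1, 1)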


def _first_order_approximation(x_opt, directions, bounds, constr_func, n_grid_sample):
    '''
    Exploring new samples from local manifold (first order approximation of pareto front).
    Input:
        x_opt: locally optimized design sample, shape = (n_var,)
        directions: local exploration directions for alpha, beta and x (design sample)
        bounds: problem's lower and upper bounds, shape = (2, n_var)
        constr_func: problem's constraint evaluation function
        n_grid_sample: number of samples on local manifold (grid), see section 6.3.1
    Output:
        x_samples: new valid samples from local manifold (grid)
    '''
    n_var = len(x_opt)
    lower_bound, upper_bound = bounds[0], bounds[1]
    active_idx, _, _ = _get_active_box_const(x_opt, bounds)
    n_active_const = len(active_idx)
    n_obj = len(directions) - n_var - n_active_const

    x_samples = np.array([x_opt])

    # TODO: check why d_alpha and d_beta are unused here
    d_alpha, d_beta, d_x = directions[:n_obj], directions[n_obj:n_obj + n_active_const], directions[-n_var:]
    eps = 1e-8
    if np.linalg.norm(d_x) < eps: # direction is a zero vector
        return x_samples
    direction_dim = d_x.shape[1]

    if direction_dim > n_obj - 1:
        # more than d-1 directions to explore, randomly choose d-1 sub-directions
        indices = np.random.choice(np.arange(direction_dim), n_obj - 1)
        while np.linalg.norm(d_x[:, indices]) < eps:
            indices = np.random.choice(np.arange(direction_dim), n_obj - 1)
        d_x = d_x[:, indices]
    elif direction_dim < n_obj - 1:
        # less than d-1 directions to explore, do not expand the point
        return x_samples
    
    # normalize direction
    d_x /= np.linalg.norm(d_x)

    # NOTE: Adriana's code also checks if such direction has been expanded, but maybe unnecessary

    # grid sampling on expanded surface (NOTE: more delicate sampling scheme?)
    bound_scale = np.expand_dims(upper_bound - lower_bound, axis=1)
    d_x *= bound_scale
    loop_count = 0 # avoid infinite loop when it's hard to get valid samples
    while len(x_samples) < n_grid_sample:
        # compute expanded samples
        curr_dx_samples = np.sum(np.expand_dims(d_x, axis=0) * np.random.random((n_grid_sample, 1, n_obj - 1)), axis=-1)
        curr_x_samples = np.expand_dims(x_opt, axis=0) + curr_dx_samples
        # check validity of samples
        flags = np.logical_and((curr_x_samples <= upper_bound).all(axis=1), (curr_x_samples >= lower_bound).all(axis=1))
        if constr_func is not None:
            flags = np.logical_and(flags, constr_func(curr_x_samples) <= 0)
        valid_idx = np.where(flags)[0]
        x_samples = np.vstack([x_samples, curr_x_samples[valid_idx]])
        loop_count += 1
        if loop_count > 10:
            break
    x_samples = x_samples[:n_grid_sample]
    return x_samples
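

# A minimal sketch of _first_order_approximation with a hand-crafted direction (illustrative only,
# not called by the solver). The direction matrix stacks [d_alpha; d_beta; d_x]; here there are
# 2 objectives, no active constraints and 2 variables, and the d_x block points along (1, 1),
# so the expanded samples should all lie on the line x1 == x2 inside the box.
def _example_first_order_approximation():
    x_opt = np.array([0.5, 0.5])
    bounds = np.array([[0.0, 0.0], [1.0, 1.0]])
    directions = np.array([[0.5], [-0.5], [0.5], [0.5]])  # rows: d_alpha (2, unused here), d_x (2)
    x_samples = _first_order_approximation(x_opt, directions, bounds, constr_func=None, n_grid_sample=10)
    return x_samples  # each row satisfies x[0] == x[1] up to numerical noise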


def _pareto_discover(xs, eval_func, bounds, constr_func, delta_s, origin, origin_constant, n_grid_sample, queue):
    '''
    Local optimization and first-order approximation.
    (These functions are moved out of the ParetoDiscoveryAlgorithm class for parallelization.)
    Input:
        xs: a batch of samples x, shape = (batch_size, n_var)
        eval_func: problem's evaluation function
        bounds: problem's lower and upper bounds, shape = (2, n_var)
        constr_func: problem's constraint evaluation function
        delta_s: scaling factor for choosing reference point in local optimization, see section 6.2.3
        origin: origin of performance buffer
        origin_constant: when an evaluated value surpasses the buffer origin, adjust the origin accordingly and subtract this constant
        n_grid_sample: number of samples on local manifold (grid), see section 6.3.1
        queue: the queue storing results from all processes
    Output (stored in queue):
        x_samples_all: all valid samples from local manifold (grid)
        patch_ids: patch ids for all valid samples (same id when expanded from same x)
        sample_num: number of input samples (needed for counting global patch ids)
        new_origin: new origin point for performance buffer
    '''
    # evaluate samples x and adjust origin accordingly
    ys = eval_func(xs, return_values_of=['F'])
    new_origin = np.minimum(origin, np.min(ys, axis=0))
    if (new_origin != origin).any():
        new_origin -= origin_constant
    fs = ys - new_origin

    x_samples_all = []
    patch_ids = []
    for i, (x, y, f) in enumerate(zip(xs, ys, fs)):

        # local optimization by optimizing eq(4)
        x_opt = _local_optimization(x, y, f, eval_func, bounds, constr_func, delta_s)

        # get directions to expand in local manifold
        directions = _get_optimization_directions(x_opt, eval_func, bounds)

        # get new valid samples from local manifold
        x_samples = _first_order_approximation(x_opt, directions, bounds, constr_func, n_grid_sample)
        x_samples_all.append(x_samples)
        patch_ids.extend([i] * len(x_samples))

    queue.put([np.vstack(x_samples_all), patch_ids, len(xs), new_origin])


class ParetoDiscoveryAlgorithm(Algorithm):
    '''
    The Pareto discovery algorithm introduced by: Schulz, Adriana, et al. "Interactive exploration of design trade-offs." ACM Transactions on Graphics (TOG) 37.4 (2018): 1-14.
    '''
    def __init__(self,
                pop_size=None,
                sampling=None,
                survival=None,
                eliminate_duplicates=DefaultDuplicateElimination(),
                repair=None,
                individual=Individual(),
                n_cell=None,
                cell_size=10,
                buffer_origin=None,
                buffer_origin_constant=1e-2,
                delta_b=0.2,
                label_cost=10,
                delta_p=10,
                delta_s=0.3,
                n_grid_sample=100,
                n_process=cpu_count(),
                **kwargs
                ):
        '''
        Inputs (essential parameters):
            pop_size: population size
            sampling: initial sample data or sampling method to obtain initial population
            n_cell: number of cells in performance buffer
            cell_size: maximum number of samples inside each cell of performance buffer
            buffer_origin: origin of performance buffer
            buffer_origin_constant: when an evaluated value surpasses the buffer origin, adjust the origin accordingly and subtract this constant
            delta_b: unary energy normalization constant for sparse approximation, see section 6.4
            label_cost: for reducing number of unique labels in sparse approximation, see section 6.4
            delta_p: factor of perturbation in stochastic sampling, see section 6.2.2
            delta_s: scaling factor for choosing reference point in local optimization, see section 6.2.3
            n_grid_sample: number of samples on local manifold (grid), see section 6.3.1
            n_process: number of processes for parallelization
        '''
        super().__init__(**kwargs)
        
        self.pop_size = pop_size
        self.survival = survival
        self.individual = individual

        if isinstance(eliminate_duplicates, bool):
            if eliminate_duplicates:
                self.eliminate_duplicates = DefaultDuplicateElimination()
            else:
                self.eliminate_duplicates = None
        else:
            self.eliminate_duplicates = eliminate_duplicates

        self.initialization = Initialization(sampling,
                                            individual=individual,
                                            repair=repair,
                                            eliminate_duplicates=self.eliminate_duplicates)

        self.n_gen = None
        self.pop = None
        self.off = None

        self.approx_set = None
        self.approx_front = None
        self.fam_lbls = None
        
        self.buffer = None
        if n_cell is None:
            n_cell = self.pop_size
        self.buffer_args = {'cell_num': n_cell,
                            'cell_size': cell_size,
                            'origin': buffer_origin,
                            'origin_constant': buffer_origin_constant,
                            'delta_b': delta_b,
                            'label_cost': label_cost}

        self.delta_p = delta_p
        self.delta_s = delta_s
        self.n_grid_sample = n_grid_sample
        self.n_process = n_process
        self.patch_id = 0

        self.constr_func = None

    def _initialize(self):
        # create the initial population
        pop = self.initialization.do(self.problem, self.pop_size, algorithm=self)
        pop_x = pop.get('X').copy()

        # check if problem has constraints other than bounds
        pop_constr = self.problem.evaluate_constraint(pop_x)
        if pop_constr is not None:
            self.constr_func = self.problem.evaluate_constraint
            pop = pop[pop_constr <= 0]

            while len(pop) < self.pop_size:
                new_pop = self.initialization.do(self.problem, self.pop_size, algorithm=self)
                new_pop_x = new_pop.get('X').copy()
                new_pop = new_pop[self.problem.evaluate_constraint(new_pop_x) <= 0]
                pop = pop.merge(new_pop)
        
            pop = pop[:self.pop_size]
            pop_x = pop.get('X').copy()

        pop_f = self.problem.evaluate(pop_x, return_values_of=['F'])

        # initialize buffer
        self.buffer = get_buffer(self.problem.n_obj, **self.buffer_args)
        patch_ids = np.full(self.pop_size, self.patch_id) # NOTE: patch_ids here might not make sense
        self.patch_id += 1
        self.buffer.insert(pop_x, pop_f, patch_ids)

        # update population by the best samples in the buffer
        pop = pop.new('X', self.buffer.sample(self.pop_size))

        # evaluate population using the objective function
        self.evaluator.eval(self.problem, pop, algorithm=self)

        # NOTE: check whether survival is needed here
        if self.survival:
            pop = self.survival.do(self.problem, pop, len(pop), algorithm=self)

        self.pop = pop

        # sys.stdout.write('ParetoDiscovery optimizing: generation %i' % self.n_gen)
        # sys.stdout.flush()

    def _next(self):
        '''
        Core algorithm part in each iteration, see algorithm 1.
        --------------------------------------
        xs = stochastic_sampling(B, F, X)
        for x in xs:
            D = select_direction(B, x)
            x_opt = local_optimization(D, F, X)
            M = first_order_approximation(x_opt, F, X)
            update_buffer(B, F(M))
        --------------------------------------
        where F is problem evaluation, X is design constraints
        '''
        # update optimization progress
        # sys.stdout.write('\b' * len(str(self.n_gen - 1)) + str(self.n_gen))
        # sys.stdout.flush()

        # stochastic sampling by adding local perturbation
        xs = self._stochastic_sampling()

        # parallelize the core Pareto discovery process with multiprocess, see _pareto_discover()
        # (covers select_direction, local_optimization and first_order_approximation from the algorithm outline above)
        x_batch = np.array_split(xs, self.n_process)
        queue = Queue()
        process_count = 0
        for x in x_batch:
            if len(x) > 0:
                p = Process(target=_pareto_discover, 
                    args=(x, self.problem.evaluate, [self.problem.xl, self.problem.xu], self.constr_func, self.delta_s, 
                        self.buffer.origin, self.buffer.origin_constant, self.n_grid_sample, queue))
                p.start()
                process_count += 1

        # gather results (new samples, new patch ids, new origin of performance buffer) from parallel discovery
        new_origin = self.buffer.origin
        x_samples_all = []
        patch_ids_all = []
        for _ in range(process_count):
            x_samples, patch_ids, sample_num, origin = queue.get()
            if x_samples is not None:
                x_samples_all.append(x_samples)
                patch_ids_all.append(np.array(patch_ids) + self.patch_id) # assign corresponding global patch ids to samples
                self.patch_id += sample_num
            new_origin = np.minimum(new_origin, origin)

        # evaluate all new samples and adjust the origin point of the buffer
        x_samples_all = np.vstack(x_samples_all)
        y_samples_all = self.problem.evaluate(x_samples_all, return_values_of=['F'])
        new_origin = np.minimum(np.min(y_samples_all, axis=0), new_origin)
        patch_ids_all = np.concatenate(patch_ids_all)

        # update buffer
        self.buffer.move_origin(new_origin)
        self.buffer.insert(x_samples_all, y_samples_all, patch_ids_all)

        # update population by the best samples in the buffer
        self.pop = self.pop.new('X', self.buffer.sample(self.pop_size))
        self.evaluator.eval(self.problem, self.pop, algorithm=self)

    def _stochastic_sampling(self):
        '''
        Stochastic sampling around the current population to initialize each iteration and avoid local minima, see section 6.2.2
        '''
        xs = self.pop.get('X').copy()

        # sampling loop
        num_target = xs.shape[0]
        xs_final = np.zeros((0, xs.shape[1]), xs.dtype)

        while xs_final.shape[0] < num_target:
            # generate stochastic direction
            d = np.random.random(xs.shape)
            d /= np.expand_dims(np.linalg.norm(d, axis=1), axis=1)

            # generate random scaling factor
            delta = np.random.random() * self.delta_p

            # generate new stochastic samples
            xs_perturb = xs + 1.0 / (2 ** delta) * d # NOTE: is this scaling form reasonable? maybe better use relative scaling?
            xs_perturb = np.clip(xs_perturb, self.problem.xl, self.problem.xu)

            if self.constr_func is None:
                xs_final = xs_perturb
            else:
                # check constraint values
                constr = self.constr_func(xs_perturb)
                flag = constr <= 0
                if np.any(flag):
                    xs_final = np.vstack((xs_final, xs_perturb[flag]))

        return xs_final[:num_target]

    def propose_next_batch(self, curr_pfront, ref_point, batch_size):
        '''
        Propose the next batch to evaluate for active learning.
        Greedily propose the sample with max HV (hypervolume) until all families are visited, allowing only samples with max HV from unvisited families.
        '''
        approx_x, approx_y = self.approx_set, self.approx_front
        labels = self.fam_lbls

        X_next = []
        Y_next = []
        family_lbls = []
        
        if len(approx_x) >= batch_size:
            # approximation result is enough to propose all candidates
            curr_X_next, curr_Y_next, labels_next = propose_next_batch(curr_pfront, ref_point, approx_y, approx_x, batch_size, labels)
            X_next.append(curr_X_next)
            Y_next.append(curr_Y_next)
            family_lbls.append(labels_next)

        else:
            # approximation result is not enough to propose all candidates
            # so propose all results as candidates, and propose the rest from the buffer
            # NOTE: may consider re-expanding manifolds to produce more approximation results, but that may not be necessary
            X_next.append(approx_x)
            Y_next.append(approx_y)
            family_lbls.extend(labels)
            remain_batch_size = batch_size - len(approx_x)
            buffer_xs, buffer_ys = self.buffer.flattened()
            prop_X_next, prop_Y_next = propose_next_batch_without_label(curr_pfront, ref_point, buffer_ys, buffer_xs, remain_batch_size)
            X_next.append(prop_X_next)
            Y_next.append(prop_Y_next)
            family_lbls.extend(np.full(remain_batch_size, -1))

        X_next = np.vstack(X_next)
        Y_next = np.vstack(Y_next)
        return X_next, Y_next, family_lbls

    def get_sparse_front(self):
        '''
        Get sparse approximation of Pareto front and set
        '''
        approx_x, approx_y = self.approx_set, self.approx_front
        labels = self.fam_lbls

        return labels, approx_x, approx_y

    def _finalize(self):
        # set population as all samples in performance buffer
        pop_x, pop_y = self.buffer.flattened()
        self.pop = self.pop.new('X', pop_x, 'F', pop_y)
        # get sparse front approximation
        self.fam_lbls, self.approx_set, self.approx_front = self.buffer.sparse_approximation()
        # print()


class ParetoDiscovery(Solver):
    '''
    Solver based on ParetoDiscovery [Schulz et al. 2018].
    NOTE: only compatible with direct selection.
    '''
    def __init__(self, problem, n_gen=10, pop_size=100, n_process=cpu_count(), **kwargs):
        # TODO: check n_gen
        super().__init__(problem)
        self.n_gen = n_gen
        self.algo = ParetoDiscoveryAlgorithm(pop_size=pop_size, n_process=n_process)

    def _solve(self, X, Y, batch_size):
        # initialize population
        X = np.vstack([X, lhs(X.shape[1], batch_size)])
        self.algo.initialization.sampling = X

        res = minimize_ea(self.problem, self.algo, ('n_gen', self.n_gen))

        X_candidate, Y_candidate = res.pop.get('X'), res.pop.get('F')
        algo = res.algorithm

        curr_pfront = find_pareto_front(Y)
        ref_point = np.max(np.vstack([Y, Y_candidate]), axis=0)
        X_candidate, Y_candidate, _ = algo.propose_next_batch(curr_pfront, ref_point, batch_size)

        return X_candidate, Y_candidate