Source code for autooed.mobo.solver.moead

'''
MOEA/D multi-objective solver.
'''

import numpy as np
from sklearn.cluster import KMeans
from pymoo.algorithms.moead import MOEAD as MOEADAlgo
from pymoo.util.nds.non_dominated_sorting import NonDominatedSorting
from pymoo.optimize import minimize

from autooed.utils.sampling import lhs
from autooed.utils.pareto import find_pareto_front
from autooed.mobo.solver.base import Solver


[docs]class MOEAD(Solver): ''' Solver based on MOEA/D in MOEA/D-EGO. NOTE: only compatible with Direct selection. '''
[docs] def __init__(self, problem, n_gen=100, pop_size=100, **kwargs): super().__init__(problem) self.n_gen = n_gen self.pop_size = pop_size # generate direction vectors by random sampling self.ref_dirs = np.random.random((pop_size, problem.n_obj)) self.ref_dirs /= np.expand_dims(np.sum(self.ref_dirs, axis=1), 1) self.algo = MOEADAlgo(pop_size=pop_size, ref_dirs=self.ref_dirs, eliminate_duplicates=False)
[docs] def _solve(self, X, Y, batch_size): # initialize population if len(X) < self.pop_size: X = np.vstack([X, lhs(X.shape[1], self.pop_size - len(X))]) elif len(X) > self.pop_size: sorted_indices = NonDominatedSorting().do(Y) X = X[sorted_indices[:self.pop_size]] self.algo.initialization.sampling = X res = minimize(self.problem, self.algo, ('n_gen', self.n_gen)) X_candidate, Y_candidate, algo = res.pop.get('X'), res.pop.get('F'), res.algorithm G = Y_candidate _, curr_pset_idx = find_pareto_front(Y, return_index=True) curr_pset = X[curr_pset_idx] G_s = algo._decomposition.do(G, weights=self.ref_dirs, ideal_point=algo.ideal_point) # scalarized acquisition value # build candidate pool Q Q_x, Q_dir, Q_g, Q_gs = [], [], [], [] X_added = curr_pset.copy() for x, ref_dir, g, gs in zip(X_candidate, self.ref_dirs, G, G_s): if (x != X_added).any(axis=1).all(): Q_x.append(x) Q_dir.append(ref_dir) Q_g.append(g) Q_gs.append(gs) X_added = np.vstack([X_added, x]) Q_x, Q_dir, Q_g, Q_gs = np.array(Q_x), np.array(Q_dir), np.array(Q_g), np.array(Q_gs) min_batch_size = min(batch_size, len(Q_x)) # in case Q is smaller than batch size if min_batch_size == 0: indices = np.random.choice(len(X_candidate), batch_size, replace=False) return X_candidate[indices], Y_candidate[indices] # k-means clustering on X with weight vectors labels = KMeans(n_clusters=batch_size).fit_predict(np.column_stack([Q_x, Q_dir])) # select point in each cluster with lowest scalarized acquisition value X_candidate, Y_candidate = [], [] for i in range(batch_size): indices = np.where(labels == i)[0] top_idx = indices[np.argmin(Q_gs[indices])] X_candidate.append(Q_x[top_idx]) Y_candidate.append(Q_g[top_idx]) return np.array(X_candidate), np.array(Y_candidate)