import multiprocessing.process
from .base.base_struct import BaseStruct
from HyperGP.mods import AvailableMods, __Mods
# import types
import inspect
# import itertools
import random
from HyperGP.base.base_struct import States
from HyperGP.libs.states import WorkflowStates
from HyperGP.src import device as set_device, query_device
import multiprocessing
import copy
# from HyperGP.src import check_gpu
from tqdm import tqdm
import sys
[docs]
class GpOptimizer(BaseStruct, __Mods):
"""
To use ``GpOptimizer`` module, we should first import it:
Examples:
>>> from HyperGP import GpOptimizer
"""
available_mods = AvailableMods()
[docs]
def __init__(self, states=None, module_states=None, parallel=True, gpu=True, cash=False, **kwargs):
"""
Initialize the optimizer
Args:
states(HyperGP.States):
module_states(HyperGP.States):
parallel(boolean):
gpu(boolean):
kwargs:
Returns:
a new ``GPOptimizer`` module
Examples:
>>> optimizer = GpOptimizer()
"""
self.gpu=gpu
self.cash = cash
self.monitors = []
self.components = {} #for easier to deepcopy
self.status = {**kwargs, **{
'gpu': self.gpu,
'cash': self.cash,
'parallel': parallel
}}
self.workflowstates = WorkflowStates()
self.proc, self.queue = None, None
super().__init__(states, module_states, **kwargs)
if parallel:
self.enable('parallel')
[docs]
def status_init(self, **kwargs):
"""
Register the needed states in the evolution.
The states will be stored in the ``workflowstates`` attribute, then we can get it with `str-like` object when we use ``iter_component`` method, \
or just get it with attribute operator.
Examples:
>>> pop_size = 1000
>>> pset = HyperGP.PrimitiveSet(input_arity=1, primitive_set=[('add', HyperGP.tensor.add, 2),('sub', HyperGP.tensor.sub, 2)])
>>> pop = HyperGP.Population()
>>> pop.initPop(pop_size=pop_size, prog_paras=ProgBuildStates(pset=pset, depth_rg=[2, 3], len_limit=10000))
>>> pop.stateRegister(cprogs = pop.states['progs'].copy)
>>> input_array = HyperGP.tensor.uniform(0, 10, size=(1, 10000))
>>> optimizer = HyperGP.GpOptimizer()
>>> optimizer.status_init(
... p_list=pop.states['cprogs'].indivs,
fit_list = pop.states['cprogs'].fitness,
input=input_array,
pset=pset,
output=None)
"""
for key, value in kwargs.items():
self.workflowstates[key] = value
def __getattr__(self, item):
# assert item in self.workflowstates
# return self.workflowstates[item]
if item not in self.__dict__:
assert item in self.workflowstates, "{ITEM} not in workflowstates and __dict__".format(ITEM=item)
return self.workflowstates[item]
return self.__dict__[item]
def _mask_check(self, masks):
mask_lens = set([len(mask) if isinstance(mask, list) else 1 for mask in masks])
assert len(mask_lens) <= 2, "The length of each element in mask should keep same"
return max(mask_lens)
def _mask_index(self, l, mask):
return [l[m] for m in mask]
[docs]
def monitor(self, tool, track_object, save_path=None):
"""
Register the components want to be iteratively executed.
Args:
tool: The monitor to be called
track_object (str-like or list): States to be monitored
save_path: File path to save the results
Examples:
>>> optimizer.monitor(HyperGP.monitors.statistics_record, "fit_list", "./records/fitness.txt")
./records/fitness.txt:
min max mean var std
148792.54973409744 149393.69216920136 149353.97619133547 15368.910548484808 123.97141020608262
148730.17933285816 149387.63549782007 149297.24560745712 36924.96589522792 192.15869976461622
148758.9647289339 149387.54256014896 149171.7426450109 65744.58211613352 256.4070633117066
...
)
"""
if getattr(tool, "init") and callable(tool.init):
tool.init()
self.monitors.append([tool, track_object, save_path])
[docs]
def iter_component(self, *args):
"""
Register the components want to be iteratively executed.
Args:
args: The ``HyperGP.states.ParaStates`` module is needed to register each component.
Examples:
>>> optimizer.iter_component(
>>> ParaStates(func=HyperGP.RandTrCrv(), source=["p_list", "p_list"], to=["p_list", "p_list"],
... mask=set_prmask(pop_size)),
>>> ParaStates(func=shuffle, source=["p_list"], to=["p_list"],
... mask=[1]),
>>> ParaStates(func=HyperGP.RandTrMut(), source=["p_list", ProgBuildStates(pset=pset, depth_rg=[2, 3], len_limit=100), True], to=["p_list"],
... mask=[set_armask(pop_size), 1, 1]),
>>> ParaStates(func=HyperGP.executor, source=["p_list", "input", "pset"], to=["output", None],
... mask=[1, 1, 1]),
>>> ParaStates(func=HyperGP.evaluation, source=["output", "output_tensor"], to=["fit_list"],
... mask=[1, 1])
)
"""
self.components['from_list'] = []
self.components['to_list'] = []
self.components['mask_list'] = []
self.components['pdefine_list'] = []
self.components['param_list'] = []
self.components['func_list'] = []
self.components['mfunc_list'] = []
for func in args:
signature = inspect.signature(func["func"])
params = signature.parameters
self.components['from_list'].append(func["source"])
self.components['func_list'].append(func["func"])
self.components['to_list'].append(func["to"])
self.components['mask_list'].append(func["mask"])
self.components['pdefine_list'].append(func["parallel"])
self.components['param_list'].append(list(params.keys()))
self.components['mfunc_list'].append(func["func_mask"])
[docs]
def enable(self, mod, **kwargs):
if getattr(self, mod):
self.__setattr__(mod, self.available_mods.__getattribute__(mod)())
self.__getattribute__(mod)._popSet(self, **kwargs)
def _run_independent(self, iter, device=[0], tqdm_diable=False, stop_criteria=None):
for i in tqdm(range(iter), disable=tqdm_diable):
mask_list = [[mask() if callable(mask) else mask for mask in masks] for masks in self.components['mask_list']]
unit_size_list = [self._mask_check(mask) for mask in mask_list]
for j, (func, from_l_pre, to_l, mask_l) in enumerate(zip(self.components['func_list'], self.components['from_list'], self.components['to_list'], mask_list)):
from_l = [[] for l in range(len(from_l_pre))]
for k in range(len(from_l_pre)):
if isinstance(from_l_pre[k], str) and from_l_pre[k] in self.workflowstates:
from_l[k] = self.workflowstates[from_l_pre[k]]
else:
from_l[k] = from_l_pre[k]
states = [States(**{self.components['param_list'][j][k]:source[mask_l[k][z]] for k, source in enumerate(from_l) if isinstance(mask_l[k], list)}) for z in range(unit_size_list[j])]
states_kwargs = {self.components['param_list'][j][k]:from_l[k] for k, mask in enumerate(mask_l) if isinstance(mask, int)}
if len(states) == 0:
states = [States(**states_kwargs)]
states_kwargs = {}
funcs = [func] * unit_size_list[j] if not isinstance(func, list) else [self._mask_index(func, mask) for mask in self.components['mfunc_list'][j]]
rets = self.__parallel(funcs, states, device, self.components['pdefine_list'][j], kwargs=states_kwargs)
if len(to_l) == 0:
continue
for k, key in enumerate(to_l):
if key == None:
continue
if isinstance(key, str):
self.workflowstates[key] = None
continue
key.clear()
if len(to_l) > 1:
result = [[] for i in range(len(to_l))]
for k, key in enumerate(to_l):
if key == None:
continue
for res in rets:
if isinstance(res[k], list):
result[k].extend(res[k])
else:
result[k].append(res[k])
if isinstance(key, str):
if len(result[k]) > 1:
if self.workflowstates[key] is not None:
self.workflowstates[key].extend(result[k])
else:
self.workflowstates[key] = result[k]
else:
if self.workflowstates[key] is not None and isinstance(self.workflowstates[key], list):
print(func, key, self.workflowstates[key])
self.workflowstates[key].extend(result[k][0])
elif self.workflowstates[key] is not None:
self.workflowstates[key] = [self.workflowstates[key]] + [result[k][0]]
else:
self.workflowstates[key] = result[k][0]
else:
key.extend(result[k])
else:
result = []
for res in rets:
if isinstance(res, list):
result.extend(res)
else:
result.append(res)
if isinstance(to_l[0], str):
self.workflowstates[to_l[0]] = result if len(result) > 1 else result[0]
else:
to_l[0].extend(result)
for monitor in self.monitors:
track_object = monitor[1]
setattr(monitor[0], 'iter', i)
if isinstance(monitor[1], str):
track_object = self.workflowstates[monitor[1]]
monitor_ret = monitor[0](track_object, save_path=monitor[2])
if isinstance(monitor[1], list):
track_object = [self.workflowstates[key] if isinstance(key, str) else key for key in monitor[1]]
monitor_ret = monitor[0](*track_object, save_path=monitor[2])
if monitor_ret is not None and isinstance(monitor_ret, str):
tqdm.write(monitor_ret)
if stop_criteria is not None and stop_criteria():
return
def __run_parallel(self, iter, device=[0], tqdm_diable=False, stop_criteria=None):
context = multiprocessing.get_context("spawn")
manager_queue = context.Manager().Queue()
proc = context.Process(target=_state_transform, args=(iter, manager_queue, self._package, self.status, device, tqdm_diable, stop_criteria))
self.proc = proc
self.queue = manager_queue
[docs]
def run(self, iter, device=[0], async_parallel=False, tqdm_diable=False, stop_criteria=None):
"""
Run the optimizer with iteration time and device
Args:
iter(int): The iteration time
device(list): GPU index list used in the optimizer
async_parallel(boolean): Whether asynchronously execute the optimizer. If it is True, the method will immediately return, then a ``wait`` method is needed to wait the evolution process finish.
tqdm_diable(boolean): Whether use a tqdm show
stop_criteria(func-like): early stop condition
Examples:
>>> optimizer.run(10)
or run it asynchronously:
>>> optimizer.run(10, async_parallel=True)
>>> optimizer.wait()
early stop:
>>> optimizer.run(500, stop_criteria=lambda: HyperGP.tensor.min(optimizer.workflowstates.fit_list) < 1e10)
"""
if async_parallel:
self.__run_parallel(iter, device, tqdm_diable=tqdm_diable, stop_criteria=stop_criteria)
else:
self._run_independent(iter, device, tqdm_diable=tqdm_diable, stop_criteria=stop_criteria)
[docs]
def detach(self):
"""
Copy the optimizer. It can be used to avoid repeatly register the same states or components.
Returns:
A new ``GpOptimizer`` with independent same states.
Examples:
>>> optimizers = []
>>> for i in range(5):
... optimizers.append(optimizer.detach())
>>> for optimizer in optimizers:
... optimizer.run(100, async_parallel=True)
>>> for optimizer in optimizers:
... optimizer.wait()
"""
# print(self.status)
new_workflow = GpOptimizer(**self.status)
new_workflow._update(copy.deepcopy(self._package, {}))
return new_workflow
[docs]
def wait(self):
"""
Wait the optimizer finish when an asynchronous run is called.
"""
if self.proc is not None:
self.proc.join()
self._update(self.queue.get())
@property
def _package(self):
return (self.states, self.module_states, self.workflowstates, self.components, self.monitors)
def _update(self, package):
self.states = package[0]
self.module_states = package[1]
self.workflowstates = package[2]
self.components = package[3]
self.monitors = package[4]
def __parallel(self, method, states, gparallel, parallel=False, kwargs={}):
if isinstance(method, list) and len(method) != len(states):
raise ValueError('The method size %d not equal to the cond size %d' % (len(method), len(states)))
default_devid = query_device()
if 'parallel' in self.gmodule_states and parallel:
ret_cond = self.parallel(method, states, **kwargs)
else:
ret_cond = []
if isinstance(states, States):
ret_cond.append(method(**states, **kwargs))
elif isinstance(states, list):
assert len(states) == len(method)
for i, state in enumerate(states):
set_device(gparallel[random.randint(0, len(gparallel) - 1)])
ret_cond.append(method(**state, **kwargs)
if not isinstance(method, list)
else method[i](**state, **kwargs))
else:
assert 0==1
set_device(default_devid)
return ret_cond
def _state_transform(iter, ret_queue, package, status, device=[0], tqdm_diable=False, stop_criteria=None):
new_optimizer = GpOptimizer(**status)
new_optimizer._update(package)
new_optimizer._run_independent(iter, device)
ret_queue.put(new_optimizer._package)