# PEMtk wrappers for parallel processing for fitting
#
# 13/09/21 v1 Basic wrapper with XYZPY implemented,
#
# Initial dev code: see http://127.0.0.1:8888/lab/tree/dev/PEMtk/fitting/multiFit_tests_and_parallel/PEMtk_fitting_dev_multiproc_070921.ipynb
#
"""
PEMtk wrappers for parallel processing for fitting
Currently wraps xyzpy's `run_combos` for parallel functionality and data handling.
See `the xyzpy docs <https://xyzpy.readthedocs.io/en/latest/gen_parallel.html>`_ for details.
Other methods to be implemented.
13/09/21 v1 Basic wrapper with XYZPY implemented
"""
# Currently using xyzpy for parallel stuff.
# Catch only ImportError: a bare `except:` would also swallow
# KeyboardInterrupt/SystemExit and hide unrelated failures inside xyzpy.
try:
    import xyzpy as xyz
except ImportError:
    print("***xyzpy not found, parallel functions not available.")

import multiprocessing as mp
def multiFit(self, nRange = None,
             parallel = True, num_workers = None,
             randomizeParams = True, seedParams = None):
    """
    Basic wrapper for pemtk.fitClass.fit() for multiprocess execution.

    Run a batch of fits in parallel, and return results to main class structure.

    Currently wraps xyzpy's `run_combos` for parallel functionality and data handling.
    See `the xyzpy docs <https://xyzpy.readthedocs.io/en/latest/gen_parallel.html>`_ for details.

    Note: full set of results currently returned as an Xarray DataSet, then sorted back
    to base class as self.data[n] (integer n). In future may just want to use Xarray
    return directly?

    Parameters
    ----------
    nRange : list, default = None
        Fit indexers.
        Set [nStart, nStop], full run will be set as list(range(nRange[0],nRange[1])).
        Defaults to [0, 10] if not supplied (None sentinel avoids a mutable default arg).
        TODO: more flexibility here, and auto.

    parallel : bool, default = True
        Run fit jobs in parallel?
        Note - in testing this seemed to be ignored?

    num_workers : int, default = None
        Number of cores to use if parallel job.
        Currently set to default to ~90% of mp.cpu_count()

    randomizeParams : bool, default = True
        Randomize seed parameters per fit?

    seedParams : int, default = None
        NOT IMPLEMENTED, but will provide an option to seed fits from a previous result.

    """

    #*** Settings
    if nRange is None:
        nRange = [0, 10]

    if num_workers is None:
        num_workers = round(mp.cpu_count() * 0.9)  # Default to ~90% cores

    if self.verbose['main']:
        print("Number of processors: ", mp.cpu_count(), "\nRunning pool on: ", num_workers)

    #*** Setup XYZPY runner
    combos = {'n': list(range(nRange[0],nRange[1]))}
    data = {'data':self} # Pass data - may be better to set fitPara as class method? Not sure if that will work with wrapper.

    # With runner class; constants pass additional (fixed) args to fitPara per run.
    r = xyz.Runner(fitPara, var_names=['results'], resources = data, constants = {'randomizeParams':randomizeParams, 'seedParams':seedParams})

    #*** Run fits
    outputs = r.run_combos(combos, parallel = parallel, num_workers = num_workers) # This returns an Xarray Dataset.

    #*** Sort results back to main data structure
    # Restack from output > dict form
    # Bit hacky, but works OK.
    for n in outputs.n:
        self.data[n.item()] = outputs.results.sel({'n':n}).item()

    # Update self.fitInd
    fInd, fitInds = self._getFitInds()
    self.fitInd = fInd
    self.result = self.data[fInd]['results'] # Set last result to self.result for quick checks.
def fitPara(data = None, n = None, randomizeParams = True, seedParams = None):
    """
    Wrap self.fit() for XYZPY runner.

    Parameters
    ----------
    data : fit class object, default = None
        Main fitting class instance; provides randomizeParams(), fit() and .data storage.
        Passed via xyzpy `resources`.

    n : int, default = None
        Fit index for this run; passed via xyzpy `combos`.

    randomizeParams : bool, default = True
        Randomize seed parameters before fitting?

    seedParams : int, default = None
        NOT IMPLEMENTED - placeholder for seeding fits from a previous result.

    Returns
    -------
    dict
        Fit results for index n, i.e. data.data[n].

    """

    if randomizeParams:
        data.randomizeParams() # This may not work as is, since it might accidentally overwrite in-use params?
                               # Comment out to give fast test run only

    if seedParams is not None:
        # data.params = data.data[seedParams]['results'] # Should be something like this, just need to pull correct vars.
        print('seedParams not yet implemented.')

    data.fit(fitInd = n) # This may also need some attention if pointers are shared.

    return data.data[n] # OK