I'm stuck on a memory-consumption issue: after running joblib's Parallel, deleting the results and calling gc.collect(), the process still holds the extra memory (checked via the process line in htop), and I have found no way to release it. To broaden yangqch's answer, I use the following code to isolate memory for parallel computations:
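For reference, this is roughly how I watch the process memory from Python as well - a minimal sketch assuming the third-party psutil package is installed (it reports the same RSS figure htop shows):

import gc
import os
import psutil  # third-party; pip install psutil

proc = psutil.Process(os.getpid())
print('RSS before:', proc.memory_info().rss // 2**20, 'MiB')
# ... run Parallel(...) here, then del the results ...
gc.collect()
print('RSS after: ', proc.memory_info().rss // 2**20, 'MiB')  # stays elevated for me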
Imports -
import multiprocessing as mp
n_cores = mp.cpu_count()
import time
import datetime
from sklearn.externals.joblib import Parallel, delayed
import sklearn
from functools import partial
import pickle
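(Side note: sklearn.externals.joblib is scikit-learn's vendored copy of joblib and has been removed from recent scikit-learn versions; there the standalone joblib package exposes the same API, so the equivalent would be:)

import joblib
from joblib import Parallel, delayed
# ...and patch joblib.parallel.BatchCompletionCallBack below instead of
# sklearn.externals.joblib.parallel.BatchCompletionCallBack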
joblib progress bar -
class BatchCompletionCallBack(object):
    # Added code - start
    # Uses the module-level total_n_jobs and jobs_start_time set by
    # parallel_wrapper; no global statement is needed just to read them.
    # Added code - end
    def __init__(self, dispatch_timestamp, batch_size, parallel):
        self.dispatch_timestamp = dispatch_timestamp
        self.batch_size = batch_size
        self.parallel = parallel

    def __call__(self, out):
        self.parallel.n_completed_tasks += self.batch_size
        this_batch_duration = time.time() - self.dispatch_timestamp
        self.parallel._backend.batch_completed(self.batch_size,
                                               this_batch_duration)
        self.parallel.print_progress()
        # Added code - start
        progress = self.parallel.n_completed_tasks / total_n_jobs
        # Estimated time remaining, extrapolated from the average
        # time per completed task so far.
        remaining = int((time.time() - jobs_start_time)
                        / self.parallel.n_completed_tasks
                        * (total_n_jobs - self.parallel.n_completed_tasks))
        print(
            "\r[{:50s}] {:.1f}% - Time remaining {} ".format(
                '#' * int(progress * 50),
                progress * 100,
                datetime.timedelta(seconds=remaining)),
            end="", flush=True)
        if self.parallel.n_completed_tasks == total_n_jobs:
            print('\n')
        # Added code - end
        if self.parallel._original_iterator is not None:
            self.parallel.dispatch_next()
# Monkey-patch joblib's callback so every Parallel call reports progress.
sklearn.externals.joblib.parallel.BatchCompletionCallBack = BatchCompletionCallBack
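Halfway through a run the added line looks roughly like this (illustrative numbers):

[#########################                         ] 50.0% - Time remaining 0:00:12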
Parallel wrapper -
def parallel_wrapper(func, *args, **kwargs):
    global total_n_jobs, jobs_start_time
    total_n_jobs = len(args[0])
    jobs_start_time = time.time()
    # Bind keyword arguments up front; delayed() is mapped over positional args only.
    if kwargs:
        mapfunc = partial(func, **kwargs)
    else:
        mapfunc = func
    # Pickle the results to disk: this function runs in a child process, so the
    # parent reads the file back after the child exits and its memory is freed.
    with open('file.pkl', 'wb') as file:
        pickle.dump(Parallel(n_jobs=n_cores)(map(delayed(mapfunc), *args)), file)
    print('Operating time - ', datetime.timedelta(seconds=int(time.time() - jobs_start_time)))
Class to use in 'with' construction -
class isolate:
    """Run func in a separate process, so everything it allocates is
    returned to the OS when that process exits."""
    def __init__(self, func):
        self.func = func

    def __enter__(self):
        return self

    def run(self, func, *args, **kwargs):
        self.p = mp.Process(target=self.func, args=(func, *args), kwargs=kwargs)
        self.p.start()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.p.join()
Example -
def dummy(i, keyword_argument=None):
    return (i.shape, keyword_argument)

import numpy as np
tmp = np.ones((5, 5))
with isolate(parallel_wrapper) as isolated:
    isolated.run(dummy, [tmp, tmp], keyword_argument=2)
with open('file.pkl', 'rb') as file:
    tmp = pickle.load(file)
tmp
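For this example the loaded result should be [((5, 5), 2), ((5, 5), 2)], i.e. dummy's (shape, keyword_argument) pair for each of the two input arrays.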
Cleanup -
import importlib
importlib.reload(sklearn.externals.joblib.parallel)  # restores joblib's original callback
So my questions are:

- Is there an easier way to do this?
- How do I make the local total_n_jobs (and jobs_start_time) from parallel_wrapper visible inside BatchCompletionCallBack without resorting to global?
- Any other improvements?
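For the second question, one direction I sketched (untested) is to stash the counters on the callback class itself, since parallel_wrapper already has a reference to it:

# Untested sketch: store the run metadata as class attributes instead of globals.
def parallel_wrapper(func, *args, **kwargs):
    cb = sklearn.externals.joblib.parallel.BatchCompletionCallBack
    cb.total_n_jobs = len(args[0])      # __call__ would read self.total_n_jobs
    cb.jobs_start_time = time.time()    # __call__ would read self.jobs_start_time
    mapfunc = partial(func, **kwargs) if kwargs else func
    with open('file.pkl', 'wb') as file:
        pickle.dump(Parallel(n_jobs=n_cores)(map(delayed(mapfunc), *args)), file)

Attribute lookup on an instance falls back to the class, so __call__ could then use self.total_n_jobs with no global at all; but this still smuggles shared state through the class, which is why I'm asking whether there's a cleaner pattern.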