
I'm stuck on a memory-consumption issue: after running joblib's Parallel, deleting the results and calling gc.collect(), the process still holds the increased memory (checked with htop on the process line), and I found no way to release it. To broaden yangqch's answer, I use the code below to isolate the memory used by parallel computations in a separate process.
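The core idea is to run the joblib call inside a multiprocessing.Process, so that all the allocations live in a child process and are handed back to the OS when it exits. A minimal standalone sketch of just that idea (heavy and run_parallel are placeholder names, and plain joblib is assumed here instead of sklearn.externals.joblib):

import multiprocessing as mp
from joblib import Parallel, delayed

def heavy(x):
    # stand-in for the real, memory-hungry per-item work
    return x * x

def run_parallel():
    # everything allocated here belongs to the child process
    results = Parallel(n_jobs=2)(delayed(heavy)(i) for i in range(10))
    print(results)

if __name__ == '__main__':
    p = mp.Process(target=run_parallel)
    p.start()
    p.join()  # once the child exits, its memory is returned to the OS

The full version below adds a progress bar and pickles the results so the parent can read them back.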

Imports -

import multiprocessing as mp
n_cores = mp.cpu_count()
import time
import datetime
from sklearn.externals.joblib import Parallel, delayed
import sklearn
from functools import partial
import pickle  

joblib progress bar -

class BatchCompletionCallBack(object):
    # Added code - start
    global total_n_jobs
    global jobs_start_time
    # Added code - end
    def __init__(self, dispatch_timestamp, batch_size, parallel):
        self.dispatch_timestamp = dispatch_timestamp
        self.batch_size = batch_size
        self.parallel = parallel

    def __call__(self, out):
        self.parallel.n_completed_tasks += self.batch_size
        this_batch_duration = time.time() - self.dispatch_timestamp

        self.parallel._backend.batch_completed(self.batch_size,
                                               this_batch_duration)
        self.parallel.print_progress()
        # Added code - start
        progress = self.parallel.n_completed_tasks / total_n_jobs
        # Estimate the remaining time from the average duration of completed tasks
        remaining = int((time.time() - jobs_start_time)
                        / self.parallel.n_completed_tasks
                        * (total_n_jobs - self.parallel.n_completed_tasks))
        print(
            "\r[{:50s}] {:.1f}% - Remaining time {} ".format(
                '#' * int(progress * 50)
                , progress * 100
                , datetime.timedelta(seconds=remaining))
            , end="", flush=True)
        if self.parallel.n_completed_tasks == total_n_jobs:
            print('\n')
        # Added code - end
        if self.parallel._original_iterator is not None:
            self.parallel.dispatch_next()

# Monkey-patch joblib's callback so every batch completion updates the progress bar
sklearn.externals.joblib.parallel.BatchCompletionCallBack = BatchCompletionCallBack

Parallel wrapper -

def parallel_wrapper(func, *args, **kwargs):
    global total_n_jobs, jobs_start_time
    total_n_jobs = len(args[0])
    jobs_start_time = time.time()

    if kwargs:
        mapfunc = partial(func, **kwargs)
    else:
        mapfunc = func

    results = Parallel(n_jobs=n_cores)(map(delayed(mapfunc), *args))
    with open('file.pkl', 'wb') as file:
        pickle.dump(results, file)
    print('Operating time - ', datetime.timedelta(seconds=int(time.time() - jobs_start_time)))

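For comparison (an illustrative direct call, not part of my original snippet; square_sum is just a placeholder task), parallel_wrapper can also be invoked on its own, without the isolation class below. The results still end up in file.pkl, but the worker memory stays attributed to the parent process, which is the behaviour described at the top:

import numpy as np

def square_sum(a):
    # placeholder task: element-wise square, then sum
    return (a ** 2).sum()

parallel_wrapper(square_sum, [np.ones((3, 3)), np.zeros((3, 3))])
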
Class to use in a 'with' construction -

class isolate:
    def __init__(self, func):
        self.func = func

    def __enter__(self):
        return self

    def run(self, func, *args, **kwargs):
        self.p = mp.Process(target=self.func, args=(func, *args), kwargs=kwargs)
        self.p.start()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.p.join()

Example -

def dummy(i, keyword_argument=None):
    return (i.shape, keyword_argument)

import numpy as np
tmp = np.ones((5,5))

with isolate(parallel_wrapper) as isolated:
    isolated.run(dummy, [tmp, tmp], keyword_argument=2)

with open('file.pkl', 'rb') as file:
    tmp = pickle.load(file)

tmp

# Reload joblib's parallel module to restore the original BatchCompletionCallBack
import importlib
importlib.reload(sklearn.externals.joblib.parallel)

So my questions are:

  • Is there an easier way to do this?
  • How can I make the local variable 'total_n_jobs' from 'parallel_wrapper' visible in 'BatchCompletionCallBack' without using 'global'?
  • Are there any other improvements?