# Source code for pyhealth.utils.utility_parallel

# -*- coding: utf-8 -*-
"""A set of utility functions to support parallel computation.
"""

# Author: Yue Zhao <zhaoy@cmu.edu>
# License: BSD 2 clause
import numpy as np
from joblib import effective_n_jobs

import contextlib
import joblib
from tqdm import tqdm
from joblib import Parallel, delayed


@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
    """Context manager to patch joblib to report into tqdm progress bar given as argument.

    While the ``with`` block is active, ``joblib.parallel.BatchCompletionCallBack``
    is swapped for a subclass that advances ``tqdm_object`` by the size of
    each completed batch before delegating to the original callback. On exit,
    whether normal or via an exception, the original class is put back and
    ``tqdm_object`` is closed.

    NOTE(review): this patches a joblib-internal class; verify it against the
    installed joblib version.

    Parameters
    ----------
    tqdm_object : tqdm.tqdm
        Progress bar to advance. It is yielded back to the ``with`` block.
    """
    saved_callback_cls = joblib.parallel.BatchCompletionCallBack

    class TqdmBatchCompletionCallback(saved_callback_cls):
        def __call__(self, *args, **kwargs):
            # Report the finished batch first, then let joblib do its own bookkeeping.
            tqdm_object.update(n=self.batch_size)
            return super().__call__(*args, **kwargs)

    joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
    try:
        yield tqdm_object
    finally:
        # Always undo the monkey-patch so later joblib calls are unaffected.
        joblib.parallel.BatchCompletionCallBack = saved_callback_cls
        tqdm_object.close()
# This elegant solution is from:
# https://stackoverflow.com/questions/24983493/tracking-progress-of-joblib-parallel-execution
def partition_estimators(n_estimators, n_jobs):
    """Private function used to partition estimators between jobs.

    The estimators are split as evenly as possible. The first
    ``n_estimators % n_jobs`` jobs each receive one extra estimator.

    Parameters
    ----------
    n_estimators : int
        Total number of estimators (tasks) to distribute.

    n_jobs : int or None
        Requested number of parallel jobs. It is resolved through joblib's
        ``effective_n_jobs`` and then capped at ``n_estimators``. At least one
        job is always used.

    Returns
    -------
    n_estimators_per_job : list of int
        Number of estimators assigned to each job.

    starts : list of int
        Cumulative boundaries ``[0, e_0, e_0 + e_1, ...]``. Job ``i`` covers
        the half-open range ``starts[i]:starts[i + 1]``.

    n_jobs : int
        The number of jobs actually used.
    """
    # Never use more jobs than there are estimators. Keep at least one job so
    # that an empty workload does not raise ZeroDivisionError below.
    n_jobs = max(1, min(effective_n_jobs(n_jobs), n_estimators))

    # Split evenly, then hand the remainder out one by one to the first jobs.
    # ``dtype=int`` replaces the ``np.int`` alias, which NumPy 1.24 removed.
    n_estimators_per_job = np.full(n_jobs, n_estimators // n_jobs, dtype=int)
    n_estimators_per_job[:n_estimators % n_jobs] += 1
    starts = np.cumsum(n_estimators_per_job)

    print('Split tasks into', n_jobs, 'cores...')
    return n_estimators_per_job.tolist(), [0] + starts.tolist(), n_jobs
def unfold_parallel(lists, n_jobs):
    """Internal function to unfold the results returned from the parallelization.

    Concatenates the first ``n_jobs`` per-job result chunks, in order, into a
    single flat list.

    Parameters
    ----------
    lists : list
        The results from the parallelization operations. Each entry is an
        iterable of results produced by one job.

    n_jobs : int
        The number of leading entries of ``lists`` to concatenate, i.e. the
        number of jobs that actually ran (for example, the ``n_jobs`` value
        returned by ``partition_estimators``). This value is used as-is:
        ``-1`` is *not* expanded to the number of cores, and any non-positive
        value yields an empty list.

    Returns
    -------
    result_list : list
        The list of unfolded results.

    Raises
    ------
    IndexError
        If ``lists`` has fewer than ``n_jobs`` entries.
    """
    return [item for job_idx in range(n_jobs) for item in lists[job_idx]]