Source code for aggmap.utils.multiproc

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Nov 21 12:52:49 2018

@author: shenwanxiang

Multi-process run utilities
"""

import time
import pandas as pd
from tqdm import tqdm

from concurrent.futures import ProcessPoolExecutor, wait, as_completed
from multiprocessing import Pool, cpu_count, current_process
import subprocess


from aggmap.utils.logtools import print_info, print_error, pbar,print_warn


def RunCmd(cmd):
    '''
    input:
        cmd: str, shell command to run
    output:
        status: int, 0 for success
        stdout: str
        stderr: str
    '''
    print_info('run command: %s' % cmd)

    def swap_log(swap, error=True):
        # forward each non-empty line of the captured output to the logger
        sinfo = []
        for l in swap.split('\n'):
            if l == '':
                continue
            sinfo.append(l)
        for o in sinfo:
            if error:
                print_error(o)
            else:
                print_info(o)
        return

    output = subprocess.run(cmd,
                            shell=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            universal_newlines=True)
    status = output.returncode
    stdout = output.stdout
    stderr = output.stderr
    if status != 0:
        if output.stdout:
            swap_log(output.stdout, error=True)
        if output.stderr:
            swap_log(output.stderr, error=True)
    else:
        if output.stdout:
            swap_log(output.stdout, error=False)
    return status, stdout, stderr
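
# Usage sketch (an illustrative addition, not part of the original
# module): run a shell command and branch on the exit status. Assumes a
# POSIX shell where 'echo' is available.
def _demo_RunCmd():
    status, stdout, stderr = RunCmd('echo hello')
    if status == 0:
        print_info('command succeeded, stdout: %s' % stdout.strip())
    else:
        print_error('command failed: %s' % stderr)
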
def ImapUnorder(processor, iterator, max_workers=10, fail_in_file='./failed.lst'):
    '''
    processor: function applied to each element
    iterator: list or iterator; each element should be a tuple or dict
              so that the data can be matched back to its input
    '''
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        with open(fail_in_file, 'w+') as f:
            futures = {executor.submit(processor, IdPlusSmile): IdPlusSmile
                       for IdPlusSmile in iterator}
            # wait() returns the finished futures, including those that raised
            success, _ = wait(futures)
            with pbar(total=len(futures)) as pb:
                for i in success:
                    IdPlusSmile = futures[i]
                    print_info('deal ' + str(IdPlusSmile))
                    try:
                        data_dict = i.result()
                        yield data_dict
                    except Exception as exc:
                        print_warn('the worker process died: input %s failed in %s: %s, so it will be retried in the main process' % (IdPlusSmile, processor, exc))
                        try:
                            yield processor(IdPlusSmile)
                        except Exception:
                            f.write(str(IdPlusSmile) + '\n')
                            print_error('input %s failed in %s: %s' % (IdPlusSmile, processor, exc))
                    pb.update(1)
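
# Usage sketch (an illustrative addition, not part of the original
# module): ImapUnorder is a generator, so its results must be consumed,
# and the worker must be a top-level (picklable) function. The
# '_parse_record' worker and the sample records are hypothetical.
def _parse_record(id_plus_smile):
    cid, smile = id_plus_smile
    return {cid: len(smile)}

def _demo_ImapUnorder():
    records = [(1, 'CCO'), (2, 'c1ccccc1')]
    for data_dict in ImapUnorder(_parse_record, records, max_workers=2):
        print_info(str(data_dict))
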
def MultiProcessUnorderedBarRun(func, deal_list, n_cpus=None):
    if n_cpus is None:
        N_CPUS = cpu_count()
    else:
        N_CPUS = int(n_cpus)
    print_info('the number of processes is %s' % N_CPUS)

    p = Pool(N_CPUS)
    res_list = []
    with pbar(total=len(deal_list), ascii=True) as pb:
        # results arrive in completion order, not input order
        for res in p.imap_unordered(func, deal_list):
            pb.update(1)
            res_list.append(res)
    p.close()
    p.join()
    return res_list
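
# Usage sketch (an illustrative addition, not part of the original
# module): results come back in completion order, so this variant only
# suits jobs where ordering does not matter. The '_cube' worker is
# hypothetical.
def _cube(x):
    return x ** 3

def _demo_MultiProcessUnorderedBarRun():
    res = MultiProcessUnorderedBarRun(_cube, list(range(8)), n_cpus=2)
    return sorted(res)  # sort afterwards if an order is needed
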
def MultiProcessRun(func, deal_list, n_cpus=None):
    '''
    input:
        func: function to apply to each element of deal_list
        deal_list: list of items to process
        n_cpus: number of CPUs to use; defaults to all available
    output:
        list of the return values of func, in input order
    '''
    # meta thinking: https://my.oschina.net/leejun2005/blog/203148
    if n_cpus is None:
        N_CPUS = cpu_count()
    else:
        N_CPUS = int(n_cpus)
    print_info('the number of processes is %s' % N_CPUS)

    pool = Pool(N_CPUS)
    results = pool.map(func, deal_list)
    pool.close()
    pool.join()
    return results
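
# Usage sketch (an illustrative addition, not part of the original
# module): Pool.map preserves input order, so no post-sorting is needed.
# On platforms that spawn rather than fork, call this from under an
# "if __name__ == '__main__':" guard. The '_square' worker is hypothetical.
def _square(x):
    return x * x

def _demo_MultiProcessRun():
    return MultiProcessRun(_square, list(range(10)), n_cpus=2)
    # -> [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
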
########### ordered map reduce ##############

def _decorate_func(func, i, j):
    # tag the result with its original index so the caller can restore order
    return [i, func(j)]


def _executor(func, series, n_cpus=4):
    # note: leaving the with-block shuts the executor down and waits for
    # all submitted futures to finish
    with ProcessPoolExecutor(max_workers=n_cpus) as executor:
        futures = [executor.submit(_decorate_func, func, i, j)
                   for i, j in series.items()]
        return futures
def MultiExecutorRun(func, deal_list, n_cpus=4, tqdm_args={'unit': 'one'}):
    '''
    input:
        func: function to apply to each element of deal_list
        deal_list: list of items to process
        n_cpus: number of CPUs to use
        tqdm_args: keyword arguments passed on to tqdm
    output:
        list of the return values of func, in input order
    '''
    lst = list(deal_list)
    series = pd.Series(lst)
    futures = _executor(func, series, n_cpus=n_cpus)
    args = {
        'total': len(lst),
        'unit': 'one',
        'ascii': True,
        'unit_scale': True,
        'leave': True,
    }
    args.update(tqdm_args)
    print_info(args)

    results = []
    indices = []
    for f in tqdm(as_completed(futures), **args):
        idx, result = f.result()
        indices.append(idx)
        results.append(result)
    # results arrive unordered; sort by the original index to restore input order
    res = pd.Series(results, index=indices)
    ordered_lst = res.sort_index().tolist()
    return ordered_lst
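
# Usage sketch (an illustrative addition, not part of the original
# module): although the futures complete in arbitrary order,
# MultiExecutorRun sorts by the index attached in _decorate_func, so the
# output matches the input order. The '_double' worker is hypothetical.
def _double(x):
    return 2 * x

def _demo_MultiExecutorRun():
    return MultiExecutorRun(_double, [3, 1, 2], n_cpus=2,
                            tqdm_args={'unit': 'item'})
    # -> [6, 2, 4]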