#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Nov 21 12:52:49 2018
@author: shenwanxiang
Multi process Run
"""
import time
import pandas as pd
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, wait, as_completed
from multiprocessing import Pool,cpu_count,current_process
import subprocess
from aggmap.utils.logtools import print_info, print_error, pbar,print_warn
def RunCmd(cmd):
    '''
    Run a command through the shell, capture its output and forward the
    captured text to the logger.

    input:
        cmd: str, command line handed to the shell
    output:
        status: int, 0 for success
        stdout: str
        stderr: str
    '''
    print_info('run command : %s' % cmd)

    def swap_log(swap, error = True):
        # Choose the log function once, then emit every non-empty line.
        emit = print_error if error else print_info
        for line in [piece for piece in swap.split('\n') if piece != '']:
            emit(line)

    proc = subprocess.run(cmd,
                          shell=True,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE,
                          universal_newlines=True)
    failed = proc.returncode != 0

    # stdout is logged as error text when the command failed, as info otherwise.
    if proc.stdout:
        swap_log(proc.stdout, error=failed)
    # stderr is only logged for a failed command.
    if failed and proc.stderr:
        swap_log(proc.stderr, error=True)

    return proc.returncode, proc.stdout, proc.stderr
def ImapUnorder(processor, iterator, max_workers=10, fail_in_file = './filed.lst'):
    '''
    Generator: run `processor` on every element of `iterator` in a process
    pool and yield the results in no particular order.

    input:
        processor: function taking one element of `iterator`
        iterator: list or iterator; each element should be a tuple or dict so
            that a result can be matched back to its input, because results
            are not yielded in input order
        max_workers: int, number of worker processes
        fail_in_file: str, path of a file (truncated on every call) that gets
            one line per input that failed both in the pool and in the retry
    output:
        yields the return value of `processor` for every input that succeeded
        either in the pool or in the in-process retry
    '''
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        with open(fail_in_file, 'w+') as f:
            futures = {executor.submit(processor, IdPlusSmile):IdPlusSmile for IdPlusSmile in iterator}
            # No timeout is given, so this blocks until every task has finished.
            success, _ = wait(futures)
            with pbar(total = len(futures)) as pb:
                for i in success:
                    IdPlusSmile = futures[i]
                    print_info('deal '+ str(IdPlusSmile))
                    produced = True
                    try:
                        data_dict = i.result()
                    except Exception as exc:
                        print_warn('because of the process is dead, input: %s is fialed when deal with %s: %s, so we will deal it automatically' % (IdPlusSmile, processor, exc))
                        # Retry once in the current process.
                        try:
                            data_dict = processor(IdPlusSmile)
                        except Exception as retry_exc:
                            produced = False
                            f.write(str(IdPlusSmile)+'\n')
                            print_error(' input: %s is fialed when deal with %s: %s' % (IdPlusSmile, processor, retry_exc))
                    # Yield outside the try blocks: GeneratorExit or an exception
                    # thrown in by the consumer must not be mistaken for a
                    # processing failure of this input.
                    if produced:
                        yield data_dict
                    pb.update(1)
def MultiProcessUnorderedBarRun(func, deal_list, n_cpus=None):
    '''
    Apply `func` to every element of `deal_list` in a process pool while
    updating a progress bar, collecting results in completion order.

    input:
        func: function to do with each element in the deal_list
        deal_list: list to be done (must support len())
        n_cpus: use the number of cpus; None means cpu_count()
    output:
        list of the return value of each func call, in the order the results
        arrived (not the input order)
    '''
    N_CPUS = cpu_count() if n_cpus is None else int(n_cpus)
    print_info('the number of process is %s' % N_CPUS)
    res_list = []
    # The context manager terminates the workers if func raises or the loop is
    # interrupted, so no worker processes are left behind.
    with Pool(N_CPUS) as p:
        with pbar(total = len(deal_list), ascii=True) as pb:
            for res in p.imap_unordered(func, deal_list):
                pb.update(1)
                res_list.append(res)
        # Normal path: shut the pool down gracefully before leaving the block.
        p.close()
        p.join()
    return res_list
def MultiProcessRun(func, deal_list, n_cpus=None):
    '''
    Apply `func` to every element of `deal_list` in a process pool.

    input:
        func: function to do with each element in the deal_list
        deal_list: list to be done
        n_cpus: use the number of cpus; None means cpu_count()
    output:
        list of the return result for each func, in input order
    '''
    # Background reading: https://my.oschina.net/leejun2005/blog/203148
    N_CPUS = cpu_count() if n_cpus is None else int(n_cpus)
    print_info('the number of process is %s' % N_CPUS)
    # The context manager terminates the workers if func raises, so no worker
    # processes are left behind.
    with Pool(N_CPUS) as pool:
        a = pool.map(func, deal_list)
        # Normal path: shut the pool down gracefully before leaving the block.
        pool.close()
        pool.join()
    return a
########### ordered map reduce ##############
def _decorate_func(func, i, j):
return [i, func(j)]
def _executor(func, series, n_cpus = 4):
    '''
    Submit func for every (index, value) pair of series to a process pool.

    input:
        func: function applied to each value of series
        series: pandas Series; its index labels are used to tag the results
        n_cpus: number of worker processes
    output:
        list of futures, each resolving to [index, func(value)]; leaving the
        with-block waits for the pool to shut down, so they are already
        finished when returned
    '''
    with ProcessPoolExecutor(max_workers=n_cpus) as executor:
        # Series.items() replaces Series.iteritems(), which was removed in pandas 2.0.
        futures = [executor.submit(_decorate_func, func, i, j) for i,j in series.items()]
    return futures
def MultiExecutorRun(func, deal_list, n_cpus = 4, tqdm_args = None):
    '''
    Apply `func` to every element of `deal_list` in a process pool and return
    the results in input order.

    input:
        func: function to do with each element in the deal_list
        deal_list: list (or any iterable) to be done
        n_cpus: use the number of cpus
        tqdm_args: dict of args for tqdm, merged over the built-in defaults;
            None is the same as {'unit': 'one'}
    output:
        list of the return value for each func, ordered like deal_list
    '''
    if tqdm_args is None:
        tqdm_args = {'unit': 'one'}

    lst = list(deal_list)
    series = pd.Series(lst)
    futures = _executor(func, series, n_cpus = n_cpus)
    args = {
        # Use the materialised list: deal_list may be an iterator without len().
        'total': len(lst),
        'unit': 'one',
        'ascii': True,
        'unit_scale': True,
        'leave': True
    }
    args.update(tqdm_args)
    print_info(args)
    results = []
    indexs = []
    for f in tqdm(as_completed(futures), **args):
        idx, result = f.result()
        indexs.append(idx)
        results.append(result)
    res = pd.Series(results, index=indexs)
    # Futures complete in arbitrary order; sorting by index restores input order.
    ordered_lst = res.sort_index().tolist()
    return ordered_lst