concurrent
1 例子
1.1 简单例子
import time
from xlib.util import concurrent
def fun(i__):
time.sleep(2)
return (str(i__) + 'hi')
pool = concurrent.ThreadPoolExecutor(20)
future_tasks = []
for i in range(100):
future = pool.submit(fun,i)
future_tasks.append(future)
task_iter = concurrent.as_completed(future_tasks)
# 完成列表
done_list = []
for future in task_iter:
done_list.append(future.result())
print len(done_list)
print done_list
1.2 异常及重试
import time
import logging
import traceback
from xlib.util import concurrent
from xlib.util import retry
@retry.retry(max_retries=3, interval=5)
def fun(i):
time.sleep(2)
if i == 1:
raise KeyError
return (str(i) + 'hi')
pool = concurrent.ThreadPoolExecutor(20)
future_tasks = []
for i in range(100):
future = pool.submit(fun,i)
future_tasks.append(future)
task_iter = concurrent.as_completed(future_tasks)
print task_iter
# 完成列表
done_list = []
for future in task_iter:
try:
result = future.result()
except:
logging.error(traceback.format_exc())
result = "ERR"
done_list.append(result)
print len(done_list)
print done_list
1.3 输出异常的 original_task
future_tasks 改为 dict
import time
import logging
import traceback
from xlib.util import concurrent
from xlib.util import retry
#@retry.retry(max_retries=3, interval=5)
def fun(i):
time.sleep(2)
if i == 1:
raise KeyError
return (str(i) + 'hi')
pool = concurrent.ThreadPoolExecutor(20)
future_tasks = {}
for i in range(10):
future = pool.submit(fun,i)
future_tasks[future] = i
task_iter = concurrent.as_completed(future_tasks)
print task_iter
# 完成列表
done_list = []
for future in task_iter:
try:
result = future.result()
except:
print future_tasks[future]
logging.error(traceback.format_exc())
result = "ERR"
done_list.append(result)
print len(done_list)
print done_list
1.4 使用 with 语句(推荐)
使用 with 语句将创建一个上下文管理器,该管理器可确保在完成后通过隐式调用 executor.shutdown() 函数来清理掉所有线程或进程。
import time
import logging
import traceback
from xlib.util import concurrent
from xlib.util import retry
@retry.retry(max_retries=3, interval=5)
def fun(i):
time.sleep(2)
if i == 1:
raise KeyError
return (str(i) + 'hi')
with concurrent.ThreadPoolExecutor(10) as pool:
future_tasks = {}
for i in range(10):
future = pool.submit(fun,i)
future_tasks[future] = i
task_iter = concurrent.as_completed(future_tasks)
# 完成列表
done_list = []
for future in task_iter:
try:
result = future.result()
except:
print future_tasks[future]
logging.error(traceback.format_exc())
result = "ERR"
done_list.append(result)
print len(done_list)
print done_list
Last updated