🦋
Butterfly 用户手册
  • Introduction
  • 一 前言
  • 二 开始
    • 安装部署
    • 五分钟体验指南
    • 单机使用手册
    • 应用规范
      • handler specs
      • middleware specs
      • xingqiao_plugin specs
      • yiqiu_program specs
  • 三 客户端功能
    • MySQL 原生协议
    • MySQL ORM
    • Redis 原生协议
      • redis_config
      • redis_tls
    • Redis ORM
    • Redis mcpack
    • Localcache
    • Kazoo
  • 四 应用(通用服务)
    • API JSON 规范
    • 异步任务 BaiChuan(百川)
    • 任务调度 RuQi(如期)
    • 任务编排 XingQiao(星桥)
    • 配置管理 WuXing(五行)
    • 运筹决策 BaiCe(百策)
  • 五 部署运维
    • 单机容器化部署
    • 监控
    • 异常排查
      • CPU Load spike every 7 hours
    • 升级
    • 安全
    • 其他
  • 六 前端
    • butterfly_template
    • butterfly_fe
    • butterfly-admin(json2web)
      • amis
      • sso
      • pangu
    • NoahV
    • PyWebIO
  • 七 潘多拉魔盒
    • 装饰器
      • localcache_decorator
      • retry_decorator
      • custom_decorator
      • command2http_decorator
    • 算法
      • 算法-分位数
      • 算法-变异系数
    • 实用工具
      • host_util
      • shell_util
      • http_util
      • time_util
      • random_util
      • concurrent
      • jsonschema
      • blinker
      • toml
      • command_util
      • config_util
      • picobox
      • 对称加密
        • des
        • aes
      • ascii_art
        • ttable
        • chart
      • business_rules
      • python-mysql-replication
      • dict_util
    • 中间件
      • middleware_status
      • middleware_whitelist
    • test_handler.py
  • 八 最佳实践
    • 分布式架构
    • Code practice
    • Log practice
    • Daemon process
  • 附录
Powered by GitBook
On this page
  • 1 例子
  • 1.1 简单例子
  • 1.2 异常及重试
  • 1.3 输出异常的 original_task
  • 1.4 使用 with 语句(推荐)
  1. 七 潘多拉魔盒
  2. 实用工具

concurrent

1 例子

1.1 简单例子

import time
from xlib.util import concurrent

def fun(i__):
    time.sleep(2)
    return (str(i__) + 'hi')


pool = concurrent.ThreadPoolExecutor(20)

future_tasks = []
for i in range(100):
    future = pool.submit(fun,i)
    future_tasks.append(future)

task_iter = concurrent.as_completed(future_tasks)

# 完成列表
done_list = []
for future in task_iter:
    done_list.append(future.result())

print len(done_list)
print done_list

1.2 异常及重试

import time
import logging
import traceback

from xlib.util import concurrent
from xlib.util import retry


@retry.retry(max_retries=3, interval=5)
def fun(i):
    time.sleep(2)
    if i == 1:
        raise KeyError
    return (str(i) + 'hi')


pool = concurrent.ThreadPoolExecutor(20)

future_tasks = []
for i in range(100):
    future = pool.submit(fun,i)
    future_tasks.append(future)

task_iter = concurrent.as_completed(future_tasks)

print task_iter

# 完成列表
done_list = []
for future in task_iter:
    try:
        result = future.result()
    except:
        logging.error(traceback.format_exc())
        result = "ERR"

    done_list.append(result)


print len(done_list)
print done_list

1.3 输出异常的 original_task

future_tasks 改为 dict

import time
import logging
import traceback

from xlib.util import concurrent
from xlib.util import retry


#@retry.retry(max_retries=3, interval=5)
def fun(i):
    time.sleep(2)
    if i == 1:
        raise KeyError
    return (str(i) + 'hi')


pool = concurrent.ThreadPoolExecutor(20)

future_tasks = {}
for i in range(10):
    future = pool.submit(fun,i)
    future_tasks[future] = i

task_iter = concurrent.as_completed(future_tasks)

print task_iter

# 完成列表
done_list = []
for future in task_iter:
    try:
        result = future.result()
    except:
        print future_tasks[future]
        logging.error(traceback.format_exc())
        result = "ERR"

    done_list.append(result)


print len(done_list)
print done_list

1.4 使用 with 语句(推荐)

使用 with 语句将创建一个上下文管理器,该管理器可确保在完成后通过隐式调用 executor.shutdown() 函数来清理掉所有线程或进程。

import time
import logging
import traceback

from xlib.util import concurrent
from xlib.util import retry


@retry.retry(max_retries=3, interval=5)
def fun(i):
    time.sleep(2)
    if i == 1:
        raise KeyError
    return (str(i) + 'hi')

with concurrent.ThreadPoolExecutor(10) as pool:
    future_tasks = {}
    for i in range(10):
        future = pool.submit(fun,i)
        future_tasks[future] = i

    task_iter = concurrent.as_completed(future_tasks)

    # 完成列表
    done_list = []
    for future in task_iter:
        try:
            result = future.result()
        except:
            print future_tasks[future]
            logging.error(traceback.format_exc())
            result = "ERR"

        done_list.append(result)


print len(done_list)
print done_list
Previousrandom_utilNextjsonschema

Last updated 1 year ago