🦋
Butterfly 用户手册
  • Introduction
  • 一 前言
  • 二 开始
    • 安装部署
    • 五分钟体验指南
    • 单机使用手册
    • 应用规范
      • handler specs
      • middleware specs
      • xingqiao_plugin specs
      • yiqiu_program specs
  • 三 客户端功能
    • MySQL 原生协议
    • MySQL ORM
    • Redis 原生协议
      • redis_config
      • redis_tls
    • Redis ORM
    • Redis mcpack
    • Localcache
    • Kazoo
  • 四 应用(通用服务)
    • API JSON 规范
    • 异步任务 BaiChuan(百川)
    • 任务调度 RuQi(如期)
    • 任务编排 XingQiao(星桥)
    • 配置管理 WuXing(五行)
    • 运筹决策 BaiCe(百策)
  • 五 部署运维
    • 单机容器化部署
    • 监控
    • 异常排查
      • CPU Load spike every 7 hours
    • 升级
    • 安全
    • 其他
  • 六 前端
    • butterfly_template
    • butterfly_fe
    • butterfly-admin(json2web)
      • amis
      • sso
      • pangu
    • NoahV
    • PyWebIO
  • 七 潘多拉魔盒
    • 装饰器
      • localcache_decorator
      • retry_decorator
      • custom_decorator
      • command2http_decorator
    • 算法
      • 算法-分位数
      • 算法-变异系数
    • 实用工具
      • host_util
      • shell_util
      • http_util
      • time_util
      • random_util
      • concurrent
      • jsonschema
      • blinker
      • toml
      • command_util
      • config_util
      • picobox
      • 对称加密
        • des
        • aes
      • ascii_art
        • ttable
        • chart
      • business_rules
      • python-mysql-replication
      • dict_util
    • 中间件
      • middleware_status
      • middleware_whitelist
    • test_handler.py
  • 八 最佳实践
    • 分布式架构
    • Code practice
    • Log practice
    • Daemon process
  • 附录
Powered by GitBook
On this page
  • 1 简介
  • 2 安装
  • 3 使用
  • 3.1 订阅信号
  • 3.2 发布信号
  • 3.3 订阅指定的发布者
  • 3.4 订阅者接收发布者传递的数据
  • 3.5 匿名信号
  • 3.6 通过装饰器来订阅
  • 3.7 检查信号是否有订阅者
  • 4 例子
  • 4.1 flask
  • 4.2 demo
  1. 七 潘多拉魔盒
  2. 实用工具

blinker

  • 1 简介

  • 2 安装

  • 3 使用

    • 3.1 订阅信号

    • 3.2 发布信号

    • 3.3 订阅指定的发布者

    • 3.4 订阅者接收发布者传递的数据

    • 3.5 匿名信号

    • 3.6 通过装饰器来订阅

    • 3.7 检查信号是否有订阅者

  • 4 例子

    • 4.1 flask

    • 4.2 demo

1 简介

Blinker 是一个基于 Python 的强大的信号库,支持一对一、一对多的订阅发布模式,支持发送任意大小的数据等等,且线程安全。

2 安装

pip install blinker

3 使用

signal 为单例模式

signal 使用了单例模式,允许代码的不同模块得到相同的 signal,而不用互相传参。

In [1]: from blinker import signal

In [2]: a = signal('signal_test')

In [3]: b = signal('signal_test')

In [4]: a is b
Out[4]: True

3.1 订阅信号

使用.connect(func) 方法来订阅一个信号,当信号发布时,该信号的订阅者会执行 func。

In [5]: def subscriber(sender):
   ...:     print('Got a signal sent by {}'.format(sender))
   ...:

In [6]: ready = signal('ready')

In [7]: ready.connect(subscriber)
Out[7]: <function __main__.subscriber(sender)>

3.2 发布信号

使用.send() 方法来发布信号,会通知所有订阅者,如果没有订阅者则什么都不会发生。

In [12]: class Processor(object):
    ...:
    ...:     def __init__(self, name):
    ...:         self.name = name
    ...:
    ...:     def go(self):
    ...:         ready = signal('ready')
    ...:         ready.send(self)
    ...:         print('Processing...')
    ...:         complete = signal('complete')
    ...:         complete.send(self)
    ...:
    ...:     def __repr__(self):
    ...:         return '<Processor {}>'.format(self.name)
    ...:

In [13]: processor_a = Processor('a')

In [14]: processor_a.go()
Got a signal sent by <Processor a>
Processing...

3.3 订阅指定的发布者

.connect() 方法接收一个可选参数 sender,可用于接收指定发布者的信号。

In [18]: def b_subscriber():
    ...:     print('Caught signal from peocessor_b')
    ...:

In [19]: ready.connect(b_subscriber, sender=processor_b)
Out[19]: <function __main__.b_subscriber(sender)>

In [20]: processor_a.go()
Got a signal sent by <Processor a>
Processing...

In [21]: processor_b.go()
Got a signal sent by <Processor b>
Caught signal from peocessor_b
Processing...

3.4 订阅者接收发布者传递的数据

除了之前的通过.connect 方法来订阅外,还可以通过装饰器的方法来订阅。

订阅的方法可以接收发布者传递的数据。

In [22]: send_data = signal('send-data')

In [23]: @send_data.connect
    ...: def receive_data(sender, **kw):
    ...:     print('Caught signal from {}, data: {}'.format(sender, kw))
    ...:     return 'received!'
    ...:
    ...:

In [24]: result = send_data.send('anonymous', abc=123)
Caught signal from anonymous, data: {'abc': 123}

.send 方法的返回值是一个由元组组成的列表,每个元组的第一个值为订阅者的方法,第二个值为订阅者的返回值

In [25]: result
Out[25]: [(<function __main__.receive_data(sender, **kw)>, 'received!')]

3.5 匿名信号

信号可以是匿名的,可以使用 Signal 类来创建唯一的信号(S 大写,这个类不像之前的 signal,为非单例模式)。 下面的 on_ready 和 on_complete 为两个不同的信号

In [28]: from blinker import Signal

In [29]: class AltProcessor(object):
    ...:     on_ready = Signal()
    ...:     on_complete = Signal()
    ...:
    ...:     def __init__(self, name):
    ...:         self.name = name
    ...:
    ...:     def go(self):
    ...:         self.on_ready.send(self)
    ...:         print('Altername processing')
    ...:         self.on_complete.send(self)
    ...:
    ...:     def __repr__(self):
    ...:         return '<AltProcessor {}>'.format(self.name)

3.6 通过装饰器来订阅

在订阅者接收发布者传递的数据中简单地演示了使用装饰器来订阅,但是那种订阅方式不支持订阅指定的发布者,这时候我们可以用.connect_via(sender)

In [31]: @dice_roll.connect_via(1)
    ...: @dice_roll.connect_via(3)
    ...: @dice_roll.connect_via(5)
    ...: def odd_subscriver(sender):
    ...:     print('Observed dice roll {}'.format(sender))
    ...:

In [32]: result = dice_roll.send(3)
Observed dice roll 3

In [33]: result = dice_roll.send(1)
Observed dice roll 1

In [34]: result = dice_roll.send(5)
Observed dice roll 5

In [35]: result = dice_roll.send(2)

3.7 检查信号是否有订阅者

In [37]: bool(signal('ready').receivers)
Out[37]: True

In [38]: bool(signal('complete').receivers)
Out[38]: False

In [39]: bool(AltProcessor.on_complete.receivers)
Out[39]: False

In [40]: signal('ready').has_receivers_for(processor_a)
Out[40]: True

4 例子

4.1 flask

flask/signals.py

import typing as t

try:
    from blinker import Namespace

    signals_available = True
except ImportError:
    signals_available = False

    class Namespace:  # type: ignore
        def signal(self, name: str, doc: t.Optional[str] = None) -> "_FakeSignal":
            return _FakeSignal(name, doc)

    class _FakeSignal:
        """If blinker is unavailable, create a fake class with the same
        interface that allows sending of signals but will fail with an
        error on anything else.  Instead of doing anything on send, it
        will just ignore the arguments and do nothing instead.
        """

        def __init__(self, name: str, doc: t.Optional[str] = None) -> None:
            self.name = name
            self.__doc__ = doc

        def send(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
            pass

        def _fail(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
            raise RuntimeError(
                "Signalling support is unavailable because the blinker"
                " library is not installed."
            )

        connect = connect_via = connected_to = temporarily_connected_to = _fail
        disconnect = _fail
        has_receivers_for = receivers_for = _fail
        del _fail


# The namespace for code signals.  If you are not Flask code, do
# not put signals in here.  Create your own namespace instead.
_signals = Namespace()


# Core signals.  For usage examples grep the source code or consult
# the API documentation in docs/api.rst as well as docs/signals.rst

# 主流程
appcontext_pushed = _signals.signal("appcontext-pushed")    #app 上下文 push 时执行



template_rendered = _signals.signal("template-rendered")
before_render_template = _signals.signal("before-render-template")
request_started = _signals.signal("request-started")
request_finished = _signals.signal("request-finished")
request_tearing_down = _signals.signal("request-tearing-down")
got_request_exception = _signals.signal("got-request-exception")
appcontext_tearing_down = _signals.signal("appcontext-tearing-down")
appcontext_popped = _signals.signal("appcontext-popped")
message_flashed = _signals.signal("message-flashed")

4.2 demo

from xlib.util.blinker import signal


def subscriber(sender, action):
    print("Got a signal sent by %r" % sender)
    return action

ready = signal('ready')
ready.connect(subscriber)


result = ready.send('anonymous', action="ccc")
"""
Got a signal sent by 'anonymous'
"""
print result
"""
[(<function subscriber at 0x1071a1b90>, 'ccc')]

send() 方法的返回值收集每个订阅者的返回值,拼接成一个元组组成的列表。每个元组的组成为 (receiver function, return value)。
"""
print result[0][0].func_name
"""
subscriber
"""

备注

  • send 第一个参数不能传 kwargs

PreviousjsonschemaNexttoml

Last updated 1 year ago