blinker
1 简介
2 安装
3 使用
3.1 订阅信号
3.2 发布信号
3.3 订阅指定的发布者
3.4 订阅者接收发布者传递的数据
3.5 匿名信号
3.6 通过装饰器来订阅
3.7 检查信号是否有订阅者
4 例子
4.1 flask
4.2 demo
1 简介
Blinker 是一个基于 Python 的强大的信号库,支持一对一、一对多的订阅发布模式,支持发送任意大小的数据等等,且线程安全。
2 安装
pip install blinker
3 使用
signal 为单例模式
signal 使用了单例模式,允许代码的不同模块得到相同的 signal,而不用互相传参。
In [1]: from blinker import signal
In [2]: a = signal('signal_test')
In [3]: b = signal('signal_test')
In [4]: a is b
Out[4]: True
3.1 订阅信号
使用.connect(func) 方法来订阅一个信号,当信号发布时,该信号的订阅者会执行 func。
In [5]: def subscriber(sender):
...: print('Got a signal sent by {}'.format(sender))
...:
In [6]: ready = signal('ready')
In [7]: ready.connect(subscriber)
Out[7]: <function __main__.subscriber(sender)>
3.2 发布信号
使用.send() 方法来发布信号,会通知所有订阅者,如果没有订阅者则什么都不会发生。
In [12]: class Processor(object):
...:
...: def __init__(self, name):
...: self.name = name
...:
...: def go(self):
...: ready = signal('ready')
...: ready.send(self)
...: print('Processing...')
...: complete = signal('complete')
...: complete.send(self)
...:
...: def __repr__(self):
...: return '<Processor {}>'.format(self.name)
...:
In [13]: processor_a = Processor('a')
In [14]: processor_a.go()
Got a signal sent by <Processor a>
Processing...
3.3 订阅指定的发布者
.connect() 方法接收一个可选参数 sender,可用于接收指定发布者的信号。
In [18]: def b_subscriber():
...: print('Caught signal from peocessor_b')
...:
In [19]: ready.connect(b_subscriber, sender=processor_b)
Out[19]: <function __main__.b_subscriber(sender)>
In [20]: processor_a.go()
Got a signal sent by <Processor a>
Processing...
In [21]: processor_b.go()
Got a signal sent by <Processor b>
Caught signal from peocessor_b
Processing...
3.4 订阅者接收发布者传递的数据
除了之前的通过.connect 方法来订阅外,还可以通过装饰器的方法来订阅。
订阅的方法可以接收发布者传递的数据。
In [22]: send_data = signal('send-data')
In [23]: @send_data.connect
...: def receive_data(sender, **kw):
...: print('Caught signal from {}, data: {}'.format(sender, kw))
...: return 'received!'
...:
...:
In [24]: result = send_data.send('anonymous', abc=123)
Caught signal from anonymous, data: {'abc': 123}
.send 方法的返回值是一个由元组组成的列表,每个元组的第一个值为订阅者的方法,第二个值为订阅者的返回值
In [25]: result
Out[25]: [(<function __main__.receive_data(sender, **kw)>, 'received!')]
3.5 匿名信号
信号可以是匿名的,可以使用 Signal 类来创建唯一的信号(S 大写,这个类不像之前的 signal,为非单例模式)。 下面的 on_ready 和 on_complete 为两个不同的信号
In [28]: from blinker import Signal
In [29]: class AltProcessor(object):
...: on_ready = Signal()
...: on_complete = Signal()
...:
...: def __init__(self, name):
...: self.name = name
...:
...: def go(self):
...: self.on_ready.send(self)
...: print('Altername processing')
...: self.on_complete.send(self)
...:
...: def __repr__(self):
...: return '<AltProcessor {}>'.format(self.name)
3.6 通过装饰器来订阅
在订阅者接收发布者传递的数据中简单地演示了使用装饰器来订阅,但是那种订阅方式不支持订阅指定的发布者,这时候我们可以用.connect_via(sender)
In [31]: @dice_roll.connect_via(1)
...: @dice_roll.connect_via(3)
...: @dice_roll.connect_via(5)
...: def odd_subscriver(sender):
...: print('Observed dice roll {}'.format(sender))
...:
In [32]: result = dice_roll.send(3)
Observed dice roll 3
In [33]: result = dice_roll.send(1)
Observed dice roll 1
In [34]: result = dice_roll.send(5)
Observed dice roll 5
In [35]: result = dice_roll.send(2)
3.7 检查信号是否有订阅者
In [37]: bool(signal('ready').receivers)
Out[37]: True
In [38]: bool(signal('complete').receivers)
Out[38]: False
In [39]: bool(AltProcessor.on_complete.receivers)
Out[39]: False
In [40]: signal('ready').has_receivers_for(processor_a)
Out[40]: True
4 例子
4.1 flask
flask/signals.py
import typing as t
try:
from blinker import Namespace
signals_available = True
except ImportError:
signals_available = False
class Namespace: # type: ignore
def signal(self, name: str, doc: t.Optional[str] = None) -> "_FakeSignal":
return _FakeSignal(name, doc)
class _FakeSignal:
"""If blinker is unavailable, create a fake class with the same
interface that allows sending of signals but will fail with an
error on anything else. Instead of doing anything on send, it
will just ignore the arguments and do nothing instead.
"""
def __init__(self, name: str, doc: t.Optional[str] = None) -> None:
self.name = name
self.__doc__ = doc
def send(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
pass
def _fail(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
raise RuntimeError(
"Signalling support is unavailable because the blinker"
" library is not installed."
)
connect = connect_via = connected_to = temporarily_connected_to = _fail
disconnect = _fail
has_receivers_for = receivers_for = _fail
del _fail
# The namespace for code signals. If you are not Flask code, do
# not put signals in here. Create your own namespace instead.
_signals = Namespace()
# Core signals. For usage examples grep the source code or consult
# the API documentation in docs/api.rst as well as docs/signals.rst
# 主流程
appcontext_pushed = _signals.signal("appcontext-pushed") #app 上下文 push 时执行
template_rendered = _signals.signal("template-rendered")
before_render_template = _signals.signal("before-render-template")
request_started = _signals.signal("request-started")
request_finished = _signals.signal("request-finished")
request_tearing_down = _signals.signal("request-tearing-down")
got_request_exception = _signals.signal("got-request-exception")
appcontext_tearing_down = _signals.signal("appcontext-tearing-down")
appcontext_popped = _signals.signal("appcontext-popped")
message_flashed = _signals.signal("message-flashed")
4.2 demo
from xlib.util.blinker import signal
def subscriber(sender, action):
print("Got a signal sent by %r" % sender)
return action
ready = signal('ready')
ready.connect(subscriber)
result = ready.send('anonymous', action="ccc")
"""
Got a signal sent by 'anonymous'
"""
print result
"""
[(<function subscriber at 0x1071a1b90>, 'ccc')]
send() 方法的返回值收集每个订阅者的返回值,拼接成一个元组组成的列表。每个元组的组成为 (receiver function, return value)。
"""
print result[0][0].func_name
"""
subscriber
"""
备注
send 第一个参数不能传 kwargs
Last updated