1 Web 框架分层
下面分别从这三部分介绍下
2 HTTP 服务器模型
2.1 关于 I/O 模型的引出
我们都知道,为了 OS 的安全性等的考虑
进程是无法直接操作 I/O 设备的,其必须通过系统调用请求内核来协助完成 I/O 动作,而内核会为每个 I/O 设备维护一个 buffer。如下图所示:
整个请求过程
用户进程发起请求,内核接受到请求
从 I/O 设备中获取数据到 buffer 中
再将 buffer 中的数据 copy 到用户进程的地址空间
该用户进程获取到数据后再响应客户端
在整个请求过程中,数据输入至 buffer 需要时间,而从 buffer 复制数据至进程也需要时间。因此根据在这两段时间内等待方式的不同,I/O 动作可以分为以下五种模式:
(1) 阻塞 I/O (Blocking I/O)
(2) 非阻塞 I/O (Non-Blocking I/O)
(3) I/O 复用(I/O Multiplexing)
(4) 信号驱动的 I/O (Signal Driven I/O)
(5) 异步 I/O (Asynchrnous I/O)
前四种为同步 IO,第 5 为异步 IO
2.2 服务器模型说明
2.2.1 IO 阻塞模型
阻塞 IO(blocking IO)的特点:就是在 IO 执行的两个阶段(等待数据和拷贝数据两个阶段)都被 block 了
2.2.1.1 一个简单的开始
import socket
"""
# response = response_headers + response_body
# response_headers 和 response_body 间使用 '\r\n' 隔开
"""
response = 'HTTP/1.1 200 OK\r\nConnection: Close\r\nContent-Length: 1\r\n\r\nA'
server = socket.socket()
"""
# setsockopt(level, optname, value)
# level:选项所在的协议层
# optname:
# SO_REUSEADDR: 打开或关闭地址复用功能,设置为 1 时,保证在 TIME_WAIT 状态下 bind() 调用可以成功
"""
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 8585))
server.listen(32)
while True:
client, clientaddr = server.accept() # blocking
request = client.recv(4096) # blocking
client.send(response) # maybe blocking
client.close()
阻塞
这个例子是阻塞的
在 accept 阻塞的时候,无法调用 recv 接收客户端数据。
在 recv 阻塞的时候也无法调用 accept 接受新的请求。
所以它同时只能接受和处理一个请求,如果一个客户端发送数据的速度比较慢,会拖慢整个服务器响应的速度。
第一处阻塞
client, clientaddr = server.accept() # blocking
当调用的函数是一个阻塞的系统调用时,如果没有满足的数据,这个系统调用不会返回,进程会被操作系统挂起,直到有满足的数据才会将进程唤醒。比如 accept 在此处就会发生阻塞。
阻塞过程中 CPU 处于闲置状态
第二处阻塞
request = client.recv(4096) # blocking
recv 和 accept 一样。
第三处可能的阻塞
client.send(response) # maybe blocking
在内核的 socket 实现中,会有两个缓存 (buffer)。read buffer 和 write buffer 。
当内核接收到网卡传来的客户端从数据后,把数据复制到 read buffer ,这个时候 recv 阻塞的进程就可以被唤醒。
当调用 send 的时候,内核只是把 send 的数据复制到 write buffer 里,然后立即返回。
只有 write buffer 的空间不够时 send 才会被阻塞,需要等待网卡发送数据腾空 write buffer 。
(PS:curl twemproxy status 信息时,当服务端返回的信息超过 write buffer 时,客户端会接收失败,也是这个 write buffer 满导致的)
在 write buffer 的空间足够放下 send 的数据时进程才可以被唤醒。
2.2.1.2 用进程或线程处理多个客户端
因为 accept 和 recv 互相阻塞,服务器才不能同时处理多个请求,那么把它们拆出来不就好了?
这是早期大多数服务器使用的办法,具体实现有下面几种。
CGI 的方式
import os
import sys
import socket
import multiprocessing
response = 'HTTP/1.1 200 OK\r\nConnection: Close\r\nContent-Length: 1\r\n\r\nA'
server = socket.socket()
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 8585))
server.listen(32)
def handler(client):
request = client.recv(4096)
client.send(response)
client.close()
sys.exit()
while True:
client, addr = server.accept()
process = multiprocessing.Process(target=handler, args=(client,))
process.daemon = True
process.start()
这里的 accept 依然会阻塞,但一旦新的连接建立,会创建一个新的进程去调用 recv 和 send (handler 函数在新的进程中执行)。
在主进程中立即再次调用 accept 准备接收下一个客户端连接。
CGI 虽然能处理多个客户端的请求,可是它实在太慢了。打个比方,1.1.1 的代码相当于一辆汽车在一条公路上行驶,虽然每次只能送一个人,但它可以开得飞快。
在 CGI 中变成了无数辆汽车,一旦汽车的数量超过道路的负载就堵车了。
Thread 线程
import os
import sys
import socket
import threading
response = 'HTTP/1.1 200 OK\r\nConnection: Close\r\nContent-Length: 1\r\n\r\nA'
server = socket.socket()
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 8585))
server.listen(32)
def handler(client):
request = client.recv(4096)
client.send(response)
client.close()
while True:
client, clientaddr = server.accept()
thread = threading.Thread(target=handler, args=(client,))
thread.daemon = True
thread.start()
进程换成了线程。既然给汽车太多会堵车,那么给每个人发一辆自行车好了,这样一条路就可以让更多的人通过了。
显然这样做是治标不治本啊,自行车多了,不还是会堵车吗。
Prefork
import os
import sys
import socket
import multiprocessing
response = 'HTTP/1.1 200 OK\r\nConnection: Close\r\nContent-Length: 1\r\n\r\nA'
server = socket.socket()
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 8585))
server.listen(32)
def handler():
while True:
client, addr = server.accept()
request = client.recv(4096)
client.send(response)
client.close()
sys.exit()
processes = []
for i in range(0, 4):
process = multiprocessing.Process(target=handler, args=())
process.start()
processes.append(process)
for process in processes:
process.join()
如果道路只允许固定的汽车同时通过能够保证道路的畅通,而且其它汽车等待的时间比堵车的时间还要短。 那我干脆只允许每次只有固定数量的汽车通过好了,如果路上没有位置,就让他等待别的车行驶到终点。 如果路上还有空余的位置,就立即通行。这样的设计即能同时处理多个请求,也能保证不会让系统变得很慢。
这个设计简直太聪明了。现在有相当多的服务器是这样实现的。
因为 UNIX 子进程会继承父进程的所有 fd 。显然 server 变量也不例外,这样只要在子进程中调用 accept 和 recv 就好了(相当于多个 第 1 章 中的实例),即使阻塞也只会阻塞一个进程。这个方式能够同时处理请求的数量等于子进程的数量。
你可能会有一个问题,“如果多个子进程同时阻塞在 accept 上,那么新的连接来了应该唤醒谁?”
这个现象叫“惊群效应”,在早期 Linux 内核中,每个阻塞在 accept 的子进程都会被唤醒,但只有一个进程能成功建立连接,其它的进程会返回一个错误。新的 Linux 内核已经不存在这个问题,每次只会唤醒一个进程,其它进程继续等待新的连接。
这个时候想,如果把 Prefork 中的 “汽车” 换成 “自行车” 会怎样?
ThreadPool 线程池
ThreadPool 有两个版本,第一个版本在主线程中 accept , 子线程中 recv 和 send
import os
import sys
import time
import Queue
import socket
import threading
response = 'HTTP/1.1 200 OK\r\nConnection: Close\r\nContent-Length: 1\r\n\r\nA'
server = socket.socket()
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 8585))
server.listen(32)
def handler(queue):
while True:
client = queue.get()
request = client.recv(4096)
client.send(response)
client.close()
thread.exit()
queue = Queue.Queue()
threads = []
for i in range(0, 4):
thread = threading.Thread(target=handler, args=(queue,))
thread.daemon = True
thread.start()
threads.append(thread)
while True:
client, clientaddr = server.accept()
queue.put(client)
第二个版本,子线程同时 accept 并 recv 和 send
import os
import sys
import time
import socket
import threading
response = 'HTTP/1.1 200 OK\r\nConnection: Close\r\nContent-Length: 1\r\n\r\nA'
server = socket.socket()
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
server.bind(('0.0.0.0', 8585))
server.listen(32)
def handler():
while True:
client, clientaddr = server.accept()
request = client.recv(4096)
client.send(response)
client.close()
thread.exit()
threads = []
for i in range(0, 4):
thread = threading.Thread(target=handler, args=())
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
这个就是 “Prefork” 的线程池版。现在也有相当多的服务器是用这个方法实现的。
很难说 Prefork 和 ThreadPool 哪个更好,如果业务相对独立 Prefork 提供了更好的隔离性,进程之间不能访问彼此的内存空间,而且即使有一个子进程甚至主进程崩溃都不会影响正常的子进程。但如果需要共享数据的话,Prefork 是相当的麻烦。
当需要共享大量数据的时候 ThreadPool 用起来会更顺手。但也同时需要注意多线程临界区的加锁等。实现不好可能会变成死锁或者效率太低。
在线程池的版本中,第一个 ThreadPool 中的例子在 queue 上会有读写锁(主线程写,子线程读)。不过可以用 Lock Free 算法实现一个精巧的 Ring Buffer 避免这个问题,后边服务器优化的文章再介绍。
第二个 ThreadPool 的例子和 prefork 版本几乎是一模一样的。依赖内核的 accept 的实现。
2.2.2 Nonblocking 非阻塞
CGI, Thread, Prefork, ThreadPool 的目标只有一个,解决阻塞调用拖慢服务器的问题。费了这么大的功夫,为什么不直接把“阻塞”的函数变成“不阻塞”的呢?
非阻塞式 IO 中,用户进程其实是需要不断的主动询问 kernel 数据准备好了没有
import os
import fcntl
import socket
import select
response = 'HTTP/1.1 200 OK\r\nConnection: Close\r\nContent-Length: 1\r\n\r\nA'
server = socket.socket()
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 8585))
server.listen(32)
# set nonblocking
flags = fcntl.fcntl(server.fileno(), fcntl.F_GETFL)
fcntl.fcntl(server.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)
clients = set([])
while True:
try:
client, clientaddr = server.accept()
clients.add(client)
except Exception as e:
pass
for client in clients.copy():
try:
request = client.recv(4096)
client.send(response)
clients.remove(client)
client.close()
except Exception as e:
pass
这两行代码把 server 设置成非阻塞模式。
flags = fcntl.fcntl(server.fileno(), fcntl.F_GETFL)
fcntl.fcntl(server.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)
现在 server 上的 accept 调用就成了非阻塞的了,并且在 accept 成功返回的 client 也会继承 server 的非阻塞特性,所以对 client 的 recv 调用,也是非阻塞的。
在非阻塞模式下调用阻塞函数(accept 和 recv),如果并没有可用的数据, 原本应阻塞的函数会直接返回 EAGAIN 或 EWOULDBLOCK 错误,程序可以继续执行,不会被操作系统挂起。
EAGAIN 和 EWOULDBLOCK 的错误码通常是相同的。但从兼容上考虑,需要两个错误分别判断。
在 Python 中是以 异常 的形式返回这两个错误的,我用 Exception 就为了捕获这两个错误码,处理方式是什么都不做。
try:
client, clientaddr = server.accept()
clients.add(client)
except Exception as e:
pass
这是处理非阻塞的 accept 调用,如果 accept 成功返回,就把返回的 client 放到一个 clients 的数组中,准备下一步接收 client 的数据。如果没有新的连接,accept 抛出异常,我们直接 pass 到下一步。
现在进程不会被操作系统挂起,将立即执行下面的代码。
for client in clients.copy():
try:
request = client.recv(4096)
client.send(response)
clients.remove(client)
client.close()
except Exception as e:
pass
这段代码并不难理解,for 循环遍历所有已经建立连接的 client 并 尝试 调用 recv 。如果 recv 返回数据,继续调用 send 发送回复,然后把 client 从 clients 中删除并关闭。如果抛出异常就 pass 到下一个 client 。
因为 Python 不允许在遍历一个数组的时候修改数组的内容 ( clients.remove(c) ),所以我用遍历的是 clients.copy() 。
实际上,这个例子中的代码是有严重性能问题的,假定以下情况。
(2) 现在也没有任何连接,clients 数组是空的。
当满足这两情况下的时候,服务器负载为 0 。会发生什么?
(1) accept 会立即失败进入 for 循环。
服务器逻辑相当于下面的代码。
进入了一个无意义的循环消耗 CPU ,这个叫做 busy wait 。这种情况在服务器编程中是不允许出现的。
那么如何避免这样的情况发生?方法可能让你大跌眼镜。
用 blocking 的方式。我们躲避了半天的 blocking ,又回来了。
2.2.3 I/O Multiplexing IO 多路复用
这是电子通信的术语。把它用在非阻塞编程上,也是恰到好处。
2.2.3.1 Select
可看最后一章,传送门
2.2.3.2 epoll
select 的实现比较早,那时并没有考虑到会有成千上万个连接的情况。
所以 select 返回的时候并不返回发生事件的 fd ,它只会告诉你有 N 个 fd 有事件,但具体是哪个 fd ,哪个事件,需要自己去找出来。
我们不得不用循环的方式挨个检查我们传给 select 的 fd 。这样就会导致当连接数量比较多且通信比较少的时候(比如长连接心跳),每次都要循环检查一遍所有的连接,真是白白浪费 CPU 。
epoll 与 select 不同的是它会返回具体的 fd 。我们只要处理有事件的 fd 就可以了。性能和连接数的多少没有关系。
2.3 如何选择
2.3.1 少量客户端,短时间连接
典型应用:流量较少的静态 HTTP 服务
推荐模型:CGI ,Thread ,Prefork ,ThreadPool
2.3.2 少量客户端,长时间连接
典型应用:数据库服务器
推荐模型:CGI , Thread
2.3.3 大量客户端,短时间连接
典型应用:流量大的静态 HTTP 服务
推荐模型:Prefork ,ThreadPool ,Select
2.3.4 大量客户端,长时间连接
典型应用:大流量 HTTP 服务, XMPP 服务,WebSocket 服务
推荐模型:Epoll