我们先来看看hub怎么来的
# 线程局部变量
_threadlocal = threading.local()
def get_hub():
try:
# 从线程局部变量中获取hub
hub = _threadlocal.hub
except AttributeError:
try:
# 这里是判断_threadlocal中是否已经有了Hub函数
_threadlocal.Hub
except AttributeError:
# use_hub内部是根据系统、python库来选择使用的Hub类
# 在linux中,use_hub()默认为
# _threadlocal.Hub = eventlet.hub.epoll.Hub
# use_hub()里怎么绕到eventlet.hub.epoll.Hub就不再详细看了
use_hub()
# 调用_threadlocal.Hub()给_threadlocal.hub赋值
hub = _threadlocal.hub = _threadlocal.Hub()
# 上面的代码绕了一下的是为的是实现返回的hub在单个线程中只创建一次的(线程中的单例)
return hub
看Hub类之前要看hub用到的一个类timer,这个类比较重要
class Timer(object):
# timer就是一个定时器类
# Hub的main loop里会不停的扫描定时器列表
# 然后处理里面的timer类实例
# timer类实例在tpl变量中保存切换绿色线程的函数
def __init__(self, seconds, cb, *args, **kw):
# timer被调用的时间
# 如果要立即被调用,那么这个时间一般是
# time.time() + 0
# 如果要延迟0.01秒调用
# 这个时间是 time.time() + 0.01
self.seconds = seconds
# cb这个参数一般是GreenThread/greenlet的switch
# *args, **kw自然就是传给switch的参数
# cb也不一定是switch,也可能是event之类用于通知的调用接口
# 注意这里所说的event不是epoll的event,是eventlet.event.Event
# 如果是event,里面会有更复杂的绿色线程切换
# cb也可以直接是一个外部函数
# 但如果是一个外部函数,这个函数必须是非阻塞的而且运行时间短
# 否则会影响整个main loop
# 所以这个cb尽量不要传函数而是传绿色线程的switch或event
self.tpl = cb, args, kw
# 当前定时器是否被调用过
self.called = False
if _g_debug:
self.traceback = six.StringIO()
traceback.print_stack(file=self.traceback)
@property
def pending(self):
return not self.called
def copy(self):
cb, args, kw = self.tpl
return self.__class__(self.seconds, cb, *args, **kw)
def schedule(self):
"""Schedule this timer to run in the current runloop.
"""
self.called = False
self.scheduled_time = get_hub().add_timer(self)
return self
def __call__(self, *args):
# 当这个定时器被调用的时候
# 激活这个timer中的绿色线程/函数
# 这个__call__接受的args参数是没用的
# 这里的参数是防止外部写错了传参导致报错
# 所以timer的调用都是直接timer()
if not self.called:
# 当前timer被调用后
# 设置被调用标记
self.called = True
cb, args, kw = self.tpl
try:
# 当cb是GreenThread/greenlet 的switch时
# cb(*args, **kw)相当于
# 执行的switch(*args, **kw)
# 否则就是执行具体的外部函数或event
cb(*args, **kw)
finally:
try:
# 删除self.tpl
# 防止self.tpl的存在导致绿色线程变量引用计数器不为0
# 最终导致绿色线程没被gc删除
del self.tpl
except AttributeError:
pass
def cancel(self):
# 取消当前定时器,如果这个定时器已经被完成或者取消
# 这个函数无效
# 可以看出timer的cancel
# 最终调用的是hub类的timer_canceled
if not self.called:
self.called = True
get_hub().timer_canceled(self)
try:
del self.tpl
except AttributeError:
pass
# 用于比较timer实例的大小
# 方便timer组成的列表排序
def __lt__(self, other):
return id(self) < id(other)
我在来看一个关键类FdListener
class FdListener(object):
# 这个类是用于存放需要监听fd的绿色线程
# 有greenlet和对应的fd
# cb是外部传入的通知函数
# cb如果是绿色线程的的switch,用于跳转回原来的绿色线程
# 外部一般用trampoline生成FdListener实例
# 在trampoline函数中,传入的cb是greenlet.getcurrent().switch
# 也就说和self.greenlet.switch相同
# cb也有可能是一个event函数,比价少见,zmq的封装里有用event
# cb最好不要是一个外部的函数,参考timer的cb说明
# tb就是绿色线程的throw,一般是出现异常后执行的函数
def __init__(self, evtype, fileno, cb, tb, mark_as_closed):
assert (evtype is READ or evtype is WRITE)
self.evtype = evtype
self.fileno = fileno
self.cb = cb
self.tb = tb
self.mark_as_closed = mark_as_closed
self.spent = False
self.greenlet = greenlet.getcurrent()
def __repr__(self):
return "%s(%r, %r, %r, %r)" % (type(self).__name__, self.evtype, self.fileno,
self.cb, self.tb)
__str__ = __repr__
def defang(self):
# cb重定向到closed_callback
# 这个defang的不在本篇中说明
self.cb = closed_callback
if self.mark_as_closed is not None:
self.mark_as_closed()
self.spent = True
现在可以来看Hub类了
[GCC 4.4.7 20120313 (Red Hat 4.4.7-17)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import eventlet
>>> from eventlet.support import greenlets as greenlet
>>> hub = eventlet.hubs.get_hub()
>>> current = greenlet.getcurrent()
>>> current.switch()
()
>>> print current
<greenlet.greenlet object at 0x7fc3fc805f50>
>>> print hub.greenlet
<greenlet.greenlet object at 0x7fc3f0d87f50>
>>> print hub.greenlet.parent
<greenlet.greenlet object at 0x7fc3fc805f50>
>>> print hub.greenlet.dead
False
>>>
# noop是一个默认的fd listener类(单例)
# 监听fd为0, 用来当默认值防止循环报错的,后面wait里用到
noop = FdListener(READ, 0, lambda x: None, lambda x: None, None)
class Hub(object):
SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit)
READ = READ
WRITE = WRITE
def __init__(self, clock=time.time):
# listeners很重要
# evelnet通过monkey patch hack了原生的socket
# 当被hack过的socket类调用connect、recv、send等函数的时候
# 会通过函数trampoline调用Hub.add(具体看后面的add函数)
# 将需要监听的fd加入到self.listeners中去
# 顺便,eventlet没有重载socket.listen函数
# 也就是socket.listen是原生的不被绿色线程管理
# 具体参考eventlet.greenio.base.GreenSocket类
# 他取代了socket.socket
self.listeners = {READ: {}, WRITE: {}}
self.secondaries = {READ: {}, WRITE: {}}
self.closed = []
# 默认时间计算函数用time.time
self.clock = clock
# 启动一个绿色线程,跑self.run的代码
# 前面我们说了 hub是线程中唯一的(线程中的单例)
# 所以这个greenlet就是main greenlet,用于跑main loop,也就是self.run
# 所有greenthread.spawn*函数孵化的绿色线程的parent都是这个greenlet
# main loop的绿色线程的parent也就是
# get_hubs().greenlet.parent具体哪里来的看c源码才知道
# 只知道这个绿色线程不能在一开始就调用get_hubs().switch
# 因为main loop的绿色线程一开始是dead状态的
self.greenlet = greenlet.greenlet(self.run)
self.stopping = False
self.running = False
# 所有列表保存需要处理的定时器Timer实例
# 这个列表每次都会被排序,下次运行时间最小的排第一位
# 为了能最快的排序,代码里用了堆排序
self.timers = []
# next_timers是预备定时器
# main loop里会先把self.next_timers里的定时器装入self.timers
# 然后再调用fire_timers来处理self.timers列表中的定时器
# next_timers的是给外部绿色线程取消自己定时器用的
# 这样main loop就不用排序完后,调用定时器的时候才发现已经取消
# 减少self.timers中的元素以便更快排序
self.next_timers = []
# lclass是FdListener类
self.lclass = FdListener
# 通过这个计数器激活清理timers和next_timers列表
self.timers_canceled = 0
# 调试参数不看
self.debug_exceptions = True
self.debug_blocking = False
self.debug_blocking_resolution = 1
# 上面是BaseHub的代码 epoll的hub多了下面一些属性
self.poll = epoll()
self.modify = self.poll.modify
def block_detect_pre(self):
# 调试用,启动调试后在fire_timers前捕获信号
# fire_timers就是调用最小时间的timer.__call__
tmp = signal.signal(signal.SIGALRM, alarm_handler)
if tmp != alarm_handler:
self._old_signal_handler = tmp
arm_alarm(self.debug_blocking_resolution)
def block_detect_post(self):
# 调试用,启动调试后在fire_timers后捕获信号
if (hasattr(self, "_old_signal_handler") and
self._old_signal_handler):
signal.signal(signal.SIGALRM, self._old_signal_handler)
signal.alarm(0)
# epoll.hub的wait函数
def wait(self, seconds=None):
# wait是self.run,也就是main loop里用于调用切换到其他绿色线程的
readers = self.listeners[READ]
writers = self.listeners[WRITE]
# 没有监听任何fd
if not readers and not writers:
if seconds:
sleep(seconds)
return
try:
# 从epoll中获取事件
presult = self.do_poll(seconds)
except (IOError, select.error) as e:
if get_errno(e) == errno.EINTR:
return
raise
SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS
if self.debug_blocking:
self.block_detect_pre()
# 用epoll的返回事件生成回调列表
callbacks = set()
for fileno, event in presult:
if event & READ_MASK:
# noop不用管,防止错误的默认对象
# get返回FdListener类实例
callbacks.add((readers.get(fileno, noop), fileno))
if event & WRITE_MASK:
callbacks.add((writers.get(fileno, noop), fileno))
if event & select.POLLNVAL:
# 出现epoll错误,调用下面的remove_descriptor
self.remove_descriptor(fileno)
continue
if event & EXC_MASK:
callbacks.add((readers.get(fileno, noop), fileno))
callbacks.add((writers.get(fileno, noop), fileno))
for listener, fileno in callbacks:
try:
# 跳回listener所在绿色线程
# listener所在的这个绿色线程
# 调用accept、connect、recv、send等函数后会通过trampoline
# switch到main loop
# 现在main loop在这里又回到之前绿色线程执行的位置
# 也就是trampoline调用hub.switch()后的部分
# 接下来会执行
# hub.remove(listener)
# timer.cancel
# 返回之前hub.switch()的返回值
# 也就是说最终又回到accept、connect、recv、send等调用的位置
# 也就是在这里实现了不修改源代码也能绿化
# 我们后面在详细看下trampoline函数
listener.cb(fileno)
except SYSTEM_EXCEPTIONS:
raise
except:
self.squelch_exception(fileno, sys.exc_info())
clear_sys_exc_info()
# 其他绿色线程执行完回到这里
if self.debug_blocking:
self.block_detect_post()
# epoll多出的函数
def do_poll(self, seconds):
# epoll poll事件
return self.poll.poll(seconds)
def register(self, fileno, new=False):
# 省略epoll注册监听的代码
.....
def add(self, evtype, fileno, cb, tb, mark_as_closed):
oldlisteners = bool(self.listeners[READ].get(fileno) or
self.listeners[WRITE].get(fileno))
# 这个代码用每次有socket调用accept、connect、send等函数时
# 最终都会触发这个Hub.add函数
# 这个函数会通过fd和绿色线程生成FdListener实例
# 并添加到self.listeners字典中
# mark_as_closed是一个外部传入的函数
# 用于fd关闭后的清理
listener = self.lclass(evtype, fileno, cb, tb, mark_as_closed)
bucket = self.listeners[evtype]
if fileno in bucket:
if g_prevent_multiple_readers:
# raise内容我删除了
raise RuntimeError(".....")
self.secondaries[evtype].setdefault(fileno, []).append(listener)
else:
bucket[fileno] = listener
# 下面是epoll多出来的,epoll注册监听
# 可以看到每次调用Hub.add都会调用epoll的注册fd
try:
if not oldlisteners:
self.register(fileno, new=True)
else:
self.register(fileno, new=False)
except IOError as ex: # ignore EEXIST, #80
# 忽略fd已经注册过的错误
if get_errno(ex) != errno.EEXIST:
raise
return listener
def remove(self, listener):
# 这个函数也是用于外部调用
# 用于移除监听的fd
if listener.spent:
# trampoline may trigger this in its finally section.
return
fileno = listener.fileno
evtype = listener.evtype
self.listeners[evtype].pop(fileno, None)
# migrate a secondary listener to be the primary listener
if fileno in self.secondaries[evtype]:
sec = self.secondaries[evtype].get(fileno, None)
if not sec:
return
self.listeners[evtype][fileno] = sec.pop(0)
if not sec:
del self.secondaries[evtype][fileno]
def remove_descriptor(self, fileno):
# 用注销fd监听,
# hub中从self.listeners弹出fd对应的listener
# 陆续切换到listener对应的绿色线程
# remove_descriptor在wait中有调用
# 看上最好由内部调用
listeners = []
listeners.append(self.listeners[READ].pop(fileno, noop))
listeners.append(self.listeners[WRITE].pop(fileno, noop))
listeners.extend(self.secondaries[READ].pop(fileno, ()))
listeners.extend(self.secondaries[WRITE].pop(fileno, ()))
for listener in listeners:
try:
# 看wait函数里相同部分的说明
listener.cb(fileno)
except Exception:
self.squelch_generic_exception(sys.exc_info())
def close_one(self):
# 处理已经关闭的端口的绿色线程
listener = self.closed.pop()
if not listener.greenlet.dead:
# There's no point signalling a greenlet that's already dead.
listener.tb(IOClosed(errno.ENOTCONN, "Operation on closed file"))
def ensure_greenlet(self):
# hub的绿色线程挂了
# 也就是main loop所在线程挂了
# 刚开始Hub的main loop是dead的
if self.greenlet.dead:
# 重新开一个绿色线程
new = greenlet.greenlet(self.run, self.greenlet.parent)
# 设置self.greenlet.parent和self.greenlet为新的绿色线程
self.greenlet.parent = new
self.greenlet = new
def switch(self):
# 这个switch用于给其他绿色线程放弃cpu时间
# 返回到main loop(也就是这里的self.run里)的绿色线程中的函数
# 用法是其他绿色线程调用get_hub().switch()
# 当然其他绿色线程也可以调用self.parent.switch()来回到main loop
# hup里写的switch封装了一些其他东西,比如main loop重启
# 所以切换到main loop用get_hub().switch()比较规范
cur = greenlet.getcurrent()
# main loop所在绿色线程不能执行这个switch函数
assert cur is not self.greenlet, 'Cannot switch to MAINLOOP from MAINLOOP'
switch_out = getattr(cur, 'switch_out', None)
if switch_out is not None:
try:
switch_out()
except:
self.squelch_generic_exception(sys.exc_info())
# 确保main loop没有挂
# 如果main loop挂了就重新启动main loop及其对应绿色线程
self.ensure_greenlet()
try:
# 每次都把非main loop的绿色线程parent设置为main loop的绿色线程
# 这里要配合self.ensure_greenlet才好理解
# 在main loop重启后,所有绿色线程的parent都要重新指向
# 新main loop所在的绿色线程
if self.greenlet.parent is not cur:
cur.parent = self.greenlet
except ValueError:
pass # gets raised if there is a greenlet parent cycle
clear_sys_exc_info()
return self.greenlet.switch()
def default_sleep(self):
return 60.0
def sleep_until(self):
t = self.timers
if not t:
return None
return t[0][0]
def run(self, *a, **kw):
# 这个run就是hub的循环
# 前面我们说了 hub是线程中唯一的(线程中的单例)
# 这个循环就是main loop
# 当其他线程调用self.parent.switch()或者调用get_hub().switch()
# 的时候就会切换到当前循环中
if self.running:
raise RuntimeError("Already running!")
try:
# 循环中删除部分调试代码
self.running = True
self.stopping = False
while not self.stopping:
# 清理所有已经关闭的fd
while self.closed:
# 一次清理一个
self.close_one()
# 看函数说明
self.prepare_timers()
# 看函数说明
self.fire_timers(self.clock())
self.prepare_timers()
# wakeup_when是通过self.timers[0]的执行时间来计算sleep_time
wakeup_when = self.sleep_until()
if wakeup_when is None:
sleep_time = self.default_sleep()
else:
sleep_time = wakeup_when - self.clock()
# wait里切换到处理epoll读写事件的fd的绿色线程
# 也就是说main loop里有两次会切换其他绿色线程中
# 一次是fire_timers、一次是wait
if sleep_time > 0:
self.wait(sleep_time)
else:
self.wait(0)
# 下面是循环结束后的清理代码
else:
self.timers_canceled = 0
del self.timers[:]
del self.next_timers[:]
finally:
self.running = False
self.stopping = False
def add_timer(self, timer):
# 把一个定时器加入到next_timers列表中
# next_timers列表是用来预备调度的定时器列表
# main loop中会通过prepare_timers函数讲
# next_timers推送到timers列表中
# timers列表中的timer会被fire_timers函数执行
scheduled_time = self.clock() + timer.seconds
self.next_timers.append((scheduled_time, timer))
return scheduled_time
def timer_canceled(self, timer):
# timer取消,已经完成的timer也是使用timer_canceled
self.timers_canceled += 1
len_timers = len(self.timers) + len(self.next_timers)
# 要执行的timers和准备要执行的timer加起来超过1000
# 且timers_canceled计数器大于上述长度一半的时候
# 开始清理掉有called标记的timer
# 可以看出timers_canceled是一个清理用的计数器
if len_timers > 1000 and len_timers / 2 <= self.timers_canceled:
self.timers_canceled = 0
self.timers = [t for t in self.timers if not t[1].called]
self.next_timers = [t for t in self.next_timers if not t[1].called]
# 重新排序
heapq.heapify(self.timers)
def prepare_timers(self):
# 把next_timers中的所有定时器放入timers列表中
# 推入的时候已经完成排序
# 时间最短的timer在0号位置
heappush = heapq.heappush
t = self.timers
for item in self.next_timers:
if item[1].called:
self.timers_canceled -= 1
else:
heappush(t, item)
# 清空next_timers列表
del self.next_timers[:]
def fire_timers(self, when):
# 从self.timers取出定时器然后执行
t = self.timers
heappop = heapq.heappop
# 在timers列表不为空的情况下循环
while t:
# next是timers中第一个元素
# next是一个tuple,0 是timer实例,1是这个timer需要执行的时间点
# timers是排序过的,第一个元素肯定是时间属性最小的
next = t[0]
# exp是timer需要执行的时间点
exp = next[0]
timer = next[1]
# 第一个需要执行的定时器都还没到需要执行的时间
# 直接退出循环
if when < exp:
break
# 这里相当于t.pop(0)
# 然后再排序把最小的放最前面
# 下次取t[0]的时候还是最小
# 反正是堆排序
heappop(t)
try:
# 取出来的timer有called标记
if timer.called:
# timers_canceled计数器减1
self.timers_canceled -= 1
else:
# 这里是Time.__call__
# 一般是switch到对应绿色线程
timer()
except self.SYSTEM_EXCEPTIONS:
raise
except:
self.squelch_timer_exception(timer, sys.exc_info())
clear_sys_exc_info()
def schedule_call_global(self, seconds, cb, *args, **kw):
# 把一个绿色线程/函数封装为timer, cb就是这个函数或者绿色线程的switch
# 并加入到next_timers预备定时器列表中
# timer被main loop调用的时候
# 会执行cb(*args, **kw)
t = timer.Timer(seconds, cb, *args, **kw)
self.add_timer(t)
return t
# 省略部分调试代码
......
关键的trampoline函数,monkey patch过的socket最终通过trampoline函数来调用Hub.add的
def trampoline(fd, read=None, write=None, timeout=None,
timeout_exc=timeout.Timeout,
mark_as_closed=None):
# trampoline是一次性的
# 从main loop切换回来的时候会注销之前的监听
# 所以recv等需要自己调用Hub.add而不是用trampoline
t = None
hub = get_hub()
current = greenlet.getcurrent()
assert hub.greenlet is not current, 'do not call blocking functions from the mainloop'
# 不能同时又read又wirte
assert not (
read and write), 'not allowed to trampoline for reading and writing'
# 获取到socket的fileno
try:
fileno = fd.fileno()
except AttributeError:
fileno = fd
# 当外部没有传入timeout的时候才会添加这个调试用的定时器
if timeout is not None:
def _timeout(exc):
# This is only useful to insert debugging
# 注释表示这里是为了插入调试
current.throw(exc)
# 这里可以看出schedule_call_global的cb不一定是switch
# 这个定时器用于触发current.throw
t = hub.schedule_call_global(timeout, _timeout, timeout_exc)
try:
# 这里调用hub.add
# 这里可以看到,每次socket调用后才调用hub.add
# 调用完又把fd从hub的listeners里删除
# hub里的epoll会不停的注册、注销fd
if read:
listener = hub.add(hub.READ, fileno, current.switch, current.throw, mark_as_closed)
elif write:
listener = hub.add(hub.WRITE, fileno, current.switch, current.throw, mark_as_closed)
try:
# 这里的finally有点晕人
# 写成
# ret = hub.switch()
# return ret
# 这样顺序就好理解一点了
# 先 switch 切到main loop
# 当被切回来的时候
# remove、然后再 t.cancel
# 然后return ret
# ret值当然是None了
# 这里的返回值也没什么用
return hub.switch()
finally:
# 这里看到这个listener对象是一次性的
hub.remove(listener)
finally:
if t is not None:
t.cancel()
我们以recv为例来看看GreenSocket怎么封装的
class GreenSocket(object):
# 这个类用于hack socket.socket实例
# 在外部调用socket.socket的时候实际上在调用GreenSocket
def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs):
if isinstance(family_or_realsock, six.integer_types):
fd = _original_socket(family_or_realsock, *args, **kwargs)
# Notify the hub that this is a newly-opened socket.
notify_opened(fd.fileno())
else:
fd = family_or_realsock
# 这里的fd,就是真正的socket.socket实例
self.fd = fd
...
def recv(self, bufsize, flags=0):
return self._recv_loop(self.fd.recv, bufsize, flags)
def _recv_loop(self, recv_meth, *args):
fd = self.fd
# 使用非阻塞socket的时候
# 调用self.fd.recv
# 也就是直接调用原生socket.recv
# 没有绿色线程什么事
if self.act_non_blocking:
return recv_meth(*args)
# 阻塞的recv走这里
while True:
try:
# recv: bufsize=0?
# recv_into: buffer is empty?
# This is needed because behind the scenes we use sockets in
# nonblocking mode and builtin recv* methods. Attempting to read
# 0 bytes from a nonblocking socket using a builtin recv* method
# does not raise a timeout exception. Since we're simulating
# a blocking socket here we need to produce a timeout exception
# if needed, hence the call to trampoline.
# 说明部分就不翻译了
# 这里的判断可以看出!!!!阻塞情况下有recv的长度,是会不走绿色线程!!!
# 也就是说阻塞的情况下
# recv的第一个参数必须是0才走绿色线程
# 但是直接recv(0)的写法又是有问题的
# 所以,正确的socket.recv写法是
# 1、常规方式,自己用epoll之类写socket.recv的异步
# 2、connect的时候调用Hub.add添加监听
# 然后在recv前切换到main loop
# 切换之前要能让Hub.add的cb函数能切换到recv所在绿色线程
if not args[0]:
# 这里是封装了的trampoline函数
# 这里走完表示epoll在当前fd有事件
# 从main loop回到当前绿色线程
self._read_trampoline()
# 走这里会阻塞!!!
return recv_meth(*args)
except socket.error as e:
# 这个错误是connect还没建立完成就recv的错误
if get_errno(e) in SOCKET_BLOCKING:
pass
elif get_errno(e) in SOCKET_CLOSED:
return b''
else:
raise
# 遇到SOCKET_BLOCKING错误走这里
try:
self._read_trampoline()
except IOClosed as e:
# Perhaps we should return '' instead?
raise EOFError()
def _read_trampoline(self):
self._trampoline(
self.fd,
read=True,
timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))
def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
if self._closed:
# If we did any logging, alerting to a second trampoline attempt on a closed
# socket here would be useful.
raise IOClosed()
try:
# 这里调用trampoline,走绿色线程
return trampoline(fd, read=read, write=write, timeout=timeout,
timeout_exc=timeout_exc,
mark_as_closed=self._mark_as_closed)
except IOClosed:
# This socket's been obsoleted. De-fang it.
self._mark_as_closed()
raise
我们现在来总结下,我们先简化一下Hub的man loop
while not self.stopping:
self.prepare_timers()
self.fire_timers(self.clock())
self.prepare_timers()
self.wait(0)
Hup的main loop里主要的工作1:fire_timers————处理定时器
定时器到点就调用timer中的cb
定时器的添加一般通过Hub.schedule_call_global
一般来说你的函数执行过程中要等待另外一个函数执行完
就通过eventlet.greenthread.sleep添加定时器并切换到其他绿色线程
我们来看看sleep的实现
def sleep(seconds=0):
hub = hubs.get_hub()
current = getcurrent()
assert hub.greenlet is not current, 'do not call blocking functions from the mainloop'
timer = hub.schedule_call_global(seconds, current.switch)
try:
hub.switch()
finally:
# 切换回来的时候删除添加的timer
timer.cancel()
Hup的main loop里主要的工作2:wait————通过epoll处理fd事件
处理完定时器后,检查self.listeners字典
如果里面有内容,就调用epoll扫描字典中的fd
如果fd中有事件,切换到fd相关的绿色线程(这个是常规行为,实际是调用Hub.add传入的cb)
listeners的添加一般通过Hub.add或者封装过的trampoline函数
monkey patch后的原生socket.socket等被替换
执行诸如socket.connect之类的函数最终会根据情况调用trampoline
所以openstack可以在不修改pika、wsgify的socket数据处理部分代码的情况下绿化它们
注意,正常情况下recv不会被自动绿化,具体参考GreenSocket的recv_loop中说明
到这里,我们就大致的理解了eventlet的Hub工作原理了,eventlet也就明白了一半,再深入就要看eventlet中event的工作原理