openstack里如何使用monkey patch的参考这篇
然后我们要学习eventlet使用,绿色线程工作原理
看完绿色线程工作原理我们来看看L3 Agent的入口start部分,找到绿色线程的入口代码
# 回顾下前面的代码
# 子进程中
# _start_child->_child_process->launch_service->services.add
class Services
def __init__(self):
self.services = []
self.tg = threadgroup.ThreadGroup()
# event对象,这里我们看到Services中有event对象
self.done = event.Event()
....
def add(self, service):
.....
self.tg.add_thread(self.run_service, service, self.done)
@staticmethod
def run_service(service, done):
# 这里调用了service的start方法并wait
# done是Services类实力的done属性
# done是一个Event.event()
try:
service.start()
except Exception:
LOG.exception(_LE('Error starting thread.'))
raise SystemExit(1)
else:
# 代码一直不会走到这里
# 直到service.start()结束
done.wait()
def stop(self):
.....
# 可以看到这里调用了self.done.send()
# 这样wait就能获取到结果然后结束
if not self.done.ready():
self.done.send()
....
# Service类的初始化
server = neutron_service.Service.create(
binary='neutron-l3-agent',
topic=topics.L3_AGENT,
report_interval=cfg.CONF.AGENT.report_interval,
manager=manager)
# 所以我们的Service类为
class Service(n_rpc.Service):
...
# 这里也就是run_service函数
# 最终调用的service.start的位置
def start(self):
...
# 省略非关键代码
# n_rpc.Service中的代码合并过来
super(Service, self).start()
# socket代码经过monkey path
# 所有的cocket的recv send connect代码都是绿化过的
self.conn = create_connection()
LOG.debug("Creating Consumer connection for Service %s",
self.topic)
# 关键点!!!!
# self.manager是L3NATAgent
# L3NATAgent作为endpoints传到rpc接口
# 接受到rabbit的数据包的时候
# 自动会调用endpoints,也就是L3NATAgent实例来处理数据包
endpoints = [self.manager]
self.conn.create_consumer(self.topic, endpoints)
if callable(getattr(self.manager, 'initialize_service_hook', None)):
self.manager.initialize_service_hook(self)
self.conn.consume_in_threads()
self.manager.after_start()
....
class L3NATAgent:
def after_start(self):
.....
eventlet.spawn_n(self._process_routers_loop)
LOG.info(_LI("L3 agent started"))
def _process_routers_loop(self):
...
# 生成一个绿色线程池,只有8个绿色线程
pool = eventlet.GreenPool(size=8)
# 因为只有8个绿色线程,所以这里会在
# 孵化8个_process_router_update绿色线程后被阻塞
# 阻塞形式是以不停的切换到main loop中的形势阻塞
# 并不是真正的卡住阻塞
# 这里写永真循环的作用在于当任意一个_process_router_update绿色线程挂了
# 都能再启一个_process_router_update绿色线程
while True:
pool.spawn_n(self._process_router_update)