openstack的服务都和eventlet有关,建议看本篇之前先看eventlet使用,绿色线程工作原理
- launcher
openstack里的launcher有两种(在oslo_service.service中), ServiceLauncher和ProcessLauncher,一般用法为
def launch(conf, service, workers=1):
if workers is None or workers == 1:
launcher = ServiceLauncher(conf)
launcher.launch_service(service)
else:
launcher = ProcessLauncher(conf)
launcher.launch_service(service, workers=workers)
return launcher
由于各个组件是不同人写的,写法有点不同,上面写法是neutorn里的比较标准
nova里的为了单例绕了一下就难看一点
# 用这个来做单例
_launcher = None
# 外部代码里调用的是serve而不是launch比较难看,neutron里的比较好看
def serve(server, workers=None):
global _launcher
if _launcher:
raise RuntimeError(_('serve() can only be called once'))
# 就是前面的launch函数
_launcher = service.launch(CONF, server, workers=workers)
def wait():
_launcher.wait()
一般都使用多进程的ProcessLauncher,我们来看下核心代码
class ServiceWrapper(object):
def __init__(self, service, workers):
# service对象
self.service = service
# 总共可fork的进程数量
self.workers = workers
# 存放子进程pid
self.children = set()
# 存放每次fork出子进程的用时
# 用于防止fork进程过快
self.forktimes = []
class ProcessLauncher(object):
"""Launch a service with a given number of workers."""
def __init__(self, conf, wait_interval=0.01):
...
# 子进程字典,pid为key
self.children = {}
# 运行标志,实例化的时候就认为已经启动
self.running = True
# 循环间隔
self.wait_interval = wait_interval
# 这launcher是子进程里用的,主进程中无用
self.launcher = None
# eventlet通信管道
rfd, self.writepipe = os.pipe()
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
# 信号处理
self.signal_handler = SignalHandler()
self.handle_signal()
def launch_service(self, service, workers=1):
# 这个函数是一个初始化函数只运行一次
# 这里检查service的类型是不是继承于ServiceBase
_check_service_base(service)
# 生成wrap, 看前面ServiceWrapper类,里面有部分属性没有方法
wrap = ServiceWrapper(service, workers)
LOG.info(_LI('Starting %d workers'), wrap.workers)
# 生成工作的worker,也就是这里fork出多个进程
# 这里可以看出ServiceWrapper里面保存了所有的子进程的pid之类的信息
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
def _start_child(self, wrap):
...
# 省略前面防止fork过快的代码
pid = os.fork()
if pid == 0:
# 子进程中给launcher赋值
self.launcher = self._child_process(wrap.service)
# 自动重启launcher的循环
while True:
# 子进程注册进程接受信号
self._child_process_handle_signal()
# 接收到信号返回,未收到信号的时候进程阻塞于此,内有死销魂还
status, signo = self._child_wait_for_exit_or_signal(
self.launcher)
# 收到正常退出信号,子进程结束
if not _is_sighup_and_daemon(signo):
self.launcher.wait()
break
# 重启sefl.launcher
self.launcher.restart()
os._exit(status)
# 主进程
LOG.debug('Started child %d', pid)
wrap.children.add(pid)
self.children[pid] = wrap
return pid
# 子进程才用到下面代码
def _child_process(self, service):
# 这里注册一次信号接收函数
# 后面又注册一次不是很理解
self._child_process_handle_signal()
# 后面就是eventlet相关代码
# 这里初始化一个hub
eventlet.hubs.use_hub()
# 子进程关闭写管道
os.close(self.writepipe)
# 孵化一个_pipe_watcher函数,用于监控管道
eventlet.spawn_n(self._pipe_watcher)
# 运行一个随机函数,具体作用未知
random.seed()
# 这里就是调用Launcher,注意不是ProcessLauncher
launcher = Launcher(self.conf)
launcher.launch_service(service)
return launcher
def _child_wait_for_exit_or_signal(self, launcher):
status = 0
signo = 0
try:
# 子进程的工作循环在这里
# 这个launcher不是ProcessLauncher
# 看下面Launcher的wait部分
launcher.wait()
except SignalExit as exc:
signame = self.signal_handler.signals_to_name[exc.signo]
LOG.info(_LI('Child caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
except SystemExit as exc:
status = exc.code
except BaseException:
LOG.exception(_LE('Unhandled exception'))
status = 2
return status, signo
# 上面是子进程用到的代码
# 下面是主进程用到的代码
def wait(self):
# 最外层的(也就是systemd指向的脚本里调用到的wait)
# launch_service后会调用这里
# 这里是主进程的wait死循环部分,不是self.launcher的wait部分
systemd.notify_once() # 通知systemd
if self.conf.log_options:
LOG.debug('Full set of CONF:')
self.conf.log_opt_values(LOG, logging.DEBUG)
try:
# 这个循环不是工作循环
# 这个是用于进程崩溃后自动重启的
while True:
# 注册信号
self.handle_signal()
# 死循环在此,这个函数用于监控并重启子进程
self._respawn_children()
# 走到这里表示主进程收到退出信号
if not self.sigcaught:
return
# 省略部分是处理崩溃的
....
except eventlet.greenlet.GreenletExit:
LOG.info(_LI("Wait called after thread killed. Cleaning up."))
# 后面是退出以及清理部分
....
def _respawn_children(self):
# 重启子进程的函数
while self.running:
# 里面主要是调用os.waitpid获取子进程的返回
wrap = self._wait_child()
# 因为子进程没有退出所有正常都是走到这里
if not wrap:
# 这个sleep比较特别
# 这个sleep会切换到其他绿色线程
# 最少self.wait_interval秒后才会返回当前位置
# eventlet.greenthread.sleep其实是创建了一个定时器
# 定时器被调度到的时候才切换回当前位置
# neutron的的主进程里没有socket相关的调用
# 也没有在Hub注册定时器定时器
# 所以没有其他绿色线程可以切换
# 这里的代码相当于切换到hub中的main loop
# 到时间又切换回来
eventlet.greenthread.sleep(self.wait_interval)
continue
# 有子进程退出,重新fork出一个worker子进程
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
下面我们来看和子进程相关的Launcher和Services, Launcher也就是ProcessLauncher中的self.Launcher
class Launcher(object):
def __init__(self, conf):
self.conf = conf
conf.register_opts(_options.service_opts)
# 创建一个services类
# services类用于管理service
self.services = Services()
# 后门端口
self.backdoor_port = (
eventlet_backdoor.initialize_if_enabled(self.conf))
def launch_service(self, service):
# service和主进程里的service是同一个
# 这里和ProcessLauncher中一样先检查service的类型
_check_service_base(service)
service.backdoor_port = self.backdoor_port
# 添加到services中,这里其实就是绿色线程spawn开始的地方
# keyston启动的时候两个service,一个public的一个admin的
# systemd如果启动nova-api就有两个services, os-computer和mete-data
# systemd也可以分别启动os-computer和metadata
# 其他一般的services都只有一个service
self.services.add(service)
def stop(self):
self.services.stop()
def wait(self):
# 看下下面service的wait
# 阻塞再次
self.services.wait()
def restart(self):
self.conf.reload_config_files()
self.services.restart()
class Services(object):
# service的start函数的绿色线程在Services中
def __init__(self):
# 一个存放service的列表
self.services = []
# 初始化一个threadgroup.ThreadGroup()
# 这是一个线程池组
self.tg = threadgroup.ThreadGroup()
# event对象
self.done = event.Event()
def add(self, service):
# 当我们调用add的时候
# 就是调用了ThreadGroup的add_thread方法
# self.run_service是函数, service, self.done是
# self.run_service的参数
self.services.append(service)
# run_service最终会调用service的start方法
# service就是外部传入的基于ServiceBase类的外部实例
# 传入self.done的目的在于
# 任意一个service都可以给self.done发信息通知真个进程退出
self.tg.add_thread(self.run_service, service, self.done)
def stop(self):
"""Wait for graceful shutdown of services and kill the threads."""
# 收到信号后会调用stop函数
# python的主线程才能收到信号
for service in self.services:
# 这里的调用必须能让的serverc.wait里死循环结束
service.stop()
# Each service has performed cleanup, now signal that the run_service
# wrapper threads can now die:
# 这里就是通知程序退出的地方
# 需要调用stop才能让run_service线程结束
# 也就是说调用stop才能让守护进程退出
if not self.done.ready():
self.done.send()
# reap threads
# 这里其实相当于重复调用tg.wait()
# 当这里处理完tg里的线程后
# wait里的tg.wait()也会退出
self.tg.stop()
def wait(self):
"""Wait for services to shut down."""
# 这个wait先调用外部service的wait
# service的wait一般是调用所有定时器的wait函数
# service常用的定时器 1是定期汇报(相当于心跳) 2 是定时任务
# 如果外部service的wait没有死循环会直接走到下面
# 这里是为了等待service里的循环正常退出
for service in self.services:
service.wait()
# 这里最终会调用ThreadGroup的wait
# 看下面ThreadGroup的wait
# Launcher调用的的wait的阻塞就是这里
# 当然service的wati里有死循环就阻塞在前面
self.tg.wait()
def restart(self):
"""Reset services and start them in new threads."""
self.stop()
self.done = event.Event()
for restart_service in self.services:
restart_service.reset()
# 这里最终会spawn
# 实例的event会作为参数传入
# 也就是说service.start()完成后
# 调用event.wait()
self.tg.add_thread(self.run_service, restart_service, self.done)
@staticmethod
def run_service(service, done):
# 这里调用了service的start方法
try:
service.start()
except Exception:
LOG.exception(_LE('Error starting thread.'))
raise SystemExit(1)
else:
# run_service就是tg里的绿色线程
# 具体的阻塞位置就是这里
done.wait()
# 我们再来看ThreadGroup是什么
class ThreadGroup(object):
def __init__(self, thread_pool_size=10):
# eventlet 绿色线程的池在这里初始化
self.pool = greenpool.GreenPool(thread_pool_size)
self.threads = []
self.timers = []
def add_thread(self, callback, *args, **kwargs):
# 也就是说前面的add方法最终调用了
# 绿色线程池的spawn方法
# 也就是说,这里开始,绿色线程池开始让run_service
# 也就是service的start开始被调用
# 返回的gt是一个绿色线程
gt = self.pool.spawn(callback, *args, **kwargs)
# 这个Thread类是用于关联绿色线程和线程组的
# Thread初始化的时候会调用绿色线程的link方法
# link方法用于绿色线程结束后回调一个exit函数
th = Thread(gt, self)
self.threads.append(th)
return th
def wait(self):
# 这里是ThreadGroup自己的定时器
# 需要使用ThreadGroup的时候自行添加
# 目前没看到哪里有用
for x in self.timers:
try:
x.wait()
except eventlet.greenlet.GreenletExit: # nosec
# greenlet exited successfully
pass
except Exception:
LOG.exception(_LE('Error waiting on timer.'))
# wait调用这里
self._perform_action_on_threads(
lambda x: x.wait(),
lambda x: LOG.exception(_LE('Error waiting on thread.')))
def _perform_action_on_threads(self, action_func, on_error_func):
current = threading.current_thread()
# threads中全部是Thread的实例
for x in self.threads[:]:
# 表明这个绿色线程是当前线程,跳过
if x.ident == current.ident:
continue
try:
# 这里就调用了Thread.wait()
# Thread.wait()
action_func(x)
except eventlet.greenlet.GreenletExit: # nosec
# 抛出的异常是GreenletExit
# 说明是绿色线程已经正常结束
pass
except Exception:
# 上面lamdba里可以看出这里是调用LOG做了记录
on_error_func(x)
# 我们来看Thread是什么
class Thread(object):
def __init__(self, thread, group):
self.thread = thread
# 调用绿色线程的link函数
# 绿色线程的link函数用于所执行函数退出的时候回调用
# 相当于绿色线程结束的时候调用
# _on_thread_done(thread, group, self)
# _on_thread_done是把当前Thread从ThreadGroup中移除
self.thread.link(_on_thread_done, group, self)
# 每个绿色线程的id
self._ident = id(thread)
@property
def ident(self):
return self._ident
def stop(self):
self.thread.kill()
def wait(self):
# 这里最终调用到绿色线程的wait函数
# 这里的绿色线程是封装过的GreenThread
return self.thread.wait()
def _on_thread_done(_greenthread, group, thread):
# 参数来源参考GreenThread._resolve_links
# 第一个参数是调用这个函数的绿色线程
# 第二个参数是当前ThreadGroup,link的时候传入的参数
# 第三个参数是当前Thread, 也是link的时候传入的
# 作用是把当前Thread从ThreadGroup里移除
group.thread_done(thread)
接下来就是复杂的openstack里eventlet的使用