程序用了老早的一个ThreadPool模块,站点用的uwsgi方式运行,运行时间久了以后发现有时候会出现无法分配线程
问题肯定在于线程join后依然没有释放,研究了下thread的线程的销毁
首先解决一个语法问题,我来看
_shutdown = _MainThread()._exitfunc
class _MainThread(Thread):
def _exitfunc(self):
....
# Thread中没有_Thread__delete这个方法
self._Thread__delete()
原因在于
class A(object):
def _internal_use(self):
pass
def __method_name(self):
pass
dir(A())
['_A__method_name', ..., '_internal_use']
# 也就是说
# _Thread__delete() 就是 __delete()
一个不错的关于下划线的文章参考
然后,代码中没有找到调用delete方法销毁线程的地方,这也是我们最关心的
找到一篇相关文章
大致就是
谁调用shutdown函数的呢?
当python要销毁运行时之前肯定会调用
所以打开pythonrun.c,你会发现如下函数
/* Wait until threading._shutdown completes, provided
the threading module was imported in the first place.
The shutdown routine will wait until all non-daemon
"threading" threads have completed. */
static void
wait_for_thread_shutdown(void)
{
#ifdef WITH_THREAD
PyObject *result;
PyThreadState *tstate = PyThreadState_GET();
PyObject *threading = PyMapping_GetItemString(tstate->interp->modules,
"threading");
if (threading == NULL) {
/* threading not imported */
PyErr_Clear();
return;
}
result = PyObject_CallMethod(threading, "_shutdown", "");
if (result == NULL)
PyErr_WriteUnraisable(threading);
else
Py_DECREF(result);
Py_DECREF(threading);
#endif
}
既然python的虚拟机是这样调用的,所以我们手动销毁线程也就跟着写就好了
下面是改动过的ThreadPool的线程join方法
def dismissWorkers(self, num_workers, do_join=False):
"""Tell num_workers worker threads to quit after their current task."""
dismiss_list = []
for i in range(min(num_workers, len(self.workers))):
worker = self.workers.pop()
worker.dismiss()
dismiss_list.append(worker)
if do_join:
while dismiss_list:
worker = dismiss_list.pop()
worker.join()
# 清理线程
worker._Thread__delete()
del worker
else:
self.dismissedWorkers.extend(dismiss_list)
def joinAllDismissedWorkers(self):
"""Perform Thread.join() on all worker threads that have been dismissed.
"""
while self.dismissedWorkers:
worker = self.dismissedWorkers.pop()
worker.join()
# 清理线程
worker._Thread__delete()
del worker
经过测试好像无效…..在创建线程的地方捕获下错误
def createWorkers(self, num_workers, poll_timeout=5):
for i in range(num_workers):
err_count = 0
while True:
try:
self.workers.append(WorkerThread(self._requests_queue,
self._results_queue,
poll_timeout=poll_timeout))
err_count = 0
break
except:
err_count += 1
if err_count>=5:
del self.workers[:]
raise
time.sleep(0.3)
time.sleep(0.1)