#coding=utf-8 import threading import logging import time # 多任务多线程任务管理类 class task_manage(): # name 任务名称 # task_func 任务函数指针 # task_args 任务函数参数,默认空 # check_func 任务是否执行完检测函数指针,返回True表示还有任务没有执行,默认调用自身的runtimes_control函数 # check_args 任务检测函数指针参数,默认空 # max_thread_count 最大线程数 # run_time 任务运行次数,设置check_func后,该参数无效 def __init__(self,name,task_func,task_args=(),check_func=None,check_args=(),max_thread_count=1,run_time=1): self._is_finished = False self._task_func = task_func self._task_args = task_args self._max_thread_count = max_thread_count self._threads = [] self._name = name self._check_args = check_args self._run_time = run_time self._task_index = 0 if check_func is None: self._check_func = self.runtimes_control self._check_args = () else: self._check_func = check_func # 任务运行次数控制函数 def runtimes_control(self): if self._run_time > 0: self._run_time -= 1 return True return False # 清除已退出线程 def clear_exit_threads(self): for t in self._threads[:]: if not t.is_alive() : self._threads.remove(t) # 运行任务 def run(self): while(len(self._threads)<self._max_thread_count and not self._is_finished): if self._check_func(*self._check_args): t = threading.Thread(target=self._task_func,args=self._task_args) self._threads.append(t) t.setDaemon(True) t.start() self._task_index += 1 logging.debug("%s run %s" %(self._name,self._task_index)) else: self._is_finished = True break # 对外接口,检测是否所有任务都执行完成 def is_finish(self): self.clear_exit_threads() self.run() return self._is_finished and len(self._threads)==0 def run_task_until_all_finished(manages=[]): while True: all_finished = True for manage in manages: all_finished = manage.is_finish() and all_finished if all_finished : break time.sleep(1) logging.debug('all have finished!')