Python多任务多线程任务管理类详解编程语言

task_manage.py  

#coding=utf-8 
import threading 
import logging 
import time 
 
# 多任务多线程任务管理类 
class task_manage(): 
    # name 任务名称 
    # task_func 任务函数指针 
    # task_args 任务函数参数,默认空 
    # check_func 任务是否执行完检测函数指针,返回True表示还有任务没有执行,默认调用自身的runtimes_control函数 
    # check_args 任务检测函数指针参数,默认空 
    # max_thread_count 最大线程数 
    # run_time 任务运行次数,设置check_func后,该参数无效 
    def __init__(self,name,task_func,task_args=(),check_func=None,check_args=(),max_thread_count=1,run_time=1): 
        self._is_finished = False 
        self._task_func = task_func 
        self._task_args = task_args 
        self._max_thread_count = max_thread_count 
        self._threads = [] 
        self._name = name 
        self._check_args = check_args 
        self._run_time = run_time 
        self._task_index = 0 
        if check_func is None: 
            self._check_func = self.runtimes_control 
            self._check_args = () 
        else: 
            self._check_func = check_func 
    # 任务运行次数控制函数 
    def runtimes_control(self): 
        if self._run_time > 0: 
            self._run_time -= 1 
            return True 
        return False 
    # 清除已退出线程 
    def clear_exit_threads(self): 
        for t in self._threads[:]: 
            if not t.is_alive() : 
                self._threads.remove(t) 
    # 运行任务 
    def run(self): 
        while(len(self._threads)<self._max_thread_count and not self._is_finished): 
            if self._check_func(*self._check_args): 
                t = threading.Thread(target=self._task_func,args=self._task_args) 
                self._threads.append(t) 
                t.setDaemon(True) 
                t.start() 
                self._task_index += 1 
                logging.debug("%s run %s" %(self._name,self._task_index)) 
            else: 
                self._is_finished = True 
                break 
    # 对外接口,检测是否所有任务都执行完成 
    def is_finish(self): 
        self.clear_exit_threads() 
        self.run() 
        return self._is_finished and len(self._threads)==0 
 
 
def run_task_until_all_finished(manages=[]): 
    while True: 
        all_finished = True 
        for manage in manages: 
            all_finished = manage.is_finish() and all_finished 
        if all_finished : break 
        time.sleep(1) 
    logging.debug('all have finished!') 

task_test.py           

#coding=utf-8 
 
from base.task_manage import * 
 
def watch_movie(move_name): 
    print("I am watching movie %s now." %(move_name)) 
 
def listen_music(music_name): 
    print("I am listening music %s now." %(music_name)) 
 
 
manages=[] 
manages.append( task_manage(name="watching movie",task_func=watch_movie,task_args=(u"捉妖记",),run_time=10,max_thread_count=3)) 
manages.append( task_manage(name="listening music",task_func=listen_music,task_args=(u"笨小孩",),run_time=10,max_thread_count=3)) 
 
run_task_until_all_finished(manages) 
print('all have finished!') 
quit() 
 

[Python]代码    

运行结果: 
I am watching movie 捉妖记 now. 
I am watching movie 捉妖记 now. 
I am watching movie 捉妖记 now. 
I am listening music 笨小孩 now. 
I am listening music 笨小孩 now. 
I am listening music 笨小孩 now. 
I am watching movie 捉妖记 now. 
I am watching movie 捉妖记 now. 
I am watching movie 捉妖记 now. 
I am listening music 笨小孩 now. 
I am listening music 笨小孩 now. 
I am listening music 笨小孩 now. 
I am watching movie 捉妖记 now. 
I am watching movie 捉妖记 now. 
I am watching movie 捉妖记 now. 
I am listening music 笨小孩 now. 
I am listening music 笨小孩 now. 
I am listening music 笨小孩 now. 
I am watching movie 捉妖记 now. 
I am listening music 笨小孩 now. 
all have finished!

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/8105.html

(0)
上一篇 2021年7月18日
下一篇 2021年7月18日

相关推荐

发表回复

登录后才能评论