Go语言中提供了两种定时器 timer 和 ticker,分别是一次性定时器和重复任务定时器。本节咱们主要介绍如何使用Go语言的定时器实现一个任务队列,非常具有实用价值。
Go语言中定时器
一般用法:
package main import( "fmt" "time" ) func main() { input := make(chan interface{}) //producer - produce the messages go func() { for i := 0; i < 5; i++ { input <- i } input <- "hello, world" }() t1 := time.NewTimer(time.Second * 5) t2 := time.NewTimer(time.Second * 10) for { select { //consumer - consume the messages case msg := <-input: fmt.Println(msg) case <-t1.C: println("5s timer") t1.Reset(time.Second * 5) case <-t2.C: println("10s timer") t2.Reset(time.Second * 10) } } }
上面代码中的这个 C 是啥呢,我们去源码看看,以 timer 为例:
type Timer struct {
C <-chan Time
r runtimeTimer
}
原来是一个 channel,其实有 GO 基础的都知道,GO 的运算符当出现的 -> 或者 <- 的时候,必然是有一端是指 channel。按照上面的例子来看,就是阻塞在一个 for 循环内,等待到了定时器的 C 从 channel 出来,当获取到值的时候,进行想要的操作。
设计我们的定时任务队列
当时的需求是这样,需要接收到客户端的请求并产生一个定时任务,会在固定时间执行,可能是一次,也可能是多次,也可能到指定时间自动停止,可能当任务终止的时候,还要能停止掉。
具体的流程如下图所示:
定义结构
type OnceCron struct { tasks []*Task //任务的列队 add chan *Task //当遭遇到新任务的时候 remove chan string //当遭遇到删除任务的时候 stop chan struct{} //当遇到停止信号的时候 Logger *log.Logger //日志 } type Job interface { Run() //执行接口 } type Task struct { Job Job //要执行的任务 Uuid string //任务标识,删除时用 RunTime int64 //执行时间 Spacing int64 //间隔时间 EndTime int64 //结束时间 Number int //总共要次数 }
队列实现
首先,我们要获得一个队列任务
func NewCron() *OnceCron
常规操作,为了节省篇幅,就不写出来,具体可以看源码,贴在了底部。
然后,开始定时器队列的运行,一般,都会命名为 Start。那么就有一个问题,我们刚开始启动程序的时候,这个时候是没有任务队列,那岂不是 for{select{}} 在等待个毛毛球?所以,我们需要在 Start 的时候添加一个默认的任务,防止队列退出。
func (one *OnceCron) Start() { //初始化的时候加入一个一年的长定时器,间隔1小时执行一次 task := getTaskWithFuncSpacing(3600, time.Now().Add(time.Hour*24*365).Unix() , func() { log.Println("It's a Hour timer!") }) //为了代码格式 markdown 里面有个括号改成全角了 one.tasks = append(one.tasks, task) go one.run() //协成执行 防止主进程被阻塞 }
执行部分应该是重点的,分成三部:
- 首先获得一个最先执行的任务
- 然后产生一个定时器,用于执行任务
- 进行阻塞判断,获取我们要进行的操作
package main import ( "time" "log" "github.com/google/uuid" "os" ) //compatible old name type OnceCron struct { *TaskScheduler } //only exec cron timer cron type TaskScheduler struct { tasks []TaskInterface swap []TaskInterface add chan TaskInterface remove chan string stop chan struct{} Logger TaskLogInterface lock bool } type Lock interface { Lock() UnLock() } //return old name with OnceCron func NewCron() *OnceCron { return &OnceCron{ TaskScheduler:NewScheduler(), } } //return a Controller Scheduler func NewScheduler() *TaskScheduler { return &TaskScheduler{ tasks: make([]TaskInterface, 0), swap: make([]TaskInterface, 0), add: make(chan TaskInterface), stop: make(chan struct{}), remove: make(chan string), Logger: log.New(os.Stdout, "[Control]: ", log.Ldate|log.Ltime|log.Lshortfile), lock: false, } } //add spacing time job to list with number func (scheduler *TaskScheduler) AddFuncSpaceNumber(spaceTime int64, number int, f func()) { task := getTaskWithFuncSpacingNumber(spaceTime, number, f) scheduler.addTask(task) } //add spacing time job to list with endTime func (scheduler *TaskScheduler) AddFuncSpace(spaceTime int64, endTime int64, f func()) { task := getTaskWithFuncSpacing(spaceTime, endTime, f) scheduler.addTask(task) } //add func to list func (scheduler *TaskScheduler) AddFunc(unixTime int64, f func()) { task := getTaskWithFunc(unixTime, f) scheduler.addTask(task) } func (scheduler *TaskScheduler) AddTaskInterface(task TaskInterface) { scheduler.addTask(task) } //add a task to list func (scheduler *TaskScheduler) AddTask(task *Task) string { if task.RunTime != 0 { if task.RunTime < 100000000000 { task.RunTime = task.RunTime * int64(time.Second) } if task.RunTime < time.Now().UnixNano() { //延遲1秒 task.RunTime = time.Now().UnixNano() + int64(time.Second) } } else { if task.Spacing > 0 { task.RunTime = time.Now().UnixNano() + task.Spacing * int64(time.Second) }else{ scheduler.Logger.Println("error too add task! Runtime error") return "" } } if task.Uuid == "" { task.Uuid = uuid.New().String() } return scheduler.addTask(task) } //if lock add to swap func (scheduler *TaskScheduler) addTask(task TaskInterface) string { if scheduler.lock { scheduler.swap = append(scheduler.swap, task) scheduler.add <- task } else{ scheduler.tasks = append(scheduler.tasks, task) scheduler.add <- task } return task.GetUuid() } //new export func (scheduler *TaskScheduler) ExportInterface() []TaskInterface { return scheduler.tasks } //compatible old export tasks func (scheduler *TaskScheduler) Export() []*Task { task := make([]*Task,0) for _,v := range scheduler.tasks { task = append(task, v.(*Task)) } return task } //stop task with uuid func (scheduler *TaskScheduler) StopOnce(uuidStr string) { scheduler.remove <- uuidStr } //run Cron func (scheduler *TaskScheduler) Start() { //初始化的时候加入一个一年的长定时器,间隔1小时执行一次 task := getTaskWithFuncSpacing(3600, time.Now().Add(time.Hour * 24 * 365).UnixNano(), func() { log.Println("It's a Hour timer!") }) scheduler.tasks = append(scheduler.tasks, task) go scheduler.run() } //stop all func (scheduler *TaskScheduler) Stop() { scheduler.stop <- struct{}{} } //run task list //if is empty, run a year timer task func (scheduler *TaskScheduler) run() { for { now := time.Now() task, key := scheduler.GetTask() runTime := task.GetRunTime() i64 := runTime - now.UnixNano() var d time.Duration if i64 < 0 { scheduler.tasks[key].SetRuntime(now.UnixNano()) if task != nil { go task.RunJob() } scheduler.doAndReset(key) continue } else { sec := runTime / int64(time.Second) nsec := runTime % int64(time.Second) d = time.Unix(sec, nsec).Sub(now) } timer := time.NewTimer(d) //catch a chan and do something for { select { //if time has expired do task and shift key if is task list case <-timer.C: scheduler.doAndReset(key) if task != nil { //fmt.Println(scheduler.tasks[key]) go task.RunJob() timer.Stop() } //if add task case <-scheduler.add: timer.Stop() // remove task with remove uuid case uuidstr := <-scheduler.remove: scheduler.removeTask(uuidstr) timer.Stop() //if get a stop single exit case <-scheduler.stop: timer.Stop() return } break } } } //return a task and key In task list func (scheduler *TaskScheduler) GetTask() (task TaskGetInterface, tempKey int) { scheduler.Lock() defer scheduler.UnLock() min := scheduler.tasks[0].GetRunTime() tempKey = 0 for key, task := range scheduler.tasks { tTime := task.GetRunTime() if min <= tTime { continue } if min > tTime { tempKey = key min = tTime continue } } task = scheduler.tasks[tempKey] return task, tempKey } //if add a new task and runtime < now task runtime // stop now timer and again func (scheduler *TaskScheduler) doAndReset(key int) { scheduler.Lock() defer scheduler.UnLock() //null pointer if key < len(scheduler.tasks) { nowTask := scheduler.tasks[key] scheduler.tasks = append(scheduler.tasks[:key], scheduler.tasks[key+1:]...) if nowTask.GetSpacing() > 0 { tTime := nowTask.GetRunTime() nowTask.SetRuntime(nowTask.GetSpacing() * int64(time.Second) + tTime) number := nowTask.GetRunNumber() if number > 1 { nowTask.SetRunNumber(number - 1) scheduler.tasks = append(scheduler.tasks, nowTask) } else if nowTask.GetEndTime() >= tTime { scheduler.tasks = append(scheduler.tasks, nowTask) } } } } //remove task by uuid func (scheduler *TaskScheduler) removeTask(uuidStr string) { scheduler.Lock() defer scheduler.UnLock() for key, task := range scheduler.tasks { if task.GetUuid() == uuidStr { scheduler.tasks = append(scheduler.tasks[:key], scheduler.tasks[key+1:]...) break } } } //lock task [] func (scheduler *TaskScheduler) Lock() { scheduler.lock = true } //unlock task [] func (scheduler *TaskScheduler) UnLock() { scheduler.lock = false if len(scheduler.swap) > 0 { for _, task := range scheduler.swap { scheduler.tasks = append(scheduler.tasks, task) } scheduler.swap = make([]TaskInterface, 0) } }
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/23123.html