* 在信息整合、漏洞扫描等平台的开发过程中,经常会遇到定时任务执行的需求,如Windows的定时任务计划和Linux的crontab都是为了这类需求而产生的。本文对GitHub上一个Go版的cron包进行分析,带大家熟悉一下它的具体实现。
先来看看Cron
的整体定义:
type Cron struct {
entries []*Entry
stop chan struct{}
add chan *Entry
snapshot chan []*Entry
running bool
ErrorLog *log.Logger
location *time.Location
}
从它的running
、add
和stop
字段定义可以看出来,这个Cron
对象可以同时对多个定时任务进行管理。
我们先不管其他的自定义类型,继续往下看。
默认的初始化函数New()
会带上时区调用NewWithLocation()
创建一个Cron
:
func NewWithLocation(location *time.Location) *Cron {
return &Cron{
entries: nil,
add: make(chan *Entry),
stop: make(chan struct{}),
snapshot: make(chan []*Entry),
running: false,
ErrorLog: nil,
location: location,
}
}
其中的Entry
包含了任务和它的计划时间信息:
type Entry struct {
Schedule Schedule
Next time.Time
Prev time.Time
Job Job
}
Schedule
是一个接口,用于计算下次执行时间:
type Schedule interface {
Next(time.Time) time.Time
}
Cron
对象定义了Start()
和Run()
两个方法用于启动,区别仅在于Start()
是多线程启动的,它们都会在内部调用run()
。
run()
先计算每个任务的下次执行时间:
now := c.now()
for _, entry := range c.entries {
entry.Next = entry.Schedule.Next(now)
}
然后在无限循环中,先对当前所有Entry
按下次执行时间排序,以最先执行的任务时间设置计时器time.Timer
(由于它利用动态计算下次执行时间的方案对执行时间进行精细控制,因此并未使用time.Ticker
对象) :
for {
sort.Sort(byTime(c.entries))
var timer *time.Timer
if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
timer = time.NewTimer(100000 * time.Hour)
} else {
timer = time.NewTimer(c.entries[0].Next.Sub(now))
}
// ...
}
当前不存在任务时,它给了一个足够长的时间进入『休眠』。
再使用select
监听各个通道的信号量,由于timer.C
、add
和stop
信号量在计时器资源上存在竞争,本来是不需要使用外部循环来处理并发信号量的,外部循环是为了保证snapshot
之后,仍然会处理其他信号量:
for {
select {
case now = <-timer.C:
// ...
case newEntry := <-c.add:
// ...
case <-c.snapshot:
// ...
case <-c.stop:
// ...
}
break
}
当run()
中的timer.C
通道接收到值后,遍历任务列表,多线程执行所有下次执行时间在当前时间之前的任务:
case now = <-timer.C:
now = now.In(c.location)
for _, e := range c.entries {
if e.Next.After(now) || e.Next.IsZero() {
break
}
go c.runWithRecovery(e.Job)
e.Prev = e.Next
e.Next = e.Schedule.Next(now)
}
AddFunc()
会将传入的任务执行函数包裹一个Job
接口标准的Run()
函数,再调用AddJob()
按照规范解析表达式字符串获得Schedule
对象:
schedule, err := Parse(spec)
其中解析对象Parser
定义如下:
type Parser struct {
options ParseOption
optionals int
}
options
是解析选项标识位,默认解析器为:
var defaultParser = NewParser(
Second | Minute | Hour | Dom | Month | DowOptional | Descriptor,
)
optionals
是可选项数量,目前只计算了DowOptional
:
optionals := 0
if options&DowOptional > 0 {
options |= Dow
optionals++
}
解析过程简单来说分成了两种情况:
- 当判断表达式第一个字符为
@
时,按Descriptor
解析 (如@yearly
、@annually
、@monthly
、@weekly
、@daily
、@midnight
、@hourly
和@every [duration]
) - 否则正常解析,细节略
解析得到的Schedule
和任务一起封装成Entry
,如果Cron
未启动,则将它直接放入任务列表中,否则扔进add
通道:
if !c.running {
c.entries = append(c.entries, entry)
return
}
c.add <- entry
当run()
中的add
通道接收到值后,停止计时器,计算当前新任务的下次执行时间,将它放入任务列表中 (随后执行的下次循环,将会对所有任务重新排序) :
case newEntry := <-c.add:
timer.Stop()
now = c.now()
newEntry.Next = newEntry.Schedule.Next(now)
c.entries = append(c.entries, newEntry)
如果Cron
已启动,则给snapshot
通道放进一个信号 (空值nil
当然也可以作为信号量) 阻塞等待执行结果,否则直接调用entrySnapshot()
返回所有Entry
的副本:
func (c *Cron) Entries() []*Entry {
if c.running {
c.snapshot <- nil
x := <-c.snapshot
return x
}
return c.entrySnapshot()
}
当run()
中的snapshot
通道接收到值后,将entrySnapshot()
的执行结果又重新放入snapshot
中还回去:
case <-c.snapshot:
c.snapshot <- c.entrySnapshot()
continue
将一个空结构体作为信号量放入stop
通道中即可通知上述run()
中断执行:
func (c *Cron) Stop() {
if !c.running {
return
}
c.stop <- struct{}{}
c.running = false
}
当stop
通道接收到值后,停止计时器并退出:
case <-c.stop:
timer.Stop()
return
}
整体来说,这个包的结构比较清晰,并且正确使用了Go的Chan
机制来避免并发环境中大量的锁开销。在它的设计基础上还可以比较方便的修改成其他类型的轻量级任务管理包,具备一定的参考价值。