Introduction
- Scheduled tasks come up all the time in real projects, so today we will look at plugging a cron library into the Gin framework
- Library of choice: github.com/robfig/cron/v3 - 6.7k stars and still actively maintained by the author
- Common usage patterns
- Source code walkthrough
- How to ensure that, on shutdown/restart, we only exit after running tasks have completed
- Sample code address
1. Quick use
go mod init github.com/18211167516/go-lib/cron/rebfig_cron
Create the main.go file
package main

import (
    "fmt"

    "github.com/robfig/cron/v3"
)

// testJob implements the Job interface: type Job interface { Run() }
type testJob struct{}

func (t testJob) Run() {
    fmt.Println("i'm test job")
}

func main() {
    c := cron.New()
    c.AddFunc("@every 1s", func() {
        fmt.Println("tick every 1 second")
    })
    c.AddJob("* * * * *", testJob{})
    c.Start()
    select {}
}
- Create a Cron object to manage scheduled tasks
- AddFunc adds a function as a scheduled task
- The Start method launches the scheduler in a new goroutine
- select {} blocks the main goroutine so the program does not exit
2. New() options
New accepts options of type Option func(*Cron).
There are currently five built-in options:
2.1 WithLocation Specifies the time zone
loc, _ := time.LoadLocation("America/Los_Angeles")
c := cron.New(cron.WithLocation(loc))
2.2 WithSeconds supports granularity to the second level (default is minute level like crontab)
Under the hood, WithSeconds is implemented via WithParser.
c := cron.New(cron.WithSeconds())
// Execute every 2 seconds
c.AddFunc("*/2 * * * * *", func() {
    // O_WRONLY added so the append actually works
    file, _ := os.OpenFile("log.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0755)
    defer file.Close()
    fmt.Println("test 11")
    file.Write([]byte("test 111\r\n"))
})
2.3 WithParser uses a custom parser
type ScheduleParser interface {
    Parse(spec string) (Schedule, error)
}

parser := cron.NewParser(
    cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
)
c := cron.New(cron.WithParser(parser))
c.AddFunc("1 * * * * *", func() {
    fmt.Println("at second 1 of every minute")
})
2.4 WithChain Job Wrapper (Job Middleware)
- By default there are three built-in job wrappers (middleware):
- Recover: catches panics raised inside a job
- DelayIfStillRunning: if the previous run of the job has not finished yet (it is taking too long), delays the next run until it completes
- SkipIfStillRunning: if the previous run of the job has not finished yet, skips the next run entirely
- Usage

type testJob struct{}

func (t testJob) Run() {
    panic("test job")
    //fmt.Println("i'm test job")
}

// Option 1: a global chain applied to every job added to this Cron
logger := cron.VerbosePrintfLogger(log.New(io.MultiWriter(f, os.Stdout), "cron: ", log.LstdFlags))
c := cron.New(cron.WithChain(cron.Recover(logger)))

// Option 2: a per-job chain built with NewChain(...).Then(...)
c.AddJob("@every 1s", cron.NewChain(cron.Recover(cron.DefaultLogger)).Then(testJob{}))
- The core implementation:

// Then applies the wrappers in reverse order, so the first wrapper
// in the chain ends up outermost and runs first
func (c Chain) Then(j Job) Job {
    for i := range c.wrappers {
        j = c.wrappers[len(c.wrappers)-i-1](j)
    }
    return j
}
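To see why the loop runs in reverse, here is a self-contained sketch (types simplified from the library; the `tag` wrapper and `demo` helper are made up for illustration) showing that the first wrapper in the chain ends up outermost:

```go
package main

import "fmt"

// Job and JobWrapper mirror the cron types described in this section.
type Job interface{ Run() }
type JobWrapper func(Job) Job

// FuncJob adapts a plain func to the Job interface, like cron.FuncJob.
type FuncJob func()

func (f FuncJob) Run() { f() }

// then applies wrappers in reverse, exactly like Chain.Then:
// the first wrapper in the slice becomes outermost and runs first.
func then(wrappers []JobWrapper, j Job) Job {
	for i := range wrappers {
		j = wrappers[len(wrappers)-i-1](j)
	}
	return j
}

// tag records before/after markers so the nesting order is visible.
func tag(name string, trace *[]string) JobWrapper {
	return func(j Job) Job {
		return FuncJob(func() {
			*trace = append(*trace, "before "+name)
			j.Run()
			*trace = append(*trace, "after "+name)
		})
	}
}

// demo builds a chain [A, B] around a job and returns the call order.
func demo() []string {
	var trace []string
	job := then(
		[]JobWrapper{tag("A", &trace), tag("B", &trace)},
		FuncJob(func() { trace = append(trace, "job") }),
	)
	job.Run()
	return trace
}

func main() {
	for _, step := range demo() {
		fmt.Println(step)
	}
	// Prints:
	// before A
	// before B
	// job
	// after B
	// after A
}
```

This is why cron documents Recover as something you want outermost: put it first in the chain and it wraps everything added after it.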
2.5 WithLogger: use a custom Logger
Look at the default Logger first
var DefaultLogger Logger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))

func PrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger {
    return printfLogger{l, false}
}

func VerbosePrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger {
    return printfLogger{l, true}
}

type printfLogger struct {
    logger  interface{ Printf(string, ...interface{}) }
    logInfo bool
}

func (pl printfLogger) Info(msg string, keysAndValues ...interface{}) {
    if pl.logInfo {
        keysAndValues = formatTimes(keysAndValues)
        pl.logger.Printf(
            formatString(len(keysAndValues)),
            append([]interface{}{msg}, keysAndValues...)...)
    }
}

func (pl printfLogger) Error(err error, msg string, keysAndValues ...interface{}) {
    keysAndValues = formatTimes(keysAndValues)
    pl.logger.Printf(
        formatString(len(keysAndValues)+2),
        append([]interface{}{msg, "error", err}, keysAndValues...)...)
}
You can customize the Logger as long as you implement the Logger interface
type Logger interface {
    // Info logs routine messages about cron's operation.
    Info(msg string, keysAndValues ...interface{})
    // Error logs an error condition.
    Error(err error, msg string, keysAndValues ...interface{})
}
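For instance, a minimal custom implementation might look like this (only the Logger interface comes from the library; prefixLogger and its k=v formatting are hypothetical):

```go
package main

import (
	"fmt"
	"strings"
)

// Logger mirrors the cron Logger interface shown above.
type Logger interface {
	Info(msg string, keysAndValues ...interface{})
	Error(err error, msg string, keysAndValues ...interface{})
}

// prefixLogger is a made-up implementation that renders the
// key/value pairs as "k=v" tokens after the message.
type prefixLogger struct{ prefix string }

func (l prefixLogger) format(msg string, kv []interface{}) string {
	parts := []string{l.prefix + msg}
	for i := 0; i+1 < len(kv); i += 2 {
		parts = append(parts, fmt.Sprintf("%v=%v", kv[i], kv[i+1]))
	}
	return strings.Join(parts, " ")
}

func (l prefixLogger) Info(msg string, kv ...interface{}) {
	fmt.Println(l.format(msg, kv))
}

func (l prefixLogger) Error(err error, msg string, kv ...interface{}) {
	// Errors get the error value injected as a leading pair
	fmt.Println(l.format(msg, append([]interface{}{"error", err}, kv...)))
}

func main() {
	var lg Logger = prefixLogger{prefix: "cron: "}
	lg.Info("wake", "entry", 1)
	// prints: cron: wake entry=1
}
```

Anything satisfying the two methods can then be passed to cron.WithLogger.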
// Write logs to both the console and a file
f, _ := os.Create("cron.log")
c := cron.New(cron.WithSeconds(), cron.WithLogger(
    cron.VerbosePrintfLogger(log.New(io.MultiWriter(f, os.Stdout), "cron: ", log.LstdFlags))))
c.AddFunc("*/2 * * * * *", func() {
    fmt.Println("test 11")
})
3. Create tasks
3.1 AddFunc()
c := cron.New()
c.AddFunc("@every 1s", func() {
    fmt.Println("tick every 1 second")
})
c.Start()
3.2 AddJob()
// testJob satisfies the Job interface: type Job interface { Run() }
type testJob struct{}

func (t testJob) Run() {
    fmt.Println("i'm test job")
}

c.AddJob("* * * * *", testJob{})
c.Start()
3.3 Source Code Interpretation
Step 1: AddFunc is implemented on top of AddJob
func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
    return c.AddJob(spec, FuncJob(cmd))
}

// AddJob adds a Job to the Cron to be run on the given schedule.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
    // Parse the time spec
    schedule, err := c.parser.Parse(spec)
    if err != nil {
        return 0, err
    }
    return c.Schedule(schedule, cmd), nil
}
Step 2: Schedule registers the entry
func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
    // Guard against concurrent access
    c.runningMu.Lock()
    defer c.runningMu.Unlock()
    // Assign the next ID
    c.nextID++
    entry := &Entry{
        ID:         c.nextID,
        Schedule:   schedule,
        WrappedJob: c.chain.Then(cmd),
        Job:        cmd,
    }
    // If the scheduler is already running, hand the new entry to the
    // run loop over a channel; otherwise just append it
    if !c.running {
        c.entries = append(c.entries, entry)
    } else {
        c.add <- entry
    }
    return entry.ID
}
Step 3: finally c.Start() is called, which launches the core cron.run() loop in a new goroutine
- First, all entries are traversed to compute each task's next execution time:
for _, entry := range c.entries {
    entry.Next = entry.Schedule.Next(now)
    c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
}
- Sort the entries and start the timer
// Infinite loop
for {
    // Sort tasks by next activation time
    sort.Sort(byTime(c.entries))

    var timer *time.Timer
    // Set the timer (how long until the next task fires)
    if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
        // If there are no entries yet, just sleep - it still handles new entries
        // and stop requests.
        timer = time.NewTimer(100000 * time.Hour)
    } else {
        timer = time.NewTimer(c.entries[0].Next.Sub(now))
    }
    ...
}
- Listen on the channels
for {
    select {
    case now = <-timer.C:
        now = now.In(c.location)
        c.logger.Info("wake", "now", now)

        // Run every entry whose next time was less than now
        for _, e := range c.entries {
            // Entries are sorted, so stop at the first one that is
            // not due yet (or has a zero next time)
            if e.Next.After(now) || e.Next.IsZero() {
                break
            }
            // Run the wrapped job in its own goroutine
            c.startJob(e.WrappedJob)
            // Update the previous/next run times
            e.Prev = e.Next
            e.Next = e.Schedule.Next(now)
            c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
        }

    // A new entry was added while the scheduler is running
    case newEntry := <-c.add:
        timer.Stop()
        now = c.now()
        newEntry.Next = newEntry.Schedule.Next(now)
        c.entries = append(c.entries, newEntry)
        c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)

    // Snapshot request
    case replyChan := <-c.snapshot:
        replyChan <- c.entrySnapshot()
        continue

    // Stop request
    case <-c.stop:
        timer.Stop()
        c.logger.Info("stop")
        return

    // Remove an entry by ID
    case id := <-c.remove:
        timer.Stop()
        now = c.now()
        c.removeEntry(id)
        c.logger.Info("removed", "entry", id)
    }

    break
}
- Stop()
Graceful shutdown is implemented with a sync.WaitGroup:
func (c *Cron) Stop() context.Context {
    c.runningMu.Lock()
    defer c.runningMu.Unlock()
    if c.running {
        c.stop <- struct{}{}
        c.running = false
    }
    // Return a context that is cancelled once all jobs finish
    ctx, cancel := context.WithCancel(context.Background())
    go func() {
        // Blocks until the WaitGroup counter drops to 0
        c.jobWaiter.Wait()
        // Unblock anyone waiting on ctx.Done()
        cancel()
    }()
    return ctx
}
- Running a job
func (c *Cron) startJob(j Job) {
    // Increment the WaitGroup counter
    c.jobWaiter.Add(1)
    go func() {
        defer c.jobWaiter.Done()
        j.Run()
    }()
}
4. Time formats
- Predefined formats
@yearly (or @annually): midnight on January 1st each year; equivalent to "0 0 1 1 *"
@monthly: midnight on the first day of each month; equivalent to "0 0 1 * *"
@weekly: midnight at the start of the week; note the week starts on Sunday, so this is the boundary between Saturday and Sunday; equivalent to "0 0 * * 0"
@daily (or @midnight): midnight every day; equivalent to "0 0 * * *"
@hourly: the start of every hour; equivalent to "0 * * * *"
- Fixed-interval format: @every <duration>
c.AddFunc("@every 1s", func() {
    fmt.Println("test 111")
})
c.AddFunc(fmt.Sprint("@every ", time.Duration(1)*time.Second), func() {
    fmt.Println("test 222")
})
- Custom time formats
By default the spec has 5 fields, down to minute granularity (equivalent to crontab):
c := cron.New()
c.AddFunc("* * * * *", func() {
    fmt.Println("Every 1 minute")
})

With WithSeconds, a 6-field spec with second granularity is supported:
c := cron.New(cron.WithSeconds())
c.AddFunc("* * * * * *", func() {
    fmt.Println("Every 1 second")
})
5. Summary
The library is small but complete. Thanks to Go's goroutines it is naturally concurrent and thread-safe, and it guarantees that running tasks finish before shutdown. If task persistence were also abstracted out, it would be easier to extend into a distributed setup later. As for the three built-in wrappers, I think Recover should be part of the default chain (nobody wants one panicking task to take down all the others).
6. References
- Go Daily Library: cron
- Time package usage guide
- cron
7. Series of articles
- Part 1: Set up the Golang environment
- Part 2: Install Gin
- Part 3: Define the directory structure
- Part 4: Build the sample API (1)
- Part 5: Build the sample API (2)
- Part 6: Integrate Swagger API docs
- Part 7: Log component
- Part 8: Graceful restart and stop
- Part 9: External Makefile build
- Part 10: Cron scheduled tasks
- Part 11: Build command-line tools
- Build an exclusive Cache in 3 days (day 1)
- Build an exclusive Cache in 3 days (day 2)
- Build an exclusive Cache in 3 days (day 3)