WIP update deps, sql builder instead of jet

This commit is contained in:
2025-03-10 10:29:18 -04:00
parent cb3b1a429c
commit 13747c2118
87 changed files with 5208 additions and 2523 deletions

View File

@@ -20,10 +20,9 @@ issues:
linters:
enable:
- bodyclose
- exportloopref
- copyloopvar
- gofumpt
- goimports
- gosec
- gosimple
- govet
- ineffassign
@@ -39,6 +38,5 @@ output:
- format: colored-line-number
print-issued-lines: true
print-linter-name: true
uniq-by-line: true
path-prefix: ""
sort-results: true

View File

@@ -12,7 +12,7 @@ repos:
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/golangci/golangci-lint
rev: v1.61.0
rev: v1.64.5
hooks:
- id: golangci-lint
- repo: https://github.com/TekWizely/pre-commit-golang

View File

@@ -16,7 +16,7 @@ test_coverage:
@go test -race -v $(GO_FLAGS) -count=1 -coverprofile=coverage.out -covermode=atomic $(GO_PKGS)
test_ci:
@TEST_ENV=ci go test -race -v $(GO_FLAGS) -count=1 $(GO_PKGS)
@go test -race -v $(GO_FLAGS) -count=1 $(GO_PKGS)
mocks:
@go generate ./...

View File

@@ -170,11 +170,20 @@ We appreciate the support for free and open source software!
This project is supported by:
[Jetbrains](https://www.jetbrains.com/?from=gocron)
![JetBrains logo](https://resources.jetbrains.com/storage/products/company/brand/logos/jetbrains.png)
[Sentry](https://sentry.io/welcome/)
<p align="left">
<p align="left">
<a href="https://sentry.io/?utm_source=github&utm_medium=logo" target="_blank">
<img src="https://sentry-brand.storage.googleapis.com/sentry-wordmark-dark-280x84.png" alt="Sentry" width="280" height="84" />
</a>
</p>
</p>
## Star History
[![Star History Chart](https://api.star-history.com/svg?repos=go-co-op/gocron&type=Date)](https://star-history.com/#go-co-op/gocron&Date)

View File

@@ -31,6 +31,9 @@ type executor struct {
// used to request jobs from the scheduler
jobOutRequest chan jobOutRequest
// sends out job needs to update the next runs
jobUpdateNextRuns chan uuid.UUID
// used by the executor to receive a stop signal from the scheduler
stopCh chan struct{}
// the timeout value when stopping
@@ -247,6 +250,14 @@ func (e *executor) sendOutForRescheduling(jIn *jobIn) {
jIn.shouldSendOut = false
}
func (e *executor) sendOutForNextRunUpdate(jIn *jobIn) {
select {
case e.jobUpdateNextRuns <- jIn.id:
case <-e.ctx.Done():
return
}
}
func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
e.logger.Debug("gocron: limitModeRunner starting", "name", name)
for {
@@ -376,6 +387,7 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
_ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err)
e.sendOutForRescheduling(&jIn)
e.incrementJobCounter(j, Skip)
e.sendOutForNextRunUpdate(&jIn)
return
}
defer func() { _ = lock.Unlock(j.ctx) }()
@@ -385,6 +397,7 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
_ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err)
e.sendOutForRescheduling(&jIn)
e.incrementJobCounter(j, Skip)
e.sendOutForNextRunUpdate(&jIn)
return
}
defer func() { _ = lock.Unlock(j.ctx) }()

View File

@@ -6,13 +6,13 @@ import (
"errors"
"fmt"
"math/rand"
"slices"
"strings"
"time"
"github.com/google/uuid"
"github.com/jonboulle/clockwork"
"github.com/robfig/cron/v3"
"golang.org/x/exp/slices"
)
// internalJob stores the information needed by the scheduler
@@ -24,6 +24,7 @@ type internalJob struct {
id uuid.UUID
name string
tags []string
cron Cron
jobSchedule
// as some jobs may queue up, it's possible to
@@ -104,6 +105,20 @@ type limitRunsTo struct {
runCount uint
}
// -----------------------------------------------
// -----------------------------------------------
// --------------- Custom Cron -------------------
// -----------------------------------------------
// -----------------------------------------------
// Cron defines the interface that must be
// implemented to provide a custom cron implementation for
// the job. Pass in the implementation using the JobOption WithCronImplementation.
type Cron interface {
IsValid(crontab string, location *time.Location, now time.Time) error
Next(lastRun time.Time) time.Time
}
// -----------------------------------------------
// -----------------------------------------------
// --------------- Job Variants ------------------
@@ -116,21 +131,29 @@ type JobDefinition interface {
setup(j *internalJob, l *time.Location, now time.Time) error
}
var _ JobDefinition = (*cronJobDefinition)(nil)
// Default cron implementation
type cronJobDefinition struct {
crontab string
withSeconds bool
func newDefaultCronImplementation(withSeconds bool) Cron {
return &defaultCron{
withSeconds: withSeconds,
}
}
func (c cronJobDefinition) setup(j *internalJob, location *time.Location, now time.Time) error {
var _ Cron = (*defaultCron)(nil)
type defaultCron struct {
cronSchedule cron.Schedule
withSeconds bool
}
func (c *defaultCron) IsValid(crontab string, location *time.Location, now time.Time) error {
var withLocation string
if strings.HasPrefix(c.crontab, "TZ=") || strings.HasPrefix(c.crontab, "CRON_TZ=") {
withLocation = c.crontab
if strings.HasPrefix(crontab, "TZ=") || strings.HasPrefix(crontab, "CRON_TZ=") {
withLocation = crontab
} else {
// since the user didn't provide a timezone default to the location
// passed in by the scheduler. Default: time.Local
withLocation = fmt.Sprintf("CRON_TZ=%s %s", location.String(), c.crontab)
withLocation = fmt.Sprintf("CRON_TZ=%s %s", location.String(), crontab)
}
var (
@@ -150,8 +173,32 @@ func (c cronJobDefinition) setup(j *internalJob, location *time.Location, now ti
if cronSchedule.Next(now).IsZero() {
return ErrCronJobInvalid
}
c.cronSchedule = cronSchedule
return nil
}
j.jobSchedule = &cronJob{cronSchedule: cronSchedule}
func (c *defaultCron) Next(lastRun time.Time) time.Time {
return c.cronSchedule.Next(lastRun)
}
// default cron job implementation
var _ JobDefinition = (*cronJobDefinition)(nil)
type cronJobDefinition struct {
crontab string
cron Cron
}
func (c cronJobDefinition) setup(j *internalJob, location *time.Location, now time.Time) error {
if j.cron != nil {
c.cron = j.cron
}
if err := c.cron.IsValid(c.crontab, location, now); err != nil {
return err
}
j.jobSchedule = &cronJob{crontab: c.crontab, cronSchedule: c.cron}
return nil
}
@@ -163,8 +210,8 @@ func (c cronJobDefinition) setup(j *internalJob, location *time.Location, now ti
// `CRON_TZ=America/Chicago * * * * *`
func CronJob(crontab string, withSeconds bool) JobDefinition {
return cronJobDefinition{
crontab: crontab,
withSeconds: withSeconds,
crontab: crontab,
cron: newDefaultCronImplementation(withSeconds),
}
}
@@ -369,11 +416,9 @@ func (m monthlyJobDefinition) setup(j *internalJob, location *time.Location, _ t
}
}
daysStart = removeSliceDuplicatesInt(daysStart)
slices.Sort(daysStart)
ms.days = daysStart
daysEnd = removeSliceDuplicatesInt(daysEnd)
slices.Sort(daysEnd)
ms.daysFromEnd = daysEnd
atTimesDate, err := convertAtTimesToDateTime(m.atTimes, location)
@@ -610,6 +655,15 @@ func WithName(name string) JobOption {
}
}
// WithCronImplementation sets the custom Cron implementation for the job.
// This is only utilized for the CronJob type.
func WithCronImplementation(c Cron) JobOption {
return func(j *internalJob, _ time.Time) error {
j.cron = c
return nil
}
}
// WithSingletonMode keeps the job from running again if it is already running.
// This is useful for jobs that should not overlap, and that occasionally
// (but not consistently) run longer than the interval between job runs.
@@ -820,7 +874,8 @@ type jobSchedule interface {
var _ jobSchedule = (*cronJob)(nil)
type cronJob struct {
cronSchedule cron.Schedule
crontab string
cronSchedule Cron
}
func (j *cronJob) next(lastRun time.Time) time.Time {
@@ -864,7 +919,7 @@ func (d dailyJob) next(lastRun time.Time) time.Time {
}
firstPass = false
startNextDay := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day()+int(d.interval), 0, 0, 0, lastRun.Nanosecond(), lastRun.Location())
startNextDay := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day()+int(d.interval), 0, 0, 0, 0, lastRun.Location())
return d.nextDay(startNextDay, firstPass)
}
@@ -872,7 +927,7 @@ func (d dailyJob) nextDay(lastRun time.Time, firstPass bool) time.Time {
for _, at := range d.atTimes {
// sub the at time hour/min/sec onto the lastScheduledRun's values
// to use in checks to see if we've got our next run time
atDate := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day(), at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location())
atDate := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day(), at.Hour(), at.Minute(), at.Second(), 0, lastRun.Location())
if firstPass && atDate.After(lastRun) {
// checking to see if it is after i.e. greater than,
@@ -918,7 +973,7 @@ func (w weeklyJob) nextWeekDayAtTime(lastRun time.Time, firstPass bool) time.Tim
for _, at := range w.atTimes {
// sub the at time hour/min/sec onto the lastScheduledRun's values
// to use in checks to see if we've got our next run time
atDate := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day()+int(weekDayDiff), at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location())
atDate := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day()+int(weekDayDiff), at.Hour(), at.Minute(), at.Second(), 0, lastRun.Location())
if firstPass && atDate.After(lastRun) {
// checking to see if it is after i.e. greater than,
@@ -986,7 +1041,7 @@ func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int, firstPass
for _, at := range m.atTimes {
// sub the day, and the at time hour/min/sec onto the lastScheduledRun's values
// to use in checks to see if we've got our next run time
atDate := time.Date(lastRun.Year(), lastRun.Month(), day, at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location())
atDate := time.Date(lastRun.Year(), lastRun.Month(), day, at.Hour(), at.Minute(), at.Second(), 0, lastRun.Location())
if atDate.Month() != lastRun.Month() {
// this check handles if we're setting a day not in the current month

View File

@@ -5,11 +5,12 @@ import (
"context"
"reflect"
"runtime"
"slices"
"strings"
"time"
"github.com/google/uuid"
"github.com/jonboulle/clockwork"
"golang.org/x/exp/slices"
)
var _ Scheduler = (*scheduler)(nil)
@@ -137,6 +138,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
jobsIn: make(chan jobIn),
jobsOutForRescheduling: make(chan uuid.UUID),
jobUpdateNextRuns: make(chan uuid.UUID),
jobsOutCompleted: make(chan uuid.UUID),
jobOutRequest: make(chan jobOutRequest, 1000),
done: make(chan error),
@@ -175,7 +177,8 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
select {
case id := <-s.exec.jobsOutForRescheduling:
s.selectExecJobsOutForRescheduling(id)
case id := <-s.exec.jobUpdateNextRuns:
s.updateNextScheduled(id)
case id := <-s.exec.jobsOutCompleted:
s.selectExecJobsOutCompleted(id)
@@ -237,11 +240,8 @@ func (s *scheduler) stopScheduler() {
for _, j := range s.jobs {
j.stop()
}
for id, j := range s.jobs {
for _, j := range s.jobs {
<-j.ctx.Done()
j.ctx, j.cancel = context.WithCancel(s.shutdownCtx)
s.jobs[id] = j
}
var err error
if s.started {
@@ -253,6 +253,21 @@ func (s *scheduler) stopScheduler() {
err = ErrStopExecutorTimedOut
}
}
for id, j := range s.jobs {
oldCtx := j.ctx
if j.parentCtx == nil {
j.parentCtx = s.shutdownCtx
}
j.ctx, j.cancel = context.WithCancel(j.parentCtx)
// also replace the old context with the new one in the parameters
if len(j.parameters) > 0 && j.parameters[0] == oldCtx {
j.parameters[0] = j.ctx
}
s.jobs[id] = j
}
s.stopErrCh <- err
s.started = false
s.logger.Debug("gocron: scheduler stopped")
@@ -267,14 +282,7 @@ func (s *scheduler) selectAllJobsOutRequest(out allJobsOutRequest) {
}
slices.SortFunc(outJobs, func(a, b Job) int {
aID, bID := a.ID().String(), b.ID().String()
switch {
case aID < bID:
return -1
case aID > bID:
return 1
default:
return 0
}
return strings.Compare(aID, bID)
})
select {
case <-s.shutdownCtx.Done():
@@ -335,7 +343,7 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
return
}
scheduleFrom := j.lastRun
var scheduleFrom time.Time
if len(j.nextScheduled) > 0 {
// always grab the last element in the slice as that is the furthest
// out in the future and the time from which we want to calculate
@@ -366,6 +374,15 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
}
}
if slices.Contains(j.nextScheduled, next) {
// if the next value is a duplicate of what's already in the nextScheduled slice, for example:
// - the job is being rescheduled off the same next run value as before
// increment to the next, next value
for slices.Contains(j.nextScheduled, next) {
next = j.next(next)
}
}
// Clean up any existing timer to prevent leaks
if j.timer != nil {
j.timer.Stop()
@@ -390,6 +407,22 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
s.jobs[id] = j
}
func (s *scheduler) updateNextScheduled(id uuid.UUID) {
j, ok := s.jobs[id]
if !ok {
return
}
var newNextScheduled []time.Time
for _, t := range j.nextScheduled {
if t.Before(s.now()) {
continue
}
newNextScheduled = append(newNextScheduled, t)
}
j.nextScheduled = newNextScheduled
s.jobs[id] = j
}
func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) {
j, ok := s.jobs[id]
if !ok {

View File

@@ -3,12 +3,11 @@ package gocron
import (
"context"
"reflect"
"slices"
"sync"
"time"
"github.com/google/uuid"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)
func callJobFuncWithParams(jobFunc any, params ...any) error {
@@ -63,12 +62,8 @@ func requestJobCtx(ctx context.Context, id uuid.UUID, ch chan jobOutRequest) *in
}
func removeSliceDuplicatesInt(in []int) []int {
m := make(map[int]struct{})
for _, i := range in {
m[i] = struct{}{}
}
return maps.Keys(m)
slices.Sort(in)
return slices.Compact(in)
}
func convertAtTimesToDateTime(atTimes AtTimes, location *time.Location) ([]time.Time, error) {