Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Add configurable UTC timezone option for scheduler (#494)
Browse files Browse the repository at this point in the history
* Add configurable UTC timezone option for scheduler

Signed-off-by: pmahindrakar-oss <[email protected]>

* Add comments for default behavior

Signed-off-by: pmahindrakar-oss <[email protected]>

Signed-off-by: pmahindrakar-oss <[email protected]>
  • Loading branch information
pmahindrakar-oss authored Nov 4, 2022
1 parent 1817f38 commit 4c58e78
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 10 deletions.
6 changes: 6 additions & 0 deletions pkg/runtime/interfaces/application_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,12 +344,18 @@ type FlyteWorkflowExecutorConfig struct {
// eg : 100 TPS will send at the max 100 schedule requests to admin per sec.
// Burst specifies burst traffic count
AdminRateLimit *AdminRateLimit `json:"adminRateLimit"`
// Defaults to using user local timezone where the scheduler is deployed.
UseUTCTz bool `json:"useUTCTz"`
}

func (f *FlyteWorkflowExecutorConfig) GetAdminRateLimit() *AdminRateLimit {
return f.AdminRateLimit
}

func (f *FlyteWorkflowExecutorConfig) GetUseUTCTz() bool {
return f.UseUTCTz
}

type AdminRateLimit struct {
Tps rate.Limit `json:"tps"`
Burst int `json:"burst"`
Expand Down
8 changes: 6 additions & 2 deletions scheduler/core/gocron_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,9 +327,13 @@ func getFixedRateDurationFromSchedule(unit admin.FixedRateUnit, fixedRateValue u
}

func NewGoCronScheduler(ctx context.Context, schedules []models.SchedulableEntity, scope promutils.Scope,
snapshot snapshoter.Snapshot, rateLimiter *rate.Limiter, executor executor.Executor) Scheduler {
snapshot snapshoter.Snapshot, rateLimiter *rate.Limiter, executor executor.Executor, useUtcTz bool) Scheduler {
// Create the new cron scheduler and start it off
c := cron.New()
var opts []cron.Option
if useUtcTz {
opts = append(opts, cron.WithLocation(time.UTC))
}
c := cron.New(opts...)
c.Start()
scheduler := &GoCronScheduler{
cron: c,
Expand Down
28 changes: 21 additions & 7 deletions scheduler/core/gocron_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var scheduleCronDeactivated models.SchedulableEntity
var scheduleFixedDeactivated models.SchedulableEntity
var scheduleNonExistentDeActivated models.SchedulableEntity

func setup(t *testing.T, subscope string) *GoCronScheduler {
func setup(t *testing.T, subscope string, useUtcTz bool) *GoCronScheduler {
configuration := runtime.NewConfigurationProvider()
applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig()
schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope(subscope)
Expand Down Expand Up @@ -109,7 +109,7 @@ func setup(t *testing.T, subscope string) *GoCronScheduler {
executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil)

snapshot := &snapshoter.SnapshotV1{}
g := NewGoCronScheduler(context.Background(), schedules, schedulerScope, snapshot, rateLimiter, executor)
g := NewGoCronScheduler(context.Background(), schedules, schedulerScope, snapshot, rateLimiter, executor, useUtcTz)
goCronScheduler, ok := g.(*GoCronScheduler)
assert.True(t, ok)
goCronScheduler.UpdateSchedules(context.Background(), schedules)
Expand All @@ -118,17 +118,31 @@ func setup(t *testing.T, subscope string) *GoCronScheduler {
return goCronScheduler
}

func TestUseUTCTz(t *testing.T) {
t.Run("use local timezone", func(t *testing.T) {
g := setup(t, "use_local_tz", false)
loc := g.cron.Location()
assert.NotNil(t, loc)
assert.Equal(t, time.Local, loc)
})
t.Run("use utc timezone", func(t *testing.T) {
g := setup(t, "use_utc_tz", true)
loc := g.cron.Location()
assert.NotNil(t, loc)
assert.Equal(t, time.UTC, loc)
})
}
func TestCalculateSnapshot(t *testing.T) {
t.Run("empty snapshot", func(t *testing.T) {
ctx := context.Background()
g := setup(t, "empty_snapshot")
g := setup(t, "empty_snapshot", false)
snapshot := g.CalculateSnapshot(ctx)
assert.NotNil(t, snapshot)
assert.True(t, snapshot.IsEmpty())
})
t.Run("non empty snapshot", func(t *testing.T) {
ctx := context.Background()
g := setup(t, "non_empty_snapshot")
g := setup(t, "non_empty_snapshot", false)
g.jobStore.Range(func(key, value interface{}) bool {
currTime := time.Now()
job := value.(*GoCronJob)
Expand All @@ -155,7 +169,7 @@ func TestGetTimedFuncWithSchedule(t *testing.T) {
}
for _, tc := range tests {
ctx := context.Background()
g := setup(t, tc.scope)
g := setup(t, tc.scope, false)
timeFunc := g.GetTimedFuncWithSchedule()
assert.NotNil(t, timeFunc)
err := timeFunc(ctx, tc.input, time.Now())
Expand All @@ -164,7 +178,7 @@ func TestGetTimedFuncWithSchedule(t *testing.T) {
})
t.Run("failure case", func(t *testing.T) {
ctx := context.Background()
g := setup(t, "failure_case")
g := setup(t, "failure_case", false)
executor := new(mocks.Executor)
executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("failure case"))
g.executor = executor
Expand Down Expand Up @@ -271,7 +285,7 @@ func TestGetFixedRateDurationFromSchedule(t *testing.T) {

func TestCatchUpAllSchedule(t *testing.T) {
ctx := context.Background()
g := setup(t, "catch_up_all_schedules")
g := setup(t, "catch_up_all_schedules", false)
toTime := time.Date(2022, time.January, 29, 0, 0, 0, 0, time.UTC)
catchupSuccess := g.CatchupAll(ctx, toTime)
assert.True(t, catchupSuccess)
Expand Down
3 changes: 2 additions & 1 deletion scheduler/schedule_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ func (w *ScheduledExecutor) Run(ctx context.Context) error {
// Also Bootstrap the schedules from the snapshot
bootStrapCtx, bootStrapCancel := context.WithCancel(ctx)
defer bootStrapCancel()
gcronScheduler := core.NewGoCronScheduler(bootStrapCtx, schedules, w.scope, snapshot, rateLimiter, executor)
useUtcTz := w.workflowExecutorConfig.UseUTCTz
gcronScheduler := core.NewGoCronScheduler(bootStrapCtx, schedules, w.scope, snapshot, rateLimiter, executor, useUtcTz)
w.scheduler = gcronScheduler

// Start the go routine to write the update schedules periodically
Expand Down

0 comments on commit 4c58e78

Please sign in to comment.