Skip to content

Commit

Permalink
feat: Smart retries with delay + retry count for cron trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
Peterburnett committed Mar 3, 2024
1 parent f467840 commit 4fb75b3
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 5 deletions.
5 changes: 5 additions & 0 deletions classes/local/execution/engine.php
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,11 @@ public function abort(?\Throwable $reason = null) {
if ($status !== self::STATUS_FINISHED && !in_array($status, self::STATUS_TERMINATORS)) {
$this->set_current_step($enginestep);
$enginestep->abort();
} else {
// We need to signal to finished steps that the dataflow is aborted. This may require handling separate from the step abort.
// This is done separately from the finalise hook so that concerns are separated for finalised vs aborted runs.
$this->set_current_step($enginestep);
$enginestep->dataflow_abort();
}
}
foreach ($this->flowcaps as $enginestep) {
Expand Down
7 changes: 7 additions & 0 deletions classes/local/execution/engine_step.php
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ public function abort() {
$this->steptype->on_abort();
}

/**
 * Signal handler for a full dataflow abort.
 *
 * Forwards the signal to the step type by invoking its on_dataflow_abort() hook.
 * This is distinct from abort(), which invokes the step type's on_abort() hook.
 */
public function dataflow_abort() {
$this->steptype->on_dataflow_abort();
}

/**
* Attempt to execute the step.
*
Expand Down
35 changes: 34 additions & 1 deletion classes/local/scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static function get_scheduled_times(int $stepid) {
public static function set_scheduled_times(int $dataflowid, int $stepid, int $newtime, ?int $oldtime = null) {
global $DB;

$obj = (object) ['nextruntime' => $newtime, 'dataflowid' => $dataflowid, 'stepid' => $stepid];
$obj = (object) ['nextruntime' => $newtime, 'dataflowid' => $dataflowid, 'stepid' => $stepid, 'retrycount' => 0];
if (!is_null($oldtime)) {
$obj->lastruntime = $oldtime;
}
Expand All @@ -67,6 +67,39 @@ public static function set_scheduled_times(int $dataflowid, int $stepid, int $ne
}
}

/**
 * Books the next run of a dataflow after a failed attempt.
 *
 * While the stored retry counter is below the permitted number of retries, the counter is
 * bumped by one and the next run is set to the retry time. Once the counter has reached the
 * permitted number, it is reset to zero and the next run is set to the regular scheduled time.
 *
 * Nothing is written when no schedule record exists for the given dataflow and step.
 *
 * @param int $dataflowid the flow id.
 * @param int $stepid the step trigger id.
 * @param int $retrytime when to run next on a retry.
 * @param int $scheduledtime when to run next if allowed retries are exhausted.
 * @param int $retriesallowed the amount of retries allowed before resuming regular schedule.
 */
public static function set_scheduled_retry(int $dataflowid, int $stepid, int $retrytime, int $scheduledtime, int $retriesallowed) {
    global $DB;

    $record = $DB->get_record(self::TABLE, ['dataflowid' => $dataflowid, 'stepid' => $stepid]);
    if (!$record) {
        // Called for a schedule that has never run or doesn't exist: there is nothing to update.
        return;
    }

    // Decide once whether another retry is still permitted, then derive both fields from it.
    $canretry = $record->retrycount < $retriesallowed;
    $record->nextruntime = $canretry ? $retrytime : $scheduledtime;
    $record->retrycount = $canretry ? $record->retrycount + 1 : 0;

    $DB->update_record(self::TABLE, $record);
}

/**
* Gets a list of dataflows and timestamps that are due to run based on the given reference time.
*
Expand Down
6 changes: 6 additions & 0 deletions classes/local/step/base_step.php
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,12 @@ public function on_initialise() {
public function on_abort() {
// No-op in this base implementation.
}

/**
 * Hook function that gets called when the dataflow as a whole has been aborted.
 *
 * engine_step::dataflow_abort() invokes the hook of this name on its step type.
 * The implementation here is a no-op.
 */
public function on_dataflow_abort() {
// No-op in this base implementation.
}

/**
* Hook function that gets called when an engine step has been finalised.
*/
Expand Down
53 changes: 51 additions & 2 deletions classes/local/step/trigger_cron.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public static function form_define_fields(): array {
'day' => ['type' => PARAM_TEXT],
'month' => ['type' => PARAM_TEXT],
'dayofweek' => ['type' => PARAM_TEXT],
'retryinterval' => ['type' => PARAM_INT],
'retrycount' => ['type' => PARAM_INT],
'disabled' => ['type' => PARAM_TEXT],
];
}
Expand All @@ -60,6 +62,9 @@ public function form_get_default_data(\stdClass $data): \stdClass {
$data->{"config_$field"} = '*';
}
}
$data->config_retryinterval = $data->config_retryinterval ?? 0;
$data->config_retrycount = $data->config_retrycount ?? 0;

return $data;
}

Expand Down Expand Up @@ -128,6 +133,13 @@ public function form_add_custom_inputs(\MoodleQuickForm &$mform) {

$mform->addGroup($crontab, 'crontab', get_string('trigger_cron:crontab', 'tool_dataflows'), ' ', false);
$mform->addElement('static', 'crontab_desc', '', get_string('trigger_cron:crontab_desc', 'tool_dataflows'));

// Retry configurations.
$mform->addElement('duration', 'config_retryinterval', get_string('trigger_cron:retryinterval', 'tool_dataflows'));
$mform->setType('retryinterval', PARAM_INT);
$mform->addElement('text', 'config_retrycount', get_string('trigger_cron:retrycount', 'tool_dataflows'));
$mform->setType('retrycount', PARAM_INT);
$mform->setDefault('retrycount', 0);
}

/**
Expand All @@ -143,6 +155,13 @@ public function validate_config($config) {
return ['crontab' => get_string('trigger_cron:invalid', 'tool_dataflows', '', true)];
}
}
if ($config->retryinterval < 0) {
return ['config_retryinterval' => get_string('trigger_cron:positive_retryinterval', 'tool_dataflows', null, true)];
}
if ($config->retrycount < 0) {
return ['config_retrycount' => get_string('trigger_cron:positive_retryinterval', 'tool_dataflows', null, true)];
}

return true;
}

Expand Down Expand Up @@ -276,8 +295,18 @@ public function on_finalise() {
*/
public function on_abort() {
    // No rescheduling is done from this hook when concurrency is enabled for the dataflow.
    if ($this->stepdef->dataflow->is_concurrency_enabled()) {
        return;
    }

    // Schedule a retry only. The regular reschedule() must not also run here: the retry
    // scheduling already falls back to the regular schedule once the allowed retries are
    // used up, and writing the regular schedule first would be overwritten immediately.
    $this->reschedule_retry();
}

/**
 * Hook function that gets called when the dataflow engine is aborting.
 *
 * Schedules a retry of the dataflow, unless concurrency is enabled for it.
 */
public function on_dataflow_abort() {
    $dataflow = $this->stepdef->dataflow;
    if ($dataflow->is_concurrency_enabled()) {
        return;
    }

    // Reschedule a retry on aborts.
    $this->reschedule_retry();
}

Expand All @@ -295,6 +324,26 @@ protected function reschedule() {
$newtime,
$config->nextruntime ?? null
);
$this->log("Rescheduling dataflow to configured schedule.");
}
}

/**
 * Schedule a retry for this flow. If the maximum retries are reached, the regular schedule will be used.
 *
 * The retry time is the current time plus the configured retry interval. The decision between
 * retrying and returning to the regular schedule is made by scheduler::set_scheduled_retry().
 */
public function reschedule_retry() {
    $config = $this->get_variables()->get('config');

    // The retry settings may be absent from the config (presumably for steps saved before
    // these settings existed - to be confirmed). Default both to 0 so that no undefined
    // property is read and no null reaches the int parameter of set_scheduled_retry().
    // With 0 retries allowed, the scheduler goes straight back to the regular schedule.
    $retryinterval = $config->retryinterval ?? 0;
    $retriesallowed = $config->retrycount ?? 0;

    $scheduledtime = $this->get_next_scheduled_time($config);
    $retrytime = time() + $retryinterval;

    scheduler::set_scheduled_retry(
        $this->stepdef->dataflowid,
        $this->stepdef->id,
        $retrytime,
        $scheduledtime,
        $retriesallowed
    );

    $this->log("Rescheduling dataflow to retry.");
}
}
1 change: 1 addition & 0 deletions db/install.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
<FIELD NAME="stepid" TYPE="int" LENGTH="10" NOTNULL="true" SEQUENCE="false"/>
<FIELD NAME="lastruntime" TYPE="int" LENGTH="10" NOTNULL="true" DEFAULT="0" SEQUENCE="false" COMMENT="The time the dataflow was last scheduled to be run"/>
<FIELD NAME="nextruntime" TYPE="int" LENGTH="10" NOTNULL="true" SEQUENCE="false" COMMENT="The time the dataflow is next scheduled to be run"/>
<FIELD NAME="retrycount" TYPE="int" LENGTH="10" NOTNULL="true" DEFAULT="0" SEQUENCE="false" COMMENT="Count of attempted retries on the current dataflow. Reset on scheduling a fresh run."/>
</FIELDS>
<KEYS>
<KEY NAME="primary" TYPE="primary" FIELDS="id"/>
Expand Down
15 changes: 15 additions & 0 deletions db/upgrade.php
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,21 @@ function xmldb_tool_dataflows_upgrade($oldversion) {
upgrade_plugin_savepoint(true, 2023072100, 'tool', 'dataflows');
}

if ($oldversion < 2024030201) {

// Define field retrycount to be added to tool_dataflows_schedule.
$table = new xmldb_table('tool_dataflows_schedule');
$field = new xmldb_field('retrycount', XMLDB_TYPE_INTEGER, '10', null, XMLDB_NOTNULL, null, 0, 'nextruntime');

// Conditionally launch add field retrycount.
if (!$dbman->field_exists($table, $field)) {
$dbman->add_field($table, $field);
}

// Dataflows savepoint reached.
upgrade_plugin_savepoint(true, 2024030201, 'tool', 'dataflows');
}

// Move log files that exist across to new format. Breaking change if any
// dataflows implement logic based on these files based on filename format.
if ($oldversion < 2023122201) {
Expand Down
4 changes: 4 additions & 0 deletions lang/en/tool_dataflows.php
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,10 @@
$string['trigger_cron:crontab_desc'] = 'The schedule is edited as five values: minute, hour, day, month and day of month, in that order. The values are in crontab format.';
$string['trigger_cron:strftime_datetime'] = '%d %b %Y, %H:%M';
$string['trigger_cron:next_run_time'] = 'Next run time: {$a}';
$string['trigger_cron:retryinterval'] = 'Retry interval';
$string['trigger_cron:retrycount'] = 'Number of retries';
$string['trigger_cron:positive_retrycount'] = 'Number of retries must be positive or 0';
$string['trigger_cron:positive_retryinterval'] = 'Retry interval must be positive or 0';

// Email notification.
$string['connector_email:message'] = 'Message';
Expand Down
40 changes: 40 additions & 0 deletions tests/tool_dataflows_scheduler_test.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

namespace tool_dataflows;

use Aws\Arn\Arn;
use tool_dataflows\local\scheduler;

/**
Expand Down Expand Up @@ -70,6 +71,45 @@ public function test_update_next_scheduled_time() {
$this->assertEquals((object) ['lastruntime' => 160, 'nextruntime' => 220], scheduler::get_scheduled_times(12));
}

/**
 * Tests scheduler::set_scheduled_retry().
 *
 * The steps build on each other against the same schedule record, so their order matters:
 * - a retry for a step with no schedule record leaves no record behind;
 * - set_scheduled_times() stores a retry count of 0;
 * - with 0 retries allowed, the regular time is scheduled;
 * - with 2 retries allowed, two calls schedule the retry time and raise the counter to 2;
 * - the following call resets the counter to 0 and schedules the regular time;
 * - set_scheduled_times() resets a non-zero counter to 0.
 */
public function test_set_scheduled_retry() {
global $DB;

// Retry cannot be called for a run that hasn't been regularly scheduled.
scheduler::set_scheduled_retry(1, 1, 1, 1, 1);
$this->assertFalse(scheduler::get_scheduled_times(1));

// Default 0.
scheduler::set_scheduled_times(1, 1, 123, 1);
$this->assertEquals(0, $DB->get_field(scheduler::TABLE, 'retrycount', ['stepid' => 1]));

// Schedule a retry when none are allowed.
$regulartime = 555;
$retrytime = 444;
scheduler::set_scheduled_retry(1, 1, $retrytime, $regulartime, 0);
$this->assertEquals((object) ['lastruntime' => 1, 'nextruntime' => $regulartime], scheduler::get_scheduled_times(1));

// Schedule a retry when retries are permitted.
scheduler::set_scheduled_retry(1, 1, $retrytime, $regulartime, 2);
$this->assertEquals((object) ['lastruntime' => 1, 'nextruntime' => $retrytime], scheduler::get_scheduled_times(1));

// Now run again and confirm counter has been incremented twice.
scheduler::set_scheduled_retry(1, 1, $retrytime, $regulartime, 2);
$this->assertEquals((object) ['lastruntime' => 1, 'nextruntime' => $retrytime], scheduler::get_scheduled_times(1));
$this->assertEquals(2, $DB->get_field(scheduler::TABLE, 'retrycount', ['stepid' => 1]));

// Now attempt to schedule another retry. The counter should reset and go to regular time.
scheduler::set_scheduled_retry(1, 1, $retrytime, $regulartime, 2);
$this->assertEquals((object) ['lastruntime' => 1, 'nextruntime' => $regulartime], scheduler::get_scheduled_times(1));
$this->assertEquals(0, $DB->get_field(scheduler::TABLE, 'retrycount', ['stepid' => 1]));

// Now confirm that if a successful run is registered while there are still retries left, the counter is reset.
scheduler::set_scheduled_retry(1, 1, $retrytime, $regulartime, 2);
$this->assertEquals((object) ['lastruntime' => 1, 'nextruntime' => $retrytime], scheduler::get_scheduled_times(1));
$this->assertEquals(1, $DB->get_field(scheduler::TABLE, 'retrycount', ['stepid' => 1]));
scheduler::set_scheduled_times(1, 1, $regulartime);
$this->assertEquals(0, $DB->get_field(scheduler::TABLE, 'retrycount', ['stepid' => 1]));
}

/**
* Tests the get_due_dataflows() function.
*
Expand Down
4 changes: 2 additions & 2 deletions version.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@

defined('MOODLE_INTERNAL') || die();

$plugin->version = 2023122201;
$plugin->release = 2023122201;
$plugin->version = 2024030201;
$plugin->release = 2024030201;
$plugin->requires = 2022112800; // Our lowest supported Moodle (3.3.0).
$plugin->supported = [400, 402];
// TODO $plugin->incompatible = ; // Available as of Moodle 3.9.0 or later.
Expand Down

0 comments on commit 4fb75b3

Please sign in to comment.