diff --git a/api/errors.go b/api/errors.go index 6bdf6ef..2580732 100644 --- a/api/errors.go +++ b/api/errors.go @@ -7,6 +7,7 @@ import ( "github.com/golang/glog" "github.com/livepeer/go-api-client" + "github.com/livepeer/task-runner/task" ) type errorResponse struct { @@ -20,6 +21,8 @@ func respondError(r *http.Request, rw http.ResponseWriter, defaultStatus int, er response.Errors = append(response.Errors, err.Error()) if errors.Is(err, api.ErrNotExists) { status = http.StatusNotFound + } else if errors.As(err, &task.InputError{}) { + status = http.StatusUnprocessableEntity } } glog.Warningf("API ended in error. method=%s url=%q status=%d, errors=%+v", r.Method, r.URL, status, response.Errors) diff --git a/task/runner.go b/task/runner.go index 5b3227f..7678f1d 100644 --- a/task/runner.go +++ b/task/runner.go @@ -402,6 +402,10 @@ func (r *runner) getAssetAndOS(assetID string) (*api.Asset, *api.ObjectStore, dr return asset, objectStore, osSession, nil } +type InputError struct { + error +} + func (r *runner) HandleCatalysis(ctx context.Context, taskId, nextStep, attemptID string, callback *clients.CatalystCallback) error { taskInfo, task, err := r.getTaskInfo(taskId, "catalysis", nil) if err != nil { @@ -413,10 +417,13 @@ func (r *runner) HandleCatalysis(ctx context.Context, taskId, nextStep, attemptI if task.Status.Phase != api.TaskPhaseRunning && task.Status.Phase != api.TaskPhaseWaiting { - return fmt.Errorf("task %s is not running", taskId) - } else if curr := catalystTaskAttemptID(task); attemptID != "" && attemptID != curr { - return fmt.Errorf("outdated catalyst job callback, "+ - "task has already been retried (callback: %s current: %s)", attemptID, curr) + return InputError{fmt.Errorf("task %s is not running", taskId)} + } + currAttempt := catalystTaskAttemptID(task) + isSameAttempt := attemptID == currAttempt + if !isSameAttempt { + glog.Warningf("Received outdated catalyst job callback, task has already been retried taskId=%s callbackAttempt=%s currentAttempt=%s", + task.ID, attemptID, currAttempt) } if callback.SourcePlayback != nil { @@ -441,9 +448,12 @@ func (r *runner) HandleCatalysis(ctx context.Context, taskId, nextStep, attemptI } if callback.Status == catalystClients.TranscodeStatusError { - glog.Infof("Catalyst job failed for task type=%q id=%s error=%q unretriable=%v", task.Type, task.ID, callback.Error, callback.Unretriable) - err := NewCatalystError(callback.Error, callback.Unretriable) - return r.publishTaskResult(taskInfo, nil, err) + glog.Infof("Catalyst job failed for task type=%q id=%s attempt=%s error=%q unretriable=%v", task.Type, task.ID, attemptID, callback.Error, callback.Unretriable) + // Make sure not to fail the task with errors from previous attempts. + if isSameAttempt { + err := NewCatalystError(callback.Error, callback.Unretriable) + return r.publishTaskResult(taskInfo, nil, err) + } } else if callback.Status == catalystClients.TranscodeStatusCompleted { return r.scheduleTaskStep(task.ID, nextStep, callback) }