Skip to content

Commit

Permalink
Refactor refresh and reconcile triggers (#5535)
Browse files Browse the repository at this point in the history
* Update runtime protos

* Update `rill project refresh` CLI command

* Add EditTrigger permission

* Fix testruntime util

* Update server handler for CreateTrigger

* Better backwards compatibility implementation

* Implement updated refresh trigger reconciler

* Handle triggers for the parser

* Support full model refreshes

* Deprecate `rill project reconcile`

* Fix deprecated type

* Review

* Review

* Review 2

* Fix merge issue

* Nits

* QA
  • Loading branch information
begelundmuller committed Sep 2, 2024
1 parent 7d619fa commit 3ea6d75
Show file tree
Hide file tree
Showing 53 changed files with 7,799 additions and 6,436 deletions.
2 changes: 1 addition & 1 deletion admin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ go run ./cli admin start

### Using Github webhooks in development

The local development environment is not capable of receiving Github webhooks. In most cases, you can just run `rill project reconcile` to manually trigger a reconcile after pushing changes to Github.
The local development environment is not capable of receiving Github webhooks. In most cases, you can just run `rill project refresh --parser` to manually trigger a pull after pushing changes to Github.

Continue reading only if you are making changes to the Github webhooks code and need to test these changes specifically.

Expand Down
2 changes: 1 addition & 1 deletion admin/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func (s *Service) IssueRuntimeManagementToken(aud string) (string, error) {
AudienceURL: aud,
Subject: "admin-service",
TTL: time.Hour,
SystemPermissions: []auth.Permission{auth.ManageInstances, auth.ReadInstance, auth.EditInstance, auth.ReadObjects},
SystemPermissions: []auth.Permission{auth.ManageInstances, auth.ReadInstance, auth.EditInstance, auth.EditTrigger, auth.ReadObjects},
})
if err != nil {
return "", err
Expand Down
2 changes: 1 addition & 1 deletion admin/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (s *Service) processGithubPush(ctx context.Context, event *github.PushEvent
continue
}

err = s.TriggerReconcile(ctx, depl)
err = s.TriggerParser(ctx, depl)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion admin/jobs/river/reset_all_deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (w *ResetAllDeploymentsWorker) resetAllDeploymentsForProject(ctx context.Co
}

w.admin.Logger.Info("reset all deployments: redeploying deployment", zap.String("deployment_id", depl.ID), observability.ZapCtx(ctx))
_, err = w.admin.TriggerRedeploy(ctx, proj, depl)
_, err = w.admin.RedeployProject(ctx, proj, depl)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion admin/jobs/river/validate_deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (w *ValidateDeploymentsWorker) reconcileAllDeploymentsForProject(ctx contex
// Trigger a redeploy if config is no longer valid
if !v {
w.admin.Logger.Info("validate deployments: config no longer valid, triggering redeploy", zap.String("organization_id", org.ID), zap.String("project_id", proj.ID), zap.String("deployment_id", depl.ID), observability.ZapCtx(ctx))
_, err = w.admin.TriggerRedeploy(ctx, proj, depl)
_, err = w.admin.RedeployProject(ctx, proj, depl)
if err != nil {
return err
}
Expand Down
168 changes: 102 additions & 66 deletions admin/projects.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import (
"errors"
"fmt"
"reflect"
"time"

"github.com/rilldata/rill/admin/database"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/pkg/observability"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// TODO: The functions in this file are not truly fault tolerant. They should be refactored to run as idempotent, retryable background tasks.
Expand Down Expand Up @@ -169,7 +172,7 @@ func (s *Service) UpdateProject(ctx context.Context, proj *database.Project, opt
}
}

return s.TriggerRedeploy(ctx, proj, oldDepl)
return s.RedeployProject(ctx, proj, oldDepl)
}

s.Logger.Info("update project: updating deployments", observability.ZapCtx(ctx))
Expand Down Expand Up @@ -246,20 +249,30 @@ func (s *Service) UpdateOrgDeploymentAnnotations(ctx context.Context, org *datab
return nil
}

// HibernateProject hibernates a project by tearing down its prod deployment.
func (s *Service) HibernateProject(ctx context.Context, proj *database.Project) (*database.Project, error) {
depls, err := s.DB.FindDeploymentsForProject(ctx, proj.ID)
// RedeployProject de-provisions and re-provisions a project's prod deployment.
func (s *Service) RedeployProject(ctx context.Context, proj *database.Project, prevDepl *database.Deployment) (*database.Project, error) {
org, err := s.DB.FindOrganization(ctx, proj.OrganizationID)
if err != nil {
return nil, err
}

for _, depl := range depls {
err = s.TeardownDeployment(ctx, depl)
if err != nil {
return nil, err
}
// Provision new deployment
newDepl, err := s.createDeployment(ctx, &createDeploymentOptions{
ProjectID: proj.ID,
Provisioner: proj.Provisioner,
Annotations: s.NewDeploymentAnnotations(org, proj),
ProdVersion: proj.ProdVersion,
ProdBranch: proj.ProdBranch,
ProdVariables: proj.ProdVariables,
ProdOLAPDriver: proj.ProdOLAPDriver,
ProdOLAPDSN: proj.ProdOLAPDSN,
ProdSlots: proj.ProdSlots,
})
if err != nil {
return nil, err
}

// Update prod deployment on project
proj, err = s.DB.UpdateProject(ctx, proj.ID, &database.UpdateProjectOptions{
Name: proj.Name,
Description: proj.Description,
Expand All @@ -272,42 +285,41 @@ func (s *Service) HibernateProject(ctx context.Context, proj *database.Project)
ProdBranch: proj.ProdBranch,
Subpath: proj.Subpath,
ProdVariables: proj.ProdVariables,
ProdDeploymentID: nil,
ProdDeploymentID: &newDepl.ID,
ProdSlots: proj.ProdSlots,
ProdTTLSeconds: proj.ProdTTLSeconds,
Annotations: proj.Annotations,
})
if err != nil {
return nil, err
err2 := s.TeardownDeployment(ctx, newDepl)
return nil, multierr.Combine(err, err2)
}

// Delete old prod deployment if exists
if prevDepl != nil {
err = s.TeardownDeployment(ctx, prevDepl)
if err != nil {
s.Logger.Error("trigger redeploy: could not teardown old deployment", zap.String("deployment_id", prevDepl.ID), zap.Error(err), observability.ZapCtx(ctx))
}
}

return proj, nil
}

// TriggerRedeploy de-provisions and re-provisions a project's prod deployment.
func (s *Service) TriggerRedeploy(ctx context.Context, proj *database.Project, prevDepl *database.Deployment) (*database.Project, error) {
org, err := s.DB.FindOrganization(ctx, proj.OrganizationID)
// HibernateProject hibernates a project by tearing down its prod deployment.
func (s *Service) HibernateProject(ctx context.Context, proj *database.Project) (*database.Project, error) {
depls, err := s.DB.FindDeploymentsForProject(ctx, proj.ID)
if err != nil {
return nil, err
}

// Provision new deployment
newDepl, err := s.createDeployment(ctx, &createDeploymentOptions{
ProjectID: proj.ID,
Provisioner: proj.Provisioner,
Annotations: s.NewDeploymentAnnotations(org, proj),
ProdVersion: proj.ProdVersion,
ProdBranch: proj.ProdBranch,
ProdVariables: proj.ProdVariables,
ProdOLAPDriver: proj.ProdOLAPDriver,
ProdOLAPDSN: proj.ProdOLAPDSN,
ProdSlots: proj.ProdSlots,
})
if err != nil {
return nil, err
for _, depl := range depls {
err = s.TeardownDeployment(ctx, depl)
if err != nil {
return nil, err
}
}

// Update prod deployment on project
proj, err = s.DB.UpdateProject(ctx, proj.ID, &database.UpdateProjectOptions{
Name: proj.Name,
Description: proj.Description,
Expand All @@ -320,29 +332,20 @@ func (s *Service) TriggerRedeploy(ctx context.Context, proj *database.Project, p
ProdBranch: proj.ProdBranch,
Subpath: proj.Subpath,
ProdVariables: proj.ProdVariables,
ProdDeploymentID: &newDepl.ID,
ProdDeploymentID: nil,
ProdSlots: proj.ProdSlots,
ProdTTLSeconds: proj.ProdTTLSeconds,
Annotations: proj.Annotations,
})
if err != nil {
err2 := s.TeardownDeployment(ctx, newDepl)
return nil, multierr.Combine(err, err2)
}

// Delete old prod deployment if exists
if prevDepl != nil {
err = s.TeardownDeployment(ctx, prevDepl)
if err != nil {
s.Logger.Error("trigger redeploy: could not teardown old deployment", zap.String("deployment_id", prevDepl.ID), zap.Error(err), observability.ZapCtx(ctx))
}
return nil, err
}

return proj, nil
}

// TriggerReconcile triggers a reconcile for a deployment.
func (s *Service) TriggerReconcile(ctx context.Context, depl *database.Deployment) (err error) {
// TriggerParser triggers the deployment's project parser to do a new pull and parse.
func (s *Service) TriggerParser(ctx context.Context, depl *database.Deployment) (err error) {
s.Logger.Info("reconcile: triggering pull", zap.String("deployment_id", depl.ID), observability.ZapCtx(ctx))
defer func() {
if err != nil {
Expand All @@ -360,41 +363,74 @@ func (s *Service) TriggerReconcile(ctx context.Context, depl *database.Deploymen

_, err = rt.CreateTrigger(ctx, &runtimev1.CreateTriggerRequest{
InstanceId: depl.RuntimeInstanceID,
Trigger: &runtimev1.CreateTriggerRequest_PullTriggerSpec{
PullTriggerSpec: &runtimev1.PullTriggerSpec{},
},
Parser: true,
})
return err
}

// TriggerRefreshSources triggers a refresh of a deployment's sources. If the sources slice is nil, it will refresh all sources.
func (s *Service) TriggerRefreshSources(ctx context.Context, depl *database.Deployment, sources []string) (err error) {
s.Logger.Info("reconcile: triggering refresh", zap.String("deployment_id", depl.ID), observability.ZapCtx(ctx))
defer func() {
if err != nil {
s.Logger.Error("reconcile: trigger refresh failed", zap.String("deployment_id", depl.ID), zap.Error(err), observability.ZapCtx(ctx))
} else {
s.Logger.Info("reconcile: trigger refresh completed", zap.String("deployment_id", depl.ID), observability.ZapCtx(ctx))
}
}()

names := make([]*runtimev1.ResourceName, 0, len(sources))
for _, source := range sources {
// NOTE: When keeping Kind empty, the RefreshTrigger will match both sources and models
names = append(names, &runtimev1.ResourceName{Name: source})
}

// TriggerParserAndAwaitResource triggers the parser and polls the runtime until the given resource's spec version has been updated (or ctx is canceled).
func (s *Service) TriggerParserAndAwaitResource(ctx context.Context, depl *database.Deployment, name, kind string) error {
rt, err := s.OpenRuntimeClient(depl)
if err != nil {
return err
}
defer rt.Close()

_, err = rt.CreateTrigger(ctx, &runtimev1.CreateTriggerRequest{
resourceReq := &runtimev1.GetResourceRequest{
InstanceId: depl.RuntimeInstanceID,
Trigger: &runtimev1.CreateTriggerRequest_RefreshTriggerSpec{
RefreshTriggerSpec: &runtimev1.RefreshTriggerSpec{OnlyNames: names},
Name: &runtimev1.ResourceName{
Kind: kind,
Name: name,
},
}

// Get old spec version
var oldSpecVersion *int64
r, err := rt.GetResource(ctx, resourceReq)
if err == nil {
oldSpecVersion = &r.Resource.Meta.SpecVersion
}

// Trigger parser
_, err = rt.CreateTrigger(ctx, &runtimev1.CreateTriggerRequest{
InstanceId: depl.RuntimeInstanceID,
Parser: true,
})
return err
if err != nil {
return err
}

// Poll every second until the resource is found, or until the ctx is canceled or the timeout below expires
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}

r, err := rt.GetResource(ctx, resourceReq)
if err != nil {
if s, ok := status.FromError(err); !ok || s.Code() != codes.NotFound {
return fmt.Errorf("failed to poll for resource: %w", err)
}
if oldSpecVersion != nil {
// Success - previously the resource was found, now we cannot find it anymore
return nil
}
// Continue polling
continue
}
if oldSpecVersion == nil {
// Success - previously the resource was not found, now we found one
return nil
}
if *oldSpecVersion != r.Resource.Meta.SpecVersion {
// Success - the spec version has changed
return nil
}
}
}
81 changes: 0 additions & 81 deletions admin/reconcile.go

This file was deleted.

Loading

1 comment on commit 3ea6d75

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.