diff --git a/admin/deployments.go b/admin/deployments.go
index cbeb6494719..c427ddb52b0 100644
--- a/admin/deployments.go
+++ b/admin/deployments.go
@@ -205,39 +205,41 @@ type UpdateDeploymentOptions struct {
 }
 
 func (s *Service) UpdateDeployment(ctx context.Context, depl *database.Deployment, opts *UpdateDeploymentOptions) error {
-    // Update the provisioned runtime if the version has changed
-    if opts.Version != "" && opts.Version != depl.RuntimeVersion {
-        // Get provisioner from the set
-        p, ok := s.ProvisionerSet[depl.Provisioner]
-        if !ok {
-            return fmt.Errorf("provisioner: %q is not in the provisioner set", depl.Provisioner)
-        }
+    // Get provisioner from the set
+    p, ok := s.ProvisionerSet[depl.Provisioner]
+    if !ok {
+        return fmt.Errorf("provisioner: %q is not in the provisioner set", depl.Provisioner)
+    }
 
-        // Update the runtime
-        err := p.Update(ctx, depl.ProvisionID, opts.Version)
-        if err != nil {
-            s.Logger.Error("provisioner: failed to update", zap.String("deployment_id", depl.ID), zap.String("provisioner", depl.Provisioner), zap.String("provision_id", depl.ProvisionID), zap.Error(err), observability.ZapCtx(ctx))
-            return err
-        }
+    // Update the runtime
+    _, err := p.Provision(ctx, &provisioner.ProvisionOptions{
+        ProvisionID:    depl.ProvisionID,
+        Slots:          depl.Slots,
+        RuntimeVersion: opts.Version,
+        Annotations:    opts.Annotations.ToMap(),
+    })
+    if err != nil {
+        s.Logger.Error("provisioner: failed to update", zap.String("deployment_id", depl.ID), zap.String("provisioner", depl.Provisioner), zap.String("provision_id", depl.ProvisionID), zap.Error(err), observability.ZapCtx(ctx))
+        return err
+    }
 
-        // Wait for the runtime to be ready after update
-        err = p.AwaitReady(ctx, depl.ProvisionID)
-        if err != nil {
-            s.Logger.Error("provisioner: failed awaiting runtime to be ready after update", zap.String("deployment_id", depl.ID), zap.String("provisioner", depl.Provisioner), zap.String("provision_id", depl.ProvisionID), zap.Error(err), observability.ZapCtx(ctx))
-            // Mark deployment error
-            _, err2 := s.DB.UpdateDeploymentStatus(ctx, depl.ID, database.DeploymentStatusError, err.Error())
-            return multierr.Combine(err, err2)
-        }
+    // Wait for the runtime to be ready after update
+    err = p.AwaitReady(ctx, depl.ProvisionID)
+    if err != nil {
+        s.Logger.Error("provisioner: failed awaiting runtime to be ready after update", zap.String("deployment_id", depl.ID), zap.String("provisioner", depl.Provisioner), zap.String("provision_id", depl.ProvisionID), zap.Error(err), observability.ZapCtx(ctx))
+        // Mark deployment error
+        _, err2 := s.DB.UpdateDeploymentStatus(ctx, depl.ID, database.DeploymentStatusError, err.Error())
+        return multierr.Combine(err, err2)
+    }
 
-        // Update the deployment runtime version
-        _, err = s.DB.UpdateDeploymentRuntimeVersion(ctx, depl.ID, opts.Version)
-        if err != nil {
-            // NOTE: If the update was triggered by a scheduled job like 'upgrade_latest_version_projects',
-            // then this error will cause the update to be retried on the next job invocation and it should eventually become consistent.
+    // Update the deployment runtime version
+    _, err = s.DB.UpdateDeploymentRuntimeVersion(ctx, depl.ID, opts.Version)
+    if err != nil {
+        // NOTE: If the update was triggered by a scheduled job like 'validate_deployments',
+        // then this error will cause the update to be retried on the next job invocation and it should eventually become consistent.
-            // TODO: Handle inconsistent state when a manually triggered update failed, where we can't rely on job retries.
-            return err
-        }
+        // TODO: Handle inconsistent state when a manually triggered update failed, where we can't rely on job retries.
+        return err
     }
 
     rt, err := s.OpenRuntimeClient(depl)
diff --git a/admin/jobs/river/validate_deployments.go b/admin/jobs/river/validate_deployments.go
index f3de4b9e2a2..856294937bc 100644
--- a/admin/jobs/river/validate_deployments.go
+++ b/admin/jobs/river/validate_deployments.go
@@ -103,14 +103,20 @@ func (w *ValidateDeploymentsWorker) reconcileAllDeploymentsForProject(ctx contex
             return err
         }
 
-        // Trigger a redeploy if config is no longer valid
+        // Trigger re-provision if config is no longer valid
         if !v {
-            w.admin.Logger.Info("validate deployments: config no longer valid, triggering redeploy", zap.String("organization_id", org.ID), zap.String("project_id", proj.ID), zap.String("deployment_id", depl.ID), observability.ZapCtx(ctx))
-            _, err = w.admin.RedeployProject(ctx, proj, depl)
+            w.admin.Logger.Info("validate deployments: config no longer valid, triggering re-provision", zap.String("organization_id", org.ID), zap.String("project_id", proj.ID), zap.String("deployment_id", depl.ID), observability.ZapCtx(ctx))
+            err = w.admin.UpdateDeployment(ctx, depl, &admin.UpdateDeploymentOptions{
+                Version:         depl.RuntimeVersion,
+                Branch:          depl.Branch,
+                Variables:       proj.ProdVariables,
+                Annotations:     w.admin.NewDeploymentAnnotations(org, proj),
+                EvictCachedRepo: false,
+            })
             if err != nil {
                 return err
             }
-            w.admin.Logger.Info("validate deployments: redeployed", zap.String("organization_id", org.ID), zap.String("project_id", proj.ID), observability.ZapCtx(ctx))
+            w.admin.Logger.Info("validate deployments: re-provisioned", zap.String("organization_id", org.ID), zap.String("project_id", proj.ID), observability.ZapCtx(ctx))
             continue
         }
diff --git a/admin/provisioner/README.md b/admin/provisioner/README.md
index 668991d5a60..5a3a09f5dc9 100644
--- a/admin/provisioner/README.md
+++ b/admin/provisioner/README.md
@@ -38,7 +38,8 @@ The provisioner is configured using `RILL_ADMIN_PROVISIONER_SET_JSON` with a nam
         "http_ingress": "templates/http_ingress.yaml", // Ingress resource template for HTTP
         "grpc_ingress": "templates/grpc_ingress.yaml", // Ingress resource template for GRCP
         "service": "templates/service.yaml",           // Service resource template
-        "statefulset": "templates/statefulset.yaml"    // Statefulset resource template
+        "deployment": "templates/deployment.yaml",     // Deployment resource template
+        "pvc": "templates/pvc.yaml"                    // PVC resource template
       }
     }
   }
@@ -55,35 +56,28 @@ The Kubernetes resource templates provides a high level of flexibility, but they
 Note: For internal Rill users refer to our private infra repos containing environment specific configurations and templates.
 
-### statefulset.yaml
+### deployment.yaml
 ```
 apiVersion: apps/v1
-kind: StatefulSet
+kind: Deployment
 spec:
-  persistentVolumeClaimRetentionPolicy:
-    whenDeleted: Delete
-    whenScaled: Retain
-  volumeClaimTemplates:
-    - metadata:
-        name: data
-      spec:
-        accessModes:
-          - ReadWriteOnce
-        resources:
-          requests:
-            storage: {{ .StorageBytes }}
   replicas: 1
+  strategy:
+    type: Recreate
   selector:
     matchLabels:
-      app.kubernetes.io/name: {{ .Names.StatefulSet }}
-  serviceName: cloud-runtime
+      app.kubernetes.io/name: {{ .Names.Deployment }}
   template:
     metadata:
       labels:
-        app.kubernetes.io/name: {{ .Names.StatefulSet }}
+        app.kubernetes.io/name: {{ .Names.Deployment }}
     spec:
       securityContext:
         fsGroup: 1000
+      volumes:
+        - name: data
+          persistentVolumeClaim:
+            claimName: {{ .Names.PVC }}
       containers:
         - args:
             - runtime
@@ -153,7 +147,7 @@ spec:
       port: 9090
       targetPort: 9090
   selector:
-    app.kubernetes.io/name: {{ .Names.StatefulSet }}
+    app.kubernetes.io/name: {{ .Names.Deployment }}
 ```
 
 ### grpc_ingress.yaml
@@ -197,3 +191,16 @@ spec:
             path: /v1
             pathType: Prefix
 ```
+
+### pvc.yaml
+```
+apiVersion: v1
+kind: PersistentVolumeClaim
+spec:
+  accessModes:
+    - ReadWriteOnce
+  resources:
+    requests:
+      storage: {{ .StorageBytes }}
+  storageClassName: storageclass-example
+```
diff --git a/admin/provisioner/kubernetes.go b/admin/provisioner/kubernetes.go
index 018974f4932..c9536dd9413 100644
--- a/admin/provisioner/kubernetes.go
+++ b/admin/provisioner/kubernetes.go
@@ -18,16 +18,15 @@ import (
     "github.com/c2h5oh/datasize"
     retryablehttp "github.com/hashicorp/go-retryablehttp"
     "go.uber.org/multierr"
-    appsv1 "k8s.io/api/apps/v1"
-    apiv1 "k8s.io/api/core/v1"
-    netv1 "k8s.io/api/networking/v1"
     k8serrs "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/apimachinery/pkg/util/yaml"
+    appsv1ac "k8s.io/client-go/applyconfigurations/apps/v1"
+    corev1ac "k8s.io/client-go/applyconfigurations/core/v1"
+    netv1ac "k8s.io/client-go/applyconfigurations/networking/v1"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/tools/clientcmd"
-    "k8s.io/client-go/util/retry"
 )
 
 type KubernetesSpec struct {
@@ -43,7 +42,8 @@ type KubernetesTemplatePaths struct {
     HTTPIngress string `json:"http_ingress"`
     GRPCIngress string `json:"grpc_ingress"`
     Service     string `json:"service"`
-    StatefulSet string `json:"statefulset"`
+    Deployment  string `json:"deployment"`
+    PVC         string `json:"pvc"`
 }
 
 type KubernetesProvisioner struct {
@@ -69,7 +69,8 @@ type ResourceNames struct {
     HTTPIngress string
     GRPCIngress string
     Service     string
-    StatefulSet string
+    Deployment  string
+    PVC         string
 }
 
 func NewKubernetes(spec json.RawMessage) (*KubernetesProvisioner, error) {
@@ -103,7 +104,8 @@ func NewKubernetes(spec json.RawMessage) (*KubernetesProvisioner, error) {
         ksp.TemplatePaths.HTTPIngress,
         ksp.TemplatePaths.GRPCIngress,
         ksp.TemplatePaths.Service,
-        ksp.TemplatePaths.StatefulSet,
+        ksp.TemplatePaths.Deployment,
+        ksp.TemplatePaths.PVC,
     }
 
     // Parse the template definitions
@@ -149,17 +151,19 @@ func (p *KubernetesProvisioner) Provision(ctx context.Context, opts *ProvisionOp
     }
 
     // Define the structured Kubernetes API resources
-    httpIng := &netv1.Ingress{}
-    grpcIng := &netv1.Ingress{}
-    svc := &apiv1.Service{}
-    sts := &appsv1.StatefulSet{}
+    httpIng := &netv1ac.IngressApplyConfiguration{}
+    grpcIng := &netv1ac.IngressApplyConfiguration{}
+    svc := &corev1ac.ServiceApplyConfiguration{}
+    pvc := &corev1ac.PersistentVolumeClaimApplyConfiguration{}
+    depl := &appsv1ac.DeploymentApplyConfiguration{}
 
     // Resolve the templates and decode into Kubernetes API resources
     for k, v := range map[string]any{
         p.Spec.TemplatePaths.HTTPIngress: httpIng,
         p.Spec.TemplatePaths.GRPCIngress: grpcIng,
         p.Spec.TemplatePaths.Service:     svc,
-        p.Spec.TemplatePaths.StatefulSet: sts,
+        p.Spec.TemplatePaths.PVC:         pvc,
+        p.Spec.TemplatePaths.Deployment:  depl,
     } {
         // Resolve template
         buf := &bytes.Buffer{}
@@ -176,47 +180,52 @@ func (p *KubernetesProvisioner) Provision(ctx context.Context, opts *ProvisionOp
         }
     }
 
-    // We start by deprovisioning any previous attempt, we do this as a simple way to achieve idempotency
-    err := p.Deprovision(ctx, opts.ProvisionID)
+    applyOptions := metav1.ApplyOptions{FieldManager: "rill-cloud-admin", Force: true}
+    labels := map[string]string{
+        "app.kubernetes.io/managed-by": "rill-cloud-admin",
+        "app.kubernetes.io/instance":   opts.ProvisionID,
+    }
+    annotations := map[string]string{
+        "checksum/templates": p.templatesChecksum,
+    }
+
+    // If the PVC already exists we need to make sure the volume is not decreased, since the Kubernetes storage drivers in general only supports volume expansion
+    oldPvc, err := p.clientset.CoreV1().PersistentVolumeClaims(p.Spec.Namespace).Get(ctx, names.PVC, metav1.GetOptions{})
+    if err != nil && !k8serrs.IsNotFound(err) {
+        return nil, err
+    }
+    if !k8serrs.IsNotFound(err) {
+        if oldPvc.Spec.Resources.Requests.Storage().Cmp(*pvc.Spec.Resources.Requests.Storage()) == 1 {
+            pvc.Spec.WithResources(&corev1ac.VolumeResourceRequirementsApplyConfiguration{
+                Requests: &oldPvc.Spec.Resources.Requests,
+            })
+        }
+    }
+
+    // Server-Side apply all the Kubernetes resources, for more info on this methodology see https://kubernetes.io/docs/reference/using-api/server-side-apply/
+    _, err = p.clientset.CoreV1().PersistentVolumeClaims(p.Spec.Namespace).Apply(ctx, pvc.WithName(names.PVC).WithLabels(labels).WithAnnotations(annotations), applyOptions)
     if err != nil {
         return nil, err
     }
 
-    // Create statefulset
-    sts.ObjectMeta.Name = names.StatefulSet
-    sts.ObjectMeta.Annotations["checksum/templates"] = p.templatesChecksum
-    p.addCommonLabels(opts.ProvisionID, sts.ObjectMeta.Labels)
-    _, err = p.clientset.AppsV1().StatefulSets(p.Spec.Namespace).Create(ctx, sts, metav1.CreateOptions{})
+    _, err = p.clientset.AppsV1().Deployments(p.Spec.Namespace).Apply(ctx, depl.WithName(names.Deployment).WithLabels(labels).WithAnnotations(annotations), applyOptions)
     if err != nil {
-        err2 := p.Deprovision(ctx, opts.ProvisionID)
-        return nil, multierr.Combine(err, err2)
+        return nil, err
     }
 
-    // Create service
-    svc.ObjectMeta.Name = names.Service
-    p.addCommonLabels(opts.ProvisionID, svc.ObjectMeta.Labels)
-    _, err = p.clientset.CoreV1().Services(p.Spec.Namespace).Create(ctx, svc, metav1.CreateOptions{})
+    _, err = p.clientset.CoreV1().Services(p.Spec.Namespace).Apply(ctx, svc.WithName(names.Service).WithLabels(labels).WithAnnotations(annotations), applyOptions)
     if err != nil {
-        err2 := p.Deprovision(ctx, opts.ProvisionID)
-        return nil, multierr.Combine(err, err2)
+        return nil, err
     }
 
-    // Create HTTP ingress
-    httpIng.ObjectMeta.Name = names.HTTPIngress
-    p.addCommonLabels(opts.ProvisionID, httpIng.ObjectMeta.Labels)
-    _, err = p.clientset.NetworkingV1().Ingresses(p.Spec.Namespace).Create(ctx, httpIng, metav1.CreateOptions{})
+    _, err = p.clientset.NetworkingV1().Ingresses(p.Spec.Namespace).Apply(ctx, httpIng.WithName(names.HTTPIngress).WithLabels(labels).WithAnnotations(annotations), applyOptions)
     if err != nil {
-        err2 := p.Deprovision(ctx, opts.ProvisionID)
-        return nil, multierr.Combine(err, err2)
+        return nil, err
     }
 
-    // Create GRPC ingress
-    grpcIng.ObjectMeta.Name = names.GRPCIngress
-    p.addCommonLabels(opts.ProvisionID, grpcIng.ObjectMeta.Labels)
-    _, err = p.clientset.NetworkingV1().Ingresses(p.Spec.Namespace).Create(ctx, grpcIng, metav1.CreateOptions{})
+    _, err = p.clientset.NetworkingV1().Ingresses(p.Spec.Namespace).Apply(ctx, grpcIng.WithName(names.GRPCIngress).WithLabels(labels).WithAnnotations(annotations), applyOptions)
     if err != nil {
-        err2 := p.Deprovision(ctx, opts.ProvisionID)
-        return nil, multierr.Combine(err, err2)
+        return nil, err
     }
 
     return &Allocation{
@@ -247,11 +256,14 @@ func (p *KubernetesProvisioner) Deprovision(ctx context.Context, provisionID str
     // Delete service
     err3 := p.clientset.CoreV1().Services(p.Spec.Namespace).Delete(ctx, names.Service, delOptions)
 
-    // Delete statefulset
-    err4 := p.clientset.AppsV1().StatefulSets(p.Spec.Namespace).Delete(ctx, names.StatefulSet, delOptions)
+    // Delete deployment
+    err4 := p.clientset.AppsV1().Deployments(p.Spec.Namespace).Delete(ctx, names.Deployment, delOptions)
+
+    // Delete PVC
+    err5 := p.clientset.CoreV1().PersistentVolumeClaims(p.Spec.Namespace).Delete(ctx, names.PVC, delOptions)
 
     // We ignore not found errors for idempotency
-    errs := []error{err1, err2, err3, err4}
+    errs := []error{err1, err2, err3, err4, err5}
     for i := 0; i < len(errs); i++ {
         if k8serrs.IsNotFound(errs[i]) {
             errs[i] = nil
@@ -266,13 +278,13 @@ func (p *KubernetesProvisioner) AwaitReady(ctx context.Context, provisionID stri
     // Get Kubernetes resource names
     names := p.getResourceNames(provisionID)
 
-    // Wait for the statefulset to be ready (with timeout)
+    // Wait for the deployment to be ready (with timeout)
     err := wait.PollUntilContextTimeout(ctx, time.Second, time.Duration(p.Spec.TimeoutSeconds)*time.Second, true, func(ctx context.Context) (done bool, err error) {
-        sts, err := p.clientset.AppsV1().StatefulSets(p.Spec.Namespace).Get(ctx, names.StatefulSet, metav1.GetOptions{})
+        depl, err := p.clientset.AppsV1().Deployments(p.Spec.Namespace).Get(ctx, names.Deployment, metav1.GetOptions{})
         if err != nil {
             return false, nil
         }
-        return sts.Status.AvailableReplicas > 0 && sts.Status.AvailableReplicas == sts.Status.Replicas && sts.Generation == sts.Status.ObservedGeneration, nil
+        return depl.Status.AvailableReplicas > 0 && depl.Status.AvailableReplicas == depl.Status.Replicas && depl.Generation == depl.Status.ObservedGeneration, nil
     })
     if err != nil {
         return err
@@ -297,32 +309,6 @@ func (p *KubernetesProvisioner) AwaitReady(ctx context.Context, provisionID stri
     return nil
 }
 
-func (p *KubernetesProvisioner) Update(ctx context.Context, provisionID, newVersion string) error {
-    // Get Kubernetes resource names
-    names := p.getResourceNames(provisionID)
-
-    // Update the statefulset with retry on conflict to resolve conflicting updates by other clients.
-    // More info on this best practice: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency
-    err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
-        // Retrieve the latest version
-        sts, err := p.clientset.AppsV1().StatefulSets(p.Spec.Namespace).Get(ctx, names.StatefulSet, metav1.GetOptions{})
-        if err != nil {
-            return err
-        }
-
-        // NOTE: this assumes only one container is defined in the statefulset template
-        sts.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("%s:%s", p.Spec.Image, newVersion)
-
-        // Attempt update
-        _, err = p.clientset.AppsV1().StatefulSets(p.Spec.Namespace).Update(ctx, sts, metav1.UpdateOptions{})
-        return err
-    })
-    if err != nil {
-        return err
-    }
-    return nil
-}
-
 func (p *KubernetesProvisioner) CheckCapacity(ctx context.Context) error {
     // No-op
     return nil
@@ -332,14 +318,14 @@ func (p *KubernetesProvisioner) ValidateConfig(ctx context.Context, provisionID
     // Get Kubernetes resource names
     names := p.getResourceNames(provisionID)
 
-    // Get the statefulset
-    sts, err := p.clientset.AppsV1().StatefulSets(p.Spec.Namespace).Get(ctx, names.StatefulSet, metav1.GetOptions{})
+    // Get the deployment
+    depl, err := p.clientset.AppsV1().Deployments(p.Spec.Namespace).Get(ctx, names.Deployment, metav1.GetOptions{})
     if err != nil {
         return false, err
     }
 
     // Compare the provisioned templates checksum with the current one
-    if sts.ObjectMeta.Annotations["checksum/templates"] != p.templatesChecksum {
+    if depl.ObjectMeta.Annotations["checksum/templates"] != p.templatesChecksum {
         return false, nil
     }
 
@@ -352,7 +338,8 @@ func (p *KubernetesProvisioner) Type() string {
 
 func (p *KubernetesProvisioner) getResourceNames(provisionID string) ResourceNames {
     return ResourceNames{
-        StatefulSet: fmt.Sprintf("runtime-%s", provisionID),
+        Deployment:  fmt.Sprintf("runtime-%s", provisionID),
+        PVC:         fmt.Sprintf("runtime-%s", provisionID),
         Service:     fmt.Sprintf("runtime-%s", provisionID),
         HTTPIngress: fmt.Sprintf("http-runtime-%s", provisionID),
         GRPCIngress: fmt.Sprintf("grpc-runtime-%s", provisionID),
@@ -362,8 +349,3 @@ func (p *KubernetesProvisioner) getResourceNam
 func (p *KubernetesProvisioner) getHost(provisionID string) string {
     return strings.ReplaceAll(p.Spec.Host, "*", provisionID)
 }
-
-func (p *KubernetesProvisioner) addCommonLabels(provisionID string, resourceLabels map[string]string) {
-    resourceLabels["app.kubernetes.io/instance"] = provisionID
-    resourceLabels["app.kubernetes.io/managed-by"] = "rill-cloud-admin"
-}
diff --git a/admin/provisioner/provisioner.go b/admin/provisioner/provisioner.go
index 8c032e0a509..0c0aee9d67a 100644
--- a/admin/provisioner/provisioner.go
+++ b/admin/provisioner/provisioner.go
@@ -13,7 +13,6 @@ type Provisioner interface {
     Provision(ctx context.Context, opts *ProvisionOptions) (*Allocation, error)
     Deprovision(ctx context.Context, provisionID string) error
     AwaitReady(ctx context.Context, provisionID string) error
-    Update(ctx context.Context, provisionID string, newVersion string) error
     CheckCapacity(ctx context.Context) error
     ValidateConfig(ctx context.Context, provisionID string) (bool, error)
     Type() string
diff --git a/admin/provisioner/static.go b/admin/provisioner/static.go
index 07a8608e2e5..5d10a6b6373 100644
--- a/admin/provisioner/static.go
+++ b/admin/provisioner/static.go
@@ -125,11 +125,6 @@ func (p *StaticProvisioner) AwaitReady(ctx context.Context, provisionID string)
     return nil
 }
 
-func (p *StaticProvisioner) Update(ctx context.Context, provisionID, newVersion string) error {
-    // No-op
-    return nil
-}
-
 func (p *StaticProvisioner) ValidateConfig(ctx context.Context, provisionID string) (bool, error) {
     // No-op
     return true, nil
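
For reviewers unfamiliar with client-go apply configurations, here is a minimal, self-contained sketch of the server-side-apply pattern plus the PVC shrink guard that the new `Provision` path relies on. It is not the PR's actual wiring: the namespace `cloud-runtime-ns`, the name `runtime-example`, the `10Gi` request, and the kubeconfig-based client setup are illustrative assumptions, and it assumes client-go v0.29+ (where `VolumeResourceRequirementsApplyConfiguration` exists).

```go
package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	k8serrs "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	corev1ac "k8s.io/client-go/applyconfigurations/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	ctx := context.Background()

	// Kubeconfig-based client setup for this sketch only; the provisioner builds its
	// clientset from the kubeconfig path in its spec.
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	ns := "cloud-runtime-ns"  // assumed namespace, not from the PR
	name := "runtime-example" // assumed name; the provisioner uses "runtime-<provisionID>"
	desired := resource.MustParse("10Gi")

	// Desired state expressed as an apply configuration (the PR decodes this from a YAML template).
	pvc := corev1ac.PersistentVolumeClaim(name, ns).
		WithLabels(map[string]string{
			"app.kubernetes.io/managed-by": "rill-cloud-admin",
			"app.kubernetes.io/instance":   name,
		}).
		WithSpec(corev1ac.PersistentVolumeClaimSpec().
			WithAccessModes(corev1.ReadWriteOnce).
			WithResources(corev1ac.VolumeResourceRequirements().
				WithRequests(corev1.ResourceList{corev1.ResourceStorage: desired})))

	// Shrink guard: if the PVC already exists with a larger request, keep the existing size,
	// since most storage drivers only support volume expansion, not reduction.
	existing, err := clientset.CoreV1().PersistentVolumeClaims(ns).Get(ctx, name, metav1.GetOptions{})
	if err != nil && !k8serrs.IsNotFound(err) {
		panic(err)
	}
	if err == nil && existing.Spec.Resources.Requests.Storage().Cmp(desired) == 1 {
		pvc.Spec.WithResources(corev1ac.VolumeResourceRequirements().
			WithRequests(existing.Spec.Resources.Requests))
	}

	// Server-side apply: the API server merges the desired state and records this field manager
	// as the owner; Force takes over fields previously owned by other managers.
	applied, err := clientset.CoreV1().PersistentVolumeClaims(ns).Apply(ctx, pvc, metav1.ApplyOptions{
		FieldManager: "rill-cloud-admin",
		Force:        true,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println("applied", applied.Name, "with storage request", applied.Spec.Resources.Requests.Storage().String())
}
```

Because the apply is declarative and keyed to a single field manager, re-running it with the same inputs converges to the same state, which is presumably why the old deprovision-before-create idempotency trick and the `retry.RetryOnConflict`-based `Update` method could be dropped in this change.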