diff --git a/pkg/etcd/s3/s3.go b/pkg/etcd/s3/s3.go index c1158a0bc3b2..39ffecd2bc2e 100644 --- a/pkg/etcd/s3/s3.go +++ b/pkg/etcd/s3/s3.go @@ -416,7 +416,7 @@ func (c *Client) SnapshotRetention(ctx context.Context, retention int, prefix st logrus.Infof("Removing S3 snapshot: s3://%s/%s", c.etcdS3.Bucket, df.Key) key := path.Base(df.Key) - if err := c.DeleteSnapshot(ctx, key); err != nil { + if err := c.DeleteSnapshot(ctx, key); err != nil && !snapshot.IsNotExist(err) { return deleted, err } deleted = append(deleted, key) @@ -431,14 +431,27 @@ func (c *Client) DeleteSnapshot(ctx context.Context, key string) error { defer cancel() key = path.Join(c.etcdS3.Folder, key) - err := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, key, minio.RemoveObjectOptions{}) - if err == nil || snapshot.IsNotExist(err) { - metadataKey := path.Join(path.Dir(key), snapshot.MetadataDir, path.Base(key)) - if merr := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, metadataKey, minio.RemoveObjectOptions{}); merr != nil && !snapshot.IsNotExist(merr) { - err = merr + _, err := c.mc.StatObject(ctx, c.etcdS3.Bucket, key, minio.StatObjectOptions{}) + if err == nil { + if err := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, key, minio.RemoveObjectOptions{}); err != nil { + return err } } + // check for and try to delete the metadata regardless of whether or not the + // snapshot existed, just to ensure that things are cleaned up in the case of + // ephemeral errors. Metadata delete errors are only exposed if the object + // exists and fails to delete. + metadataKey := path.Join(path.Dir(key), snapshot.MetadataDir, path.Base(key)) + _, merr := c.mc.StatObject(ctx, c.etcdS3.Bucket, metadataKey, minio.StatObjectOptions{}) + if merr == nil { + if err := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, metadataKey, minio.RemoveObjectOptions{}); err != nil { + return err + } + } + + // return error from snapshot StatObject call, so that callers can determine + // if the object was actually deleted or not by checking for a NotFound error. return err } diff --git a/pkg/etcd/snapshot.go b/pkg/etcd/snapshot.go index f2999f093080..8f844981f5d9 100644 --- a/pkg/etcd/snapshot.go +++ b/pkg/etcd/snapshot.go @@ -11,6 +11,7 @@ import ( "os" "path/filepath" "runtime" + "slices" "sort" "strconv" "strings" @@ -41,6 +42,7 @@ import ( const ( errorTTL = 24 * time.Hour + s3ReconcileTTL = time.Minute snapshotListPageSize = 20 ) @@ -363,7 +365,7 @@ func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) { } } - return res, e.ReconcileSnapshotData(ctx) + return res, e.reconcileSnapshotData(ctx, res) } // listLocalSnapshots provides a list of the currently stored @@ -464,7 +466,7 @@ func (e *ETCD) PruneSnapshots(ctx context.Context) (*managed.SnapshotResult, err res.Deleted = append(res.Deleted, deleted...) } } - return res, e.ReconcileSnapshotData(ctx) + return res, e.reconcileSnapshotData(ctx, res) } // ListSnapshots returns a list of snapshots. Local snapshots are always listed, @@ -555,7 +557,7 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) (*manage } } - return res, e.ReconcileSnapshotData(ctx) + return res, e.reconcileSnapshotData(ctx, res) } func (e *ETCD) deleteSnapshot(snapshotPath string) error { @@ -647,9 +649,17 @@ func (e *ETCD) emitEvent(esf *k3s.ETCDSnapshotFile) { } // ReconcileSnapshotData reconciles snapshot data in the ETCDSnapshotFile resources. -// It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to list S3 snapshots -// and reconcile snapshots from S3. +// It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to +// list S3 snapshots and reconcile snapshots from S3. func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { + return e.reconcileSnapshotData(ctx, nil) +} + +// reconcileSnapshotData reconciles snapshot data in the ETCDSnapshotFile resources. +// It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to +// list S3 snapshots and reconcile snapshots from S3. Any snapshots listed in the Deleted field of +// the provided SnapshotResult are deleted, even if they are within a retention window. +func (e *ETCD) reconcileSnapshotData(ctx context.Context, res *managed.SnapshotResult) error { // make sure the core.Factory is initialized. There can // be a race between this core code startup. for e.config.Runtime.Core == nil { @@ -726,6 +736,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile() snapshotPager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (k8sruntime.Object, error) { return snapshots.List(opts) })) snapshotPager.PageSize = snapshotListPageSize + now := time.Now().Round(time.Second) // List all snapshots matching the selector // If a snapshot from Kubernetes was found on disk/s3, it is in sync and we can remove it from the map to sync. @@ -742,10 +753,20 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { // exists in both and names match, don't need to sync delete(snapshotFiles, sfKey) } else { - // doesn't exist on disk - if it's an error that hasn't expired yet, leave it, otherwise remove it - if esf.Status.Error != nil && esf.Status.Error.Time != nil { + // doesn't exist on disk/s3 + if res != nil && slices.Contains(res.Deleted, esf.Spec.SnapshotName) { + // snapshot has been intentionally deleted, skip checking for expiration + } else if esf.Status.Error != nil && esf.Status.Error.Time != nil { expires := esf.Status.Error.Time.Add(errorTTL) - if time.Now().Before(expires) { + if now.Before(expires) { + // it's an error that hasn't expired yet, leave it + return nil + } + } else if esf.Spec.S3 != nil { + expires := esf.ObjectMeta.CreationTimestamp.Add(s3ReconcileTTL) + if now.Before(expires) { + // it's an s3 snapshot that's only just been created, leave it to prevent a race condition + // when multiple nodes are uploading snapshots at the same time. return nil } } @@ -754,6 +775,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { } else { logrus.Debugf("Key %s not found in snapshotFile list", sfKey) } + // otherwise remove it logrus.Infof("Deleting ETCDSnapshotFile for %s", esf.Spec.SnapshotName) if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil { logrus.Errorf("Failed to delete ETCDSnapshotFile: %v", err) @@ -817,18 +839,17 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { } // Update our Node object to note the timestamp of the snapshot storages that have been reconciled - now := time.Now().Round(time.Second).Format(time.RFC3339) patch := []map[string]string{ { "op": "add", - "value": now, + "value": now.Format(time.RFC3339), "path": "/metadata/annotations/" + strings.ReplaceAll(annotationLocalReconciled, "/", "~1"), }, } if e.config.EtcdS3 != nil { patch = append(patch, map[string]string{ "op": "add", - "value": now, + "value": now.Format(time.RFC3339), "path": "/metadata/annotations/" + strings.ReplaceAll(annotationS3Reconciled, "/", "~1"), }) }