Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition when multiple nodes reconcile S3 snapshots #10979

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions pkg/etcd/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func (c *Client) SnapshotRetention(ctx context.Context, retention int, prefix st
logrus.Infof("Removing S3 snapshot: s3://%s/%s", c.etcdS3.Bucket, df.Key)

key := path.Base(df.Key)
if err := c.DeleteSnapshot(ctx, key); err != nil {
if err := c.DeleteSnapshot(ctx, key); err != nil && !snapshot.IsNotExist(err) {
return deleted, err
}
deleted = append(deleted, key)
Expand All @@ -431,14 +431,27 @@ func (c *Client) DeleteSnapshot(ctx context.Context, key string) error {
defer cancel()

key = path.Join(c.etcdS3.Folder, key)
err := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, key, minio.RemoveObjectOptions{})
if err == nil || snapshot.IsNotExist(err) {
metadataKey := path.Join(path.Dir(key), snapshot.MetadataDir, path.Base(key))
if merr := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, metadataKey, minio.RemoveObjectOptions{}); merr != nil && !snapshot.IsNotExist(merr) {
err = merr
_, err := c.mc.StatObject(ctx, c.etcdS3.Bucket, key, minio.StatObjectOptions{})
if err == nil {
if err := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, key, minio.RemoveObjectOptions{}); err != nil {
return err
}
}

// check for and try to delete the metadata regardless of whether or not the
// snapshot existed, just to ensure that things are cleaned up in the case of
// ephemeral errors. Metadata delete errors are only exposed if the object
// exists and fails to delete.
metadataKey := path.Join(path.Dir(key), snapshot.MetadataDir, path.Base(key))
_, merr := c.mc.StatObject(ctx, c.etcdS3.Bucket, metadataKey, minio.StatObjectOptions{})
if merr == nil {
if err := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, metadataKey, minio.RemoveObjectOptions{}); err != nil {
return err
}
}

// return error from snapshot StatObject call, so that callers can determine
// if the object was actually deleted or not by checking for a NotFound error.
return err
}

Expand Down
43 changes: 32 additions & 11 deletions pkg/etcd/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"path/filepath"
"runtime"
"slices"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -41,6 +42,7 @@ import (

const (
errorTTL = 24 * time.Hour
s3ReconcileTTL = time.Minute
snapshotListPageSize = 20
)

Expand Down Expand Up @@ -363,7 +365,7 @@ func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) {
}
}

return res, e.ReconcileSnapshotData(ctx)
return res, e.reconcileSnapshotData(ctx, res)
}

// listLocalSnapshots provides a list of the currently stored
Expand Down Expand Up @@ -464,7 +466,7 @@ func (e *ETCD) PruneSnapshots(ctx context.Context) (*managed.SnapshotResult, err
res.Deleted = append(res.Deleted, deleted...)
}
}
return res, e.ReconcileSnapshotData(ctx)
return res, e.reconcileSnapshotData(ctx, res)
}

// ListSnapshots returns a list of snapshots. Local snapshots are always listed,
Expand Down Expand Up @@ -555,7 +557,7 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) (*manage
}
}

return res, e.ReconcileSnapshotData(ctx)
return res, e.reconcileSnapshotData(ctx, res)
}

func (e *ETCD) deleteSnapshot(snapshotPath string) error {
Expand Down Expand Up @@ -647,9 +649,17 @@ func (e *ETCD) emitEvent(esf *k3s.ETCDSnapshotFile) {
}

// ReconcileSnapshotData reconciles snapshot data in the ETCDSnapshotFile resources.
// It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to list S3 snapshots
// and reconcile snapshots from S3.
// It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to
// list S3 snapshots and reconcile snapshots from S3.
func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
return e.reconcileSnapshotData(ctx, nil)
}

// reconcileSnapshotData reconciles snapshot data in the ETCDSnapshotFile resources.
// It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to
// list S3 snapshots and reconcile snapshots from S3. Any snapshots listed in the Deleted field of
// the provided SnapshotResult are deleted, even if they are within a retention window.
func (e *ETCD) reconcileSnapshotData(ctx context.Context, res *managed.SnapshotResult) error {
// make sure the core.Factory is initialized. There can
// be a race between this core code startup.
for e.config.Runtime.Core == nil {
Expand Down Expand Up @@ -726,6 +736,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile()
snapshotPager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (k8sruntime.Object, error) { return snapshots.List(opts) }))
snapshotPager.PageSize = snapshotListPageSize
now := time.Now().Round(time.Second)

// List all snapshots matching the selector
// If a snapshot from Kubernetes was found on disk/s3, it is in sync and we can remove it from the map to sync.
Expand All @@ -742,10 +753,20 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
// exists in both and names match, don't need to sync
delete(snapshotFiles, sfKey)
} else {
// doesn't exist on disk - if it's an error that hasn't expired yet, leave it, otherwise remove it
if esf.Status.Error != nil && esf.Status.Error.Time != nil {
// doesn't exist on disk/s3
if res != nil && slices.Contains(res.Deleted, esf.Spec.SnapshotName) {
// snapshot has been intentionally deleted, skip checking for expiration
} else if esf.Status.Error != nil && esf.Status.Error.Time != nil {
expires := esf.Status.Error.Time.Add(errorTTL)
if time.Now().Before(expires) {
if now.Before(expires) {
// it's an error that hasn't expired yet, leave it
return nil
}
} else if esf.Spec.S3 != nil {
expires := esf.ObjectMeta.CreationTimestamp.Add(s3ReconcileTTL)
if now.Before(expires) {
// it's an s3 snapshot that's only just been created, leave it to prevent a race condition
// when multiple nodes are uploading snapshots at the same time.
return nil
}
}
Expand All @@ -754,6 +775,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
} else {
logrus.Debugf("Key %s not found in snapshotFile list", sfKey)
}
// otherwise remove it
logrus.Infof("Deleting ETCDSnapshotFile for %s", esf.Spec.SnapshotName)
if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil {
logrus.Errorf("Failed to delete ETCDSnapshotFile: %v", err)
Expand Down Expand Up @@ -817,18 +839,17 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
}

// Update our Node object to note the timestamp of the snapshot storages that have been reconciled
now := time.Now().Round(time.Second).Format(time.RFC3339)
patch := []map[string]string{
{
"op": "add",
"value": now,
"value": now.Format(time.RFC3339),
"path": "/metadata/annotations/" + strings.ReplaceAll(annotationLocalReconciled, "/", "~1"),
},
}
if e.config.EtcdS3 != nil {
patch = append(patch, map[string]string{
"op": "add",
"value": now,
"value": now.Format(time.RFC3339),
"path": "/metadata/annotations/" + strings.ReplaceAll(annotationS3Reconciled, "/", "~1"),
})
}
Expand Down
Loading