Skip to content

Commit

Permalink
refactor br cli backoff logic
Browse files Browse the repository at this point in the history
Signed-off-by: Wenqi Mou <[email protected]>
  • Loading branch information
Tristan1900 committed Jul 15, 2024
1 parent 5b5915b commit bc730cc
Show file tree
Hide file tree
Showing 19 changed files with 172 additions and 207 deletions.
4 changes: 2 additions & 2 deletions br/pkg/backup/prepare_snap/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,12 @@ func (c CliEnv) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endK

type RetryAndSplitRequestEnv struct {
Env
GetBackoffer func() utils.Backoffer
GetBackoffer func() utils.BackoffStrategy
}

func (r RetryAndSplitRequestEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) {
rs := utils.ConstantBackoff(10 * time.Second)
bo := utils.Backoffer(rs)
bo := utils.BackoffStrategy(rs)
if r.GetBackoffer != nil {
bo = r.GetBackoffer()
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/backup/prepare_snap/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func TestRetryEnv(t *testing.T) {
return nil
}
ms := RetryAndSplitRequestEnv{Env: tms}
ms.GetBackoffer = func() utils.Backoffer {
ms.GetBackoffer = func() utils.BackoffStrategy {
o := utils.InitialRetryState(2, 0, 0)
return &o
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/backup/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func startBackup(
}
return nil
})
}, utils.NewBackupSSTBackoffer())
}, utils.NewBackupSSTBackoffStrategy())
})
}
return eg.Wait()
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ func (r *CheckpointRunner[K, V]) getTS(ctx context.Context) (int64, int64, error
}

return nil
}, utils.NewPDReqBackoffer())
}, utils.NewAggressivePDBackoffStrategy())

return p, l, errors.Trace(errRetry)
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func GetAllTiKVStoresWithRetry(ctx context.Context,

return errors.Trace(err)
},
utils.NewPDReqBackoffer(),
utils.NewAggressivePDBackoffStrategy(),
)

return stores, errors.Trace(errRetry)
Expand Down Expand Up @@ -412,7 +412,7 @@ func (mgr *Mgr) GetConfigFromTiKV(ctx context.Context, cli *http.Client, fn func
return err
}
return nil
}, utils.NewPDReqBackoffer())
}, utils.NewAggressivePDBackoffStrategy())
if err != nil {
// if one store failed, break and return error
return err
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/restore/log_client/import_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,10 +624,10 @@ func TestRetryRecognizeErrCode(t *testing.T) {
return errors.Trace(e)
}
return nil
}, utils.NewBackoffer(10, waitTime, maxWaitTime, utils.NewErrorContext("download sst", 3)))
}, utils.NewBackoffStrategy(10, waitTime, maxWaitTime, utils.NewErrorContext("download sst", 3)))
outer++
return errors.Trace(e)
}, utils.NewBackoffer(10, waitTime, maxWaitTime, utils.NewErrorContext("import sst", 3)))
}, utils.NewBackoffStrategy(10, waitTime, maxWaitTime, utils.NewErrorContext("import sst", 3)))
require.Equal(t, 10, outer)
require.Equal(t, 100, inner)
}
2 changes: 1 addition & 1 deletion br/pkg/restore/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func GetTSWithRetry(ctx context.Context, pdClient pd.Client) (uint64, error) {
log.Warn("failed to get TS, retry it", zap.Uint("retry time", retry), logutil.ShortError(getTSErr))
}
return getTSErr
}, utils.NewPDReqBackoffer())
}, utils.NewAggressivePDBackoffStrategy())

if err != nil {
log.Error("failed to get TS", zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ func (rc *SnapClient) ResetTS(ctx context.Context, pdCtrl *pdutil.PdController)
log.Info("reset pd timestamp", zap.Uint64("ts", restoreTS))
return utils.WithRetry(ctx, func() error {
return pdCtrl.ResetTS(ctx, restoreTS)
}, utils.NewPDReqBackoffer())
}, utils.NewAggressivePDBackoffStrategy())
}

// GetDatabases returns all databases.
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/restore/snap_client/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func (importer *SnapFileImporter) ImportSSTFiles(
summary.CollectSuccessUnit(summary.TotalBytes, 1, f.TotalBytes)
}
return nil
}, utils.NewImportSSTBackoffer())
}, utils.NewImportSSTBackoffStrategy())
if err != nil {
log.Error("import sst file failed after retry, stop the whole progress", logutil.Files(files), zap.Error(err))
return errors.Trace(err)
Expand Down Expand Up @@ -511,7 +511,7 @@ func (importer *SnapFileImporter) download(
}

return nil
}, utils.NewDownloadSSTBackoffer())
}, utils.NewDownloadSSTBackoffStrategy())

return downloadMetas, errDownload
}
Expand Down Expand Up @@ -610,7 +610,7 @@ func (importer *SnapFileImporter) downloadSST(
}
var err error
var resp *import_sstpb.DownloadResponse
resp, err = utils.WithRetryV2(ectx, utils.NewDownloadSSTBackoffer(), func(ctx context.Context) (*import_sstpb.DownloadResponse, error) {
resp, err = utils.WithRetryV2(ectx, utils.NewDownloadSSTBackoffStrategy(), func(ctx context.Context) (*import_sstpb.DownloadResponse, error) {
dctx, cancel := context.WithTimeout(ctx, gRPCTimeOut)
defer cancel()
return importer.importClient.DownloadSST(dctx, peer.GetStoreId(), req)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/split/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ func (c *pdClient) SetStoresLabel(
return nil
}

func (c *pdClient) scatterRegionsSequentially(ctx context.Context, newRegions []*RegionInfo, backoffer utils.Backoffer) {
func (c *pdClient) scatterRegionsSequentially(ctx context.Context, newRegions []*RegionInfo, backoffer utils.BackoffStrategy) {
newRegionSet := make(map[uint64]*RegionInfo, len(newRegions))
for _, newRegion := range newRegions {
newRegionSet[newRegion.Region.Id] = newRegion
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/restore/split/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func NewBackoffMayNotCountBackoffer() *BackoffMayNotCountBackoffer {
}
}

// NextBackoff implements utils.Backoffer. For BackoffMayNotCountBackoffer, only
// NextBackoff implements utils.BackoffStrategy. For BackoffMayNotCountBackoffer, only
// ErrBackoff and ErrBackoffAndDontCount is meaningful.
func (b *BackoffMayNotCountBackoffer) NextBackoff(err error) time.Duration {
if errors.ErrorEqual(err, ErrBackoff) {
Expand All @@ -289,7 +289,7 @@ func (b *BackoffMayNotCountBackoffer) NextBackoff(err error) time.Duration {
return 0
}

// Attempt implements utils.Backoffer.
// Attempt implements utils.BackoffStrategy.
func (b *BackoffMayNotCountBackoffer) Attempt() int {
return b.state.Attempt()
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/operator/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (cx *AdaptEnvForSnapshotBackupContext) Close() {
cx.kvMgr.Close()
}

func (cx *AdaptEnvForSnapshotBackupContext) GetBackOffer(operation string) utils.Backoffer {
func (cx *AdaptEnvForSnapshotBackupContext) GetBackOffer(operation string) utils.BackoffStrategy {
state := utils.InitialRetryState(64, 1*time.Second, 10*time.Second)
bo := utils.GiveUpRetryOn(&state, berrors.ErrPossibleInconsistency)
bo = utils.VerboseRetry(bo, logutil.CL(cx).With(zap.String("operation", operation)))
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1216,7 +1216,7 @@ func getMaxReplica(ctx context.Context, mgr *conn.Mgr) (cnt uint64, err error) {
err = utils.WithRetry(ctx, func() error {
resp, err = mgr.GetPDHTTPClient().GetReplicateConfig(ctx)
return err
}, utils.NewPDReqBackoffer())
}, utils.NewAggressivePDBackoffStrategy())
if err != nil {
return 0, errors.Trace(err)
}
Expand All @@ -1233,7 +1233,7 @@ func getStores(ctx context.Context, mgr *conn.Mgr) (stores *http.StoresInfo, err
err = utils.WithRetry(ctx, func() error {
stores, err = mgr.GetPDHTTPClient().GetStores(ctx)
return err
}, utils.NewPDReqBackoffer())
}, utils.NewAggressivePDBackoffStrategy())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -1332,7 +1332,7 @@ func checkDiskSpace(ctx context.Context, mgr *conn.Mgr, files []*backuppb.File,
}
}
return nil
}, utils.NewDiskCheckBackoffer())
}, utils.NewDiskCheckBackoffStrategy())
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/restore_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto
}
return nil
},
utils.NewPDReqBackofferExt(),
utils.NewConservativePDBackoffStrategy(),
)
restoreNumStores := len(allStores)
if restoreNumStores != numStores {
Expand Down
Loading

0 comments on commit bc730cc

Please sign in to comment.