Skip to content

Commit

Permalink
br: migrate restore checkpoint data to the downstream cluster tempora…
Browse files Browse the repository at this point in the history
…ry database (#56075)

close #55870
  • Loading branch information
Leavrth authored Sep 24, 2024
1 parent ae95f2f commit 5894833
Show file tree
Hide file tree
Showing 20 changed files with 1,322 additions and 667 deletions.
11 changes: 10 additions & 1 deletion br/pkg/checkpoint/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,30 @@ go_library(
srcs = [
"backup.go",
"checkpoint.go",
"external_storage.go",
"log_restore.go",
"restore.go",
"storage.go",
"ticker.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/checkpoint",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/glue",
"//br/pkg/logutil",
"//br/pkg/metautil",
"//br/pkg/pdutil",
"//br/pkg/rtree",
"//br/pkg/storage",
"//br/pkg/summary",
"//br/pkg/utils",
"//pkg/domain",
"//pkg/kv",
"//pkg/meta/model",
"//pkg/parser/model",
"//pkg/util",
"//pkg/util/sqlexec",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
Expand All @@ -38,11 +45,13 @@ go_test(
srcs = ["checkpoint_test.go"],
flaky = True,
race = "on",
shard_count = 5,
shard_count = 6,
deps = [
":checkpoint",
"//br/pkg/gluetidb",
"//br/pkg/pdutil",
"//br/pkg/storage",
"//br/pkg/utiltest",
"//pkg/meta/model",
"//pkg/parser/model",
"@com_github_pingcap_kvproto//pkg/brpb",
Expand Down
18 changes: 9 additions & 9 deletions br/pkg/checkpoint/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ func StartCheckpointBackupRunnerForTest(
tick time.Duration,
timer GlobalTimer,
) (*CheckpointRunner[BackupKeyType, BackupValueType], error) {
runner := newCheckpointRunner[BackupKeyType, BackupValueType](
ctx, storage, cipher, timer, flushPositionForBackup(), valueMarshalerForBackup)

err := runner.initialLock(ctx)
checkpointStorage, err := newExternalCheckpointStorage(ctx, storage, timer)
if err != nil {
return nil, errors.Annotate(err, "Failed to initialize checkpoint lock.")
return nil, errors.Trace(err)
}
runner := newCheckpointRunner[BackupKeyType, BackupValueType](
checkpointStorage, cipher, valueMarshalerForBackup)

runner.startCheckpointMainLoop(ctx, tick, tick, tick)
return runner, nil
}
Expand All @@ -74,13 +74,13 @@ func StartCheckpointRunnerForBackup(
cipher *backuppb.CipherInfo,
timer GlobalTimer,
) (*CheckpointRunner[BackupKeyType, BackupValueType], error) {
runner := newCheckpointRunner[BackupKeyType, BackupValueType](
ctx, storage, cipher, timer, flushPositionForBackup(), valueMarshalerForBackup)

err := runner.initialLock(ctx)
checkpointStorage, err := newExternalCheckpointStorage(ctx, storage, timer)
if err != nil {
return nil, errors.Trace(err)
}
runner := newCheckpointRunner[BackupKeyType, BackupValueType](
checkpointStorage, cipher, valueMarshalerForBackup)

runner.startCheckpointMainLoop(
ctx,
defaultTickDurationForFlush,
Expand Down
Loading

0 comments on commit 5894833

Please sign in to comment.