Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: add PrettyPrinter to print backup progress #470

Merged
merged 5 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 54 additions & 53 deletions pitr/cli/internal/cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,20 +345,35 @@ func checkBackupStatus(lsBackup *model.LsBackup) model.BackupStatus {
dataNodeMap[dn.IP] = dn
}

pw := prettyoutput.NewPW(totalNum)
pw := prettyoutput.NewProgressPrinter(prettyoutput.ProgressPrintOption{
NumTrackersExpected: totalNum,
})

go pw.Render()
for idx := 0; idx < totalNum; idx++ {
sn := lsBackup.SsBackup.StorageNodes[idx]

for i := 0; i < totalNum; i++ {
sn := lsBackup.SsBackup.StorageNodes[i]
as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", convertLocalhost(sn.IP), AgentPort))
dn := dataNodeMap[sn.IP]
go checkStatus(as, sn, dn, dnCh, pw)
backupInfo := &model.BackupInfo{}
task := &backuptask{
As: as,
Sn: sn,
Dn: dn,
DnCh: dnCh,
Backup: backupInfo,
retries: defaultShowDetailRetryTimes,
}
tracker := &progress.Tracker{
Message: fmt.Sprintf("Checking backup status # %s:%d", sn.IP, sn.Port),
Total: 0,
Units: progress.UnitsDefault,
}
pw.AppendTracker(tracker)
go pw.UpdateProgress(tracker, task.checkProgress)
}

// wait for all data node backup finished
time.Sleep(time.Millisecond * 100)
for pw.IsRenderInProgress() {
time.Sleep(time.Millisecond * 100)
}
pw.BlockedRendered()

close(dnCh)

Expand Down Expand Up @@ -388,62 +403,48 @@ func checkBackupStatus(lsBackup *model.LsBackup) model.BackupStatus {
return backupFinalStatus
}

func checkStatus(as pkg.IAgentServer, sn *model.StorageNode, dn *model.DataNode, dnCh chan *model.DataNode, pw progress.Writer) {
var (
// mark check status is done, time ticker should break.
done = make(chan struct{})
// time ticker, try to doCheck request every 2 seconds.
ticker = time.Tick(time.Second * 2)
// progress bar.
tracker = progress.Tracker{Message: fmt.Sprintf("Checking backup status # %s:%d", sn.IP, sn.Port), Total: 0, Units: progress.UnitsDefault}
)

pw.AppendTracker(&tracker)
type backuptask struct {
As pkg.IAgentServer
Sn *model.StorageNode
Dn *model.DataNode
DnCh chan *model.DataNode

for !tracker.IsDone() {
select {
case <-done:
return
case <-ticker:
status, err := doCheck(as, sn, dn.BackupID, defaultShowDetailRetryTimes)
if err != nil {
tracker.MarkAsErrored()
dn.Status = status
dn.EndTime = timeutil.Now().String()
dnCh <- dn
done <- struct{}{}
}
if status == model.SsBackupStatusCompleted || status == model.SsBackupStatusFailed {
tracker.MarkAsDone()
dn.Status = status
dn.EndTime = timeutil.Now().String()
dnCh <- dn
done <- struct{}{}
}
}
}
Backup *model.BackupInfo
retries int
}

func doCheck(as pkg.IAgentServer, sn *model.StorageNode, backupID string, retries int) (status model.BackupStatus, err error) {
func (t *backuptask) checkProgress() (bool, error) {
var err error
in := &model.ShowDetailIn{
DBPort: sn.Port,
DBName: sn.Database,
Username: sn.Username,
Password: sn.Password,
DnBackupID: backupID,
DBPort: t.Sn.Port,
DBName: t.Sn.Database,
Username: t.Sn.Username,
Password: t.Sn.Password,
DnBackupID: t.Dn.BackupID,
DnBackupPath: BackupPath,
Instance: defaultInstance,
}
backupInfo, err := as.ShowDetail(in)

t.Backup, err = t.As.ShowDetail(in)
if err != nil {
if retries == 0 {
return model.SsBackupStatusCheckError, err
if t.retries == 0 {
t.Dn.Status = model.SsBackupStatusCheckError
t.DnCh <- t.Dn
return false, err
}
time.Sleep(time.Second * 1)
return doCheck(as, sn, backupID, retries-1)
t.retries--
return t.checkProgress()
}

return backupInfo.Status, nil
t.Dn.Status = t.Backup.Status
t.Dn.EndTime = timeutil.Now().String()

if t.Backup.Status == model.SsBackupStatusCompleted || t.Backup.Status == model.SsBackupStatusFailed {
t.DnCh <- t.Dn
return true, nil
}
return false, nil
}

type deleteMode int
Expand Down
43 changes: 32 additions & 11 deletions pitr/cli/internal/cmd/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,42 +41,63 @@ var _ = Describe("Backup", func() {
sn = &model.StorageNode{
IP: "127.0.0.1",
}

task = &backuptask{}
)
BeforeEach(func() {
ctrl = gomock.NewController(GinkgoT())
as = mock_pkg.NewMockIAgentServer(ctrl)
task = &backuptask{
As: as,
Sn: sn,
Dn: &model.DataNode{},
DnCh: make(chan *model.DataNode, 2),

Backup: &model.BackupInfo{},
}
})
AfterEach(func() {
ctrl.Finish()
})

It("agent server return err", func() {
as.EXPECT().ShowDetail(gomock.Any()).Return(nil, errors.New("timeout"))
status, err := doCheck(as, sn, "", 0)
It("mock agent server return err", func() {
as.EXPECT().ShowDetail(gomock.Any()).Return(nil, errors.New("mock agent timeout"))

finished, err := task.checkProgress()
Expect(err).To(HaveOccurred())
Expect(status).To(Equal(model.SsBackupStatusCheckError))
Expect(finished).To(BeFalse())
Expect(task.Dn.Status).To(Equal(model.SsBackupStatusCheckError))
})

It("mock agent server and return failed status", func() {
as.EXPECT().ShowDetail(gomock.Any()).Return(&model.BackupInfo{Status: model.SsBackupStatusFailed}, nil)
status, err := doCheck(as, sn, "", 0)
finished, err := task.checkProgress()
Expect(err).ToNot(HaveOccurred())
Expect(status).To(Equal(model.SsBackupStatusFailed))
Expect(finished).To(BeTrue())
Expect(task.Backup.Status).To(Equal(model.SsBackupStatusFailed))
})

It("mock agent server and return completed status", func() {
as.EXPECT().ShowDetail(gomock.Any()).Return(&model.BackupInfo{Status: model.SsBackupStatusCompleted}, nil)
status, err := doCheck(as, sn, "", 0)

finished, err := task.checkProgress()
Expect(err).ToNot(HaveOccurred())
Expect(status).To(Equal(model.SsBackupStatusCompleted))
Expect(finished).To(BeTrue())
Expect(task.Backup.Status).To(Equal(model.SsBackupStatusCompleted))
})

It("mock agent server and return check err first time and then success", func() {
as.EXPECT().ShowDetail(gomock.Any()).Return(nil, errors.New("timeout"))
as.EXPECT().ShowDetail(gomock.Any()).Return(nil, errors.New("mock agent timeout"))
finished, err := task.checkProgress()
Expect(err).To(HaveOccurred())
Expect(finished).To(BeFalse())
Expect(task.Dn.Status).To(Equal(model.SsBackupStatusCheckError))

as.EXPECT().ShowDetail(gomock.Any()).Return(&model.BackupInfo{Status: model.SsBackupStatusCompleted}, nil)
status, err := doCheck(as, sn, "", 1)
finished, err = task.checkProgress()
Expect(err).ToNot(HaveOccurred())
Expect(status).To(Equal(model.SsBackupStatusCompleted))
Expect(finished).To(BeTrue())
Expect(task.Backup.Status).To(Equal(model.SsBackupStatusCompleted))
})
})

Expand Down
67 changes: 66 additions & 1 deletion pitr/cli/pkg/prettyoutput/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,83 @@
package prettyoutput

import (
"time"

"github.com/jedib0t/go-pretty/v6/progress"
)

func NewPW(totalNum int) progress.Writer {
pw := progress.NewWriter()

pw.SetTrackerLength(25)
pw.SetAutoStop(true)
pw.SetNumTrackersExpected(totalNum)
pw.SetSortBy(progress.SortByPercentDsc)
pw.SetTrackerPosition(progress.PositionRight)

style := progress.StyleDefault
style.Options.PercentIndeterminate = "running"
pw.SetStyle(style)
pw.SetTrackerPosition(progress.PositionRight)

return pw
}

type ProgressPrintOption struct {
NumTrackersExpected int
}

type ProgressPrinter struct {
progress.Writer
}

func NewProgressPrinter(opt ProgressPrintOption) *ProgressPrinter {
p := &ProgressPrinter{
Writer: progress.NewWriter(),
}

// passed printer options
p.SetNumTrackersExpected(opt.NumTrackersExpected)

// default printer options
p.SetTrackerLength(25)
p.SetAutoStop(true)
p.SetSortBy(progress.SortByPercentDsc)
p.SetTrackerPosition(progress.PositionRight)
style := progress.StyleDefault
style.Options.PercentIndeterminate = "running"
p.SetStyle(style)

return p
}

func (p *ProgressPrinter) BlockedRendered() {
time.Sleep(time.Millisecond * 100)
for p.IsRenderInProgress() {
time.Sleep(time.Millisecond * 100)
}
}

func (p *ProgressPrinter) UpdateProgress(tracker *progress.Tracker, updateF func() (bool, error)) {
var (
done = make(chan struct{})
ticker = time.NewTicker(time.Second * 2)
)

for !tracker.IsDone() {
select {
case <-done:
return
case <-ticker.C:
finished, err := updateF()
if err != nil {
tracker.MarkAsErrored()
done <- struct{}{}
}

if finished {
tracker.MarkAsDone()
done <- struct{}{}
}
}
}
}
Loading