Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix data race when calling get metadata #562

Merged
merged 3 commits into from
Aug 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions internal/bft/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type Proposer interface {
//
//go:generate mockery -dir . -name ProposerBuilder -case underscore -output ./mocks/
type ProposerBuilder interface {
NewProposer(leader, proposalSequence, viewNum, decisionsInView uint64, quorumSize int) Proposer
NewProposer(leader, proposalSequence, viewNum, decisionsInView uint64, quorumSize int) (Proposer, Phase)
}

// Controller controls the entire flow of the consensus
Expand Down Expand Up @@ -373,7 +373,7 @@ func (c *Controller) convertViewMessageToHeartbeat(m *protos.Message) *protos.Me
}

func (c *Controller) startView(proposalSequence uint64) {
view := c.ProposerBuilder.NewProposer(c.leaderID(), proposalSequence, c.currViewNumber, c.currDecisionsInView, c.quorum)
view, initPhase := c.ProposerBuilder.NewProposer(c.leaderID(), proposalSequence, c.currViewNumber, c.currDecisionsInView, c.quorum)

c.currViewLock.Lock()
c.currView = view
Expand All @@ -383,6 +383,12 @@ func (c *Controller) startView(proposalSequence uint64) {
role := Follower
leader, _ := c.iAmTheLeader()
if leader {
if initPhase == COMMITTED || initPhase == ABORT {
c.Logger.Debugf("Acquiring leader token when starting view with phase %s", initPhase.String())
c.acquireLeaderToken()
} else {
c.Logger.Debugf("Not acquiring leader token when starting view with phase %s", initPhase.String())
}
role = Leader
}
c.LeaderMonitor.ChangeRole(role, c.currViewNumber, c.leaderID())
Expand Down Expand Up @@ -414,10 +420,8 @@ func (c *Controller) changeView(newViewNumber uint64, newProposalSequence uint64
c.Logger.Debugf("Starting view after setting decisions in view to %d", newDecisionsInView)
c.startView(newProposalSequence)

// If I'm the leader, I can claim the leader token.
if iAm, _ := c.iAmTheLeader(); iAm {
c.Batcher.Reset()
c.acquireLeaderToken()
}
}

Expand Down Expand Up @@ -789,9 +793,6 @@ func (c *Controller) Start(startViewNumber uint64, startProposalSequence uint64,
c.currViewNumber = startViewNumber
c.currDecisionsInView = startDecisionsInView
c.startView(startProposalSequence)
if iAm, _ := c.iAmTheLeader(); iAm {
c.acquireLeaderToken()
}

go func() {
defer c.controllerDone.Done()
Expand Down
2 changes: 1 addition & 1 deletion internal/bft/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ func configureProposerBuilder(controller *bft.Controller) *atomic.Value {
pb.On("NewProposer", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(func(a uint64, b uint64, c uint64, d uint64, e int) bft.Proposer {
return createView(controller, a, b, c, d, e, vs)
})
}, bft.Phase(bft.COMMITTED))
controller.ProposerBuilder = pb
return vs
}
Expand Down
13 changes: 11 additions & 2 deletions internal/bft/mocks/proposer_builder.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/bft/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ type ProposalMaker struct {
}

// NewProposer returns a new view
func (pm *ProposalMaker) NewProposer(leader, proposalSequence, viewNum, decisionsInView uint64, quorumSize int) Proposer {
func (pm *ProposalMaker) NewProposer(leader, proposalSequence, viewNum, decisionsInView uint64, quorumSize int) (proposer Proposer, phase Phase) {
view := &View{
RetrieveCheckpoint: pm.Checkpoint.Get,
DecisionsPerLeader: pm.DecisionsPerLeader,
Expand Down Expand Up @@ -330,7 +330,7 @@ func (pm *ProposalMaker) NewProposer(leader, proposalSequence, viewNum, decision
view.MetricsView.DecisionsInView.Set(float64(view.DecisionsInView))
view.MetricsView.Phase.Set(float64(view.Phase))

return view
return view, view.Phase
}

// ViewSequence indicates if a view is currently active and its current proposal sequence
Expand Down
17 changes: 17 additions & 0 deletions internal/bft/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,21 @@ const (
ABORT
)

func (p Phase) String() string {
switch p {
case COMMITTED:
return "COMMITTED"
case PROPOSED:
return "PROPOSED"
case PREPARED:
return "PREPARED"
case ABORT:
return "ABORT"
default:
return "Invalid Phase"
}
}

// State can save and restore the state
//
//go:generate mockery -dir . -name State -case underscore -output ./mocks/
Expand Down Expand Up @@ -885,6 +900,8 @@ func (v *View) GetMetadata() []byte {
DecisionsInView: v.DecisionsInView,
}

v.Logger.Debugf("GetMetadata with view %d, seq %d, dec %d", metadata.ViewId, metadata.LatestSequence, metadata.DecisionsInView)

var (
prevSigs []*protos.Signature
prevProp *protos.Proposal
Expand Down
79 changes: 79 additions & 0 deletions test/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1313,6 +1313,85 @@ func TestLeaderCatchUpWithoutSync(t *testing.T) {
}
}

func TestLeaderProposeAfterRestartWithoutSync(t *testing.T) {
t.Parallel()
network := NewNetwork()
defer network.Shutdown()

testDir, err := os.MkdirTemp("", t.Name())
assert.NoErrorf(t, err, "generate temporary test dir")
defer os.RemoveAll(testDir)

numberOfNodes := 4
nodes := make([]*App, 0)
for i := 1; i <= numberOfNodes; i++ {
n := newNode(uint64(i), network, t.Name(), testDir, false, 0)
n.Consensus.Config.SyncOnStart = false
nodes = append(nodes, n)
}

restartWG := sync.WaitGroup{}
restartWG.Add(1)

restoredWG := sync.WaitGroup{}
restoredWG.Add(1)

contViewWG := sync.WaitGroup{}
contViewWG.Add(2)

baseLogger := nodes[0].logger.Desugar()
nodes[0].logger = baseLogger.WithOptions(zap.Hooks(func(entry zapcore.Entry) error {
if strings.Contains(entry.Message, "Processed prepares for proposal with seq 1") {
restartWG.Done()
}
if strings.Contains(entry.Message, "Restored proposal with sequence 1") {
restoredWG.Done()
}
if strings.Contains(entry.Message, "processed commits for proposal with seq 1") {
contViewWG.Wait()
}
if strings.Contains(entry.Message, "GetMetadata with view 0, seq 1") {
contViewWG.Done()
}
if strings.Contains(entry.Message, "Not acquiring leader token when starting view with phase PREPARED") {
contViewWG.Done()
}
if strings.Contains(entry.Message, "Expected proposal sequence 2 but got 1") {
panic("Expected proposal sequence 2 but got 1")
}
return nil
})).Sugar()
nodes[0].Setup()

startNodes(nodes, network)

nodes[0].Submit(Request{ID: "1", ClientID: "alice"})

restartWG.Wait()
nodes[0].RestartSync(false)
restoredWG.Wait()

nodes[0].Submit(Request{ID: "2", ClientID: "alice"})

data := make([]*AppRecord, 0)
for i := 0; i < numberOfNodes; i++ {
d := <-nodes[i].Delivered
data = append(data, d)
}
for i := 0; i < numberOfNodes-1; i++ {
assert.Equal(t, data[i], data[i+1])
}

data = make([]*AppRecord, 0)
for i := 0; i < numberOfNodes; i++ {
d := <-nodes[i].Delivered
data = append(data, d)
}
for i := 0; i < numberOfNodes-1; i++ {
assert.Equal(t, data[i], data[i+1])
}
}

func TestGradualStart(t *testing.T) {
// Scenario: initially the network has only one node
// a transaction is submitted and committed with that node
Expand Down