Skip to content

Commit

Permalink
Resolve #650 fix can not stop prophet (#652)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangxu19830126 authored Jan 5, 2022
1 parent d72f278 commit 9b6d376
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 60 deletions.
52 changes: 33 additions & 19 deletions components/prophet/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ type asyncClient struct {
containerID uint64
adapter metadata.Adapter
id uint64
contexts sync.Map // id -> request
leaderConn goetty.IOSession

resetReadC chan string
Expand All @@ -113,6 +112,11 @@ type asyncClient struct {
sync.RWMutex
state int32
}

contextsMu struct {
sync.RWMutex
contexts map[uint64]*ctx
}
}

// NewClient create a prophet client
Expand All @@ -132,6 +136,7 @@ func NewClient(adapter metadata.Adapter, opts ...Option) Client {
c.opts.adjust()
c.stopper = stop.NewStopper("prophet-client", stop.WithLogger(c.opts.logger))
c.leaderConn = createConn(c.opts.logger)
c.contextsMu.contexts = make(map[uint64]*ctx)
c.start()
return c
}
Expand Down Expand Up @@ -592,8 +597,10 @@ func (c *asyncClient) do(ctx *ctx) error {
if !added {
ctx.req.ID = c.nextID()
if ctx.sync || ctx.cb != nil {
c.contexts.Store(ctx.req.ID, ctx)
c.contextsMu.Lock()
c.contextsMu.contexts[ctx.req.ID] = ctx
util.DefaultTimeoutWheel().Schedule(c.opts.rpcTimeout, c.timeout, ctx.req.ID)
c.contextsMu.Unlock()
}
added = true
}
Expand All @@ -612,9 +619,11 @@ func (c *asyncClient) do(ctx *ctx) error {
}

func (c *asyncClient) timeout(arg interface{}) {
if v, ok := c.contexts.Load(arg); ok {
c.contexts.Delete(arg)
v.(*ctx).done(nil, ErrTimeout)
c.contextsMu.RLock()
defer c.contextsMu.RUnlock()

if ctx, ok := c.contextsMu.contexts[arg.(uint64)]; ok {
ctx.done(nil, ErrTimeout)
}
}

Expand Down Expand Up @@ -667,12 +676,12 @@ OUTER:
for {
select {
case <-stopCtx.Done():
c.contexts.Range(func(key, value interface{}) bool {
if value != nil {
value.(*ctx).done(nil, ErrClosed)
}
return true
})
c.contextsMu.Lock()
for k, ctx := range c.contextsMu.contexts {
ctx.done(nil, ErrClosed)
delete(c.contextsMu.contexts, k)
}
c.contextsMu.Unlock()
return
case leader, ok := <-c.resetReadC:
if ok {
Expand Down Expand Up @@ -714,20 +723,25 @@ OUTER:
}

func (c *asyncClient) requestDoneWithRetry(resp *rpcpb.Response) {
v, ok := c.contexts.Load(resp.ID)
if ok && v != nil {
v.(*ctx).done(nil, util.ErrNotLeader)
c.contextsMu.Lock()
defer c.contextsMu.Unlock()

if ctx, ok := c.contextsMu.contexts[resp.ID]; ok {
delete(c.contextsMu.contexts, resp.ID)
ctx.done(nil, util.ErrNotLeader)
}
}

func (c *asyncClient) requestDone(resp *rpcpb.Response) {
v, ok := c.contexts.Load(resp.ID)
if ok && v != nil {
c.contexts.Delete(resp.ID)
c.contextsMu.Lock()
defer c.contextsMu.Unlock()

if ctx, ok := c.contextsMu.contexts[resp.ID]; ok {
delete(c.contextsMu.contexts, resp.ID)
if resp.Error != "" {
v.(*ctx).done(nil, errors.New(resp.Error))
ctx.done(nil, errors.New(resp.Error))
} else {
v.(*ctx).done(resp, nil)
ctx.done(resp, nil)
}
}
}
Expand Down
40 changes: 24 additions & 16 deletions components/prophet/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ func (c *RaftCluster) runBackgroundJobs(interval time.Duration) {
c.Lock()
close(c.createResourceC)
close(c.changedEvents)
c.createResourceC = nil
c.changedEvents = nil
c.Unlock()
c.logger.Info("background jobs has been stopped")
return
Expand Down Expand Up @@ -454,13 +456,15 @@ func (c *RaftCluster) HandleContainerHeartbeat(stats *metapb.ContainerStats) err
newContainer = newContainer.Clone(core.SetLastPersistTime(time.Now()))
}

c.changedEvents <- event.NewContainerEvent(newContainer.Meta)
c.addNotifyLocked(event.NewContainerEvent(newContainer.Meta))
}
if container := c.core.GetContainer(newContainer.Meta.ID()); container != nil {
c.hotStat.UpdateContainerHeartbeatMetrics(container)
}

c.core.PutContainer(newContainer)
c.changedEvents <- event.NewContainerStatsEvent(newContainer.GetContainerStats())
c.addNotifyLocked(event.NewContainerStatsEvent(newContainer.GetContainerStats()))

c.hotStat.Observe(newContainer.Meta.ID(), newContainer.GetContainerStats())
c.hotStat.UpdateTotalLoad(c.core.GetContainers())
c.hotStat.FilterUnhealthyContainer(c)
Expand Down Expand Up @@ -665,24 +669,20 @@ func (c *RaftCluster) processResourceHeartbeat(res *core.CachedResource) error {
}
c.RLock()
if saveKV || saveCache || isNew {
if c.changedEvents != nil {
if res.GetLeader().GetID() != 0 {
from := uint64(0)
if origin != nil {
from = origin.GetLeader().GetContainerID()
}
c.logger.Debug("notify resource leader changed",
zap.Uint64("resource", res.Meta.ID()),
zap.Uint64("from", from),
zap.Uint64("to", res.GetLeader().GetContainerID()))
if res.GetLeader().GetID() != 0 {
from := uint64(0)
if origin != nil {
from = origin.GetLeader().GetContainerID()
}
c.changedEvents <- event.NewResourceEvent(res.Meta, res.GetLeader().GetID(), false, false)
c.logger.Debug("notify resource leader changed",
zap.Uint64("resource", res.Meta.ID()),
zap.Uint64("from", from),
zap.Uint64("to", res.GetLeader().GetContainerID()))
}
c.addNotifyLocked(event.NewResourceEvent(res.Meta, res.GetLeader().GetID(), false, false))
}
if saveCache {
if c.changedEvents != nil {
c.changedEvents <- event.NewResourceStatsEvent(res.GetStat())
}
c.addNotifyLocked(event.NewResourceStatsEvent(res.GetStat()))
}
c.RUnlock()

Expand Down Expand Up @@ -1300,6 +1300,8 @@ func (c *RaftCluster) AllocID() (uint64, error) {

// ChangedEventNotifier returns the receive-only channel on which cluster
// change events are published. It may be nil once background jobs have
// stopped, so callers must handle a nil result.
func (c *RaftCluster) ChangedEventNotifier() <-chan rpcpb.EventNotify {
	c.RLock()
	ch := c.changedEvents
	c.RUnlock()
	return ch
}

Expand Down Expand Up @@ -1606,3 +1608,9 @@ func (c *RaftCluster) JointConsensusEnabled() bool {
// GetResourceFactory returns the adapter's factory function used to create
// new resource metadata objects.
func (c *RaftCluster) GetResourceFactory() func() metadata.Resource {
	newResource := c.adapter.NewResource
	return newResource
}

// addNotifyLocked publishes evt on the changedEvents channel if it is still
// open. Callers must hold c's lock (read or write) so that changedEvents
// cannot be set to nil concurrently while background jobs are stopping.
// The parameter is named evt rather than event to avoid shadowing the
// imported event package used elsewhere in this file.
func (c *RaftCluster) addNotifyLocked(evt rpcpb.EventNotify) {
	if c.changedEvents == nil {
		// Background jobs already stopped; drop the notification.
		return
	}
	c.changedEvents <- evt
}
12 changes: 7 additions & 5 deletions components/prophet/cluster/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (c *RaftCluster) HandleRemoveResources(request *rpcpb.Request) (*rpcpb.Remo
c.core.AddRemovedResources(request.RemoveResources.IDs...)
for _, res := range origin {
res.SetState(metapb.ResourceState_Destroyed)
c.changedEvents <- event.NewResourceEvent(res, 0, true, false)
c.addNotifyLocked(event.NewResourceEvent(res, 0, true, false))
}

return &rpcpb.RemoveResourcesRsp{}, nil
Expand Down Expand Up @@ -391,15 +391,17 @@ func (c *RaftCluster) HandleGetScheduleGroupRule(request *rpcpb.Request) ([]meta
}

func (c *RaftCluster) triggerNotifyCreateResources() {
select {
case c.createResourceC <- struct{}{}:
default:
if c.createResourceC != nil {
select {
case c.createResourceC <- struct{}{}:
default:
}
}
}

// doNotifyCreateResources emits a creation event for every resource that is
// still waiting to be created, using the nil-safe notify helper.
func (c *RaftCluster) doNotifyCreateResources() {
	c.core.ForeachWaittingCreateResources(func(res metadata.Resource) {
		c.addNotifyLocked(event.NewResourceEvent(res, 0, false, true))
	})
}

Expand Down
15 changes: 13 additions & 2 deletions components/prophet/election/election_leadship.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,13 @@ func (ls *Leadership) ChangeLeaderTo(newLeader string) error {

// Stop terminates the current leadership: it closes the active lease, if
// any, and then stops the internal stopper. Any error from closing the
// lease is returned after the stopper has been stopped.
func (ls *Leadership) Stop() error {
	ls.logger.Info("begin to stop")
	var closeErr error
	if lease := ls.GetLease(); lease != nil {
		closeErr = lease.Close(ls.elector.client.Ctx())
	}
	ls.logger.Info("begin to stop stopper")
	ls.stopper.Stop()
	return closeErr
}
Expand Down Expand Up @@ -162,12 +164,16 @@ func (ls *Leadership) ElectionLoop() {
func (ls *Leadership) doElectionLoop(ctx context.Context) {
ls.ctx = ctx
for {
ls.logger.Info("ready to next loop",
mainLoopFiled)
select {
case <-ctx.Done():
ls.logger.Info("loop exit due to context done",
mainLoopFiled)
return
default:
ls.logger.Info("ready to load current leader",
mainLoopFiled)
currentLeader, rev, err := ls.CurrentLeader()
if err != nil {
ls.logger.Error("fail to load current leader, retry later",
Expand Down Expand Up @@ -206,6 +212,8 @@ func (ls *Leadership) doElectionLoop(ctx context.Context) {
}

if ls.allowCampaign {
ls.logger.Info("start checkExpectLeader",
mainLoopFiled)
// check expect leader exists
err := ls.checkExpectLeader()
if err != nil {
Expand All @@ -215,14 +223,17 @@ func (ls *Leadership) doElectionLoop(ctx context.Context) {
time.Sleep(200 * time.Millisecond)
continue
}

ls.logger.Info("end checkExpectLeader, and start campaign",
mainLoopFiled)
if err = ls.campaign(); err != nil {
ls.logger.Error("fail to campaign leader",
mainLoopFiled,
zap.Error(err))
time.Sleep(time.Second * time.Duration(ls.elector.options.leaseSec))
continue
}
ls.logger.Info("end campaign",
mainLoopFiled)
}

time.Sleep(loopInterval)
Expand Down Expand Up @@ -345,7 +356,7 @@ func (ls *Leadership) campaign() error {
ls.logger.Info("exit due to client context done",
keepaliveField)
return errors.New("etcd client closed")
case <-ctx.Done():
case <-ls.ctx.Done():
ls.logger.Info("exit due to context done",
keepaliveField)
return nil
Expand Down
9 changes: 6 additions & 3 deletions components/prophet/prophet.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,18 +210,21 @@ func (p *defaultProphet) Stop() {
p.trans.Stop()
p.logger.Info("transport stopped")

p.member.Stop()
p.logger.Info("member stopped")

p.cancel()
p.elector.Client().Close()
p.logger.Info("etcd client stopped")

p.member.Stop()
p.logger.Info("member stopped")

if p.etcd != nil {
p.etcd.Close()
}
p.logger.Info("etcd server stopped")

p.stopJobs()
p.logger.Info("job begin to stopped")

p.stopper.Stop()
p.logger.Info("prophet stopped")
})
Expand Down
38 changes: 23 additions & 15 deletions components/prophet/prophet_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package prophet

import (
"context"
"fmt"
"sync"
"sync/atomic"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/matrixorigin/matrixcube/components/prophet/cluster"
"github.com/matrixorigin/matrixcube/components/prophet/event"
"github.com/matrixorigin/matrixcube/components/prophet/pb/rpcpb"
"github.com/matrixorigin/matrixcube/util/stop"
"go.uber.org/zap"
)

Expand All @@ -49,14 +51,17 @@ type watcherNotifier struct {
logger *zap.Logger
watchers map[uint64]*watcherSession
cluster *cluster.RaftCluster
stopper *stop.Stopper
}

func newWatcherNotifier(cluster *cluster.RaftCluster, logger *zap.Logger) *watcherNotifier {
return &watcherNotifier{
wn := &watcherNotifier{
logger: log.Adjust(logger).Named("watch-notify"),
cluster: cluster,
watchers: make(map[uint64]*watcherSession),
}
wn.stopper = stop.NewStopper("event-notifier", stop.WithLogger(wn.logger))
return wn
}

func (wn *watcherNotifier) handleCreateWatcher(req *rpcpb.Request, resp *rpcpb.Response, session goetty.IOSession) error {
Expand Down Expand Up @@ -131,33 +136,36 @@ func (wn *watcherNotifier) doNotify(evt rpcpb.EventNotify) {
}

func (wn *watcherNotifier) start() {
go func() {
defer func() {
if err := recover(); err != nil {
wn.logger.Error("fail to notify event, restart later", zap.Any("error", err))
wn.start()
}
}()
wn.stopper.RunTask(context.Background(), func(ctx context.Context) {
eventC := wn.cluster.ChangedEventNotifier()
if eventC == nil {
wn.logger.Info("watcher notifer exit with nil event channel")
return
}

for {
evt, ok := <-wn.cluster.ChangedEventNotifier()
if !ok {
select {
case <-ctx.Done():
wn.logger.Info("watcher notifer exit")
return
case evt, ok := <-eventC:
if !ok {
wn.logger.Info("watcher notifer exit with channel closed")
return
}
wn.doNotify(evt)
}

wn.doNotify(evt)
}
}()
})
}

// stop clears every registered watcher session under the lock, then stops
// the notifier's stopper, which cancels the notify loop started by start.
// NOTE(review): the lock is released before Stop rather than deferred —
// presumably so the notify task can acquire it while shutting down; confirm.
func (wn *watcherNotifier) stop() {
	wn.Lock()
	for _, wt := range wn.watchers {
		wn.doClearWatcherLocked(wt)
	}
	wn.watchers = nil
	wn.logger.Info("watcher notifier stopped")
	wn.Unlock()
	wn.stopper.Stop()
}

0 comments on commit 9b6d376

Please sign in to comment.