Skip to content

Commit

Permalink
Fix data race (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangxu19830126 authored Sep 2, 2021
1 parent 5d8f81b commit c5955ae
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 35 deletions.
14 changes: 0 additions & 14 deletions application.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ func (s *server) Stop() error {

s.listener.Close()
close(s.startCh)
s.closeAllSessions()
atomic.StoreInt32(&s.state, stateStopped)
return nil
}
Expand Down Expand Up @@ -183,19 +182,6 @@ func (s *server) Broadcast(msg interface{}) error {
return nil
}

func (s *server) closeAllSessions() {
for _, m := range s.sessions {
go func(m *sessionMap) {
m.Lock()
for _, rs := range m.sessions {
rs.Close()
delete(m.sessions, rs.ID())
}
m.Unlock()
}(m)
}
}

func (s *server) doStart() error {
s.startCh <- struct{}{}
var tempDelay time.Duration
Expand Down
9 changes: 0 additions & 9 deletions application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,6 @@ func TestStop(t *testing.T) {

assert.Equal(t, n, len(sessions))
assert.NoError(t, app.Stop())
time.Sleep(time.Second * 1)
total := 0
for _, m := range app.sessions {
total += len(m.sessions)
}
for _, s := range sessions {
assert.False(t, s.Connected())
}
assert.Equal(t, 0, total)
}

func newTestTCPApp(t *testing.T, handleFunc func(IOSession, interface{}, uint64) error, opts ...AppOption) NetApplication {
Expand Down
3 changes: 0 additions & 3 deletions buf/buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,6 @@ func (b *ByteBuf) Clear() {
b.readerIndex = 0
b.writerIndex = 0
b.markedIndex = 0

b.pool.Free(b.buf)
b.buf = b.pool.Alloc(b.capacity)
}

// Release release buf
Expand Down
27 changes: 18 additions & 9 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type baseIO struct {
attrs sync.Map
disableConnect bool
asyncQueue queue.Queue
stopWriteC chan struct{}
logger *zap.Logger
}

Expand Down Expand Up @@ -138,7 +139,7 @@ func (bio *baseIO) Connect(addr string, timeout time.Duration) (bool, error) {
return false, fmt.Errorf("the session is closing or connecting is other goroutine")
}

bio.reset()
bio.resetToRead()
conn, err := net.DialTimeout("tcp", addr, timeout)
if nil != err {
atomic.StoreInt32(&bio.state, stateReadyToConnect)
Expand Down Expand Up @@ -176,17 +177,13 @@ func (bio *baseIO) Close() error {
return fmt.Errorf("the session is closing or connecting is other goroutine")
}

bio.reset()
atomic.StoreInt32(&bio.state, stateReadyToConnect)

bio.stopWriteLoop()
bio.closeConn()
if bio.disableConnect {
bio.in.Release()
bio.out.Release()
}

if bio.opts.asyncWrite {
bio.asyncQueue.Put(stopFlag)
}
atomic.StoreInt32(&bio.state, stateReadyToConnect)
return nil
}

Expand Down Expand Up @@ -325,6 +322,13 @@ func (bio *baseIO) write(msg interface{}, flush bool) error {
return nil
}

func (bio *baseIO) stopWriteLoop() {
if bio.opts.asyncWrite {
bio.asyncQueue.Put(stopFlag)
<-bio.stopWriteC
}
}

func (bio *baseIO) writeLoop(q queue.Queue) {
defer q.Dispose()

Expand All @@ -337,6 +341,7 @@ func (bio *baseIO) writeLoop(q queue.Queue) {

for i := int64(0); i < n; i++ {
if items[i] == stopFlag {
close(bio.stopWriteC)
return
}

Expand Down Expand Up @@ -370,10 +375,13 @@ func (bio *baseIO) readFromConn(timeout time.Duration) (bool, interface{}, error
return bio.opts.decoder.Decode(bio.in)
}

func (bio *baseIO) reset() {
func (bio *baseIO) closeConn() {
if bio.conn != nil {
bio.conn.Close()
}
}

func (bio *baseIO) resetToRead() {
bio.in.Clear()
bio.out.Clear()
bio.remoteAddr = ""
Expand All @@ -396,6 +404,7 @@ func (bio *baseIO) initConn(conn net.Conn) {
bio.opts.connOptionFunc(bio.conn)
if bio.opts.asyncWrite {
bio.asyncQueue = queue.New(64)
bio.stopWriteC = make(chan struct{})
go bio.writeLoop(bio.asyncQueue)
}
atomic.StoreInt32(&bio.state, stateConnected)
Expand Down

0 comments on commit c5955ae

Please sign in to comment.