Skip to content

Commit

Permalink
add io session aware
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangxu19830126 committed Aug 3, 2021
1 parent 2afe9bb commit 3c588c9
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 10 deletions.
6 changes: 6 additions & 0 deletions application.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,13 @@ func (s *server) doStart() error {
defer func() {
s.deleteSession(rs)
rs.Close()
if s.opts.aware != nil {
s.opts.aware.Closed(rs)
}
}()
if s.opts.aware != nil {
s.opts.aware.Created(rs)
}
s.doConnection(rs)
}()
}
Expand Down
16 changes: 16 additions & 0 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,22 @@ const (
DefaultWriteBuf = 256
)

// IOSessionAware io session aware
type IOSessionAware interface {
// Created session created
Created(IOSession)
//Closed session closed
Closed(IOSession)
}

// AppOption application option
type AppOption func(*appOptions)

type appOptions struct {
sessionOpts *options
sessionBucketSize uint64
errorMsgFactory func(IOSession, interface{}, error) interface{}
aware IOSessionAware
}

// WithAppSessionOptions set the number of maps to store session
Expand All @@ -41,6 +50,13 @@ func WithAppSessionBucketSize(value uint64) AppOption {
}
}

// WithAppSessionBucketSize set the app session aware
func WithAppSessionAware(aware IOSessionAware) AppOption {
return func(opts *appOptions) {
opts.aware = aware
}
}

// WithAppErrorMsgFactory set function to process error, closed the client session if this field not set
func WithAppErrorMsgFactory(value func(IOSession, interface{}, error) interface{}) AppOption {
return func(opts *appOptions) {
Expand Down
32 changes: 22 additions & 10 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,33 @@ var (

// IOSession session
type IOSession interface {
// ID sessino id
ID() uint64
// Connect connect to address, only used at client-side
Connect(addr string, timeout time.Duration) (bool, error)
// Close close the session
Close() error
// Connected returns true if connection is ok
Connected() bool
// Read read packet from connection
Read() (interface{}, error)
// Write write packet to connection out buffer
Write(msg interface{}) error
// WriteAndFlush write packet to connection out buffer and flush the out buffer
WriteAndFlush(msg interface{}) error
// Flush flush the out buffer
Flush() error
// InBuf connection read buffer
InBuf() *buf.ByteBuf
// OutBuf connection out buffer
OutBuf() *buf.ByteBuf
// SetAttr set attr
SetAttr(key string, value interface{})
// GetAttr read attr
GetAttr(key string) interface{}
// RemoteAddr returns remote address, include ip and port
RemoteAddr() string
// RemoteIP returns remote address, only ip
RemoteIP() string
}

Expand Down Expand Up @@ -201,15 +215,13 @@ func (bio *baseIO) Read() (interface{}, error) {
}

if complete {
break
}
}
if bio.in.Readable() == 0 {
bio.in.Clear()
}

if bio.in.Readable() == 0 {
bio.in.Clear()
return msg, nil
}
}

return msg, nil
}
}

Expand Down Expand Up @@ -245,7 +257,7 @@ func (bio *baseIO) Flush() error {
break
}

if 0 != bio.opts.writeTimeout {
if bio.opts.writeTimeout != 0 {
bio.conn.SetWriteDeadline(time.Now().Add(bio.opts.writeTimeout))
} else {
bio.conn.SetWriteDeadline(time.Time{})
Expand Down Expand Up @@ -314,7 +326,7 @@ func (bio *baseIO) write(msg interface{}, flush bool) error {
func (bio *baseIO) writeLoop(q queue.Queue) {
defer q.Dispose()

items := make([]interface{}, bio.opts.asyncFlushBatch, bio.opts.asyncFlushBatch)
items := make([]interface{}, bio.opts.asyncFlushBatch)
for {
n, err := q.Get(bio.opts.asyncFlushBatch, items)
if nil != err {
Expand All @@ -338,7 +350,7 @@ func (bio *baseIO) writeLoop(q queue.Queue) {
}

func (bio *baseIO) readFromConn(timeout time.Duration) (bool, interface{}, error) {
if 0 != timeout {
if timeout != 0 {
bio.conn.SetReadDeadline(time.Now().Add(timeout))
} else {
bio.conn.SetReadDeadline(time.Time{})
Expand Down

0 comments on commit 3c588c9

Please sign in to comment.