Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Fix evictedQueue memory leak #1203

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions trace/evictedqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,45 @@
package trace

type evictedQueue struct {
queue []interface{}
ringQueue []interface{}
capacity int
droppedCount int
writeIdx int
readIdx int
startRead bool
}

func newEvictedQueue(capacity int) *evictedQueue {
eq := &evictedQueue{
capacity: capacity,
queue: make([]interface{}, 0),
capacity: capacity,
ringQueue: make([]interface{}, 0),
}

return eq
}

func (eq *evictedQueue) add(value interface{}) {
if len(eq.queue) == eq.capacity {
eq.queue = eq.queue[1:]
eq.droppedCount++
if len(eq.ringQueue) < eq.capacity {
eq.ringQueue = append(eq.ringQueue, value)
return
}
eq.queue = append(eq.queue, value)

eq.ringQueue[eq.writeIdx] = value
eq.droppedCount++
eq.writeIdx++
eq.writeIdx %= eq.capacity
eq.readIdx = eq.writeIdx
}

// Do not add more item after use readNext
func (eq *evictedQueue) readNext() interface{} {
if eq.startRead && eq.readIdx == eq.writeIdx {
return nil
}

eq.startRead = true
res := eq.ringQueue[eq.readIdx]
eq.readIdx++
eq.readIdx %= eq.capacity
return res
}
76 changes: 40 additions & 36 deletions trace/evictedqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,44 +22,48 @@ import (
func init() {
}

func TestAdd(t *testing.T) {
q := newEvictedQueue(3)
q.add("value1")
q.add("value2")
if wantLen, gotLen := 2, len(q.queue); wantLen != gotLen {
t.Errorf("got queue length %d want %d", gotLen, wantLen)
}
}
func TestAddAndReadNext(t *testing.T) {
t.Run("len(ringQueue) < capacity", func(t *testing.T) {
values := []string{"value1", "value2"}
capacity := 3
q := newEvictedQueue(capacity)

func (eq *evictedQueue) queueToArray() []string {
arr := make([]string, 0)
for _, value := range eq.queue {
arr = append(arr, value.(string))
}
return arr
}
for _, value := range values {
q.add(value)
}

gotValues := make([]string, len(q.ringQueue))
for i := 0; i < len(gotValues); i++ {
gotValues[i] = q.readNext().(string)
}

if !reflect.DeepEqual(values, gotValues) {
t.Errorf("got array = %#v; want %#v", gotValues, values)
}
})
t.Run("dropped count", func(t *testing.T) {
values := []string{"value1", "value2", "value3", "value1", "value4", "value1", "value3", "value1", "value4"}
wantValues := []string{"value3", "value1", "value4"}
capacity := 3
wantDroppedCount := len(values) - capacity

q := newEvictedQueue(capacity)

for _, value := range values {
q.add(value)
}

func TestDropCount(t *testing.T) {
q := newEvictedQueue(3)
q.add("value1")
q.add("value2")
q.add("value3")
q.add("value1")
q.add("value4")
if wantLen, gotLen := 3, len(q.queue); wantLen != gotLen {
t.Errorf("got queue length %d want %d", gotLen, wantLen)
}
if wantDropCount, gotDropCount := 2, q.droppedCount; wantDropCount != gotDropCount {
t.Errorf("got drop count %d want %d", gotDropCount, wantDropCount)
}
wantArr := []string{"value3", "value1", "value4"}
gotArr := q.queueToArray()
gotValues := make([]string, len(wantValues))
for i := 0; i < len(gotValues); i++ {
gotValues[i] = q.readNext().(string)
}

if wantLen, gotLen := len(wantArr), len(gotArr); gotLen != wantLen {
t.Errorf("got array len %d want %d", gotLen, wantLen)
}
if !reflect.DeepEqual(wantValues, gotValues) {
t.Errorf("got array = %#v; want %#v", gotValues, wantValues)
}

if !reflect.DeepEqual(gotArr, wantArr) {
t.Errorf("got array = %#v; want %#v", gotArr, wantArr)
}
if wantDroppedCount != q.droppedCount {
t.Errorf("got dropped count %d want %d", q.droppedCount, wantDroppedCount)
}
})
}
24 changes: 12 additions & 12 deletions trace/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,15 +300,15 @@ func (s *Span) makeSpanData() *SpanData {
sd.Attributes = s.lruAttributesToAttributeMap()
sd.DroppedAttributeCount = s.lruAttributes.droppedCount
}
if len(s.annotations.queue) > 0 {
if len(s.annotations.ringQueue) > 0 {
sd.Annotations = s.interfaceArrayToAnnotationArray()
sd.DroppedAnnotationCount = s.annotations.droppedCount
}
if len(s.messageEvents.queue) > 0 {
if len(s.messageEvents.ringQueue) > 0 {
sd.MessageEvents = s.interfaceArrayToMessageEventArray()
sd.DroppedMessageEventCount = s.messageEvents.droppedCount
}
if len(s.links.queue) > 0 {
if len(s.links.ringQueue) > 0 {
sd.Links = s.interfaceArrayToLinksArray()
sd.DroppedLinkCount = s.links.droppedCount
}
Expand Down Expand Up @@ -345,25 +345,25 @@ func (s *Span) SetStatus(status Status) {
}

func (s *Span) interfaceArrayToLinksArray() []Link {
linksArr := make([]Link, 0, len(s.links.queue))
for _, value := range s.links.queue {
linksArr = append(linksArr, value.(Link))
linksArr := make([]Link, len(s.links.ringQueue))
for i := 0; i < len(linksArr); i++ {
linksArr[i] = s.links.readNext().(Link)
}
return linksArr
}

func (s *Span) interfaceArrayToMessageEventArray() []MessageEvent {
messageEventArr := make([]MessageEvent, 0, len(s.messageEvents.queue))
for _, value := range s.messageEvents.queue {
messageEventArr = append(messageEventArr, value.(MessageEvent))
messageEventArr := make([]MessageEvent, len(s.messageEvents.ringQueue))
for i := 0; i < len(messageEventArr); i++ {
messageEventArr[i] = s.messageEvents.readNext().(MessageEvent)
}
return messageEventArr
}

func (s *Span) interfaceArrayToAnnotationArray() []Annotation {
annotationArr := make([]Annotation, 0, len(s.annotations.queue))
for _, value := range s.annotations.queue {
annotationArr = append(annotationArr, value.(Annotation))
annotationArr := make([]Annotation, len(s.annotations.ringQueue))
for i := 0; i < len(annotationArr); i++ {
annotationArr[i] = s.annotations.readNext().(Annotation)
}
return annotationArr
}
Expand Down