Skip to content
This repository has been archived by the owner on Nov 7, 2022. It is now read-only.

Commit

Permalink
interceptor/opencensus: record number of received spans
Browse files Browse the repository at this point in the history
Instrument interceptor/opencensus with stats to record
the number of received spans. This is accomplished by
a helper to process spans that sends over spans even
if the number of spans is 0. Also record with tag_key
"opencensus_interceptor" whose value is the name
of the respective interceptor.

The test to ensure that 0 length spans are also added
is currently disabled because:
* Issue census-instrumentation/opencensus-go#862
is not yet implemented which requests that the OpenCensus-Go
stats worker provide a method Flush to flush all data. Without
it, metrics will contain stale data from previous tests.

Add tests to lock-in this behavior.

Updates #63
  • Loading branch information
odeke-em committed Oct 5, 2018
1 parent d91241e commit f299e24
Show file tree
Hide file tree
Showing 3 changed files with 288 additions and 6 deletions.
212 changes: 212 additions & 0 deletions interceptor/opencensus/observability_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
// Copyright 2018, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ocinterceptor_test

import (
"fmt"
"reflect"
"strings"
"sync"
"testing"
"time"

"contrib.go.opencensus.io/exporter/ocagent"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opencensus.io/trace"

commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
"github.com/census-instrumentation/opencensus-service/interceptor/opencensus"
"github.com/census-instrumentation/opencensus-service/internal"
)

// Ensure that if we add a metrics exporter that our target metrics
// will be recorded but also with the proper tag keys and values.
// See Issue https://github.com/census-instrumentation/opencensus-service/issues/63
func TestEnsureRecordedMetrics(t *testing.T) {
sappender := newSpanAppender()

_, port, doneFn := ocInterceptorOnGRPCServer(t, sappender, ocinterceptor.WithSpanBufferPeriod(2*time.Millisecond))
defer doneFn()

// Now the opencensus-agent exporter.
oce, err := ocagent.NewExporter(ocagent.WithPort(uint16(port)), ocagent.WithInsecure())
if err != nil {
t.Fatalf("Failed to create the ocagent-exporter: %v", err)
}
trace.RegisterExporter(oce)
defer func() {
oce.Stop()
trace.UnregisterExporter(oce)
}()

// Now for the stats exporter
if err := view.Register(internal.AllViews...); err != nil {
t.Fatalf("Failed to register all views: %v", err)
}
defer view.Unregister(internal.AllViews...)

metricsReportingPeriod := 5 * time.Millisecond
view.SetReportingPeriod(metricsReportingPeriod)
// On exit, revert the metrics reporting period.
defer view.SetReportingPeriod(60 * time.Second)

cme := newCountMetricsExporter()
view.RegisterExporter(cme)
defer view.UnregisterExporter(cme)

n := 20
// Now it is time to send over some spans
// and we'll count the numbers received.
for i := 0; i < n; i++ {
now := time.Now().UTC()
oce.ExportSpan(&trace.SpanData{
StartTime: now.Add(-10 * time.Second),
EndTime: now.Add(20 * time.Second),
SpanContext: trace.SpanContext{
TraceID: trace.TraceID{byte(0x20 + i), 0x4E, 0x4D, 0x4C, 0x4B, 0x4A, 0x49, 0x48, 0x47, 0x46, 0x45, 0x44, 0x43, 0x42, 0x41},
SpanID: trace.SpanID{0x7F, 0x7E, 0x7D, 0x7C, 0x7B, 0x7A, 0x79, 0x78},
TraceOptions: trace.TraceOptions(i & 0x01),
},
ParentSpanID: trace.SpanID{byte(0x01 + i), 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37},
Name: fmt.Sprintf("Span-%d", i),
Status: trace.Status{Code: trace.StatusCodeInternal, Message: "Blocked by firewall"},
})
}

// Give them some time to be exported.
// say n * metricsReportingPeriod
<-time.After(time.Duration(n) * metricsReportingPeriod)
oce.Flush()

checkCountMetricsExporterResults(t, cme, n, 1)
}

func TestEnsureRecordedMetrics_zeroLengthSpansSender(t *testing.T) {
t.Skipf("Currently disabled, enable this test when the following are fixed:\nIssue %s\nPR %s",
"https://github.com/census-instrumentation/opencensus-go/issues/862",
"https://github.com/census-instrumentation/opencensus-go/pull/922",
)
sappender := newSpanAppender()

_, port, doneFn := ocInterceptorOnGRPCServer(t, sappender, ocinterceptor.WithSpanBufferPeriod(2*time.Millisecond))
defer doneFn()

// Now the opencensus-agent exporter.
oce, err := ocagent.NewExporter(ocagent.WithPort(uint16(port)), ocagent.WithInsecure())
if err != nil {
t.Fatalf("Failed to create the ocagent-exporter: %v", err)
}
trace.RegisterExporter(oce)
defer func() {
oce.Stop()
trace.UnregisterExporter(oce)
}()

// Now for the stats exporter
if err := view.Register(internal.AllViews...); err != nil {
t.Fatalf("Failed to register all views: %v", err)
}
defer view.Unregister(internal.AllViews...)

metricsReportingPeriod := 10 * time.Millisecond
view.SetReportingPeriod(metricsReportingPeriod)
// On exit, revert the metrics reporting period.
defer view.SetReportingPeriod(60 * time.Second)

cme := newCountMetricsExporter()
view.RegisterExporter(cme)
defer view.UnregisterExporter(cme)

n := 20
// Now for the traceExporter that sends 0 length spans
traceSvcClient, traceSvcDoneFn, err := makeTraceServiceClient(port)
if err != nil {
t.Fatalf("Failed to create the trace service client: %v", err)
}
defer traceSvcDoneFn()
for i := 0; i <= n; i++ {
_ = traceSvcClient.Send(&agenttracepb.ExportTraceServiceRequest{Spans: nil, Node: &commonpb.Node{}})
}
<-time.After(time.Duration(n) * metricsReportingPeriod)
checkCountMetricsExporterResults(t, cme, n, 0)
}

func checkCountMetricsExporterResults(t *testing.T, cme *countMetricsExporter, n int, wantAllCountsToBe int64) {
cme.mu.Lock()
defer cme.mu.Unlock()

// The only tags that we are expecting are "opencensus_interceptor": "opencensus" * n
wantTagKey, _ := tag.NewKey("opencensus_interceptor")
valuesPlusBlank := strings.Split(strings.Repeat("opencensus,opencensus,", n/2), ",")
wantValues := valuesPlusBlank[:len(valuesPlusBlank)-1]
wantTags := map[tag.Key][]string{
wantTagKey: wantValues,
}

gotTags := cme.tags
if !reflect.DeepEqual(gotTags, wantTags) {
t.Errorf("\nGotTags:\n\t%#v\n\nWantTags:\n\t%#v\n", gotTags, wantTags)
}

// The only data types we are expecting are:
// * DistributionData
for key, aggregation := range cme.data {
switch agg := aggregation.(type) {
case *view.DistributionData:
if g, w := agg.Count, int64(1); g != w {
t.Errorf("Data point #%d GotCount %d Want %d", key, g, w)
}
default:
t.Errorf("Data point #%d Got %T want %T", key, agg, (*view.DistributionData)(nil))
}
}
}

type countMetricsExporter struct {
mu sync.Mutex
tags map[tag.Key][]string
data map[int]view.AggregationData
}

func newCountMetricsExporter() *countMetricsExporter {
return &countMetricsExporter{
tags: make(map[tag.Key][]string),
data: make(map[int]view.AggregationData),
}
}

func (cme *countMetricsExporter) clear() {
cme.mu.Lock()
defer cme.mu.Unlock()

cme.data = make(map[int]view.AggregationData)
cme.tags = make(map[tag.Key][]string)
}

var _ view.Exporter = (*countMetricsExporter)(nil)

func (cme *countMetricsExporter) ExportView(vd *view.Data) {
cme.mu.Lock()
defer cme.mu.Unlock()

for _, row := range vd.Rows {
cme.data[len(cme.data)] = row.Data
for _, tag_ := range row.Tags {
cme.tags[tag_.Key] = append(cme.tags[tag_.Key], tag_.Value)
}
}
}
22 changes: 16 additions & 6 deletions interceptor/opencensus/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package ocinterceptor

import (
"context"
"errors"
"time"

Expand All @@ -23,6 +24,7 @@ import (
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/census-instrumentation/opencensus-service/internal"
"github.com/census-instrumentation/opencensus-service/spanreceiver"
)

Expand Down Expand Up @@ -88,20 +90,28 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err
return errTraceExportProtocolViolation
}

var lastNonNilNode *commonpb.Node
spansMetricsFn := internal.NewReceivedSpansRecorder(context.Background(), "opencensus")

processReceivedSpans := func(ni *commonpb.Node, spans []*tracepb.Span) {
// Firstly, we'll add them to the bundler.
if len(recv.Spans) > 0 {
bundlerPayload := &spansAndNode{node: ni, spans: recv.Spans}
traceBundler.Add(bundlerPayload, len(bundlerPayload.spans))
}

// We MUST Unconditionally record metrics from this reception.
spansMetricsFn(ni, recv.Spans)
}

var lastNonNilNode *commonpb.Node
// Now that we've got the first message with a Node, we can start to receive streamed up spans.
for {
// If a Node has been sent from downstream, save and use it.
if recv.Node != nil {
lastNonNilNode = recv.Node
}

// Otherwise add them to the bundler.
if len(recv.Spans) > 0 {
bundlerPayload := &spansAndNode{node: lastNonNilNode, spans: recv.Spans}
traceBundler.Add(bundlerPayload, len(bundlerPayload.spans))
}
processReceivedSpans(lastNonNilNode, recv.Spans)

recv, err = tes.Recv()
if err != nil {
Expand Down
60 changes: 60 additions & 0 deletions internal/observability.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2018, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

// This file contains helpers that are useful to add observability
// with metrics and tracing using OpenCensus to the various pieces
// of the service.

import (
"context"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"

commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
)

var tagKeyInterceptorName, _ = tag.NewKey("opencensus_interceptor")
var mReceivedSpans = stats.Int64("oc.io/interceptor/received_spans", "Counts the number of spans received by the interceptor", "1")

var ViewReceivedSpansInterceptor = &view.View{
Name: "oc.io/interceptor/received_spans",
Description: "The number of spans received by the interceptor",
Measure: mReceivedSpans,
Aggregation: view.Distribution(
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20, 25, 30, 35, 40, 45, 50, 60, 70, 80, 90,
100, 150, 200, 250, 300, 450, 500, 600, 700, 800, 900, 1000, 1200, 1400, 1600, 1800, 2000,
),
TagKeys: []tag.Key{tagKeyInterceptorName},
}

var AllViews = []*view.View{
ViewReceivedSpansInterceptor,
}

// NewReceivedSpansRecorder creates a function that uses a context created
// from the name of the interceptor to record the number of the spans received
// by the interceptor.
func NewReceivedSpansRecorder(parentCtx context.Context, interceptorName string) func(*commonpb.Node, []*tracepb.Span) {
ctx, _ := tag.New(parentCtx, tag.Upsert(tagKeyInterceptorName, interceptorName))
// TODO: (@odeke-em) perhaps also record information from the node?

return func(ni *commonpb.Node, spans []*tracepb.Span) {
stats.Record(ctx, mReceivedSpans.M(int64(len(spans))))
}
}

0 comments on commit f299e24

Please sign in to comment.