Skip to content

Commit

Permalink
Merge branch 'open-telemetry:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
amitschendel authored Oct 20, 2024
2 parents fbb3913 + 0a6fc45 commit 64c3f2c
Show file tree
Hide file tree
Showing 16 changed files with 327 additions and 134 deletions.
86 changes: 86 additions & 0 deletions design-docs/159-collector-api/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Public API to run the agent as a collector receiver

## Meta

- **Author(s)**: @dmathieu
- **Start Date**: September 18 2024
- **Goal End Date**:
- **Primary Reviewers**: @open-telemetry/ebpf-profiler-maintainers

## Problem

As we move forward with the OpenTelemetry Collector being able to handle
profiles, we also want the profiling agent to be able to run as a collector
receiver.
See [PR #87](https://github.com/open-telemetry/opentelemetry-ebpf-profiler/pull/87)

We also intend to provide a collector distribution that will bundle the agent,
which most folks will want to use.

To support use cases where a custom built distribution is needed, we must provide
a stable public API that allows for a receiver to be built and integrated in a distribution.

This design document aims to describe the public API we will be exposing.

## Success Criteria

We define a clear and concise public API for building custom distributions of the collector with ebpf profiling enabled.

### Scope

This document describes what the API will look like.

Like any other receiver (see the
[filelogreceiver](https://pkg.go.dev/github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filelogreceiver)
as an example), there is a single method we need to expose: `NewFactory`, which
returns a [receiver
Factory](https://pkg.go.dev/go.opentelemetry.io/collector/receiver#Factory)
that can handle profiles.

Since the scope of this API is so small, the main question is in which package
the API should be located.

## Proposed Solutions

We will expose the `NewFactory` method as part of a `collector` package within
the ebpf repository, meaning importing and using it will be as follows:

```golang
package main

import (
"github.com/open-telemetry/opentelemetry-ebpf-profiler/collector"
)

func main() {
factory := collector.NewFactory()
// Use the factory
}
```

The intent behind making the factory behind a subpackage as opposed to the root
package of the repository is to provide a better separation of concerns between
packages.

This subpackage would not be its own module. Since the vision is that the agent
should always run as a collector receiver, doing so is probably unnecessary.

## Alternatives

Having the receiver in the root package:

```golang
package main

import (
agent "github.com/open-telemetry/opentelemetry-ebpf-profiler"
)

func main() {
factory := agent.NewFactory()
// Use the factory
}
```

This approach would have the advantage of clarifying that the factory is the
root of the agent running as a collector receiver.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0
github.com/cilium/ebpf v0.16.0
github.com/elastic/go-freelru v0.13.0
github.com/elastic/go-perf v0.0.0-20191212140718-9c656876f595
github.com/elastic/go-perf v0.0.0-20241016160959-1342461adb4a
github.com/google/uuid v1.6.0
github.com/jsimonetti/rtnetlink v1.4.2
github.com/klauspost/compress v1.17.9
Expand All @@ -24,7 +24,7 @@ require (
golang.org/x/arch v0.10.0
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0
golang.org/x/sync v0.8.0
golang.org/x/sys v0.25.0
golang.org/x/sys v0.26.0
google.golang.org/grpc v1.66.2
)

Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/elastic/go-freelru v0.13.0 h1:TKKY6yCfNNNky7Pj9xZAOEpBcdNgZJfihEftOb55omg=
github.com/elastic/go-freelru v0.13.0/go.mod h1:bSdWT4M0lW79K8QbX6XY2heQYSCqD7THoYf82pT/H3I=
github.com/elastic/go-perf v0.0.0-20191212140718-9c656876f595 h1:q8n4QjcLa4q39Q3fqHRknTBXBtegjriHFrB42YKgXGI=
github.com/elastic/go-perf v0.0.0-20191212140718-9c656876f595/go.mod h1:s09U1b4P1ZxnKx2OsqY7KlHdCesqZWIhyq0Gs/QC/Us=
github.com/elastic/go-perf v0.0.0-20241016160959-1342461adb4a h1:ymmtaN4bVCmKKeu4XEf6JEWNZKRXPMng1zjpKd+8rCU=
github.com/elastic/go-perf v0.0.0-20241016160959-1342461adb4a/go.mod h1:Nt+pnRYvf0POC+7pXsrv8ubsEOSsaipJP0zlz1Ms1RM=
github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI=
github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
Expand Down Expand Up @@ -107,8 +107,8 @@ golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 h1:+rdxYoE3E5htTEWIe15GlN6IfvbURM//Jt0mmkmm6ZU=
Expand Down
5 changes: 2 additions & 3 deletions interpreter/perl/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,12 @@ func newData(ebpf interpreter.EbpfHandler, info *interpreter.LoaderInfo,
version := perlVersion(verBytes[0], verBytes[1], verBytes[2])
log.Debugf("Perl version %v.%v.%v", verBytes[0], verBytes[1], verBytes[2])

// Currently tested and supported 5.28.x - 5.38.x.
// Currently tested and supported 5.28.x - 5.40.x.
// Could possibly support older Perl versions somewhere back to 5.14-5.20, by just
// checking the introspection offset validity. 5.14 had major rework for internals.
// And 5.18 had some HV related changes.
minVer := perlVersion(5, 28, 0)
maxVer := perlVersion(5, 39, 0)
maxVer := perlVersion(5, 41, 0)
if version < minVer || version >= maxVer {
return nil, fmt.Errorf("unsupported Perl %d.%d.%d (need >= %d.%d and < %d.%d)",
verBytes[0], verBytes[1], verBytes[2],
Expand Down Expand Up @@ -288,7 +288,6 @@ func newData(ebpf interpreter.EbpfHandler, info *interpreter.LoaderInfo,
}

if version >= perlVersion(5, 35, 0) {
vms.xpvhv_aux.xhv_name_count = 0x3c
vms.xpvhv_with_aux.xpvhv_aux = 0x20
}

Expand Down
5 changes: 2 additions & 3 deletions interpreter/perl/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,13 +282,12 @@ func (i *perlInstance) getHVName(hvAddr libpf.Address) (string, error) {
}

xpvhvAddr := npsr.Ptr(hv, vms.sv.sv_any)
end := i.rm.Uint64(xpvhvAddr + libpf.Address(vms.xpvhv.xhv_max))

xpvhvAux := make([]byte, vms.xpvhv_aux.sizeof)
if i.d.version < perlVersion(5, 35, 0) {
// The aux structure is at the end of the array. Calculate its address.
arrayAddr := npsr.Ptr(hv, vms.sv.svu_hash)
xpvhvAuxAddr := arrayAddr + libpf.Address((end+1)*8)
end := i.rm.Uint64(xpvhvAddr + libpf.Address(vms.xpvhv.xhv_max))
xpvhvAuxAddr := arrayAddr + libpf.Address((end+1)*uint64(vms.xpvhv_aux.pointer_size))
if err := i.rm.Read(xpvhvAuxAddr, xpvhvAux); err != nil {
return "", err
}
Expand Down
15 changes: 12 additions & 3 deletions interpreter/python/python.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ func Loader(ebpf interpreter.EbpfHandler, info *interpreter.LoaderInfo) (interpr
version := pythonVer(major, minor)

minVer := pythonVer(3, 6)
maxVer := pythonVer(3, 12)
maxVer := pythonVer(3, 13)
if version < minVer || version > maxVer {
return nil, fmt.Errorf("unsupported Python %d.%d (need >= %d.%d and <= %d.%d)",
major, minor,
Expand Down Expand Up @@ -753,8 +753,8 @@ func Loader(ebpf interpreter.EbpfHandler, info *interpreter.LoaderInfo) (interpr
// The Python main interpreter loop history in CPython git is:
//
//nolint:lll
// deaf509e8fc v3.11 2022-11-15 _PyEval_EvalFrameDefault(PyThreadState*,_PyInterpreterFrame*,int)
// bc2cdfc8157 v3.10 2022-11-15 _PyEval_EvalFrameDefault(PyThreadState*,PyFrameObject*,int)
// 87af12bff33 v3.11 2022-02-15 _PyEval_EvalFrameDefault(PyThreadState*,_PyInterpreterFrame*,int)
// ae0a2b75625 v3.10 2021-06-25 _PyEval_EvalFrameDefault(PyThreadState*,_interpreter_frame*,int)
// 0b72b23fb0c v3.9 2020-03-12 _PyEval_EvalFrameDefault(PyThreadState*,PyFrameObject*,int)
// 3cebf938727 v3.6 2016-09-05 _PyEval_EvalFrameDefault(PyFrameObject*,int)
// 49fd7fa4431 v3.0 2006-04-21 PyEval_EvalFrameEx(PyFrameObject*,int)
Expand Down Expand Up @@ -808,6 +808,15 @@ func Loader(ebpf interpreter.EbpfHandler, info *interpreter.LoaderInfo) (interpr
vms.PyThreadState.Frame = 56
vms.PyCFrame.CurrentFrame = 0
vms.PyASCIIObject.Data = 40
case pythonVer(3, 13):
vms.PyFrameObject.Code = 0
vms.PyFrameObject.LastI = 56 // _Py_CODEUNIT *prev_instr
vms.PyFrameObject.Back = 8 // struct _PyInterpreterFrame *previous
vms.PyFrameObject.EntryMember = 70 // char owner
vms.PyFrameObject.EntryVal = 3 // enum _frameowner, FRAME_OWNED_BY_CSTACK
vms.PyThreadState.Frame = 72
vms.PyCFrame.CurrentFrame = 8
vms.PyASCIIObject.Data = 40
}

// Read the introspection data from objects types that have it
Expand Down
12 changes: 12 additions & 0 deletions libpf/symbol.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ type SymbolMap struct {
addressToSymbol []Symbol
}

func NewSymbolMap(capacity int) *SymbolMap {
return &SymbolMap{
addressToSymbol: make([]Symbol, 0, capacity),
}
}

// Add a symbol to the map
func (symmap *SymbolMap) Add(s Symbol) {
symmap.addressToSymbol = append(symmap.addressToSymbol, s)
Expand All @@ -51,10 +57,16 @@ func (symmap *SymbolMap) Add(s Symbol) {
// Finalize symbol map by sorting and constructing the nameToSymbol table after
// all symbols are inserted via Add() calls
func (symmap *SymbolMap) Finalize() {
// Adjust the overcommitted capacity
a := make([]Symbol, len(symmap.addressToSymbol))
copy(a, symmap.addressToSymbol)
symmap.addressToSymbol = a

sort.Slice(symmap.addressToSymbol,
func(i, j int) bool {
return symmap.addressToSymbol[i].Address > symmap.addressToSymbol[j].Address
})

symmap.nameToSymbol = make(map[SymbolName]*Symbol, len(symmap.addressToSymbol))
for i, s := range symmap.addressToSymbol {
symmap.nameToSymbol[s.Name] = &symmap.addressToSymbol[i]
Expand Down
11 changes: 9 additions & 2 deletions proc/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ func GetKallsyms(kallsymsPath string) (*libpf.SymbolMap, error) {
var address uint64
var symbol string

symmap := libpf.SymbolMap{}
// As an example, the Debian 6.10.11 kernel has ~180k text symbols.
symmap := libpf.NewSymbolMap(200 * 1024)
noSymbols := true

file, err := os.Open(kallsymsPath)
Expand All @@ -52,6 +53,12 @@ func GetKallsyms(kallsymsPath string) (*libpf.SymbolMap, error) {
return nil, fmt.Errorf("unexpected line in kallsyms: '%s'", line)
}

// Skip non-text symbols, see 'man nm'.
// Special case for 'etext', which can be of type `D` (data) in some kernels.
if strings.IndexByte("TtVvWwA", fields[1][0]) == -1 && fields[2] != "_etext" {
continue
}

if address, err = strconv.ParseUint(fields[0], 16, 64); err != nil {
return nil, fmt.Errorf("failed to parse address value: '%s'", fields[0])
}
Expand All @@ -74,7 +81,7 @@ func GetKallsyms(kallsymsPath string) (*libpf.SymbolMap, error) {
"all addresses from kallsyms are zero - check process permissions")
}

return &symmap, nil
return symmap, nil
}

// GetKernelModules returns SymbolMap for kernel modules from /proc/modules.
Expand Down
3 changes: 0 additions & 3 deletions reporter/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,6 @@ type FrameMetadataArgs struct {
}

type SymbolReporter interface {
// ReportFallbackSymbol enqueues a fallback symbol for reporting, for a given frame.
ReportFallbackSymbol(frameID libpf.FrameID, symbol string)

// ExecutableMetadata accepts a FileID with the corresponding filename
// and takes some action with it (for example, it might cache it for
// periodic reporting to a backend).
Expand Down
61 changes: 7 additions & 54 deletions reporter/otlp_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,6 @@ type OTLPReporter struct {
// hostmetadata stores metadata that is sent out with every request.
hostmetadata *lru.SyncedLRU[string, string]

// fallbackSymbols keeps track of FrameID to their symbol.
fallbackSymbols *lru.SyncedLRU[libpf.FrameID, string]

// executables stores metadata for executables.
executables *lru.SyncedLRU[libpf.FileID, execInfo]

Expand Down Expand Up @@ -198,14 +195,6 @@ func (r *OTLPReporter) ReportFramesForTrace(_ *libpf.Trace) {}
func (r *OTLPReporter) ReportCountForTrace(_ libpf.TraceHash, _ uint16, _ *TraceEventMeta) {
}

// ReportFallbackSymbol enqueues a fallback symbol for reporting, for a given frame.
func (r *OTLPReporter) ReportFallbackSymbol(frameID libpf.FrameID, symbol string) {
if _, exists := r.fallbackSymbols.Peek(frameID); exists {
return
}
r.fallbackSymbols.Add(frameID, symbol)
}

// ExecutableMetadata accepts a fileID with the corresponding filename
// and caches this information.
func (r *OTLPReporter) ExecutableMetadata(args *ExecutableMetadataArgs) {
Expand Down Expand Up @@ -308,12 +297,6 @@ func (r *OTLPReporter) GetMetrics() Metrics {

// Start sets up and manages the reporting connection to a OTLP backend.
func Start(mainCtx context.Context, cfg *Config) (Reporter, error) {
fallbackSymbols, err := lru.NewSynced[libpf.FrameID, string](cfg.CacheSize,
libpf.FrameID.Hash32)
if err != nil {
return nil, err
}

executables, err := lru.NewSynced[libpf.FileID, execInfo](cfg.CacheSize, libpf.FileID.Hash32)
if err != nil {
return nil, err
Expand Down Expand Up @@ -353,7 +336,6 @@ func Start(mainCtx context.Context, cfg *Config) (Reporter, error) {
pkgGRPCOperationTimeout: cfg.GRPCOperationTimeout,
client: nil,
rpcStats: NewStatsHandler(),
fallbackSymbols: fallbackSymbols,
executables: executables,
frames: frames,
hostmetadata: hostmetadata,
Expand Down Expand Up @@ -501,9 +483,7 @@ func (r *OTLPReporter) getResource() *resource.Resource {
func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS uint64) {
traceEvents := r.traceEvents.WLock()
samples := maps.Clone(*traceEvents)
for key := range *traceEvents {
delete(*traceEvents, key)
}
clear(*traceEvents)
r.traceEvents.WUnlock(&traceEvents)

// stringMap is a temporary helper that will build the StringTable.
Expand Down Expand Up @@ -546,7 +526,6 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u

// Temporary lookup to reference existing Mappings.
fileIDtoMapping := make(map[libpf.FileID]uint64)
frameIDtoFunction := make(map[libpf.FrameID]uint64)

for traceKey, traceInfo := range samples {
sample := &profiles.Sample{}
Expand Down Expand Up @@ -618,31 +597,6 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u
})
}
loc.MappingIndex = locationMappingIndex
case libpf.KernelFrame:
// Reconstruct frameID
frameID := libpf.NewFrameID(traceInfo.files[i], traceInfo.linenos[i])
// Store Kernel frame information as a Line message:
line := &profiles.Line{}

if tmpFunctionIndex, exists := frameIDtoFunction[frameID]; exists {
line.FunctionIndex = tmpFunctionIndex
} else {
symbol, exists := r.fallbackSymbols.Get(frameID)
if !exists {
// TODO: choose a proper default value if the kernel symbol was not
// reported yet.
symbol = "UNKNOWN"
}

// Indicates "no source filename" for kernel frames.
line.FunctionIndex = createFunctionEntry(funcMap,
symbol, "")
}
loc.Line = append(loc.Line, line)

// To be compliant with the protocol, generate a placeholder mapping entry.
loc.MappingIndex = getDummyMappingIndex(fileIDtoMapping, stringMap,
profile, traceInfo.files[i])
case libpf.AbortFrame:
// Next step: Figure out how the OTLP protocol
// could handle artificial frames, like AbortFrame,
Expand All @@ -660,20 +614,19 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u
"UNREPORTED", frameKind.String())
} else {
fileIDInfo := fileIDInfoLock.RLock()
si, exists := (*fileIDInfo)[traceInfo.linenos[i]]
if !exists {
if si, exists := (*fileIDInfo)[traceInfo.linenos[i]]; exists {
line.Line = int64(si.lineNumber)

line.FunctionIndex = createFunctionEntry(funcMap,
si.functionName, si.filePath)
} else {
// At this point, we do not have enough information for the frame.
// Therefore, we report a dummy entry and use the interpreter as filename.
// To differentiate this case from the case where no information about
// the file ID is available at all, we use a different name for reported
// function.
line.FunctionIndex = createFunctionEntry(funcMap,
"UNRESOLVED", frameKind.String())
} else {
line.Line = int64(si.lineNumber)

line.FunctionIndex = createFunctionEntry(funcMap,
si.functionName, si.filePath)
}
fileIDInfoLock.RUnlock(&fileIDInfo)
}
Expand Down
Loading

0 comments on commit 64c3f2c

Please sign in to comment.