Skip to content

Commit

Permalink
[processorhelper] add processorhelperprofiles package (#11556)
Browse files Browse the repository at this point in the history
#### Description
Add `processorhelperprofiles` to support profiles signal.

cc @mx-psi @dmathieu

---------

Co-authored-by: Florian Bacher <[email protected]>
Co-authored-by: Damien Mathieu <[email protected]>
  • Loading branch information
3 people authored Oct 29, 2024
1 parent 39f714b commit 002a748
Show file tree
Hide file tree
Showing 8 changed files with 422 additions and 0 deletions.
25 changes: 25 additions & 0 deletions .chloggen/add-pkg-processorhelperprofiles.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: processorhelperprofiles

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add processorhelperprofiles to support profiles signal

# One or more tracking issues or pull requests related to the change
issues: [11556]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
1 change: 1 addition & 0 deletions processor/processorhelper/processorhelperprofiles/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../../Makefile.Common
72 changes: 72 additions & 0 deletions processor/processorhelper/processorhelperprofiles/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
module go.opentelemetry.io/collector/processor/processorhelper/processorhelperprofiles

go 1.22.0

require (
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.112.0
go.opentelemetry.io/collector/consumer v0.112.0
go.opentelemetry.io/collector/consumer/consumerprofiles v0.112.0
go.opentelemetry.io/collector/consumer/consumertest v0.112.0
go.opentelemetry.io/collector/pdata/pprofile v0.112.0
go.opentelemetry.io/collector/processor v0.112.0
go.opentelemetry.io/collector/processor/processorprofiles v0.112.0
go.opentelemetry.io/collector/processor/processortest v0.112.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/collector/component/componentstatus v0.112.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.112.0 // indirect
go.opentelemetry.io/collector/pdata v1.18.0 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.112.0 // indirect
go.opentelemetry.io/collector/pipeline v0.112.0 // indirect
go.opentelemetry.io/otel v1.31.0 // indirect
go.opentelemetry.io/otel/metric v1.31.0 // indirect
go.opentelemetry.io/otel/sdk v1.31.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.31.0 // indirect
go.opentelemetry.io/otel/trace v1.31.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.17.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect
google.golang.org/grpc v1.67.1 // indirect
google.golang.org/protobuf v1.35.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace go.opentelemetry.io/collector/consumer/consumertest => ../../../consumer/consumertest

replace go.opentelemetry.io/collector/pdata/pprofile => ../../../pdata/pprofile

replace go.opentelemetry.io/collector/pdata/testdata => ../../../pdata/testdata

replace go.opentelemetry.io/collector/processor => ../../../processor

replace go.opentelemetry.io/collector/consumer => ../../../consumer

replace go.opentelemetry.io/collector/consumer/consumerprofiles => ../../../consumer/consumerprofiles

replace go.opentelemetry.io/collector/component => ../../../component

replace go.opentelemetry.io/collector/pdata => ../../../pdata

replace go.opentelemetry.io/collector/config/configtelemetry => ../../../config/configtelemetry

replace go.opentelemetry.io/collector/pipeline => ../../../pipeline

replace go.opentelemetry.io/collector/component/componentstatus => ../../../component/componentstatus

replace go.opentelemetry.io/collector/processor/processortest => ../../processortest

replace go.opentelemetry.io/collector/processor/processorprofiles => ../../processorprofiles
96 changes: 96 additions & 0 deletions processor/processorhelper/processorhelperprofiles/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 64 additions & 0 deletions processor/processorhelper/processorhelperprofiles/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package processorhelperprofiles // import "go.opentelemetry.io/collector/processor/processorhelper/processorhelperprofiles"

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
)

// Option apply changes to internalOptions.
type Option interface {
apply(*baseSettings)
}

type optionFunc func(*baseSettings)

func (of optionFunc) apply(e *baseSettings) {
of(e)
}

// WithStart overrides the default Start function for an processor.
// The default shutdown function does nothing and always returns nil.
func WithStart(start component.StartFunc) Option {
return optionFunc(func(o *baseSettings) {
o.StartFunc = start
})
}

// WithShutdown overrides the default Shutdown function for an processor.
// The default shutdown function does nothing and always returns nil.
func WithShutdown(shutdown component.ShutdownFunc) Option {
return optionFunc(func(o *baseSettings) {
o.ShutdownFunc = shutdown
})
}

// WithCapabilities overrides the default GetCapabilities function for an processor.
// The default GetCapabilities function returns mutable capabilities.
func WithCapabilities(capabilities consumer.Capabilities) Option {
return optionFunc(func(o *baseSettings) {
o.consumerOptions = append(o.consumerOptions, consumer.WithCapabilities(capabilities))
})
}

type baseSettings struct {
component.StartFunc
component.ShutdownFunc
consumerOptions []consumer.Option
}

// fromOptions returns the internal settings starting from the default and applying all options.
func fromOptions(options []Option) *baseSettings {
// Start from the default options:
opts := &baseSettings{
consumerOptions: []consumer.Option{consumer.WithCapabilities(consumer.Capabilities{MutatesData: true})},
}

for _, op := range options {
op.apply(opts)
}

return opts
}
61 changes: 61 additions & 0 deletions processor/processorhelper/processorhelperprofiles/profiles.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package processorhelperprofiles // import "go.opentelemetry.io/collector/processor/processorhelper/processorhelperprofiles"

import (
"context"
"errors"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumerprofiles"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.opentelemetry.io/collector/processor/processorprofiles"
)

// ProcessProfilesFunc is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
type ProcessProfilesFunc func(context.Context, pprofile.Profiles) (pprofile.Profiles, error)

type profiles struct {
component.StartFunc
component.ShutdownFunc
consumerprofiles.Profiles
}

// NewProfiles creates a processorprofiles.Profiles that ensure context propagation.
func NewProfiles(
_ context.Context,
_ processor.Settings,
_ component.Config,
nextConsumer consumerprofiles.Profiles,
profilesFunc ProcessProfilesFunc,
options ...Option,
) (processorprofiles.Profiles, error) {
if profilesFunc == nil {
return nil, errors.New("nil profilesFunc")
}

bs := fromOptions(options)
profilesConsumer, err := consumerprofiles.NewProfiles(func(ctx context.Context, pd pprofile.Profiles) (err error) {
pd, err = profilesFunc(ctx, pd)
if err != nil {
if errors.Is(err, processorhelper.ErrSkipProcessingData) {
return nil
}
return err
}
return nextConsumer.ConsumeProfiles(ctx, pd)
}, bs.consumerOptions...)
if err != nil {
return nil, err
}

return &profiles{
StartFunc: bs.StartFunc,
ShutdownFunc: bs.ShutdownFunc,
Profiles: profilesConsumer,
}, nil
}
Loading

0 comments on commit 002a748

Please sign in to comment.