Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add mTLS support to otel exporter #1389

Merged
merged 10 commits into from
Sep 23, 2024
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
FROM squidfunk/mkdocs-material:9.5
RUN pip install mkdocs-include-markdown-plugin
RUN pip install mkdocs-include-markdown-plugin
82 changes: 73 additions & 9 deletions core/pkg/telemetry/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package telemetry

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"time"

"connectrpc.com/connect"
"connectrpc.com/otelconnect"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/flagd/pkg/certreloader"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
Expand All @@ -20,6 +24,7 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)

Expand All @@ -28,10 +33,18 @@ const (
exportInterval = 2 * time.Second
)

type CollectorConfig struct {
Target string
CertPath string
KeyPath string
ReloadInterval time.Duration
CAPath string
}

// Config of the telemetry runtime. These are expected to be mapped to start-up arguments
type Config struct {
MetricsExporter string
CollectorTarget string
CollectorConfig CollectorConfig
}

func RegisterErrorHandling(log *logger.Logger) {
Expand Down Expand Up @@ -64,13 +77,13 @@ func BuildMetricsRecorder(
// provide the grpc collector target. Providing empty target results in skipping provider & propagator registration.
// This results in tracers having NoopTracerProvider and propagator having No-Op TextMapPropagator performing no action
func BuildTraceProvider(ctx context.Context, logger *logger.Logger, svc string, svcVersion string, cfg Config) error {
if cfg.CollectorTarget == "" {
if cfg.CollectorConfig.Target == "" {
logger.Debug("skipping trace provider setup as collector target is not set." +
" Traces will use NoopTracerProvider provider and propagator will use no-Op TextMapPropagator")
return nil
}

exporter, err := buildOtlpExporter(ctx, cfg.CollectorTarget)
exporter, err := buildOtlpExporter(ctx, cfg.CollectorConfig)
if err != nil {
return err
}
Expand All @@ -95,7 +108,7 @@ func BuildConnectOptions(cfg Config) ([]connect.HandlerOption, error) {
options := []connect.HandlerOption{}

// add interceptor if configuration is available for collector
if cfg.CollectorTarget != "" {
if cfg.CollectorConfig.Target != "" {
interceptor, err := otelconnect.NewInterceptor(otelconnect.WithTrustRemote())
if err != nil {
return nil, fmt.Errorf("error creating interceptor, %w", err)
Expand All @@ -107,6 +120,47 @@ func BuildConnectOptions(cfg Config) ([]connect.HandlerOption, error) {
return options, nil
}

func buildTransportCredentials(_ context.Context, cfg CollectorConfig) (credentials.TransportCredentials, error) {
creds := insecure.NewCredentials()
if cfg.KeyPath != "" || cfg.CertPath != "" || cfg.CAPath != "" {
capool := x509.NewCertPool()
if cfg.CAPath != "" {
ca, err := os.ReadFile(cfg.CAPath)
if err != nil {
return nil, fmt.Errorf("can't read ca file from %s", cfg.CAPath)
}
if !capool.AppendCertsFromPEM(ca) {
return nil, fmt.Errorf("can't add CA '%s' to pool", cfg.CAPath)
}
}

reloader, err := certreloader.NewCertReloader(certreloader.Config{
KeyPath: cfg.KeyPath,
CertPath: cfg.CertPath,
ReloadInterval: cfg.ReloadInterval,
})
if err != nil {
return nil, fmt.Errorf("failed to create certreloader: %w", err)
}

tlsConfig := &tls.Config{
RootCAs: capool,
MinVersion: tls.VersionTLS13,
GetCertificate: func(chi *tls.ClientHelloInfo) (*tls.Certificate, error) {
kevinschoonover marked this conversation as resolved.
Show resolved Hide resolved
certs, err := reloader.GetCertificate()
if err != nil {
return nil, fmt.Errorf("failed to reload certs: %w", err)
}
return certs, nil
},
}

creds = credentials.NewTLS(tlsConfig)
}

return creds, nil
}

// buildMetricReader builds a metric reader based on provided configurations
func buildMetricReader(ctx context.Context, cfg Config) (metric.Reader, error) {
if cfg.MetricsExporter == "" {
Expand All @@ -120,13 +174,18 @@ func buildMetricReader(ctx context.Context, cfg Config) (metric.Reader, error) {
}

// Otel override require target configuration
if cfg.CollectorTarget == "" {
if cfg.CollectorConfig.Target == "" {
return nil, fmt.Errorf("metric exporter is set(%s) without providing otel collector target."+
" collector target is required for this option", cfg.MetricsExporter)
}

transportCredentials, err := buildTransportCredentials(ctx, cfg.CollectorConfig)
if err != nil {
return nil, fmt.Errorf("metric export would not build transport credentials: %w", err)
}

// Non-blocking, insecure grpc connection
conn, err := grpc.NewClient(cfg.CollectorTarget, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient(cfg.CollectorConfig.Target, grpc.WithTransportCredentials(transportCredentials))
if err != nil {
return nil, fmt.Errorf("error creating client connection: %w", err)
}
Expand All @@ -141,9 +200,14 @@ func buildMetricReader(ctx context.Context, cfg Config) (metric.Reader, error) {
}

// buildOtlpExporter is a helper to build grpc backed otlp trace exporter
func buildOtlpExporter(ctx context.Context, collectorTarget string) (*otlptrace.Exporter, error) {
// Non-blocking, insecure grpc connection
conn, err := grpc.NewClient(collectorTarget, grpc.WithTransportCredentials(insecure.NewCredentials()))
func buildOtlpExporter(ctx context.Context, cfg CollectorConfig) (*otlptrace.Exporter, error) {
transportCredentials, err := buildTransportCredentials(ctx, cfg)
if err != nil {
return nil, fmt.Errorf("metric export would not build transport credentials: %w", err)
}

// Non-blocking, grpc connection
conn, err := grpc.NewClient(cfg.Target, grpc.WithTransportCredentials(transportCredentials))
if err != nil {
return nil, fmt.Errorf("error creating client connection: %w", err)
}
Expand Down
20 changes: 15 additions & 5 deletions core/pkg/telemetry/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ func TestBuildMetricsRecorder(t *testing.T) {
// Simple happy-path test
recorder, err := BuildMetricsRecorder(context.Background(), "service", "0.0.1", Config{
MetricsExporter: "otel",
CollectorTarget: "localhost:8080",
CollectorConfig: CollectorConfig{
Target: "localhost:8080",
},
})

require.Nil(t, err, "expected no error, but got: %v", err)
Expand Down Expand Up @@ -52,15 +54,19 @@ func TestBuildMetricReader(t *testing.T) {
name: "Metric exporter overriding require valid configuration combination",
cfg: Config{
MetricsExporter: metricsExporterOtel,
CollectorTarget: "", // collector target is unset
CollectorConfig: CollectorConfig{
Target: "", // collector target is unset
},
},
error: true,
},
{
name: "Metric exporter overriding with valid configurations",
cfg: Config{
MetricsExporter: metricsExporterOtel,
CollectorTarget: "localhost:8080",
CollectorConfig: CollectorConfig{
Target: "localhost:8080",
},
},
error: false,
},
Expand Down Expand Up @@ -90,7 +96,9 @@ func TestBuildSpanProcessor(t *testing.T) {
{
name: "Valid configurations yield a valid processor",
cfg: Config{
CollectorTarget: "localhost:8080",
CollectorConfig: CollectorConfig{
Target: "localhost:8080",
},
},
error: false,
},
Expand Down Expand Up @@ -127,7 +135,9 @@ func TestBuildConnectOptions(t *testing.T) {
{
name: "Connect option is set when telemetry target is set",
cfg: Config{
CollectorTarget: "localhost:8080",
CollectorConfig: CollectorConfig{
Target: "localhost:8080",
},
},
optionCount: 1,
},
Expand Down
32 changes: 18 additions & 14 deletions docs/reference/flagd-cli/flagd_start.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,24 @@ flagd start [flags]
### Options

```
-C, --cors-origin strings CORS allowed origins, * will allow all origins
-h, --help help for start
-z, --log-format string Set the logging format, e.g. console or json (default "console")
-m, --management-port int32 Port for management operations (default 8014)
-t, --metrics-exporter string Set the metrics exporter. Default(if unset) is Prometheus. Can be override to otel - OpenTelemetry metric exporter. Overriding to otel require otelCollectorURI to be present
-r, --ofrep-port int32 ofrep service port (default 8016)
-o, --otel-collector-uri string Set the grpc URI of the OpenTelemetry collector for flagd runtime. If unset, the collector setup will be ignored and traces will not be exported.
-p, --port int32 Port to listen on (default 8013)
-c, --server-cert-path string Server side tls certificate path
-k, --server-key-path string Server side tls key path
-d, --socket-path string Flagd socket path. With grpc the service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally.
-s, --sources string JSON representation of an array of SourceConfig objects. This object contains 2 required fields, uri (string) and provider (string). Documentation for this object: https://flagd.dev/reference/sync-configuration/#source-configuration
-g, --sync-port int32 gRPC Sync port (default 8015)
-f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath, URL (HTTP and gRPC) or FeatureFlag custom resource. When flag keys are duplicated across multiple providers the merge priority follows the index of the flag arguments, as such flags from the uri at index 0 take the lowest precedence, with duplicated keys being overwritten by those from the uri at index 1. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension.
-C, --cors-origin strings CORS allowed origins, * will allow all origins
-h, --help help for start
-z, --log-format string Set the logging format, e.g. console or json (default "console")
-m, --management-port int32 Port for management operations (default 8014)
-t, --metrics-exporter string Set the metrics exporter. Default(if unset) is Prometheus. Can be override to otel - OpenTelemetry metric exporter. Overriding to otel require otelCollectorURI to be present
-r, --ofrep-port int32 ofrep service port (default 8016)
-A, --otel-ca-path string tls certificate authority path to use with OpenTelemetry collector
-D, --otel-cert-path string tls certificate path to use with OpenTelemetry collector
-o, --otel-collector-uri string Set the grpc URI of the OpenTelemetry collector for flagd runtime. If unset, the collector setup will be ignored and traces will not be exported.
-K, --otel-key-path string tls key path to use with OpenTelemetry collector
-I, --otel-reload-interval duration how long between reloading the otel tls certificate from disk (default 1h0m0s)
-p, --port int32 Port to listen on (default 8013)
-c, --server-cert-path string Server side tls certificate path
-k, --server-key-path string Server side tls key path
-d, --socket-path string Flagd socket path. With grpc the service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally.
-s, --sources string JSON representation of an array of SourceConfig objects. This object contains 2 required fields, uri (string) and provider (string). Documentation for this object: https://flagd.dev/reference/sync-configuration/#source-configuration
-g, --sync-port int32 gRPC Sync port (default 8015)
-f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath, URL (HTTP and gRPC) or FeatureFlag custom resource. When flag keys are duplicated across multiple providers the merge priority follows the index of the flag arguments, as such flags from the uri at index 0 take the lowest precedence, with duplicated keys being overwritten by those from the uri at index 1. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension.
```

### Options inherited from parent commands
Expand Down
65 changes: 41 additions & 24 deletions flagd/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"log"
"strings"
"time"

"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/sync"
Expand All @@ -16,19 +17,23 @@ import (
)

const (
corsFlagName = "cors-origin"
logFormatFlagName = "log-format"
managementPortFlagName = "management-port"
metricsExporter = "metrics-exporter"
ofrepPortFlagName = "ofrep-port"
otelCollectorURI = "otel-collector-uri"
portFlagName = "port"
serverCertPathFlagName = "server-cert-path"
serverKeyPathFlagName = "server-key-path"
socketPathFlagName = "socket-path"
sourcesFlagName = "sources"
syncPortFlagName = "sync-port"
uriFlagName = "uri"
corsFlagName = "cors-origin"
logFormatFlagName = "log-format"
managementPortFlagName = "management-port"
metricsExporter = "metrics-exporter"
ofrepPortFlagName = "ofrep-port"
otelCollectorURI = "otel-collector-uri"
otelCertPathFlagName = "otel-cert-path"
otelKeyPathFlagName = "otel-key-path"
otelCAPathFlagName = "otel-ca-path"
otelReloadIntervalFlagName = "otel-reload-interval"
portFlagName = "port"
serverCertPathFlagName = "server-cert-path"
serverKeyPathFlagName = "server-key-path"
socketPathFlagName = "socket-path"
sourcesFlagName = "sources"
syncPortFlagName = "sync-port"
uriFlagName = "uri"
)

func init() {
Expand Down Expand Up @@ -67,12 +72,20 @@ func init() {
" be present")
flags.StringP(otelCollectorURI, "o", "", "Set the grpc URI of the OpenTelemetry collector "+
"for flagd runtime. If unset, the collector setup will be ignored and traces will not be exported.")
flags.StringP(otelCertPathFlagName, "D", "", "tls certificate path to use with OpenTelemetry collector")
flags.StringP(otelKeyPathFlagName, "K", "", "tls key path to use with OpenTelemetry collector")
flags.StringP(otelCAPathFlagName, "A", "", "tls certificate authority path to use with OpenTelemetry collector")
flags.DurationP(otelReloadIntervalFlagName, "I", time.Hour, "how long between reloading the otel tls certificate "+
"from disk")

_ = viper.BindPFlag(corsFlagName, flags.Lookup(corsFlagName))
_ = viper.BindPFlag(logFormatFlagName, flags.Lookup(logFormatFlagName))
_ = viper.BindPFlag(metricsExporter, flags.Lookup(metricsExporter))
_ = viper.BindPFlag(managementPortFlagName, flags.Lookup(managementPortFlagName))
_ = viper.BindPFlag(otelCollectorURI, flags.Lookup(otelCollectorURI))
_ = viper.BindPFlag(otelCertPathFlagName, flags.Lookup(otelCertPathFlagName))
_ = viper.BindPFlag(otelKeyPathFlagName, flags.Lookup(otelKeyPathFlagName))
_ = viper.BindPFlag(otelCAPathFlagName, flags.Lookup(otelCAPathFlagName))
_ = viper.BindPFlag(portFlagName, flags.Lookup(portFlagName))
_ = viper.BindPFlag(serverCertPathFlagName, flags.Lookup(serverCertPathFlagName))
_ = viper.BindPFlag(serverKeyPathFlagName, flags.Lookup(serverKeyPathFlagName))
Expand Down Expand Up @@ -127,17 +140,21 @@ var startCmd = &cobra.Command{

// Build Runtime -----------------------------------------------------------
rt, err := runtime.FromConfig(logger, Version, runtime.Config{
CORS: viper.GetStringSlice(corsFlagName),
MetricExporter: viper.GetString(metricsExporter),
ManagementPort: viper.GetUint16(managementPortFlagName),
OfrepServicePort: viper.GetUint16(ofrepPortFlagName),
OtelCollectorURI: viper.GetString(otelCollectorURI),
ServiceCertPath: viper.GetString(serverCertPathFlagName),
ServiceKeyPath: viper.GetString(serverKeyPathFlagName),
ServicePort: viper.GetUint16(portFlagName),
ServiceSocketPath: viper.GetString(socketPathFlagName),
SyncServicePort: viper.GetUint16(syncPortFlagName),
SyncProviders: syncProviders,
CORS: viper.GetStringSlice(corsFlagName),
MetricExporter: viper.GetString(metricsExporter),
ManagementPort: viper.GetUint16(managementPortFlagName),
OfrepServicePort: viper.GetUint16(ofrepPortFlagName),
OtelCollectorURI: viper.GetString(otelCollectorURI),
OtelCertPath: viper.GetString(otelCertPathFlagName),
OtelKeyPath: viper.GetString(otelKeyPathFlagName),
OtelReloadInterval: viper.GetDuration(otelReloadIntervalFlagName),
OtelCAPath: viper.GetString(otelCAPathFlagName),
ServiceCertPath: viper.GetString(serverCertPathFlagName),
ServiceKeyPath: viper.GetString(serverKeyPathFlagName),
ServicePort: viper.GetUint16(portFlagName),
ServiceSocketPath: viper.GetString(socketPathFlagName),
SyncServicePort: viper.GetUint16(syncPortFlagName),
SyncProviders: syncProviders,
})
if err != nil {
rtLogger.Fatal(err.Error())
Expand Down
Loading
Loading