diff --git a/pkg/acquisition/modules/victorialogs/internal/vlclient/types.go b/pkg/acquisition/modules/victorialogs/internal/vlclient/types.go new file mode 100644 index 00000000000..167a84e41b1 --- /dev/null +++ b/pkg/acquisition/modules/victorialogs/internal/vlclient/types.go @@ -0,0 +1,12 @@ +package vlclient + +import ( + "time" +) + +// Log represents a VictoriaLogs log line +// See: https://docs.victoriametrics.com/victorialogs/querying/#querying-logs +type Log struct { + Message string `json:"_msg"` + Time time.Time `json:"_time"` +} diff --git a/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go b/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go new file mode 100644 index 00000000000..4573985953b --- /dev/null +++ b/pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go @@ -0,0 +1,356 @@ +package vlclient + +import ( + "bufio" + "bytes" + "context" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + "time" + + log "github.com/sirupsen/logrus" + "gopkg.in/tomb.v2" + + "github.com/crowdsecurity/crowdsec/pkg/apiclient/useragent" +) + +type VLClient struct { + Logger *log.Entry + + config Config + t *tomb.Tomb + failStart time.Time + currentTickerInterval time.Duration + requestHeaders map[string]string + + client *http.Client +} + +type Config struct { + URL string + Prefix string + Query string + Headers map[string]string + + Username string + Password string + + Since time.Duration + + FailMaxDuration time.Duration + + Limit int +} + +func updateURI(uri string, newStart time.Time) string { + u, _ := url.Parse(uri) + queryParams := u.Query() + + if !newStart.IsZero() { + // +1 the last timestamp to avoid getting the same result again. + updatedStart := newStart.Add(1 * time.Nanosecond) + queryParams.Set("start", updatedStart.Format(time.RFC3339Nano)) + } + + u.RawQuery = queryParams.Encode() + return u.String() +} + +func (lc *VLClient) SetTomb(t *tomb.Tomb) { + lc.t = t +} + +func (lc *VLClient) resetFailStart() { + if !lc.failStart.IsZero() { + log.Infof("VictoriaLogs is back after %s", time.Since(lc.failStart)) + } + lc.failStart = time.Time{} +} + +func (lc *VLClient) shouldRetry() bool { + if lc.failStart.IsZero() { + lc.Logger.Warningf("VictoriaLogs is not available, will retry for %s", lc.config.FailMaxDuration) + lc.failStart = time.Now() + return true + } + if time.Since(lc.failStart) > lc.config.FailMaxDuration { + lc.Logger.Errorf("VictoriaLogs didn't manage to recover after %s, giving up", lc.config.FailMaxDuration) + return false + } + return true +} + +func (lc *VLClient) increaseTicker(ticker *time.Ticker) { + maxTicker := 10 * time.Second + if lc.currentTickerInterval < maxTicker { + lc.currentTickerInterval *= 2 + if lc.currentTickerInterval > maxTicker { + lc.currentTickerInterval = maxTicker + } + ticker.Reset(lc.currentTickerInterval) + } +} + +func (lc *VLClient) decreaseTicker(ticker *time.Ticker) { + minTicker := 100 * time.Millisecond + if lc.currentTickerInterval != minTicker { + lc.currentTickerInterval = minTicker + ticker.Reset(lc.currentTickerInterval) + } +} + +func (lc *VLClient) queryRange(ctx context.Context, uri string, c chan *Log, infinite bool) error { + lc.currentTickerInterval = 100 * time.Millisecond + ticker := time.NewTicker(lc.currentTickerInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-lc.t.Dying(): + return lc.t.Err() + case <-ticker.C: + resp, err := lc.Get(ctx, uri) + if err != nil { + if ok := lc.shouldRetry(); !ok { + return fmt.Errorf("error querying range: %w", err) + } + lc.increaseTicker(ticker) + continue + } + + if resp.StatusCode != http.StatusOK { + lc.Logger.Warnf("bad HTTP response code for query range: %d", resp.StatusCode) + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + if ok := lc.shouldRetry(); !ok { + return fmt.Errorf("bad HTTP response code: %d: %s: %w", resp.StatusCode, string(body), err) + } + lc.increaseTicker(ticker) + continue + } + + n, largestTime, err := lc.readResponse(ctx, resp, c) + if err != nil { + return err + } + + if !infinite && n < lc.config.Limit { + lc.Logger.Infof("Got less than %d results (%d), stopping", lc.config.Limit, n) + close(c) + return nil + } + lc.Logger.Debugf("(timer:%v) %d results (uri:%s)", lc.currentTickerInterval, n, uri) + if infinite { + if n > 0 { + //as long as we get results, we keep lowest ticker + lc.decreaseTicker(ticker) + } else { + lc.increaseTicker(ticker) + } + } + + uri = updateURI(uri, largestTime) + } + } +} + +// Parses response from body in JSON-LD format and sends results to the channel +func (lc *VLClient) readResponse(ctx context.Context, resp *http.Response, c chan *Log) (int, time.Time, error) { + br := bufio.NewReaderSize(resp.Body, 64*1024) + var ( + finishedReading bool + n int + latestTs time.Time + ) + for !finishedReading { + select { + case <-ctx.Done(): + return n, latestTs, nil + default: + } + + b, err := br.ReadBytes('\n') + if err != nil { + if errors.Is(err, bufio.ErrBufferFull) { + lc.Logger.Info("skipping line number #%d: line too long", n) + continue + } + if errors.Is(err, io.EOF) { + // b can be != nil when EOF is returned, so we need to process it + finishedReading = true + } else if errors.Is(err, context.Canceled) { + return n, latestTs, nil + } else { + return n, latestTs, fmt.Errorf("cannot read line in response: %s", err) + } + } + + if len(b) == 0 { + continue + } + b = bytes.Trim(b, "\n") + var logLine Log + if err := json.Unmarshal(b, &logLine); err != nil { + lc.Logger.Warnf("cannot unmarshal line in response: %s", string(b)) + continue + } + n++ + lc.Logger.Tracef("Got response: %+v", logLine) + c <- &logLine + if logLine.Time.After(latestTs) { + latestTs = logLine.Time + } + } + return n, latestTs, nil +} + +func (lc *VLClient) getURLFor(endpoint string, params map[string]string) string { + u, err := url.Parse(lc.config.URL) + if err != nil { + return "" + } + queryParams := u.Query() + for k, v := range params { + queryParams.Set(k, v) + } + u.RawQuery = queryParams.Encode() + + u.Path, err = url.JoinPath(lc.config.Prefix, u.Path, endpoint) + if err != nil { + return "" + } + return u.String() +} + +func (lc *VLClient) Ready(ctx context.Context) error { + tick := time.NewTicker(500 * time.Millisecond) + u := lc.getURLFor("", nil) + for { + select { + case <-ctx.Done(): + tick.Stop() + return ctx.Err() + case <-lc.t.Dying(): + tick.Stop() + return lc.t.Err() + case <-tick.C: + lc.Logger.Debug("Checking if VictoriaLogs is ready") + resp, err := lc.Get(ctx, u) + if err != nil { + lc.Logger.Warnf("Error checking if VictoriaLogs is ready: %s", err) + continue + } + _ = resp.Body.Close() + if resp.StatusCode != http.StatusOK { + lc.Logger.Debugf("VictoriaLogs is not ready, status code: %d", resp.StatusCode) + continue + } + lc.Logger.Info("VictoriaLogs is ready") + return nil + } + } +} + +// Tail live-tailing for logs +// See: https://docs.victoriametrics.com/victorialogs/querying/#live-tailing +func (lc *VLClient) Tail(ctx context.Context) (chan *Log, error) { + t := time.Now().Add(-1 * lc.config.Since) + u := lc.getURLFor("select/logsql/tail", map[string]string{ + "limit": strconv.Itoa(lc.config.Limit), + "start": t.Format(time.RFC3339Nano), + "query": lc.config.Query, + }) + + lc.Logger.Debugf("Since: %s (%s)", lc.config.Since, t) + lc.Logger.Infof("Connecting to %s", u) + var ( + resp *http.Response + err error + ) + for { + resp, err = lc.Get(ctx, u) + if err != nil { + if ok := lc.shouldRetry(); !ok { + return nil, fmt.Errorf("error querying range: %w", err) + } + continue + } + break + } + + if resp.StatusCode != http.StatusOK { + lc.Logger.Warnf("bad HTTP response code for query range: %d", resp.StatusCode) + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + if ok := lc.shouldRetry(); !ok { + return nil, fmt.Errorf("bad HTTP response code: %d: %s: %w", resp.StatusCode, string(body), err) + } + } + responseChan := make(chan *Log) + lc.t.Go(func() error { + _, _, err = lc.readResponse(ctx, resp, responseChan) + if err != nil { + return fmt.Errorf("error while reading tail response: %w", err) + } + return nil + }) + + return responseChan, nil +} + +// QueryRange queries the logs +// See: https://docs.victoriametrics.com/victorialogs/querying/#querying-logs +func (lc *VLClient) QueryRange(ctx context.Context, infinite bool) chan *Log { + t := time.Now().Add(-1 * lc.config.Since) + u := lc.getURLFor("select/logsql/query", map[string]string{ + "query": lc.config.Query, + "start": t.Format(time.RFC3339Nano), + "limit": strconv.Itoa(lc.config.Limit), + }) + + c := make(chan *Log) + + lc.Logger.Debugf("Since: %s (%s)", lc.config.Since, t) + + lc.Logger.Infof("Connecting to %s", u) + lc.t.Go(func() error { + return lc.queryRange(ctx, u, c, infinite) + }) + return c +} + +func (lc *VLClient) Get(ctx context.Context, url string) (*http.Response, error) { + request, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + for k, v := range lc.requestHeaders { + request.Header.Add(k, v) + } + + return lc.client.Do(request) +} + +func NewVLClient(config Config) *VLClient { + headers := make(map[string]string) + for k, v := range config.Headers { + headers[k] = v + } + if config.Username != "" || config.Password != "" { + headers["Authorization"] = "Basic " + base64.StdEncoding.EncodeToString([]byte(config.Username+":"+config.Password)) + } + headers["User-Agent"] = useragent.Default() + return &VLClient{ + Logger: log.WithField("component", "victorialogs-client"), + config: config, + requestHeaders: headers, + client: &http.Client{}, + } +} diff --git a/pkg/acquisition/modules/victorialogs/victorialogs.go b/pkg/acquisition/modules/victorialogs/victorialogs.go new file mode 100644 index 00000000000..2982f29f6b6 --- /dev/null +++ b/pkg/acquisition/modules/victorialogs/victorialogs.go @@ -0,0 +1,359 @@ +package victorialogs + +import ( + "context" + "errors" + "fmt" + "net/url" + "strconv" + "strings" + "time" + + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" + "gopkg.in/tomb.v2" + "gopkg.in/yaml.v2" + + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" + "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/victorialogs/internal/vlclient" + "github.com/crowdsecurity/crowdsec/pkg/types" +) + +const ( + defaultLimit int = 100 +) + +var linesRead = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "cs_victorialogssource_hits_total", + Help: "Total lines that were read.", + }, + []string{"source"}) + +type VLAuthConfiguration struct { + Username string `yaml:"username"` + Password string `yaml:"password"` +} + +type VLConfiguration struct { + URL string `yaml:"url"` // VictoriaLogs url + Prefix string `yaml:"prefix"` // VictoriaLogs prefix + Query string `yaml:"query"` // LogsQL query + Limit int `yaml:"limit"` // Limit of logs to read + Since time.Duration `yaml:"since"` + Headers map[string]string `yaml:"headers"` // HTTP headers for talking to VictoriaLogs + WaitForReady time.Duration `yaml:"wait_for_ready"` // Retry interval, default is 10 seconds + Auth VLAuthConfiguration `yaml:"auth"` + MaxFailureDuration time.Duration `yaml:"max_failure_duration"` // Max duration of failure before stopping the source + configuration.DataSourceCommonCfg `yaml:",inline"` +} + +type VLSource struct { + metricsLevel int + Config VLConfiguration + + Client *vlclient.VLClient + + logger *log.Entry +} + +func (l *VLSource) GetMetrics() []prometheus.Collector { + return []prometheus.Collector{linesRead} +} + +func (l *VLSource) GetAggregMetrics() []prometheus.Collector { + return []prometheus.Collector{linesRead} +} + +func (l *VLSource) UnmarshalConfig(yamlConfig []byte) error { + err := yaml.UnmarshalStrict(yamlConfig, &l.Config) + if err != nil { + return fmt.Errorf("cannot parse VictoriaLogs acquisition configuration: %w", err) + } + + if l.Config.Query == "" { + return errors.New("VictoriaLogs query is mandatory") + } + + if l.Config.WaitForReady == 0 { + l.Config.WaitForReady = 10 * time.Second + } + + if l.Config.Mode == "" { + l.Config.Mode = configuration.TAIL_MODE + } + if l.Config.Prefix == "" { + l.Config.Prefix = "/" + } + + if !strings.HasSuffix(l.Config.Prefix, "/") { + l.Config.Prefix += "/" + } + + if l.Config.Limit == 0 { + l.Config.Limit = defaultLimit + } + + if l.Config.Mode == configuration.TAIL_MODE { + l.logger.Infof("Resetting since") + l.Config.Since = 0 + } + + if l.Config.MaxFailureDuration == 0 { + l.Config.MaxFailureDuration = 30 * time.Second + } + + return nil +} + +func (l *VLSource) Configure(config []byte, logger *log.Entry, MetricsLevel int) error { + l.Config = VLConfiguration{} + l.logger = logger + l.metricsLevel = MetricsLevel + err := l.UnmarshalConfig(config) + if err != nil { + return err + } + + l.logger.Infof("Since value: %s", l.Config.Since.String()) + + clientConfig := vlclient.Config{ + URL: l.Config.URL, + Headers: l.Config.Headers, + Limit: l.Config.Limit, + Query: l.Config.Query, + Since: l.Config.Since, + Username: l.Config.Auth.Username, + Password: l.Config.Auth.Password, + FailMaxDuration: l.Config.MaxFailureDuration, + } + + l.Client = vlclient.NewVLClient(clientConfig) + l.Client.Logger = logger.WithFields(log.Fields{"component": "victorialogs-client", "source": l.Config.URL}) + return nil +} + +func (l *VLSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error { + l.logger = logger + l.Config = VLConfiguration{} + l.Config.Mode = configuration.CAT_MODE + l.Config.Labels = labels + l.Config.UniqueId = uuid + + u, err := url.Parse(dsn) + if err != nil { + return fmt.Errorf("while parsing dsn '%s': %w", dsn, err) + } + if u.Scheme != "victorialogs" { + return fmt.Errorf("invalid DSN %s for VictoriaLogs source, must start with victorialogs://", dsn) + } + if u.Host == "" { + return errors.New("empty host") + } + scheme := "http" + + params := u.Query() + if q := params.Get("ssl"); q != "" { + scheme = "https" + } + if q := params.Get("query"); q != "" { + l.Config.Query = q + } + if w := params.Get("wait_for_ready"); w != "" { + l.Config.WaitForReady, err = time.ParseDuration(w) + if err != nil { + return err + } + } else { + l.Config.WaitForReady = 10 * time.Second + } + + if s := params.Get("since"); s != "" { + l.Config.Since, err = time.ParseDuration(s) + if err != nil { + return fmt.Errorf("invalid since in dsn: %w", err) + } + } + + if maxFailureDuration := params.Get("max_failure_duration"); maxFailureDuration != "" { + duration, err := time.ParseDuration(maxFailureDuration) + if err != nil { + return fmt.Errorf("invalid max_failure_duration in dsn: %w", err) + } + l.Config.MaxFailureDuration = duration + } else { + l.Config.MaxFailureDuration = 5 * time.Second // for OneShot mode it doesn't make sense to have longer duration + } + + if limit := params.Get("limit"); limit != "" { + limit, err := strconv.Atoi(limit) + if err != nil { + return fmt.Errorf("invalid limit in dsn: %w", err) + } + l.Config.Limit = limit + } + + if logLevel := params.Get("log_level"); logLevel != "" { + level, err := log.ParseLevel(logLevel) + if err != nil { + return fmt.Errorf("invalid log_level in dsn: %w", err) + } + l.Config.LogLevel = &level + l.logger.Logger.SetLevel(level) + } + + l.Config.URL = fmt.Sprintf("%s://%s", scheme, u.Host) + if u.User != nil { + l.Config.Auth.Username = u.User.Username() + l.Config.Auth.Password, _ = u.User.Password() + } + + clientConfig := vlclient.Config{ + URL: l.Config.URL, + Headers: l.Config.Headers, + Limit: l.Config.Limit, + Query: l.Config.Query, + Since: l.Config.Since, + Username: l.Config.Auth.Username, + Password: l.Config.Auth.Password, + } + + l.Client = vlclient.NewVLClient(clientConfig) + l.Client.Logger = logger.WithFields(log.Fields{"component": "victorialogs-client", "source": l.Config.URL}) + + return nil +} + +func (l *VLSource) GetMode() string { + return l.Config.Mode +} + +func (l *VLSource) GetName() string { + return "victorialogs" +} + +// OneShotAcquisition reads a set of file and returns when done +func (l *VLSource) OneShotAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error { + l.logger.Debug("VictoriaLogs one shot acquisition") + l.Client.SetTomb(t) + readyCtx, cancel := context.WithTimeout(ctx, l.Config.WaitForReady) + defer cancel() + err := l.Client.Ready(readyCtx) + if err != nil { + return fmt.Errorf("VictoriaLogs is not ready: %w", err) + } + + ctx, cancel = context.WithCancel(ctx) + defer cancel() + + respChan, err := l.getResponseChan(ctx, false) + if err != nil { + return fmt.Errorf("error when starting aquisition: %w", err) + } + + for { + select { + case <-t.Dying(): + l.logger.Debug("VictoriaLogs one shot acquisition stopped") + return nil + case resp, ok := <-respChan: + if !ok { + l.logger.Info("VictoriaLogs acquisition completed") + return nil + } + l.readOneEntry(resp, l.Config.Labels, out) + } + } +} + +func (l *VLSource) readOneEntry(entry *vlclient.Log, labels map[string]string, out chan types.Event) { + ll := types.Line{} + ll.Raw = entry.Message + ll.Time = entry.Time + ll.Src = l.Config.URL + ll.Labels = labels + ll.Process = true + ll.Module = l.GetName() + + if l.metricsLevel != configuration.METRICS_NONE { + linesRead.With(prometheus.Labels{"source": l.Config.URL}).Inc() + } + expectMode := types.LIVE + if l.Config.UseTimeMachine { + expectMode = types.TIMEMACHINE + } + out <- types.Event{ + Line: ll, + Process: true, + Type: types.LOG, + ExpectMode: expectMode, + } +} + +func (l *VLSource) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error { + l.Client.SetTomb(t) + readyCtx, cancel := context.WithTimeout(ctx, l.Config.WaitForReady) + defer cancel() + err := l.Client.Ready(readyCtx) + if err != nil { + return fmt.Errorf("VictoriaLogs is not ready: %w", err) + } + t.Go(func() error { + lctx, cancel := context.WithCancel(ctx) + defer cancel() + + respChan, err := l.getResponseChan(lctx, true) + if err != nil { + l.logger.Errorf("could not start VictoriaLogs tail: %s", err) + return fmt.Errorf("while starting VictoriaLogs tail: %w", err) + } + for { + select { + case resp, ok := <-respChan: + if !ok { + l.logger.Warnf("VictoriaLogs channel closed") + return err + } + l.readOneEntry(resp, l.Config.Labels, out) + case <-t.Dying(): + return nil + } + } + }) + return nil +} + +func (l *VLSource) getResponseChan(ctx context.Context, infinite bool) (chan *vlclient.Log, error) { + var ( + respChan chan *vlclient.Log + err error + ) + + if l.Config.Mode == configuration.TAIL_MODE { + respChan, err = l.Client.Tail(ctx) + if err != nil { + l.logger.Errorf("could not start VictoriaLogs tail: %s", err) + return respChan, fmt.Errorf("while starting VictoriaLogs tail: %w", err) + } + } else { + respChan = l.Client.QueryRange(ctx, infinite) + } + return respChan, err +} + +func (l *VLSource) CanRun() error { + return nil +} + +func (l *VLSource) GetUuid() string { + return l.Config.UniqueId +} + +func (l *VLSource) Dump() interface{} { + return l +} + +// SupportedModes returns the supported modes by the acquisition module +func (l *VLSource) SupportedModes() []string { + return []string{configuration.TAIL_MODE, configuration.CAT_MODE} +} diff --git a/pkg/acquisition/modules/victorialogs/victorialogs_test.go b/pkg/acquisition/modules/victorialogs/victorialogs_test.go new file mode 100644 index 00000000000..ae354f3fa50 --- /dev/null +++ b/pkg/acquisition/modules/victorialogs/victorialogs_test.go @@ -0,0 +1,477 @@ +package victorialogs_test + +import ( + "bytes" + "context" + "fmt" + "io" + "math/rand" + "net/http" + "net/url" + "os" + "runtime" + "strconv" + "strings" + "testing" + "time" + + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "gopkg.in/tomb.v2" + + "github.com/crowdsecurity/go-cs-lib/cstest" + + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" + "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/victorialogs" + "github.com/crowdsecurity/crowdsec/pkg/types" +) + +func TestConfiguration(t *testing.T) { + log.Infof("Test 'TestConfigure'") + + tests := []struct { + config string + expectedErr string + password string + waitForReady time.Duration + testName string + }{ + { + config: `foobar: asd`, + expectedErr: "line 1: field foobar not found in type victorialogs.VLConfiguration", + testName: "Unknown field", + }, + { + config: ` +mode: tail +source: victorialogs`, + expectedErr: "query is mandatory", + testName: "Missing url", + }, + { + config: ` +mode: tail +source: victorialogs +url: http://localhost:9428/ +`, + expectedErr: "query is mandatory", + testName: "Missing query", + }, + { + config: ` +mode: tail +source: victorialogs +url: http://localhost:9428/ +query: > + {server="demo"} +`, + expectedErr: "", + testName: "Correct config", + }, + { + config: ` +mode: tail +source: victorialogs +url: http://localhost:9428/ +wait_for_ready: 5s +query: > + {server="demo"} +`, + expectedErr: "", + testName: "Correct config with wait_for_ready", + waitForReady: 5 * time.Second, + }, + { + config: ` +mode: tail +source: victorialogs +url: http://localhost:9428/ +auth: + username: foo + password: bar +query: > + {server="demo"} +`, + expectedErr: "", + password: "bar", + testName: "Correct config with password", + }, + } + subLogger := log.WithField("type", "victorialogs") + + for _, test := range tests { + t.Run(test.testName, func(t *testing.T) { + vlSource := victorialogs.VLSource{} + err := vlSource.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE) + cstest.AssertErrorContains(t, err, test.expectedErr) + + if test.password != "" { + p := vlSource.Config.Auth.Password + if test.password != p { + t.Fatalf("Password mismatch : %s != %s", test.password, p) + } + } + + if test.waitForReady != 0 { + if vlSource.Config.WaitForReady != test.waitForReady { + t.Fatalf("Wrong WaitForReady %v != %v", vlSource.Config.WaitForReady, test.waitForReady) + } + } + }) + } +} + +func TestConfigureDSN(t *testing.T) { + log.Infof("Test 'TestConfigureDSN'") + + tests := []struct { + name string + dsn string + expectedErr string + since time.Time + password string + scheme string + waitForReady time.Duration + }{ + { + name: "Wrong scheme", + dsn: "wrong://", + expectedErr: "invalid DSN wrong:// for VictoriaLogs source, must start with victorialogs://", + }, + { + name: "Correct DSN", + dsn: `victorialogs://localhost:9428/?query={server="demo"}`, + expectedErr: "", + }, + { + name: "Empty host", + dsn: "victorialogs://", + expectedErr: "empty host", + }, + { + name: "Invalid DSN", + dsn: "victorialogs", + expectedErr: "invalid DSN victorialogs for VictoriaLogs source, must start with victorialogs://", + }, + { + name: "Bad since param", + dsn: `victorialogs://127.0.0.1:9428/?since=3h&query={server="demo"}`, + since: time.Now().Add(-3 * time.Hour), + }, + { + name: "Basic Auth", + dsn: `victorialogs://login:password@localhost:3102/?query={server="demo"}`, + password: "password", + }, + { + name: "Correct DSN", + dsn: `victorialogs://localhost:9428/?query={server="demo"}&wait_for_ready=5s`, + expectedErr: "", + waitForReady: 5 * time.Second, + }, + { + name: "SSL DSN", + dsn: `victorialogs://localhost:9428/?ssl=true`, + scheme: "https", + }, + } + + for _, test := range tests { + subLogger := log.WithFields(log.Fields{ + "type": "victorialogs", + "name": test.name, + }) + + t.Logf("Test : %s", test.name) + + vlSource := &victorialogs.VLSource{} + err := vlSource.ConfigureByDSN(test.dsn, map[string]string{"type": "testtype"}, subLogger, "") + cstest.AssertErrorContains(t, err, test.expectedErr) + + noDuration, _ := time.ParseDuration("0s") + if vlSource.Config.Since != noDuration && vlSource.Config.Since.Round(time.Second) != time.Since(test.since).Round(time.Second) { + t.Fatalf("Invalid since %v", vlSource.Config.Since) + } + + if test.password != "" { + p := vlSource.Config.Auth.Password + if test.password != p { + t.Fatalf("Password mismatch : %s != %s", test.password, p) + } + } + + if test.scheme != "" { + url, _ := url.Parse(vlSource.Config.URL) + if test.scheme != url.Scheme { + t.Fatalf("Schema mismatch : %s != %s", test.scheme, url.Scheme) + } + } + + if test.waitForReady != 0 { + if vlSource.Config.WaitForReady != test.waitForReady { + t.Fatalf("Wrong WaitForReady %v != %v", vlSource.Config.WaitForReady, test.waitForReady) + } + } + } +} + +// Ingestion format docs: https://docs.victoriametrics.com/victorialogs/data-ingestion/#json-stream-api +func feedVLogs(ctx context.Context, logger *log.Entry, n int, title string) error { + bb := bytes.NewBuffer(nil) + for i := range n { + bb.WriteString(fmt.Sprintf( + `{ "_time": %q,"_msg":"Log line #%d %v", "server": "demo", "key": %q} +`, time.Now().Format(time.RFC3339), i, title, title)) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://127.0.0.1:9428/insert/jsonline?_stream_fields=server,key", bb) + if err != nil { + return err + } + + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + b, _ := io.ReadAll(resp.Body) + logger.Error(string(b)) + + return fmt.Errorf("Bad post status %d", resp.StatusCode) + } + + logger.Info(n, " Events sent") + // VictoriaLogs buffers data before saving to disk + // Default flush deadline is 2s, waiting 3s to be safe + time.Sleep(3 * time.Second) + + return nil +} + +func TestOneShotAcquisition(t *testing.T) { + ctx := context.Background() + + if runtime.GOOS == "windows" { + t.Skip("Skipping test on windows") + } + + log.SetOutput(os.Stdout) + log.SetLevel(log.InfoLevel) + log.Info("Test 'TestStreamingAcquisition'") + + key := strconv.Itoa(rand.Intn(1000)) + tests := []struct { + config string + }{ + { + config: fmt.Sprintf(` +mode: cat +source: victorialogs +url: http://127.0.0.1:9428 +query: > + {server=demo, key=%q} +since: 1h +`, key), + }, + } + + for _, ts := range tests { + logger := log.New() + subLogger := logger.WithField("type", "victorialogs") + vlSource := victorialogs.VLSource{} + err := vlSource.Configure([]byte(ts.config), subLogger, configuration.METRICS_NONE) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + + err = feedVLogs(ctx, subLogger, 20, key) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + + out := make(chan types.Event) + read := 0 + + go func() { + for { + <-out + + read++ + } + }() + + vlTomb := tomb.Tomb{} + + err = vlSource.OneShotAcquisition(ctx, out, &vlTomb) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + + // Some logs might be buffered + assert.Greater(t, read, 10) + } +} + +func TestStreamingAcquisition(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Skipping test on windows") + } + + log.SetOutput(os.Stdout) + log.SetLevel(log.InfoLevel) + log.Info("Test 'TestStreamingAcquisition'") + + title := time.Now().String() + tests := []struct { + name string + config string + expectedErr string + streamErr string + expectedLines int + }{ + { + name: "Bad port", + config: `mode: tail +source: victorialogs +url: "http://127.0.0.1:9429" +query: > + server:"demo"`, // Wrong port + expectedErr: "", + streamErr: `VictoriaLogs is not ready`, + expectedLines: 0, + }, + { + name: "ok", + config: `mode: tail +source: victorialogs +url: "http://127.0.0.1:9428" +query: > + server:"demo"`, + expectedErr: "", + streamErr: "", + expectedLines: 20, + }, + } + + ctx := context.Background() + + for _, ts := range tests { + t.Run(ts.name, func(t *testing.T) { + logger := log.New() + subLogger := logger.WithFields(log.Fields{ + "type": "victorialogs", + "name": ts.name, + }) + + out := make(chan types.Event) + vlTomb := tomb.Tomb{} + vlSource := victorialogs.VLSource{} + + err := vlSource.Configure([]byte(ts.config), subLogger, configuration.METRICS_NONE) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + + err = vlSource.StreamingAcquisition(ctx, out, &vlTomb) + cstest.AssertErrorContains(t, err, ts.streamErr) + + if ts.streamErr != "" { + return + } + + time.Sleep(time.Second * 2) // We need to give time to start reading from the WS + + readTomb := tomb.Tomb{} + readCtx, cancel := context.WithTimeout(ctx, time.Second*10) + count := 0 + + readTomb.Go(func() error { + defer cancel() + + for { + select { + case <-readCtx.Done(): + return readCtx.Err() + case evt := <-out: + count++ + + if !strings.HasSuffix(evt.Line.Raw, title) { + return fmt.Errorf("Incorrect suffix : %s", evt.Line.Raw) + } + + if count == ts.expectedLines { + return nil + } + } + } + }) + + err = feedVLogs(ctx, subLogger, ts.expectedLines, title) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + + err = readTomb.Wait() + + cancel() + + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + + assert.Equal(t, ts.expectedLines, count) + }) + } +} + +func TestStopStreaming(t *testing.T) { + ctx := context.Background() + if runtime.GOOS == "windows" { + t.Skip("Skipping test on windows") + } + + config := ` +mode: tail +source: victorialogs +url: http://127.0.0.1:9428 +query: > + server:"demo" +` + logger := log.New() + subLogger := logger.WithField("type", "victorialogs") + title := time.Now().String() + vlSource := victorialogs.VLSource{} + + err := vlSource.Configure([]byte(config), subLogger, configuration.METRICS_NONE) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + + out := make(chan types.Event, 10) + + vlTomb := &tomb.Tomb{} + + err = vlSource.StreamingAcquisition(ctx, out, vlTomb) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + + time.Sleep(time.Second * 2) + + err = feedVLogs(ctx, subLogger, 1, title) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + + vlTomb.Kill(nil) + + err = vlTomb.Wait() + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } +} diff --git a/pkg/acquisition/victorialogs.go b/pkg/acquisition/victorialogs.go new file mode 100644 index 00000000000..b097f0c8dfc --- /dev/null +++ b/pkg/acquisition/victorialogs.go @@ -0,0 +1,12 @@ +//go:build !no_datasource_victorialogs + +package acquisition + +import ( + "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/victorialogs" +) + +//nolint:gochecknoinits +func init() { + registerDataSource("victorialogs", func() DataSource { return &victorialogs.VLSource{} }) +} diff --git a/pkg/cwversion/component/component.go b/pkg/cwversion/component/component.go index 4036b63cf00..206feb6a8d9 100644 --- a/pkg/cwversion/component/component.go +++ b/pkg/cwversion/component/component.go @@ -7,20 +7,21 @@ package component // Built is a map of all the known components, and whether they are built-in or not. // This is populated as soon as possible by the respective init() functions -var Built = map[string]bool { - "datasource_appsec": false, - "datasource_cloudwatch": false, - "datasource_docker": false, - "datasource_file": false, - "datasource_journalctl": false, - "datasource_k8s-audit": false, - "datasource_kafka": false, - "datasource_kinesis": false, - "datasource_loki": false, - "datasource_s3": false, - "datasource_syslog": false, - "datasource_wineventlog":false, - "cscli_setup": false, +var Built = map[string]bool{ + "datasource_appsec": false, + "datasource_cloudwatch": false, + "datasource_docker": false, + "datasource_file": false, + "datasource_journalctl": false, + "datasource_k8s-audit": false, + "datasource_kafka": false, + "datasource_kinesis": false, + "datasource_loki": false, + "datasource_s3": false, + "datasource_syslog": false, + "datasource_wineventlog": false, + "datasource_victorialogs": false, + "cscli_setup": false, } func Register(name string) {