Skip to content

Commit

Permalink
acquisition/victorialogs: add new datasource
Browse files Browse the repository at this point in the history
Data source supports:
- cat mode with automatic adjustment of poll interval (same as one at Loki datasource)
- tail mode by using tailing API
  • Loading branch information
zekker6 committed Oct 30, 2024
1 parent 97c1f60 commit a868de6
Show file tree
Hide file tree
Showing 6 changed files with 1,231 additions and 14 deletions.
12 changes: 12 additions & 0 deletions pkg/acquisition/modules/victorialogs/internal/vlclient/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package vlclient

import (
"time"
)

// Log represents a VictoriaLogs log line
// See: https://docs.victoriametrics.com/victorialogs/querying/#querying-logs
type Log struct {
Message string `json:"_msg"`
Time time.Time `json:"_time"`
}
356 changes: 356 additions & 0 deletions pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,356 @@
package vlclient

import (
"bufio"
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"time"

log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"

"github.com/crowdsecurity/crowdsec/pkg/apiclient/useragent"
)

type VLClient struct {
Logger *log.Entry

config Config
t *tomb.Tomb
failStart time.Time
currentTickerInterval time.Duration
requestHeaders map[string]string

client *http.Client
}

type Config struct {
URL string
Prefix string
Query string
Headers map[string]string

Username string
Password string

Since time.Duration

FailMaxDuration time.Duration

Limit int
}

func updateURI(uri string, newStart time.Time) string {
u, _ := url.Parse(uri)
queryParams := u.Query()

if !newStart.IsZero() {
// +1 the last timestamp to avoid getting the same result again.
updatedStart := newStart.Add(1 * time.Nanosecond)
queryParams.Set("start", updatedStart.Format(time.RFC3339Nano))
}

Check warning on line 59 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L51-L59

Added lines #L51 - L59 were not covered by tests

u.RawQuery = queryParams.Encode()
return u.String()

Check warning on line 62 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L61-L62

Added lines #L61 - L62 were not covered by tests
}

func (lc *VLClient) SetTomb(t *tomb.Tomb) {
lc.t = t

Check warning on line 66 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L65-L66

Added lines #L65 - L66 were not covered by tests
}

func (lc *VLClient) resetFailStart() {
if !lc.failStart.IsZero() {
log.Infof("VictoriaLogs is back after %s", time.Since(lc.failStart))
}
lc.failStart = time.Time{}

Check warning on line 73 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L69-L73

Added lines #L69 - L73 were not covered by tests
}

func (lc *VLClient) shouldRetry() bool {
if lc.failStart.IsZero() {
lc.Logger.Warningf("VictoriaLogs is not available, will retry for %s", lc.config.FailMaxDuration)
lc.failStart = time.Now()
return true
}
if time.Since(lc.failStart) > lc.config.FailMaxDuration {
lc.Logger.Errorf("VictoriaLogs didn't manage to recover after %s, giving up", lc.config.FailMaxDuration)
return false
}
return true

Check warning on line 86 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L76-L86

Added lines #L76 - L86 were not covered by tests
}

func (lc *VLClient) increaseTicker(ticker *time.Ticker) {
maxTicker := 10 * time.Second
if lc.currentTickerInterval < maxTicker {
lc.currentTickerInterval *= 2
if lc.currentTickerInterval > maxTicker {
lc.currentTickerInterval = maxTicker
}
ticker.Reset(lc.currentTickerInterval)

Check warning on line 96 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L89-L96

Added lines #L89 - L96 were not covered by tests
}
}

func (lc *VLClient) decreaseTicker(ticker *time.Ticker) {
minTicker := 100 * time.Millisecond
if lc.currentTickerInterval != minTicker {
lc.currentTickerInterval = minTicker
ticker.Reset(lc.currentTickerInterval)
}

Check warning on line 105 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L100-L105

Added lines #L100 - L105 were not covered by tests
}

func (lc *VLClient) queryRange(ctx context.Context, uri string, c chan *Log, infinite bool) error {
lc.currentTickerInterval = 100 * time.Millisecond
ticker := time.NewTicker(lc.currentTickerInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-lc.t.Dying():
return lc.t.Err()
case <-ticker.C:
resp, err := lc.Get(ctx, uri)
if err != nil {
if ok := lc.shouldRetry(); !ok {
return fmt.Errorf("error querying range: %w", err)
}
lc.increaseTicker(ticker)
continue

Check warning on line 125 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L108-L125

Added lines #L108 - L125 were not covered by tests
}

if resp.StatusCode != http.StatusOK {
lc.Logger.Warnf("bad HTTP response code for query range: %d", resp.StatusCode)
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
if ok := lc.shouldRetry(); !ok {
return fmt.Errorf("bad HTTP response code: %d: %s: %w", resp.StatusCode, string(body), err)
}
lc.increaseTicker(ticker)
continue

Check warning on line 136 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L128-L136

Added lines #L128 - L136 were not covered by tests
}

n, largestTime, err := lc.readResponse(ctx, resp, c)
if err != nil {
return err
}

Check warning on line 142 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L139-L142

Added lines #L139 - L142 were not covered by tests

if !infinite && n < lc.config.Limit {
lc.Logger.Infof("Got less than %d results (%d), stopping", lc.config.Limit, n)
close(c)
return nil
}
lc.Logger.Debugf("(timer:%v) %d results (uri:%s)", lc.currentTickerInterval, n, uri)
if infinite {
if n > 0 {
//as long as we get results, we keep lowest ticker
lc.decreaseTicker(ticker)
} else {
lc.increaseTicker(ticker)
}

Check warning on line 156 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L144-L156

Added lines #L144 - L156 were not covered by tests
}

uri = updateURI(uri, largestTime)

Check warning on line 159 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L159

Added line #L159 was not covered by tests
}
}
}

// Parses response from body in JSON-LD format and sends results to the channel
func (lc *VLClient) readResponse(ctx context.Context, resp *http.Response, c chan *Log) (int, time.Time, error) {
br := bufio.NewReaderSize(resp.Body, 64*1024)
var (
finishedReading bool
n int
latestTs time.Time
)
for !finishedReading {
select {
case <-ctx.Done():
return n, latestTs, nil
default:

Check warning on line 176 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L165-L176

Added lines #L165 - L176 were not covered by tests
}

b, err := br.ReadBytes('\n')
if err != nil {
if errors.Is(err, bufio.ErrBufferFull) {
lc.Logger.Info("skipping line number #%d: line too long", n)

Check failure on line 182 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View workflow job for this annotation

GitHub Actions / Build + tests

(*github.com/sirupsen/logrus.Entry).Info call has possible Printf formatting directive %d
continue

Check warning on line 183 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L179-L183

Added lines #L179 - L183 were not covered by tests
}
if errors.Is(err, io.EOF) {
// b can be != nil when EOF is returned, so we need to process it
finishedReading = true
} else if errors.Is(err, context.Canceled) {
return n, latestTs, nil
} else {
return n, latestTs, fmt.Errorf("cannot read line in response: %s", err)
}

Check warning on line 192 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L185-L192

Added lines #L185 - L192 were not covered by tests
}

if len(b) == 0 {
continue

Check warning on line 196 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L195-L196

Added lines #L195 - L196 were not covered by tests
}
b = bytes.Trim(b, "\n")
var logLine Log
if err := json.Unmarshal(b, &logLine); err != nil {
lc.Logger.Warnf("cannot unmarshal line in response: %s", string(b))
continue

Check warning on line 202 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L198-L202

Added lines #L198 - L202 were not covered by tests
}
n++
lc.Logger.Tracef("Got response: %+v", logLine)
c <- &logLine
if logLine.Time.After(latestTs) {
latestTs = logLine.Time
}

Check warning on line 209 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L204-L209

Added lines #L204 - L209 were not covered by tests
}
return n, latestTs, nil

Check warning on line 211 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L211

Added line #L211 was not covered by tests
}

func (lc *VLClient) getURLFor(endpoint string, params map[string]string) string {
u, err := url.Parse(lc.config.URL)
if err != nil {
return ""
}
queryParams := u.Query()
for k, v := range params {
queryParams.Set(k, v)
}
u.RawQuery = queryParams.Encode()

u.Path, err = url.JoinPath(lc.config.Prefix, u.Path, endpoint)
if err != nil {
return ""
}
return u.String()

Check warning on line 229 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L214-L229

Added lines #L214 - L229 were not covered by tests
}

func (lc *VLClient) Ready(ctx context.Context) error {
tick := time.NewTicker(500 * time.Millisecond)
u := lc.getURLFor("", nil)
for {
select {
case <-ctx.Done():
tick.Stop()
return ctx.Err()
case <-lc.t.Dying():
tick.Stop()
return lc.t.Err()
case <-tick.C:
lc.Logger.Debug("Checking if VictoriaLogs is ready")
resp, err := lc.Get(ctx, u)
if err != nil {
lc.Logger.Warnf("Error checking if VictoriaLogs is ready: %s", err)
continue

Check warning on line 248 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L232-L248

Added lines #L232 - L248 were not covered by tests
}
_ = resp.Body.Close()
if resp.StatusCode != http.StatusOK {
lc.Logger.Debugf("VictoriaLogs is not ready, status code: %d", resp.StatusCode)
continue

Check warning on line 253 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L250-L253

Added lines #L250 - L253 were not covered by tests
}
lc.Logger.Info("VictoriaLogs is ready")
return nil

Check warning on line 256 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L255-L256

Added lines #L255 - L256 were not covered by tests
}
}
}

// Tail live-tailing for logs
// See: https://docs.victoriametrics.com/victorialogs/querying/#live-tailing
func (lc *VLClient) Tail(ctx context.Context) (chan *Log, error) {
t := time.Now().Add(-1 * lc.config.Since)
u := lc.getURLFor("select/logsql/tail", map[string]string{
"limit": strconv.Itoa(lc.config.Limit),
"start": t.Format(time.RFC3339Nano),
"query": lc.config.Query,
})

lc.Logger.Debugf("Since: %s (%s)", lc.config.Since, t)
lc.Logger.Infof("Connecting to %s", u)
var (
resp *http.Response
err error
)
for {
resp, err = lc.Get(ctx, u)
if err != nil {
if ok := lc.shouldRetry(); !ok {
return nil, fmt.Errorf("error querying range: %w", err)
}
continue

Check warning on line 283 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L263-L283

Added lines #L263 - L283 were not covered by tests
}
break

Check warning on line 285 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L285

Added line #L285 was not covered by tests
}

if resp.StatusCode != http.StatusOK {
lc.Logger.Warnf("bad HTTP response code for query range: %d", resp.StatusCode)
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
if ok := lc.shouldRetry(); !ok {
return nil, fmt.Errorf("bad HTTP response code: %d: %s: %w", resp.StatusCode, string(body), err)
}

Check warning on line 294 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L288-L294

Added lines #L288 - L294 were not covered by tests
}
responseChan := make(chan *Log)
lc.t.Go(func() error {
_, _, err = lc.readResponse(ctx, resp, responseChan)
if err != nil {
return fmt.Errorf("error while reading tail response: %w", err)
}
return nil

Check warning on line 302 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L296-L302

Added lines #L296 - L302 were not covered by tests
})

return responseChan, nil

Check warning on line 305 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L305

Added line #L305 was not covered by tests
}

// QueryRange queries the logs
// See: https://docs.victoriametrics.com/victorialogs/querying/#querying-logs
func (lc *VLClient) QueryRange(ctx context.Context, infinite bool) chan *Log {
t := time.Now().Add(-1 * lc.config.Since)
u := lc.getURLFor("select/logsql/query", map[string]string{
"query": lc.config.Query,
"start": t.Format(time.RFC3339Nano),
"limit": strconv.Itoa(lc.config.Limit),
})

c := make(chan *Log)

lc.Logger.Debugf("Since: %s (%s)", lc.config.Since, t)

lc.Logger.Infof("Connecting to %s", u)
lc.t.Go(func() error {
return lc.queryRange(ctx, u, c, infinite)
})
return c

Check warning on line 326 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L310-L326

Added lines #L310 - L326 were not covered by tests
}

func (lc *VLClient) Get(ctx context.Context, url string) (*http.Response, error) {
request, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
for k, v := range lc.requestHeaders {
request.Header.Add(k, v)
}

Check warning on line 336 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L329-L336

Added lines #L329 - L336 were not covered by tests

return lc.client.Do(request)

Check warning on line 338 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L338

Added line #L338 was not covered by tests
}

func NewVLClient(config Config) *VLClient {
headers := make(map[string]string)
for k, v := range config.Headers {
headers[k] = v
}
if config.Username != "" || config.Password != "" {
headers["Authorization"] = "Basic " + base64.StdEncoding.EncodeToString([]byte(config.Username+":"+config.Password))
}
headers["User-Agent"] = useragent.Default()
return &VLClient{
Logger: log.WithField("component", "victorialogs-client"),
config: config,
requestHeaders: headers,
client: &http.Client{},
}

Check warning on line 355 in pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/victorialogs/internal/vlclient/vl_client.go#L341-L355

Added lines #L341 - L355 were not covered by tests
}
Loading

0 comments on commit a868de6

Please sign in to comment.