From 1197c09f38ccf20db9db4b75055c5e3d570876da Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 23 Sep 2024 12:07:35 +0930 Subject: [PATCH] x-pack/filebeat/input/gcs: add support for CSV decoding The test file txn.csv.gz was obtained from https://netskopepartnerlogfilebucket.s3.amazonaws.com/txn-1722875066329034-fe10b6a23cc643c4b282e6190de2352d.csv.gz --- CHANGELOG.next.asciidoc | 2 + x-pack/filebeat/input/gcs/config.go | 10 + x-pack/filebeat/input/gcs/decoding.go | 47 ++ x-pack/filebeat/input/gcs/decoding_config.go | 54 ++ x-pack/filebeat/input/gcs/decoding_csv.go | 139 ++++ x-pack/filebeat/input/gcs/decoding_test.go | 225 +++++++ x-pack/filebeat/input/gcs/input.go | 2 + x-pack/filebeat/input/gcs/input_stateless.go | 1 + x-pack/filebeat/input/gcs/job.go | 109 +++- x-pack/filebeat/input/gcs/testdata/txn.csv | 5 + x-pack/filebeat/input/gcs/testdata/txn.csv.gz | Bin 0 -> 2527 bytes x-pack/filebeat/input/gcs/testdata/txn.json | 594 ++++++++++++++++++ x-pack/filebeat/input/gcs/types.go | 1 + 13 files changed, 1169 insertions(+), 20 deletions(-) create mode 100644 x-pack/filebeat/input/gcs/decoding.go create mode 100644 x-pack/filebeat/input/gcs/decoding_config.go create mode 100644 x-pack/filebeat/input/gcs/decoding_csv.go create mode 100644 x-pack/filebeat/input/gcs/decoding_test.go create mode 100644 x-pack/filebeat/input/gcs/testdata/txn.csv create mode 100644 x-pack/filebeat/input/gcs/testdata/txn.csv.gz create mode 100644 x-pack/filebeat/input/gcs/testdata/txn.json diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 447f03d5ac35..4f32e3a19da8 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -306,6 +306,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add `use_kubeadm` config option for filebeat (both filbeat.input and autodiscovery) in order to toggle kubeadm-config api requests {pull}40301[40301] - Make HTTP library function inclusion non-conditional in CEL input. {pull}40912[40912] - Add support for Crowdstrike streaming API to the streaming input. {issue}40264[40264] {pull}40838[40838] +- Add CSV decoding capacity to gcs input {pull}40979[40979] + *Auditbeat* diff --git a/x-pack/filebeat/input/gcs/config.go b/x-pack/filebeat/input/gcs/config.go index ed589e43df1a..6a7b93d5e479 100644 --- a/x-pack/filebeat/input/gcs/config.go +++ b/x-pack/filebeat/input/gcs/config.go @@ -16,6 +16,7 @@ import ( "golang.org/x/oauth2/google" "github.com/elastic/beats/v7/libbeat/common/match" + "github.com/elastic/beats/v7/libbeat/reader/parser" ) // MaxWorkers, Poll, PollInterval, BucketTimeOut, ParseJSON, FileSelectors, TimeStampEpoch & ExpandEventListFromField @@ -41,6 +42,8 @@ type config struct { Buckets []bucket `config:"buckets" validate:"required"` // FileSelectors - Defines a list of regex patterns that can be used to filter out objects from the bucket. FileSelectors []fileSelectorConfig `config:"file_selectors"` + // ReaderConfig is the default parser and decoder configuration. + ReaderConfig readerConfig `config:",inline"` // TimeStampEpoch - Defines the epoch time in seconds, which is used to filter out objects that are older than the specified timestamp. TimeStampEpoch *int64 `config:"timestamp_epoch"` // ExpandEventListFromField - Defines the field name that will be used to expand the event into separate events. @@ -58,6 +61,7 @@ type bucket struct { PollInterval *time.Duration `config:"poll_interval,omitempty"` ParseJSON *bool `config:"parse_json,omitempty"` FileSelectors []fileSelectorConfig `config:"file_selectors"` + ReaderConfig readerConfig `config:",inline"` TimeStampEpoch *int64 `config:"timestamp_epoch"` ExpandEventListFromField string `config:"expand_event_list_from_field"` } @@ -68,6 +72,12 @@ type fileSelectorConfig struct { // TODO: Add support for reader config in future } +// readerConfig defines the options for reading the content of an GCS object. +type readerConfig struct { + Parsers parser.Config `config:",inline"` + Decoding decoderConfig `config:"decoding"` +} + type authConfig struct { CredentialsJSON *jsonCredentialsConfig `config:"credentials_json,omitempty"` CredentialsFile *fileCredentialsConfig `config:"credentials_file,omitempty"` diff --git a/x-pack/filebeat/input/gcs/decoding.go b/x-pack/filebeat/input/gcs/decoding.go new file mode 100644 index 000000000000..c6236147d4bf --- /dev/null +++ b/x-pack/filebeat/input/gcs/decoding.go @@ -0,0 +1,47 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package gcs + +import ( + "fmt" + "io" +) + +// decoder is an interface for decoding data from an io.Reader. +type decoder interface { + // decode reads and decodes data from an io reader based on the codec type. + // It returns the decoded data and an error if the data cannot be decoded. + decode() ([]byte, error) + // next advances the decoder to the next data item and returns true if there is more data to be decoded. + next() bool + // close closes the decoder and releases any resources associated with it. + // It returns an error if the decoder cannot be closed. + + // more returns whether there are more records to read. + more() bool + + close() error +} + +// valueDecoder is a decoder that can decode directly to a JSON serialisable value. +type valueDecoder interface { + decoder + + decodeValue() ([]byte, map[string]any, error) +} + +// newDecoder creates a new decoder based on the codec type. +// It returns a decoder type and an error if the codec type is not supported. +// If the reader config codec option is not set, it returns a nil decoder and nil error. +func newDecoder(cfg decoderConfig, r io.Reader) (decoder, error) { + switch { + case cfg.Codec == nil: + return nil, nil + case cfg.Codec.CSV != nil: + return newCSVDecoder(cfg, r) + default: + return nil, fmt.Errorf("unsupported config value: %v", cfg) + } +} diff --git a/x-pack/filebeat/input/gcs/decoding_config.go b/x-pack/filebeat/input/gcs/decoding_config.go new file mode 100644 index 000000000000..625dbce473e9 --- /dev/null +++ b/x-pack/filebeat/input/gcs/decoding_config.go @@ -0,0 +1,54 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package gcs + +import ( + "fmt" + "unicode/utf8" +) + +// decoderConfig contains the configuration options for instantiating a decoder. +type decoderConfig struct { + Codec *codecConfig `config:"codec"` +} + +// codecConfig contains the configuration options for different codecs used by a decoder. +type codecConfig struct { + CSV *csvCodecConfig `config:"csv"` +} + +// csvCodecConfig contains the configuration options for the CSV codec. +type csvCodecConfig struct { + Enabled bool `config:"enabled"` + + // Fields is the set of field names. If it is present + // it is used to specify the object names of returned + // values and the FieldsPerRecord field in the csv.Reader. + // Otherwise, names are obtained from the first + // line of the CSV data. + Fields []string `config:"fields_names"` + + // The fields below have the same meaning as the + // fields of the same name in csv.Reader. + Comma *configRune `config:"comma"` + Comment configRune `config:"comment"` + LazyQuotes bool `config:"lazy_quotes"` + TrimLeadingSpace bool `config:"trim_leading_space"` +} + +type configRune rune + +func (r *configRune) Unpack(s string) error { + if s == "" { + return nil + } + n := utf8.RuneCountInString(s) + if n != 1 { + return fmt.Errorf("single character option given more than one character: %q", s) + } + _r, _ := utf8.DecodeRuneInString(s) + *r = configRune(_r) + return nil +} diff --git a/x-pack/filebeat/input/gcs/decoding_csv.go b/x-pack/filebeat/input/gcs/decoding_csv.go new file mode 100644 index 000000000000..3920078a5dad --- /dev/null +++ b/x-pack/filebeat/input/gcs/decoding_csv.go @@ -0,0 +1,139 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package gcs + +import ( + "bytes" + "encoding/csv" + "fmt" + "io" + "slices" +) + +// csvDecoder is a decoder for CSV data. +type csvDecoder struct { + r *csv.Reader + + header []string + current []string + coming []string + + err error +} + +// newParquetDecoder creates a new CSV decoder. +func newCSVDecoder(config decoderConfig, r io.Reader) (decoder, error) { + d := csvDecoder{r: csv.NewReader(r)} + d.r.ReuseRecord = true + if config.Codec.CSV.Comma != nil { + d.r.Comma = rune(*config.Codec.CSV.Comma) + } + d.r.Comment = rune(config.Codec.CSV.Comment) + d.r.LazyQuotes = config.Codec.CSV.LazyQuotes + d.r.TrimLeadingSpace = config.Codec.CSV.TrimLeadingSpace + if len(config.Codec.CSV.Fields) != 0 { + d.r.FieldsPerRecord = len(config.Codec.CSV.Fields) + d.header = config.Codec.CSV.Fields + } else { + h, err := d.r.Read() + if err != nil { + return nil, err + } + d.header = slices.Clone(h) + } + var err error + d.coming, err = d.r.Read() + if err != nil { + return nil, err + } + d.current = make([]string, 0, len(d.header)) + return &d, nil +} + +func (d *csvDecoder) more() bool { return len(d.coming) == len(d.header) } + +// next advances the decoder to the next data item and returns true if +// there is more data to be decoded. +func (d *csvDecoder) next() bool { + if !d.more() && d.err != nil { + return false + } + d.current = d.current[:len(d.header)] + copy(d.current, d.coming) + d.coming, d.err = d.r.Read() + if d.err == io.EOF { + d.coming = nil + } + return true +} + +// decode returns the JSON encoded value of the current CSV line. next must +// have been called before any calls to decode. +func (d *csvDecoder) decode() ([]byte, error) { + err := d.check() + if err != nil { + return nil, err + } + var buf bytes.Buffer + buf.WriteByte('{') + for i, n := range d.header { + if i != 0 { + buf.WriteByte(',') + } + buf.WriteByte('"') + buf.WriteString(n) + buf.WriteString(`":"`) + buf.WriteString(d.current[i]) + buf.WriteByte('"') + } + buf.WriteByte('}') + d.current = d.current[:0] + return buf.Bytes(), nil +} + +// decodeValue returns the value of the current CSV line interpreted as +// an object with fields based on the header held by the receiver. next must +// have been called before any calls to decode. +func (d *csvDecoder) decodeValue() ([]byte, map[string]any, error) { + err := d.check() + if err != nil { + return nil, nil, err + } + m := make(map[string]any, len(d.header)) + for i, n := range d.header { + m[n] = d.current[i] + } + d.current = d.current[:0] + b, err := d.decode() + if err != nil { + return nil, nil, err + } + return b, m, nil +} + +func (d *csvDecoder) check() error { + if d.err != nil { + if d.err == io.EOF && d.coming == nil { + return nil + } + return d.err + } + if len(d.current) == 0 { + return fmt.Errorf("decode called before next") + } + // By the time we are here, current must be the same + // length as header; if it was not read, it would be + // zero, but if it was, it must match by the contract + // of the csv.Reader. + return nil +} + +// close closes the parquet decoder and releases the resources. +func (d *csvDecoder) close() error { + if d.err == io.EOF { + return nil + } + return d.err +} diff --git a/x-pack/filebeat/input/gcs/decoding_test.go b/x-pack/filebeat/input/gcs/decoding_test.go new file mode 100644 index 000000000000..0a2ee5e3f0d7 --- /dev/null +++ b/x-pack/filebeat/input/gcs/decoding_test.go @@ -0,0 +1,225 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package gcs + +import ( + "context" + "encoding/json" + "errors" + "os" + "path/filepath" + "reflect" + "testing" + + "cloud.google.com/go/storage" + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/beat" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" +) + +// all test files are read from the "testdata" directory +const testDataPath = "testdata" + +func TestDecoding(t *testing.T) { + logp.TestingSetup() + log := logp.L() + + testCases := []struct { + name string + file string + contentType string + numEvents int + assertAgainst string + config decoderConfig + }{ + { + name: "gzip_csv", + file: "txn.csv.gz", + numEvents: 4, + assertAgainst: "txn.json", + config: decoderConfig{ + Codec: &codecConfig{ + CSV: &csvCodecConfig{ + Enabled: true, + Comma: ptr[configRune](' '), + }, + }, + }, + }, + { + name: "csv", + file: "txn.csv", + numEvents: 4, + assertAgainst: "txn.json", + config: decoderConfig{ + Codec: &codecConfig{ + CSV: &csvCodecConfig{ + Enabled: true, + Comma: ptr[configRune](' '), + }, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + file := filepath.Join(testDataPath, tc.file) + if tc.contentType == "" { + tc.contentType = "application/octet-stream" + } + f, err := os.Open(file) + if err != nil { + t.Fatalf("failed to open test data: %v", err) + } + defer f.Close() + p := &pub{t: t} + j := newJob(&storage.BucketHandle{}, &storage.ObjectAttrs{Name: "test_object"}, "gs://test_uri", newState(), &Source{}, p, log, false) + j.src.ReaderConfig.Decoding = tc.config + err = j.decode(context.Background(), f, "test") + if err != nil { + t.Errorf("unexpected error calling decode: %v", err) + } + + events := p.events + if tc.assertAgainst != "" { + targetData := readJSONFromFile(t, filepath.Join(testDataPath, tc.assertAgainst)) + assert.Equal(t, len(targetData), len(events)) + + for i, event := range events { + msg, err := event.Fields.GetValue("message") + assert.NoError(t, err) + assert.JSONEq(t, targetData[i], msg.(string)) + } + } + }) + } +} + +type pub struct { + t *testing.T + events []beat.Event +} + +func (p *pub) Publish(e beat.Event, _cursor interface{}) error { + p.t.Logf("%v\n", e.Fields) + p.events = append(p.events, e) + return nil +} + +// readJSONFromFile reads the json file and returns the data as a slice of strings +func readJSONFromFile(t *testing.T, filepath string) []string { + fileBytes, err := os.ReadFile(filepath) + assert.NoError(t, err) + var rawMessages []json.RawMessage + err = json.Unmarshal(fileBytes, &rawMessages) + assert.NoError(t, err) + var data []string + + for _, rawMsg := range rawMessages { + data = append(data, string(rawMsg)) + } + return data +} + +var codecConfigTests = []struct { + name string + yaml string + want decoderConfig + wantErr error +}{ + { + name: "handle_rune", + yaml: ` +codec: + csv: + enabled: true + comma: ' ' + comment: '#' +`, + want: decoderConfig{&codecConfig{ + CSV: &csvCodecConfig{ + Enabled: true, + Comma: ptr[configRune](' '), + Comment: '#', + }, + }}, + }, + { + name: "no_comma", + yaml: ` +codec: + csv: + enabled: true +`, + want: decoderConfig{&codecConfig{ + CSV: &csvCodecConfig{ + Enabled: true, + }, + }}, + }, + { + name: "null_comma", + yaml: ` +codec: + csv: + enabled: true + comma: "\u0000" +`, + want: decoderConfig{&codecConfig{ + CSV: &csvCodecConfig{ + Enabled: true, + Comma: ptr[configRune]('\x00'), + }, + }}, + }, + { + name: "bad_rune", + yaml: ` +codec: + csv: + enabled: true + comma: 'this is too long' +`, + wantErr: errors.New(`single character option given more than one character: "this is too long" accessing 'codec.csv.comma'`), + }, +} + +func TestCodecConfig(t *testing.T) { + for _, test := range codecConfigTests { + t.Run(test.name, func(t *testing.T) { + c, err := conf.NewConfigWithYAML([]byte(test.yaml), "") + if err != nil { + t.Fatalf("unexpected error unmarshaling config: %v", err) + } + + var got decoderConfig + err = c.Unpack(&got) + if !sameError(err, test.wantErr) { + t.Errorf("unexpected error unpacking config: got:%v want:%v", err, test.wantErr) + } + + if !reflect.DeepEqual(got, test.want) { + t.Errorf("unexpected result\n--- want\n+++ got\n%s", cmp.Diff(test.want, got)) + } + }) + } +} + +func sameError(a, b error) bool { + switch { + case a == nil && b == nil: + return true + case a == nil, b == nil: + return false + default: + return a.Error() == b.Error() + } +} + +func ptr[T any](v T) *T { return &v } diff --git a/x-pack/filebeat/input/gcs/input.go b/x-pack/filebeat/input/gcs/input.go index 97b14dc2b345..a2ecf2c28afc 100644 --- a/x-pack/filebeat/input/gcs/input.go +++ b/x-pack/filebeat/input/gcs/input.go @@ -71,6 +71,7 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) { TimeStampEpoch: bucket.TimeStampEpoch, ExpandEventListFromField: bucket.ExpandEventListFromField, FileSelectors: bucket.FileSelectors, + ReaderConfig: bucket.ReaderConfig, }) } @@ -125,6 +126,7 @@ func tryOverrideOrDefault(cfg config, b bucket) bucket { if len(b.FileSelectors) == 0 && len(cfg.FileSelectors) != 0 { b.FileSelectors = cfg.FileSelectors } + b.ReaderConfig = cfg.ReaderConfig return b } diff --git a/x-pack/filebeat/input/gcs/input_stateless.go b/x-pack/filebeat/input/gcs/input_stateless.go index 04ec19de5ddf..3cdeb3794739 100644 --- a/x-pack/filebeat/input/gcs/input_stateless.go +++ b/x-pack/filebeat/input/gcs/input_stateless.go @@ -62,6 +62,7 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher TimeStampEpoch: bucket.TimeStampEpoch, ExpandEventListFromField: bucket.ExpandEventListFromField, FileSelectors: bucket.FileSelectors, + ReaderConfig: bucket.ReaderConfig, } st := newState() diff --git a/x-pack/filebeat/input/gcs/job.go b/x-pack/filebeat/input/gcs/job.go index 63e631e39be9..403555311e9d 100644 --- a/x-pack/filebeat/input/gcs/job.go +++ b/x-pack/filebeat/input/gcs/job.go @@ -137,20 +137,85 @@ func (j *job) processAndPublishData(ctx context.Context, id string) error { } }() - err = j.readJsonAndPublish(ctx, reader, id) - if err != nil { - return fmt.Errorf("failed to read data from object: %s, with error: %w", j.object.Name, err) - } - - return err + return j.decode(ctx, reader, id) } -func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) error { +func (j *job) decode(ctx context.Context, r io.Reader, id string) error { r, err := j.addGzipDecoderIfNeeded(bufio.NewReader(r)) if err != nil { return fmt.Errorf("failed to add gzip decoder to object: %s, with error: %w", j.object.Name, err) } + dec, err := newDecoder(j.src.ReaderConfig.Decoding, r) + if err != nil { + return err + } + var evtOffset int64 + switch dec := dec.(type) { + case valueDecoder: + defer dec.close() + + for dec.next() { + var ( + msg []byte + val []mapstr.M + ) + if j.src.ParseJSON { + var v mapstr.M + msg, v, err = dec.decodeValue() + if err != nil { + if err == io.EOF { + return nil + } + break + } + val = []mapstr.M{v} + } else { + msg, err = dec.decode() + if err != nil { + if err == io.EOF { + return nil + } + break + } + } + evt := j.createEvent(msg, val, evtOffset) + j.publish(evt, !dec.more(), id) + } + + case decoder: + defer dec.close() + + for dec.next() { + msg, err := dec.decode() + if err != nil { + if err == io.EOF { + return nil + } + break + } + var val []mapstr.M + if j.src.ParseJSON { + val, err = decodeJSON(bytes.NewReader(msg)) + if err != nil { + j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err) + } + } + evt := j.createEvent(msg, val, evtOffset) + j.publish(evt, !dec.more(), id) + } + + default: + err = j.readJsonAndPublish(ctx, r, id) + if err != nil { + return fmt.Errorf("failed to read data from object: %s, with error: %w", j.object.Name, err) + } + } + + return err +} +func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) error { + var err error r, j.isRootArray, err = evaluateJSON(bufio.NewReader(r)) if err != nil { return fmt.Errorf("failed to evaluate json for object: %s, with error: %w", j.object.Name, err) @@ -190,23 +255,27 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er } } evt := j.createEvent(item, parsedData, offset) - if !dec.More() { - // if this is the last object, then perform a complete state save - cp, done := j.state.saveForTx(j.object.Name, j.object.Updated) - if err := j.publisher.Publish(evt, cp); err != nil { - j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err) - } - done() - } else { - // since we don't update the cursor checkpoint, lack of a lock here is not a problem - if err := j.publisher.Publish(evt, nil); err != nil { - j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err) - } - } + j.publish(evt, !dec.More(), id) } return nil } +func (j *job) publish(evt beat.Event, last bool, id string) { + if last { + // if this is the last object, then perform a complete state save + cp, done := j.state.saveForTx(j.object.Name, j.object.Updated) + if err := j.publisher.Publish(evt, cp); err != nil { + j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err) + } + done() + return + } + // since we don't update the cursor checkpoint, lack of a lock here is not a problem + if err := j.publisher.Publish(evt, nil); err != nil { + j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err) + } +} + // splitEventList splits the event list into individual events and publishes them func (j *job) splitEventList(key string, raw json.RawMessage, offset int64, objHash string, id string) error { var jsonObject map[string]json.RawMessage diff --git a/x-pack/filebeat/input/gcs/testdata/txn.csv b/x-pack/filebeat/input/gcs/testdata/txn.csv new file mode 100644 index 000000000000..80ca65df21ef --- /dev/null +++ b/x-pack/filebeat/input/gcs/testdata/txn.csv @@ -0,0 +1,5 @@ +date time time-taken cs-bytes sc-bytes bytes c-ip s-ip cs-username cs-method cs-uri-scheme cs-uri-query cs-user-agent cs-content-type sc-status sc-content-type cs-dns cs-host cs-uri cs-uri-port cs-referer x-cs-session-id x-cs-access-method x-cs-app x-s-country x-s-latitude x-s-longitude x-s-location x-s-region x-s-zipcode x-c-country x-c-latitude x-c-longitude x-c-location x-c-region x-c-zipcode x-c-os x-c-browser x-c-browser-version x-c-device x-cs-site x-cs-timestamp x-cs-page-id x-cs-userip x-cs-traffic-type x-cs-tunnel-id x-category x-other-category x-type x-server-ssl-err x-client-ssl-err x-transaction-id x-request-id x-cs-sni x-cs-domain-fronted-sni x-category-id x-other-category-id x-sr-headers-name x-sr-headers-value x-cs-ssl-ja3 x-sr-ssl-ja3s x-ssl-bypass x-ssl-bypass-reason x-r-cert-subject-cn x-r-cert-issuer-cn x-r-cert-startdate x-r-cert-enddate x-r-cert-valid x-r-cert-expired x-r-cert-untrusted-root x-r-cert-incomplete-chain x-r-cert-self-signed x-r-cert-revoked x-r-cert-revocation-check x-r-cert-mismatch x-cs-ssl-fronting-error x-cs-ssl-handshake-error x-sr-ssl-handshake-error x-sr-ssl-client-certificate-error x-sr-ssl-malformed-ssl x-s-custom-signing-ca-error x-cs-ssl-engine-action x-cs-ssl-engine-action-reason x-sr-ssl-engine-action x-sr-ssl-engine-action-reason x-ssl-policy-src-ip x-ssl-policy-dst-ip x-ssl-policy-dst-host x-ssl-policy-dst-host-source x-ssl-policy-categories x-ssl-policy-action x-ssl-policy-name x-cs-ssl-version x-cs-ssl-cipher x-sr-ssl-version x-sr-ssl-cipher x-cs-src-ip-egress x-s-dp-name x-cs-src-ip x-cs-src-port x-cs-dst-ip x-cs-dst-port x-sr-src-ip x-sr-src-port x-sr-dst-ip x-sr-dst-port x-cs-ip-connect-xff x-cs-ip-xff x-cs-connect-host x-cs-connect-port x-cs-connect-user-agent x-cs-url x-cs-uri-path x-cs-http-version rs-status x-cs-app-category x-cs-app-cci x-cs-app-ccl x-cs-app-tags x-cs-app-suite x-cs-app-instance-id x-cs-app-instance-name x-cs-app-instance-tag x-cs-app-activity x-cs-app-from-user x-cs-app-to-user x-cs-app-object-type x-cs-app-object-name x-cs-app-object-id x-rs-file-type x-rs-file-category x-rs-file-language x-rs-file-size x-rs-file-md5 x-rs-file-sha256 x-error x-c-local-time x-policy-action x-policy-name x-policy-src-ip x-policy-dst-ip x-policy-dst-host x-policy-dst-host-source x-policy-justification-type x-policy-justification-reason x-sc-notification-name +2024-08-05 16:24:20 64 2971 2050 5021 10.5.78.159 204.79.197.237 "vikash.ranjan@riverbed.com" GET https cc=US&setlang=en-US "Mozilla/5.0 (Windows NT 10.0; Win64; x64; Cortana 1.14.7.19041; 10.0.0.0.19045.2006) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.19045" - 200 "application/json; charset=utf-8" www.bing.com www.bing.com /client/config?cc=US&setlang=en-US 443 - 3683772769278232507 "Client" "Microsoft Bing" "US" 47.682899 -122.120903 "Redmond" "Washington" "N/A" "US" 29.775400 -95.598000 "Houston" "Texas" "77079" "Windows 10" "Edge" "18.19045" "Windows Device" "bing" 1722875060 5762388460300455936 10.5.78.159 CloudApp - "Search Engines" - http_transaction - - 2696581500064586450 2901306739654139904 www.bing.com - 551 - - - 28a2c9bd18a11de089ef85a160da29e4 NotAvailable No - "NotChecked" "NotChecked" "NotChecked" "NotChecked" NotChecked NotChecked NotChecked NotChecked NotChecked NotChecked "NotChecked" NotChecked No No NotChecked NotChecked NotChecked No Allow "Established" None "NotEstablished" 10.5.78.159 69.192.139.97 www.bing.com Sni "Search Engines" Decrypt - TLSv1.2 ECDHE-RSA-AES256-GCM-SHA384 NotChecked NotChecked 208.185.23.18 "US-ATL2" 10.5.78.159 25941 69.192.139.97 443 - - 10.144.54.201 842 - - - - - https://www.bing.com/client/config?cc=US&setlang=en-US /client/config HTTP1.1 200 "Search Engines" 58 low "Consumer,Unsanctioned" - - - - "Browse" - - - - - - - - - - - - "2024-08-05 11:24:00" "allow" "NetskopeAllow" 10.5.78.159 204.79.197.237 www.bing.com HttpHostHeader - - - +2024-08-05 16:24:19 - 18 0 18 10.70.0.19 - "nadav@skyformation.onmicrosoft.com" PRI - - - - - - - us-west1-b-osconfig.googleapis.com * 443 - 0 "Client" - - - - - - - "US" 45.605600 -121.180700 "The Dalles" "Oregon" "97058" - - - - - 1722875059 0 10.70.0.19 - - "Technology" "Cloud Storage" http_transaction - - 2035489204758272484 0 us-west1-b-osconfig.googleapis.com - 564 "7" - - 7a15285d4efc355608b304698cd7f9ab NotAvailable No - "NotChecked" "NotChecked" "NotChecked" "NotChecked" NotChecked NotChecked NotChecked NotChecked NotChecked NotChecked "NotChecked" NotChecked No No NotChecked NotChecked NotChecked No Allow "Established" None "NotEstablished" 10.70.0.19 142.250.99.95 us-west1-b-osconfig.googleapis.com Sni "Technology, Cloud Storage" Decrypt - TLSv1.3 TLS_AES_256_GCM_SHA384 NotChecked NotChecked 34.82.190.203 "US-SEA2" 10.70.0.19 32951 142.250.99.95 443 - - - - - - - - - - - HTTP1.1 - - - - - - - - - - - - - - - - - - - - - http-malformed "NotChecked" NotChecked - - - - - - - - +2024-08-05 16:24:20 - 0 0 0 10.0.20.111 - "levente.fangli@cososys.com" - - - - - - - - achecker-alliances.eu.goskope.com - 443 - 0 "Client" - - - - - - - "RO" 46.765700 23.594300 "Cluj-Napoca" "Cluj County" "400027" - - - - - 1722875060 0 10.0.20.111 - - - - http_transaction - "HsFailure (error:14094418:SSL routines:ssl3_read_bytes:tlsv1 alert unknown ca)" 1350739992944030464 0 achecker-alliances.eu.goskope.com - - - - - bc29aa426fc99c0be1b9be941869f88a NotAvailable No - "NotChecked" "NotChecked" "NotChecked" "NotChecked" NotChecked NotChecked NotChecked NotChecked NotChecked NotChecked "NotChecked" NotChecked No Yes NotChecked NotChecked NotChecked No Block "SSL Error - SSL Handshake Error" None "NotEstablished" - - - Unknown - Decrypt - - - NotChecked NotChecked 81.196.156.53 "AT-VIE1" 10.0.20.111 57897 31.186.239.94 443 - - - - - - - - - - - UNKNOWN - - - - - - - - - - - - - - - - - - - - - client-ssl "NotChecked" NotChecked - - - - - - - - +2024-08-05 16:24:23 - 0 0 0 10.0.20.111 - "levente.fangli@cososys.com" - - - - - - - - achecker-alliances.eu.goskope.com - 443 - 0 "Client" - - - - - - - "RO" 46.765700 23.594300 "Cluj-Napoca" "Cluj County" "400027" - - - - - 1722875063 0 10.0.20.111 - - - - http_transaction - "HsFailure (error:14094418:SSL routines:ssl3_read_bytes:tlsv1 alert unknown ca)" 1615432978285898071 0 achecker-alliances.eu.goskope.com - - - - - bc29aa426fc99c0be1b9be941869f88a NotAvailable No - "NotChecked" "NotChecked" "NotChecked" "NotChecked" NotChecked NotChecked NotChecked NotChecked NotChecked NotChecked "NotChecked" NotChecked No Yes NotChecked NotChecked NotChecked No Block "SSL Error - SSL Handshake Error" None "NotEstablished" - - - Unknown - Decrypt - - - NotChecked NotChecked 81.196.156.53 "AT-VIE1" 10.0.20.111 57897 31.186.239.94 443 - - - - - - - - - - - UNKNOWN - - - - - - - - - - - - - - - - - - - - - client-ssl "NotChecked" NotChecked - - - - - - - - diff --git a/x-pack/filebeat/input/gcs/testdata/txn.csv.gz b/x-pack/filebeat/input/gcs/testdata/txn.csv.gz new file mode 100644 index 0000000000000000000000000000000000000000..52e8fb20539a40e3127997a8877e3346ea1c6dfc GIT binary patch literal 2527 zcmV<52_W_#iwFP!00000|8!Mbn(H0bbhgbUzHnwq|6>2XLC4)%VlQk~5hX z8-672pb3p{=C<+y!!HguQuufp>rJT=J&Y^wJt84{jZSZ)s>1!Ywi1GuU1 zNW^-&SfG8sJs|mT1e0f`J({y);=;tRj2xl47&0TgH1dzzDdsaYsG?C7T~yJiin=Pg zsEV3@9}@r55>%t%&Z6zg#CJzrV|G@lR7Ex1_QLMC@R^BJ09OSjIM0qsk_&Q#z`h}5Vg2ZTpxj=E-JsSh9=tisTXHd6SuP)h~ZfLajROK?bu%t3Z>G+Ca(iOk>6x4M7D5x!w?6}#b;=P{O(9cn} zRCM)aCJ(i^<%p6Ggs*OJSX^Kzjr9ZEOIYo^UU~-)m(;@^y`v|na2amuach}27@z-vD9l_V}`#yRRBdnDHW2YPaiJ6tYDfd9TlNM^3p&sZOZ`R-AEh#wip6xKRu^l@ui93>YPn zC{36&xGRVhPGr)pQQ)aoh}0*B5uyi@}YZ z^{Wf#)4K|ioA7trbvBcP!pr;CH(@ux?L3B~=YSi9IEPR1H3|J_y#+=XM#tb(u=Bm% z{)w_AoYLYIl$%ZG?%m>TJ7zLhBro9Q+u8i)`ZaXzgM%wqKZI8>S@+@5Wh$EIGRN8| zhsrK(-+nfmHY=Ageej+@iltLXEc$P2JGQ}RABW)2psJwi zQz5In*7-53gI~6*KmH37A#x0q7dlsptHN-ldCnz8peK9d1Yv9IJ`77E9YaWAdQ0X37LiI6t6mGNf*^V2cloVK5Y9x_H(^%vzJPilU zx%*@XT$Q5AL^sbgF^WP&2$wGy{d;`rwrw5H7W)*L;#@093v4J=@LX$A(43-;G>G(Z ze@(jZ)F6onLsB{0*K3&iRqNdlDTv+lVZ>$|Y#k}UE###zh15)9>V=RR{}k(@ET@Ia z@kp@TV1B1$q#z~Ro1+75d2JRA)0VNuQEl8(OUnw{SZpjXcKwa^vt2Hm*F# z|8rBHur@;gW?>j-&Ni+;g!=ure*h9uW=ReP001A02mk;800003ol{MZ+aM6VSL%Nl zajXgg3^wekRyOKZ>c^^f&la)qht@HXVYAA=FHR24M%gy1#P7RhgVRI`r8yOuR6OHCi3odxR17i|fU*;{%rME7Y=mv=L`H3DlTeXr zRX16y%?7@^knhbfn$2kAV&NJ~tKQa6wn4Wp{^b9gz72PJ{)q(nIJy@=YagPP!S^Si z$sVC!RnLaQ1SeclqBYU-mg_C1;e47O;bNHgww@2W5bv@4TI?A~mT#4>i;By%`n>v9 z6%Tn5hk0E`B8cLV7$wmFiz{)l&iO0K3D=Tc5I?#5T0E5HW3=)cyVqi|Bi@>E`3L_p zn2Wae?|uVN)}`m^0{{RYiwFP!000001MO2yZxb;Py(94-ET7=8mcQ0(=R^rXMW9ro z6vQd=I!?2!v7@y&DF43RO%SwAqzDd3sG7^nc>LbXXdcCs7g!d!1Vl_IpHK`6fJY-3 zVMH+^gybA_c58jIXr=wFYhIZUL)=AV!grXT@0&JjYg*eC+POx1V?Ifhti}7z4^ssW`)9egCbP>rF5k?jtY_y-&Lwaf~soe#ase zrLroqcs$bH)+ucksnb4clbYZM?#uXAH?Hq2cwxIPbQ8j{;+)9I_4NnPg+4Xj#z~Bh zEjp{0i+Y!GoLt-zpq=d!=zZ(M4-brf2?1dxu9#AaW)#dY7d&@!&X#^!ZK%?kQ?W8i z8C+XZtJciWLUy;gJ1rix~Jj5R~yfP0#Er7$S zXT$dMEQ5ck{W7j~Yxmg!-P@6G7E>O(iR*!LzptCa`U~*Jnr^pAhx~C;Nf1#2ky4