Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/azureblobstorage: add support for CSV decoding #40978

Merged
merged 3 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add support to CEL for reading host environment variables. {issue}40762[40762] {pull}40779[40779]
- Add CSV decoder to awss3 input. {pull}40896[40896]
- Change request trace logging to include headers instead of complete request. {pull}41072[41072]
- Add CSV decoding capacity to azureblobstorage input {pull}40978[40978]

*Auditbeat*

Expand Down
55 changes: 55 additions & 0 deletions x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,61 @@ Example : `10s` would mean we would like the polling to occur every 10 seconds.
This attribute can be specified both at the root level of the configuration as well at the container level. The container level values will always
take priority and override the root level values if both are specified.

[id="input-{type}-encoding"]
[float]
==== `encoding`

The file encoding to use for reading data that contains international
characters. This only applies to non-JSON logs. See <<_encoding_3>>.

[id="input-{type}-decoding"]
[float]
==== `decoding`

The file decoding option is used to specify a codec that will be used to
decode the file contents. This can apply to any file stream data.
An example config is shown below:

Currently supported codecs are given below:-

1. <<attrib-decoding-csv-azureblobstorage,CSV>>: This codec decodes RFC 4180 CSV data streams.

[id="attrib-decoding-csv-azureblobstorage"]
[float]
==== `the CSV codec`
The `CSV` codec is used to decode RFC 4180 CSV data streams.
Enabling the codec without other options will use the default codec options.

[source,yaml]
----
decoding.codec.csv.enabled: true
----

The CSV codec supports five sub attributes to control aspects of CSV decoding.
The `comma` attribute specifies the field separator character used by the CSV
format. If it is not specified, the comma character '`,`' is used. The `comment`
attribute specifies the character that should be interpreted as a comment mark.
If it is specified, lines starting with the character will be ignored. Both
`comma` and `comment` must be single characters. The `lazy_quotes` attribute
controls how quoting in fields is handled. If `lazy_quotes` is true, a quote may
appear in an unquoted field and a non-doubled quote may appear in a quoted field.
The `trim_leading_space` attribute specifies that leading white space should be
ignored, even if the `comma` character is white space. For complete details
of the preceding configuration attribute behaviors, see the CSV decoder
https://pkg.go.dev/encoding/csv#Reader[documentation] The `fields_names`
attribute can be used to specify the column names for the data. If it is
absent, the field names are obtained from the first non-comment line of
data. The number of fields must match the number of field names.

An example config is shown below:

[source,yaml]
----
decoding.codec.csv.enabled: true
decoding.codec.csv.comma: "\t"
decoding.codec.csv.comment: "#"
----

[id="attrib-file_selectors"]
[float]
==== `file_selectors`
Expand Down
9 changes: 9 additions & 0 deletions x-pack/filebeat/input/azureblobstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azcore"

"github.com/elastic/beats/v7/libbeat/common/match"
"github.com/elastic/beats/v7/libbeat/reader/parser"
)

// MaxWorkers, Poll, PollInterval, FileSelectors, TimeStampEpoch & ExpandEventListFromField can
Expand All @@ -25,6 +26,7 @@ type config struct {
PollInterval *time.Duration `config:"poll_interval"`
Containers []container `config:"containers" validate:"required"`
FileSelectors []fileSelectorConfig `config:"file_selectors"`
ReaderConfig readerConfig `config:",inline"`
TimeStampEpoch *int64 `config:"timestamp_epoch"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
}
Expand All @@ -36,6 +38,7 @@ type container struct {
Poll *bool `config:"poll"`
PollInterval *time.Duration `config:"poll_interval"`
FileSelectors []fileSelectorConfig `config:"file_selectors"`
ReaderConfig readerConfig `config:",inline"`
TimeStampEpoch *int64 `config:"timestamp_epoch"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
}
Expand All @@ -46,6 +49,12 @@ type fileSelectorConfig struct {
// TODO: Add support for reader config in future
}

// readerConfig defines the options for reading the content of an azure container.
type readerConfig struct {
Parsers parser.Config `config:",inline"`
Decoding decoderConfig `config:"decoding"`
}

type authConfig struct {
SharedCredentials *sharedKeyConfig `config:"shared_credentials"`
ConnectionString *connectionStringConfig `config:"connection_string"`
Expand Down
47 changes: 47 additions & 0 deletions x-pack/filebeat/input/azureblobstorage/decoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package azureblobstorage

import (
"fmt"
"io"
)

// decoder is an interface for decoding data from an io.Reader.
type decoder interface {
// decode reads and decodes data from an io reader based on the codec type.
// It returns the decoded data and an error if the data cannot be decoded.
decode() ([]byte, error)
// next advances the decoder to the next data item and returns true if there is more data to be decoded.
next() bool
// close closes the decoder and releases any resources associated with it.
// It returns an error if the decoder cannot be closed.

// more returns whether there are more records to read.
more() bool

close() error
}

// valueDecoder is a decoder that can decode directly to a JSON serialisable value.
type valueDecoder interface {

Check failure on line 29 in x-pack/filebeat/input/azureblobstorage/decoding.go

View workflow job for this annotation

GitHub Actions / lint (windows)

type `valueDecoder` is unused (unused)

Check failure on line 29 in x-pack/filebeat/input/azureblobstorage/decoding.go

View workflow job for this annotation

GitHub Actions / lint (linux)

type `valueDecoder` is unused (unused)
decoder

decodeValue() ([]byte, map[string]any, error)
}

// newDecoder creates a new decoder based on the codec type.
// It returns a decoder type and an error if the codec type is not supported.
// If the reader config codec option is not set, it returns a nil decoder and nil error.
func newDecoder(cfg decoderConfig, r io.Reader) (decoder, error) {
switch {
case cfg.Codec == nil:
return nil, nil
case cfg.Codec.CSV != nil:
return newCSVDecoder(cfg, r)
default:
return nil, fmt.Errorf("unsupported config value: %v", cfg)
}
}
54 changes: 54 additions & 0 deletions x-pack/filebeat/input/azureblobstorage/decoding_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package azureblobstorage

import (
"fmt"
"unicode/utf8"
)

// decoderConfig contains the configuration options for instantiating a decoder.
type decoderConfig struct {
Codec *codecConfig `config:"codec"`
}

// codecConfig contains the configuration options for different codecs used by a decoder.
type codecConfig struct {
CSV *csvCodecConfig `config:"csv"`
}

// csvCodecConfig contains the configuration options for the CSV codec.
type csvCodecConfig struct {
Enabled bool `config:"enabled"`

// Fields is the set of field names. If it is present
// it is used to specify the object names of returned
// values and the FieldsPerRecord field in the csv.Reader.
// Otherwise, names are obtained from the first
// line of the CSV data.
Fields []string `config:"fields_names"`

// The fields below have the same meaning as the
// fields of the same name in csv.Reader.
Comma *configRune `config:"comma"`
Comment configRune `config:"comment"`
LazyQuotes bool `config:"lazy_quotes"`
TrimLeadingSpace bool `config:"trim_leading_space"`
}

type configRune rune

func (r *configRune) Unpack(s string) error {
if s == "" {
return nil
}
n := utf8.RuneCountInString(s)
if n != 1 {
return fmt.Errorf("single character option given more than one character: %q", s)
}
_r, _ := utf8.DecodeRuneInString(s)
*r = configRune(_r)
return nil
}
139 changes: 139 additions & 0 deletions x-pack/filebeat/input/azureblobstorage/decoding_csv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package azureblobstorage

import (
"bytes"
"encoding/csv"
"fmt"
"io"
"slices"
)

// csvDecoder is a decoder for CSV data.
type csvDecoder struct {
r *csv.Reader

header []string
current []string
coming []string

err error
}

// newCSVDecoder creates a new CSV decoder.
func newCSVDecoder(config decoderConfig, r io.Reader) (decoder, error) {
d := csvDecoder{r: csv.NewReader(r)}
d.r.ReuseRecord = true
if config.Codec.CSV.Comma != nil {
d.r.Comma = rune(*config.Codec.CSV.Comma)
}
d.r.Comment = rune(config.Codec.CSV.Comment)
d.r.LazyQuotes = config.Codec.CSV.LazyQuotes
d.r.TrimLeadingSpace = config.Codec.CSV.TrimLeadingSpace
if len(config.Codec.CSV.Fields) != 0 {
d.r.FieldsPerRecord = len(config.Codec.CSV.Fields)
d.header = config.Codec.CSV.Fields
} else {
h, err := d.r.Read()
if err != nil {
return nil, err
}
d.header = slices.Clone(h)
}
var err error
d.coming, err = d.r.Read()
if err != nil {
return nil, err
}
d.current = make([]string, 0, len(d.header))
return &d, nil
}

func (d *csvDecoder) more() bool { return len(d.coming) == len(d.header) }

// next advances the decoder to the next data item and returns true if
// there is more data to be decoded.
func (d *csvDecoder) next() bool {
if !d.more() && d.err != nil {
return false
}
d.current = d.current[:len(d.header)]
copy(d.current, d.coming)
d.coming, d.err = d.r.Read()
if d.err == io.EOF {

Check failure on line 66 in x-pack/filebeat/input/azureblobstorage/decoding_csv.go

View workflow job for this annotation

GitHub Actions / lint (windows)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)

Check failure on line 66 in x-pack/filebeat/input/azureblobstorage/decoding_csv.go

View workflow job for this annotation

GitHub Actions / lint (linux)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
d.coming = nil
}
return true
}

// decode returns the JSON encoded value of the current CSV line. next must
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
// have been called before any calls to decode.
func (d *csvDecoder) decode() ([]byte, error) {
err := d.check()
if err != nil {
return nil, err
}
var buf bytes.Buffer
buf.WriteByte('{')
for i, n := range d.header {
if i != 0 {
buf.WriteByte(',')
}
buf.WriteByte('"')
buf.WriteString(n)
buf.WriteString(`":"`)
buf.WriteString(d.current[i])
buf.WriteByte('"')
}
buf.WriteByte('}')
d.current = d.current[:0]
return buf.Bytes(), nil
}

// decodeValue returns the value of the current CSV line interpreted as
// an object with fields based on the header held by the receiver. next must
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
// have been called before any calls to decode.
func (d *csvDecoder) decodeValue() ([]byte, map[string]any, error) {
err := d.check()
if err != nil {
return nil, nil, err
}
m := make(map[string]any, len(d.header))
for i, n := range d.header {
m[n] = d.current[i]
}
d.current = d.current[:0]
b, err := d.decode()
if err != nil {
return nil, nil, err
}
return b, m, nil
}

func (d *csvDecoder) check() error {
if d.err != nil {
if d.err == io.EOF && d.coming == nil {

Check failure on line 118 in x-pack/filebeat/input/azureblobstorage/decoding_csv.go

View workflow job for this annotation

GitHub Actions / lint (windows)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)

Check failure on line 118 in x-pack/filebeat/input/azureblobstorage/decoding_csv.go

View workflow job for this annotation

GitHub Actions / lint (linux)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
return d.err
}
if len(d.current) == 0 {
return fmt.Errorf("decode called before next")
}
// By the time we are here, current must be the same
// length as header; if it was not read, it would be
// zero, but if it was, it must match by the contract
// of the csv.Reader.
return nil
}

// close closes the csv decoder and releases the resources.
func (d *csvDecoder) close() error {
if d.err == io.EOF {

Check failure on line 135 in x-pack/filebeat/input/azureblobstorage/decoding_csv.go

View workflow job for this annotation

GitHub Actions / lint (windows)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)

Check failure on line 135 in x-pack/filebeat/input/azureblobstorage/decoding_csv.go

View workflow job for this annotation

GitHub Actions / lint (linux)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
return d.err
}
Loading
Loading