Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

couchbase: add CAS support #2709

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/modules/components/pages/processors/couchbase.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ couchbase:
--
======

When inserting, replacing or upserting documents, each must have the `content` property set.
When inserting, replacing or upserting documents, each must have the `content` property set. CAS value is stored in meta `couchbase_cas`. It prevent read/write conflict by only allowing write if not modified by other. You can clear the value with `meta couchbase_cas = deleted()` to disable this check.

== Fields

Expand Down
15 changes: 7 additions & 8 deletions internal/impl/couchbase/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,17 @@ cache_resources:
)
}

func removeBucket(ctx context.Context, tb testing.TB, port, bucket string) error {
cluster, err := gocb.Connect(fmt.Sprintf("couchbase://localhost:%v", port), gocb.ClusterOptions{
func getCluster(ctx context.Context, tb testing.TB, port string) (*gocb.Cluster, error) {
return gocb.Connect(fmt.Sprintf("couchbase://localhost:%v", port), gocb.ClusterOptions{
Authenticator: gocb.PasswordAuthenticator{
Username: username,
Password: password,
},
})
}

func removeBucket(ctx context.Context, tb testing.TB, port, bucket string) error {
cluster, err := getCluster(ctx, tb, port)
if err != nil {
return err
}
Expand All @@ -79,12 +83,7 @@ func removeBucket(ctx context.Context, tb testing.TB, port, bucket string) error
}

func createBucket(ctx context.Context, tb testing.TB, port, bucket string) error {
cluster, err := gocb.Connect(fmt.Sprintf("couchbase://localhost:%v", port), gocb.ClusterOptions{
Authenticator: gocb.PasswordAuthenticator{
Username: username,
Password: password,
},
})
cluster, err := getCluster(ctx, tb, port)
if err != nil {
return err
}
Expand Down
44 changes: 30 additions & 14 deletions internal/impl/couchbase/couchbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,56 +20,72 @@ import (
"github.com/couchbase/gocb/v2"
)

func valueFromOp(op gocb.BulkOp) (out any, err error) {
func valueFromOp(op gocb.BulkOp) (out any, cas gocb.Cas, err error) {
switch o := op.(type) {
case *gocb.GetOp:
if o.Err != nil {
return nil, o.Err
return nil, gocb.Cas(0), o.Err
}
err := o.Result.Content(&out)
return out, err

return out, o.Result.Cas(), err
case *gocb.InsertOp:
return nil, o.Err
if o.Result != nil {
return nil, o.Result.Cas(), o.Err
}
return nil, gocb.Cas(0), o.Err
case *gocb.RemoveOp:
return nil, o.Err
if o.Result != nil {
return nil, o.Result.Cas(), o.Err
}
return nil, gocb.Cas(0), o.Err
case *gocb.ReplaceOp:
return nil, o.Err
if o.Result != nil {
return nil, o.Result.Cas(), o.Err
}
return nil, gocb.Cas(0), o.Err
case *gocb.UpsertOp:
return nil, o.Err
if o.Result != nil {
return nil, o.Result.Cas(), o.Err
}
return nil, gocb.Cas(0), o.Err
}

return nil, errors.New("type not supported")
return nil, gocb.Cas(0), errors.New("type not supported")
}

func get(key string, _ []byte) gocb.BulkOp {
func get(key string, _ []byte, _ gocb.Cas) gocb.BulkOp {
return &gocb.GetOp{
ID: key,
}
}

func insert(key string, data []byte) gocb.BulkOp {
func insert(key string, data []byte, _ gocb.Cas) gocb.BulkOp {
return &gocb.InsertOp{
ID: key,
Value: data,
}
}

func remove(key string, _ []byte) gocb.BulkOp {
func remove(key string, _ []byte, cas gocb.Cas) gocb.BulkOp {
return &gocb.RemoveOp{
ID: key,
ID: key,
Cas: cas,
}
}

func replace(key string, data []byte) gocb.BulkOp {
func replace(key string, data []byte, cas gocb.Cas) gocb.BulkOp {
return &gocb.ReplaceOp{
ID: key,
Value: data,
Cas: cas,
}
}

func upsert(key string, data []byte) gocb.BulkOp {
func upsert(key string, data []byte, cas gocb.Cas) gocb.BulkOp {
return &gocb.UpsertOp{
ID: key,
Value: data,
Cas: cas,
}
}
24 changes: 19 additions & 5 deletions internal/impl/couchbase/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ import (
"github.com/redpanda-data/connect/v4/internal/impl/couchbase/client"
)

const (
// MetaCASKey hold CAS of entry.
MetaCASKey = "couchbase_cas"
)

var (
// ErrInvalidOperation specified operation is not supported.
ErrInvalidOperation = errors.New("invalid operation")
Expand All @@ -41,7 +46,7 @@ func ProcessorConfig() *service.ConfigSpec {
Version("4.11.0").
Categories("Integration").
Summary("Performs operations against Couchbase for each message, allowing you to store or retrieve data within message payloads.").
Description("When inserting, replacing or upserting documents, each must have the `content` property set.").
Description("When inserting, replacing or upserting documents, each must have the `content` property set. CAS value is stored in meta `couchbase_cas`. It prevent read/write conflict by only allowing write if not modified by other. You can clear the value with `meta couchbase_cas = deleted()` to disable this check.").
Field(service.NewInterpolatedStringField("id").Description("Document id.").Example(`${! json("id") }`)).
Field(service.NewBloblangField("content").Description("Document content.").Optional()).
Field(service.NewStringAnnotatedEnumField("operation", map[string]string{
Expand Down Expand Up @@ -73,7 +78,7 @@ type Processor struct {
*couchbaseClient
id *service.InterpolatedString
content *bloblang.Executor
op func(key string, data []byte) gocb.BulkOp
op func(key string, data []byte, cas gocb.Cas) gocb.BulkOp
}

// NewProcessor returns a Couchbase processor.
Expand Down Expand Up @@ -139,7 +144,7 @@ func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBat
}

// generate query
for index := range newMsg {
for index, msg := range newMsg {
// generate id
k, err := inBatch.TryInterpolatedString(index, p.id)
if err != nil {
Expand All @@ -159,7 +164,14 @@ func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBat
}
}

ops[index] = p.op(k, content)
var cas gocb.Cas // retrieve cas if set
if val, ok := msg.MetaGetMut(MetaCASKey); ok {
if v, ok := val.(gocb.Cas); ok {
cas = v
}
}

ops[index] = p.op(k, content, cas)
}

// execute
Expand All @@ -170,7 +182,7 @@ func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBat

// set results
for index, part := range newMsg {
out, err := valueFromOp(ops[index])
out, cas, err := valueFromOp(ops[index])
if err != nil {
part.SetError(fmt.Errorf("couchbase operator failed: %w", err))
}
Expand All @@ -180,6 +192,8 @@ func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBat
} else if out != nil {
part.SetStructured(out)
}

part.MetaSetMut(MetaCASKey, cas)
}

return []service.MessageBatch{newMsg}, nil
Expand Down
Loading
Loading