Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Commit

Permalink
Re-added support for triggering via SNS.
Browse files Browse the repository at this point in the history
closes #175
  • Loading branch information
justinlittman committed Feb 4, 2019
1 parent 09a31a4 commit 67fbdd8
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 19 deletions.
12 changes: 6 additions & 6 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ jobs:
command: sudo apt-get install postgresql-client
- run: psql -h localhost -p 5432 rialto_test < database.dump
- run: go test -v ./...
- run: GOOS=linux go build -o solr_derivative cmd/solr/main.go
- run: GOOS=linux go build -o postgres_derivative cmd/postgres/main.go
- run: GOOS=linux go build -o solr_derivative cmd/solr-sqs/main.go
- run: GOOS=linux go build -o postgres_derivative cmd/postgres-sqs/main.go

deploy:
docker:
Expand All @@ -46,8 +46,8 @@ jobs:
- checkout
- run: go get github.com/golang/dep/cmd/dep
- run: dep ensure
- run: GOOS=linux go build -o solr_derivative cmd/solr/main.go
- run: GOOS=linux go build -o postgres_derivative cmd/postgres/main.go
- run: GOOS=linux go build -o solr_derivative cmd/solr-sqs/main.go
- run: GOOS=linux go build -o postgres_derivative cmd/postgres-sqs/main.go
- run: zip lambda.zip solr_derivative
- run:
name: Update Lambda Function
Expand Down Expand Up @@ -87,7 +87,7 @@ jobs:
workflows:
version: 2

deploy-dev:
jobs:
- build:
Expand All @@ -97,4 +97,4 @@ workflows:
- deploy:
filters:
branches:
only: master
only: master
23 changes: 16 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
default: package

package: solr postgres
package: solr-sqs postgres-sqs solr-sns postgres-sns

solr:
GOOS=linux go build -o solr_derivative cmd/solr/main.go
solr-sqs:
GOOS=linux go build -o solr_derivative cmd/solr-sqs/main.go
zip solr_derivative.zip solr_derivative

postgres:
GOOS=linux go build -o postgres_derivative cmd/postgres/main.go
postgres-sqs:
GOOS=linux go build -o postgres_derivative cmd/postgres-sqs/main.go
zip postgres_derivative.zip postgres_derivative

solr-sns:
GOOS=linux go build -o solr_derivative_sns cmd/solr-sns/main.go
zip solr_derivative_sns.zip solr_derivative_sns

postgres-sns:
GOOS=linux go build -o postgres_derivative_sns cmd/postgres-sns/main.go
zip postgres_derivative_sns.zip postgres_derivative_sns


local-delete-solr:
-AWS_ACCESS_KEY_ID=999999 AWS_SECRET_ACCESS_KEY=1231 aws lambda \
--region us-east-1 \
Expand All @@ -33,7 +42,7 @@ local-create-solr: solr local-delete-solr
--environment "Variables={SOLR_HOST=http://solr:8983/solr,SOLR_COLLECTION=collection1,\
SPARQL_ENDPOINT=http://triplestore:9999/blazegraph/namespace/kb/sparql, \
SPARQL_RETRIES=300}" \
--zip-file fileb://solr_derivative.zip
--zip-file fileb://solr_derivative_sns.zip

local-create-postgres: postgres local-delete-postgres
AWS_ACCESS_KEY_ID=999999 AWS_SECRET_ACCESS_KEY=1231 aws \
Expand All @@ -51,7 +60,7 @@ local-create-postgres: postgres local-delete-postgres
RDS_PORT=5432, \
RDS_SSL=false, \
RDS_PASSWORD=sekret}" \
--zip-file fileb://postgres_derivative.zip
--zip-file fileb://postgres_derivative_sns.zip

local-create-topic:
AWS_ACCESS_KEY_ID=999999 AWS_SECRET_ACCESS_KEY=1231 aws sns \
Expand Down
50 changes: 50 additions & 0 deletions cmd/postgres-sns/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package main

import (
"context"
"os"
"strings"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/sul-dlss/rialto-derivatives/actions"
"github.com/sul-dlss/rialto-derivatives/derivative"
"github.com/sul-dlss/rialto-derivatives/message"
"github.com/sul-dlss/rialto-derivatives/repository"
"github.com/sul-dlss/rialto-derivatives/runtime"

// Added for the postgres driver
_ "github.com/lib/pq"
)

// Handler is the Lambda function handler
func Handler(ctx context.Context, snsEvent events.SNSEvent) {
repo := repository.BuildRepository()
registry := runtime.NewRegistry(repo, buildDatabase(repo))
for _, record := range snsEvent.Records {
msg, err := message.ParseSNS(record)
if err != nil {
panic(err)
}

if err = actions.DispatchMessage(msg, registry).Run(msg); err != nil {
panic(err)
}
}
}

func buildDatabase(repo repository.Repository) *derivative.PostgresClient {
conf := derivative.NewPostgresConfig().
WithUser(os.Getenv("RDS_USERNAME")).
WithPassword(os.Getenv("RDS_PASSWORD")).
WithDbname(os.Getenv("RDS_DB_NAME")).
WithHost(os.Getenv("RDS_HOSTNAME")).
WithPort(os.Getenv("RDS_PORT")).
WithSSL(os.Getenv("RDS_SSL") == "" || strings.ToLower(os.Getenv("RDS_SSL")) == "true")

return derivative.NewPostgresClient(conf, repo)
}

func main() {
lambda.Start(Handler)
}
2 changes: 1 addition & 1 deletion cmd/postgres/main.go → cmd/postgres-sqs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func Handler(ctx context.Context, sqsEvent events.SQSEvent) {
repo := repository.BuildRepository()
registry := runtime.NewRegistry(repo, buildDatabase(repo))
for _, record := range sqsEvent.Records {
msg, err := message.Parse(record)
msg, err := message.ParseSQS(record)
if err != nil {
panic(err)
}
Expand Down
43 changes: 43 additions & 0 deletions cmd/solr-sns/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package main

import (
"context"
"os"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/sul-dlss/rialto-derivatives/actions"
"github.com/sul-dlss/rialto-derivatives/derivative"
"github.com/sul-dlss/rialto-derivatives/message"
"github.com/sul-dlss/rialto-derivatives/repository"
"github.com/sul-dlss/rialto-derivatives/runtime"
"github.com/sul-dlss/rialto-derivatives/transform"
)

// Handler is the Lambda function handler
func Handler(ctx context.Context, snsEvent events.SNSEvent) {
repo := repository.BuildRepository()
registry := runtime.NewRegistry(repo, buildSolrClient(repo))
for _, record := range snsEvent.Records {
msg, err := message.ParseSNS(record)
if err != nil {
panic(err)
}

if err = actions.DispatchMessage(msg, registry).Run(msg); err != nil {
panic(err)
}
}
}

func buildSolrClient(repo repository.Repository) *derivative.SolrClient {
indexer := transform.NewCompositeIndexer(repo)

host := os.Getenv("SOLR_HOST")
collection := os.Getenv("SOLR_COLLECTION")
return derivative.NewSolrClient(host, collection, indexer)
}

func main() {
lambda.Start(Handler)
}
2 changes: 1 addition & 1 deletion cmd/solr/main.go → cmd/solr-sqs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func Handler(ctx context.Context, sqsEvent events.SQSEvent) {
repo := repository.BuildRepository()
registry := runtime.NewRegistry(repo, buildSolrClient(repo))
for _, record := range sqsEvent.Records {
msg, err := message.Parse(record)
msg, err := message.ParseSQS(record)
if err != nil {
panic(err)
}
Expand Down
16 changes: 14 additions & 2 deletions message/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ type SQSBody struct {
Message string
}

// Parse transforms an SQSMessage into a message
func Parse(record events.SQSMessage) (*Message, error) {
// ParseSQS transforms an SQSMessage into a message
func ParseSQS(record events.SQSMessage) (*Message, error) {
body := &SQSBody{}
err := json.Unmarshal([]byte(record.Body), body)
if err != nil {
Expand All @@ -33,3 +33,15 @@ func Parse(record events.SQSMessage) (*Message, error) {

return msg, nil
}

// ParseSNS transforms a SNSEventRecord into a message
func ParseSNS(record events.SNSEventRecord) (*Message, error) {
data := record.SNS.Message
msg := &Message{}
err := json.Unmarshal([]byte(data), msg)
if err != nil {
return nil, err
}

return msg, nil
}
16 changes: 14 additions & 2 deletions message/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,24 @@ import (
"github.com/stretchr/testify/assert"
)

func TestParse(t *testing.T) {
func TestParseSQS(t *testing.T) {
evtRecord := events.SQSMessage{
Body: "{\"Message\": \"{\\\"Action\\\": \\\"touch\\\", \\\"Entities\\\":[\\\"http://example.com/foo1\\\"] }\"}",
}

event, _ := Parse(evtRecord)
event, _ := ParseSQS(evtRecord)
assert.Equal(t, "touch", event.Action)
assert.Equal(t, []string{"http://example.com/foo1"}, event.Entities)

}

func TestParseSNS(t *testing.T) {
evtRecord := events.SNSEventRecord{
SNS: events.SNSEntity{
Message: "{\"Action\": \"touch\", \"Entities\":[\"http://example.com/foo1\"] }",
},
}
event, _ := ParseSNS(evtRecord)
assert.Equal(t, "touch", event.Action)
assert.Equal(t, []string{"http://example.com/foo1"}, event.Entities)

Expand Down

0 comments on commit 67fbdd8

Please sign in to comment.