diff --git a/.circleci/config.yml b/.circleci/config.yml index 06d6e4d..3d84e4d 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -30,8 +30,8 @@ jobs: command: sudo apt-get install postgresql-client - run: psql -h localhost -p 5432 rialto_test < database.dump - run: go test -v ./... - - run: GOOS=linux go build -o solr_derivative cmd/solr/main.go - - run: GOOS=linux go build -o postgres_derivative cmd/postgres/main.go + - run: GOOS=linux go build -o solr_derivative cmd/solr-sqs/main.go + - run: GOOS=linux go build -o postgres_derivative cmd/postgres-sqs/main.go deploy: docker: @@ -46,8 +46,8 @@ jobs: - checkout - run: go get github.com/golang/dep/cmd/dep - run: dep ensure - - run: GOOS=linux go build -o solr_derivative cmd/solr/main.go - - run: GOOS=linux go build -o postgres_derivative cmd/postgres/main.go + - run: GOOS=linux go build -o solr_derivative cmd/solr-sqs/main.go + - run: GOOS=linux go build -o postgres_derivative cmd/postgres-sqs/main.go - run: zip lambda.zip solr_derivative - run: name: Update Lambda Function @@ -87,7 +87,7 @@ jobs: workflows: version: 2 - + deploy-dev: jobs: - build: @@ -97,4 +97,4 @@ workflows: - deploy: filters: branches: - only: master \ No newline at end of file + only: master diff --git a/Makefile b/Makefile index 6317477..a2bd19e 100644 --- a/Makefile +++ b/Makefile @@ -1,15 +1,24 @@ default: package -package: solr postgres +package: solr-sqs postgres-sqs solr-sns postgres-sns -solr: - GOOS=linux go build -o solr_derivative cmd/solr/main.go +solr-sqs: + GOOS=linux go build -o solr_derivative cmd/solr-sqs/main.go zip solr_derivative.zip solr_derivative -postgres: - GOOS=linux go build -o postgres_derivative cmd/postgres/main.go +postgres-sqs: + GOOS=linux go build -o postgres_derivative cmd/postgres-sqs/main.go zip postgres_derivative.zip postgres_derivative +solr-sns: + GOOS=linux go build -o solr_derivative_sns cmd/solr-sns/main.go + zip solr_derivative_sns.zip solr_derivative_sns + +postgres-sns: + GOOS=linux go build -o postgres_derivative_sns cmd/postgres-sns/main.go + zip postgres_derivative_sns.zip postgres_derivative_sns + + local-delete-solr: -AWS_ACCESS_KEY_ID=999999 AWS_SECRET_ACCESS_KEY=1231 aws lambda \ --region us-east-1 \ @@ -33,7 +42,7 @@ local-create-solr: solr local-delete-solr --environment "Variables={SOLR_HOST=http://solr:8983/solr,SOLR_COLLECTION=collection1,\ SPARQL_ENDPOINT=http://triplestore:9999/blazegraph/namespace/kb/sparql, \ SPARQL_RETRIES=300}" \ - --zip-file fileb://solr_derivative.zip + --zip-file fileb://solr_derivative_sns.zip local-create-postgres: postgres local-delete-postgres AWS_ACCESS_KEY_ID=999999 AWS_SECRET_ACCESS_KEY=1231 aws \ @@ -51,7 +60,7 @@ local-create-postgres: postgres local-delete-postgres RDS_PORT=5432, \ RDS_SSL=false, \ RDS_PASSWORD=sekret}" \ - --zip-file fileb://postgres_derivative.zip + --zip-file fileb://postgres_derivative_sns.zip local-create-topic: AWS_ACCESS_KEY_ID=999999 AWS_SECRET_ACCESS_KEY=1231 aws sns \ diff --git a/cmd/postgres-sns/main.go b/cmd/postgres-sns/main.go new file mode 100644 index 0000000..7004eb1 --- /dev/null +++ b/cmd/postgres-sns/main.go @@ -0,0 +1,50 @@ +package main + +import ( + "context" + "os" + "strings" + + "github.com/aws/aws-lambda-go/events" + "github.com/aws/aws-lambda-go/lambda" + "github.com/sul-dlss/rialto-derivatives/actions" + "github.com/sul-dlss/rialto-derivatives/derivative" + "github.com/sul-dlss/rialto-derivatives/message" + "github.com/sul-dlss/rialto-derivatives/repository" + "github.com/sul-dlss/rialto-derivatives/runtime" + + // Added for the postgres driver + _ "github.com/lib/pq" +) + +// Handler is the Lambda function handler +func Handler(ctx context.Context, snsEvent events.SNSEvent) { + repo := repository.BuildRepository() + registry := runtime.NewRegistry(repo, buildDatabase(repo)) + for _, record := range snsEvent.Records { + msg, err := message.ParseSNS(record) + if err != nil { + panic(err) + } + + if err = actions.DispatchMessage(msg, registry).Run(msg); err != nil { + panic(err) + } + } +} + +func buildDatabase(repo repository.Repository) *derivative.PostgresClient { + conf := derivative.NewPostgresConfig(). + WithUser(os.Getenv("RDS_USERNAME")). + WithPassword(os.Getenv("RDS_PASSWORD")). + WithDbname(os.Getenv("RDS_DB_NAME")). + WithHost(os.Getenv("RDS_HOSTNAME")). + WithPort(os.Getenv("RDS_PORT")). + WithSSL(os.Getenv("RDS_SSL") == "" || strings.ToLower(os.Getenv("RDS_SSL")) == "true") + + return derivative.NewPostgresClient(conf, repo) +} + +func main() { + lambda.Start(Handler) +} diff --git a/cmd/postgres/main.go b/cmd/postgres-sqs/main.go similarity index 97% rename from cmd/postgres/main.go rename to cmd/postgres-sqs/main.go index e0f1ce6..eade8d7 100644 --- a/cmd/postgres/main.go +++ b/cmd/postgres-sqs/main.go @@ -22,7 +22,7 @@ func Handler(ctx context.Context, sqsEvent events.SQSEvent) { repo := repository.BuildRepository() registry := runtime.NewRegistry(repo, buildDatabase(repo)) for _, record := range sqsEvent.Records { - msg, err := message.Parse(record) + msg, err := message.ParseSQS(record) if err != nil { panic(err) } diff --git a/cmd/solr-sns/main.go b/cmd/solr-sns/main.go new file mode 100644 index 0000000..a4b9d2d --- /dev/null +++ b/cmd/solr-sns/main.go @@ -0,0 +1,43 @@ +package main + +import ( + "context" + "os" + + "github.com/aws/aws-lambda-go/events" + "github.com/aws/aws-lambda-go/lambda" + "github.com/sul-dlss/rialto-derivatives/actions" + "github.com/sul-dlss/rialto-derivatives/derivative" + "github.com/sul-dlss/rialto-derivatives/message" + "github.com/sul-dlss/rialto-derivatives/repository" + "github.com/sul-dlss/rialto-derivatives/runtime" + "github.com/sul-dlss/rialto-derivatives/transform" +) + +// Handler is the Lambda function handler +func Handler(ctx context.Context, snsEvent events.SNSEvent) { + repo := repository.BuildRepository() + registry := runtime.NewRegistry(repo, buildSolrClient(repo)) + for _, record := range snsEvent.Records { + msg, err := message.ParseSNS(record) + if err != nil { + panic(err) + } + + if err = actions.DispatchMessage(msg, registry).Run(msg); err != nil { + panic(err) + } + } +} + +func buildSolrClient(repo repository.Repository) *derivative.SolrClient { + indexer := transform.NewCompositeIndexer(repo) + + host := os.Getenv("SOLR_HOST") + collection := os.Getenv("SOLR_COLLECTION") + return derivative.NewSolrClient(host, collection, indexer) +} + +func main() { + lambda.Start(Handler) +} diff --git a/cmd/solr/main.go b/cmd/solr-sqs/main.go similarity index 96% rename from cmd/solr/main.go rename to cmd/solr-sqs/main.go index e3f3002..3ff726a 100644 --- a/cmd/solr/main.go +++ b/cmd/solr-sqs/main.go @@ -19,7 +19,7 @@ func Handler(ctx context.Context, sqsEvent events.SQSEvent) { repo := repository.BuildRepository() registry := runtime.NewRegistry(repo, buildSolrClient(repo)) for _, record := range sqsEvent.Records { - msg, err := message.Parse(record) + msg, err := message.ParseSQS(record) if err != nil { panic(err) } diff --git a/message/parser.go b/message/parser.go index 27fa020..6a65280 100644 --- a/message/parser.go +++ b/message/parser.go @@ -18,8 +18,8 @@ type SQSBody struct { Message string } -// Parse transforms an SQSMessage into a message -func Parse(record events.SQSMessage) (*Message, error) { +// ParseSQS transforms an SQSMessage into a message +func ParseSQS(record events.SQSMessage) (*Message, error) { body := &SQSBody{} err := json.Unmarshal([]byte(record.Body), body) if err != nil { @@ -33,3 +33,15 @@ func Parse(record events.SQSMessage) (*Message, error) { return msg, nil } + +// ParseSNS transforms a SNSEventRecord into a message +func ParseSNS(record events.SNSEventRecord) (*Message, error) { + data := record.SNS.Message + msg := &Message{} + err := json.Unmarshal([]byte(data), msg) + if err != nil { + return nil, err + } + + return msg, nil +} diff --git a/message/parser_test.go b/message/parser_test.go index d4c9b29..94147f9 100644 --- a/message/parser_test.go +++ b/message/parser_test.go @@ -7,12 +7,24 @@ import ( "github.com/stretchr/testify/assert" ) -func TestParse(t *testing.T) { +func TestParseSQS(t *testing.T) { evtRecord := events.SQSMessage{ Body: "{\"Message\": \"{\\\"Action\\\": \\\"touch\\\", \\\"Entities\\\":[\\\"http://example.com/foo1\\\"] }\"}", } - event, _ := Parse(evtRecord) + event, _ := ParseSQS(evtRecord) + assert.Equal(t, "touch", event.Action) + assert.Equal(t, []string{"http://example.com/foo1"}, event.Entities) + +} + +func TestParseSNS(t *testing.T) { + evtRecord := events.SNSEventRecord{ + SNS: events.SNSEntity{ + Message: "{\"Action\": \"touch\", \"Entities\":[\"http://example.com/foo1\"] }", + }, + } + event, _ := ParseSNS(evtRecord) assert.Equal(t, "touch", event.Action) assert.Equal(t, []string{"http://example.com/foo1"}, event.Entities)