diff --git a/package.json b/package.json index 01d2aa7971..1fdfee6cb3 100644 --- a/package.json +++ b/package.json @@ -19,6 +19,7 @@ "start:notifier-seeder": "turbo start --filter pagopa-interop-notifier-seeder", "start:purpose": "turbo start --filter pagopa-interop-purpose-process", "start:purpose-readmodel-writer": "turbo start --filter pagopa-interop-purpose-readmodel-writer", + "start:purpose-platformstate-writer": "turbo start --filter pagopa-interop-purpose-platformstate-writer", "start:authorization": "turbo start --filter pagopa-interop-authorization-process", "start:client-readmodel-writer": "turbo start --filter pagopa-interop-client-readmodel-writer", "start:key-readmodel-writer": "turbo start --filter pagopa-interop-key-readmodel-writer", diff --git a/packages/purpose-platformstate-writer/.env b/packages/purpose-platformstate-writer/.env new file mode 100644 index 0000000000..ec305e1206 --- /dev/null +++ b/packages/purpose-platformstate-writer/.env @@ -0,0 +1,12 @@ +LOG_LEVEL=info + +KAFKA_CLIENT_ID="purpose" +KAFKA_GROUP_ID="purpose-group-local" +KAFKA_BROKERS="localhost:9092" +KAFKA_DISABLE_AWS_IAM_AUTH="true" +PURPOSE_TOPIC="event-store.purpose.events" +AWS_CONFIG_FILE=aws.config.local +TOKEN_GENERATION_READMODEL_TABLE_NAME_PLATFORM="platform-states" +TOKEN_GENERATION_READMODEL_TABLE_NAME_TOKEN_GENERATION="token-generation-states" + +AWS_REGION="eu-south-1" diff --git a/packages/purpose-platformstate-writer/Dockerfile b/packages/purpose-platformstate-writer/Dockerfile new file mode 100644 index 0000000000..2fa2593320 --- /dev/null +++ b/packages/purpose-platformstate-writer/Dockerfile @@ -0,0 +1,44 @@ +FROM node:20.14.0-slim@sha256:5e8ac65a0231d76a388683d07ca36a9769ab019a85d85169fe28e206f7a3208e as build + +RUN corepack enable + +WORKDIR /app +COPY package.json /app/ +COPY pnpm-lock.yaml /app/ +COPY pnpm-workspace.yaml /app/ + +COPY ./packages/purpose-platformstate-writer/package.json /app/packages/purpose-platformstate-writer/package.json +COPY ./packages/commons/package.json /app/packages/commons/package.json +COPY ./packages/models/package.json /app/packages/models/package.json +COPY ./packages/kafka-iam-auth/package.json /app/packages/kafka-iam-auth/package.json + +RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --frozen-lockfile + +COPY tsconfig.json /app/ +COPY turbo.json /app/ +COPY ./packages/purpose-platformstate-writer /app/packages/purpose-platformstate-writer +COPY ./packages/commons /app/packages/commons +COPY ./packages/models /app/packages/models +COPY ./packages/kafka-iam-auth /app/packages/kafka-iam-auth + +RUN pnpm build && \ + rm -rf /app/node_modules/.modules.yaml && \ + rm -rf /app/node_modules/.cache && \ + mkdir /out && \ + cp -a --parents -t /out \ + node_modules packages/purpose-platformstate-writer/node_modules \ + package*.json packages/purpose-platformstate-writer/package*.json \ + packages/commons \ + packages/models \ + packages/kafka-iam-auth \ + packages/purpose-platformstate-writer/dist && \ + find /out -exec touch -h --date=@0 {} \; + +FROM node:20.14.0-slim@sha256:5e8ac65a0231d76a388683d07ca36a9769ab019a85d85169fe28e206f7a3208e as final + +COPY --from=build /out /app + +WORKDIR /app/packages/purpose-platformstate-writer +EXPOSE 3000 + +CMD ["node", "."] diff --git a/packages/purpose-platformstate-writer/aws.config.local b/packages/purpose-platformstate-writer/aws.config.local new file mode 100644 index 0000000000..f3016f81d6 --- /dev/null +++ b/packages/purpose-platformstate-writer/aws.config.local @@ -0,0 +1,9 @@ +[default] +aws_access_key_id=key +aws_secret_access_key=secret +region=eu-south-1 +services=local + +[services local] +dynamodb= + endpoint_url=http://localhost:8085 diff --git a/packages/purpose-platformstate-writer/package.json b/packages/purpose-platformstate-writer/package.json new file mode 100644 index 0000000000..ea2a130497 --- /dev/null +++ b/packages/purpose-platformstate-writer/package.json @@ -0,0 +1,47 @@ +{ + "name": "pagopa-interop-purpose-platformstate-writer", + "private": true, + "version": "1.0.0", + "description": "PagoPA Interoperability purpose consumer service that updates the token-generation-read-model", + "main": "dist", + "type": "module", + "scripts": { + "test": "vitest", + "test:it": "vitest integration", + "lint": "eslint . --ext .ts,.tsx", + "lint:autofix": "eslint . --ext .ts,.tsx --fix", + "format:check": "prettier --check src", + "format:write": "prettier --write src", + "start": "node --loader ts-node/esm -r 'dotenv-flow/config' --watch ./src/index.ts", + "build": "tsc", + "check": "tsc --project tsconfig.check.json" + }, + "keywords": [], + "author": "", + "license": "Apache-2.0", + "devDependencies": { + "@pagopa/eslint-config": "3.0.0", + "@types/node": "20.14.6", + "@types/uuid": "9.0.8", + "date-fns": "3.6.0", + "pagopa-interop-commons-test": "workspace:*", + "prettier": "2.8.8", + "ts-node": "10.9.2", + "typescript": "5.4.5", + "uuid": "10.0.0", + "vitest": "1.6.0" + }, + "dependencies": { + "@aws-sdk/client-dynamodb": "3.637.0", + "@aws-sdk/util-dynamodb": "3.637.0", + "@protobuf-ts/runtime": "2.9.4", + "connection-string": "4.4.0", + "dotenv-flow": "4.1.0", + "kafka-iam-auth": "workspace:*", + "kafkajs": "2.2.4", + "pagopa-interop-commons": "workspace:*", + "pagopa-interop-models": "workspace:*", + "ts-pattern": "5.2.0", + "zod": "3.23.8" + } +} diff --git a/packages/purpose-platformstate-writer/src/config/config.ts b/packages/purpose-platformstate-writer/src/config/config.ts new file mode 100644 index 0000000000..93079e3bbc --- /dev/null +++ b/packages/purpose-platformstate-writer/src/config/config.ts @@ -0,0 +1,15 @@ +import { + PlatformStateWriterConfig, + PurposeTopicConfig, +} from "pagopa-interop-commons"; +import { z } from "zod"; + +export const PurposePlatformStateWriterConfig = + PlatformStateWriterConfig.and(PurposeTopicConfig); + +export type PurposePlatformStateWriterConfig = z.infer< + typeof PurposePlatformStateWriterConfig +>; + +export const config: PurposePlatformStateWriterConfig = + PurposePlatformStateWriterConfig.parse(process.env); diff --git a/packages/purpose-platformstate-writer/src/consumerServiceV1.ts b/packages/purpose-platformstate-writer/src/consumerServiceV1.ts new file mode 100644 index 0000000000..642c5f3709 --- /dev/null +++ b/packages/purpose-platformstate-writer/src/consumerServiceV1.ts @@ -0,0 +1,25 @@ +import { match } from "ts-pattern"; +import { PurposeEventEnvelopeV1 } from "pagopa-interop-models"; +import { DynamoDBClient } from "@aws-sdk/client-dynamodb"; + +export async function handleMessageV1( + message: PurposeEventEnvelopeV1, + _dynamoDBClient: DynamoDBClient +): Promise { + await match(message) + .with( + { type: "PurposeCreated" }, + { type: "PurposeVersionCreated" }, + { type: "PurposeUpdated" }, + { type: "PurposeVersionActivated" }, + { type: "PurposeVersionSuspended" }, + { type: "PurposeVersionArchived" }, + { type: "PurposeVersionWaitedForApproval" }, + { type: "PurposeVersionRejected" }, + { type: "PurposeVersionUpdated" }, + { type: "PurposeDeleted" }, + { type: "PurposeVersionDeleted" }, + () => Promise.resolve() + ) + .exhaustive(); +} diff --git a/packages/purpose-platformstate-writer/src/consumerServiceV2.ts b/packages/purpose-platformstate-writer/src/consumerServiceV2.ts new file mode 100644 index 0000000000..d9dd0560dd --- /dev/null +++ b/packages/purpose-platformstate-writer/src/consumerServiceV2.ts @@ -0,0 +1,32 @@ +import { DynamoDBClient } from "@aws-sdk/client-dynamodb"; +import { PurposeEventEnvelopeV2 } from "pagopa-interop-models"; +import { match } from "ts-pattern"; + +export async function handleMessageV2( + message: PurposeEventEnvelopeV2, + _dynamoDBClient: DynamoDBClient +): Promise { + await match(message) + .with( + { type: "DraftPurposeDeleted" }, + { type: "WaitingForApprovalPurposeDeleted" }, + { type: "PurposeAdded" }, + { type: "DraftPurposeUpdated" }, + { type: "NewPurposeVersionActivated" }, + { type: "NewPurposeVersionWaitingForApproval" }, + { type: "PurposeActivated" }, + { type: "PurposeArchived" }, + { type: "PurposeVersionOverQuotaUnsuspended" }, + { type: "PurposeVersionRejected" }, + { type: "PurposeVersionSuspendedByConsumer" }, + { type: "PurposeVersionSuspendedByProducer" }, + { type: "PurposeVersionUnsuspendedByConsumer" }, + { type: "PurposeVersionUnsuspendedByProducer" }, + { type: "PurposeWaitingForApproval" }, + { type: "WaitingForApprovalPurposeVersionDeleted" }, + { type: "PurposeVersionActivated" }, + { type: "PurposeCloned" }, + () => Promise.resolve() + ) + .exhaustive(); +} diff --git a/packages/purpose-platformstate-writer/src/index.ts b/packages/purpose-platformstate-writer/src/index.ts new file mode 100644 index 0000000000..12d053d588 --- /dev/null +++ b/packages/purpose-platformstate-writer/src/index.ts @@ -0,0 +1,36 @@ +import { EachMessagePayload } from "kafkajs"; +import { logger, decodeKafkaMessage } from "pagopa-interop-commons"; +import { runConsumer } from "kafka-iam-auth"; +import { PurposeEvent } from "pagopa-interop-models"; +import { match } from "ts-pattern"; +import { DynamoDBClient } from "@aws-sdk/client-dynamodb"; +import { handleMessageV1 } from "./consumerServiceV1.js"; +import { handleMessageV2 } from "./consumerServiceV2.js"; +import { config } from "./config/config.js"; + +const dynamoDBClient = new DynamoDBClient(); +async function processMessage({ + message, + partition, +}: EachMessagePayload): Promise { + const decodedMessage = decodeKafkaMessage(message, PurposeEvent); + + const loggerInstance = logger({ + serviceName: "purpose-platformstate-writer", + eventType: decodedMessage.type, + eventVersion: decodedMessage.event_version, + streamId: decodedMessage.stream_id, + correlationId: decodedMessage.correlation_id, + }); + + await match(decodedMessage) + .with({ event_version: 1 }, (msg) => handleMessageV1(msg, dynamoDBClient)) + .with({ event_version: 2 }, (msg) => handleMessageV2(msg, dynamoDBClient)) + .exhaustive(); + + loggerInstance.info( + `Token-generation read model was updated. Partition number: ${partition}. Offset: ${message.offset}` + ); +} + +await runConsumer(config, [config.purposeTopic], processMessage); diff --git a/packages/purpose-platformstate-writer/src/utils.ts b/packages/purpose-platformstate-writer/src/utils.ts new file mode 100644 index 0000000000..9d56cfcfc8 --- /dev/null +++ b/packages/purpose-platformstate-writer/src/utils.ts @@ -0,0 +1,128 @@ +import { + DeleteItemCommand, + DeleteItemInput, + DynamoDBClient, + GetItemCommand, + GetItemCommandOutput, + GetItemInput, + PutItemCommand, + PutItemInput, +} from "@aws-sdk/client-dynamodb"; +import { unmarshall } from "@aws-sdk/util-dynamodb"; +import { + genericInternalError, + PlatformStatesCatalogEntry, + PlatformStatesEServiceDescriptorPK, + PlatformStatesPurposeEntry, + PlatformStatesPurposePK, +} from "pagopa-interop-models"; +import { config } from "./config/config.js"; + +export const writePlatformPurposeEntry = async ( + dynamoDBClient: DynamoDBClient, + purposeEntry: PlatformStatesPurposeEntry +): Promise => { + const input: PutItemInput = { + ConditionExpression: "attribute_not_exists(PK)", + Item: { + PK: { + S: purposeEntry.PK, + }, + state: { + S: purposeEntry.state, + }, + purposeVersionId: { + S: purposeEntry.purposeVersionId, + }, + purposeEserviceId: { + S: purposeEntry.purposeEserviceId, + }, + purposeConsumerId: { + S: purposeEntry.purposeConsumerId, + }, + version: { + N: purposeEntry.version.toString(), + }, + updatedAt: { + S: purposeEntry.updatedAt, + }, + }, + TableName: config.tokenGenerationReadModelTableNamePlatform, + }; + const command = new PutItemCommand(input); + await dynamoDBClient.send(command); +}; + +export const readPlatformPurposeEntry = async ( + dynamoDBClient: DynamoDBClient, + primaryKey: PlatformStatesPurposePK +): Promise => { + const input: GetItemInput = { + Key: { + PK: { S: primaryKey }, + }, + TableName: config.tokenGenerationReadModelTableNamePlatform, + }; + const command = new GetItemCommand(input); + const data: GetItemCommandOutput = await dynamoDBClient.send(command); + + if (!data.Item) { + return undefined; + } else { + const unmarshalled = unmarshall(data.Item); + const purposeEntry = PlatformStatesPurposeEntry.safeParse(unmarshalled); + + if (!purposeEntry.success) { + throw genericInternalError( + `Unable to parse purpose entry item: result ${JSON.stringify( + purposeEntry + )} - data ${JSON.stringify(data)} ` + ); + } + return purposeEntry.data; + } +}; + +export const deletePlatformPurposeEntry = async ( + dynamoDBClient: DynamoDBClient, + primaryKey: PlatformStatesPurposePK +): Promise => { + const input: DeleteItemInput = { + Key: { + PK: { S: primaryKey }, + }, + TableName: config.tokenGenerationReadModelTableNamePlatform, + }; + const command = new DeleteItemCommand(input); + await dynamoDBClient.send(command); +}; + +export const readCatalogEntry = async ( + dynamoDBClient: DynamoDBClient, + primaryKey: PlatformStatesEServiceDescriptorPK +): Promise => { + const input: GetItemInput = { + Key: { + PK: { S: primaryKey }, + }, + TableName: config.tokenGenerationReadModelTableNamePlatform, + }; + const command = new GetItemCommand(input); + const data: GetItemCommandOutput = await dynamoDBClient.send(command); + + if (!data.Item) { + return undefined; + } else { + const unmarshalled = unmarshall(data.Item); + const catalogEntry = PlatformStatesCatalogEntry.safeParse(unmarshalled); + + if (!catalogEntry.success) { + throw genericInternalError( + `Unable to parse catalog entry item: result ${JSON.stringify( + catalogEntry + )} - data ${JSON.stringify(data)} ` + ); + } + return catalogEntry.data; + } +}; diff --git a/packages/purpose-platformstate-writer/test/sample.test.ts b/packages/purpose-platformstate-writer/test/sample.test.ts new file mode 100644 index 0000000000..e110c51648 --- /dev/null +++ b/packages/purpose-platformstate-writer/test/sample.test.ts @@ -0,0 +1,7 @@ +import { describe, expect, it } from "vitest"; + +describe("sample", () => { + it("test", () => { + expect(1).toBe(1); + }); +}); diff --git a/packages/purpose-platformstate-writer/test/tsconfig.json b/packages/purpose-platformstate-writer/test/tsconfig.json new file mode 100644 index 0000000000..379a994d81 --- /dev/null +++ b/packages/purpose-platformstate-writer/test/tsconfig.json @@ -0,0 +1,4 @@ +{ + "extends": "../tsconfig.json", + "include": ["."] +} diff --git a/packages/purpose-platformstate-writer/test/vitestGlobalSetup.ts b/packages/purpose-platformstate-writer/test/vitestGlobalSetup.ts new file mode 100644 index 0000000000..85a4c8ea41 --- /dev/null +++ b/packages/purpose-platformstate-writer/test/vitestGlobalSetup.ts @@ -0,0 +1,3 @@ +import { setupTestContainersVitestGlobal } from "pagopa-interop-commons-test/index.js"; + +export default setupTestContainersVitestGlobal(); diff --git a/packages/purpose-platformstate-writer/tsconfig.check.json b/packages/purpose-platformstate-writer/tsconfig.check.json new file mode 100644 index 0000000000..a19f84bcb7 --- /dev/null +++ b/packages/purpose-platformstate-writer/tsconfig.check.json @@ -0,0 +1,7 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "noEmit": true, + }, + "include": ["src", "test"] +} diff --git a/packages/purpose-platformstate-writer/tsconfig.json b/packages/purpose-platformstate-writer/tsconfig.json new file mode 100644 index 0000000000..a1ec44f6e6 --- /dev/null +++ b/packages/purpose-platformstate-writer/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "outDir": "dist" + }, + "include": [ + "src" + ] +} diff --git a/packages/purpose-platformstate-writer/vitest.config.ts b/packages/purpose-platformstate-writer/vitest.config.ts new file mode 100644 index 0000000000..9ece1be991 --- /dev/null +++ b/packages/purpose-platformstate-writer/vitest.config.ts @@ -0,0 +1,11 @@ +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + test: { + globalSetup: ["./test/vitestGlobalSetup.ts"], + testTimeout: 60000, + hookTimeout: 60000, + fileParallelism: false, + pool: "forks", + }, +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 85c8bc06d1..7cd438b6a2 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -2288,6 +2288,73 @@ importers: specifier: 1.6.0 version: 1.6.0(@types/node@20.14.6) + packages/purpose-platformstate-writer: + dependencies: + '@aws-sdk/client-dynamodb': + specifier: 3.637.0 + version: 3.637.0 + '@aws-sdk/util-dynamodb': + specifier: 3.637.0 + version: 3.637.0(@aws-sdk/client-dynamodb@3.637.0) + '@protobuf-ts/runtime': + specifier: 2.9.4 + version: 2.9.4 + connection-string: + specifier: 4.4.0 + version: 4.4.0 + dotenv-flow: + specifier: 4.1.0 + version: 4.1.0 + kafka-iam-auth: + specifier: workspace:* + version: link:../kafka-iam-auth + kafkajs: + specifier: 2.2.4 + version: 2.2.4 + pagopa-interop-commons: + specifier: workspace:* + version: link:../commons + pagopa-interop-models: + specifier: workspace:* + version: link:../models + ts-pattern: + specifier: 5.2.0 + version: 5.2.0 + zod: + specifier: 3.23.8 + version: 3.23.8 + devDependencies: + '@pagopa/eslint-config': + specifier: 3.0.0 + version: 3.0.0(typescript@5.4.5) + '@types/node': + specifier: 20.14.6 + version: 20.14.6 + '@types/uuid': + specifier: 9.0.8 + version: 9.0.8 + date-fns: + specifier: 3.6.0 + version: 3.6.0 + pagopa-interop-commons-test: + specifier: workspace:* + version: link:../commons-test + prettier: + specifier: 2.8.8 + version: 2.8.8 + ts-node: + specifier: 10.9.2 + version: 10.9.2(@types/node@20.14.6)(typescript@5.4.5) + typescript: + specifier: 5.4.5 + version: 5.4.5 + uuid: + specifier: 10.0.0 + version: 10.0.0 + vitest: + specifier: 1.6.0 + version: 1.6.0(@types/node@20.14.6) + packages/purpose-process: dependencies: '@zodios/core':