Skip to content

Commit

Permalink
IMN 301 - Add details in consumer logs (#310)
Browse files Browse the repository at this point in the history
  • Loading branch information
MalpenZibo authored Apr 3, 2024
1 parent 57dc51b commit 293face
Show file tree
Hide file tree
Showing 13 changed files with 137 additions and 40 deletions.
14 changes: 12 additions & 2 deletions packages/agreement-readmodel-writer/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
/* eslint-disable functional/immutable-data */
import { EachMessagePayload } from "kafkajs";
import {
ReadModelRepository,
readModelWriterConfig,
agreementTopicConfig,
decodeKafkaMessage,
logger,
getContext,
} from "pagopa-interop-commons";
import { runConsumer } from "kafka-iam-auth";
import { AgreementEvent } from "pagopa-interop-models";
Expand All @@ -21,9 +23,17 @@ async function processMessage({
partition,
}: EachMessagePayload): Promise<void> {
try {
const decodedMessage = decodeKafkaMessage(message, AgreementEvent);
const msg = decodeKafkaMessage(message, AgreementEvent);

await match(decodedMessage)
const ctx = getContext();
ctx.messageData = {
eventType: msg.type,
eventVersion: msg.event_version,
streamId: msg.stream_id,
};
ctx.correlationId = msg.correlation_id;

await match(msg)
.with({ event_version: 1 }, (msg) => handleMessageV1(msg, agreements))
.with({ event_version: 2 }, (msg) => handleMessageV2(msg, agreements))
.exhaustive();
Expand Down
16 changes: 12 additions & 4 deletions packages/attribute-registry-readmodel-writer/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
/* eslint-disable functional/immutable-data */
import { Kafka, KafkaMessage } from "kafkajs";
import {
ReadModelRepository,
attributeTopicConfig,
decodeKafkaMessage,
getContext,
logger,
readModelWriterConfig,
} from "pagopa-interop-commons";
Expand Down Expand Up @@ -48,10 +50,16 @@ await consumer.subscribe({

async function processMessage(message: KafkaMessage): Promise<void> {
try {
await handleMessage(
decodeKafkaMessage(message, AttributeEvent),
attributes
);
const msg = decodeKafkaMessage(message, AttributeEvent);
const ctx = getContext();
ctx.messageData = {
eventType: msg.type,
eventVersion: msg.event_version,
streamId: msg.stream_id,
};
ctx.correlationId = msg.correlation_id;

await handleMessage(msg, attributes);

logger.info("Read model was updated");
} catch (e) {
Expand Down
3 changes: 2 additions & 1 deletion packages/authorization-updater/src/authorizationService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
jwtSeedConfig,
logger,
} from "pagopa-interop-commons";
import { v4 } from "uuid";
import { DescriptorId, EServiceId } from "pagopa-interop-models";
import { buildAuthMgmtClient } from "./authorizationManagementClient.js";
import { ApiClientComponentState } from "./model/models.js";
Expand Down Expand Up @@ -56,7 +57,7 @@ export const authorizationServiceBuilder =
const getHeaders = () => {
const appContext = getContext();
return {
"X-Correlation-Id": appContext.correlationId,
"X-Correlation-Id": appContext.correlationId || v4(),
};
};

Expand Down
12 changes: 8 additions & 4 deletions packages/authorization-updater/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/* eslint-disable functional/immutable-data */
import { v4 as uuidv4 } from "uuid";
import { runConsumer } from "kafka-iam-auth";
import { match } from "ts-pattern";
import { EachMessagePayload } from "kafkajs";
Expand Down Expand Up @@ -75,15 +74,20 @@ function processMessage(
) {
return async (messagePayload: EachMessagePayload): Promise<void> => {
try {
const appContext = getContext();
appContext.correlationId = uuidv4();

const messageDecoder = messageDecoderSupplier(
topicConfig,
messagePayload.topic
);
const decodedMsg = messageDecoder(messagePayload.message);

const ctx = getContext();
ctx.messageData = {
eventType: decodedMsg.type,
eventVersion: decodedMsg.event_version,
streamId: decodedMsg.stream_id,
};
ctx.correlationId = decodedMsg.correlation_id;

const updateSeed = match(decodedMsg)
.with(
{
Expand Down
1 change: 1 addition & 0 deletions packages/catalog-readmodel-writer/src/consumerServiceV2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export async function handleMessageV2(
eservices: EServiceCollection
): Promise<void> {
logger.info(message);

const eservice = message.data.eservice;

await match(message)
Expand Down
10 changes: 10 additions & 0 deletions packages/catalog-readmodel-writer/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
/* eslint-disable functional/immutable-data */
import { EachMessagePayload } from "kafkajs";
import {
logger,
ReadModelRepository,
readModelWriterConfig,
catalogTopicConfig,
decodeKafkaMessage,
getContext,
} from "pagopa-interop-commons";
import { v4 } from "uuid";
import { runConsumer } from "kafka-iam-auth";
import { EServiceEvent } from "pagopa-interop-models";
import { match } from "ts-pattern";
Expand All @@ -22,6 +25,13 @@ async function processMessage({
}: EachMessagePayload): Promise<void> {
try {
const decodedMessage = decodeKafkaMessage(message, EServiceEvent);
const ctx = getContext();
ctx.messageData = {
eventType: decodedMessage.type,
eventVersion: decodedMessage.event_version,
streamId: decodedMessage.stream_id,
};
ctx.correlationId = decodedMessage.correlation_id || v4();

await match(decodedMessage)
.with({ event_version: 1 }, (msg) => handleMessageV1(msg, eservices))
Expand Down
2 changes: 1 addition & 1 deletion packages/commons/src/auth/headers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export type Headers = z.infer<typeof Headers>;

export const ParsedHeaders = z
.object({
correlationId: z.string().uuid(),
correlationId: z.string(),
})
.and(AuthData);
export type ParsedHeaders = z.infer<typeof ParsedHeaders>;
Expand Down
13 changes: 10 additions & 3 deletions packages/commons/src/context/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,28 @@ import { z } from "zod";
import { AuthData, defaultAuthData } from "../auth/authData.js";
import { readHeaders } from "../auth/headers.js";

export type AppContext = z.infer<typeof ctx>;
export type AppContext = {
authData: AuthData;
messageData?: {
eventType: string;
eventVersion: number;
streamId: string;
};
correlationId?: string;
};
export type ZodiosContext = NonNullable<typeof zodiosCtx>;
export type ExpressContext = NonNullable<typeof zodiosCtx.context>;

export const ctx = z.object({
authData: AuthData,
correlationId: z.string().uuid(),
correlationId: z.string(),
});

export const zodiosCtx = zodiosContext(z.object({ ctx }));

const globalStore = new AsyncLocalStorage<AppContext>();
const defaultAppContext: AppContext = {
authData: defaultAuthData,
correlationId: "",
};

export const getContext = (): AppContext => {
Expand Down
85 changes: 64 additions & 21 deletions packages/commons/src/logging/loggerMiddleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/* eslint-disable @typescript-eslint/explicit-function-return-type */
import * as expressWinston from "express-winston";
import * as winston from "winston";
import { v4 } from "uuid";
import { LoggerConfig } from "../config/commonConfig.js";
import { getContext } from "../index.js";
import { bigIntReplacer } from "./utils.js";
Expand All @@ -10,6 +11,9 @@ export type SessionMetaData = {
userId: string | undefined;
organizationId: string | undefined;
correlationId: string | undefined;
eventType: string | undefined;
eventVersion: number | undefined;
streamId: string | undefined;
};

export const parsedLoggerConfig = LoggerConfig.safeParse(process.env);
Expand All @@ -23,31 +27,78 @@ const getLoggerMetadata = (): SessionMetaData => {
const appContext = getContext();
return !appContext
? {
userId: "",
organizationId: "",
correlationId: "",
userId: undefined,
organizationId: undefined,
correlationId: v4(),
eventType: undefined,
eventVersion: undefined,
streamId: undefined,
}
: {
userId: appContext.authData.userId,
organizationId: appContext.authData.organizationId,
userId: appContext.authData?.userId,
organizationId: appContext.authData?.organizationId,
correlationId: appContext.correlationId,
eventType: appContext.messageData?.eventType,
eventVersion: appContext.messageData?.eventVersion,
streamId: appContext.messageData?.streamId,
};
};

const logFormat = (
msg: string,
timestamp: string,
level: string,
userId: string | undefined,
organizationId: string | undefined,
correlationId: string | undefined,
serviceName: string = ""
) =>
`${timestamp} ${level.toUpperCase()} [${serviceName}] - [UID=${userId}] [OID=${organizationId}] [CID=${correlationId}] ${msg}`;
{
userId,
organizationId,
correlationId,
serviceName,
eventType,
eventVersion,
streamId,
}: {
userId: string | undefined;
organizationId: string | undefined;
correlationId: string | undefined;
serviceName: string | undefined;
eventType: string | undefined;
eventVersion: number | undefined;
streamId: string | undefined;
}
) => {
const serviceLogPart = serviceName ? `[${serviceName}]` : undefined;
const userLogPart = userId ? `[UID=${userId}]` : undefined;
const organizationLogPart = organizationId
? `[OID=${organizationId}]`
: undefined;
const correlationLogPart = correlationId
? `[CID=${correlationId}]`
: undefined;
const eventTypePart = eventType ? `[ET=${eventType}]` : undefined;
const eventVersionPart = eventVersion ? `[EV=${eventVersion}]` : undefined;
const streamIdPart = streamId ? `[SID=${streamId}]` : undefined;

const firstPart = [timestamp, level.toUpperCase(), serviceLogPart]
.filter((e) => e !== undefined)
.join(" ");

const secondPart = [
userLogPart,
organizationLogPart,
correlationLogPart,
eventTypePart,
eventVersionPart,
streamIdPart,
]
.filter((e) => e !== undefined)
.join(" ");

return `${firstPart} - ${secondPart} ${msg}`;
};

export const customFormat = (serviceName?: string) =>
winston.format.printf(({ level, message, timestamp }) => {
const { userId, organizationId, correlationId } = getLoggerMetadata();
const logMetadata = getLoggerMetadata();
const clearMessage =
typeof message === "object"
? JSON.stringify(message, bigIntReplacer)
Expand All @@ -56,15 +107,7 @@ export const customFormat = (serviceName?: string) =>
.toString()
.split("\n")
.map((line: string) =>
logFormat(
line,
timestamp,
level,
userId,
organizationId,
correlationId,
serviceName
)
logFormat(line, timestamp, level, { ...logMetadata, serviceName })
);
return lines.join("\n");
});
Expand Down
2 changes: 1 addition & 1 deletion packages/commons/src/repositories/EventRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ export const eventRepository = <T extends Event>(

await t.none(sql.insertEvent, {
stream_id: createEvent.streamId,
correlation_id: createEvent.correlationId,
version: newVersion,
correlation_id: createEvent.correlationId,
type: createEvent.event.type,
event_version: createEvent.event.event_version,
data: Buffer.from(toBinaryData(createEvent.event)),
Expand Down
4 changes: 2 additions & 2 deletions packages/commons/src/repositories/sql/insertEvent.sql
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
INSERT INTO
"events"(
"stream_id",
"correlation_id",
"version",
"correlation_id",
"type",
"event_version",
"data"
)
VALUES
(
$(stream_id),
$(correlation_id),
$(version),
$(correlation_id),
$(type),
$(event_version),
$(data)
Expand Down
1 change: 1 addition & 0 deletions packages/models/src/events/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export const EventEnvelope = <TEventZodType extends z.ZodType>(
sequence_num: z.number(),
stream_id: z.string().uuid(),
version: z.number(),
correlation_id: z.string().optional(),
}),
event
);
Expand Down
14 changes: 13 additions & 1 deletion packages/tenant-readmodel-writer/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
/* eslint-disable functional/immutable-data */
import { EachMessagePayload } from "kafkajs";
import {
logger,
readModelWriterConfig,
tenantTopicConfig,
decodeKafkaMessage,
getContext,
} from "pagopa-interop-commons";
import { runConsumer } from "kafka-iam-auth";
import { TenantEvent } from "pagopa-interop-models";
Expand All @@ -14,7 +16,17 @@ async function processMessage({
partition,
}: EachMessagePayload): Promise<void> {
try {
await handleMessage(decodeKafkaMessage(message, TenantEvent));
const decodedMessage = decodeKafkaMessage(message, TenantEvent);

const ctx = getContext();
ctx.messageData = {
eventType: decodedMessage.type,
eventVersion: decodedMessage.event_version,
streamId: decodedMessage.stream_id,
};
ctx.correlationId = decodedMessage.correlation_id;

await handleMessage(decodedMessage);
logger.info(
`Read model was updated. Partition number: ${partition}. Offset: ${message.offset}`
);
Expand Down

0 comments on commit 293face

Please sign in to comment.