Skip to content

Commit

Permalink
Adding allow disk use in all Mongo aggregations (#1006)
Browse files Browse the repository at this point in the history
  • Loading branch information
ecamellini authored Sep 25, 2024
1 parent c478186 commit 03ebc1a
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 40 deletions.
9 changes: 3 additions & 6 deletions packages/agreement-process/src/services/readModelService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,7 @@ async function searchTenantsByName(
results: result.data,
totalCount: await ReadModelRepository.getTotalCount(
agreements,
aggregationPipeline,
true
aggregationPipeline
),
};
}
Expand Down Expand Up @@ -402,8 +401,7 @@ export function readModelServiceBuilder(
results: result.data,
totalCount: await ReadModelRepository.getTotalCount(
agreements,
aggregationPipeline,
true
aggregationPipeline
),
};
},
Expand Down Expand Up @@ -560,8 +558,7 @@ export function readModelServiceBuilder(
results: result.data,
totalCount: await ReadModelRepository.getTotalCount(
eservices,
aggregationPipeline,
true
aggregationPipeline
),
};
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ async function getAttributes({
limit: number;
}): Promise<ListResult<Attribute>> {
const data = await attributes
.aggregate([...aggregationPipeline, { $skip: offset }, { $limit: limit }])
.aggregate([...aggregationPipeline, { $skip: offset }, { $limit: limit }], {
allowDiskUse: true,
})
.toArray();
const result = z.array(Attribute).safeParse(data.map((d) => d.data));
if (!result.success) {
Expand All @@ -99,8 +101,7 @@ async function getAttributes({
results: result.data,
totalCount: await ReadModelRepository.getTotalCount(
attributes,
aggregationPipeline,
false
aggregationPipeline
),
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,7 @@ export function readModelServiceBuilder(
results: result.data,
totalCount: await ReadModelRepository.getTotalCount(
clients,
aggregationPipeline,
false
aggregationPipeline
),
};
},
Expand Down Expand Up @@ -347,8 +346,7 @@ export function readModelServiceBuilder(
results: result.data,
totalCount: await ReadModelRepository.getTotalCount(
producerKeychains,
aggregationPipeline,
false
aggregationPipeline
),
};
},
Expand Down
28 changes: 13 additions & 15 deletions packages/catalog-process/src/services/readModelService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,10 @@ export function readModelServiceBuilder(
];

const data = await eservices
.aggregate([
...aggregationPipeline,
{ $skip: offset },
{ $limit: limit },
])
.aggregate(
[...aggregationPipeline, { $skip: offset }, { $limit: limit }],
{ allowDiskUse: true }
)
.toArray();

const result = z.array(EService).safeParse(data.map((d) => d.data));
Expand All @@ -269,8 +268,7 @@ export function readModelServiceBuilder(
results: result.data,
totalCount: await ReadModelRepository.getTotalCount(
eservices,
aggregationPipeline,
false
aggregationPipeline
),
};
},
Expand Down Expand Up @@ -380,11 +378,10 @@ export function readModelServiceBuilder(
];

const data = await eservices
.aggregate([
...aggregationPipeline,
{ $skip: offset },
{ $limit: limit },
])
.aggregate(
[...aggregationPipeline, { $skip: offset }, { $limit: limit }],
{ allowDiskUse: true }
)
.toArray();

const result = z.array(consumer).safeParse(data);
Expand All @@ -400,8 +397,7 @@ export function readModelServiceBuilder(
results: result.data,
totalCount: await ReadModelRepository.getTotalCount(
eservices,
aggregationPipeline,
false
aggregationPipeline
),
};
},
Expand Down Expand Up @@ -462,7 +458,9 @@ export function readModelServiceBuilder(
const aggregationWithLimit = limit
? [...aggregationPipeline, { $limit: limit }]
: aggregationPipeline;
const data = await agreements.aggregate(aggregationWithLimit).toArray();
const data = await agreements
.aggregate(aggregationWithLimit, { allowDiskUse: true })
.toArray();
const result = z.array(Agreement).safeParse(data.map((a) => a.data));

if (!result.success) {
Expand Down
5 changes: 2 additions & 3 deletions packages/commons/src/repositories/ReadModelRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,10 @@ export class ReadModelRepository {

public static async getTotalCount(
collection: Collections,
aggregation: object[],
allowDiskUse: boolean
aggregation: object[]
): Promise<number> {
const query = collection.aggregate([...aggregation, { $count: "count" }], {
allowDiskUse,
allowDiskUse: true,
});

const data = await query.toArray();
Expand Down
3 changes: 1 addition & 2 deletions packages/purpose-process/src/services/readModelService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,7 @@ export function readModelServiceBuilder(
results: result.data,
totalCount: await ReadModelRepository.getTotalCount(
purposes,
aggregationPipeline,
false
aggregationPipeline
),
};
},
Expand Down
13 changes: 6 additions & 7 deletions packages/tenant-process/src/services/readModelService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ export const getTenants = async ({
aggregationPipeline,
offset,
limit,
allowDiskUse = false,
}: {
tenants: TenantCollection;
aggregationPipeline: Array<Filter<TenantReadModel>>;
Expand All @@ -67,7 +66,7 @@ export const getTenants = async ({
}> => {
const data = await tenants
.aggregate([...aggregationPipeline, { $skip: offset }, { $limit: limit }], {
allowDiskUse,
allowDiskUse: true,
})
.toArray();

Expand All @@ -84,8 +83,7 @@ export const getTenants = async ({
results: result.data,
totalCount: await ReadModelRepository.getTotalCount(
tenants,
aggregationPipeline,
allowDiskUse
aggregationPipeline
),
};
};
Expand Down Expand Up @@ -324,7 +322,9 @@ export function readModelServiceBuilder(

async getAttributesById(attributeIds: AttributeId[]): Promise<Attribute[]> {
const data = await attributes
.aggregate([{ $match: { "data.id": { $in: attributeIds } } }])
.aggregate([{ $match: { "data.id": { $in: attributeIds } } }], {
allowDiskUse: true,
})
.toArray();
const result = z.array(Attribute).safeParse(data.map((d) => d.data));
if (!result.success) {
Expand Down Expand Up @@ -481,8 +481,7 @@ export function readModelServiceBuilder(
results: result.data,
totalCount: await ReadModelRepository.getTotalCount(
attributes,
aggregationPipeline,
false
aggregationPipeline
),
};
},
Expand Down

0 comments on commit 03ebc1a

Please sign in to comment.