Skip to content

Commit

Permalink
Attempt to fix the relays projection in a generic way.
Browse files Browse the repository at this point in the history
  • Loading branch information
matteius committed Oct 6, 2024
1 parent 7dbb95c commit 98d98c6
Showing 1 changed file with 30 additions and 14 deletions.
44 changes: 30 additions & 14 deletions opensensor/collection_apis.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
import logging
from datetime import datetime, timedelta, timezone
from typing import Dict, Generic, List, Optional, Type, TypeVar, get_args, get_origin
from enum import Enum
from typing import (
Any,
Dict,
Generic,
List,
Optional,
Type,
TypeVar,
get_args,
get_origin,
)

from bson import Binary
from fastapi import APIRouter, Depends, HTTPException, Path, Query, Response, status
Expand Down Expand Up @@ -184,10 +195,6 @@ def get_initial_match_clause(
return match_clause


def is_pydantic_model(obj):
return isinstance(obj, type) and issubclass(obj, BaseModel)


def get_nested_fields(model: Type[BaseModel]) -> Dict[str, Type[BaseModel]]:
nested_fields = {}
for field_name, field in model.__fields__.items():
Expand All @@ -198,17 +205,14 @@ def get_nested_fields(model: Type[BaseModel]) -> Dict[str, Type[BaseModel]]:
return nested_fields


def is_list_of_models(field: ModelField) -> bool:
if field.shape != 2: # 2 represents a list shape in Pydantic
return False
args = field.sub_fields[0].type_.__args__
return args and is_pydantic_model(args[0])


def get_list_item_type(field: ModelField) -> Type[BaseModel]:
return field.sub_fields[0].type_.__args__[0]


def is_enum(field_type):
return issubclass(field_type, Enum)


def create_nested_pipeline(model: Type[BaseModel], prefix=""):
logger.debug(f"Creating nested pipeline for model: {model.__name__}, prefix: {prefix}")
match_conditions = {}
Expand All @@ -229,13 +233,21 @@ def create_nested_pipeline(model: Type[BaseModel], prefix=""):
unit_field_name = f"{prefix}{mongo_field}_unit"
pipeline["unit"] = f"${unit_field_name}"
match_conditions[unit_field_name] = {"$exists": True}
elif is_list_of_models(field):
item_model = get_list_item_type(field)
elif is_pydantic_model(field.type_):
nested_pipeline, nested_match = create_nested_pipeline(field.type_, f"{field_name}.")
pipeline[field_name] = nested_pipeline
match_conditions.update({f"{field_name}.{k}": v for k, v in nested_match.items()})
elif get_origin(field.type_) is List and is_pydantic_model(get_args(field.type_)[0]):
item_model = get_args(field.type_)[0]
nested_pipeline, nested_match = create_nested_pipeline(item_model, "")
pipeline[field_name] = {
"$map": {"input": f"${full_mongo_field_name}", "as": "item", "in": nested_pipeline}
}
match_conditions[full_mongo_field_name] = {"$exists": True, "$ne": []}
elif is_enum(field.type_):
# Handle enum fields as simple fields
pipeline[field_name] = f"${full_mongo_field_name}"
match_conditions[full_mongo_field_name] = {"$exists": True}
else:
# Handle simple field
pipeline[field_name] = f"${full_mongo_field_name}"
Expand All @@ -248,6 +260,10 @@ def create_nested_pipeline(model: Type[BaseModel], prefix=""):
return pipeline, match_conditions


def is_pydantic_model(obj: Any) -> bool:
return isinstance(obj, type) and issubclass(obj, BaseModel)


def create_model_instance(model: Type[BaseModel], data: dict, target_unit: Optional[str] = None):
nested_fields = get_nested_fields(model)

Expand Down

0 comments on commit 98d98c6

Please sign in to comment.