Skip to content

Commit

Permalink
To support Python3.8, reverted to using Type/List/Dict instead of `…
Browse files Browse the repository at this point in the history
…type/list/dict` for type annotations. Otherwise deserialisation in the library breaks, and also Pydantic breaks.
  • Loading branch information
johnbywater committed Jan 22, 2024
1 parent 4407b9d commit 9a0a3b5
Show file tree
Hide file tree
Showing 55 changed files with 342 additions and 307 deletions.
44 changes: 24 additions & 20 deletions eventsourcing/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@
Any,
Callable,
ClassVar,
Dict,
Generic,
Iterable,
Iterator,
List,
Optional,
Sequence,
Tuple,
Type,
TypeVar,
cast,
)
Expand Down Expand Up @@ -85,7 +89,7 @@ def project_aggregate(

class Cache(Generic[S, T]):
def __init__(self) -> None:
self.cache: dict[S, Any] = {}
self.cache: Dict[S, Any] = {}

def get(self, key: S, *, evict: bool = False) -> T:
if evict:
Expand Down Expand Up @@ -116,7 +120,7 @@ def __init__(self, maxsize: int):
self.maxsize = maxsize
self.full = False
self.lock = Lock() # because linkedlist updates aren't threadsafe
self.root: list[Any] = [] # root of the circular doubly linked list
self.root: List[Any] = [] # root of the circular doubly linked list
self.clear()

def clear(self) -> None:
Expand Down Expand Up @@ -242,7 +246,7 @@ def __init__(
self._fastforward_locks_cache: LRUCache[UUID, Lock] = LRUCache(
maxsize=self.FASTFORWARD_LOCKS_CACHE_MAXSIZE
)
self._fastforward_locks_inuse: dict[UUID, tuple[Lock, int]] = {}
self._fastforward_locks_inuse: Dict[UUID, Tuple[Lock, int]] = {}

def get(
self,
Expand Down Expand Up @@ -409,7 +413,7 @@ class Section:
"""

id: str | None
items: list[Notification]
items: List[Notification]
next_id: str | None


Expand All @@ -433,7 +437,7 @@ def select(
limit: int,
stop: int | None = None,
topics: Sequence[str] = (),
) -> list[Notification]:
) -> List[Notification]:
"""
Returns a selection of
:class:`~eventsourcing.persistence.Notification` objects
Expand Down Expand Up @@ -521,7 +525,7 @@ def select(
limit: int,
stop: int | None = None,
topics: Sequence[str] = (),
) -> list[Notification]:
) -> List[Notification]:
"""
Returns a selection of
:class:`~eventsourcing.persistence.Notification` objects
Expand Down Expand Up @@ -554,9 +558,9 @@ def __init__(self, tracking: Tracking | None = None):
Initialises the process event with the given tracking object.
"""
self.tracking = tracking
self.events: list[DomainEventProtocol] = []
self.aggregates: dict[UUID, MutableOrImmutableAggregate] = {}
self.saved_kwargs: dict[Any, Any] = {}
self.events: List[DomainEventProtocol] = []
self.aggregates: Dict[UUID, MutableOrImmutableAggregate] = {}
self.saved_kwargs: Dict[Any, Any] = {}

def collect_events(
self,
Expand Down Expand Up @@ -597,7 +601,7 @@ class RecordingEvent:
def __init__(
self,
application_name: str,
recordings: list[Recording],
recordings: List[Recording],
previous_max_notification_id: int | None,
):
self.application_name = application_name
Expand All @@ -611,15 +615,15 @@ class Application:
"""

name = "Application"
env: ClassVar[dict[str, str]] = {}
env: ClassVar[Dict[str, str]] = {}
is_snapshotting_enabled: bool = False
snapshotting_intervals: ClassVar[
dict[type[MutableOrImmutableAggregate], int] | None
Dict[Type[MutableOrImmutableAggregate], int] | None
] = None
snapshotting_projectors: (
dict[type[MutableOrImmutableAggregate], ProjectorFunction[Any, Any]] | None
Dict[Type[MutableOrImmutableAggregate], ProjectorFunction[Any, Any]] | None
) = None
snapshot_class: type[SnapshotProtocol] = Snapshot
snapshot_class: Type[SnapshotProtocol] = Snapshot
log_section_size = 10
notify_topics: Sequence[str] = []

Expand Down Expand Up @@ -785,7 +789,7 @@ def save(
self,
*objs: MutableOrImmutableAggregate | DomainEventProtocol | None,
**kwargs: Any,
) -> list[Recording]:
) -> List[Recording]:
"""
Collects pending events from given aggregates and
puts them in the application's event store.
Expand All @@ -798,7 +802,7 @@ def save(
self.notify(processing_event.events) # Deprecated.
return recordings

def _record(self, processing_event: ProcessingEvent) -> list[Recording]:
def _record(self, processing_event: ProcessingEvent) -> List[Recording]:
"""
Records given process event in the application's recorder.
"""
Expand Down Expand Up @@ -878,7 +882,7 @@ def take_snapshot(
snapshot = snapshot_class.take(aggregate)
self.snapshots.put([snapshot])

def notify(self, new_events: list[DomainEventProtocol]) -> None:
def notify(self, new_events: List[DomainEventProtocol]) -> None:
"""
Deprecated.
Expand All @@ -888,7 +892,7 @@ def notify(self, new_events: list[DomainEventProtocol]) -> None:
need to take action when new domain events have been saved.
"""

def _notify(self, recordings: list[Recording]) -> None:
def _notify(self, recordings: List[Recording]) -> None:
"""
Called after new aggregate events have been saved. This
method on this class doesn't actually do anything,
Expand Down Expand Up @@ -930,7 +934,7 @@ def __init__(
self,
events: EventStore,
originator_id: UUID,
logged_cls: type[TDomainEvent], # TODO: Rename to 'event_class' in v10.
logged_cls: Type[TDomainEvent], # TODO: Rename to 'event_class' in v10.
):
self.events = events
self.originator_id = originator_id
Expand All @@ -952,7 +956,7 @@ def trigger_event(

def _trigger_event(
self,
logged_cls: type[T] | None,
logged_cls: Type[T] | None,
next_originator_version: int | None = None,
**kwargs: Any,
) -> T:
Expand Down
Loading

0 comments on commit 9a0a3b5

Please sign in to comment.