diff --git a/build.sbt b/build.sbt index c1eae618..fa4db0a0 100644 --- a/build.sbt +++ b/build.sbt @@ -39,8 +39,9 @@ lazy val frp = crossProject(JVMPlatform, JSPlatform) "co.fs2" %%% "fs2-core" % Fs2Version, "org.typelevel" %%% "cats-laws" % CatsVersion % Test, "org.typelevel" %%% "cats-effect-testkit" % CatsEffectVersion % Test, - "org.typelevel" %%% "discipline-munit" % "1.0.9" % Test, - "org.scalameta" %%% "munit-scalacheck" % "0.7.29" % Test + "org.typelevel" %%% "discipline-munit" % "2.0.0-M3" % Test, + "org.typelevel" %%% "munit-cats-effect" % "2.0.0-M3" % Test, + "org.scalameta" %%% "munit-scalacheck" % "1.0.0-M7" % Test ) ) diff --git a/frp/src/main/scala/calico/frp/SignallingSortedMapRef.scala b/frp/src/main/scala/calico/frp/SignallingSortedMapRef.scala index 275d5ea8..536b3187 100644 --- a/frp/src/main/scala/calico/frp/SignallingSortedMapRef.scala +++ b/frp/src/main/scala/calico/frp/SignallingSortedMapRef.scala @@ -19,6 +19,8 @@ package calico.frp import cats.effect.kernel.Concurrent import cats.effect.kernel.Deferred import cats.effect.kernel.Ref +import cats.effect.kernel.Resource +import cats.effect.syntax.all.* import cats.kernel.Order import cats.syntax.all.* import fs2.Stream @@ -46,7 +48,12 @@ object SignallingSortedMapRef: value: SortedMap[K, V], lastUpdate: Long, listeners: LongMap[Deferred[F, (SortedMap[K, V], Long)]], - keyListeners: Map[K, LongMap[Deferred[F, (Option[V], Long)]]] + keys: Map[K, KeyState] + ) + + case class KeyState( + lastUpdate: Long, + listeners: LongMap[Deferred[F, (Option[V], Long)]] ) given Ordering[K] = K.toOrdering @@ -58,19 +65,25 @@ object SignallingSortedMapRef: def traverse_[A, U](it: Iterable[A])(f: A => F[U]): F[Unit] = it.foldLeft(F.unit)(_ <* f(_)) + def incrementLastUpdate(lu: Long) = + // skip -1 b/c of its special semantic + if lu == -2L then 0L else lu + 1 + def updateMapAndNotify[O]( state: State, f: SortedMap[K, V] => (SortedMap[K, V], O)): (State, F[O]) = val (newValue, result) = f(state.value) - val lastUpdate = state.lastUpdate + 1 - val newState = State(newValue, lastUpdate, LongMap.empty, SortedMap.empty) + val lastUpdate = incrementLastUpdate(state.lastUpdate) + val newKeys = newValue.view.mapValues(_ => KeyState(lastUpdate, LongMap.empty)).toMap + val newState = State(newValue, lastUpdate, LongMap.empty, newKeys) val notifyListeners = traverse_(state.listeners.values)(_.complete(newValue -> lastUpdate)) - val notifyKeyListeners = traverse_(state.keyListeners) { (k, listeners) => - val v = newValue.get(k) - traverse_(listeners.values)(_.complete(v -> lastUpdate)) + val notifyKeyListeners = traverse_(state.keys) { + case (k, KeyState(_, listeners)) => + val v = newValue.get(k) + traverse_(listeners.values)(_.complete(v -> lastUpdate)) } newState -> (notifyListeners *> notifyKeyListeners).as(result) @@ -80,14 +93,21 @@ object SignallingSortedMapRef: val (newValue, result) = f(state.value.get(k)) val newMap = newValue.fold(state.value - k)(v => state.value + (k -> v)) - val lastUpdate = state.lastUpdate + 1 - val newKeyListeners = state.keyListeners - k - val newState = State(newMap, lastUpdate, LongMap.empty, newKeyListeners) + + val lastUpdate = incrementLastUpdate(state.lastUpdate) + val lastKeyUpdate = if newValue.isDefined then lastUpdate else -1L + + val newKeys = + if newValue.isDefined then + state.keys.updated(k, KeyState(lastKeyUpdate, LongMap.empty)) + else state.keys - k // prevent memory leak + val newState = State(newMap, lastUpdate, LongMap.empty, newKeys) val notifyListeners = traverse_(state.listeners.values)(_.complete(newMap -> lastUpdate)) - val notifyKeyListeners = state.keyListeners.get(k).fold(F.unit) { listeners => - traverse_(listeners.values)(_.complete(newValue -> lastUpdate)) + val notifyKeyListeners = state.keys.get(k).fold(F.unit) { + case KeyState(_, listeners) => + traverse_(listeners.values)(_.complete(newValue -> lastUpdate)) } newState -> (notifyListeners *> notifyKeyListeners).as(result) @@ -107,25 +127,27 @@ object SignallingSortedMapRef: def updateAndNotify[O](s: State, f: SortedMap[K, V] => (SortedMap[K, V], O)) = updateMapAndNotify(s, f) - def keys = new Signal[F, SortedSet[K]]: - def get = outer.get.map(_.keySet) - def continuous = outer.continuous.map(_.keySet) - def discrete = outer.discrete.map(_.keySet).changes + def keys = outer.map(_.keySet).changes def apply(k: K) = new AbstractSignallingRef[F, State, Option[V]](newId, state): def getValue(s: State) = s.value.get(k) - def getLastUpdate(s: State) = s.lastUpdate + def getLastUpdate(s: State) = s.keys.get(k).fold(-1L)(_.lastUpdate) def withListener(s: State, id: Long, wait: Deferred[F, (Option[V], Long)]) = - s.copy(keyListeners = s - .keyListeners - .updatedWith(k)(_.getOrElse(LongMap.empty).updated(id, wait).some)) + s.copy(keys = s.keys.updatedWith(k) { ks => + val lastUpdate = ks.fold(-1L)(_.lastUpdate) + val listeners = ks.fold(LongMap.empty)(_.listeners).updated(id, wait) + Some(KeyState(lastUpdate, listeners)) + }) def withoutListener(s: State, id: Long) = - s.copy(keyListeners = - s.keyListeners.updatedWith(k)(_.map(_.removed(id)).filterNot(_.isEmpty))) + s.copy(keys = s.keys.updatedWith(k) { + _.map(ks => ks.copy(listeners = ks.listeners.removed(id))) + // prevent memory leak + .filterNot(ks => ks.lastUpdate == -1 && ks.listeners.isEmpty) + }) def updateAndNotify[O](s: State, f: Option[V] => (Option[V], O)) = updateKeyAndNotify(s, k, f) @@ -150,7 +172,16 @@ object SignallingSortedMapRef: def continuous: Stream[F, A] = Stream.repeatEval(get) - def discrete: Stream[F, A] = { + def discrete: Stream[F, A] = + Stream.resource(getAndDiscreteUpdates).flatMap { + case (a, updates) => + Stream.emit(a) ++ updates + } + + override def getAndDiscreteUpdates(using Concurrent[F]): Resource[F, (A, Stream[F, A])] = + getAndDiscreteUpdatesImpl + + private def getAndDiscreteUpdatesImpl: Resource[F, (A, Stream[F, A])] = def go(id: Long, lastSeen: Long): Stream[F, A] = def getNext: F[(A, Long)] = F.deferred[(A, Long)].flatMap { wait => @@ -158,8 +189,7 @@ object SignallingSortedMapRef: val lastUpdate = getLastUpdate(state) if lastUpdate != lastSeen then state -> (getValue(state) -> lastUpdate).pure[F] else withListener(state, id, wait) -> wait.get - - }.flatten + }.flatten // cancelable } Stream.eval(getNext).flatMap { @@ -169,22 +199,21 @@ object SignallingSortedMapRef: def cleanup(id: Long): F[Unit] = state.update(withoutListener(_, id)) - Stream.bracket(newId)(cleanup).flatMap { id => - Stream.eval(state.get).flatMap { state => - Stream.emit(getValue(state)) ++ go(id, getLastUpdate(state)) + Resource.eval { + state.get.map { state => + (getValue(state), Stream.bracket(newId)(cleanup).flatMap(go(_, getLastUpdate(state)))) } } - } def set(a: A): F[Unit] = update(_ => a) def update(f: A => A): F[Unit] = modify(v => (f(v), ())) def modify[B](f: A => (A, B)): F[B] = - state.modify(updateAndNotify(_, f)).flatten + state.flatModify(updateAndNotify(_, f)) def tryModify[B](f: A => (A, B)): F[Option[B]] = - state.tryModify(updateAndNotify(_, f)).flatMap(_.sequence) + state.tryModify(updateAndNotify(_, f)).flatMap(_.sequence).uncancelable def tryUpdate(f: A => A): F[Boolean] = tryModify(a => (f(a), ())).map(_.isDefined) diff --git a/frp/src/test/scala/calico/frp/SignallingSortedMapRefSuite.scala b/frp/src/test/scala/calico/frp/SignallingSortedMapRefSuite.scala new file mode 100644 index 00000000..9b22e33e --- /dev/null +++ b/frp/src/test/scala/calico/frp/SignallingSortedMapRefSuite.scala @@ -0,0 +1,50 @@ +/* + * Copyright 2022 Arman Bilge + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package calico.frp + +import cats.effect.IO +import cats.effect.testkit.TestControl +import munit.CatsEffectSuite + +import scala.collection.immutable.SortedMap +import scala.concurrent.duration.* + +class SignallingSortedMapRefSuite extends CatsEffectSuite: + + test("mapref - does not emit spurious events") { + SignallingSortedMapRef[IO, Boolean, Int] + .flatTap(_.set(SortedMap(false -> 0, true -> 0))) + .flatMap { s => + + val events = + s(false).discrete.evalTap(_ => IO.sleep(1.seconds)).unNoneTerminate.compile.toList + + val updates = + IO.sleep(1100.millis) *> + s(false).update(_.map(_ + 1)) *> + IO.sleep(1.second) *> + s(true).update(_.map(_ + 1)) *> + IO.sleep(1.seconds) *> + s(false).update(_.map(_ + 1)) *> + IO.sleep(1.seconds) *> + s(false).set(None) + + TestControl + .executeEmbed(updates.background.surround(events)) + .assertEquals(List(0, 1, 2)) + } + }