Skip to content

Commit

Permalink
Merge pull request #229 from armanbilge/pr/fix-fs2-3206
Browse files Browse the repository at this point in the history
Delegate to upstream `Applicative[Signal]`
  • Loading branch information
armanbilge authored May 13, 2023
2 parents ef0b75a + a1b9b8c commit 759ec75
Showing 1 changed file with 4 additions and 58 deletions.
62 changes: 4 additions & 58 deletions frp/src/main/scala/calico/frp/frp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,6 @@
* limitations under the License.
*/

/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package calico
package frp

Expand Down Expand Up @@ -67,41 +46,8 @@ given [F[_]: Concurrent]: Monad[Signal[F, _]] = new StackSafeMonad[Signal[F, _]]
}
}

override def ap[A, B](ff: Signal[F, A => B])(fa: Signal[F, A]) =
new:
def discrete: Stream[F, B] =
nondeterministicZip(ff.discrete, fa.discrete).map(_(_))
def continuous: Stream[F, B] = Stream.repeatEval(get)
def get: F[B] = ff.get.ap(fa.get)
override def product[A, B](fa: Signal[F, A], fb: Signal[F, B]) =
Signal.applicativeInstance.product(fa, fb)

override def getAndDiscreteUpdates(using Concurrent[F]): Resource[F, (B, Stream[F, B])] =
getAndDiscreteUpdatesImpl

private def getAndDiscreteUpdatesImpl =
(ff.getAndDiscreteUpdates, fa.getAndDiscreteUpdates).mapN {
case ((f, fs), (a, as)) =>
(f(a), nondeterministicZip(fs, as).map { case (f, a) => f(a) })
}

private def nondeterministicZip[A0, A1](
xs: Stream[F, A0],
ys: Stream[F, A1]
): Stream[F, (A0, A1)] =
type PullOutput = (A0, A1, Stream[F, A0], Stream[F, A1])

val firstPull: OptionT[Pull[F, PullOutput, *], Unit] = for
firstXAndRestOfXs <- OptionT(xs.pull.uncons1.covaryOutput[PullOutput])
(x, restOfXs) = firstXAndRestOfXs
firstYAndRestOfYs <- OptionT(ys.pull.uncons1.covaryOutput[PullOutput])
(y, restOfYs) = firstYAndRestOfYs
_ <- OptionT.liftF {
Pull.output1[F, PullOutput]((x, y, restOfXs, restOfYs)): Pull[F, PullOutput, Unit]
}
yield ()

firstPull.value.void.stream.flatMap { (x, y, restOfXs, restOfYs) =>
restOfXs.either(restOfYs).scan((x, y)) {
case ((_, rightElem), Left(newElem)) => (newElem, rightElem)
case ((leftElem, _), Right(newElem)) => (leftElem, newElem)
}
}
override def map2[A, B, C](fa: Signal[F, A], fb: Signal[F, B])(f: (A, B) => C) =
Signal.applicativeInstance.map2(fa, fb)(f)

0 comments on commit 759ec75

Please sign in to comment.