Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup source.dynamic, fix append #4156

Merged
merged 4 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ New:

Changed:

- Reimplemented `request.once`, `single` and more using `source.dynamic`. Removed experiment
flag on `source.dynamic`. The operator is considered stable enough to define advanced sources
but the user should be careful when using it.
- Mute SDL startup messages (#2913).
- `int` can optionally raises an error when passing `nan` or `infinity`, `int(infinity)`
now returns `max_int` and `int(-infinity)` returns `min_int`. (#3407)
Expand Down
9 changes: 9 additions & 0 deletions src/core/builtins/builtins_source.ml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ let _ =
s#set_name n;
Lang.unit)

let _ =
Lang.add_builtin ~base:source "last_metadata" ~category:(`Source `Liquidsoap)
~descr:"Return the last metadata from the source."
[("", Lang.source_t (Lang.univ_t ()), None, None)]
(Lang.nullable_t Lang.metadata_t)
(fun p ->
let s = Lang.to_source (List.assoc "" p) in
match s#last_metadata with None -> Lang.null | Some m -> Lang.metadata m)

let _ =
Lang.add_builtin ~base:source "skip" ~category:(`Source `Liquidsoap)
~descr:"Skip to the next track."
Expand Down
131 changes: 67 additions & 64 deletions src/core/operators/dyn_op.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,84 +20,85 @@
*****************************************************************************)

class dyn ~init ~track_sensitive ~infallible ~resurection_time ~self_sync f =
class dyn ~init ~track_sensitive ~infallible ~self_sync ~merge next_fn =
object (self)
inherit Source.source ~name:"source.dynamic" ()

inherit
Source.generate_from_multiple_sources
~merge:(fun () -> false)
~track_sensitive ()

inherit Source.generate_from_multiple_sources ~merge ~track_sensitive ()
method fallible = not infallible
val mutable activation = []
val source : Source.source option Atomic.t = Atomic.make init
val current_source : Source.source option Atomic.t = Atomic.make init
method current_source = Atomic.get current_source
val mutable last_select = Unix.gettimeofday ()
val proposed = Atomic.make None
method propose s = Atomic.set proposed (Some s)

method private prepare s =
method private no_source =
if infallible then
Lang.raise_error ~pos:[]
~message:
(Printf.sprintf
"Infallible source.dynamic %s was not able to prepare a source \
in time! Make sure to either define infallible sources in the \
source's dynamic function or mark the source as fallible.."
self#id)
"failure";
None

method prepare s =
Typing.(s#frame_type <: self#frame_type);
Clock.unify ~pos:self#pos s#clock self#clock;
s#wake_up;
(match Atomic.exchange source (Some s) with
| Some s -> s#sleep
| None -> ());
if s#is_ready then Some s else None
s#wake_up

method private exchange s =
self#log#info "Switching to source %s" s#id;
self#prepare s;
Atomic.set current_source (Some s);
if s#is_ready then Some s else self#no_source

method private get_next reselect =
self#mutexify
(fun () ->
match Atomic.exchange proposed None with
| Some s -> self#prepare s
last_select <- Unix.gettimeofday ();
let s =
Lang.apply next_fn [] |> Lang.to_option |> Option.map Lang.to_source
in
match s with
| None -> (
last_select <- Unix.gettimeofday ();
let s =
Lang.apply f [] |> Lang.to_option |> Option.map Lang.to_source
in
match s with
| None -> (
match Atomic.get source with
| Some s
when self#can_reselect
~reselect:
(match reselect with
| `Force -> `Ok
| v -> v)
s ->
Some s
| _ -> None)
| Some s -> self#prepare s))
match self#current_source with
| Some s
when self#can_reselect
~reselect:
(match reselect with `Force -> `Ok | v -> v)
s ->
Some s
| _ -> self#no_source)
| Some s -> self#exchange s)
()

method private get_source ~reselect () =
match (Atomic.get source, reselect) with
| None, _ | _, `Force -> self#get_next reselect
match (self#current_source, reselect) with
| None, _ | _, `Force | Some _, `After_position _ ->
self#get_next reselect
| Some s, _ when self#can_reselect ~reselect s -> Some s
| Some _, _ when Unix.gettimeofday () -. last_select < resurection_time
->
None
| _ -> self#get_next reselect

initializer
self#on_wake_up (fun () ->
Lang.iter_sources
(fun s -> Typing.(s#frame_type <: self#frame_type))
f;
next_fn;
ignore (self#get_source ~reselect:`Force ()));
self#on_sleep (fun () ->
match Atomic.exchange source None with
match Atomic.exchange current_source None with
| Some s -> s#sleep
| None -> ())

method remaining =
match Atomic.get source with Some s -> s#remaining | None -> -1
match self#current_source with Some s -> s#remaining | None -> -1

method abort_track =
match Atomic.get source with Some s -> s#abort_track | None -> ()
match self#current_source with Some s -> s#abort_track | None -> ()

method seek_source =
match Atomic.get source with
match self#current_source with
| Some s -> s#seek_source
| None -> (self :> Source.source)

Expand All @@ -106,7 +107,7 @@ class dyn ~init ~track_sensitive ~infallible ~resurection_time ~self_sync f =
| Some v -> (`Static, self#source_sync v)
| None -> (
( `Dynamic,
match Atomic.get source with
match self#current_source with
| Some s -> snd s#self_sync
| None -> None ))
end
Expand All @@ -133,16 +134,13 @@ let _ =
Lang.nullable_t Lang.bool_t,
Some Lang.null,
Some "For the source's `self_sync` property." );
( "resurection_time",
Lang.nullable_t Lang.float_t,
Some (Lang.float 1.),
Some
"When track sensitive and the source is unavailable, how long we \
should wait before trying to update source again (`null` means \
never)." );
( "merge",
Lang.getter_t Lang.bool_t,
Some (Lang.bool false),
Some "Set or return `true` to merge subsequent tracks." );
( "",
Lang.fun_t [] (Lang.nullable_t (Lang.source_t frame_t)),
Some (Lang.val_fun [] (fun _ -> Lang.null)),
None,
Some
"Function returning the source to be used, `null` means keep current \
source." );
Expand All @@ -152,17 +150,25 @@ let _ =
"Dynamically change the underlying source: it can either be changed by \
the function given as argument, which returns the source to be played, \
or by calling the `set` method."
~category:`Track ~flags:[`Experimental]
~category:`Track
~meth:
[
( "set",
( "current_source",
([], Lang.fun_t [] (Lang.nullable_t (Lang.source_t frame_t))),
"Return the source currently selected.",
fun s ->
Lang.val_fun [] (fun _ ->
match s#current_source with
| None -> Lang.null
| Some s -> Lang.source s) );
( "prepare",
([], Lang.fun_t [(false, "", Lang.source_t frame_t)] Lang.unit_t),
"Set the source.",
"Prepare a source that will be returned later.",
fun s ->
Lang.val_fun
[("", "x", None)]
(fun p ->
s#propose (List.assoc "x" p |> Lang.to_source);
s#prepare (List.assoc "x" p |> Lang.to_source);
Lang.unit) );
]
(fun p ->
Expand All @@ -172,13 +178,10 @@ let _ =
let track_sensitive = List.assoc "track_sensitive" p |> Lang.to_getter in
let track_sensitive () = Lang.to_bool (track_sensitive ()) in
let infallible = List.assoc "infallible" p |> Lang.to_bool in
let resurection_time =
List.assoc "resurection_time" p |> Lang.to_valued_option Lang.to_float
in
let resurection_time = Option.value ~default:(-1.) resurection_time in
let merge = Lang.to_getter (List.assoc "merge" p) in
let merge () = Lang.to_bool (merge ()) in
let self_sync =
Lang.to_valued_option Lang.to_bool (List.assoc "self_sync" p)
in
let next = List.assoc "" p in
new dyn
~init ~track_sensitive ~infallible ~resurection_time ~self_sync next)
new dyn ~init ~track_sensitive ~infallible ~merge ~self_sync next)
4 changes: 2 additions & 2 deletions src/libs/extra/source.liq
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ end
# @param ~every Duration of a track (in seconds).
# @param ~metadata Metadata for tracks.
# @param s The stream.
def chop(~every=getter(3.), ~metadata=getter([]), s) =
def chop(~id=null(), ~every=getter(3.), ~metadata=getter([]), s) =
s = insert_metadata(s)

# Track time in the source's context:
Expand All @@ -211,7 +211,7 @@ def chop(~every=getter(3.), ~metadata=getter([]), s) =
end
end

source.on_frame(s, f)
source.on_frame(id=id, s, f)
end

# Regularly skip tracks from a source (useful for testing skipping).
Expand Down
54 changes: 41 additions & 13 deletions src/libs/request.liq
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ def request.queue(
) =
ignore(native)
id = string.id.default(default="request.queue", id)
initial_queue = queue
initial_queue = ref(queue)
queue = ref([])
fetch = ref(fun () -> true)
started = ref(false)

def next() =
if
Expand All @@ -60,13 +61,23 @@ def request.queue(
end

def push(r) =
log.info(
label=id,
"Pushing #{r} on the queue."
)
queue := [...queue(), r]
fn = fetch()
ignore(fn())
if
started()
then
log.info(
label=id,
"Pushing #{r} on the queue."
)
queue := [...queue(), r]
fn = fetch()
ignore(fn())
else
log.info(
label=id,
"Pushing #{r} on the initial queue."
)
initial_queue := [...initial_queue(), r]
end
end

def push_uri(uri) =
Expand Down Expand Up @@ -110,12 +121,12 @@ def request.queue(
end

def set_queue(q) =
queue := q
if started() then queue := q else initial_queue := q end
s.set_queue([])
end

def get_queue() =
[...s.queue(), ...(queue())]
[...s.queue(), ...initial_queue(), ...queue()]
end

s =
Expand All @@ -127,7 +138,14 @@ def request.queue(
queue=get_queue
}

s.on_wake_up({s.set_queue(initial_queue)})
s.on_wake_up(
fun () ->
begin
started := true
s.set_queue(initial_queue())
initial_queue := []
end
)

source.set_name(s, "request.queue")
fetch := s.fetch
Expand Down Expand Up @@ -432,10 +450,20 @@ def request.player(~simultaneous=true) =
}
}
else
s = source.dynamic()
next_source = ref(null())

def next() =
s = next_source()
next_source := null()
s
end

s = source.dynamic(next)

def play(r) =
s.set(request.once(r))
r = request.once(r)
s.prepare(r)
next_source := r
end

s.{play=play, length={1}}
Expand Down
Loading
Loading