diff --git a/gateware/src/tiliqua/cache.py b/gateware/src/tiliqua/cache.py index ab141a0..428e17a 100644 --- a/gateware/src/tiliqua/cache.py +++ b/gateware/src/tiliqua/cache.py @@ -109,7 +109,7 @@ def elaborate(self, platform): }) m.submodules.tag_mem = tag_mem= Memory(shape=tag_layout, depth=2**linebits, init=[]) tag_wr_port = tag_mem.write_port() - tag_rd_port = tag_mem.read_port() + tag_rd_port = tag_mem.read_port(domain='comb') tag_do = Signal(shape=tag_layout) tag_di = Signal(shape=tag_layout) m.d.comb += [ diff --git a/gateware/src/tiliqua/delay.py b/gateware/src/tiliqua/delay.py new file mode 100644 index 0000000..89948b4 --- /dev/null +++ b/gateware/src/tiliqua/delay.py @@ -0,0 +1,180 @@ +# Copyright (c) 2024 Seb Holzapfel, apfelaudio UG +# +# SPDX-License-Identifier: CERN-OHL-S-2.0 + +""" +High-level delay effects, built on components from the DSP library. +""" + +from amaranth import * +from amaranth.build import * +from amaranth.lib import wiring, data, stream +from amaranth.lib.wiring import In, Out +from amaranth_soc import wishbone +from amaranth_future import fixed + +from tiliqua import eurorack_pmod, dsp, midi, psram_peripheral +from tiliqua.cache import WishboneL2Cache +from tiliqua.eurorack_pmod import ASQ + +class PingPongDelay(wiring.Component): + + """ + 2-channel stereo ping-pong delay. + + Based on 2 equal-length delay lines, fed back into each other. + + Delay lines are created external to this component, and may be + SRAM-backed or PSRAM-backed depending on the application. + """ + + i: In(stream.Signature(data.ArrayLayout(ASQ, 2))) + o: Out(stream.Signature(data.ArrayLayout(ASQ, 2))) + + def __init__(self, delayln1, delayln2, delay_samples=15000): + super().__init__() + + self.delayln1 = delayln1 + self.delayln2 = delayln2 + + assert self.delayln1.write_triggers_read + assert self.delayln2.write_triggers_read + + # Each delay has a single read tap. 
`write_triggers_read` above ensures + # stream is connected such that it emits a sample stream synchronized + # with writes, rather than us needing to connect up tapX.i. (this is + # only needed if you want multiple delayline reads per write per tap). + + self.tap1 = self.delayln1.add_tap(fixed_delay=delay_samples) + self.tap2 = self.delayln2.add_tap(fixed_delay=delay_samples) + + def elaborate(self, platform): + m = Module() + + # Feedback network of ping-pong delay. Each tap is fed back into the input of the + # opposite tap, mixed 50% with the audio input. + + m.submodules.matrix_mix = matrix_mix = dsp.MatrixMix( + i_channels=4, o_channels=4, + coefficients=[[0.5, 0.0, 0.5, 0.0], # in0 + [0.0, 0.5, 0.0, 0.5], # in1 + [0.5, 0.0, 0.0, 0.5], # tap1.o + [0.0, 0.5, 0.5, 0.0]]) # tap2.o + # out0 out1 tap1.i tap2.i + + # Split matrix input / output into independent streams + + m.submodules.imix4 = imix4 = dsp.Merge(n_channels=4) + m.submodules.omix4 = omix4 = dsp.Split(n_channels=4, source=matrix_mix.o) + + # Close feedback path + + dsp.connect_feedback_kick(m, imix4.o, matrix_mix.i) + + # Split left/right channels of self.i / self.o into independent streams + + m.submodules.isplit2 = isplit2 = dsp.Split(n_channels=2, source=wiring.flipped(self.i)) + m.submodules.omerge2 = omerge2 = dsp.Merge(n_channels=2, sink=wiring.flipped(self.o)) + + # Connect up delayln writes, read tap, audio in / out as described above + # to the matrix feedback network. + + wiring.connect(m, isplit2.o[0], imix4.i[0]) + wiring.connect(m, isplit2.o[1], imix4.i[1]) + wiring.connect(m, self.tap1.o, imix4.i[2]) + wiring.connect(m, self.tap2.o, imix4.i[3]) + + wiring.connect(m, omix4.o[0], omerge2.i[0]) + wiring.connect(m, omix4.o[1], omerge2.i[1]) + wiring.connect(m, omix4.o[2], self.delayln1.i) + wiring.connect(m, omix4.o[3], self.delayln2.i) + + return m + +class Diffuser(wiring.Component): + + """ + 4-channel shuffling feedback delay. 
+ + Based on 4 separate delay lines with separate delay lengths, + where the feedback paths are shuffled into different channels + by a matrix mixer. + + Delay lines are created external to this component, and may be + SRAM-backed or PSRAM-backed depending on the application. + """ + + i: In(stream.Signature(data.ArrayLayout(ASQ, 4))) + o: Out(stream.Signature(data.ArrayLayout(ASQ, 4))) + + def __init__(self, delay_lines): + super().__init__() + + # Verify we were supplied 4 delay lines with the correct properties + + assert len(delay_lines) == 4 + self.delays = [2000, 3000, 5000, 7000] # tap delays of each channel. + self.delay_lines = delay_lines + for delay_line, delay in zip(delay_lines, self.delays): + assert delay_line.write_triggers_read + assert delay_line.max_delay >= delay + + # Each delay has a single read tap. `write_triggers_read` above ensures + # stream is connected such that it emits a sample stream synchronized + # with writes, rather than us needing to connect up tapX.i. (this is + # only needed if you want multiple delayline reads per write per tap). 
+ + self.taps = [] + for delay, delayln in zip(self.delays, self.delay_lines): + self.taps.append(delayln.add_tap(fixed_delay=delay)) + + # quadrants in the below matrix are: + # + # [in -> out] [in -> delay] + # [delay -> out] [delay -> delay] <- feedback + # + + self.matrix_mix = dsp.MatrixMix( + i_channels=8, o_channels=8, + coefficients=[[0.6, 0.0, 0.0, 0.0, 0.8, 0.0, 0.0, 0.0], # in0 + [0.0, 0.6, 0.0, 0.0, 0.0, 0.8, 0.0, 0.0], # | + [0.0, 0.0, 0.6, 0.0, 0.0, 0.0, 0.8, 0.0], # | + [0.0, 0.0, 0.0, 0.6, 0.0, 0.0, 0.0, 0.8], # in3 + [0.4, 0.0, 0.0, 0.0, 0.4,-0.4,-0.4,-0.4], # ds0 + [0.0, 0.4, 0.0, 0.0,-0.4, 0.4,-0.4,-0.4], # | + [0.0, 0.0, 0.4, 0.0,-0.4,-0.4, 0.4,-0.4], # | + [0.0, 0.0, 0.0, 0.4,-0.4,-0.4,-0.4, 0.4]])# ds3 + # out0 ------- out3 sw0 ---------- sw3 + + def elaborate(self, platform): + m = Module() + + m.submodules.matrix_mix = matrix_mix = self.matrix_mix + + m.submodules.split4 = split4 = dsp.Split(n_channels=4) + m.submodules.merge4 = merge4 = dsp.Merge(n_channels=4) + + m.submodules.split8 = split8 = dsp.Split(n_channels=8) + m.submodules.merge8 = merge8 = dsp.Merge(n_channels=8) + + wiring.connect(m, wiring.flipped(self.i), split4.i) + + # matrix <-> independent streams + wiring.connect(m, matrix_mix.o, split8.i) + dsp.connect_feedback_kick(m, merge8.o, matrix_mix.i) + + for n in range(4): + # audio -> matrix [0-3] + wiring.connect(m, split4.o[n], merge8.i[n]) + # delay -> matrix [4-7] + wiring.connect(m, self.taps[n].o, merge8.i[4+n]) + + for n in range(4): + # matrix -> audio [0-3] + wiring.connect(m, split8.o[n], merge4.i[n]) + # matrix -> delay [4-7] + wiring.connect(m, split8.o[4+n], self.delay_lines[n].i) + + wiring.connect(m, merge4.o, wiring.flipped(self.o)) + + return m diff --git a/gateware/src/tiliqua/delay_line.py b/gateware/src/tiliqua/delay_line.py new file mode 100644 index 0000000..a0b24f6 --- /dev/null +++ b/gateware/src/tiliqua/delay_line.py @@ -0,0 +1,362 @@ +# Copyright (c) 2024 S. 
Holzapfel, apfelaudio UG +# +# SPDX-License-Identifier: CERN-OHL-S-2.0 +# + +"""PSRAM- or SRAM-backed streaming audio delay lines.""" + +from amaranth import * +from amaranth.lib import wiring, data, stream +from amaranth.lib.wiring import In, Out +from amaranth.utils import exact_log2 + +from amaranth_future import fixed +from amaranth_soc import wishbone + +from vendor.soc.cores import sram + +from tiliqua.eurorack_pmod import ASQ +from tiliqua.cache import WishboneL2Cache + +from tiliqua.dsp import * + +class DelayLine(wiring.Component): + + """ + SRAM- or PSRAM- backed audio delay line. + + This forms the backbone of many different types of effects - echoes, + pitch shifting, chorus, feedback synthesis etc. + + Usage + ----- + + Each `DelayLine` instance operates in a single-writer, multiple-reader + fashion - that is, for each `DelayLine`, there may be only one stream + of samples being *written*, however from each `DelayLine` you may + create N instances of `DelayLineTap`, which are submodules of `DelayLine` + used to produce output streams (read operations) on the `DelayLine`. + + For a simple, SRAM-backed delay line, the following is sufficient: + + delayln = DelayLine( + max_delay=8192, + write_triggers_read=False, + ) + + From this, you can create some read taps: + + tap1 = delayln.add_tap() + tap2 = delayln.add_tap() + + Each tap automatically becomes a submodule of the `DelayLine` instance. + That is, you only need to add `DelayLine` itself to `m.submodules`. + + The `delayln` instance requires a single incoming stream `delayln.i`, + on which incoming samples are taken and written to the backing store. + + Each `tap` instance requires both an incoming *and* outgoing stream, + `tap1.i`, `tap1.o`, where an output sample is *only* produced some + time after the requested delay count has arrived on `tap1.i`. 
+ + This gives applications the flexibility to read multiple times per + write sample (useful, for example, for fractional delay lines where + we want to interpolate between two adjacent samples). + + Fixed (simple) delay taps + ------------------------- + + It can be a bit cumbersome to need to provide each tap with an + input stream if you just want some taps with fixed delays. + + So, if you want a simple fixed delay tap, you can use the + `write_triggers_read=True` option when creating the `DelayLine`. Then, + you can specify explicit fixed delay taps as follows: + + delayln = DelayLine(max_delay=8192, write_triggers_read=True) + tap1 = delayln.add_tap(fixed_delay=5000) + tap2 = delayln.add_tap(fixed_delay=7000) + + When used in this mode, `tap1` and `tap2` will internally have their + inputs (sample request streams) hooked up to the write strobe. This + means you no longer need to hook up `tapX.i` and will automatically + get a single sample on each `tapX.o` after every write to `delayln`. + + Backing store + ------------- + + The backing store is a contiguous region of memory where samples are + written to a wrapped incrementing index (i.e. in circular-buffer fashion). + + The same memory space is shared by all read & write operations; however, + the way this works is slightly different when comparing SRAM- and PSRAM- + backed delay lines. In both cases, all read & write operations go through + an arbiter and share the same memory bus. + + In the SRAM case, this memory bus is connected directly to an FPGA DPRAM + instantiation and as such does not need to be connected to any external + memory bus. + + In the PSRAM case, this is a bit more complicated. Due to the memory + access latency of PSRAM, simply forwarding each read/write access would + quickly consume the available memory bandwidth. 
+ So, in the PSRAM case, a small L2 cache is inserted between the internal + delay line R/W bus and the memory bus exposed by `delayln.bus` (normally + hooked up to the PSRAM). The purpose of this cache is to collect as many + read & write operations into burstable transactions as possible. + + As each delayline contains completely different samples and individually + has quite a predictable access pattern, it makes sense to have one cache + per `DelayLine`, rather than one larger shared cache (which would likely + perform worse considering area/bandwidth). The important factor is that + all writes and reads on the same delayline share the small cache. + + """ + + INTERNAL_BUS_DATA_WIDTH = 16 + INTERNAL_BUS_GRANULARITY = 8 + + def __init__(self, max_delay, psram_backed=False, addr_width_o=None, base=None, + write_triggers_read=True): + + if psram_backed: + assert base is not None + assert addr_width_o is not None + else: + assert base is None + assert addr_width_o is None + + self.max_delay = max_delay + self.address_width = exact_log2(max_delay) + self.write_triggers_read = write_triggers_read + self.psram_backed = psram_backed + + # reader taps that may read from this delay line + self.taps = [] + + # internal bus is lower footprint than the SoC bus. 
+ data_width = self.INTERNAL_BUS_DATA_WIDTH + granularity = self.INTERNAL_BUS_GRANULARITY + + # bus that this delayline writes samples to + self.internal_writer_bus = wishbone.Signature( + addr_width=self.address_width, + data_width=data_width, + granularity=granularity + ).create() + + # arbiter to round-robin between write transactions (from this + # DelayLine) and read transactions (from children DelayLineTap) + self._arbiter = wishbone.Arbiter(addr_width=self.address_width, + data_width=data_width, + granularity=granularity) + self._arbiter.add(self.internal_writer_bus) + + # internal signal between DelayLine and DelayLineTap + self._wrpointer = Signal(unsigned(self.address_width)) + + # ports exposed to the outside world + ports = { + "i": In(stream.Signature(ASQ)), + } + + if psram_backed: + + ports |= { + "bus": Out(wishbone.Signature(addr_width=addr_width_o, + data_width=32, + granularity=8, + features={'bte', 'cti'})), + } + + self._adapter = WishboneAdapter( + addr_width_i=self.address_width, + addr_width_o=addr_width_o, + base=base + ) + + self._cache = WishboneL2Cache( + addr_width=addr_width_o, + cachesize_words=64 + ) + + super().__init__(ports) + + def add_tap(self, fixed_delay=None): + if self.write_triggers_read: + assert fixed_delay is not None + assert fixed_delay < self.max_delay + tap = DelayLineTap(parent_bus=self._arbiter.bus, fixed_delay=fixed_delay) + self.taps.append(tap) + self._arbiter.add(tap._bus) + return tap + + def elaborate(self, platform): + m = Module() + + if self.write_triggers_read: + # split the write strobe up into identical streams to be used by read taps. + m.submodules.isplit = isplit = Split(n_channels=1+len(self.taps), replicate=True, + source=wiring.flipped(self.i)) + istream = isplit.o[0] + else: + # otherwise, the user wants to handle read tap synchronization themselves. 
+ istream = wiring.flipped(self.i) + + for n, tap in enumerate(self.taps): + m.d.comb += tap._wrpointer.eq(self._wrpointer) + if self.write_triggers_read: + # Every write sample propagates to a read sample without needing + # to hook up the 'i' stream on delay taps. + sync_on = isplit.o[1+n] + m.d.comb += [ + tap.i.valid.eq(sync_on.valid), + sync_on.ready.eq(tap.i.ready), + tap.i.payload.eq(tap.fixed_delay), + ] + + named_submodules(m.submodules, self.taps) + + m.submodules.arbiter = self._arbiter + + if self.psram_backed: + # adapt small internal 16-bit shared bus to external 32-bit shared bus + # through a small L2 cache so reads + writes burst the memory accesses. + m.submodules.adapter = self._adapter + m.submodules.cache = self._cache + wiring.connect(m, self._arbiter.bus, self._adapter.i) + wiring.connect(m, self._adapter.o, self._cache.master) + wiring.connect(m, self._cache.slave, wiring.flipped(self.bus)) + else: + # Local SRAM-backed delay line. No need for adapters or caches. 
+ sram_size = self.max_delay * (self._arbiter.bus.data_width // + self._arbiter.bus.granularity) + m.submodules.sram = sram_peripheral = sram.Peripheral( + size=sram_size, data_width=self._arbiter.bus.data_width, + granularity=self._arbiter.bus.granularity + ) + wiring.connect(m, self._arbiter.bus, sram_peripheral.bus) + + # bus for sample writes which sits before the arbiter + bus = self.internal_writer_bus + + with m.FSM() as fsm: + with m.State('WAIT-VALID'): + m.d.comb += istream.ready.eq(1) + with m.If(istream.valid): + m.d.sync += [ + bus.adr .eq(self._wrpointer), + bus.dat_w.eq(istream.payload), + bus.sel .eq(0b11), + ] + m.next = 'WRITE' + with m.State('WRITE'): + m.d.comb += [ + bus.stb.eq(1), + bus.cyc.eq(1), + bus.we.eq(1), + ] + with m.If(bus.ack): + with m.If(self._wrpointer != (self.max_delay - 1)): + m.d.sync += self._wrpointer.eq(self._wrpointer + 1) + with m.Else(): + m.d.sync += self._wrpointer.eq(0) + m.next = 'WAIT-VALID' + + return m + +class DelayLineTap(wiring.Component): + """ + A single read tap of a parent `DelayLine`. + See `DelayLine` top-level comment for information on usage. 
+ """ + def __init__(self, parent_bus, fixed_delay=None): + + self.fixed_delay = fixed_delay + self.max_delay = 2**parent_bus.addr_width + self.addr_width = parent_bus.addr_width + + # internal signals between parent DelayLine and child DelayLineTap + self._wrpointer = Signal(unsigned(parent_bus.addr_width)) + self._bus = wishbone.Signature(addr_width=parent_bus.addr_width, + data_width=parent_bus.data_width, + granularity=parent_bus.granularity).create() + + super().__init__({ + "i": In(stream.Signature(unsigned(parent_bus.addr_width))), + "o": Out(stream.Signature(ASQ)), + }) + + def elaborate(self, platform): + m = Module() + + bus = self._bus + + with m.FSM() as fsm: + with m.State('WAIT-VALID'): + m.d.comb += self.i.ready.eq(1) + with m.If(self.i.valid): + m.d.sync += bus.adr.eq(self._wrpointer - self.i.payload) + m.next = 'READ' + with m.State('READ'): + m.d.comb += [ + bus.stb.eq(1), + bus.cyc.eq(1), + bus.we.eq(0), + bus.sel.eq(0b11), + ] + with m.If(bus.ack): + m.d.sync += self.o.payload.eq(bus.dat_r) + m.next = 'WAIT-READY' + with m.State('WAIT-READY'): + m.d.comb += self.o.valid.eq(1) + with m.If(self.o.ready): + m.next = 'WAIT-VALID' + + return m + +class WishboneAdapter(wiring.Component): + """ + Adapter between external (dw=32) and internal (dw=16) buses of DelayLine. + Used to adapt the internal bus to the correct size for external memory. + + TODO: this should really be parameterized beyond 16-bit samples... 
+ """ + + def __init__(self, addr_width_i, addr_width_o, base): + self.base = base + super().__init__({ + "i": In(wishbone.Signature(addr_width=addr_width_i, + data_width=16, + granularity=8)), + "o": Out(wishbone.Signature(addr_width=addr_width_o, + data_width=32, + granularity=8, + features={'bte', 'cti'})), + }) + + def elaborate(self, platform): + m = Module() + + m.d.comb += [ + self.i.ack.eq(self.o.ack), + self.o.adr.eq((self.base<<2) + (self.i.adr>>1)), + self.o.we.eq(self.i.we), + self.o.cyc.eq(self.i.cyc), + self.o.stb.eq(self.i.stb), + ] + + with m.If(self.i.adr[0]): + m.d.comb += [ + self.i.dat_r.eq(self.o.dat_r>>16), + self.o.sel .eq(self.i.sel<<2), + self.o.dat_w.eq(self.i.dat_w<<16), + ] + with m.Else(): + m.d.comb += [ + self.i.dat_r.eq(self.o.dat_r), + self.o.sel .eq(self.i.sel), + self.o.dat_w.eq(self.i.dat_w), + ] + + return m diff --git a/gateware/src/tiliqua/dsp.py b/gateware/src/tiliqua/dsp.py index 37fecc5..1cba4eb 100644 --- a/gateware/src/tiliqua/dsp.py +++ b/gateware/src/tiliqua/dsp.py @@ -135,6 +135,14 @@ def connect_remap(m, stream_o, stream_i, mapping): stream_o.ready.eq(stream_i.ready) ] +def channel_remap(m, stream_o, stream_i, mapping_o_to_i): + def remap(o, i): + connections = [] + for k in mapping_o_to_i: + connections.append(i.payload[mapping_o_to_i[k]].eq(o.payload[k])) + return connections + return connect_remap(m, stream_o, stream_i, remap) + class VCA(wiring.Component): """ @@ -496,87 +504,30 @@ def elaborate(self, platform): return m -class DelayLine(wiring.Component): - +class KickFeedback(Elaboratable): """ - Delay line with variable delay length. This can also be - used as a fixed delay line or a wavetable / grain storage. - - - 'sw': sample write, each one written to an incrementing - index in a local circular buffer. - - 'da': delay address, each strobe (later) emits a 'ds' (sample), - the value of the audio sample 'da' elements later than the - last sample write 'sw' to occur up to 'max_delay'. 
- - Other uses: - - If 'da' is a constant, this becomes a fixed delay line. - - If 'sw' stop sending samples, this is like a frozen wavetable. - + Inject a single dummy (garbage) sample after reset between + two streams. This is necessary to break infinite blocking + after reset if streams are set up in a feedback loop. """ - - def __init__(self, max_delay=512): - self.max_delay = max_delay - self.address_width = exact_log2(max_delay) - super().__init__({ - "sw": In(stream.Signature(ASQ)), - "da": In(stream.Signature(unsigned(self.address_width))), - "ds": Out(stream.Signature(ASQ)), - }) - + def __init__(self, o, i): + self.o = o + self.i = i def elaborate(self, platform): m = Module() - - # TODO (amaranth 0.5+): use native ASQ shape in LUT memory - m.submodules.mem = mem = Memory( - shape=signed(ASQ.as_shape().width), depth=self.max_delay, init=[]) - wport = mem.write_port() - rport = mem.read_port(transparent_for=(wport,)) - - wrpointer = Signal(self.address_width) - rdpointer = Signal(self.address_width) - - # - # read side (da -> ds) - # - - m.d.comb += [ - rport.addr.eq(rdpointer), - self.ds.payload.eq(rport.data), - self.da.ready.eq(1), - ] - - # Set read pointer on valid delay address - with m.If(self.da.valid): - m.d.comb += [ - # Read pointer must be wrapped to max delay - # Should wrap correctly as long as max delay is POW2 - rdpointer.eq(wrpointer - self.da.payload), - rport.en.eq(1), - ] - m.d.sync += self.ds.valid.eq(1), - # FIXME: don't go here unless ds is ready! 
- with m.Else(): - m.d.sync += self.ds.valid.eq(0), - - # - # write side (sw -> circular buffer) - # - - m.d.comb += [ - self.sw.ready.eq(1), - wport.addr.eq(wrpointer), - wport.en.eq(self.sw.valid), - wport.data.eq(self.sw.payload), - ] - - with m.If(wport.en): - with m.If(wrpointer != (self.max_delay - 1)): - m.d.sync += wrpointer.eq(wrpointer + 1) - with m.Else(): - m.d.sync += wrpointer.eq(0) - + wiring.connect(m, self.o, self.i) + with m.FSM() as fsm: + with m.State('KICK'): + m.d.comb += self.i.valid.eq(1) + with m.If(self.i.ready): + m.next = 'FORWARD' + with m.State('FORWARD'): + pass return m +def connect_feedback_kick(m, o, i): + m.submodules += KickFeedback(o, i) + class PitchShift(wiring.Component): """ @@ -584,23 +535,23 @@ class PitchShift(wiring.Component): tracked taps on a delay line. As a result, maximum grain size is the delay line 'max_delay' // 2. - The delay line itself must be hooked up to the input audio + The delay line tap itself must be hooked up to the input source from outside this component (this allows multiple shifters to share a single delay line). """ - def __init__(self, delayln, xfade=256): - assert(xfade <= delayln.max_delay/4) - self.delayln = delayln + def __init__(self, tap, xfade=256): + assert xfade <= (tap.max_delay // 4) + self.tap = tap self.xfade = xfade self.xfade_bits = exact_log2(xfade) # delay type: integer component is index into delay line # +1 is necessary so that we don't overflow on adding grain_sz. 
- self.dtype = fixed.SQ(self.delayln.address_width+1, 8) + self.dtype = fixed.SQ(self.tap.addr_width+1, 8) super().__init__({ "i": In(stream.Signature(data.StructLayout({ "pitch": self.dtype, - "grain_sz": unsigned(exact_log2(delayln.max_delay)), + "grain_sz": unsigned(exact_log2(tap.max_delay)), }))), "o": Out(stream.Signature(ASQ)), }) @@ -608,7 +559,6 @@ def __init__(self, delayln, xfade=256): def elaborate(self, platform): m = Module() - # Current position in delay line 0, 1 (+= pitch every sample) delay0 = Signal(self.dtype) delay1 = Signal(self.dtype) @@ -644,23 +594,23 @@ def elaborate(self, platform): m.next = 'TAP0' with m.State('TAP0'): m.d.comb += [ - self.delayln.ds.ready.eq(1), - self.delayln.da.valid.eq(1), - self.delayln.da.payload.eq(delay0.round() >> delay0.f_width), + self.tap.o.ready.eq(1), + self.tap.i.valid.eq(1), + self.tap.i.payload.eq(delay0.round() >> delay0.f_width), ] - with m.If(self.delayln.ds.valid): - m.d.comb += self.delayln.da.valid.eq(0), - m.d.sync += sample0.eq(self.delayln.ds.payload) + with m.If(self.tap.o.valid): + m.d.comb += self.tap.i.valid.eq(0), + m.d.sync += sample0.eq(self.tap.o.payload) m.next = 'TAP1' with m.State('TAP1'): m.d.comb += [ - self.delayln.ds.ready.eq(1), - self.delayln.da.valid.eq(1), - self.delayln.da.payload.eq(delay1.round() >> delay1.f_width), + self.tap.o.ready.eq(1), + self.tap.i.valid.eq(1), + self.tap.i.payload.eq(delay1.round() >> delay1.f_width), ] - with m.If(self.delayln.ds.valid): - m.d.comb += self.delayln.da.valid.eq(0), - m.d.sync += sample1.eq(self.delayln.ds.payload) + with m.If(self.tap.o.valid): + m.d.comb += self.tap.i.valid.eq(0), + m.d.sync += sample1.eq(self.tap.o.payload) m.next = 'ENV' with m.State('ENV'): with m.If(delay0 < self.xfade): diff --git a/gateware/src/top/dsp/sim_dsp_core.cpp b/gateware/src/top/dsp/sim_dsp_core.cpp index 888287a..58e4113 100644 --- a/gateware/src/top/dsp/sim_dsp_core.cpp +++ b/gateware/src/top/dsp/sim_dsp_core.cpp @@ -46,41 +46,86 @@ int main(int 
argc, char** argv) { tfp->dump(contextp->time()); #endif - uint32_t clkdiv = 0; - uint32_t n_clk_audio = 0; - uint32_t n_samples = 0; + uint64_t ns_in_s = 1e9; + uint64_t ns_in_sync_cycle = ns_in_s / SYNC_CLK_HZ; + uint64_t ns_in_audio_cycle = ns_in_s / AUDIO_CLK_HZ; + + printf("sync domain is: %i KHz (%i ns/cycle)\n", SYNC_CLK_HZ/1000, ns_in_sync_cycle); + printf("audio clock is: %i KHz (%i ns/cycle)\n", AUDIO_CLK_HZ/1000, ns_in_audio_cycle); + + uint32_t psram_size_bytes = 1024*1024*16; + uint8_t *psram_data = (uint8_t*)malloc(psram_size_bytes); + memset(psram_data, 0, psram_size_bytes); + + uint32_t mod = 0; + uint32_t mod_pmod; + uint32_t pmod_clocks = 0; while (contextp->time() < sim_time && !contextp->gotFinish()) { - // clk_sync ~= 60MHz - top->clk_sync = !top->clk_sync; - // clk_audio ~= 12MHz - if (clkdiv % 5 == 0) { + + uint64_t timestamp_ns = contextp->time() / 1000; + + // Sync clock domain (PSRAM read/write simulation) + if (timestamp_ns % (ns_in_sync_cycle/2) == 0) { + top->clk_sync = !top->clk_sync; + if (top->clk_sync) { + + // Probably incorrect ram r/w timing is causing the visual shift + // Switch these assignments to use internal comb do_read / do_write? 
+ // put these inside the ram simulation component + + if (top->read_ready) { + top->read_data_view = + (psram_data[top->address_ptr+3] << 24) | + (psram_data[top->address_ptr+2] << 16) | + (psram_data[top->address_ptr+1] << 8) | + (psram_data[top->address_ptr+0] << 0); + /* + if (top->read_data_view != 0) { + printf("read %x@%x\n", top->read_data_view, top->address_ptr); + } + */ + top->eval(); + } + + if (top->write_ready) { + psram_data[top->address_ptr+0] = (uint8_t)(top->write_data >> 0); + psram_data[top->address_ptr+1] = (uint8_t)(top->write_data >> 8); + psram_data[top->address_ptr+2] = (uint8_t)(top->write_data >> 16); + psram_data[top->address_ptr+3] = (uint8_t)(top->write_data >> 24); + //printf("write %x@%x\n", top->write_data, top->address_ptr); + top->eval(); + } + + } + } + + + // Audio clock domain (Audio stimulation) + if (timestamp_ns % (ns_in_audio_cycle/2) == 0) { top->clk_audio = !top->clk_audio; if (top->clk_audio) { - if (n_clk_audio % 256 == 0) { + // 256x I2S clock divider + if (mod_pmod % 256 == 0) { + ++pmod_clocks; top->fs_strobe = 1; - /* - top->pmod0_sample_i0 = (int16_t)20000.0*sin((float)pmod_clocks / 2000.0); - top->pmod0_sample_i1 = (int16_t)20000.0*cos((float)pmod_clocks / 50.0); - */ - //top->__024signal = 1000; - top->fs_inject0 = (int16_t)10000.0*sin((float)n_samples / 50.0); - top->fs_inject1 = (int16_t)10000.0*cos((float)n_samples / 10.0); - ++n_samples; + // audio signals + top->fs_inject0 = (int16_t)10000.0*sin((float)pmod_clocks / 50.0); + top->fs_inject1 = (int16_t)10000.0*cos((float)pmod_clocks / 10.0); } else { if (top->fs_strobe) { top->fs_strobe = 0; } } - ++n_clk_audio; + mod_pmod += 1; } } - contextp->timeInc(8333); + + contextp->timeInc(1000); top->eval(); #if defined VM_TRACE_FST && VM_TRACE_FST == 1 tfp->dump(contextp->time()); #endif - clkdiv += 1; } #if defined VM_TRACE_FST && VM_TRACE_FST == 1 diff --git a/gateware/src/top/dsp/top.py b/gateware/src/top/dsp/top.py index a7ef89b..7960cdc 100644 --- 
a/gateware/src/top/dsp/top.py +++ b/gateware/src/top/dsp/top.py @@ -11,21 +11,21 @@ import math -from amaranth import * -from amaranth.build import * -from amaranth.lib import wiring, data, stream -from amaranth.lib.wiring import In, Out - - -from amaranth_future import fixed - -from tiliqua import eurorack_pmod, dsp, midi +from amaranth import * +from amaranth.build import * +from amaranth.lib import wiring, data, stream +from amaranth.lib.wiring import In, Out +from amaranth_soc import wishbone +from amaranth_future import fixed + +from tiliqua import eurorack_pmod, dsp, midi, psram_peripheral, delay from tiliqua.eurorack_pmod import ASQ from tiliqua.cli import top_level_cli +from tiliqua.delay_line import DelayLine # for sim -from amaranth.back import verilog -from tiliqua import sim +from amaranth.back import verilog +from tiliqua import sim class Mirror(wiring.Component): @@ -127,14 +127,15 @@ def elaborate(self, platform): m.submodules.split4 = split4 = dsp.Split(n_channels=4) m.submodules.merge4 = merge4 = dsp.Merge(n_channels=4) - m.submodules.delay_line = delay_line = dsp.DelayLine(max_delay=8192) + m.submodules.delay_line = delay_line = DelayLine( + max_delay=8192, write_triggers_read=False) m.submodules.pitch_shift = pitch_shift = dsp.PitchShift( - delayln=delay_line, xfade=delay_line.max_delay//4) + tap=delay_line.add_tap(), xfade=delay_line.max_delay//4) wiring.connect(m, wiring.flipped(self.i), split4.i) # write audio samples to delay line - wiring.connect(m, split4.o[0], delay_line.sw) + wiring.connect(m, split4.o[0], delay_line.i) # hook up 2nd input channel as pitch control, use fixed grain_sz m.d.comb += [ @@ -179,80 +180,6 @@ def elaborate(self, platform): return m -class Diffuser(wiring.Component): - - """ - 4-channel feedback delay, diffused by a matrix mixer. 
- """ - - i: In(stream.Signature(data.ArrayLayout(ASQ, 4))) - o: Out(stream.Signature(data.ArrayLayout(ASQ, 4))) - - def elaborate(self, platform): - m = Module() - - # quadrants in the below matrix are: - # - # [in -> out] [in -> delay] - # [delay -> out] [delay -> delay] <- feedback - # - - m.submodules.matrix_mix = matrix_mix = dsp.MatrixMix( - i_channels=8, o_channels=8, - coefficients=[[0.6, 0.0, 0.0, 0.0, 0.8, 0.0, 0.0, 0.0], # in0 - [0.0, 0.6, 0.0, 0.0, 0.0, 0.8, 0.0, 0.0], # | - [0.0, 0.0, 0.6, 0.0, 0.0, 0.0, 0.8, 0.0], # | - [0.0, 0.0, 0.0, 0.6, 0.0, 0.0, 0.0, 0.8], # in3 - [0.4, 0.0, 0.0, 0.0, 0.4,-0.4,-0.4,-0.4], # ds0 - [0.0, 0.4, 0.0, 0.0,-0.4, 0.4,-0.4,-0.4], # | - [0.0, 0.0, 0.4, 0.0,-0.4,-0.4, 0.4,-0.4], # | - [0.0, 0.0, 0.0, 0.4,-0.4,-0.4,-0.4, 0.4]])# ds3 - # out0 ------- out3 sw0 ---------- sw3 - - delay_lines = [ - dsp.DelayLine(max_delay=2048), - dsp.DelayLine(max_delay=4096), - dsp.DelayLine(max_delay=8192), - dsp.DelayLine(max_delay=8192), - ] - m.submodules += delay_lines - - m.d.comb += [delay_lines[n].da.valid.eq(1) for n in range(4)] - m.d.comb += [ - delay_lines[0].da.payload.eq(2000), - delay_lines[1].da.payload.eq(3000), - delay_lines[2].da.payload.eq(5000), - delay_lines[3].da.payload.eq(7000), - ] - - m.submodules.split4 = split4 = dsp.Split(n_channels=4) - m.submodules.merge4 = merge4 = dsp.Merge(n_channels=4) - - m.submodules.split8 = split8 = dsp.Split(n_channels=8) - m.submodules.merge8 = merge8 = dsp.Merge(n_channels=8) - - wiring.connect(m, wiring.flipped(self.i), split4.i) - - # matrix <-> independent streams - wiring.connect(m, matrix_mix.o, split8.i) - wiring.connect(m, merge8.o, matrix_mix.i) - - for n in range(4): - # audio -> matrix [0-3] - wiring.connect(m, split4.o[n], merge8.i[n]) - # delay -> matrix [4-7] - wiring.connect(m, delay_lines[n].ds, merge8.i[4+n]) - - for n in range(4): - # matrix -> audio [0-3] - wiring.connect(m, split8.o[n], merge4.i[n]) - # matrix -> delay [4-7] - wiring.connect(m, split8.o[4+n], 
delay_lines[n].sw) - - wiring.connect(m, merge4.o, wiring.flipped(self.o)) - - return m - class DualWaveshaper(wiring.Component): """Soft distortion, channel 1/2 inputs, 3 is overdrive gain.""" @@ -318,7 +245,6 @@ def elaborate(self, platform): return m - class QuadNCO(wiring.Component): """Audio-rate NCO with oversampling. 4 different waveform outputs.""" @@ -511,6 +437,223 @@ def elaborate(self, platform): return m +class PSRAMPingPongDelay(wiring.Component): + + """ + 2-channel stereo ping-pong delay, backed by external PSRAM. + + 2 delay lines are instantiated in isolated slices of the external + memory address space. Using external memory allows for much longer + delay times whilst using less resources, compared to SRAM-backed + delay lines, however on a larger design, you have to be careful + that PSRAM-backed delay lines don't get starved by other PSRAM + traffic (i.e video framebuffer operations). + + Tiliqua input 0/1 is stereo in, output 0/1 is stereo out. + """ + + i: In(stream.Signature(data.ArrayLayout(ASQ, 4))) + o: Out(stream.Signature(data.ArrayLayout(ASQ, 4))) + + # shared bus to external memory + bus: Out(wishbone.Signature(addr_width=22, + data_width=32, + granularity=8, + features={'bte', 'cti'})) + + def __init__(self): + super().__init__() + + # 2 delay lines, backed by 2 different slices of PSRAM address space. + + self.delayln1 = DelayLine( + max_delay=0x4000, # careful this doesn't collide with delayln2.base! + psram_backed=True, + addr_width_o=self.bus.addr_width, + base=0x00000, + ) + + self.delayln2 = DelayLine( + max_delay=0x4000, + psram_backed=True, + addr_width_o=self.bus.addr_width, + base=0x4000, + ) + + # Both delay lines share our memory bus round-robin for all operations. 
+ + self._arbiter = wishbone.Arbiter(addr_width=self.bus.addr_width, + data_width=self.bus.data_width, + granularity=self.bus.granularity, + features=self.bus.features) + self._arbiter.add(self.delayln1.bus) + self._arbiter.add(self.delayln2.bus) + + # Create the PingPongCore using the above delay lines. + + self.pingpong = delay.PingPongDelay(self.delayln1, self.delayln2) + + def elaborate(self, platform): + m = Module() + + m.submodules.arbiter = self._arbiter + m.submodules.delayln1 = self.delayln1 + m.submodules.delayln2 = self.delayln2 + m.submodules.pingping = self.pingpong + + wiring.connect(m, self._arbiter.bus, wiring.flipped(self.bus)) + + # Map hardware in/out channels 0, 1 (of 4) to pingpong stereo channels 0, 1 + + dsp.channel_remap(m, wiring.flipped(self.i), self.pingpong.i, {0: 0, 1: 1}) + dsp.channel_remap(m, self.pingpong.o, wiring.flipped(self.o), {0: 0, 1: 1}) + + return m + +class SRAMPingPongDelay(wiring.Component): + + """ + 2-channel stereo ping-pong delay, backed by internal SRAM. + + Tiliqua input 0/1 is stereo in, output 0/1 is stereo out. + """ + + i: In(stream.Signature(data.ArrayLayout(ASQ, 4))) + o: Out(stream.Signature(data.ArrayLayout(ASQ, 4))) + + def __init__(self): + super().__init__() + + # 2 delay lines, backed by independent slabs of internal SRAM. + + self.delayln1 = DelayLine(max_delay=0x4000) + self.delayln2 = DelayLine(max_delay=0x4000) + + # Create the PingPongCore using the above delay lines. 
+ + self.pingpong = delay.PingPongDelay(self.delayln1, self.delayln2) + + def elaborate(self, platform): + m = Module() + + m.submodules.delayln1 = self.delayln1 + m.submodules.delayln2 = self.delayln2 + + m.submodules.pingping = self.pingpong + + # Map hardware in/out channels 0, 1 (of 4) to pingpong stereo channels 0, 1 + + dsp.channel_remap(m, wiring.flipped(self.i), self.pingpong.i, {0: 0, 1: 1}) + dsp.channel_remap(m, self.pingpong.o, wiring.flipped(self.o), {0: 0, 1: 1}) + + return m + +class PSRAMDiffuser(wiring.Component): + + """ + PSRAM-backed 4-channel feedback delay, diffused by a matrix mixer. + """ + + i: In(stream.Signature(data.ArrayLayout(ASQ, 4))) + o: Out(stream.Signature(data.ArrayLayout(ASQ, 4))) + bus: Out(wishbone.Signature(addr_width=22, + data_width=32, + granularity=8, + features={'bte', 'cti'})) + + def __init__(self): + super().__init__() + + # 4 delay lines, backed by 4 different slices of PSRAM address space. + + self.delay_lines = [ + DelayLine( + max_delay=0x10000, + psram_backed=True, + addr_width_o=self.bus.addr_width, + base=0x00000, + ), + DelayLine( + max_delay=0x10000, + psram_backed=True, + addr_width_o=self.bus.addr_width, + base=0x10000, + ), + DelayLine( + max_delay=0x10000, + psram_backed=True, + addr_width_o=self.bus.addr_width, + base=0x20000, + ), + DelayLine( + max_delay=0x10000, + psram_backed=True, + addr_width_o=self.bus.addr_width, + base=0x30000, + ), + ] + + # All delay lines share our top-level bus for read/write operations. 
+ + self._arbiter = wishbone.Arbiter(addr_width=self.bus.addr_width, + data_width=self.bus.data_width, + granularity=self.bus.granularity, + features=self.bus.features) + for delayln in self.delay_lines: + self._arbiter.add(delayln.bus) + + self.diffuser = delay.Diffuser(self.delay_lines) + + def elaborate(self, platform): + m = Module() + + m.submodules.arbiter = self._arbiter + wiring.connect(m, self._arbiter.bus, wiring.flipped(self.bus)) + + dsp.named_submodules(m.submodules, self.delay_lines) + + m.submodules.diffuser = self.diffuser + + wiring.connect(m, wiring.flipped(self.i), self.diffuser.i) + wiring.connect(m, self.diffuser.o, wiring.flipped(self.o)) + + return m + +class SRAMDiffuser(wiring.Component): + + """ + SRAM-backed 4-channel feedback delay, diffused by a matrix mixer. + """ + + i: In(stream.Signature(data.ArrayLayout(ASQ, 4))) + o: Out(stream.Signature(data.ArrayLayout(ASQ, 4))) + + def __init__(self): + super().__init__() + + # 4 delay lines, backed by 4 independent SRAM banks. 
+ + self.delay_lines = [ + DelayLine(max_delay=2048), + DelayLine(max_delay=4096), + DelayLine(max_delay=8192), + DelayLine(max_delay=8192), + ] + + self.diffuser = delay.Diffuser(self.delay_lines) + + def elaborate(self, platform): + m = Module() + + dsp.named_submodules(m.submodules, self.delay_lines) + + m.submodules.diffuser = self.diffuser + + wiring.connect(m, wiring.flipped(self.i), self.diffuser.i) + wiring.connect(m, self.diffuser.o, wiring.flipped(self.o)) + + return m + class CoreTop(Elaboratable): def __init__(self, dsp_core, enable_touch): @@ -524,6 +667,8 @@ def __init__(self, dsp_core, enable_touch): self.inject2 = Signal(signed(16)) self.inject3 = Signal(signed(16)) + self.psram_periph = psram_peripheral.Peripheral(size=16*1024*1024) + super().__init__() def elaborate(self, platform): @@ -562,21 +707,28 @@ def elaborate(self, platform): wiring.connect(m, serialrx.o, midi_decode.i) wiring.connect(m, midi_decode.o, self.core.i_midi) + if hasattr(self.core, "bus"): + m.submodules.psram_periph = self.psram_periph + wiring.connect(m, self.core.bus, self.psram_periph.bus) + return m # Different DSP cores that can be selected at top-level CLI. 
CORES = { - # (touch, class name) - "mirror": (False, Mirror), - "svf": (False, ResonantFilter), - "vca": (False, DualVCA), - "pitch": (False, Pitch), - "matrix": (False, Matrix), - "diffuser": (False, Diffuser), - "touchmix": (True, TouchMixTop), - "waveshaper": (False, DualWaveshaper), - "nco": (False, QuadNCO), - "midicv": (False, MidiCVTop), + # (touch, class name) + "mirror": (False, Mirror), + "svf": (False, ResonantFilter), + "vca": (False, DualVCA), + "pitch": (False, Pitch), + "matrix": (False, Matrix), + "touchmix": (True, TouchMixTop), + "waveshaper": (False, DualWaveshaper), + "nco": (False, QuadNCO), + "midicv": (False, MidiCVTop), + "psram_pingpong": (False, PSRAMPingPongDelay), + "sram_pingpong": (False, SRAMPingPongDelay), + "psram_diffuser": (False, PSRAMDiffuser), + "sram_diffuser": (False, SRAMDiffuser), } def simulation_ports(fragment): @@ -590,6 +742,12 @@ def simulation_ports(fragment): "fs_inject1": (fragment.inject1, None), "fs_inject2": (fragment.inject2, None), "fs_inject3": (fragment.inject3, None), + "idle": (fragment.psram_periph.simif.idle, None), + "address_ptr": (fragment.psram_periph.simif.address_ptr, None), + "read_data_view": (fragment.psram_periph.simif.read_data_view, None), + "write_data": (fragment.psram_periph.simif.write_data, None), + "read_ready": (fragment.psram_periph.simif.read_ready, None), + "write_ready": (fragment.psram_periph.simif.write_ready, None), } def argparse_callback(parser): diff --git a/gateware/src/top/polysyn/top.py b/gateware/src/top/polysyn/top.py index b8d7488..b85b994 100644 --- a/gateware/src/top/polysyn/top.py +++ b/gateware/src/top/polysyn/top.py @@ -15,88 +15,44 @@ from amaranth_future import fixed -from tiliqua import eurorack_pmod, dsp, midi, scope, sim +from tiliqua import eurorack_pmod, dsp, midi, scope, sim, delay +from tiliqua.delay_line import DelayLine from tiliqua.eurorack_pmod import ASQ from tiliqua.tiliqua_soc import TiliquaSoc from tiliqua.cli import top_level_cli -# TODO: 
reconcile this with Diffuser in tiliqua.dsp -# it's almost the same, just some coefficients tweaked so it doesn't -# saturate quite as easily. class Diffuser(wiring.Component): - """ - 4-channel feedback delay, diffused by a matrix mixer. - """ - i: In(stream.Signature(data.ArrayLayout(ASQ, 4))) o: Out(stream.Signature(data.ArrayLayout(ASQ, 4))) - def elaborate(self, platform): - m = Module() - - # quadrants in the below matrix are: - # - # [in -> out] [in -> delay] - # [delay -> out] [delay -> delay] <- feedback - # + def __init__(self): + super().__init__() - m.submodules.matrix_mix = matrix_mix = dsp.MatrixMix( - i_channels=8, o_channels=8, - coefficients=[[0.6, 0.0, 0.0, 0.0, 0.8, 0.0, 0.0, 0.0], # in0 - [0.0, 0.6, 0.0, 0.0, 0.0, 0.8, 0.0, 0.0], # | - [0.0, 0.0, 0.6, 0.0, 0.0, 0.0, 0.8, 0.0], # | - [0.0, 0.0, 0.0, 0.6, 0.0, 0.0, 0.0, 0.8], # in3 - [0.4, 0.0, 0.0, 0.0, 0.4,-0.4,-0.4,-0.4], # ds0 - [0.0, 0.4, 0.0, 0.0,-0.4, 0.4,-0.4,-0.4], # | - [0.0, 0.0, 0.4, 0.0,-0.4,-0.4, 0.4,-0.4], # | - [0.0, 0.0, 0.0, 0.4,-0.4,-0.4,-0.4, 0.4]])# ds3 - # out0 ------- out3 sw0 ---------- sw3 - - self.matrix = matrix_mix - - delay_lines = [ - dsp.DelayLine(max_delay=2048), - dsp.DelayLine(max_delay=4096), - dsp.DelayLine(max_delay=8192), - dsp.DelayLine(max_delay=8192), - ] - - dsp.named_submodules(m.submodules, delay_lines) + # 4 delay lines, backed by 4 independent SRAM banks. 
- m.d.comb += [delay_lines[n].da.valid.eq(1) for n in range(4)] - m.d.comb += [ - delay_lines[0].da.payload.eq(2000), - delay_lines[1].da.payload.eq(3000), - delay_lines[2].da.payload.eq(5000), - delay_lines[3].da.payload.eq(7000), + self.delay_lines = [ + DelayLine(max_delay=2048), + DelayLine(max_delay=4096), + DelayLine(max_delay=8192), + DelayLine(max_delay=8192), ] - m.submodules.split4 = split4 = dsp.Split(n_channels=4) - m.submodules.merge4 = merge4 = dsp.Merge(n_channels=4) + self.diffuser = delay.Diffuser(self.delay_lines) - m.submodules.split8 = split8 = dsp.Split(n_channels=8) - m.submodules.merge8 = merge8 = dsp.Merge(n_channels=8) + # Coefficients of this are tweaked by the SoC - wiring.connect(m, wiring.flipped(self.i), split4.i) + self.matrix = self.diffuser.matrix_mix - # matrix <-> independent streams - wiring.connect(m, matrix_mix.o, split8.i) - wiring.connect(m, merge8.o, matrix_mix.i) + def elaborate(self, platform): + m = Module() - for n in range(4): - # audio -> matrix [0-3] - wiring.connect(m, split4.o[n], merge8.i[n]) - # delay -> matrix [4-7] - wiring.connect(m, delay_lines[n].ds, merge8.i[4+n]) + dsp.named_submodules(m.submodules, self.delay_lines) - for n in range(4): - # matrix -> audio [0-3] - wiring.connect(m, split8.o[n], merge4.i[n]) - # matrix -> delay [4-7] - wiring.connect(m, split8.o[4+n], delay_lines[n].sw) + m.submodules.diffuser = self.diffuser - wiring.connect(m, merge4.o, wiring.flipped(self.o)) + wiring.connect(m, wiring.flipped(self.i), self.diffuser.i) + wiring.connect(m, self.diffuser.o, wiring.flipped(self.o)) return m diff --git a/gateware/tests/test_delayln.py b/gateware/tests/test_delayln.py new file mode 100644 index 0000000..931f2df --- /dev/null +++ b/gateware/tests/test_delayln.py @@ -0,0 +1,151 @@ +# Copyright (c) 2024 Seb Holzapfel, apfelaudio UG +# +# SPDX-License-Identifier: CERN-OHL-S-2.0 + +import math +import sys +import unittest + +from amaranth import * +from amaranth.sim import * +from amaranth.lib 
import wiring +from amaranth.lib.wiring import In, Out +from tiliqua import dsp, eurorack_pmod, cache, delay_line +from tiliqua.eurorack_pmod import ASQ + +from amaranth_soc import csr +from amaranth_soc import wishbone + +from amaranth_future import fixed + +class DelayLineTests(unittest.TestCase): + + def test_sram_delayln(self): + + dut = delay_line.DelayLine( + max_delay=256, + write_triggers_read=False, + ) + + tap1 = dut.add_tap() + tap2 = dut.add_tap() + + async def stimulus_wr(ctx): + for n in range(0, sys.maxsize): + ctx.set(dut.i.valid, 1) + ctx.set(dut.i.payload, + fixed.Const(0.8*math.sin(n*0.2), shape=ASQ)) + await ctx.tick() + ctx.set(dut.i.valid, 0) + await ctx.tick().repeat(30) + + async def stimulus_rd1(ctx): + ctx.set(tap1.o.ready, 1) + for n in range(0, sys.maxsize): + ctx.set(tap1.i.valid, 1) + ctx.set(tap1.i.payload, 4) + await ctx.tick() + ctx.set(tap1.i.valid, 0) + await ctx.tick().repeat(30) + + async def stimulus_rd2(ctx): + ctx.set(tap2.o.ready, 1) + for n in range(0, sys.maxsize): + ctx.set(tap2.i.valid, 1) + ctx.set(tap2.i.payload, 10) + await ctx.tick() + ctx.set(tap2.i.valid, 0) + await ctx.tick().repeat(30) + + async def testbench(ctx): + n_rd1 = 0 + n_rd2 = 0 + for n in range(200): + await ctx.tick() + if ctx.get(tap1.o.valid) and ctx.get(tap1.o.ready): + n_rd1 += 1 + if ctx.get(tap2.o.valid) and ctx.get(tap2.o.ready): + n_rd2 += 1 + # both taps produced some output samples + assert n_rd1 > 5 + assert n_rd2 > 5 + + sim = Simulator(dut) + sim.add_clock(1e-6) + sim.add_testbench(testbench) + sim.add_process(stimulus_wr) + sim.add_process(stimulus_rd1) + sim.add_process(stimulus_rd2) + with sim.write_vcd(vcd_file=open("test_sram_delayln.vcd", "w")): + sim.run() + + def test_psram_delayln(self): + + dut = delay_line.DelayLine( + max_delay=256, + psram_backed=True, + base=0x0, + addr_width_o=22, + write_triggers_read=False, + ) + + tap1 = dut.add_tap() + tap2 = dut.add_tap() + + async def stimulus_wr(ctx): + for n in range(0, 
sys.maxsize): + ctx.set(dut.i.valid, 1) + ctx.set(dut.i.payload, + fixed.Const(0.8*math.sin(n*0.2), shape=ASQ)) + await ctx.tick() + ctx.set(dut.i.valid, 0) + await ctx.tick().repeat(30) + + async def stimulus_rd1(ctx): + ctx.set(tap1.o.ready, 1) + for n in range(0, sys.maxsize): + ctx.set(tap1.i.valid, 1) + ctx.set(tap1.i.payload, 4) + await ctx.tick() + ctx.set(tap1.i.valid, 0) + await ctx.tick().repeat(30) + + async def stimulus_rd2(ctx): + ctx.set(tap2.o.ready, 1) + for n in range(0, sys.maxsize): + ctx.set(tap2.i.valid, 1) + ctx.set(tap2.i.payload, 10) + await ctx.tick() + ctx.set(tap2.i.valid, 0) + await ctx.tick().repeat(30) + + async def testbench(ctx): + # Simulate some transactions against a fake PSRAM bus. + mem = [0] * dut.max_delay + membus = dut.bus + for _ in range(200): + while not ctx.get(membus.stb): + await ctx.tick() + # Simulate acks delayed from stb + await ctx.tick().repeat(2) + ctx.set(membus.ack, 1) + adr = ctx.get(membus.adr) + if ctx.get(membus.we): + # warn: only whole-word transactions are simulated + mem[adr] = ctx.get(membus.dat_w) + print("write", hex(mem[adr]), "@", adr) + else: + print("read", hex(mem[adr]), "@", adr) + ctx.set(membus.dat_r, mem[ctx.get(membus.adr)]) + await ctx.tick() + ctx.set(membus.ack, 0) + await ctx.tick() + + sim = Simulator(dut) + sim.add_clock(1e-6) + sim.add_testbench(testbench) + sim.add_process(stimulus_wr) + sim.add_process(stimulus_rd1) + sim.add_process(stimulus_rd2) + with sim.write_vcd(vcd_file=open("test_psram_delayln.vcd", "w")): + sim.run() diff --git a/gateware/tests/test_dsp.py b/gateware/tests/test_dsp.py index 9d11ee7..4cb9f53 100644 --- a/gateware/tests/test_dsp.py +++ b/gateware/tests/test_dsp.py @@ -13,62 +13,32 @@ from amaranth.lib import wiring, data from tiliqua.eurorack_pmod import ASQ -from tiliqua import dsp +from tiliqua import dsp, delay_line class DSPTests(unittest.TestCase): - def test_delayline(self): - - delay_line = dsp.DelayLine() - - async def stimulus(ctx): - for n in 
range(0, sys.maxsize): - ctx.set(delay_line.sw.valid, 1) - ctx.set(delay_line.sw.payload, - fixed.Const(0.8*math.sin(n*0.2), shape=ASQ)) - await ctx.tick() - ctx.set(delay_line.sw.valid, 0) - await ctx.tick() - - async def testbench(ctx): - await ctx.tick().repeat(200) - for n in range(0, 10): - ctx.set(delay_line.da.payload, n) - ctx.set(delay_line.ds.ready, 1) - ctx.set(delay_line.da.valid, 1) - await ctx.tick() - ctx.set(delay_line.da.valid, 0) - await ctx.tick() - - sim = Simulator(delay_line) - sim.add_clock(1e-6) - sim.add_process(stimulus) - sim.add_testbench(testbench) - with sim.write_vcd(vcd_file=open("test_delayline.vcd", "w")): - sim.run() - def test_pitch(self): m = Module() - delay_line = dsp.DelayLine(max_delay=256) - pitch_shift = dsp.PitchShift(delayln=delay_line, xfade=32) - m.submodules += [delay_line, pitch_shift] + delayln = delay_line.DelayLine(max_delay=256, write_triggers_read=False) + pitch_shift = dsp.PitchShift(tap=delayln.add_tap(), xfade=32) + m.submodules += [delayln, pitch_shift] async def testbench(ctx): await ctx.tick() await ctx.tick() for n in range(0, 1000): x = fixed.Const(0.8*math.sin(n*0.1), shape=ASQ) - ctx.set(delay_line.sw.valid, 1) - ctx.set(delay_line.sw.payload, x) + ctx.set(delayln.i.valid, 1) + ctx.set(delayln.i.payload, x) await ctx.tick() - ctx.set(delay_line.sw.valid, 0) + ctx.set(delayln.i.valid, 0) await ctx.tick() await ctx.tick() ctx.set(pitch_shift.i.payload.pitch, fixed.Const(-0.8, shape=pitch_shift.dtype)) ctx.set(pitch_shift.i.payload.grain_sz, - delay_line.max_delay//2) + delayln.max_delay//2) ctx.set(pitch_shift.o.ready, 1) ctx.set(pitch_shift.i.valid, 1) await ctx.tick()