diff --git a/gateware/tests/test_i2c.py b/gateware/tests/test_i2c.py index f234d615..e295d0a7 100644 --- a/gateware/tests/test_i2c.py +++ b/gateware/tests/test_i2c.py @@ -30,7 +30,9 @@ class FakeI2CPads: bridge = wishbone.WishboneCSRBridge(decoder.bus, data_width=32) m.submodules += [dut, decoder, bridge] - def to_wb_params(register_start_bytes, field_start_bits, field_width_bits, word_sz=4): + def wb_transaction_params(register_start_bytes, field_start_bits, + field_width_bits, word_sz=4): + """Convert register byte/bit indices into wishbone transaction arguments.""" # include bit offset in byte offset if it's > 8 register_start_bytes += field_start_bits // 8 field_start_bits %= 8 @@ -49,71 +51,90 @@ def to_wb_params(register_start_bytes, field_start_bits, field_width_bits, word_ dat_shift = ix_bytes*8 + field_start_bits return wb_adr, wb_sel, dat_shift, field_width_bits - def register_ix(register_name, field_name=None): - for (reg_object, name, (s_byte, e_byte)) in dut.bus.memory_map.resources(): + def wb_register(mmap_bus, register_name, field_name=None): + """ + Find a register (optionally subfield) in a bus memory map. + Return arguments required for a wishbone transaction to access it. + """ + for (reg_object, name, (s_byte, e_byte)) in mmap_bus.memory_map.resources(): name = str(name)[6:-2] if name == register_name: if field_name is None: - return s_byte, 0, (e_byte - s_byte)*8 + return wb_transaction_params( + register_start_bytes=s_byte, + field_start_bits=0, + field_width_bits=(e_byte - s_byte)*8 + ) offset = 0 for path, action in reg_object: name = "_".join([str(s) for s in path]) width = action.port.shape.width if name == field_name: - return s_byte, offset, width + return wb_transaction_params( + register_start_bytes=s_byte, + field_start_bits=offset, + field_width_bits=width + ) offset += width raise ValueError(f"{register_name} {field_name} does not exist in memory map.") - async def wb_transaction(ctx, adr, we, sel, dat_w=None): - ctx.set(bridge.wb_bus.cyc, 1) - ctx.set(bridge.wb_bus.sel, sel) - ctx.set(bridge.wb_bus.we, we) - ctx.set(bridge.wb_bus.adr, adr) - ctx.set(bridge.wb_bus.stb, 1) + async def wb_transaction(ctx, wb_bus, adr, we, sel, dat_w=None): + ctx.set(wb_bus.cyc, 1) + ctx.set(wb_bus.sel, sel) + ctx.set(wb_bus.we, we) + ctx.set(wb_bus.adr, adr) + ctx.set(wb_bus.stb, 1) if we: - ctx.set(bridge.wb_bus.dat_w, dat_w) + ctx.set(wb_bus.dat_w, dat_w) await ctx.tick().repeat(5) - self.assertEqual(ctx.get(bridge.wb_bus.ack), 1) - value = ctx.get(bridge.wb_bus.dat_r) if not we else None - ctx.set(bridge.wb_bus.stb, 0) + self.assertEqual(ctx.get(wb_bus.ack), 1) + value = ctx.get(wb_bus.dat_r) if not we else None + ctx.set(wb_bus.stb, 0) await ctx.tick() - self.assertEqual(ctx.get(bridge.wb_bus.ack), 0) + self.assertEqual(ctx.get(wb_bus.ack), 0) return value - async def wb_csr_w(ctx, value, register_name, field_name=None): - adr, sel, shift, _ = to_wb_params(*register_ix(register_name, field_name)) - return await wb_transaction(ctx, adr, 1, sel, dat_w=value<> shift) & int('1'*w_bits, base=2) async def testbench(ctx): + + async def csr_write(ctx, value, register, field=None): + await wb_csr_w(ctx, dut.bus, bridge.wb_bus, value, register, field) + + async def csr_read(ctx, register, field=None): + return await wb_csr_r(ctx, dut.bus, bridge.wb_bus, register, field) + # set device address - await wb_csr_w(ctx, 0x55, "address") + await csr_write(ctx, 0x55, "address") # enqueue 2x write ops - await wb_csr_w(ctx, 0x042, "transaction_reg") - await wb_csr_w(ctx, 0x013, "transaction_reg") + await csr_write(ctx, 0x042, "transaction_reg") + await csr_write(ctx, 0x013, "transaction_reg") # enqueue 1x read op - await wb_csr_w(ctx, 0x100, "transaction_reg") + await csr_write(ctx, 0x100, "transaction_reg") # 3 transactions are enqueued self.assertEqual(ctx.get(dut._transactions.level), 3) # start the i2c core - await wb_csr_w(ctx, 1, "start") + await csr_write(ctx, 1, "start") # busy flag should go high - self.assertEqual(await wb_csr_r(ctx, "status", "busy"), 1) + self.assertEqual(await csr_read(ctx, "status", "busy"), 1) # run for a while await ctx.tick().repeat(500) # busy flag should be low - self.assertEqual(await wb_csr_r(ctx, "status", "busy"), 0) + self.assertEqual(await csr_read(ctx, "status", "busy"), 0) # all transactions drained. self.assertEqual(ctx.get(dut._transactions.level), 0)