Skip to content

Commit

Permalink
more cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
schnommus committed Sep 24, 2024
1 parent 4e23e80 commit e6cb56b
Showing 1 changed file with 50 additions and 29 deletions.
79 changes: 50 additions & 29 deletions gateware/tests/test_i2c.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ class FakeI2CPads:
bridge = wishbone.WishboneCSRBridge(decoder.bus, data_width=32)
m.submodules += [dut, decoder, bridge]

def to_wb_params(register_start_bytes, field_start_bits, field_width_bits, word_sz=4):
def wb_transaction_params(register_start_bytes, field_start_bits,
field_width_bits, word_sz=4):
"""Convert register byte/bit indices into wishbone transaction arguments."""
# include bit offset in byte offset if it's > 8
register_start_bytes += field_start_bits // 8
field_start_bits %= 8
Expand All @@ -49,71 +51,90 @@ def to_wb_params(register_start_bytes, field_start_bits, field_width_bits, word_
dat_shift = ix_bytes*8 + field_start_bits
return wb_adr, wb_sel, dat_shift, field_width_bits

def register_ix(register_name, field_name=None):
for (reg_object, name, (s_byte, e_byte)) in dut.bus.memory_map.resources():
def wb_register(mmap_bus, register_name, field_name=None):
"""
Find a register (optionally subfield) in a bus memory map.
Return arguments required for a wishbone transaction to access it.
"""
for (reg_object, name, (s_byte, e_byte)) in mmap_bus.memory_map.resources():
name = str(name)[6:-2]
if name == register_name:
if field_name is None:
return s_byte, 0, (e_byte - s_byte)*8
return wb_transaction_params(
register_start_bytes=s_byte,
field_start_bits=0,
field_width_bits=(e_byte - s_byte)*8
)
offset = 0
for path, action in reg_object:
name = "_".join([str(s) for s in path])
width = action.port.shape.width
if name == field_name:
return s_byte, offset, width
return wb_transaction_params(
register_start_bytes=s_byte,
field_start_bits=offset,
field_width_bits=width
)
offset += width
raise ValueError(f"{register_name} {field_name} does not exist in memory map.")

async def wb_transaction(ctx, adr, we, sel, dat_w=None):
ctx.set(bridge.wb_bus.cyc, 1)
ctx.set(bridge.wb_bus.sel, sel)
ctx.set(bridge.wb_bus.we, we)
ctx.set(bridge.wb_bus.adr, adr)
ctx.set(bridge.wb_bus.stb, 1)
async def wb_transaction(ctx, wb_bus, adr, we, sel, dat_w=None):
ctx.set(wb_bus.cyc, 1)
ctx.set(wb_bus.sel, sel)
ctx.set(wb_bus.we, we)
ctx.set(wb_bus.adr, adr)
ctx.set(wb_bus.stb, 1)
if we:
ctx.set(bridge.wb_bus.dat_w, dat_w)
ctx.set(wb_bus.dat_w, dat_w)
await ctx.tick().repeat(5)
self.assertEqual(ctx.get(bridge.wb_bus.ack), 1)
value = ctx.get(bridge.wb_bus.dat_r) if not we else None
ctx.set(bridge.wb_bus.stb, 0)
self.assertEqual(ctx.get(wb_bus.ack), 1)
value = ctx.get(wb_bus.dat_r) if not we else None
ctx.set(wb_bus.stb, 0)
await ctx.tick()
self.assertEqual(ctx.get(bridge.wb_bus.ack), 0)
self.assertEqual(ctx.get(wb_bus.ack), 0)
return value

async def wb_csr_w(ctx, value, register_name, field_name=None):
adr, sel, shift, _ = to_wb_params(*register_ix(register_name, field_name))
return await wb_transaction(ctx, adr, 1, sel, dat_w=value<<shift)
async def wb_csr_w(ctx, mmap_bus, wb_bus, value, register_name, field_name=None):
adr, sel, shift, _ = wb_register(mmap_bus, register_name, field_name)
return await wb_transaction(ctx, wb_bus, adr, 1, sel, dat_w=value<<shift)

async def wb_csr_r(ctx, register_name, field_name=None):
adr, sel, shift, w_bits = to_wb_params(*register_ix(register_name, field_name))
value_32b = await wb_transaction(ctx, adr, 0, sel)
async def wb_csr_r(ctx, mmap_bus, wb_bus, register_name, field_name=None):
adr, sel, shift, w_bits = wb_register(mmap_bus, register_name, field_name)
value_32b = await wb_transaction(ctx, wb_bus, adr, 0, sel)
return (value_32b >> shift) & int('1'*w_bits, base=2)

async def testbench(ctx):

async def csr_write(ctx, value, register, field=None):
await wb_csr_w(ctx, dut.bus, bridge.wb_bus, value, register, field)

async def csr_read(ctx, register, field=None):
return await wb_csr_r(ctx, dut.bus, bridge.wb_bus, register, field)

# set device address
await wb_csr_w(ctx, 0x55, "address")
await csr_write(ctx, 0x55, "address")

# enqueue 2x write ops
await wb_csr_w(ctx, 0x042, "transaction_reg")
await wb_csr_w(ctx, 0x013, "transaction_reg")
await csr_write(ctx, 0x042, "transaction_reg")
await csr_write(ctx, 0x013, "transaction_reg")

# enqueue 1x read op
await wb_csr_w(ctx, 0x100, "transaction_reg")
await csr_write(ctx, 0x100, "transaction_reg")

# 3 transactions are enqueued
self.assertEqual(ctx.get(dut._transactions.level), 3)

# start the i2c core
await wb_csr_w(ctx, 1, "start")
await csr_write(ctx, 1, "start")

# busy flag should go high
self.assertEqual(await wb_csr_r(ctx, "status", "busy"), 1)
self.assertEqual(await csr_read(ctx, "status", "busy"), 1)

# run for a while
await ctx.tick().repeat(500)

# busy flag should be low
self.assertEqual(await wb_csr_r(ctx, "status", "busy"), 0)
self.assertEqual(await csr_read(ctx, "status", "busy"), 0)

# all transactions drained.
self.assertEqual(ctx.get(dut._transactions.level), 0)
Expand Down

0 comments on commit e6cb56b

Please sign in to comment.