Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interop: Integrate for new input flow #12213

Draft
wants to merge 5 commits into
base: interop-inputs-2
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions op-e2e/interop/interop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package interop

import (
"context"
"fmt"
"math/big"
"testing"
"time"
Expand Down Expand Up @@ -86,14 +85,12 @@ func TestInteropTrivial(t *testing.T) {
require.Equal(t, expectedBalance, bobBalance)

s2.DeployEmitterContract(chainA, "Alice")
rec := s2.EmitData(chainA, "Alice", "0x1234567890abcdef")

fmt.Println("Result of emitting event:", rec)

s2.DeployEmitterContract(chainB, "Alice")
rec = s2.EmitData(chainB, "Alice", "0x1234567890abcdef")
for i := 0; i < 1; i++ {
s2.EmitData(chainA, "Alice", "0x1234567890abcdef")

fmt.Println("Result of emitting event:", rec)
s2.EmitData(chainB, "Alice", "0x1234567890abcdef")
}

time.Sleep(60 * time.Second)

Expand Down
15 changes: 12 additions & 3 deletions op-service/sources/supervisor_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"

"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth"
Expand Down Expand Up @@ -68,7 +67,12 @@ func (cl *SupervisorClient) AddL2RPC(

func (cl *SupervisorClient) UnsafeView(ctx context.Context, chainID types.ChainID, unsafe types.ReferenceView) (types.ReferenceView, error) {
var result types.ReferenceView
err := cl.client.CallContext(ctx, &result, "supervisor_unsafeView", (*hexutil.U256)(&chainID), unsafe)
err := cl.client.CallContext(
ctx,
&result,
"supervisor_unsafeView",
chainID,
unsafe)
if err != nil {
return types.ReferenceView{}, fmt.Errorf("failed to share unsafe block view %s (chain %s): %w", unsafe, chainID, err)
}
Expand All @@ -77,7 +81,12 @@ func (cl *SupervisorClient) UnsafeView(ctx context.Context, chainID types.ChainI

func (cl *SupervisorClient) SafeView(ctx context.Context, chainID types.ChainID, safe types.ReferenceView) (types.ReferenceView, error) {
var result types.ReferenceView
err := cl.client.CallContext(ctx, &result, "supervisor_safeView", (*hexutil.U256)(&chainID), safe)
err := cl.client.CallContext(
ctx,
&result,
"supervisor_safeView",
chainID,
safe)
if err != nil {
return types.ReferenceView{}, fmt.Errorf("failed to share safe block view %s (chain %s): %w", safe, chainID, err)
}
Expand Down
130 changes: 91 additions & 39 deletions op-supervisor/supervisor/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
Expand All @@ -29,14 +30,16 @@ type SupervisorBackend struct {
m Metrics
dataDir string

chainMonitors map[types.ChainID]*source.ChainMonitor
db *db.ChainsDB
chainProcessors map[types.ChainID]*source.ChainProcessor
db *db.ChainsDB
}

var _ frontend.Backend = (*SupervisorBackend)(nil)

var _ io.Closer = (*SupervisorBackend)(nil)

var errAlreadyStopped = errors.New("already stopped")

func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) {
// attempt to prepare the data directory
if err := prepDataDir(cfg.Datadir); err != nil {
Expand All @@ -47,15 +50,15 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
db := db.NewChainsDB(map[types.ChainID]db.LogStorage{}, logger)

// create an empty map of chain monitors
chainMonitors := make(map[types.ChainID]*source.ChainMonitor, len(cfg.L2RPCs))
chainProcessors := make(map[types.ChainID]*source.ChainProcessor, len(cfg.L2RPCs))

// create the supervisor backend
super := &SupervisorBackend{
logger: logger,
m: m,
dataDir: cfg.Datadir,
chainMonitors: chainMonitors,
db: db,
logger: logger,
m: m,
dataDir: cfg.Datadir,
chainProcessors: chainProcessors,
db: db,
}

// from the RPC strings, have the supervisor backend create a chain monitor
Expand All @@ -72,9 +75,9 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
// addFromRPC adds a chain monitor to the supervisor backend from an rpc endpoint
// it does not expect to be called after the backend has been started
// it will start the monitor if shouldStart is true
func (su *SupervisorBackend) addFromRPC(ctx context.Context, logger log.Logger, rpc string, shouldStart bool) error {
func (su *SupervisorBackend) addFromRPC(ctx context.Context, logger log.Logger, rpc string, _ bool) error {
// create the rpc client, which yields the chain id
rpcClient, chainID, err := createRpcClient(ctx, logger, rpc)
rpcClient, chainID, err := clientForL2(ctx, logger, rpc)
if err != nil {
return err
}
Expand All @@ -89,25 +92,29 @@ func (su *SupervisorBackend) addFromRPC(ctx context.Context, logger log.Logger,
if err != nil {
return fmt.Errorf("failed to create logdb for chain %v at %v: %w", chainID, path, err)
}
if su.chainMonitors[chainID] != nil {
if su.chainProcessors[chainID] != nil {
return fmt.Errorf("chain monitor for chain %v already exists", chainID)
}
monitor, err := source.NewChainMonitor(ctx, logger, cm, chainID, rpc, rpcClient, su.db)
// create a client like the monitor would have
cl, err := source.NewL1Client(
ctx,
logger,
cm,
rpc,
rpcClient, 2*time.Second,
false,
sources.RPCKindStandard)
if err != nil {
return fmt.Errorf("failed to create monitor for rpc %v: %w", rpc, err)
}
// start the monitor if requested
if shouldStart {
if err := monitor.Start(); err != nil {
return fmt.Errorf("failed to start monitor for rpc %v: %w", rpc, err)
}
return err
}
su.chainMonitors[chainID] = monitor
logProcessor := source.NewLogProcessor(chainID, su.db)
chainProcessor := source.NewChainProcessor(logger, cl, chainID, logProcessor, su.db)
su.chainProcessors[chainID] = chainProcessor
su.db.AddLogDB(chainID, logDB)
return nil
}

func createRpcClient(ctx context.Context, logger log.Logger, rpc string) (client.RPC, types.ChainID, error) {
func clientForL2(ctx context.Context, logger log.Logger, rpc string) (client.RPC, types.ChainID, error) {
ethClient, err := dial.DialEthClientWithTimeout(ctx, 10*time.Second, logger, rpc)
if err != nil {
return nil, types.ChainID{}, fmt.Errorf("failed to connect to rpc %v: %w", rpc, err)
Expand All @@ -129,33 +136,19 @@ func (su *SupervisorBackend) Start(ctx context.Context) error {
if err := su.db.ResumeFromLastSealedBlock(); err != nil {
return fmt.Errorf("failed to resume chains db: %w", err)
}
// start chain monitors
for _, monitor := range su.chainMonitors {
if err := monitor.Start(); err != nil {
return fmt.Errorf("failed to start chain monitor: %w", err)
}
}
return nil
}

var errAlreadyStopped = errors.New("already stopped")

func (su *SupervisorBackend) Stop(ctx context.Context) error {
if !su.started.CompareAndSwap(true, false) {
return errAlreadyStopped
}
// collect errors from stopping chain monitors
var errs error
for _, monitor := range su.chainMonitors {
if err := monitor.Stop(); err != nil {
errs = errors.Join(errs, fmt.Errorf("failed to stop chain monitor: %w", err))
}
// close all chain processors
for _, processor := range su.chainProcessors {
processor.Close()
}
// close the database
if err := su.db.Close(); err != nil {
errs = errors.Join(errs, fmt.Errorf("failed to close database: %w", err))
}
return errs
return su.db.Close()
}

func (su *SupervisorBackend) Close() error {
Expand Down Expand Up @@ -227,6 +220,65 @@ func (su *SupervisorBackend) CheckBlock(chainID *hexutil.U256, blockHash common.
return safest, nil
}

func (su *SupervisorBackend) UpdateLocalUnsafe(chainID types.ChainID, head eth.BlockRef) {
// l2 to l1 block ref
ref := eth.BlockRef{
ParentHash: head.ParentHash,
Hash: head.Hash,
Number: head.Number,
Time: head.Time,
}
ctx := context.Background()
su.chainProcessors[chainID].OnNewHead(ctx, ref)
su.db.UpdateLocalUnsafe(chainID, head)
}

func (su *SupervisorBackend) UpdateLocalSafe(chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) {
su.db.UpdateLocalSafe(chainID, derivedFrom, lastDerived)
}

func (su *SupervisorBackend) UpdateFinalizedL1(chainID types.ChainID, finalized eth.BlockRef) {
su.db.UpdateFinalizedL1(finalized)
}

func (su *SupervisorBackend) UnsafeView(ctx context.Context, chainID types.ChainID, unsafe types.ReferenceView) (types.ReferenceView, error) {
u, xu, err := su.db.UnsafeView(chainID, unsafe)
if err != nil {
return types.ReferenceView{}, fmt.Errorf("failed to get unsafe view: %w", err)
}
return types.ReferenceView{
Local: eth.BlockID{
Hash: u.LastSealedBlockHash,
Number: u.LastSealedBlockNum,
},
Cross: eth.BlockID{
Hash: xu.LastSealedBlockHash,
Number: xu.LastSealedBlockNum,
},
}, nil
}

func (su *SupervisorBackend) SafeView(ctx context.Context, chainID types.ChainID, safe types.ReferenceView) (types.ReferenceView, error) {
s, xs, err := su.db.SafeView(chainID, safe)
if err != nil {
return types.ReferenceView{}, fmt.Errorf("failed to get safe view: %w", err)
}
return types.ReferenceView{
Local: eth.BlockID{
Hash: s.LastSealedBlockHash,
Number: s.LastSealedBlockNum,
},
Cross: eth.BlockID{
Hash: xs.LastSealedBlockHash,
Number: xs.LastSealedBlockNum,
},
}, nil
}

func (su *SupervisorBackend) Finalized(ctx context.Context, chainID types.ChainID) (eth.BlockID, error) {
return eth.BlockID{}, nil
}

func (su *SupervisorBackend) DerivedFrom(
ctx context.Context,
chainID types.ChainID,
Expand Down
45 changes: 45 additions & 0 deletions op-supervisor/supervisor/backend/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/safety"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
Expand Down Expand Up @@ -200,3 +201,47 @@ func (db *ChainsDB) Close() error {
}
return combined
}

func (db *ChainsDB) UpdateLocalUnsafe(chain types.ChainID, head eth.BlockRef) error {
err := db.safetyIndex.UpdateLocalUnsafe(chain, head)
if err != nil {
return fmt.Errorf("failed to update local-unsafe: %w", err)
}
return nil
}

func (db *ChainsDB) UpdateLocalSafe(chain types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error {
err := db.safetyIndex.UpdateLocalSafe(chain, derivedFrom, lastDerived)
if err != nil {
return fmt.Errorf("failed to update local-safe: %w", err)
}
return nil
}

func (db *ChainsDB) UpdateFinalizedL1(finalized eth.BlockRef) error {
return db.safetyIndex.UpdateFinalizeL1(finalized)
}

func (db *ChainsDB) UnsafeView(chainID types.ChainID, unsafe types.ReferenceView) (heads.HeadPointer, heads.HeadPointer, error) {
u, err := db.safetyIndex.UnsafeL2(chainID)
if err != nil {
return heads.HeadPointer{}, heads.HeadPointer{}, err
}
xu, err := db.safetyIndex.CrossUnsafeL2(chainID)
if err != nil {
return heads.HeadPointer{}, heads.HeadPointer{}, err
}
return u, xu, nil
}

func (db *ChainsDB) SafeView(chainID types.ChainID, unsafe types.ReferenceView) (heads.HeadPointer, heads.HeadPointer, error) {
s, err := db.safetyIndex.UnsafeL2(chainID)
if err != nil {
return heads.HeadPointer{}, heads.HeadPointer{}, err
}
xs, err := db.safetyIndex.CrossUnsafeL2(chainID)
if err != nil {
return heads.HeadPointer{}, heads.HeadPointer{}, err
}
return s, xs, nil
}
20 changes: 20 additions & 0 deletions op-supervisor/supervisor/backend/db/entrydb/entry_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,26 @@ const (
FlagPadding2 EntryTypeFlag = FlagPadding << 1
)

var FlagNames = map[EntryTypeFlag]string{
FlagSearchCheckpoint: "searchCheckpoint",
FlagCanonicalHash: "canonicalHash",
FlagInitiatingEvent: "initiatingEvent",
FlagExecutingLink: "executingLink",
FlagExecutingCheck: "executingCheck",
FlagPadding: "padding",
FlagPadding2: "padding2",
}

func EntryFlagsToStrings(flags EntryTypeFlag) []string {
res := make([]string, 0, 8)
for k, v := range FlagNames {
if flags.Any(k) {
res = append(res, v)
}
}
return res
}

func (ex EntryTypeFlag) Any(v EntryTypeFlag) bool {
return ex&v != 0
}
Expand Down
13 changes: 6 additions & 7 deletions op-supervisor/supervisor/backend/db/logs/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ func (l *logContext) ExecMessage() *types.ExecutingMessage {

func (l *logContext) HeadPointer() (heads.HeadPointer, error) {
if l.need != 0 {
return heads.HeadPointer{}, errors.New("cannot provide head pointer while state is incomplete")
needs := entrydb.EntryFlagsToStrings(l.need)
return heads.HeadPointer{}, fmt.Errorf("(LastSealedBlockHash) cannot provide head pointer while state is incomplete. needs: %v", needs)
}
return heads.HeadPointer{
LastSealedBlockHash: l.blockHash,
Expand Down Expand Up @@ -253,10 +254,9 @@ func (l *logContext) appendEntry(obj EntryObj) {
l.nextEntryIndex += 1
}

// infer advances the logContext in cases where multiple entries are to be appended implicitly
// depending on the last type of entry, a new entry is appended,
// or when the searchCheckpoint should be inserted.
// This can be done repeatedly until there is no more implied data to extend.
// infer advances the logContext in cases where complex entries contain multiple implied entries
// eg. a SearchCheckpoint implies a CannonicalHash will follow
// this also handles inserting the searchCheckpoint at the set frequency, and padding entries
func (l *logContext) infer() error {
// We force-insert a checkpoint whenever we hit the known fixed interval.
if l.nextEntryIndex%searchCheckpointFrequency == 0 {
Expand Down Expand Up @@ -322,8 +322,7 @@ func (l *logContext) infer() error {
return io.EOF
}

// inferFull advances the queued entries held by the log context repeatedly
// until no more implied entries can be added
// inferFul advances the logContext until it cannot infer any more entries.
func (l *logContext) inferFull() error {
for i := 0; i < 10; i++ {
err := l.infer()
Expand Down
Loading