diff --git a/op-supervisor/supervisor/backend/backend.go b/op-supervisor/supervisor/backend/backend.go index f9b97bb2c5c6..994c80751fee 100644 --- a/op-supervisor/supervisor/backend/backend.go +++ b/op-supervisor/supervisor/backend/backend.go @@ -15,6 +15,7 @@ import ( "github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/dial" "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-supervisor/config" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs" @@ -29,8 +30,8 @@ type SupervisorBackend struct { m Metrics dataDir string - chainMonitors map[types.ChainID]*source.ChainMonitor - db *db.ChainsDB + chainProcessors map[types.ChainID]*source.ChainProcessor + db *db.ChainsDB } var _ frontend.Backend = (*SupervisorBackend)(nil) @@ -49,15 +50,15 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg db := db.NewChainsDB(map[types.ChainID]db.LogStorage{}, logger) // create an empty map of chain monitors - chainMonitors := make(map[types.ChainID]*source.ChainMonitor, len(cfg.L2RPCs)) + chainProcessors := make(map[types.ChainID]*source.ChainProcessor, len(cfg.L2RPCs)) // create the supervisor backend super := &SupervisorBackend{ - logger: logger, - m: m, - dataDir: cfg.Datadir, - chainMonitors: chainMonitors, - db: db, + logger: logger, + m: m, + dataDir: cfg.Datadir, + chainProcessors: chainProcessors, + db: db, } // from the RPC strings, have the supervisor backend create a chain monitor @@ -74,7 +75,7 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg // addFromRPC adds a chain monitor to the supervisor backend from an rpc endpoint // it does not expect to be called after the backend has been started // it will start the monitor if shouldStart is true -func (su *SupervisorBackend) addFromRPC(ctx context.Context, logger log.Logger, rpc string, shouldStart bool) error { +func (su *SupervisorBackend) addFromRPC(ctx context.Context, logger log.Logger, rpc string, _ bool) error { // create the rpc client, which yields the chain id rpcClient, chainID, err := clientForL2(ctx, logger, rpc) if err != nil { @@ -91,20 +92,24 @@ func (su *SupervisorBackend) addFromRPC(ctx context.Context, logger log.Logger, if err != nil { return fmt.Errorf("failed to create logdb for chain %v at %v: %w", chainID, path, err) } - if su.chainMonitors[chainID] != nil { + if su.chainProcessors[chainID] != nil { return fmt.Errorf("chain monitor for chain %v already exists", chainID) } - monitor, err := source.NewChainMonitor(ctx, logger, cm, chainID, rpc, rpcClient, su.db) + // create a client like the monitor would have + cl, err := source.NewL1Client( + ctx, + logger, + cm, + rpc, + rpcClient, 2*time.Second, + false, + sources.RPCKindStandard) if err != nil { - return fmt.Errorf("failed to create monitor for rpc %v: %w", rpc, err) - } - // start the monitor if requested - if shouldStart { - if err := monitor.Start(); err != nil { - return fmt.Errorf("failed to start monitor for rpc %v: %w", rpc, err) - } + return err } - su.chainMonitors[chainID] = monitor + logProcessor := source.NewLogProcessor(chainID, su.db) + chainProcessor := source.NewChainProcessor(logger, cl, chainID, logProcessor, su.db) + su.chainProcessors[chainID] = chainProcessor su.db.AddLogDB(chainID, logDB) return nil } @@ -131,12 +136,6 @@ func (su *SupervisorBackend) Start(ctx context.Context) error { if err := su.db.ResumeFromLastSealedBlock(); err != nil { return fmt.Errorf("failed to resume chains db: %w", err) } - // start chain monitors - for _, monitor := range su.chainMonitors { - if err := monitor.Start(); err != nil { - return fmt.Errorf("failed to start chain monitor: %w", err) - } - } return nil } @@ -144,18 +143,12 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error { if !su.started.CompareAndSwap(true, false) { return errAlreadyStopped } - // collect errors from stopping chain monitors - var errs error - for _, monitor := range su.chainMonitors { - if err := monitor.Stop(); err != nil { - errs = errors.Join(errs, fmt.Errorf("failed to stop chain monitor: %w", err)) - } + // close all chain processors + for _, processor := range su.chainProcessors { + processor.Close() } // close the database - if err := su.db.Close(); err != nil { - errs = errors.Join(errs, fmt.Errorf("failed to close database: %w", err)) - } - return errs + return su.db.Close() } func (su *SupervisorBackend) Close() error { @@ -228,6 +221,15 @@ func (su *SupervisorBackend) CheckBlock(chainID *hexutil.U256, blockHash common. } func (su *SupervisorBackend) UpdateLocalUnsafe(chainID types.ChainID, head eth.L2BlockRef) { + // l2 to l1 block ref + ref := eth.L1BlockRef{ + ParentHash: head.ParentHash, + Hash: head.Hash, + Number: head.Number, + Time: head.Time, + } + ctx := context.Background() + su.chainProcessors[chainID].OnNewHead(ctx, ref) su.db.UpdateLocalUnsafe(chainID, head) } diff --git a/op-supervisor/supervisor/backend/source/chain.go b/op-supervisor/supervisor/backend/source/chain.go index 383a5fb74de8..590372abf9c6 100644 --- a/op-supervisor/supervisor/backend/source/chain.go +++ b/op-supervisor/supervisor/backend/source/chain.go @@ -10,67 +10,9 @@ import ( "github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/sources/caching" - "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" ) -// TODO(optimism#11032) Make these configurable and a sensible default -const epochPollInterval = 3 * time.Second -const pollInterval = 2 * time.Second -const trustRpc = false -const rpcKind = sources.RPCKindStandard - -type Metrics interface { - caching.Metrics -} - -type Storage interface { - ChainsDBClientForLogProcessor - DatabaseRewinder - LatestBlockNum(chainID types.ChainID) (num uint64, ok bool) -} - -// ChainMonitor monitors a source L2 chain, retrieving the data required to populate the database and perform -// interop consolidation. It detects and notifies when reorgs occur. -type ChainMonitor struct { - log log.Logger - headMonitor *HeadMonitor - chainProcessor *ChainProcessor -} - -func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID types.ChainID, rpc string, client client.RPC, store Storage) (*ChainMonitor, error) { - logger = logger.New("chainID", chainID) - cl, err := newClient(ctx, logger, m, rpc, client, pollInterval, trustRpc, rpcKind) - if err != nil { - return nil, err - } - - // Create the log processor and fetcher - processLogs := newLogProcessor(chainID, store) - unsafeBlockProcessor := NewChainProcessor(logger, cl, chainID, processLogs, store) - - unsafeProcessors := []HeadProcessor{unsafeBlockProcessor} - - callback := newHeadUpdateProcessor(logger, unsafeProcessors, nil, nil) - headMonitor := NewHeadMonitor(logger, epochPollInterval, cl, callback) - - return &ChainMonitor{ - log: logger, - headMonitor: headMonitor, - chainProcessor: unsafeBlockProcessor, - }, nil -} - -func (c *ChainMonitor) Start() error { - c.log.Info("Started monitoring chain") - return c.headMonitor.Start() -} - -func (c *ChainMonitor) Stop() error { - c.chainProcessor.Close() - return c.headMonitor.Stop() -} - -func newClient(ctx context.Context, logger log.Logger, m caching.Metrics, rpc string, rpcClient client.RPC, pollRate time.Duration, trustRPC bool, kind sources.RPCProviderKind) (*sources.L1Client, error) { +func NewL1Client(ctx context.Context, logger log.Logger, m caching.Metrics, rpc string, rpcClient client.RPC, pollRate time.Duration, trustRPC bool, kind sources.RPCProviderKind) (*sources.L1Client, error) { c, err := client.NewRPCWithClient(ctx, logger, rpc, rpcClient, pollRate) if err != nil { return nil, fmt.Errorf("failed to create new RPC client: %w", err) diff --git a/op-supervisor/supervisor/backend/source/log_processor.go b/op-supervisor/supervisor/backend/source/log_processor.go index 8a815f7ca9e9..8d5a1238970f 100644 --- a/op-supervisor/supervisor/backend/source/log_processor.go +++ b/op-supervisor/supervisor/backend/source/log_processor.go @@ -34,7 +34,7 @@ type logProcessor struct { eventDecoder EventDecoder } -func newLogProcessor(chain types.ChainID, logStore LogStorage) *logProcessor { +func NewLogProcessor(chain types.ChainID, logStore LogStorage) *logProcessor { return &logProcessor{ chain: chain, logStore: logStore, diff --git a/op-supervisor/supervisor/backend/source/log_processor_test.go b/op-supervisor/supervisor/backend/source/log_processor_test.go index 6e96d731fcff..64f25f274af1 100644 --- a/op-supervisor/supervisor/backend/source/log_processor_test.go +++ b/op-supervisor/supervisor/backend/source/log_processor_test.go @@ -25,7 +25,7 @@ func TestLogProcessor(t *testing.T) { } t.Run("NoOutputWhenLogsAreEmpty", func(t *testing.T) { store := &stubLogStorage{} - processor := newLogProcessor(logProcessorChainID, store) + processor := NewLogProcessor(logProcessorChainID, store) err := processor.ProcessLogs(ctx, block1, ethTypes.Receipts{}) require.NoError(t, err) @@ -59,7 +59,7 @@ func TestLogProcessor(t *testing.T) { }, } store := &stubLogStorage{} - processor := newLogProcessor(logProcessorChainID, store) + processor := NewLogProcessor(logProcessorChainID, store) err := processor.ProcessLogs(ctx, block1, rcpts) require.NoError(t, err) @@ -115,7 +115,7 @@ func TestLogProcessor(t *testing.T) { Hash: common.Hash{0xaa}, } store := &stubLogStorage{} - processor := newLogProcessor(types.ChainID{4}, store) + processor := NewLogProcessor(types.ChainID{4}, store) processor.eventDecoder = EventDecoderFn(func(l *ethTypes.Log) (types.ExecutingMessage, error) { require.Equal(t, rcpts[0].Logs[0], l) return execMsg, nil diff --git a/op-supervisor/supervisor/service_test.go b/op-supervisor/supervisor/service_test.go index 8cc4dcfa6678..dc7e3514ca29 100644 --- a/op-supervisor/supervisor/service_test.go +++ b/op-supervisor/supervisor/service_test.go @@ -6,11 +6,8 @@ import ( "time" "github.com/ethereum-optimism/optimism/op-supervisor/config" - "github.com/holiman/uint256" "github.com/stretchr/testify/require" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/log" "github.com/ethereum-optimism/optimism/op-service/dial" @@ -61,12 +58,18 @@ func TestSupervisorService(t *testing.T) { cl, err := dial.DialRPCClientWithTimeout(context.Background(), time.Second*5, logger, endpoint) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - var dest types.SafetyLevel - err = cl.CallContext(ctx, &dest, "supervisor_checkBlock", - (*hexutil.U256)(uint256.NewInt(1)), common.Hash{0xab}, hexutil.Uint64(123)) + var result types.ReferenceView + chainID := types.ChainIDFromUInt64(1) + unsafe := types.ReferenceView{} + err = cl.CallContext( + ctx, + &result, + "supervisor_unsafeView", + chainID, + unsafe) cancel() require.NoError(t, err) - require.Equal(t, types.CrossUnsafe, dest, "expecting mock to return cross-unsafe") + require.Equal(t, types.ReferenceView{}, result, "expecting mock to return empty reference view") cl.Close() } require.NoError(t, supervisor.Stop(context.Background()), "stop service")