Skip to content

Commit

Permalink
Remove Chain Monitor ; Trigger Chain Processor on Local Unsafe Update
Browse files Browse the repository at this point in the history
  • Loading branch information
axelKingsley committed Oct 1, 2024
1 parent 64214ea commit ead22d1
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 105 deletions.
72 changes: 37 additions & 35 deletions op-supervisor/supervisor/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
Expand All @@ -29,8 +30,8 @@ type SupervisorBackend struct {
m Metrics
dataDir string

chainMonitors map[types.ChainID]*source.ChainMonitor
db *db.ChainsDB
chainProcessors map[types.ChainID]*source.ChainProcessor
db *db.ChainsDB
}

var _ frontend.Backend = (*SupervisorBackend)(nil)
Expand All @@ -49,15 +50,15 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
db := db.NewChainsDB(map[types.ChainID]db.LogStorage{}, logger)

// create an empty map of chain monitors
chainMonitors := make(map[types.ChainID]*source.ChainMonitor, len(cfg.L2RPCs))
chainProcessors := make(map[types.ChainID]*source.ChainProcessor, len(cfg.L2RPCs))

// create the supervisor backend
super := &SupervisorBackend{
logger: logger,
m: m,
dataDir: cfg.Datadir,
chainMonitors: chainMonitors,
db: db,
logger: logger,
m: m,
dataDir: cfg.Datadir,
chainProcessors: chainProcessors,
db: db,
}

// from the RPC strings, have the supervisor backend create a chain monitor
Expand All @@ -74,7 +75,7 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
// addFromRPC adds a chain monitor to the supervisor backend from an rpc endpoint
// it does not expect to be called after the backend has been started
// it will start the monitor if shouldStart is true
func (su *SupervisorBackend) addFromRPC(ctx context.Context, logger log.Logger, rpc string, shouldStart bool) error {
func (su *SupervisorBackend) addFromRPC(ctx context.Context, logger log.Logger, rpc string, _ bool) error {
// create the rpc client, which yields the chain id
rpcClient, chainID, err := clientForL2(ctx, logger, rpc)
if err != nil {
Expand All @@ -91,20 +92,24 @@ func (su *SupervisorBackend) addFromRPC(ctx context.Context, logger log.Logger,
if err != nil {
return fmt.Errorf("failed to create logdb for chain %v at %v: %w", chainID, path, err)
}
if su.chainMonitors[chainID] != nil {
if su.chainProcessors[chainID] != nil {
return fmt.Errorf("chain monitor for chain %v already exists", chainID)
}
monitor, err := source.NewChainMonitor(ctx, logger, cm, chainID, rpc, rpcClient, su.db)
// create a client like the monitor would have
cl, err := source.NewL1Client(
ctx,
logger,
cm,
rpc,
rpcClient, 2*time.Second,
false,
sources.RPCKindStandard)
if err != nil {
return fmt.Errorf("failed to create monitor for rpc %v: %w", rpc, err)
}
// start the monitor if requested
if shouldStart {
if err := monitor.Start(); err != nil {
return fmt.Errorf("failed to start monitor for rpc %v: %w", rpc, err)
}
return err
}
su.chainMonitors[chainID] = monitor
logProcessor := source.NewLogProcessor(chainID, su.db)
chainProcessor := source.NewChainProcessor(logger, cl, chainID, logProcessor, su.db)
su.chainProcessors[chainID] = chainProcessor
su.db.AddLogDB(chainID, logDB)
return nil
}
Expand All @@ -131,31 +136,19 @@ func (su *SupervisorBackend) Start(ctx context.Context) error {
if err := su.db.ResumeFromLastSealedBlock(); err != nil {
return fmt.Errorf("failed to resume chains db: %w", err)
}
// start chain monitors
for _, monitor := range su.chainMonitors {
if err := monitor.Start(); err != nil {
return fmt.Errorf("failed to start chain monitor: %w", err)
}
}
return nil
}

func (su *SupervisorBackend) Stop(ctx context.Context) error {
if !su.started.CompareAndSwap(true, false) {
return errAlreadyStopped
}
// collect errors from stopping chain monitors
var errs error
for _, monitor := range su.chainMonitors {
if err := monitor.Stop(); err != nil {
errs = errors.Join(errs, fmt.Errorf("failed to stop chain monitor: %w", err))
}
// close all chain processors
for _, processor := range su.chainProcessors {
processor.Close()
}
// close the database
if err := su.db.Close(); err != nil {
errs = errors.Join(errs, fmt.Errorf("failed to close database: %w", err))
}
return errs
return su.db.Close()
}

func (su *SupervisorBackend) Close() error {
Expand Down Expand Up @@ -228,6 +221,15 @@ func (su *SupervisorBackend) CheckBlock(chainID *hexutil.U256, blockHash common.
}

func (su *SupervisorBackend) UpdateLocalUnsafe(chainID types.ChainID, head eth.L2BlockRef) {
// l2 to l1 block ref
ref := eth.L1BlockRef{
ParentHash: head.ParentHash,
Hash: head.Hash,
Number: head.Number,
Time: head.Time,
}
ctx := context.Background()
su.chainProcessors[chainID].OnNewHead(ctx, ref)
su.db.UpdateLocalUnsafe(chainID, head)
}

Expand Down
60 changes: 1 addition & 59 deletions op-supervisor/supervisor/backend/source/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,67 +10,9 @@ import (
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/sources/caching"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)

// TODO(optimism#11032) Make these configurable and a sensible default
const epochPollInterval = 3 * time.Second
const pollInterval = 2 * time.Second
const trustRpc = false
const rpcKind = sources.RPCKindStandard

type Metrics interface {
caching.Metrics
}

type Storage interface {
ChainsDBClientForLogProcessor
DatabaseRewinder
LatestBlockNum(chainID types.ChainID) (num uint64, ok bool)
}

// ChainMonitor monitors a source L2 chain, retrieving the data required to populate the database and perform
// interop consolidation. It detects and notifies when reorgs occur.
type ChainMonitor struct {
log log.Logger
headMonitor *HeadMonitor
chainProcessor *ChainProcessor
}

func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID types.ChainID, rpc string, client client.RPC, store Storage) (*ChainMonitor, error) {
logger = logger.New("chainID", chainID)
cl, err := newClient(ctx, logger, m, rpc, client, pollInterval, trustRpc, rpcKind)
if err != nil {
return nil, err
}

// Create the log processor and fetcher
processLogs := newLogProcessor(chainID, store)
unsafeBlockProcessor := NewChainProcessor(logger, cl, chainID, processLogs, store)

unsafeProcessors := []HeadProcessor{unsafeBlockProcessor}

callback := newHeadUpdateProcessor(logger, unsafeProcessors, nil, nil)
headMonitor := NewHeadMonitor(logger, epochPollInterval, cl, callback)

return &ChainMonitor{
log: logger,
headMonitor: headMonitor,
chainProcessor: unsafeBlockProcessor,
}, nil
}

func (c *ChainMonitor) Start() error {
c.log.Info("Started monitoring chain")
return c.headMonitor.Start()
}

func (c *ChainMonitor) Stop() error {
c.chainProcessor.Close()
return c.headMonitor.Stop()
}

func newClient(ctx context.Context, logger log.Logger, m caching.Metrics, rpc string, rpcClient client.RPC, pollRate time.Duration, trustRPC bool, kind sources.RPCProviderKind) (*sources.L1Client, error) {
func NewL1Client(ctx context.Context, logger log.Logger, m caching.Metrics, rpc string, rpcClient client.RPC, pollRate time.Duration, trustRPC bool, kind sources.RPCProviderKind) (*sources.L1Client, error) {
c, err := client.NewRPCWithClient(ctx, logger, rpc, rpcClient, pollRate)
if err != nil {
return nil, fmt.Errorf("failed to create new RPC client: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion op-supervisor/supervisor/backend/source/log_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type logProcessor struct {
eventDecoder EventDecoder
}

func newLogProcessor(chain types.ChainID, logStore LogStorage) *logProcessor {
func NewLogProcessor(chain types.ChainID, logStore LogStorage) *logProcessor {
return &logProcessor{
chain: chain,
logStore: logStore,
Expand Down
6 changes: 3 additions & 3 deletions op-supervisor/supervisor/backend/source/log_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestLogProcessor(t *testing.T) {
}
t.Run("NoOutputWhenLogsAreEmpty", func(t *testing.T) {
store := &stubLogStorage{}
processor := newLogProcessor(logProcessorChainID, store)
processor := NewLogProcessor(logProcessorChainID, store)

err := processor.ProcessLogs(ctx, block1, ethTypes.Receipts{})
require.NoError(t, err)
Expand Down Expand Up @@ -59,7 +59,7 @@ func TestLogProcessor(t *testing.T) {
},
}
store := &stubLogStorage{}
processor := newLogProcessor(logProcessorChainID, store)
processor := NewLogProcessor(logProcessorChainID, store)

err := processor.ProcessLogs(ctx, block1, rcpts)
require.NoError(t, err)
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestLogProcessor(t *testing.T) {
Hash: common.Hash{0xaa},
}
store := &stubLogStorage{}
processor := newLogProcessor(types.ChainID{4}, store)
processor := NewLogProcessor(types.ChainID{4}, store)
processor.eventDecoder = EventDecoderFn(func(l *ethTypes.Log) (types.ExecutingMessage, error) {
require.Equal(t, rcpts[0].Logs[0], l)
return execMsg, nil
Expand Down
17 changes: 10 additions & 7 deletions op-supervisor/supervisor/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@ import (
"time"

"github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/holiman/uint256"
"github.com/stretchr/testify/require"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/op-service/dial"
Expand Down Expand Up @@ -61,12 +58,18 @@ func TestSupervisorService(t *testing.T) {
cl, err := dial.DialRPCClientWithTimeout(context.Background(), time.Second*5, logger, endpoint)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
var dest types.SafetyLevel
err = cl.CallContext(ctx, &dest, "supervisor_checkBlock",
(*hexutil.U256)(uint256.NewInt(1)), common.Hash{0xab}, hexutil.Uint64(123))
var result types.ReferenceView
chainID := types.ChainIDFromUInt64(1)
unsafe := types.ReferenceView{}
err = cl.CallContext(
ctx,
&result,
"supervisor_unsafeView",
chainID,
unsafe)
cancel()
require.NoError(t, err)
require.Equal(t, types.CrossUnsafe, dest, "expecting mock to return cross-unsafe")
require.Equal(t, types.ReferenceView{}, result, "expecting mock to return empty reference view")
cl.Close()
}
require.NoError(t, supervisor.Stop(context.Background()), "stop service")
Expand Down

0 comments on commit ead22d1

Please sign in to comment.