Skip to content

Commit

Permalink
enable mempool recheck and remove mempool TTL
Browse files Browse the repository at this point in the history
Instead of the mempool TTL, enable recheck but only
execute the transactions every 12 blocks.

This is safer, since a valid mempool transaction won't
be removed by TTL.

And doing recheck every 12 blocks, probably not much more costly
than keeping track of all mempool transactions and runing a
garbage collector process.

Signed-off-by: p4u <[email protected]>
  • Loading branch information
p4u committed May 31, 2023
1 parent ba74644 commit 5381703
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 56 deletions.
59 changes: 6 additions & 53 deletions vochain/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"path/filepath"
"sync"
"sync/atomic"
"time"

Expand All @@ -31,10 +30,9 @@ import (
)

const (
// MempoolLaunchPruneIntervalBlocks is the number of blocks after which the mempool is pruned.
MempoolLaunchPruneIntervalBlocks = 12 // 2 minutes
// MempoolTxTTLBlocks is the maximum live time in blocks for a transaction before is pruned from the mempool.
MempoolTxTTLBlocks = 60 // 10 minutes
// recheckTxHeightInterval is the number of blocks after which the mempool is
// checked for transactions to be rechecked.
recheckTxHeightInterval = 12
)

var (
Expand All @@ -54,13 +52,6 @@ type BaseApplication struct {
// data races when querying about the sync status of the blockchain.
isSynchronizing atomic.Bool

// mempoolTxRef is a map of tx hashes to the block height when they were added to the mempool.
mempoolTxRef map[[32]byte]uint32
// mempoolTxRefLock is a mutex to protect the mempoolTxRef map.
mempoolTxRefLock sync.Mutex
// mempoolTxRefToGC is a slice of tx hashes to be removed from the mempoolTxRef map on Commit().
mempoolTxRefToGC [][32]byte

// Callback blockchain functions
fnGetBlockByHeight func(height int64) *tmtypes.Block
fnGetBlockByHash func(hash []byte) *tmtypes.Block
Expand Down Expand Up @@ -120,7 +111,6 @@ func NewBaseApplication(dbType, dbpath string) (*BaseApplication, error) {
dataDir: dbpath,
circuitConfigTag: circuit.DefaultCircuitConfigurationTag,
genesisInfo: &tmtypes.GenesisDoc{},
mempoolTxRef: make(map[[32]byte]uint32),
}, nil
}

Expand All @@ -133,30 +123,6 @@ func (app *BaseApplication) BeginBlock(req abcitypes.RequestBeginBlock) abcitype
app.isSynchronizing.Store(false)
}
}
if app.Height()%MempoolLaunchPruneIntervalBlocks == 0 {
// remove all expired txs from mempool
count := 0
app.mempoolTxRefLock.Lock()
for txKey, height := range app.mempoolTxRef {
if height+MempoolTxTTLBlocks > app.Height() {
if app.fnMempoolPrune != nil {
if err := app.fnMempoolPrune(txKey); err != nil {
log.Warnw("mempool prune", "err", err.Error(), "tx", hex.EncodeToString(txKey[:]))
}
}
count++
delete(app.mempoolTxRef, txKey)
}
}
app.mempoolTxRefLock.Unlock()
if count > 0 {
log.Infow("mempool prune", "txs", count, "height", app.Height())
}
}

app.mempoolTxRefLock.Lock()
app.mempoolTxRefToGC = [][32]byte{}
app.mempoolTxRefLock.Unlock()
return app.fnBeginBlock(req)
}

Expand Down Expand Up @@ -299,7 +265,9 @@ func (app *BaseApplication) InitChain(req abcitypes.RequestInitChain) abcitypes.
func (app *BaseApplication) CheckTx(req abcitypes.RequestCheckTx) abcitypes.ResponseCheckTx {
height := app.Height()
if req.Type == abcitypes.CheckTxType_Recheck {
return abcitypes.ResponseCheckTx{Code: 0}
if height%recheckTxHeightInterval != 0 {
return abcitypes.ResponseCheckTx{Code: 0}
}
}
tx := new(vochaintx.Tx)
if err := tx.Unmarshal(req.Tx, app.ChainID()); err != nil {
Expand All @@ -313,11 +281,6 @@ func (app *BaseApplication) CheckTx(req abcitypes.RequestCheckTx) abcitypes.Resp
log.Errorw(err, "checkTx")
return abcitypes.ResponseCheckTx{Code: 1, Data: []byte("checkTx " + err.Error())}
}
// add tx to mempool reference map for recheck prunning
app.mempoolTxRefLock.Lock()
app.mempoolTxRef[tx.TxID] = height
app.mempoolTxRefLock.Unlock()

return abcitypes.ResponseCheckTx{
Code: 0,
Data: response.Data,
Expand All @@ -340,10 +303,6 @@ func (app *BaseApplication) DeliverTx(req abcitypes.RequestDeliverTx) abcitypes.
"height", app.Height(),
"tx", tx.Tx,
)
// add tx to mempool reference map for prunning on Commit()
app.mempoolTxRefLock.Lock()
app.mempoolTxRefToGC = append(app.mempoolTxRefToGC, tx.TxID)
app.mempoolTxRefLock.Unlock()
// check tx is correct on the current state
response, err := app.TransactionHandler.CheckTx(tx, true)
if err != nil {
Expand Down Expand Up @@ -383,12 +342,6 @@ func (app *BaseApplication) Commit() abcitypes.ResponseCommit {
log.Infof("snapshot created successfully, took %s", time.Since(startTime))
log.Debugf("%+v", app.State.ListSnapshots())
}
// prune pending mempool tx references
app.mempoolTxRefLock.Lock()
for _, txID := range app.mempoolTxRefToGC {
delete(app.mempoolTxRef, txID)
}
app.mempoolTxRefLock.Unlock()
if app.State.TxCounter() > 0 {
log.Infow("commit block", "height", app.Height(), "txs", app.State.TxCounter())
}
Expand Down
7 changes: 4 additions & 3 deletions vochain/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,10 @@ func newTendermint(app *BaseApplication,
}

// p2p config
tconfig.P2P.MaxPacketMsgPayloadSize = 4096 // 4KB
/*tconfig.P2P.MaxPacketMsgPayloadSize = 4096 // 4KB
tconfig.P2P.RecvRate = 20480000 // 20MB/s
tconfig.P2P.SendRate = 20480000 // 20MB/s
tcAonfig.P2P.SendRate = 20480000 // 20MB/s
*/
tconfig.P2P.ExternalAddress = localConfig.PublicAddr

if localConfig.Dev {
Expand Down Expand Up @@ -185,7 +186,7 @@ func newTendermint(app *BaseApplication,
// mempool config
tconfig.Mempool.Version = "v0"
tconfig.Mempool.Size = localConfig.MempoolSize
tconfig.Mempool.Recheck = false
tconfig.Mempool.Recheck = true
tconfig.Mempool.KeepInvalidTxsInCache = false
tconfig.Mempool.MaxTxBytes = 1024 * 100 // 100 KiB
tconfig.Mempool.MaxTxsBytes = int64(tconfig.Mempool.Size * tconfig.Mempool.MaxTxBytes)
Expand Down

0 comments on commit 5381703

Please sign in to comment.