Skip to content

Commit ca7281e

Browse files
Ruteriavalonche
authored andcommitted
Remove megabundles as they are no longer needed (ethereum#20)
1 parent f965da2 commit ca7281e

File tree

8 files changed

+16
-228
lines changed

8 files changed

+16
-228
lines changed

.github/workflows/go.yml

+2-9
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ jobs:
5454
uses: actions/checkout@v2
5555
with:
5656
repository: flashbots/mev-geth-demo
57+
ref: no-megabundles
5758
path: e2e
5859

5960
- run: cd e2e && yarn install
@@ -71,17 +72,9 @@ jobs:
7172
cd e2e
7273
GETH=`pwd`/../build/bin/geth ./run.sh &
7374
# Second node, not mining
74-
P2P_PORT=30302 DATADIR=datadir2 HTTP_PORT=8546 MINER_ARGS='--nodiscover' GETH=`pwd`/../build/bin/geth ./run.sh &
75+
P2P_PORT=30302 DATADIR=datadir2 HTTP_PORT=8546 AUTH_PORT=8552 MINER_ARGS='--nodiscover' GETH=`pwd`/../build/bin/geth ./run.sh &
7576
sleep 15
7677
DATADIR1=datadir DATADIR2=datadir2 GETH=`pwd`/../build/bin/geth ./peer_nodes.sh
7778
sleep 15
7879
yarn run demo-private-tx
7980
pkill -9 geth || true
80-
- name: Run megabundle-only node checking for reverts
81-
run: |
82-
cd e2e
83-
# Disable bundle workers
84-
MINER_ARGS='--miner.etherbase=0xd912aecb07e9f4e1ea8e6b4779e7fb6aa1c3e4d8 --miner.trustedrelays=0xfb11e78C4DaFec86237c2862441817701fdf197F --mine --miner.threads=2 --miner.maxmergedbundles=0' GETH=`pwd`/../build/bin/geth ./run.sh &
85-
sleep 15
86-
yarn run e2e-reverting-megabundle
87-
pkill -9 geth || true

core/txpool/txpool.go

-53
Original file line numberDiff line numberDiff line change
@@ -640,59 +640,6 @@ func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, m
640640
return nil
641641
}
642642

643-
// AddMegaBundle adds a megabundle to the pool. Assumes the relay signature has been verified already.
644-
func (pool *TxPool) AddMegabundle(relayAddr common.Address, txs types.Transactions, blockNumber *big.Int, minTimestamp, maxTimestamp uint64, revertingTxHashes []common.Hash) error {
645-
pool.mu.Lock()
646-
defer pool.mu.Unlock()
647-
648-
fromTrustedRelay := false
649-
for _, trustedAddr := range pool.config.TrustedRelays {
650-
if relayAddr == trustedAddr {
651-
fromTrustedRelay = true
652-
}
653-
}
654-
if !fromTrustedRelay {
655-
return errors.New("megabundle from non-trusted address")
656-
}
657-
658-
megabundle := types.MevBundle{
659-
Txs: txs,
660-
BlockNumber: blockNumber,
661-
MinTimestamp: minTimestamp,
662-
MaxTimestamp: maxTimestamp,
663-
RevertingTxHashes: revertingTxHashes,
664-
}
665-
666-
pool.megabundles[relayAddr] = megabundle
667-
668-
for _, hook := range pool.NewMegabundleHooks {
669-
go hook(relayAddr, &megabundle)
670-
}
671-
672-
return nil
673-
}
674-
675-
// GetMegabundle returns the latest megabundle submitted by a given relay.
676-
func (pool *TxPool) GetMegabundle(relayAddr common.Address, blockNumber *big.Int, blockTimestamp uint64) (types.MevBundle, error) {
677-
pool.mu.Lock()
678-
defer pool.mu.Unlock()
679-
680-
megabundle, ok := pool.megabundles[relayAddr]
681-
if !ok {
682-
return types.MevBundle{}, errors.New("No megabundle found")
683-
}
684-
if megabundle.BlockNumber.Cmp(blockNumber) != 0 {
685-
return types.MevBundle{}, errors.New("Megabundle does not fit blockNumber constraints")
686-
}
687-
if megabundle.MinTimestamp != 0 && megabundle.MinTimestamp > blockTimestamp {
688-
return types.MevBundle{}, errors.New("Megabundle does not fit minTimestamp constraints")
689-
}
690-
if megabundle.MaxTimestamp != 0 && megabundle.MaxTimestamp < blockTimestamp {
691-
return types.MevBundle{}, errors.New("Megabundle does not fit maxTimestamp constraints")
692-
}
693-
return megabundle, nil
694-
}
695-
696643
// Locals retrieves the accounts currently considered local by the pool.
697644
func (pool *TxPool) Locals() []common.Address {
698645
pool.mu.Lock()

eth/api_backend.go

-4
Original file line numberDiff line numberDiff line change
@@ -285,10 +285,6 @@ func (b *EthAPIBackend) SendBundle(ctx context.Context, txs types.Transactions,
285285
return b.eth.txPool.AddMevBundle(txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes)
286286
}
287287

288-
func (b *EthAPIBackend) SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error {
289-
return b.eth.txPool.AddMegabundle(relayAddr, txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes)
290-
}
291-
292288
func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
293289
pending := b.eth.txPool.Pending(false)
294290
var txs types.Transactions

internal/ethapi/api.go

-74
Original file line numberDiff line numberDiff line change
@@ -2133,25 +2133,6 @@ type SendBundleArgs struct {
21332133
RevertingTxHashes []common.Hash `json:"revertingTxHashes"`
21342134
}
21352135

2136-
// SendMegabundleArgs represents the arguments for a SendMegabundle call.
2137-
type SendMegabundleArgs struct {
2138-
Txs []hexutil.Bytes `json:"txs"`
2139-
BlockNumber uint64 `json:"blockNumber"`
2140-
MinTimestamp *uint64 `json:"minTimestamp"`
2141-
MaxTimestamp *uint64 `json:"maxTimestamp"`
2142-
RevertingTxHashes []common.Hash `json:"revertingTxHashes"`
2143-
RelaySignature hexutil.Bytes `json:"relaySignature"`
2144-
}
2145-
2146-
// UnsignedMegabundle is used for serialization and subsequent digital signing.
2147-
type UnsignedMegabundle struct {
2148-
Txs []hexutil.Bytes
2149-
BlockNumber uint64
2150-
MinTimestamp uint64
2151-
MaxTimestamp uint64
2152-
RevertingTxHashes []common.Hash
2153-
}
2154-
21552136
// SendBundle will add the signed transaction to the transaction pool.
21562137
// The sender is responsible for signing the transaction and using the correct nonce and ensuring validity
21572138
func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args SendBundleArgs) error {
@@ -2182,61 +2163,6 @@ func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args SendBundleArgs
21822163
return s.b.SendBundle(ctx, txs, args.BlockNumber, minTimestamp, maxTimestamp, args.RevertingTxHashes)
21832164
}
21842165

2185-
// Recovers the Ethereum address of the trusted relay that signed the megabundle.
2186-
func RecoverRelayAddress(args SendMegabundleArgs) (common.Address, error) {
2187-
megabundle := UnsignedMegabundle{Txs: args.Txs, BlockNumber: args.BlockNumber, RevertingTxHashes: args.RevertingTxHashes}
2188-
if args.MinTimestamp != nil {
2189-
megabundle.MinTimestamp = *args.MinTimestamp
2190-
} else {
2191-
megabundle.MinTimestamp = 0
2192-
}
2193-
if args.MaxTimestamp != nil {
2194-
megabundle.MaxTimestamp = *args.MaxTimestamp
2195-
} else {
2196-
megabundle.MaxTimestamp = 0
2197-
}
2198-
rlpEncoding, _ := rlp.EncodeToBytes(megabundle)
2199-
signature := args.RelaySignature
2200-
signature[64] -= 27 // account for Ethereum V
2201-
recoveredPubkey, err := crypto.SigToPub(accounts.TextHash(rlpEncoding), args.RelaySignature)
2202-
if err != nil {
2203-
return common.Address{}, err
2204-
}
2205-
return crypto.PubkeyToAddress(*recoveredPubkey), nil
2206-
}
2207-
2208-
// SendMegabundle will add the signed megabundle to one of the workers for evaluation.
2209-
func (s *PrivateTxBundleAPI) SendMegabundle(ctx context.Context, args SendMegabundleArgs) error {
2210-
log.Info("Received a Megabundle request", "signature", args.RelaySignature)
2211-
var txs types.Transactions
2212-
if len(args.Txs) == 0 {
2213-
return errors.New("megabundle missing txs")
2214-
}
2215-
if args.BlockNumber == 0 {
2216-
return errors.New("megabundle missing blockNumber")
2217-
}
2218-
for _, encodedTx := range args.Txs {
2219-
tx := new(types.Transaction)
2220-
if err := tx.UnmarshalBinary(encodedTx); err != nil {
2221-
return err
2222-
}
2223-
txs = append(txs, tx)
2224-
}
2225-
var minTimestamp, maxTimestamp uint64
2226-
if args.MinTimestamp != nil {
2227-
minTimestamp = *args.MinTimestamp
2228-
}
2229-
if args.MaxTimestamp != nil {
2230-
maxTimestamp = *args.MaxTimestamp
2231-
}
2232-
relayAddr, err := RecoverRelayAddress(args)
2233-
log.Info("Megabundle", "relayAddr", relayAddr, "err", err)
2234-
if err != nil {
2235-
return err
2236-
}
2237-
return s.b.SendMegabundle(ctx, txs, rpc.BlockNumber(args.BlockNumber), minTimestamp, maxTimestamp, args.RevertingTxHashes, relayAddr)
2238-
}
2239-
22402166
// BundleAPI offers an API for accepting bundled transactions
22412167
type BundleAPI struct {
22422168
b Backend

internal/ethapi/backend.go

-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ type Backend interface {
7676
// Transaction pool API
7777
SendTx(ctx context.Context, signedTx *types.Transaction, private bool) error
7878
SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error
79-
SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error
8079
GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error)
8180
GetPoolTransactions() (types.Transactions, error)
8281
GetPoolTransaction(txHash common.Hash) *types.Transaction

internal/web3ext/web3ext.go

-5
Original file line numberDiff line numberDiff line change
@@ -617,11 +617,6 @@ web3._extend({
617617
params: 3,
618618
inputFormatter: [web3._extend.formatters.inputCallFormatter, web3._extend.formatters.inputDefaultBlockNumberFormatter, null],
619619
}),
620-
new web3._extend.Method({
621-
name: 'sendMegabundle',
622-
call: 'eth_sendMegabundle',
623-
params: 1
624-
}),
625620
],
626621
properties: [
627622
new web3._extend.Property({

miner/multi_worker.go

+8-35
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func (w *multiWorker) GetSealingBlockAsync(parent common.Hash, timestamp uint64,
9797
for _, worker := range append(w.workers, w.regularWorker) {
9898
resCh, errCh, err := worker.getSealingBlock(parent, timestamp, coinbase, gasLimit, random, noTxs, noExtra)
9999
if err != nil {
100-
log.Error("could not start async block construction", "isFlashbotsWorker", worker.flashbots.isFlashbots, "isMegabundleWorker", worker.flashbots.isMegabundleWorker, "#bundles", worker.flashbots.maxMergedBundles)
100+
log.Error("could not start async block construction", "isFlashbotsWorker", worker.flashbots.isFlashbots, "#bundles", worker.flashbots.maxMergedBundles)
101101
continue
102102
}
103103
resChans = append(resChans, resChPair{resCh, errCh})
@@ -153,38 +153,12 @@ func newMultiWorker(config *Config, chainConfig *params.ChainConfig, engine cons
153153
for i := 1; i <= config.MaxMergedBundles; i++ {
154154
workers = append(workers,
155155
newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, &flashbotsData{
156-
isFlashbots: true,
157-
isMegabundleWorker: false,
158-
queue: queue,
159-
maxMergedBundles: i,
156+
isFlashbots: true,
157+
queue: queue,
158+
maxMergedBundles: i,
160159
}))
161160
}
162161

163-
relayWorkerMap := make(map[common.Address]*worker)
164-
165-
for i := 0; i < len(config.TrustedRelays); i++ {
166-
relayWorker := newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, &flashbotsData{
167-
isFlashbots: true,
168-
isMegabundleWorker: true,
169-
queue: queue,
170-
relayAddr: config.TrustedRelays[i],
171-
})
172-
workers = append(workers, relayWorker)
173-
relayWorkerMap[config.TrustedRelays[i]] = relayWorker
174-
}
175-
176-
eth.TxPool().NewMegabundleHooks = append(eth.TxPool().NewMegabundleHooks, func(relayAddr common.Address, megabundle *types.MevBundle) {
177-
worker, found := relayWorkerMap[relayAddr]
178-
if !found {
179-
return
180-
}
181-
182-
select {
183-
case worker.newMegabundleCh <- megabundle:
184-
default:
185-
}
186-
})
187-
188162
log.Info("creating multi worker", "config.MaxMergedBundles", config.MaxMergedBundles, "config.TrustedRelays", config.TrustedRelays, "worker", len(workers))
189163
return &multiWorker{
190164
regularWorker: regularWorker,
@@ -193,9 +167,8 @@ func newMultiWorker(config *Config, chainConfig *params.ChainConfig, engine cons
193167
}
194168

195169
type flashbotsData struct {
196-
isFlashbots bool
197-
isMegabundleWorker bool
198-
queue chan *task
199-
maxMergedBundles int
200-
relayAddr common.Address
170+
isFlashbots bool
171+
queue chan *task
172+
maxMergedBundles int
173+
relayAddr common.Address
201174
}

miner/worker.go

+6-47
Original file line numberDiff line numberDiff line change
@@ -164,10 +164,9 @@ type task struct {
164164
block *types.Block
165165
createdAt time.Time
166166

167-
profit *big.Int
168-
isFlashbots bool
169-
worker int
170-
isMegabundle bool
167+
profit *big.Int
168+
isFlashbots bool
169+
worker int
171170
}
172171

173172
const (
@@ -233,7 +232,6 @@ type worker struct {
233232
exitCh chan struct{}
234233
resubmitIntervalCh chan time.Duration
235234
resubmitAdjustCh chan *intervalAdjust
236-
newMegabundleCh chan *types.MevBundle
237235

238236
wg sync.WaitGroup
239237

@@ -583,11 +581,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
583581
timestamp = time.Now().Unix()
584582
commit(false, commitInterruptNewHead)
585583

586-
case <-w.newMegabundleCh:
587-
if w.isRunning() {
588-
commit(true, commitInterruptNone)
589-
}
590-
591584
case <-timer.C:
592585
// If sealing is running resubmit a new work cycle periodically to pull in
593586
// higher priced transactions. Disable this overhead for pending blocks.
@@ -796,7 +789,7 @@ func (w *worker) taskLoop() {
796789
// Interrupt previous sealing operation
797790
interrupt()
798791
stopCh, prev = make(chan struct{}), sealHash
799-
log.Info("Proposed miner block", "blockNumber", task.block.Number(), "profit", ethIntToFloat(prevProfit), "isFlashbots", task.isFlashbots, "sealhash", sealHash, "parentHash", prevParentHash, "worker", task.worker, "isMegabundle", task.isMegabundle)
792+
log.Info("Proposed miner block", "blockNumber", task.block.Number(), "profit", ethIntToFloat(prevProfit), "isFlashbots", task.isFlashbots, "sealhash", sealHash, "parentHash", prevParentHash, "worker", task.worker)
800793
if w.skipSealHook != nil && w.skipSealHook(task) {
801794
continue
802795
}
@@ -1309,7 +1302,7 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment, validatorC
13091302
return err
13101303
}
13111304
}
1312-
if w.flashbots.isFlashbots && !w.flashbots.isMegabundleWorker {
1305+
if w.flashbots.isFlashbots {
13131306
bundles, err := w.eth.TxPool().MevBundles(env.header.Number, env.header.Time)
13141307
if err != nil {
13151308
log.Error("Failed to fetch pending transactions", "err", err)
@@ -1330,40 +1323,6 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment, validatorC
13301323
}
13311324
env.profit.Add(env.profit, bundle.ethSentToCoinbase)
13321325
}
1333-
if w.flashbots.isMegabundleWorker {
1334-
megabundle, err := w.eth.TxPool().GetMegabundle(w.flashbots.relayAddr, env.header.Number, env.header.Time)
1335-
log.Info("Starting to process a Megabundle", "relay", w.flashbots.relayAddr, "megabundle", megabundle, "error", err)
1336-
if err != nil {
1337-
return err // no valid megabundle for this relay, nothing to do
1338-
}
1339-
1340-
// Flashbots bundle merging duplicates work by simulating TXes and then committing them once more.
1341-
// Megabundles API focuses on speed and runs everything in one cycle.
1342-
coinbaseBalanceBefore := env.state.GetBalance(env.coinbase)
1343-
if err := w.commitBundle(env, megabundle.Txs, interrupt); err != nil {
1344-
log.Info("Could not commit a Megabundle", "relay", w.flashbots.relayAddr, "megabundle", megabundle, "err", err)
1345-
return err
1346-
}
1347-
var txStatuses = map[common.Hash]bool{}
1348-
for _, receipt := range env.receipts {
1349-
txStatuses[receipt.TxHash] = receipt.Status == types.ReceiptStatusSuccessful
1350-
}
1351-
for _, tx := range megabundle.Txs {
1352-
status, ok := txStatuses[tx.Hash()]
1353-
if !ok {
1354-
log.Error("No TX receipt after megabundle simulation", "TxHash", tx.Hash())
1355-
return errors.New("no tx receipt after megabundle simulation")
1356-
}
1357-
if !status && !containsHash(megabundle.RevertingTxHashes, tx.Hash()) {
1358-
log.Info("Ignoring megabundle because of failing TX", "relay", w.flashbots.relayAddr, "TxHash", tx.Hash())
1359-
return errors.New("megabundle contains failing tx")
1360-
}
1361-
}
1362-
coinbaseBalanceAfter := env.state.GetBalance(env.coinbase)
1363-
coinbaseDelta := big.NewInt(0).Sub(coinbaseBalanceAfter, coinbaseBalanceBefore)
1364-
env.profit = coinbaseDelta
1365-
log.Info("Megabundle processed", "relay", w.flashbots.relayAddr, "totalProfit", ethIntToFloat(env.profit))
1366-
}
13671326

13681327
if len(localTxs) > 0 {
13691328
txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
@@ -1524,7 +1483,7 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti
15241483
// If we're post merge, just ignore
15251484
if !w.isTTDReached(block.Header()) {
15261485
select {
1527-
case w.taskCh <- &task{receipts: env.receipts, state: env.state, block: block, createdAt: time.Now(), profit: env.profit, isFlashbots: w.flashbots.isFlashbots, worker: w.flashbots.maxMergedBundles, isMegabundle: w.flashbots.isMegabundleWorker}:
1486+
case w.taskCh <- &task{receipts: env.receipts, state: env.state, block: block, createdAt: time.Now(), profit: env.profit, isFlashbots: w.flashbots.isFlashbots, worker: w.flashbots.maxMergedBundles}:
15281487
w.unconfirmed.Shift(block.NumberU64() - 1)
15291488

15301489
fees := totalFees(block, env.receipts)

0 commit comments

Comments
 (0)