feat(server/v2/cometbft): optimistic execution #22560

Merged
merged 16 commits into from
Nov 28, 2024
147 changes: 107 additions & 40 deletions server/v2/cometbft/abci.go
@@ -11,7 +11,7 @@
abci "github.com/cometbft/cometbft/abci/types"
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
gogoproto "github.com/cosmos/gogoproto/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoregistry"

"cosmossdk.io/collections"
@@ -28,6 +28,7 @@
"cosmossdk.io/server/v2/appmanager"
"cosmossdk.io/server/v2/cometbft/handlers"
"cosmossdk.io/server/v2/cometbft/mempool"
"cosmossdk.io/server/v2/cometbft/oe"
"cosmossdk.io/server/v2/cometbft/types"
cometerrors "cosmossdk.io/server/v2/cometbft/types/errors"
"cosmossdk.io/server/v2/streaming"
@@ -77,6 +78,11 @@
extendVote handlers.ExtendVoteHandler
checkTxHandler handlers.CheckTxHandler[T]

// optimisticExec contains the context required for Optimistic Execution,
// including the goroutine handling. This is experimental and must be enabled
// by developers.
optimisticExec *oe.OptimisticExecution[T]

addrPeerFilter types.PeerFilter // filter peers by address and port
idPeerFilter types.PeerFilter // filter peers by node ID

@@ -385,6 +391,14 @@
return nil, errors.New("no prepare proposal function was set")
}

// Abort any running OE so it cannot overlap with `PrepareProposal`. This could happen if optimistic
// `internalFinalizeBlock` from previous round takes a long time, but consensus has moved on to next round.
// Overlap is undesirable, since `internalFinalizeBlock` and `PrepareProposal` could share access to
// in-memory structs depending on application implementation.
// No-op if OE is not enabled.
// Similar call to Abort() is done in `ProcessProposal`.
c.optimisticExec.Abort()

Comment on lines +394 to +401

⚠️ Potential issue

Add error handling for Abort operation

The Abort() call could potentially return an error that should be handled to ensure proper cleanup.

-c.optimisticExec.Abort()
+if err := c.optimisticExec.Abort(); err != nil {
+    c.logger.Error("failed to abort optimistic execution", "err", err)
+    // Continue execution as the abort error shouldn't block proposal preparation
+}

Committable suggestion skipped: line range outside the PR's diff.

ciCtx := contextWithCometInfo(ctx, comet.Info{
Evidence: toCoreEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
@@ -421,6 +435,16 @@
return nil, errors.New("no process proposal function was set")
}

// Since the application can get access to FinalizeBlock state and write to it,
// we must be sure to reset it in case ProcessProposal times out and is called
// again in a subsequent round. However, we only want to do this after we've
// processed the first block, as we want to avoid overwriting the finalizeState
// after state changes during InitChain.
if req.Height > int64(c.initialHeight) {
// abort any running OE
c.optimisticExec.Abort()
}

ciCtx := contextWithCometInfo(ctx, comet.Info{
Evidence: toCoreEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
@@ -436,6 +460,17 @@
}, nil
}

// Only execute optimistic execution if the proposal is accepted, OE is
// enabled and the block height is greater than the initial height. During
// the first block we'll be carrying state from InitChain, so it would be
// impossible for us to easily revert.
// After the first block has been processed, the next blocks will get executed
// optimistically, so that when the ABCI client calls `FinalizeBlock` the app
// can have a response ready.
if req.Height > int64(c.initialHeight) {
c.optimisticExec.Execute(req)
}
Comment on lines +463 to +472

⚠️ Potential issue

Add error handling for Execute operation

The Execute() call should include error handling to properly handle and log any execution failures.

Apply this diff:

-c.optimisticExec.Execute(req)
+if err := c.optimisticExec.Execute(req); err != nil {
+    c.logger.Error("failed to execute optimistically", "height", req.Height, "err", err)
+    // Continue execution as the execute error shouldn't block proposal processing
+}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Only execute optimistic execution if the proposal is accepted, OE is
// enabled and the block height is greater than the initial height. During
// the first block we'll be carrying state from InitChain, so it would be
// impossible for us to easily revert.
// After the first block has been processed, the next blocks will get executed
// optimistically, so that when the ABCI client calls `FinalizeBlock` the app
// can have a response ready.
if req.Height > int64(c.initialHeight) {
c.optimisticExec.Execute(req)
}
// Only execute optimistic execution if the proposal is accepted, OE is
// enabled and the block height is greater than the initial height. During
// the first block we'll be carrying state from InitChain, so it would be
// impossible for us to easily revert.
// After the first block has been processed, the next blocks will get executed
// optimistically, so that when the ABCI client calls `FinalizeBlock` the app
// can have a response ready.
if req.Height > int64(c.initialHeight) {
if err := c.optimisticExec.Execute(req); err != nil {
c.logger.Error("failed to execute optimistically", "height", req.Height, "err", err)
// Continue execution as the execute error shouldn't block proposal processing
}
}


return &abciproto.ProcessProposalResponse{
Status: abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT,
}, nil
@@ -447,46 +482,32 @@
ctx context.Context,
req *abciproto.FinalizeBlockRequest,
) (*abciproto.FinalizeBlockResponse, error) {
if err := c.validateFinalizeBlockHeight(req); err != nil {
return nil, err
}

if err := c.checkHalt(req.Height, req.Time); err != nil {
return nil, err
}

// TODO(tip): can we expect some txs to not decode? if so, what we do in this case? this does not seem to be the case,
// considering that prepare and process always decode txs, assuming they're the ones providing txs we should never
// have a tx that fails decoding.
decodedTxs, err := decodeTxs(req.Txs, c.txCodec)
if err != nil {
return nil, err
}

cid, err := c.store.LastCommitID()
if err != nil {
return nil, err
}

blockReq := &server.BlockRequest[T]{
Height: uint64(req.Height),
Time: req.Time,
Hash: req.Hash,
AppHash: cid.Hash,
ChainId: c.chainID,
Txs: decodedTxs,
}

ciCtx := contextWithCometInfo(ctx, comet.Info{
Evidence: toCoreEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
ProposerAddress: req.ProposerAddress,
LastCommit: toCoreCommitInfo(req.DecidedLastCommit),
})
var (
resp *server.BlockResponse
newState store.WriterMap
decodedTxs []T
err error
)

if c.optimisticExec.Initialized() {
// check if the hash we got is the same as the one we are executing
aborted := c.optimisticExec.AbortIfNeeded(req.Hash)

// Wait for the OE to finish, regardless of whether it was aborted or not
res, err := c.optimisticExec.WaitResult()

if aborted {
resp, newState, decodedTxs, err = c.internalFinalizeBlock(ctx, req)
if err != nil {
return nil, err
}
} else {
resp = res.Resp
newState = res.StateChanges
decodedTxs = res.DecodedTxs
}

resp, newState, err := c.app.DeliverBlock(ciCtx, blockReq)
if err != nil {
return nil, err
c.optimisticExec.Reset()

⚠️ Potential issue

Improve error handling in optimistic execution flow

  1. The error from WaitResult() at line 497 is ignored but could indicate execution failures.
  2. The Reset() call at line 510 lacks error handling.

Apply this diff:

-res, err := c.optimisticExec.WaitResult()
+res, err := c.optimisticExec.WaitResult()
+if err != nil {
+    c.logger.Error("optimistic execution failed", "height", req.Height, "err", err)
+    resp, newState, decodedTxs, err = c.internalFinalizeBlock(ctx, req)
+    if err != nil {
+        return nil, err
+    }
+}

-c.optimisticExec.Reset()
+if err := c.optimisticExec.Reset(); err != nil {
+    c.logger.Error("failed to reset optimistic execution", "height", req.Height, "err", err)
+}
📝 Committable suggestion

Suggested change
if c.optimisticExec.Initialized() {
// check if the hash we got is the same as the one we are executing
aborted := c.optimisticExec.AbortIfNeeded(req.Hash)
// Wait for the OE to finish, regardless of whether it was aborted or not
res, err := c.optimisticExec.WaitResult()
if aborted {
resp, newState, decodedTxs, err = c.internalFinalizeBlock(ctx, req)
if err != nil {
return nil, err
}
} else {
resp = res.Resp
newState = res.StateChanges
decodedTxs = res.DecodedTxs
}
resp, newState, err := c.app.DeliverBlock(ciCtx, blockReq)
if err != nil {
return nil, err
c.optimisticExec.Reset()
if c.optimisticExec.Initialized() {
// check if the hash we got is the same as the one we are executing
aborted := c.optimisticExec.AbortIfNeeded(req.Hash)
// Wait for the OE to finish, regardless of whether it was aborted or not
res, err := c.optimisticExec.WaitResult()
if err != nil {
c.logger.Error("optimistic execution failed", "height", req.Height, "err", err)
resp, newState, decodedTxs, err = c.internalFinalizeBlock(ctx, req)
if err != nil {
return nil, err
}
}
if aborted {
resp, newState, decodedTxs, err = c.internalFinalizeBlock(ctx, req)
if err != nil {
return nil, err
}
} else {
resp = res.Resp
newState = res.StateChanges
decodedTxs = res.DecodedTxs
}
if err := c.optimisticExec.Reset(); err != nil {
c.logger.Error("failed to reset optimistic execution", "height", req.Height, "err", err)
}

}

// after we get the changeset we can produce the commit hash,
@@ -531,6 +552,52 @@
return finalizeBlockResponse(resp, cp, appHash, c.indexedEvents, c.cfg.AppTomlConfig.Trace)
}

func (c *consensus[T]) internalFinalizeBlock(
ctx context.Context,
req *abciproto.FinalizeBlockRequest,
) (*server.BlockResponse, store.WriterMap, []T, error) {
if err := c.validateFinalizeBlockHeight(req); err != nil {
return nil, nil, nil, err
}

if err := c.checkHalt(req.Height, req.Time); err != nil {
return nil, nil, nil, err
}

// TODO(tip): can we expect some txs to not decode? if so, what we do in this case? this does not seem to be the case,
// considering that prepare and process always decode txs, assuming they're the ones providing txs we should never
// have a tx that fails decoding.
decodedTxs, err := decodeTxs(req.Txs, c.txCodec)
if err != nil {
return nil, nil, nil, err
}

cid, err := c.store.LastCommitID()
if err != nil {
return nil, nil, nil, err
}

blockReq := &server.BlockRequest[T]{
Height: uint64(req.Height),
Time: req.Time,
Hash: req.Hash,
AppHash: cid.Hash,
ChainId: c.chainID,
Txs: decodedTxs,
}

ciCtx := contextWithCometInfo(ctx, comet.Info{
Evidence: toCoreEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
ProposerAddress: req.ProposerAddress,
LastCommit: toCoreCommitInfo(req.DecidedLastCommit),
})

resp, stateChanges, err := c.app.DeliverBlock(ciCtx, blockReq)

return resp, stateChanges, decodedTxs, err
}

// Commit implements types.Application.
// It is called by cometbft to notify the application that a block was committed.
func (c *consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (*abciproto.CommitResponse, error) {
75 changes: 75 additions & 0 deletions server/v2/cometbft/abci_test.go
@@ -4,12 +4,14 @@ import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
"io"
"strings"
"sync"
"testing"
"time"

"cosmossdk.io/server/v2/cometbft/oe"
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
v1 "github.com/cometbft/cometbft/api/cometbft/types/v1"
"github.com/cosmos/gogoproto/proto"
@@ -724,3 +726,76 @@ func assertStoreLatestVersion(t *testing.T, store types.Store, target uint64) {
require.NoError(t, err)
require.Equal(t, target, commitInfo.Version)
}

func TestOptimisticExecution(t *testing.T) {
c := setUpConsensus(t, 100_000, mempool.NoOpMempool[mock.Tx]{})

// Set up handlers
c.processProposalHandler = DefaultServerOptions[mock.Tx]().ProcessProposalHandler

// mock optimistic execution
calledTimes := 0
optimisticMockFunc := func(_ context.Context, _ *abciproto.FinalizeBlockRequest) (*abciproto.FinalizeBlockResponse, error) {
calledTimes++
return nil, errors.New("test error")
}
c.optimisticExec = oe.NewOptimisticExecution(log.NewNopLogger(), optimisticMockFunc)

_, err := c.InitChain(context.Background(), &abciproto.InitChainRequest{
Time: time.Now(),
ChainId: "test",
InitialHeight: 1,
})
require.NoError(t, err)

_, err = c.FinalizeBlock(context.Background(), &abciproto.FinalizeBlockRequest{
Time: time.Now(),
Height: 1,
Txs: [][]byte{mockTx.Bytes()},
Hash: emptyHash[:],
})
require.NoError(t, err)

theHash := sha256.Sum256([]byte("test"))
ppReq := &abciproto.ProcessProposalRequest{
Height: 2,
Hash: theHash[:],
Time: time.Now(),
Txs: [][]byte{mockTx.Bytes()},
}

// Start optimistic execution
resp, err := c.ProcessProposal(context.Background(), ppReq)
require.NoError(t, err)
require.Equal(t, resp.Status, abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT)

// Initialize FinalizeBlock with correct hash - should use optimistic result
theHash = sha256.Sum256([]byte("test"))
fbReq := &abciproto.FinalizeBlockRequest{
Height: 2,
Hash: theHash[:],
Time: ppReq.Time,
Txs: ppReq.Txs,
}
fbResp, err := c.FinalizeBlock(context.Background(), fbReq)
require.Error(t, err)
require.ErrorContains(t, err, "test error") // from optimisticMockFunc
require.Equal(t, 1, calledTimes)

Comment on lines +788 to +792

⚠️ Potential issue

Fix ineffectual assignment

The fbResp assignment on line 780 is never used. Consider removing it or adding assertions on the response content.

-	fbResp, err := c.FinalizeBlock(context.Background(), fbReq)
+	_, err = c.FinalizeBlock(context.Background(), fbReq)
📝 Committable suggestion

Suggested change
fbResp, err := c.FinalizeBlock(context.Background(), fbReq)
require.Error(t, err)
require.ErrorContains(t, err, "test error") // from optimisticMockFunc
require.Equal(t, 1, calledTimes)
_, err = c.FinalizeBlock(context.Background(), fbReq)
require.Error(t, err)
require.ErrorContains(t, err, "test error") // from optimisticMockFunc
require.Equal(t, 1, calledTimes)
🧰 Tools
🪛 golangci-lint (1.62.2)

780-780: ineffectual assignment to fbResp

(ineffassign)

resp, err = c.ProcessProposal(context.Background(), ppReq)
require.NoError(t, err)
require.Equal(t, resp.Status, abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT)

theWrongHash := sha256.Sum256([]byte("wrong_hash"))
fbReq.Hash = theWrongHash[:]

// Initialize FinalizeBlock with wrong hash - should abort optimistic execution
// Because it is aborted, the result comes from the normal execution
fbResp, err = c.FinalizeBlock(context.Background(), fbReq)
require.NotNil(t, fbResp)
require.NoError(t, err)
require.Equal(t, 2, calledTimes)
Comment on lines +800 to +805

🛠️ Refactor suggestion

Add assertions for optimistic execution state

The test should verify the optimistic execution state immediately after the wrong hash is processed, before the final assertion.

 // Initialize FinalizeBlock with wrong hash - should abort optimistic execution
 // Because is aborted, the result comes from the normal execution
 fbResp, err = c.FinalizeBlock(context.Background(), fbReq)
 require.NotNil(t, fbResp)
 require.NoError(t, err)
 require.Equal(t, 2, calledTimes)
+require.False(t, c.optimisticExec.Initialized(), "optimistic execution should be reset immediately after wrong hash")
📝 Committable suggestion

Suggested change
// Initialize FinalizeBlock with wrong hash - should abort optimistic execution
// Because is aborted, the result comes from the normal execution
fbResp, err = c.FinalizeBlock(context.Background(), fbReq)
require.NotNil(t, fbResp)
require.NoError(t, err)
require.Equal(t, 2, calledTimes)
// Initialize FinalizeBlock with wrong hash - should abort optimistic execution
// Because it is aborted, the result comes from the normal execution
fbResp, err = c.FinalizeBlock(context.Background(), fbReq)
require.NotNil(t, fbResp)
require.NoError(t, err)
require.Equal(t, 2, calledTimes)
require.False(t, c.optimisticExec.Initialized(), "optimistic execution should be reset immediately after wrong hash")


// Verify optimistic execution was reset
require.False(t, c.optimisticExec.Initialized())
}

🛠️ Refactor suggestion

Add test cases for concurrent optimistic executions

The test suite should include scenarios testing concurrent optimistic executions to ensure thread safety.

Consider adding a test case that simulates multiple goroutines attempting optimistic execution simultaneously:

func TestConcurrentOptimisticExecution(t *testing.T) {
    c := setUpConsensus(t, 100_000, mempool.NoOpMempool[mock.Tx]{})
    c.processProposalHandler = DefaultServerOptions[mock.Tx]().ProcessProposalHandler
    
    var wg sync.WaitGroup
    numGoroutines := 10
    
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func(height int64) {
            defer wg.Done()
            
            theHash := sha256.Sum256([]byte(fmt.Sprintf("test-%d", height)))
            ppReq := &abciproto.ProcessProposalRequest{
                Height: height,
                Hash:   theHash[:],
                Time:   time.Now(),
                Txs:    [][]byte{mockTx.Bytes()},
            }
            
            resp, err := c.ProcessProposal(context.Background(), ppReq)
            require.NoError(t, err)
            require.Equal(t, resp.Status, abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT)
        }(int64(i + 2))
    }
    
    wg.Wait()
}
