Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server/v2/cometbft): optimistic execution #22560

Merged
merged 16 commits into from
Nov 28, 2024
147 changes: 108 additions & 39 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"context"
"cosmossdk.io/server/v2/cometbft/oe"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will need to move lower, otherwise linting will complain

"crypto/sha256"
"errors"
"fmt"
Expand Down Expand Up @@ -77,6 +78,11 @@
extendVote handlers.ExtendVoteHandler
checkTxHandler handlers.CheckTxHandler[T]

// optimisticExec contains the context required for Optimistic Execution,
// including the goroutine handling.This is experimental and must be enabled
// by developers.
optimisticExec *oe.OptimisticExecution[T]

addrPeerFilter types.PeerFilter // filter peers by address and port
idPeerFilter types.PeerFilter // filter peers by node ID

Expand Down Expand Up @@ -385,6 +391,14 @@
return nil, errors.New("no prepare proposal function was set")
}

// Abort any running OE so it cannot overlap with `PrepareProposal`. This could happen if optimistic
// `internalFinalizeBlock` from previous round takes a long time, but consensus has moved on to next round.
// Overlap is undesirable, since `internalFinalizeBlock` and `PrepareProoposal` could share access to
// in-memory structs depending on application implementation.
// No-op if OE is not enabled.
// Similar call to Abort() is done in `ProcessProposal`.
c.optimisticExec.Abort()

Comment on lines +394 to +401
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add error handling for Abort operation

The Abort() call could potentially return an error that should be handled to ensure proper cleanup.

-c.optimisticExec.Abort()
+if err := c.optimisticExec.Abort(); err != nil {
+    c.logger.Error("failed to abort optimistic execution", "err", err)
+    // Continue execution as the abort error shouldn't block proposal preparation
+}

Committable suggestion skipped: line range outside the PR's diff.

ciCtx := contextWithCometInfo(ctx, comet.Info{
Evidence: toCoreEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
Expand Down Expand Up @@ -421,6 +435,17 @@
return nil, errors.New("no process proposal function was set")
}

// Since the application can get access to FinalizeBlock state and write to it,
// we must be sure to reset it in case ProcessProposal timeouts and is called
// again in a subsequent round. However, we only want to do this after we've
// processed the first block, as we want to avoid overwriting the finalizeState
// after state changes during InitChain.
if req.Height > int64(c.initialHeight) {
// abort any running OE
c.optimisticExec.Abort()
//c.setState(execModeFinalize, header)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove comment

}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential nil pointer dereference in ProcessProposal

In ProcessProposal, c.optimisticExec.Abort() is invoked without verifying if optimisticExec is nil. This could lead to a runtime panic if optimisticExec is not initialized.

Apply this diff to add a nil check before calling Abort():

         if req.Height > int64(c.initialHeight) {
             // abort any running OE
-            c.optimisticExec.Abort()
+            if c.optimisticExec != nil {
+                c.optimisticExec.Abort()
+            }
             //c.setState(execModeFinalize, header)
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Since the application can get access to FinalizeBlock state and write to it,
// we must be sure to reset it in case ProcessProposal timeouts and is called
// again in a subsequent round. However, we only want to do this after we've
// processed the first block, as we want to avoid overwriting the finalizeState
// after state changes during InitChain.
if req.Height > int64(c.initialHeight) {
// abort any running OE
c.optimisticExec.Abort()
//c.setState(execModeFinalize, header)
}
// Since the application can get access to FinalizeBlock state and write to it,
// we must be sure to reset it in case ProcessProposal timeouts and is called
// again in a subsequent round. However, we only want to do this after we've
// processed the first block, as we want to avoid overwriting the finalizeState
// after state changes during InitChain.
if req.Height > int64(c.initialHeight) {
// abort any running OE
if c.optimisticExec != nil {
c.optimisticExec.Abort()
}
//c.setState(execModeFinalize, header)
}

ciCtx := contextWithCometInfo(ctx, comet.Info{
Evidence: toCoreEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
Expand All @@ -436,6 +461,17 @@
}, nil
}

// Only execute optimistic execution if the proposal is accepted, OE is
// enabled and the block height is greater than the initial height. During
// the first block we'll be carrying state from InitChain, so it would be
// impossible for us to easily revert.
// After the first block has been processed, the next blocks will get executed
// optimistically, so that when the ABCI client calls `FinalizeBlock` the app
// can have a response ready.
if c.optimisticExec.Enabled() && req.Height > int64(c.initialHeight) {
c.optimisticExec.Execute(req)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential nil pointer dereference when checking optimisticExec.Enabled()

In ProcessProposal, c.optimisticExec.Enabled() is called without verifying if optimisticExec is nil. This may result in a runtime panic if optimisticExec is not set.

Apply this diff to add a nil check:

     if c.optimisticExec != nil && c.optimisticExec.Enabled() && req.Height > int64(c.initialHeight) {
         c.optimisticExec.Execute(req)
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Only execute optimistic execution if the proposal is accepted, OE is
// enabled and the block height is greater than the initial height. During
// the first block we'll be carrying state from InitChain, so it would be
// impossible for us to easily revert.
// After the first block has been processed, the next blocks will get executed
// optimistically, so that when the ABCI client calls `FinalizeBlock` the app
// can have a response ready.
if c.optimisticExec.Enabled() && req.Height > int64(c.initialHeight) {
c.optimisticExec.Execute(req)
}
// Only execute optimistic execution if the proposal is accepted, OE is
// enabled and the block height is greater than the initial height. During
// the first block we'll be carrying state from InitChain, so it would be
// impossible for us to easily revert.
// After the first block has been processed, the next blocks will get executed
// optimistically, so that when the ABCI client calls `FinalizeBlock` the app
// can have a response ready.
if c.optimisticExec != nil && c.optimisticExec.Enabled() && req.Height > int64(c.initialHeight) {
c.optimisticExec.Execute(req)
}

return &abciproto.ProcessProposalResponse{
Status: abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT,
}, nil
Expand All @@ -447,46 +483,33 @@
ctx context.Context,
req *abciproto.FinalizeBlockRequest,
) (*abciproto.FinalizeBlockResponse, error) {
if err := c.validateFinalizeBlockHeight(req); err != nil {
return nil, err
}

if err := c.checkHalt(req.Height, req.Time); err != nil {
return nil, err
}

// TODO(tip): can we expect some txs to not decode? if so, what we do in this case? this does not seem to be the case,
// considering that prepare and process always decode txs, assuming they're the ones providing txs we should never
// have a tx that fails decoding.
decodedTxs, err := decodeTxs(req.Txs, c.txCodec)
if err != nil {
return nil, err
}

cid, err := c.store.LastCommitID()
if err != nil {
return nil, err
}

blockReq := &server.BlockRequest[T]{
Height: uint64(req.Height),
Time: req.Time,
Hash: req.Hash,
AppHash: cid.Hash,
ChainId: c.chainID,
Txs: decodedTxs,
}

ciCtx := contextWithCometInfo(ctx, comet.Info{
Evidence: toCoreEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
ProposerAddress: req.ProposerAddress,
LastCommit: toCoreCommitInfo(req.DecidedLastCommit),
})
var (
resp *server.BlockResponse
newState store.WriterMap
decodedTxs []T
err error
)

if c.optimisticExec.Initialized() {
// check if the hash we got is the same as the one we are executing
aborted := c.optimisticExec.AbortIfNeeded(req.Hash)

// Wait for the OE to finish, regardless of whether it was aborted or not
res, err := c.optimisticExec.WaitResult()

if aborted {
resp, newState, decodedTxs, err = c.internalFinalizeBlock(ctx, req)
if err != nil {
return nil, err
}
} else {
resp = res.Resp
newState = res.StateChanges
decodedTxs = res.DecodedTxs
}

resp, newState, err := c.app.DeliverBlock(ciCtx, blockReq)
if err != nil {
return nil, err
// if it was aborted, we need to reset the state
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove comment, as we are always resetting

c.optimisticExec.Reset()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding timeout for WaitResult

The WaitResult() call could potentially block indefinitely. Consider adding a context with timeout.

-res, err := c.optimisticExec.WaitResult()
+ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
+defer cancel()
+res, err := c.optimisticExec.WaitResult(ctx)
+if err == context.DeadlineExceeded {
+    c.logger.Error("optimistic execution timed out")
+    // Fall back to normal execution
+    resp, newState, decodedTxs, err = c.internalFinalizeBlock(ctx, req)
+    if err != nil {
+        return nil, err
+    }
+}

Committable suggestion skipped: line range outside the PR's diff.

}

// after we get the changeset we can produce the commit hash,
Expand Down Expand Up @@ -531,6 +554,52 @@
return finalizeBlockResponse(resp, cp, appHash, c.indexedEvents, c.cfg.AppTomlConfig.Trace)
}

func (c *consensus[T]) internalFinalizeBlock(
ctx context.Context,
req *abciproto.FinalizeBlockRequest,
) (*server.BlockResponse, store.WriterMap, []T, error) {
if err := c.validateFinalizeBlockHeight(req); err != nil {
return nil, nil, nil, err
}

if err := c.checkHalt(req.Height, req.Time); err != nil {
return nil, nil, nil, err
}

// TODO(tip): can we expect some txs to not decode? if so, what we do in this case? this does not seem to be the case,
// considering that prepare and process always decode txs, assuming they're the ones providing txs we should never
// have a tx that fails decoding.
decodedTxs, err := decodeTxs(req.Txs, c.txCodec)
if err != nil {
return nil, nil, nil, err
}

cid, err := c.store.LastCommitID()
if err != nil {
return nil, nil, nil, err
}

blockReq := &server.BlockRequest[T]{
Height: uint64(req.Height),
Time: req.Time,
Hash: req.Hash,
AppHash: cid.Hash,
ChainId: c.chainID,
Txs: decodedTxs,
}

ciCtx := contextWithCometInfo(ctx, comet.Info{
Evidence: toCoreEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
ProposerAddress: req.ProposerAddress,
LastCommit: toCoreCommitInfo(req.DecidedLastCommit),
})

resp, stateChanges, err := c.app.DeliverBlock(ciCtx, blockReq)

return resp, stateChanges, decodedTxs, err
}

// Commit implements types.Application.
// It is called by cometbft to notify the application that a block was committed.
func (c *consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (*abciproto.CommitResponse, error) {
Expand Down
75 changes: 75 additions & 0 deletions server/v2/cometbft/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
"io"
"strings"
"sync"
"testing"
"time"

"cosmossdk.io/server/v2/cometbft/oe"
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
v1 "github.com/cometbft/cometbft/api/cometbft/types/v1"
"github.com/cosmos/gogoproto/proto"
Expand Down Expand Up @@ -724,3 +726,76 @@ func assertStoreLatestVersion(t *testing.T, store types.Store, target uint64) {
require.NoError(t, err)
require.Equal(t, target, commitInfo.Version)
}

func TestOptimisticExecution(t *testing.T) {
c := setUpConsensus(t, 100_000, mempool.NoOpMempool[mock.Tx]{})

// Set up handlers
c.processProposalHandler = DefaultServerOptions[mock.Tx]().ProcessProposalHandler

// mock optimistic execution
calledTimes := 0
optimisticMockFunc := func(_ context.Context, _ *abciproto.FinalizeBlockRequest) (*abciproto.FinalizeBlockResponse, error) {
calledTimes++
return nil, errors.New("test error")
}
c.optimisticExec = oe.NewOptimisticExecution(log.NewNopLogger(), optimisticMockFunc)

_, err := c.InitChain(context.Background(), &abciproto.InitChainRequest{
Time: time.Now(),
ChainId: "test",
InitialHeight: 1,
})
require.NoError(t, err)

_, err = c.FinalizeBlock(context.Background(), &abciproto.FinalizeBlockRequest{
Time: time.Now(),
Height: 1,
Txs: [][]byte{mockTx.Bytes()},
Hash: emptyHash[:],
})
require.NoError(t, err)

theHash := sha256.Sum256([]byte("test"))
ppReq := &abciproto.ProcessProposalRequest{
Height: 2,
Hash: theHash[:],
Time: time.Now(),
Txs: [][]byte{mockTx.Bytes()},
}

// Start optimistic execution
resp, err := c.ProcessProposal(context.Background(), ppReq)
require.NoError(t, err)
require.Equal(t, resp.Status, abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT)

// Initialize FinalizeBlock with correct hash - should use optimistic result
theHash = sha256.Sum256([]byte("test"))
fbReq := &abciproto.FinalizeBlockRequest{
Height: 2,
Hash: theHash[:],
Time: ppReq.Time,
Txs: ppReq.Txs,
}
fbResp, err := c.FinalizeBlock(context.Background(), fbReq)
require.Error(t, err)
require.ErrorContains(t, err, "test error") // from optimisticMockFunc
require.Equal(t, 1, calledTimes)

Comment on lines +788 to +792
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix ineffectual assignment

The fbResp assignment on line 780 is never used. Consider removing it or adding assertions on the response content.

-	fbResp, err := c.FinalizeBlock(context.Background(), fbReq)
+	_, err = c.FinalizeBlock(context.Background(), fbReq)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fbResp, err := c.FinalizeBlock(context.Background(), fbReq)
require.Error(t, err)
require.ErrorContains(t, err, "test error") // from optimisticMockFunc
require.Equal(t, 1, calledTimes)
_, err = c.FinalizeBlock(context.Background(), fbReq)
require.Error(t, err)
require.ErrorContains(t, err, "test error") // from optimisticMockFunc
require.Equal(t, 1, calledTimes)
🧰 Tools
🪛 golangci-lint (1.62.2)

780-780: ineffectual assignment to fbResp

(ineffassign)

resp, err = c.ProcessProposal(context.Background(), ppReq)
require.NoError(t, err)
require.Equal(t, resp.Status, abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT)

theWrongHash := sha256.Sum256([]byte("wrong_hash"))
fbReq.Hash = theWrongHash[:]

// Initialize FinalizeBlock with wrong hash - should abort optimistic execution
// Because is aborted, the result comes from the normal execution
fbResp, err = c.FinalizeBlock(context.Background(), fbReq)
require.NotNil(t, fbResp)
require.NoError(t, err)
require.Equal(t, 2, calledTimes)
Comment on lines +800 to +805
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add assertions for optimistic execution state

The test should verify the optimistic execution state immediately after the wrong hash is processed, before the final assertion.

 // Initialize FinalizeBlock with wrong hash - should abort optimistic execution
 // Because is aborted, the result comes from the normal execution
 fbResp, err = c.FinalizeBlock(context.Background(), fbReq)
 require.NotNil(t, fbResp)
 require.NoError(t, err)
 require.Equal(t, 2, calledTimes)
+require.False(t, c.optimisticExec.Initialized(), "optimistic execution should be reset immediately after wrong hash")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Initialize FinalizeBlock with wrong hash - should abort optimistic execution
// Because is aborted, the result comes from the normal execution
fbResp, err = c.FinalizeBlock(context.Background(), fbReq)
require.NotNil(t, fbResp)
require.NoError(t, err)
require.Equal(t, 2, calledTimes)
// Initialize FinalizeBlock with wrong hash - should abort optimistic execution
// Because is aborted, the result comes from the normal execution
fbResp, err = c.FinalizeBlock(context.Background(), fbReq)
require.NotNil(t, fbResp)
require.NoError(t, err)
require.Equal(t, 2, calledTimes)
require.False(t, c.optimisticExec.Initialized(), "optimistic execution should be reset immediately after wrong hash")


// Verify optimistic execution was reset
require.False(t, c.optimisticExec.Initialized())
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add test cases for concurrent optimistic executions

The test suite should include scenarios testing concurrent optimistic executions to ensure thread safety.

Consider adding a test case that simulates multiple goroutines attempting optimistic execution simultaneously:

func TestConcurrentOptimisticExecution(t *testing.T) {
    c := setUpConsensus(t, 100_000, mempool.NoOpMempool[mock.Tx]{})
    c.processProposalHandler = DefaultServerOptions[mock.Tx]().ProcessProposalHandler
    
    var wg sync.WaitGroup
    numGoroutines := 10
    
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func(height int64) {
            defer wg.Done()
            
            theHash := sha256.Sum256([]byte(fmt.Sprintf("test-%d", height)))
            ppReq := &abciproto.ProcessProposalRequest{
                Height: height,
                Hash:   theHash[:],
                Time:   time.Now(),
                Txs:    [][]byte{mockTx.Bytes()},
            }
            
            resp, err := c.ProcessProposal(context.Background(), ppReq)
            require.NoError(t, err)
            require.Equal(t, resp.Status, abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT)
        }(int64(i + 2))
    }
    
    wg.Wait()
}

Loading
Loading