Skip to content

Commit

Permalink
Add support for different blob target and max
Browse files Browse the repository at this point in the history
Add support for different blob target and max
  • Loading branch information
terencechain committed Dec 9, 2024
1 parent 0d810a1 commit 68d9ce7
Show file tree
Hide file tree
Showing 54 changed files with 266 additions and 203 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- Added a Prometheus error counter metric for HTTP requests to track beacon node requests.
- Added a Prometheus error counter metric for SSE requests.
- Save light client updates and bootstraps in DB.
- Added support for different blob target and max values through config.

### Changed

Expand Down
1 change: 1 addition & 0 deletions api/client/builder/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//api/client:go_default_library",
"//api/server/structs:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
Expand Down
2 changes: 1 addition & 1 deletion api/client/builder/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (c *Client) GetHeader(ctx context.Context, slot primitives.Slot, parentHash
if err := json.Unmarshal(hb, hr); err != nil {
return nil, errors.Wrapf(err, "error unmarshaling the builder GetHeader response, using slot=%d, parentHash=%#x, pubkey=%#x", slot, parentHash, pubkey)
}
p, err := hr.ToProto()
p, err := hr.ToProto(slot)
if err != nil {
return nil, errors.Wrapf(err, "could not extract proto message from header")
}
Expand Down
10 changes: 6 additions & 4 deletions api/client/builder/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
consensusblocks "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
types "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
Expand Down Expand Up @@ -996,8 +997,8 @@ type ExecHeaderResponseDeneb struct {
}

// ToProto creates a SignedBuilderBidDeneb Proto from ExecHeaderResponseDeneb.
func (ehr *ExecHeaderResponseDeneb) ToProto() (*eth.SignedBuilderBidDeneb, error) {
bb, err := ehr.Data.Message.ToProto()
func (ehr *ExecHeaderResponseDeneb) ToProto(s types.Slot) (*eth.SignedBuilderBidDeneb, error) {
bb, err := ehr.Data.Message.ToProto(s)
if err != nil {
return nil, err
}
Expand All @@ -1008,12 +1009,13 @@ func (ehr *ExecHeaderResponseDeneb) ToProto() (*eth.SignedBuilderBidDeneb, error
}

// ToProto creates a BuilderBidDeneb Proto from BuilderBidDeneb.
func (bb *BuilderBidDeneb) ToProto() (*eth.BuilderBidDeneb, error) {
func (bb *BuilderBidDeneb) ToProto(s types.Slot) (*eth.BuilderBidDeneb, error) {
header, err := bb.Header.ToProto()
if err != nil {
return nil, err
}
if len(bb.BlobKzgCommitments) > fieldparams.MaxBlobsPerBlock {
maxBlobsPerBlock := params.BeaconConfig().MaxBlobCount(s)
if uint64(len(bb.BlobKzgCommitments)) > maxBlobsPerBlock {
return nil, fmt.Errorf("too many blob commitments: %d", len(bb.BlobKzgCommitments))
}
kzgCommitments := make([][]byte, len(bb.BlobKzgCommitments))
Expand Down
12 changes: 6 additions & 6 deletions beacon-chain/blockchain/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
forkchoicetypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice/types"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/config/features"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
consensusblocks "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
Expand Down Expand Up @@ -496,14 +495,15 @@ func (s *Service) runLateBlockTasks() {
// It returns a map where each key represents a missing BlobSidecar index.
// An empty map means we have all indices; a non-empty map can be used to compare incoming
// BlobSidecars against the set of known missing sidecars.
func missingIndices(bs *filesystem.BlobStorage, root [32]byte, expected [][]byte) (map[uint64]struct{}, error) {
func missingIndices(bs *filesystem.BlobStorage, root [32]byte, expected [][]byte, slot primitives.Slot) (map[uint64]struct{}, error) {
maxBlobsPerBlock := params.BeaconConfig().MaxBlobCount(slot)
if len(expected) == 0 {
return nil, nil
}
if len(expected) > fieldparams.MaxBlobsPerBlock {
if uint64(len(expected)) > maxBlobsPerBlock {
return nil, errMaxBlobsExceeded
}
indices, err := bs.Indices(root)
indices, err := bs.Indices(root, slot)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -552,7 +552,7 @@ func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed int
return nil
}
// get a map of BlobSidecar indices that are not currently available.
missing, err := missingIndices(s.blobStorage, root, kzgCommitments)
missing, err := missingIndices(s.blobStorage, root, kzgCommitments, block.Slot())
if err != nil {
return err
}
Expand All @@ -563,7 +563,7 @@ func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed int

// The gossip handler for blobs writes the index of each verified blob referencing the given
// root to the channel returned by blobNotifiers.forRoot.
nc := s.blobNotifiers.forRoot(root)
nc := s.blobNotifiers.forRoot(root, block.Slot())

// Log for DA checks that cross over into the next slot; helpful for debugging.
nextSlot := slots.BeginsAt(signed.Block().Slot()+1, s.genesisTime)
Expand Down
10 changes: 5 additions & 5 deletions beacon-chain/blockchain/process_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2205,23 +2205,23 @@ func TestMissingIndices(t *testing.T) {
},
{
name: "expected exceeds max",
expected: fakeCommitments(fieldparams.MaxBlobsPerBlock + 1),
expected: fakeCommitments(int(params.BeaconConfig().MaxBlobsPerBlock) + 1),
err: errMaxBlobsExceeded,
},
{
name: "first missing",
expected: fakeCommitments(fieldparams.MaxBlobsPerBlock),
expected: fakeCommitments(int(params.BeaconConfig().MaxBlobsPerBlock)),
present: []uint64{1, 2, 3, 4, 5},
result: fakeResult([]uint64{0}),
},
{
name: "all missing",
expected: fakeCommitments(fieldparams.MaxBlobsPerBlock),
expected: fakeCommitments(int(params.BeaconConfig().MaxBlobsPerBlock)),
result: fakeResult([]uint64{0, 1, 2, 3, 4, 5}),
},
{
name: "none missing",
expected: fakeCommitments(fieldparams.MaxBlobsPerBlock),
expected: fakeCommitments(int(params.BeaconConfig().MaxBlobsPerBlock)),
present: []uint64{0, 1, 2, 3, 4, 5},
result: fakeResult([]uint64{}),
},
Expand Down Expand Up @@ -2255,7 +2255,7 @@ func TestMissingIndices(t *testing.T) {
bm, bs := filesystem.NewEphemeralBlobStorageWithMocker(t)
t.Run(c.name, func(t *testing.T) {
require.NoError(t, bm.CreateFakeIndices(c.root, c.present...))
missing, err := missingIndices(bs, c.root, c.expected)
missing, err := missingIndices(bs, c.root, c.expected, 0)
if c.err != nil {
require.ErrorIs(t, err, c.err)
return
Expand Down
7 changes: 4 additions & 3 deletions beacon-chain/blockchain/receive_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"context"

"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
)

// SendNewBlobEvent sends a message to the BlobNotifier channel that the blob
// for the block root `root` is ready in the database
func (s *Service) sendNewBlobEvent(root [32]byte, index uint64) {
s.blobNotifiers.notifyIndex(root, index)
func (s *Service) sendNewBlobEvent(root [32]byte, index uint64, slot primitives.Slot) {
s.blobNotifiers.notifyIndex(root, index, slot)
}

// ReceiveBlob saves the blob to database and sends the new event
Expand All @@ -18,6 +19,6 @@ func (s *Service) ReceiveBlob(ctx context.Context, b blocks.VerifiedROBlob) erro
return err
}

s.sendNewBlobEvent(b.BlockRoot(), b.Index)
s.sendNewBlobEvent(b.BlockRoot(), b.Index, b.Slot())
return nil
}
21 changes: 13 additions & 8 deletions beacon-chain/blockchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v5/config/features"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
Expand Down Expand Up @@ -104,18 +104,22 @@ var ErrMissingClockSetter = errors.New("blockchain Service initialized without a
type blobNotifierMap struct {
sync.RWMutex
notifiers map[[32]byte]chan uint64
seenIndex map[[32]byte][fieldparams.MaxBlobsPerBlock]bool
seenIndex map[[32]byte][]bool
}

// notifyIndex notifies a blob by its index for a given root.
// It uses internal maps to keep track of seen indices and notifier channels.
func (bn *blobNotifierMap) notifyIndex(root [32]byte, idx uint64) {
if idx >= fieldparams.MaxBlobsPerBlock {
func (bn *blobNotifierMap) notifyIndex(root [32]byte, idx uint64, slot primitives.Slot) {
maxBlobsPerBlock := params.BeaconConfig().MaxBlobCount(slot)
if idx >= maxBlobsPerBlock {
return
}

bn.Lock()
seen := bn.seenIndex[root]
if seen == nil {
seen = make([]bool, maxBlobsPerBlock)
}
if seen[idx] {
bn.Unlock()
return
Expand All @@ -126,7 +130,7 @@ func (bn *blobNotifierMap) notifyIndex(root [32]byte, idx uint64) {
// Retrieve or create the notifier channel for the given root.
c, ok := bn.notifiers[root]
if !ok {
c = make(chan uint64, fieldparams.MaxBlobsPerBlock)
c = make(chan uint64, maxBlobsPerBlock)
bn.notifiers[root] = c
}

Expand All @@ -135,12 +139,13 @@ func (bn *blobNotifierMap) notifyIndex(root [32]byte, idx uint64) {
c <- idx
}

func (bn *blobNotifierMap) forRoot(root [32]byte) chan uint64 {
func (bn *blobNotifierMap) forRoot(root [32]byte, slot primitives.Slot) chan uint64 {
maxBlobsPerBlock := params.BeaconConfig().MaxBlobCount(slot)
bn.Lock()
defer bn.Unlock()
c, ok := bn.notifiers[root]
if !ok {
c = make(chan uint64, fieldparams.MaxBlobsPerBlock)
c = make(chan uint64, maxBlobsPerBlock)
bn.notifiers[root] = c
}
return c
Expand All @@ -166,7 +171,7 @@ func NewService(ctx context.Context, opts ...Option) (*Service, error) {
ctx, cancel := context.WithCancel(ctx)
bn := &blobNotifierMap{
notifiers: make(map[[32]byte]chan uint64),
seenIndex: make(map[[32]byte][fieldparams.MaxBlobsPerBlock]bool),
seenIndex: make(map[[32]byte][]bool),
}
srv := &Service{
ctx: ctx,
Expand Down
8 changes: 4 additions & 4 deletions beacon-chain/blockchain/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ func (s *MockClockSetter) SetClock(g *startup.Clock) error {
func TestNotifyIndex(t *testing.T) {
// Initialize a blobNotifierMap
bn := &blobNotifierMap{
seenIndex: make(map[[32]byte][fieldparams.MaxBlobsPerBlock]bool),
seenIndex: make(map[[32]byte][]bool),
notifiers: make(map[[32]byte]chan uint64),
}

Expand All @@ -596,7 +596,7 @@ func TestNotifyIndex(t *testing.T) {
copy(root[:], "exampleRoot")

// Test notifying a new index
bn.notifyIndex(root, 1)
bn.notifyIndex(root, 1, 1)
if !bn.seenIndex[root][1] {
t.Errorf("Index was not marked as seen")
}
Expand All @@ -607,13 +607,13 @@ func TestNotifyIndex(t *testing.T) {
}

// Test notifying an already seen index
bn.notifyIndex(root, 1)
bn.notifyIndex(root, 1, 1)
if len(bn.notifiers[root]) > 1 {
t.Errorf("Notifier channel should not receive multiple messages for the same index")
}

// Test notifying a new index again
bn.notifyIndex(root, 2)
bn.notifyIndex(root, 2, 1)
if !bn.seenIndex[root][2] {
t.Errorf("Index was not marked as seen")
}
Expand Down
10 changes: 6 additions & 4 deletions beacon-chain/core/blocks/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
field_params "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
consensus_types "github.com/prysmaticlabs/prysm/v5/consensus-types"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots"
Expand Down Expand Up @@ -210,7 +211,7 @@ func ProcessPayload(st state.BeaconState, body interfaces.ReadOnlyBeaconBlockBod
if err != nil {
return err
}
if err := verifyBlobCommitmentCount(body); err != nil {
if err := verifyBlobCommitmentCount(st.Slot(), body); err != nil {
return err
}
if err := ValidatePayloadWhenMergeCompletes(st, payload); err != nil {
Expand All @@ -225,15 +226,16 @@ func ProcessPayload(st state.BeaconState, body interfaces.ReadOnlyBeaconBlockBod
return nil
}

func verifyBlobCommitmentCount(body interfaces.ReadOnlyBeaconBlockBody) error {
func verifyBlobCommitmentCount(slot primitives.Slot, body interfaces.ReadOnlyBeaconBlockBody) error {
if body.Version() < version.Deneb {
return nil
}
kzgs, err := body.BlobKzgCommitments()
if err != nil {
return err
}
if len(kzgs) > field_params.MaxBlobsPerBlock {
maxBlobsPerBlock := params.BeaconConfig().MaxBlobCount(slot)
if uint64(len(kzgs)) > maxBlobsPerBlock {
return fmt.Errorf("too many kzg commitments in block: %d", len(kzgs))
}
return nil
Expand Down
7 changes: 4 additions & 3 deletions beacon-chain/core/blocks/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
consensusblocks "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
Expand Down Expand Up @@ -923,10 +924,10 @@ func TestVerifyBlobCommitmentCount(t *testing.T) {
b := &ethpb.BeaconBlockDeneb{Body: &ethpb.BeaconBlockBodyDeneb{}}
rb, err := consensusblocks.NewBeaconBlock(b)
require.NoError(t, err)
require.NoError(t, blocks.VerifyBlobCommitmentCount(rb.Body()))
require.NoError(t, blocks.VerifyBlobCommitmentCount(rb.Slot(), rb.Body()))

b = &ethpb.BeaconBlockDeneb{Body: &ethpb.BeaconBlockBodyDeneb{BlobKzgCommitments: make([][]byte, fieldparams.MaxBlobsPerBlock+1)}}
b = &ethpb.BeaconBlockDeneb{Body: &ethpb.BeaconBlockBodyDeneb{BlobKzgCommitments: make([][]byte, params.BeaconConfig().MaxBlobCount(rb.Slot())+1)}}
rb, err = consensusblocks.NewBeaconBlock(b)
require.NoError(t, err)
require.ErrorContains(t, fmt.Sprintf("too many kzg commitments in block: %d", fieldparams.MaxBlobsPerBlock+1), blocks.VerifyBlobCommitmentCount(rb.Body()))
require.ErrorContains(t, fmt.Sprintf("too many kzg commitments in block: %d", params.BeaconConfig().MaxBlobCount(rb.Slot())+1), blocks.VerifyBlobCommitmentCount(rb.Slot(), rb.Body()))
}
2 changes: 0 additions & 2 deletions beacon-chain/das/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ go_library(
deps = [
"//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/primitives:go_default_library",
Expand All @@ -35,7 +34,6 @@ go_test(
deps = [
"//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/primitives:go_default_library",
Expand Down
Loading

0 comments on commit 68d9ce7

Please sign in to comment.