Skip to content

Commit

Permalink
test fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Roberto Bayardo committed May 8, 2024
1 parent 42ade28 commit 7d605c4
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 13 deletions.
198 changes: 198 additions & 0 deletions op-node/rollup/derive/#channel.go#
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package derive

import (
"bufio"
"bytes"
"compress/zlib"
"fmt"
"io"

"github.com/andybalholm/brotli"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/rlp"
)

// Zlib compression-method (CM) constants, matched against the low nibble of
// the first byte of a stream to detect zlib framing (see RFC 1950: CM=8 is
// deflate; CM=15 is reserved).
const (
	ZlibCM8  = 8
	ZlibCM15 = 15
)

// A Channel is a set of batches that are split into at least one, but possibly multiple frames.
// Frames are allowed to be ingested out of order.
// Each frame is ingested one by one. Once a frame with `closed` is added to the channel, the
// channel may mark itself as ready for reading once all intervening frames have been added
type Channel struct {
	// id of the channel
	id ChannelID

	// openBlock is the L1 block in which the channel's first frame was observed.
	openBlock eth.L1BlockRef

	// estimated memory size, used to drop the channel if we have too much data
	size uint64

	// true if we have buffered the last frame
	closed bool

	// highestFrameNumber is the highest frame number yet seen.
	highestFrameNumber uint16

	// endFrameNumber is the frame number of the frame where `isLast` is true
	// No other frame number must be larger than this.
	endFrameNumber uint16

	// Store a map of frame number -> frame for constant time ordering
	inputs map[uint64]Frame

	// highestL1InclusionBlock is the highest L1 block in which any of this
	// channel's frames were included so far.
	highestL1InclusionBlock eth.L1BlockRef
}

// NewChannel creates an empty Channel identified by id, recording the L1
// block in which its first frame was seen.
func NewChannel(id ChannelID, openBlock eth.L1BlockRef) *Channel {
	ch := &Channel{
		id:        id,
		openBlock: openBlock,
	}
	ch.inputs = make(map[uint64]Frame)
	return ch
}

// AddFrame adds a frame to the channel.
// If the frame is not valid for the channel it returns an error.
// Otherwise the frame is buffered.
func (ch *Channel) AddFrame(frame Frame, l1InclusionBlock eth.L1BlockRef) error {
	if frame.ID != ch.id {
		return fmt.Errorf("frame id does not match channel id. Expected %v, got %v", ch.id, frame.ID)
	}
	// These checks are specified and cannot be changed without a hard fork.
	if frame.IsLast && ch.closed {
		return fmt.Errorf("cannot add ending frame to a closed channel. id %v", ch.id)
	}
	if _, ok := ch.inputs[uint64(frame.FrameNumber)]; ok {
		return DuplicateErr
	}
	if ch.closed && frame.FrameNumber >= ch.endFrameNumber {
		return fmt.Errorf("frame number (%d) is greater than or equal to end frame number (%d) of a closed channel", frame.FrameNumber, ch.endFrameNumber)
	}

	// Guaranteed to succeed. Now update internal state
	if frame.IsLast {
		ch.endFrameNumber = frame.FrameNumber
		ch.closed = true
	}
	// Prune frames with a number higher than the closing frame number when we receive a closing frame
	if frame.IsLast && ch.endFrameNumber < ch.highestFrameNumber {
		// Do a linear scan over saved inputs instead of ranging over ID numbers
		for id, prunedFrame := range ch.inputs {
			if id >= uint64(ch.endFrameNumber) {
				delete(ch.inputs, id)
				// BUG FIX: only subtract the size of frames actually removed.
				// Previously the decrement ran for every buffered frame,
				// including retained ones, corrupting the size accounting.
				ch.size -= frameSize(prunedFrame)
			}
		}
		ch.highestFrameNumber = ch.endFrameNumber
	}
	// Update highest seen frame number after pruning
	if frame.FrameNumber > ch.highestFrameNumber {
		ch.highestFrameNumber = frame.FrameNumber
	}

	if ch.highestL1InclusionBlock.Number < l1InclusionBlock.Number {
		ch.highestL1InclusionBlock = l1InclusionBlock
	}
	ch.inputs[uint64(frame.FrameNumber)] = frame
	ch.size += frameSize(frame)

	return nil
}

// OpenBlockNumber returns the number of the L1 block that contained the
// first frame observed for this channel.
func (ch *Channel) OpenBlockNumber() uint64 {
	num := ch.openBlock.Number
	return num
}

// Size reports the channel's current estimated memory footprint, frame
// overhead included. Reading from the channel does not shrink this value:
// reads operate on uncompressed data, while the size tracks the buffered
// compressed frames.
func (ch *Channel) Size() uint64 {
	return ch.size
}

// IsReady returns true iff the channel is ready to be read: the closing
// frame has been buffered and every frame number from 0 through
// endFrameNumber is present.
func (ch *Channel) IsReady() bool {
	if !ch.closed {
		// The last frame has not been seen yet.
		return false
	}
	end := uint64(ch.endFrameNumber)
	// Cheap count check before scanning for gaps.
	if uint64(len(ch.inputs)) != end+1 {
		return false
	}
	// Verify the frames form a contiguous run 0..end.
	for i := uint64(0); i <= end; i++ {
		if _, ok := ch.inputs[i]; !ok {
			return false
		}
	}
	return true
}

// Reader returns an io.Reader over the concatenated frame data, in frame
// number order. It panics if called while `IsReady` is not true. It may be
// called multiple times.
func (ch *Channel) Reader() io.Reader {
	readers := make([]io.Reader, 0, uint64(ch.endFrameNumber)+1)
	for i := uint64(0); i <= uint64(ch.endFrameNumber); i++ {
		frame, ok := ch.inputs[i]
		if !ok {
			panic("dev error in channel.Reader. Must be called after the channel is ready.")
		}
		readers = append(readers, bytes.NewReader(frame.Data))
	}
	return io.MultiReader(readers...)
}

// BatchReader provides a function that iteratively consumes batches from the reader.
// The L1Inclusion block is also provided at creation time.
// Warning: the batch reader can read every batch-type.
// The caller of the batch-reader should filter the results.
func BatchReader(r io.Reader) (func() (*BatchData, error), error) {
	// Use a buffered reader so we can peek at the first byte to sniff the
	// compression format without consuming it.
	bufReader := bufio.NewReader(r)
	compressionType, err := bufReader.Peek(1)
	if err != nil {
		return nil, err
	}

	var zr io.Reader
	// A zlib stream's first byte carries the CM field in its low nibble:
	// 8 (deflate) or 15 (reserved); see RFC 1950.
	if compressionType[0]&0x0F == ZlibCM8 || compressionType[0]&0x0F == ZlibCM15 {
		zr, err = zlib.NewReader(bufReader)
		if err != nil {
			return nil, err
		}
	} else if compressionType[0] == ChannelVersionBrotli {
		// Brotli channels are prefixed with a version byte; discard it
		// before handing the stream to the decompressor.
		if _, err := bufReader.Discard(1); err != nil {
			return nil, err
		}
		zr = brotli.NewReader(bufReader)
	} else {
		return nil, fmt.Errorf("cannot distinguish the compression algo used given type byte %v", compressionType[0])
	}

	// Setup decompressor stage + RLP reader
	rlpReader := rlp.NewStream(zr, MaxRLPBytesPerChannel)
	// Read each batch iteratively
	return func() (*BatchData, error) {
		var batchData BatchData
		// BUG FIX: use a locally-scoped err. The original assigned into the
		// captured outer `err`, clobbering shared state across calls.
		if err := rlpReader.Decode(&batchData); err != nil {
			return nil, err
		}
		return &batchData, nil
	}, nil
}
18 changes: 7 additions & 11 deletions op-node/rollup/derive/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,13 @@ func BatchReader(r io.Reader) (func() (*BatchData, error), error) {
return nil, err
}

var reader func(io.Reader) (io.Reader, error)
var zr io.Reader
// For zlib, the last 4 bits must be either 8 or 15 (both are reserved value)
if compressionType[0]&0x0F == ZlibCM8 || compressionType[0]&0x0F == ZlibCM15 {
reader = func(r io.Reader) (io.Reader, error) {
return zlib.NewReader(r)
var err error
zr, err = zlib.NewReader(bufReader)
if err != nil {
return nil, err
}
// If the bits equal to 1, then it is a brotli reader
} else if compressionType[0] == ChannelVersionBrotli {
Expand All @@ -174,23 +176,17 @@ func BatchReader(r io.Reader) (func() (*BatchData, error), error) {
if err != nil {
return nil, err
}
reader = func(r io.Reader) (io.Reader, error) {
return brotli.NewReader(r), nil
}
zr = brotli.NewReader(bufReader)
} else {
return nil, fmt.Errorf("cannot distinguish the compression algo used given type byte %v", compressionType[0])
}

// Setup decompressor stage + RLP reader
zr, err := reader(bufReader)
if err != nil {
return nil, err
}
rlpReader := rlp.NewStream(zr, MaxRLPBytesPerChannel)
// Read each batch iteratively
return func() (*BatchData, error) {
var batchData BatchData
if err = rlpReader.Decode(&batchData); err != nil {
if err := rlpReader.Decode(&batchData); err != nil {
return nil, err
}
return &batchData, nil
Expand Down
18 changes: 16 additions & 2 deletions op-node/rollup/derive/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package derive
import (
"bytes"
"compress/zlib"
"fmt"
"math/big"
"math/rand"
"testing"

"github.com/DataDog/zstd"
"github.com/andybalholm/brotli"
"github.com/ethereum-optimism/optimism/op-service/eth"

"github.com/ethereum/go-ethereum/rlp"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -114,8 +115,18 @@ func TestBatchReader(t *testing.T) {
batchDataInput := NewBatchData(singularBatch)

encodedBatch := &bytes.Buffer{}
buf := &bytes.Buffer{}
// Get the encoded data of the batch data
batchDataInput.encodeTyped(encodedBatch)
batchDataInput.encodeTyped(buf)
err := rlp.Encode(encodedBatch, buf.Bytes())
require.NoError(t, err)

zr := bytes.NewReader(encodedBatch.Bytes())
require.NoError(t, err)
rlpReader := rlp.NewStream(zr, MaxRLPBytesPerChannel)
var batchData BatchData
err = rlpReader.Decode(&batchData)
fmt.Println(err, batchData)

var testCases = []struct {
name string
Expand All @@ -125,6 +136,7 @@ func TestBatchReader(t *testing.T) {
algo: func(buf *bytes.Buffer) {
writer := zlib.NewWriter(buf)
writer.Write(encodedBatch.Bytes())
writer.Close()
},
},
{
Expand All @@ -133,12 +145,14 @@ func TestBatchReader(t *testing.T) {
buf.WriteByte(ChannelVersionBrotli)
writer := brotli.NewWriterLevel(buf, 10)
writer.Write(encodedBatch.Bytes())
writer.Close()
},
}, {
name: "zstd",
algo: func(buf *bytes.Buffer) {
writer := zstd.NewWriter(buf)
writer.Write(encodedBatch.Bytes())
writer.Close()
},
}}

Expand Down

0 comments on commit 7d605c4

Please sign in to comment.