add node logging of algo, add brotli default algo
sebastianst committed May 20, 2024
1 parent a956ccf commit 22c3c9c
Showing 11 changed files with 79 additions and 43 deletions.
2 changes: 1 addition & 1 deletion op-batcher/batcher/channel_builder_test.go
@@ -430,7 +430,7 @@ func TestChannelBuilder_OutputFrames(t *testing.T) {
}

func TestChannelBuilder_OutputFrames_SpanBatch(t *testing.T) {
for _, algo := range derive.CompressionAlgoTypes {
for _, algo := range derive.CompressionAlgos {
t.Run("ChannelBuilder_OutputFrames_SpanBatch_"+algo.String(), func(t *testing.T) {
if algo.IsBrotli() {
ChannelBuilder_OutputFrames_SpanBatch(t, algo) // to fill faster for brotli
2 changes: 1 addition & 1 deletion op-batcher/batcher/config.go
@@ -128,7 +128,7 @@ func (c *CLIConfig) Check() error {
if c.Compressor == compressor.RatioKind && (c.ApproxComprRatio <= 0 || c.ApproxComprRatio > 1) {
return fmt.Errorf("invalid ApproxComprRatio %v for ratio compressor", c.ApproxComprRatio)
}
if !derive.ValidCompressionAlgoType(c.CompressionAlgo) {
if !derive.ValidCompressionAlgo(c.CompressionAlgo) {
return fmt.Errorf("invalid compression algo %v", c.CompressionAlgo)
}
if c.BatchType > 1 {
2 changes: 1 addition & 1 deletion op-batcher/flags/flags.go
@@ -102,7 +102,7 @@
}
CompressionAlgoFlag = &cli.GenericFlag{
Name: "compression-algo",
Usage: "The compression algorithm to use. Valid options: " + openum.EnumString(derive.CompressionAlgoTypes),
Usage: "The compression algorithm to use. Valid options: " + openum.EnumString(derive.CompressionAlgos),
EnvVars: prefixEnvVars("COMPRESSION_ALGO"),
Value: func() *derive.CompressionAlgo {
out := derive.Zlib
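Since `CompressionAlgo` implements the `cli.Generic` interface through its `Set` and `String` methods, the flag stores a `*derive.CompressionAlgo` directly and defaults to zlib. A minimal sketch (not part of this commit, and the helper name is hypothetical) of how a caller might read the parsed value back, assuming the urfave/cli v2 API:

```go
package example

import (
	"github.com/urfave/cli/v2"

	"github.com/ethereum-optimism/optimism/op-batcher/flags"
	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)

// compressionAlgoFromCLI is a hypothetical helper: it pulls the parsed
// --compression-algo value out of the CLI context and falls back to the
// zlib default if the assertion fails.
func compressionAlgoFromCLI(ctx *cli.Context) derive.CompressionAlgo {
	if v, ok := ctx.Generic(flags.CompressionAlgoFlag.Name).(*derive.CompressionAlgo); ok && v != nil {
		return *v // e.g. derive.Brotli when run with --compression-algo=brotli
	}
	return derive.Zlib
}
```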
8 changes: 4 additions & 4 deletions op-node/benchmarks/batchbuilding_test.go
@@ -48,7 +48,7 @@
derive.SpanBatchType,
// uncomment to include singular batches in the benchmark
// singular batches are not included by default because they are not the target of the benchmark
//derive.SingularBatchType,
// derive.SingularBatchType,
}
)

@@ -129,7 +129,7 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) {
// to leverage optimizations in the Batch Linked List
batches[i].Timestamp = uint64(t.Add(time.Duration(i) * time.Second).Unix())
}
for _, algo := range derive.CompressionAlgoTypes {
for _, algo := range derive.CompressionAlgos {
b.Run(tc.String()+"_"+algo.String(), func(b *testing.B) {
// reset the compressor used in the test case
for bn := 0; bn < b.N; bn++ {
@@ -168,7 +168,7 @@ func BenchmarkIncremental(b *testing.B) {
{derive.SpanBatchType, 5, 1, "RealBlindCompressor"},
//{derive.SingularBatchType, 100, 1, "RealShadowCompressor"},
}
for _, algo := range derive.CompressionAlgoTypes {
for _, algo := range derive.CompressionAlgos {
for _, tc := range tcs {
cout, err := channelOutByType(tc.BatchType, tc.compKey, algo)
if err != nil {
@@ -231,7 +231,7 @@ func BenchmarkAllBatchesChannelOut(b *testing.B) {
}
}

for _, algo := range derive.CompressionAlgoTypes {
for _, algo := range derive.CompressionAlgos {
for _, tc := range tests {
chainID := big.NewInt(333)
rng := rand.New(rand.NewSource(0x543331))
26 changes: 16 additions & 10 deletions op-node/cmd/batch_decoder/reassemble/reassemble.go
@@ -18,13 +18,14 @@ import (
)

type ChannelWithMetadata struct {
ID derive.ChannelID `json:"id"`
IsReady bool `json:"is_ready"`
InvalidFrames bool `json:"invalid_frames"`
InvalidBatches bool `json:"invalid_batches"`
Frames []FrameWithMetadata `json:"frames"`
Batches []derive.Batch `json:"batches"`
BatchTypes []int `json:"batch_types"`
ID derive.ChannelID `json:"id"`
IsReady bool `json:"is_ready"`
InvalidFrames bool `json:"invalid_frames"`
InvalidBatches bool `json:"invalid_batches"`
Frames []FrameWithMetadata `json:"frames"`
Batches []derive.Batch `json:"batches"`
BatchTypes []int `json:"batch_types"`
ComprAlgos []derive.CompressionAlgo `json:"compr_algos"`
}

type FrameWithMetadata struct {
@@ -54,7 +55,6 @@ func LoadFrames(directory string, inbox common.Address) []FrameWithMetadata {
} else {
return txns[i].BlockNumber < txns[j].BlockNumber
}

})
return transactionsToFrames(txns)
}
@@ -107,8 +107,12 @@ func processFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, fr
}
}

var batches []derive.Batch
var batchTypes []int
var (
batches []derive.Batch
batchTypes []int
comprAlgos []derive.CompressionAlgo
)

invalidBatches := false
if ch.IsReady() {
br, err := derive.BatchReader(ch.Reader(), spec.MaxRLPBytesPerChannel(ch.HighestBlock().Time), rollupCfg.IsFjord(ch.HighestBlock().Time))
@@ -118,6 +122,7 @@
fmt.Printf("Error reading batchData for channel %v. Err: %v\n", id.String(), err)
invalidBatches = true
} else {
comprAlgos = append(comprAlgos, batchData.ComprAlgo)
batchType := batchData.GetBatchType()
batchTypes = append(batchTypes, int(batchType))
switch batchType {
@@ -157,6 +162,7 @@ func processFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, fr
InvalidBatches: invalidBatches,
Batches: batches,
BatchTypes: batchTypes,
ComprAlgos: comprAlgos,
}
}

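With the new `ComprAlgos` field, the batch decoder's JSON output records which algorithm each decoded batch in the channel was compressed with. A rough illustration (values invented, not taken from a real channel) of what the serialized metadata might look like:

```go
package example

import (
	"encoding/json"
	"fmt"

	"github.com/ethereum-optimism/optimism/op-node/cmd/batch_decoder/reassemble"
	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)

func printChannelMetadata() {
	// Illustrative values only; real channels are produced by processFrames.
	ch := reassemble.ChannelWithMetadata{
		IsReady:    true,
		BatchTypes: []int{int(derive.SpanBatchType)},
		ComprAlgos: []derive.CompressionAlgo{derive.Brotli},
	}
	out, _ := json.Marshal(ch)
	fmt.Println(string(out))
	// Output contains e.g. ..."batch_types":[1],"compr_algos":["brotli"]...
}
```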
12 changes: 11 additions & 1 deletion op-node/rollup/derive/batch.go
@@ -39,12 +39,22 @@ type Batch interface {
LogContext(log.Logger) log.Logger
}

type batchWithMetadata struct {
Batch
comprAlgo CompressionAlgo
}

func (b batchWithMetadata) LogContext(l log.Logger) log.Logger {
return b.Batch.LogContext(l).With("compression_algo", b.comprAlgo)
}

// BatchData is used to represent the typed encoding & decoding.
// and wraps around a single interface InnerBatchData.
// Further fields such as cache can be added in the future, without embedding each type of InnerBatchData.
// Similar design with op-geth's types.Transaction struct.
type BatchData struct {
inner InnerBatchData
inner InnerBatchData
ComprAlgo CompressionAlgo
}

// InnerBatchData is the underlying data of a BatchData.
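The wrapper relies on Go's struct embedding: every `Batch` method is forwarded to the inner batch untouched, and only `LogContext` is overridden to tag log lines with the algorithm. A reduced, standalone sketch of the pattern (stand-in types, not the actual derive package):

```go
package example

import "github.com/ethereum/go-ethereum/log"

// Stand-in for derive.Batch, trimmed to the one method the wrapper overrides;
// the real interface has more methods, which embedding forwards unchanged.
type Batch interface {
	LogContext(log.Logger) log.Logger
}

type CompressionAlgo string

type batchWithMetadata struct {
	Batch                     // embedded inner batch
	comprAlgo CompressionAlgo // carried alongside, invisible to callers
}

// LogContext decorates the inner batch's logger with the compression algo,
// so every log line derived from this batch reports it.
func (b batchWithMetadata) LogContext(l log.Logger) log.Logger {
	return b.Batch.LogContext(l).With("compression_algo", b.comprAlgo)
}
```

Because `BatchData` also gains a `ComprAlgo` field, the channel reader below can thread the detected algorithm all the way through to these log lines.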
5 changes: 4 additions & 1 deletion op-node/rollup/derive/channel.go
@@ -167,6 +167,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func(
}

var zr io.Reader
var comprAlgo CompressionAlgo
// For zlib, the lower 4 bits of the first byte must be either 8 or 15 (both are reserved values)
if compressionType[0]&0x0F == ZlibCM8 || compressionType[0]&0x0F == ZlibCM15 {
var err error
@@ -175,6 +176,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func(
return nil, err
}
// If the version byte equals 1, then it is a brotli reader
comprAlgo = Zlib
} else if compressionType[0] == ChannelVersionBrotli {
// If before Fjord, we cannot accept brotli compressed batch
if !isFjord {
@@ -186,6 +188,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func(
return nil, err
}
zr = brotli.NewReader(bufReader)
comprAlgo = Brotli
} else {
return nil, fmt.Errorf("cannot distinguish the compression algo used given type byte %v", compressionType[0])
}
@@ -194,7 +197,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func(
rlpReader := rlp.NewStream(zr, maxRLPBytesPerChannel)
// Read each batch iteratively
return func() (*BatchData, error) {
var batchData BatchData
batchData := BatchData{ComprAlgo: comprAlgo}
if err := rlpReader.Decode(&batchData); err != nil {
return nil, err
}
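The detection boils down to sniffing the first byte of the channel: zlib's CMF byte keeps the compression method in its low nibble (8 or 15), while a whole-byte value of `ChannelVersionBrotli` (1) cannot collide with a valid zlib header. A standalone sketch of just that dispatch, omitting the Fjord activation check and the actual reader construction:

```go
package example

import "fmt"

// Constant values as used by the diff above.
const (
	ZlibCM8              = 8  // deflate
	ZlibCM15             = 15 // reserved zlib method value
	ChannelVersionBrotli = 1  // version byte prepended to brotli channels
)

// sniffCompressionAlgo mirrors the branch structure in BatchReader; it is a
// sketch for illustration, not the real function.
func sniffCompressionAlgo(first byte) (string, error) {
	switch {
	case first&0x0F == ZlibCM8 || first&0x0F == ZlibCM15:
		return "zlib", nil // zlib data starts directly with the CMF byte
	case first == ChannelVersionBrotli:
		return "brotli", nil // version byte precedes the brotli stream
	default:
		return "", fmt.Errorf("cannot distinguish the compression algo used given type byte %v", first)
	}
}
```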
14 changes: 8 additions & 6 deletions op-node/rollup/derive/channel_in_reader.go
@@ -87,29 +87,31 @@ func (cr *ChannelInReader) NextBatch(ctx context.Context) (Batch, error) {
cr.NextChannel()
return nil, NotEnoughData
}

batch := batchWithMetadata{comprAlgo: batchData.ComprAlgo}
switch batchData.GetBatchType() {
case SingularBatchType:
singularBatch, err := GetSingularBatch(batchData)
batch.Batch, err = GetSingularBatch(batchData)
if err != nil {
return nil, err
}
singularBatch.LogContext(cr.log).Debug("decoded singular batch from channel", "stage_origin", cr.Origin())
batch.LogContext(cr.log).Debug("decoded singular batch from channel", "stage_origin", cr.Origin())
cr.metrics.RecordDerivedBatches("singular")
return singularBatch, nil
return batch, nil
case SpanBatchType:
if origin := cr.Origin(); !cr.cfg.IsDelta(origin.Time) {
// Check hard fork activation with the L1 inclusion block time instead of the L1 origin block time.
// Therefore, even if the batch passed this rule, it can be dropped in the batch queue.
// This is just for early dropping invalid batches as soon as possible.
return nil, NewTemporaryError(fmt.Errorf("cannot accept span batch in L1 block %s at time %d", origin, origin.Time))
}
spanBatch, err := DeriveSpanBatch(batchData, cr.cfg.BlockTime, cr.cfg.Genesis.L2Time, cr.cfg.L2ChainID)
batch.Batch, err = DeriveSpanBatch(batchData, cr.cfg.BlockTime, cr.cfg.Genesis.L2Time, cr.cfg.L2ChainID)
if err != nil {
return nil, err
}
spanBatch.LogContext(cr.log).Debug("decoded span batch from channel", "stage_origin", cr.Origin())
batch.LogContext(cr.log).Debug("decoded span batch from channel", "stage_origin", cr.Origin())
cr.metrics.RecordDerivedBatches("span")
return spanBatch, nil
return batch, nil
default:
// error is bubbled up to user, but pipeline can skip the batch and continue after.
return nil, NewTemporaryError(fmt.Errorf("unrecognized batch type: %d", batchData.GetBatchType()))
6 changes: 2 additions & 4 deletions op-node/rollup/derive/channel_out_test.go
@@ -15,9 +15,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup"
)

var (
rollupCfg rollup.Config
)
var rollupCfg rollup.Config

// basic implementation of the Compressor interface that does no compression
type nonCompressor struct {
@@ -248,7 +246,7 @@ func TestSpanChannelOut(t *testing.T) {
}
for _, test := range tests {
test := test
for _, algo := range CompressionAlgoTypes {
for _, algo := range CompressionAlgos {
t.Run(test.name+"_"+algo.String(), func(t *testing.T) {
test.f(t, algo)
})
14 changes: 8 additions & 6 deletions op-node/rollup/derive/types.go
@@ -10,26 +10,28 @@ type CompressionAlgo string
const (
// compression algo types
Zlib CompressionAlgo = "zlib"
Brotli CompressionAlgo = "brotli" // default level
Brotli9 CompressionAlgo = "brotli-9"
Brotli10 CompressionAlgo = "brotli-10"
Brotli11 CompressionAlgo = "brotli-11"
)

var CompressionAlgoTypes = []CompressionAlgo{
var CompressionAlgos = []CompressionAlgo{
Zlib,
Brotli,
Brotli9,
Brotli10,
Brotli11,
}

var brotliRegexp = regexp.MustCompile(`^brotli-(9|10|11)$`)
var brotliRegexp = regexp.MustCompile(`^brotli(|-(9|10|11))$`)

func (algo CompressionAlgo) String() string {
return string(algo)
}

func (algo *CompressionAlgo) Set(value string) error {
if !ValidCompressionAlgoType(CompressionAlgo(value)) {
if !ValidCompressionAlgo(CompressionAlgo(value)) {
return fmt.Errorf("unknown compression algo type: %q", value)
}
*algo = CompressionAlgo(value)
@@ -49,7 +51,7 @@ func GetBrotliLevel(algo CompressionAlgo) int {
switch algo {
case Brotli9:
return 9
case Brotli10:
case Brotli10, Brotli: // make level 10 the default
return 10
case Brotli11:
return 11
@@ -58,8 +60,8 @@
}
}

func ValidCompressionAlgoType(value CompressionAlgo) bool {
for _, k := range CompressionAlgoTypes {
func ValidCompressionAlgo(value CompressionAlgo) bool {
for _, k := range CompressionAlgos {
if k == value {
return true
}
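The loosened regexp is what admits the bare `brotli` name: the empty alternative matches when no level suffix is present, and `GetBrotliLevel` then treats the bare name as level 10. A quick standalone check of the pattern (not part of the commit):

```go
package main

import (
	"fmt"
	"regexp"
)

// Same pattern as the diff: an empty alternative plus the three explicit levels.
var brotliRegexp = regexp.MustCompile(`^brotli(|-(9|10|11))$`)

func main() {
	for _, s := range []string{"brotli", "brotli-9", "brotli-10", "brotli-11", "brotli-8", "zlib"} {
		fmt.Printf("%-10s -> %v\n", s, brotliRegexp.MatchString(s))
	}
	// brotli, brotli-9, brotli-10, brotli-11 match; brotli-8 and zlib do not.
}
```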
31 changes: 23 additions & 8 deletions op-node/rollup/derive/types_test.go
@@ -10,49 +10,64 @@ func TestCompressionAlgo(t *testing.T) {
testCases := []struct {
name string
algo CompressionAlgo
isBrotli bool
isValidCompressionAlgoType bool
isBrotli bool
brotliLevel int
}{
{
name: "zlib",
algo: Zlib,
isValidCompressionAlgoType: true,
isBrotli: false,
},
{
name: "brotli",
algo: Brotli,
isValidCompressionAlgoType: true,
isBrotli: true,
brotliLevel: 10,
},
{
name: "brotli-9",
algo: Brotli9,
isBrotli: true,
isValidCompressionAlgoType: true,
isBrotli: true,
brotliLevel: 9,
},
{
name: "brotli-10",
algo: Brotli10,
isBrotli: true,
isValidCompressionAlgoType: true,
isBrotli: true,
brotliLevel: 10,
},
{
name: "brotli-11",
algo: Brotli11,
isBrotli: true,
isValidCompressionAlgoType: true,
isBrotli: true,
brotliLevel: 11,
},
{
name: "invalid",
algo: CompressionAlgo("invalid"),
isBrotli: false,
isValidCompressionAlgoType: false,
}}
isBrotli: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.isBrotli, tc.algo.IsBrotli())
if tc.isBrotli {
require.NotPanics(t, func() { GetBrotliLevel((tc.algo)) })
require.NotPanics(t, func() {
blvl := GetBrotliLevel((tc.algo))
require.Equal(t, tc.brotliLevel, blvl)
})
} else {
require.Panics(t, func() { GetBrotliLevel(tc.algo) })
}
require.Equal(t, tc.isValidCompressionAlgoType, ValidCompressionAlgoType(tc.algo))
require.Equal(t, tc.isValidCompressionAlgoType, ValidCompressionAlgo(tc.algo))
})
}
}
