op-batcher,op-node,batch_decoder: add logging of compression algo (#10589)

* op-batcher: add logging of compression algo

* add node logging of algo, add brotli default algo

* fix typos

* only log compression algo if present

* add type conversion abstraction to Batch interface

Since we're now dealing with wrapped types around Batch implementations,
this lets us transparently unwrap the underlying batch types. It makes
sense to add this to the interface because the underlying types are
retrieved from the interface in several places, so unwrapping is part of
the interface's contract (see the sketch after this list).

* adapt BatchReader test
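
A minimal, self-contained sketch of that interface extension (the method names come from the batch.go diff below; the stand-in types and the trivial per-type implementations are assumptions, not the verbatim commit):

package derive

// Stand-ins so the sketch compiles on its own; the real types live in
// op-node/rollup/derive.
type (
	SingularBatch   struct{}
	SpanBatch       struct{}
	CompressionAlgo string
)

// Batch gains the two unwrapping methods added by this commit.
type Batch interface {
	AsSingularBatch() (*SingularBatch, bool)
	AsSpanBatch() (*SpanBatch, bool)
}

// Each concrete type answers for itself.
func (b *SingularBatch) AsSingularBatch() (*SingularBatch, bool) { return b, true }
func (b *SingularBatch) AsSpanBatch() (*SpanBatch, bool)         { return nil, false }
func (b *SpanBatch) AsSingularBatch() (*SingularBatch, bool)     { return nil, false }
func (b *SpanBatch) AsSpanBatch() (*SpanBatch, bool)             { return b, true }

// A wrapper that embeds Batch inherits both methods, so callers unwrap
// through the wrapper without knowing it exists.
type batchWithMetadata struct {
	Batch
	comprAlgo CompressionAlgo
}

Callers such as batch_queue.go below then call AsSingularBatch/AsSpanBatch instead of type-asserting on the (possibly wrapped) interface value.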
sebastianst authored and Tarun Khasnavis committed May 28, 2024
1 parent 060b6f5 commit b70c363
Showing 18 changed files with 185 additions and 119 deletions.
2 changes: 1 addition & 1 deletion op-batcher/batcher/channel_builder_test.go
@@ -430,7 +430,7 @@ func TestChannelBuilder_OutputFrames(t *testing.T) {
}

func TestChannelBuilder_OutputFrames_SpanBatch(t *testing.T) {
- for _, algo := range derive.CompressionAlgoTypes {
+ for _, algo := range derive.CompressionAlgos {
t.Run("ChannelBuilder_OutputFrames_SpanBatch_"+algo.String(), func(t *testing.T) {
if algo.IsBrotli() {
ChannelBuilder_OutputFrames_SpanBatch(t, algo) // to fill faster for brotli
3 changes: 2 additions & 1 deletion op-batcher/batcher/channel_manager.go
@@ -204,7 +204,6 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
}

pc, err := newChannel(s.log, s.metr, s.cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number)
-
if err != nil {
return fmt.Errorf("creating new channel: %w", err)
}
@@ -218,6 +217,8 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
"l1OriginLastClosedChannel", s.l1OriginLastClosedChannel,
"blocks_pending", len(s.blocks),
"batch_type", s.cfg.BatchType,
+ "compression_algo", s.cfg.CompressorConfig.CompressionAlgo,
+ "target_num_frames", s.cfg.TargetNumFrames,
"max_frame_size", s.cfg.MaxFrameSize,
)
s.metr.RecordChannelOpened(pc.ID(), len(s.blocks))
2 changes: 1 addition & 1 deletion op-batcher/batcher/config.go
@@ -128,7 +128,7 @@ func (c *CLIConfig) Check() error {
if c.Compressor == compressor.RatioKind && (c.ApproxComprRatio <= 0 || c.ApproxComprRatio > 1) {
return fmt.Errorf("invalid ApproxComprRatio %v for ratio compressor", c.ApproxComprRatio)
}
- if !derive.ValidCompressionAlgoType(c.CompressionAlgo) {
+ if !derive.ValidCompressionAlgo(c.CompressionAlgo) {
return fmt.Errorf("invalid compression algo %v", c.CompressionAlgo)
}
if c.BatchType > 1 {
3 changes: 2 additions & 1 deletion op-batcher/batcher/service.go
@@ -242,9 +242,10 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
"max_frame_size", cc.MaxFrameSize,
"target_num_frames", cc.TargetNumFrames,
"compressor", cc.CompressorConfig.Kind,
+ "compression_algo", cc.CompressorConfig.CompressionAlgo,
+ "batch_type", cc.BatchType,
"max_channel_duration", cc.MaxChannelDuration,
"channel_timeout", cc.ChannelTimeout,
- "batch_type", cc.BatchType,
"sub_safety_margin", cc.SubSafetyMargin)
bs.ChannelConfig = cc
return nil
2 changes: 1 addition & 1 deletion op-batcher/flags/flags.go
@@ -102,7 +102,7 @@ var (
}
CompressionAlgoFlag = &cli.GenericFlag{
Name: "compression-algo",
- Usage: "The compression algorithm to use. Valid options: " + openum.EnumString(derive.CompressionAlgoTypes),
+ Usage: "The compression algorithm to use. Valid options: " + openum.EnumString(derive.CompressionAlgos),
EnvVars: prefixEnvVars("COMPRESSION_ALGO"),
Value: func() *derive.CompressionAlgo {
out := derive.Zlib
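
For reference, a typical invocation would set this as --compression-algo=brotli; given the prefixEnvVars helper, the environment variable is presumably OP_BATCHER_COMPRESSION_ALGO, and the Value closure above keeps zlib as the default.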
8 changes: 4 additions & 4 deletions op-node/benchmarks/batchbuilding_test.go
@@ -48,7 +48,7 @@ var (
derive.SpanBatchType,
// uncomment to include singular batches in the benchmark
// singular batches are not included by default because they are not the target of the benchmark
- //derive.SingularBatchType,
+ // derive.SingularBatchType,
}
)

@@ -129,7 +129,7 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) {
// to leverage optimizations in the Batch Linked List
batches[i].Timestamp = uint64(t.Add(time.Duration(i) * time.Second).Unix())
}
- for _, algo := range derive.CompressionAlgoTypes {
+ for _, algo := range derive.CompressionAlgos {
b.Run(tc.String()+"_"+algo.String(), func(b *testing.B) {
// reset the compressor used in the test case
for bn := 0; bn < b.N; bn++ {
@@ -168,7 +168,7 @@ func BenchmarkIncremental(b *testing.B) {
{derive.SpanBatchType, 5, 1, "RealBlindCompressor"},
//{derive.SingularBatchType, 100, 1, "RealShadowCompressor"},
}
- for _, algo := range derive.CompressionAlgoTypes {
+ for _, algo := range derive.CompressionAlgos {
for _, tc := range tcs {
cout, err := channelOutByType(tc.BatchType, tc.compKey, algo)
if err != nil {
@@ -231,7 +231,7 @@ func BenchmarkAllBatchesChannelOut(b *testing.B) {
}
}

- for _, algo := range derive.CompressionAlgoTypes {
+ for _, algo := range derive.CompressionAlgos {
for _, tc := range tests {
chainID := big.NewInt(333)
rng := rand.New(rand.NewSource(0x543331))
26 changes: 16 additions & 10 deletions op-node/cmd/batch_decoder/reassemble/reassemble.go
@@ -18,13 +18,14 @@ import (
)

type ChannelWithMetadata struct {
- ID             derive.ChannelID    `json:"id"`
- IsReady        bool                `json:"is_ready"`
- InvalidFrames  bool                `json:"invalid_frames"`
- InvalidBatches bool                `json:"invalid_batches"`
- Frames         []FrameWithMetadata `json:"frames"`
- Batches        []derive.Batch      `json:"batches"`
- BatchTypes     []int               `json:"batch_types"`
+ ID             derive.ChannelID         `json:"id"`
+ IsReady        bool                     `json:"is_ready"`
+ InvalidFrames  bool                     `json:"invalid_frames"`
+ InvalidBatches bool                     `json:"invalid_batches"`
+ Frames         []FrameWithMetadata      `json:"frames"`
+ Batches        []derive.Batch           `json:"batches"`
+ BatchTypes     []int                    `json:"batch_types"`
+ ComprAlgos     []derive.CompressionAlgo `json:"compr_algos"`
}

type FrameWithMetadata struct {
@@ -54,7 +55,6 @@ func LoadFrames(directory string, inbox common.Address) []FrameWithMetadata {
} else {
return txns[i].BlockNumber < txns[j].BlockNumber
}
-
})
return transactionsToFrames(txns)
}
@@ -107,8 +107,12 @@ func processFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, fr
}
}

- var batches []derive.Batch
- var batchTypes []int
+ var (
+     batches    []derive.Batch
+     batchTypes []int
+     comprAlgos []derive.CompressionAlgo
+ )
+
invalidBatches := false
if ch.IsReady() {
br, err := derive.BatchReader(ch.Reader(), spec.MaxRLPBytesPerChannel(ch.HighestBlock().Time), rollupCfg.IsFjord(ch.HighestBlock().Time))
@@ -118,6 +122,7 @@
fmt.Printf("Error reading batchData for channel %v. Err: %v\n", id.String(), err)
invalidBatches = true
} else {
+ comprAlgos = append(comprAlgos, batchData.ComprAlgo)
batchType := batchData.GetBatchType()
batchTypes = append(batchTypes, int(batchType))
switch batchType {
@@ -157,6 +162,7 @@ func processFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, fr
InvalidBatches: invalidBatches,
Batches: batches,
BatchTypes: batchTypes,
+ ComprAlgos: comprAlgos,
}
}

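
As a usage illustration (a hypothetical consumer, not part of the commit), the decoder's JSON output can be read back with only the new fields declared. Both slices are appended to only when a batch decodes successfully, so they line up index by index:

package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// Hypothetical, trimmed-down view of ChannelWithMetadata; the tag names
// mirror the struct in the diff above.
type channelSummary struct {
	BatchTypes []int    `json:"batch_types"`
	ComprAlgos []string `json:"compr_algos"`
}

func main() {
	f, err := os.Open("channel.json") // hypothetical batch_decoder output file
	if err != nil {
		panic(err)
	}
	defer f.Close()

	var ch channelSummary
	if err := json.NewDecoder(f).Decode(&ch); err != nil {
		panic(err)
	}
	for i, bt := range ch.BatchTypes {
		fmt.Printf("batch %d: type=%d compression=%s\n", i, bt, ch.ComprAlgos[i])
	}
}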
18 changes: 17 additions & 1 deletion op-node/rollup/derive/batch.go
@@ -37,14 +37,30 @@ type Batch interface {
GetBatchType() int
GetTimestamp() uint64
LogContext(log.Logger) log.Logger
+ AsSingularBatch() (*SingularBatch, bool)
+ AsSpanBatch() (*SpanBatch, bool)
}

+ type batchWithMetadata struct {
+     Batch
+     comprAlgo CompressionAlgo
+ }
+
+ func (b batchWithMetadata) LogContext(l log.Logger) log.Logger {
+     lgr := b.Batch.LogContext(l)
+     if b.comprAlgo == "" {
+         return lgr
+     }
+     return lgr.With("compression_algo", b.comprAlgo)
+ }
+
// BatchData is used to represent the typed encoding & decoding.
// and wraps around a single interface InnerBatchData.
// Further fields such as cache can be added in the future, without embedding each type of InnerBatchData.
// Similar design with op-geth's types.Transaction struct.
type BatchData struct {
- inner InnerBatchData
+ inner     InnerBatchData
+ ComprAlgo CompressionAlgo
}

// InnerBatchData is the underlying data of a BatchData.
8 changes: 4 additions & 4 deletions op-node/rollup/derive/batch_queue.go
@@ -177,15 +177,15 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*Si
}

var nextBatch *SingularBatch
- switch batch.GetBatchType() {
+ switch typ := batch.GetBatchType(); typ {
case SingularBatchType:
- singularBatch, ok := batch.(*SingularBatch)
+ singularBatch, ok := batch.AsSingularBatch()
if !ok {
return nil, false, NewCriticalError(errors.New("failed type assertion to SingularBatch"))
}
nextBatch = singularBatch
case SpanBatchType:
- spanBatch, ok := batch.(*SpanBatch)
+ spanBatch, ok := batch.AsSpanBatch()
if !ok {
return nil, false, NewCriticalError(errors.New("failed type assertion to SpanBatch"))
}
@@ -198,7 +198,7 @@
// span-batches are non-empty, so the below pop is safe.
nextBatch = bq.popNextBatch(parent)
default:
- return nil, false, NewCriticalError(fmt.Errorf("unrecognized batch type: %d", batch.GetBatchType()))
+ return nil, false, NewCriticalError(fmt.Errorf("unrecognized batch type: %d", typ))
}

// If the nextBatch is derived from the span batch, len(bq.nextSpan) == 0 means it's the last batch of the span.
10 changes: 5 additions & 5 deletions op-node/rollup/derive/batches.go
@@ -11,8 +11,8 @@ import (
)

type BatchWithL1InclusionBlock struct {
- Batch
L1InclusionBlock eth.L1BlockRef
+ Batch            Batch
}

type BatchValidity uint8
@@ -34,23 +34,23 @@ const (
func CheckBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1Blocks []eth.L1BlockRef,
l2SafeHead eth.L2BlockRef, batch *BatchWithL1InclusionBlock, l2Fetcher SafeBlockFetcher,
) BatchValidity {
- switch batch.Batch.GetBatchType() {
+ switch typ := batch.GetBatchType(); typ {
case SingularBatchType:
- singularBatch, ok := batch.Batch.(*SingularBatch)
+ singularBatch, ok := batch.AsSingularBatch()
if !ok {
log.Error("failed type assertion to SingularBatch")
return BatchDrop
}
return checkSingularBatch(cfg, log, l1Blocks, l2SafeHead, singularBatch, batch.L1InclusionBlock)
case SpanBatchType:
- spanBatch, ok := batch.Batch.(*SpanBatch)
+ spanBatch, ok := batch.AsSpanBatch()
if !ok {
log.Error("failed type assertion to SpanBatch")
return BatchDrop
}
return checkSpanBatch(ctx, cfg, log, l1Blocks, l2SafeHead, spanBatch, batch.L1InclusionBlock, l2Fetcher)
default:
- log.Warn("Unrecognized batch type: %d", batch.Batch.GetBatchType())
+ log.Warn("Unrecognized batch type: %d", typ)
return BatchDrop
}
}
5 changes: 4 additions & 1 deletion op-node/rollup/derive/channel.go
@@ -167,6 +167,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func(
}

var zr io.Reader
+ var comprAlgo CompressionAlgo
// For zlib, the last 4 bits must be either 8 or 15 (both are reserved value)
if compressionType[0]&0x0F == ZlibCM8 || compressionType[0]&0x0F == ZlibCM15 {
var err error
@@ -175,6 +176,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func(
return nil, err
}
// If the bits equal to 1, then it is a brotli reader
+ comprAlgo = Zlib
} else if compressionType[0] == ChannelVersionBrotli {
// If before Fjord, we cannot accept brotli compressed batch
if !isFjord {
@@ -186,6 +188,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func(
return nil, err
}
zr = brotli.NewReader(bufReader)
+ comprAlgo = Brotli
} else {
return nil, fmt.Errorf("cannot distinguish the compression algo used given type byte %v", compressionType[0])
}
@@ -194,7 +197,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func(
rlpReader := rlp.NewStream(zr, maxRLPBytesPerChannel)
// Read each batch iteratively
return func() (*BatchData, error) {
- var batchData BatchData
+ batchData := BatchData{ComprAlgo: comprAlgo}
if err := rlpReader.Decode(&batchData); err != nil {
return nil, err
}
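
The detection that assigns comprAlgo can be factored as the sketch below. The constant values are assumptions based on the zlib spec (a CM of 8, or the reserved 15, in the low nibble of the first byte) and on the channel format's brotli version byte of 1; the real BatchReader additionally rejects brotli before Fjord:

package main

import "fmt"

// Assumed values for the constants referenced in the diff above.
const (
	ZlibCM8              = 8
	ZlibCM15             = 15
	ChannelVersionBrotli = 1
)

// detectAlgo mirrors BatchReader's classification of the first byte of
// the channel payload.
func detectAlgo(first byte) (string, error) {
	switch {
	case first&0x0F == ZlibCM8 || first&0x0F == ZlibCM15:
		return "zlib", nil
	case first == ChannelVersionBrotli:
		return "brotli", nil
	default:
		return "", fmt.Errorf("cannot distinguish the compression algo used given type byte %v", first)
	}
}

func main() {
	for _, b := range []byte{0x78, 0x01, 0x20} {
		algo, err := detectAlgo(b)
		fmt.Println(b, algo, err)
	}
}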
14 changes: 8 additions & 6 deletions op-node/rollup/derive/channel_in_reader.go
@@ -87,29 +87,31 @@ func (cr *ChannelInReader) NextBatch(ctx context.Context) (Batch, error) {
cr.NextChannel()
return nil, NotEnoughData
}
+
+ batch := batchWithMetadata{comprAlgo: batchData.ComprAlgo}
switch batchData.GetBatchType() {
case SingularBatchType:
- singularBatch, err := GetSingularBatch(batchData)
+ batch.Batch, err = GetSingularBatch(batchData)
if err != nil {
return nil, err
}
- singularBatch.LogContext(cr.log).Debug("decoded singular batch from channel", "stage_origin", cr.Origin())
+ batch.LogContext(cr.log).Debug("decoded singular batch from channel", "stage_origin", cr.Origin())
cr.metrics.RecordDerivedBatches("singular")
- return singularBatch, nil
+ return batch, nil
case SpanBatchType:
if origin := cr.Origin(); !cr.cfg.IsDelta(origin.Time) {
// Check hard fork activation with the L1 inclusion block time instead of the L1 origin block time.
// Therefore, even if the batch passed this rule, it can be dropped in the batch queue.
// This is just for early dropping invalid batches as soon as possible.
return nil, NewTemporaryError(fmt.Errorf("cannot accept span batch in L1 block %s at time %d", origin, origin.Time))
}
- spanBatch, err := DeriveSpanBatch(batchData, cr.cfg.BlockTime, cr.cfg.Genesis.L2Time, cr.cfg.L2ChainID)
+ batch.Batch, err = DeriveSpanBatch(batchData, cr.cfg.BlockTime, cr.cfg.Genesis.L2Time, cr.cfg.L2ChainID)
if err != nil {
return nil, err
}
- spanBatch.LogContext(cr.log).Debug("decoded span batch from channel", "stage_origin", cr.Origin())
+ batch.LogContext(cr.log).Debug("decoded span batch from channel", "stage_origin", cr.Origin())
cr.metrics.RecordDerivedBatches("span")
- return spanBatch, nil
+ return batch, nil
default:
// error is bubbled up to user, but pipeline can skip the batch and continue after.
return nil, NewTemporaryError(fmt.Errorf("unrecognized batch type: %d", batchData.GetBatchType()))
6 changes: 2 additions & 4 deletions op-node/rollup/derive/channel_out_test.go
@@ -15,9 +15,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup"
)

- var (
- rollupCfg rollup.Config
- )
+ var rollupCfg rollup.Config

// basic implementation of the Compressor interface that does no compression
type nonCompressor struct {
@@ -248,7 +246,7 @@ func TestSpanChannelOut(t *testing.T) {
}
for _, test := range tests {
test := test
- for _, algo := range CompressionAlgoTypes {
+ for _, algo := range CompressionAlgos {
t.Run(test.name+"_"+algo.String(), func(t *testing.T) {
test.f(t, algo)
})
[Diffs for the remaining 5 changed files are not shown in this view.]
