Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

op-batcher,op-node,batch_decoder: add logging of compression algo #10589

Merged
merged 6 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion op-batcher/batcher/channel_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ func TestChannelBuilder_OutputFrames(t *testing.T) {
}

func TestChannelBuilder_OutputFrames_SpanBatch(t *testing.T) {
for _, algo := range derive.CompressionAlgoTypes {
for _, algo := range derive.CompressionAlgos {
t.Run("ChannelBuilder_OutputFrames_SpanBatch_"+algo.String(), func(t *testing.T) {
if algo.IsBrotli() {
ChannelBuilder_OutputFrames_SpanBatch(t, algo) // to fill faster for brotli
Expand Down
3 changes: 2 additions & 1 deletion op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
}

pc, err := newChannel(s.log, s.metr, s.cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number)

if err != nil {
return fmt.Errorf("creating new channel: %w", err)
}
Expand All @@ -218,6 +217,8 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
"l1OriginLastClosedChannel", s.l1OriginLastClosedChannel,
"blocks_pending", len(s.blocks),
"batch_type", s.cfg.BatchType,
"compression_algo", s.cfg.CompressorConfig.CompressionAlgo,
"target_num_frames", s.cfg.TargetNumFrames,
"max_frame_size", s.cfg.MaxFrameSize,
)
s.metr.RecordChannelOpened(pc.ID(), len(s.blocks))
Expand Down
2 changes: 1 addition & 1 deletion op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (c *CLIConfig) Check() error {
if c.Compressor == compressor.RatioKind && (c.ApproxComprRatio <= 0 || c.ApproxComprRatio > 1) {
return fmt.Errorf("invalid ApproxComprRatio %v for ratio compressor", c.ApproxComprRatio)
}
if !derive.ValidCompressionAlgoType(c.CompressionAlgo) {
if !derive.ValidCompressionAlgo(c.CompressionAlgo) {
return fmt.Errorf("invalid compression algo %v", c.CompressionAlgo)
}
if c.BatchType > 1 {
Expand Down
3 changes: 2 additions & 1 deletion op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,10 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
"max_frame_size", cc.MaxFrameSize,
"target_num_frames", cc.TargetNumFrames,
"compressor", cc.CompressorConfig.Kind,
"compression_algo", cc.CompressorConfig.CompressionAlgo,
"batch_type", cc.BatchType,
"max_channel_duration", cc.MaxChannelDuration,
"channel_timeout", cc.ChannelTimeout,
"batch_type", cc.BatchType,
"sub_safety_margin", cc.SubSafetyMargin)
bs.ChannelConfig = cc
return nil
Expand Down
2 changes: 1 addition & 1 deletion op-batcher/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ var (
}
CompressionAlgoFlag = &cli.GenericFlag{
Name: "compression-algo",
Usage: "The compression algorithm to use. Valid options: " + openum.EnumString(derive.CompressionAlgoTypes),
Usage: "The compression algorithm to use. Valid options: " + openum.EnumString(derive.CompressionAlgos),
EnvVars: prefixEnvVars("COMPRESSION_ALGO"),
Value: func() *derive.CompressionAlgo {
out := derive.Zlib
Expand Down
8 changes: 4 additions & 4 deletions op-node/benchmarks/batchbuilding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var (
derive.SpanBatchType,
// uncomment to include singular batches in the benchmark
// singular batches are not included by default because they are not the target of the benchmark
//derive.SingularBatchType,
// derive.SingularBatchType,
}
)

Expand Down Expand Up @@ -129,7 +129,7 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) {
// to leverage optimizations in the Batch Linked List
batches[i].Timestamp = uint64(t.Add(time.Duration(i) * time.Second).Unix())
}
for _, algo := range derive.CompressionAlgoTypes {
for _, algo := range derive.CompressionAlgos {
b.Run(tc.String()+"_"+algo.String(), func(b *testing.B) {
// reset the compressor used in the test case
for bn := 0; bn < b.N; bn++ {
Expand Down Expand Up @@ -168,7 +168,7 @@ func BenchmarkIncremental(b *testing.B) {
{derive.SpanBatchType, 5, 1, "RealBlindCompressor"},
//{derive.SingularBatchType, 100, 1, "RealShadowCompressor"},
}
for _, algo := range derive.CompressionAlgoTypes {
for _, algo := range derive.CompressionAlgos {
for _, tc := range tcs {
cout, err := channelOutByType(tc.BatchType, tc.compKey, algo)
if err != nil {
Expand Down Expand Up @@ -231,7 +231,7 @@ func BenchmarkAllBatchesChannelOut(b *testing.B) {
}
}

for _, algo := range derive.CompressionAlgoTypes {
for _, algo := range derive.CompressionAlgos {
for _, tc := range tests {
chainID := big.NewInt(333)
rng := rand.New(rand.NewSource(0x543331))
Expand Down
26 changes: 16 additions & 10 deletions op-node/cmd/batch_decoder/reassemble/reassemble.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ import (
)

type ChannelWithMetadata struct {
ID derive.ChannelID `json:"id"`
IsReady bool `json:"is_ready"`
InvalidFrames bool `json:"invalid_frames"`
InvalidBatches bool `json:"invalid_batches"`
Frames []FrameWithMetadata `json:"frames"`
Batches []derive.Batch `json:"batches"`
BatchTypes []int `json:"batch_types"`
ID derive.ChannelID `json:"id"`
IsReady bool `json:"is_ready"`
InvalidFrames bool `json:"invalid_frames"`
InvalidBatches bool `json:"invalid_batches"`
Frames []FrameWithMetadata `json:"frames"`
Batches []derive.Batch `json:"batches"`
BatchTypes []int `json:"batch_types"`
ComprAlgos []derive.CompressionAlgo `json:"compr_alogs"`
}

type FrameWithMetadata struct {
Expand Down Expand Up @@ -54,7 +55,6 @@ func LoadFrames(directory string, inbox common.Address) []FrameWithMetadata {
} else {
return txns[i].BlockNumber < txns[j].BlockNumber
}

})
return transactionsToFrames(txns)
}
Expand Down Expand Up @@ -107,8 +107,12 @@ func processFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, fr
}
}

var batches []derive.Batch
var batchTypes []int
var (
batches []derive.Batch
batchTypes []int
comprAlgos []derive.CompressionAlgo
)

invalidBatches := false
if ch.IsReady() {
br, err := derive.BatchReader(ch.Reader(), spec.MaxRLPBytesPerChannel(ch.HighestBlock().Time), rollupCfg.IsFjord(ch.HighestBlock().Time))
Expand All @@ -118,6 +122,7 @@ func processFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, fr
fmt.Printf("Error reading batchData for channel %v. Err: %v\n", id.String(), err)
invalidBatches = true
} else {
comprAlgos = append(comprAlgos, batchData.ComprAlgo)
batchType := batchData.GetBatchType()
batchTypes = append(batchTypes, int(batchType))
switch batchType {
Expand Down Expand Up @@ -157,6 +162,7 @@ func processFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, fr
InvalidBatches: invalidBatches,
Batches: batches,
BatchTypes: batchTypes,
ComprAlgos: comprAlgos,
}
}

Expand Down
18 changes: 17 additions & 1 deletion op-node/rollup/derive/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,30 @@ type Batch interface {
GetBatchType() int
GetTimestamp() uint64
LogContext(log.Logger) log.Logger
AsSingularBatch() (*SingularBatch, bool)
AsSpanBatch() (*SpanBatch, bool)
}

type batchWithMetadata struct {
Batch
comprAlgo CompressionAlgo
}

func (b batchWithMetadata) LogContext(l log.Logger) log.Logger {
lgr := b.Batch.LogContext(l)
if b.comprAlgo == "" {
return lgr
}
return lgr.With("compression_algo", b.comprAlgo)
}

// BatchData is used to represent the typed encoding & decoding.
// and wraps around a single interface InnerBatchData.
// Further fields such as cache can be added in the future, without embedding each type of InnerBatchData.
// Similar design with op-geth's types.Transaction struct.
type BatchData struct {
inner InnerBatchData
inner InnerBatchData
ComprAlgo CompressionAlgo
}

// InnerBatchData is the underlying data of a BatchData.
Expand Down
8 changes: 4 additions & 4 deletions op-node/rollup/derive/batch_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,15 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*Si
}

var nextBatch *SingularBatch
switch batch.GetBatchType() {
switch typ := batch.GetBatchType(); typ {
case SingularBatchType:
singularBatch, ok := batch.(*SingularBatch)
singularBatch, ok := batch.AsSingularBatch()
if !ok {
return nil, false, NewCriticalError(errors.New("failed type assertion to SingularBatch"))
}
nextBatch = singularBatch
case SpanBatchType:
spanBatch, ok := batch.(*SpanBatch)
spanBatch, ok := batch.AsSpanBatch()
if !ok {
return nil, false, NewCriticalError(errors.New("failed type assertion to SpanBatch"))
}
Expand All @@ -198,7 +198,7 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*Si
// span-batches are non-empty, so the below pop is safe.
nextBatch = bq.popNextBatch(parent)
default:
return nil, false, NewCriticalError(fmt.Errorf("unrecognized batch type: %d", batch.GetBatchType()))
return nil, false, NewCriticalError(fmt.Errorf("unrecognized batch type: %d", typ))
}

// If the nextBatch is derived from the span batch, len(bq.nextSpan) == 0 means it's the last batch of the span.
Expand Down
10 changes: 5 additions & 5 deletions op-node/rollup/derive/batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
)

type BatchWithL1InclusionBlock struct {
Batch
L1InclusionBlock eth.L1BlockRef
Batch Batch
}

type BatchValidity uint8
Expand All @@ -34,23 +34,23 @@ const (
func CheckBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1Blocks []eth.L1BlockRef,
l2SafeHead eth.L2BlockRef, batch *BatchWithL1InclusionBlock, l2Fetcher SafeBlockFetcher,
) BatchValidity {
switch batch.Batch.GetBatchType() {
switch typ := batch.GetBatchType(); typ {
case SingularBatchType:
singularBatch, ok := batch.Batch.(*SingularBatch)
singularBatch, ok := batch.AsSingularBatch()
if !ok {
log.Error("failed type assertion to SingularBatch")
return BatchDrop
}
return checkSingularBatch(cfg, log, l1Blocks, l2SafeHead, singularBatch, batch.L1InclusionBlock)
case SpanBatchType:
spanBatch, ok := batch.Batch.(*SpanBatch)
spanBatch, ok := batch.AsSpanBatch()
if !ok {
log.Error("failed type assertion to SpanBatch")
return BatchDrop
}
return checkSpanBatch(ctx, cfg, log, l1Blocks, l2SafeHead, spanBatch, batch.L1InclusionBlock, l2Fetcher)
default:
log.Warn("Unrecognized batch type: %d", batch.Batch.GetBatchType())
log.Warn("Unrecognized batch type: %d", typ)
return BatchDrop
}
}
Expand Down
5 changes: 4 additions & 1 deletion op-node/rollup/derive/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func(
}

var zr io.Reader
var comprAlgo CompressionAlgo
// For zlib, the last 4 bits must be either 8 or 15 (both are reserved value)
if compressionType[0]&0x0F == ZlibCM8 || compressionType[0]&0x0F == ZlibCM15 {
var err error
Expand All @@ -175,6 +176,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func(
return nil, err
}
// If the bits equal to 1, then it is a brotli reader
comprAlgo = Zlib
} else if compressionType[0] == ChannelVersionBrotli {
// If before Fjord, we cannot accept brotli compressed batch
if !isFjord {
Expand All @@ -186,6 +188,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func(
return nil, err
}
zr = brotli.NewReader(bufReader)
comprAlgo = Brotli
} else {
return nil, fmt.Errorf("cannot distinguish the compression algo used given type byte %v", compressionType[0])
}
Expand All @@ -194,7 +197,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func(
rlpReader := rlp.NewStream(zr, maxRLPBytesPerChannel)
// Read each batch iteratively
return func() (*BatchData, error) {
var batchData BatchData
batchData := BatchData{ComprAlgo: comprAlgo}
if err := rlpReader.Decode(&batchData); err != nil {
return nil, err
}
Expand Down
14 changes: 8 additions & 6 deletions op-node/rollup/derive/channel_in_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,29 +87,31 @@ func (cr *ChannelInReader) NextBatch(ctx context.Context) (Batch, error) {
cr.NextChannel()
return nil, NotEnoughData
}

batch := batchWithMetadata{comprAlgo: batchData.ComprAlgo}
switch batchData.GetBatchType() {
case SingularBatchType:
singularBatch, err := GetSingularBatch(batchData)
batch.Batch, err = GetSingularBatch(batchData)
if err != nil {
return nil, err
}
singularBatch.LogContext(cr.log).Debug("decoded singular batch from channel", "stage_origin", cr.Origin())
batch.LogContext(cr.log).Debug("decoded singular batch from channel", "stage_origin", cr.Origin())
cr.metrics.RecordDerivedBatches("singular")
return singularBatch, nil
return batch, nil
case SpanBatchType:
if origin := cr.Origin(); !cr.cfg.IsDelta(origin.Time) {
// Check hard fork activation with the L1 inclusion block time instead of the L1 origin block time.
// Therefore, even if the batch passed this rule, it can be dropped in the batch queue.
// This is just for early dropping invalid batches as soon as possible.
return nil, NewTemporaryError(fmt.Errorf("cannot accept span batch in L1 block %s at time %d", origin, origin.Time))
}
spanBatch, err := DeriveSpanBatch(batchData, cr.cfg.BlockTime, cr.cfg.Genesis.L2Time, cr.cfg.L2ChainID)
batch.Batch, err = DeriveSpanBatch(batchData, cr.cfg.BlockTime, cr.cfg.Genesis.L2Time, cr.cfg.L2ChainID)
if err != nil {
return nil, err
}
spanBatch.LogContext(cr.log).Debug("decoded span batch from channel", "stage_origin", cr.Origin())
batch.LogContext(cr.log).Debug("decoded span batch from channel", "stage_origin", cr.Origin())
cr.metrics.RecordDerivedBatches("span")
return spanBatch, nil
return batch, nil
default:
// error is bubbled up to user, but pipeline can skip the batch and continue after.
return nil, NewTemporaryError(fmt.Errorf("unrecognized batch type: %d", batchData.GetBatchType()))
Expand Down
6 changes: 2 additions & 4 deletions op-node/rollup/derive/channel_out_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup"
)

var (
rollupCfg rollup.Config
)
var rollupCfg rollup.Config

// basic implementation of the Compressor interface that does no compression
type nonCompressor struct {
Expand Down Expand Up @@ -248,7 +246,7 @@ func TestSpanChannelOut(t *testing.T) {
}
for _, test := range tests {
test := test
for _, algo := range CompressionAlgoTypes {
for _, algo := range CompressionAlgos {
t.Run(test.name+"_"+algo.String(), func(t *testing.T) {
test.f(t, algo)
})
Expand Down
Loading