diff --git a/op-batcher/batcher/channel_builder_test.go b/op-batcher/batcher/channel_builder_test.go
index cb4c3ee9b7bd..8d1cbd8219ee 100644
--- a/op-batcher/batcher/channel_builder_test.go
+++ b/op-batcher/batcher/channel_builder_test.go
@@ -430,7 +430,7 @@ func TestChannelBuilder_OutputFrames(t *testing.T) {
 }
 
 func TestChannelBuilder_OutputFrames_SpanBatch(t *testing.T) {
-	for _, algo := range derive.CompressionAlgoTypes {
+	for _, algo := range derive.CompressionAlgos {
 		t.Run("ChannelBuilder_OutputFrames_SpanBatch_"+algo.String(), func(t *testing.T) {
 			if algo.IsBrotli() {
 				ChannelBuilder_OutputFrames_SpanBatch(t, algo) // to fill faster for brotli
diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go
index 491837182acb..b6f26d25fe86 100644
--- a/op-batcher/batcher/channel_manager.go
+++ b/op-batcher/batcher/channel_manager.go
@@ -204,7 +204,6 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
 	}
 
 	pc, err := newChannel(s.log, s.metr, s.cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number)
-
 	if err != nil {
 		return fmt.Errorf("creating new channel: %w", err)
 	}
@@ -218,6 +217,8 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
 		"l1OriginLastClosedChannel", s.l1OriginLastClosedChannel,
 		"blocks_pending", len(s.blocks),
 		"batch_type", s.cfg.BatchType,
+		"compression_algo", s.cfg.CompressorConfig.CompressionAlgo,
+		"target_num_frames", s.cfg.TargetNumFrames,
 		"max_frame_size", s.cfg.MaxFrameSize,
 	)
 	s.metr.RecordChannelOpened(pc.ID(), len(s.blocks))
diff --git a/op-batcher/batcher/config.go b/op-batcher/batcher/config.go
index cc37961a056a..5180d8434b2f 100644
--- a/op-batcher/batcher/config.go
+++ b/op-batcher/batcher/config.go
@@ -128,7 +128,7 @@ func (c *CLIConfig) Check() error {
 	if c.Compressor == compressor.RatioKind && (c.ApproxComprRatio <= 0 || c.ApproxComprRatio > 1) {
 		return fmt.Errorf("invalid ApproxComprRatio %v for ratio compressor", c.ApproxComprRatio)
 	}
-	if !derive.ValidCompressionAlgoType(c.CompressionAlgo) {
+	if !derive.ValidCompressionAlgo(c.CompressionAlgo) {
 		return fmt.Errorf("invalid compression algo %v", c.CompressionAlgo)
 	}
 	if c.BatchType > 1 {
diff --git a/op-batcher/batcher/service.go b/op-batcher/batcher/service.go
index 4203f442c78e..1969b2acf5fb 100644
--- a/op-batcher/batcher/service.go
+++ b/op-batcher/batcher/service.go
@@ -242,9 +242,10 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
 		"max_frame_size", cc.MaxFrameSize,
 		"target_num_frames", cc.TargetNumFrames,
 		"compressor", cc.CompressorConfig.Kind,
+		"compression_algo", cc.CompressorConfig.CompressionAlgo,
+		"batch_type", cc.BatchType,
 		"max_channel_duration", cc.MaxChannelDuration,
 		"channel_timeout", cc.ChannelTimeout,
-		"batch_type", cc.BatchType,
 		"sub_safety_margin", cc.SubSafetyMargin)
 	bs.ChannelConfig = cc
 	return nil
diff --git a/op-batcher/flags/flags.go b/op-batcher/flags/flags.go
index c91234b96a29..5aa833b75bf7 100644
--- a/op-batcher/flags/flags.go
+++ b/op-batcher/flags/flags.go
@@ -102,7 +102,7 @@ var (
 	}
 	CompressionAlgoFlag = &cli.GenericFlag{
 		Name:    "compression-algo",
-		Usage:   "The compression algorithm to use. Valid options: " + openum.EnumString(derive.CompressionAlgoTypes),
+		Usage:   "The compression algorithm to use. Valid options: " + openum.EnumString(derive.CompressionAlgos),
 		EnvVars: prefixEnvVars("COMPRESSION_ALGO"),
 		Value: func() *derive.CompressionAlgo {
 			out := derive.Zlib
diff --git a/op-node/benchmarks/batchbuilding_test.go b/op-node/benchmarks/batchbuilding_test.go
index 7b87d72e09e3..fc760deba271 100644
--- a/op-node/benchmarks/batchbuilding_test.go
+++ b/op-node/benchmarks/batchbuilding_test.go
@@ -48,7 +48,7 @@ var (
 		derive.SpanBatchType,
 		// uncomment to include singular batches in the benchmark
 		// singular batches are not included by default because they are not the target of the benchmark
-		//derive.SingularBatchType,
+		// derive.SingularBatchType,
 	}
 )
 
@@ -129,7 +129,7 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) {
 		// to leverage optimizations in the Batch Linked List
 		batches[i].Timestamp = uint64(t.Add(time.Duration(i) * time.Second).Unix())
 	}
-	for _, algo := range derive.CompressionAlgoTypes {
+	for _, algo := range derive.CompressionAlgos {
 		b.Run(tc.String()+"_"+algo.String(), func(b *testing.B) {
 			// reset the compressor used in the test case
 			for bn := 0; bn < b.N; bn++ {
@@ -168,7 +168,7 @@ func BenchmarkIncremental(b *testing.B) {
 		{derive.SpanBatchType, 5, 1, "RealBlindCompressor"},
 		//{derive.SingularBatchType, 100, 1, "RealShadowCompressor"},
 	}
-	for _, algo := range derive.CompressionAlgoTypes {
+	for _, algo := range derive.CompressionAlgos {
 		for _, tc := range tcs {
 			cout, err := channelOutByType(tc.BatchType, tc.compKey, algo)
 			if err != nil {
@@ -231,7 +231,7 @@ func BenchmarkAllBatchesChannelOut(b *testing.B) {
 		}
 	}
 
-	for _, algo := range derive.CompressionAlgoTypes {
+	for _, algo := range derive.CompressionAlgos {
 		for _, tc := range tests {
 			chainID := big.NewInt(333)
 			rng := rand.New(rand.NewSource(0x543331))
diff --git a/op-node/cmd/batch_decoder/reassemble/reassemble.go b/op-node/cmd/batch_decoder/reassemble/reassemble.go
index 23b4323b3e0d..dc95262659fe 100644
--- a/op-node/cmd/batch_decoder/reassemble/reassemble.go
+++ b/op-node/cmd/batch_decoder/reassemble/reassemble.go
@@ -18,13 +18,14 @@ import (
 )
 
 type ChannelWithMetadata struct {
-	ID             derive.ChannelID    `json:"id"`
-	IsReady        bool                `json:"is_ready"`
-	InvalidFrames  bool                `json:"invalid_frames"`
-	InvalidBatches bool                `json:"invalid_batches"`
-	Frames         []FrameWithMetadata `json:"frames"`
-	Batches        []derive.Batch      `json:"batches"`
-	BatchTypes     []int               `json:"batch_types"`
+	ID             derive.ChannelID         `json:"id"`
+	IsReady        bool                     `json:"is_ready"`
+	InvalidFrames  bool                     `json:"invalid_frames"`
+	InvalidBatches bool                     `json:"invalid_batches"`
+	Frames         []FrameWithMetadata      `json:"frames"`
+	Batches        []derive.Batch           `json:"batches"`
+	BatchTypes     []int                    `json:"batch_types"`
+	ComprAlgos     []derive.CompressionAlgo `json:"compr_algos"`
 }
 
 type FrameWithMetadata struct {
@@ -54,7 +55,6 @@ func LoadFrames(directory string, inbox common.Address) []FrameWithMetadata {
 		} else {
 			return txns[i].BlockNumber < txns[j].BlockNumber
 		}
-
 	})
 	return transactionsToFrames(txns)
 }
@@ -107,8 +107,12 @@ func processFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, fr
 		}
 	}
 
-	var batches []derive.Batch
-	var batchTypes []int
+	var (
+		batches    []derive.Batch
+		batchTypes []int
+		comprAlgos []derive.CompressionAlgo
+	)
+
 	invalidBatches := false
 	if ch.IsReady() {
 		br, err := derive.BatchReader(ch.Reader(), spec.MaxRLPBytesPerChannel(ch.HighestBlock().Time), rollupCfg.IsFjord(ch.HighestBlock().Time))
@@ -118,6 +122,7 @@ func processFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, fr
 				fmt.Printf("Error reading batchData for channel %v. Err: %v\n", id.String(), err)
 				invalidBatches = true
 			} else {
+				comprAlgos = append(comprAlgos, batchData.ComprAlgo)
 				batchType := batchData.GetBatchType()
 				batchTypes = append(batchTypes, int(batchType))
 				switch batchType {
@@ -157,6 +162,7 @@ func processFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, fr
 		InvalidBatches: invalidBatches,
 		Batches:        batches,
 		BatchTypes:     batchTypes,
+		ComprAlgos:     comprAlgos,
 	}
 }
diff --git a/op-node/rollup/derive/batch.go b/op-node/rollup/derive/batch.go
index c58b7e340c44..9e6283cc0ca7 100644
--- a/op-node/rollup/derive/batch.go
+++ b/op-node/rollup/derive/batch.go
@@ -37,6 +37,21 @@ type Batch interface {
 	GetBatchType() int
 	GetTimestamp() uint64
 	LogContext(log.Logger) log.Logger
+	AsSingularBatch() (*SingularBatch, bool)
+	AsSpanBatch() (*SpanBatch, bool)
+}
+
+type batchWithMetadata struct {
+	Batch
+	comprAlgo CompressionAlgo
+}
+
+func (b batchWithMetadata) LogContext(l log.Logger) log.Logger {
+	lgr := b.Batch.LogContext(l)
+	if b.comprAlgo == "" {
+		return lgr
+	}
+	return lgr.With("compression_algo", b.comprAlgo)
 }
 
 // BatchData is used to represent the typed encoding & decoding.
@@ -44,7 +59,8 @@ type Batch interface {
 // Further fields such as cache can be added in the future, without embedding each type of InnerBatchData.
 // Similar design with op-geth's types.Transaction struct.
 type BatchData struct {
-	inner InnerBatchData
+	inner     InnerBatchData
+	ComprAlgo CompressionAlgo
 }
 
 // InnerBatchData is the underlying data of a BatchData.
diff --git a/op-node/rollup/derive/batch_queue.go b/op-node/rollup/derive/batch_queue.go
index 4ac92bf04b8b..3dbfe20d305d 100644
--- a/op-node/rollup/derive/batch_queue.go
+++ b/op-node/rollup/derive/batch_queue.go
@@ -177,15 +177,15 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*Si
 	}
 
 	var nextBatch *SingularBatch
-	switch batch.GetBatchType() {
+	switch typ := batch.GetBatchType(); typ {
 	case SingularBatchType:
-		singularBatch, ok := batch.(*SingularBatch)
+		singularBatch, ok := batch.AsSingularBatch()
 		if !ok {
 			return nil, false, NewCriticalError(errors.New("failed type assertion to SingularBatch"))
 		}
 		nextBatch = singularBatch
 	case SpanBatchType:
-		spanBatch, ok := batch.(*SpanBatch)
+		spanBatch, ok := batch.AsSpanBatch()
 		if !ok {
 			return nil, false, NewCriticalError(errors.New("failed type assertion to SpanBatch"))
 		}
@@ -198,7 +198,7 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*Si
 		// span-batches are non-empty, so the below pop is safe.
 		nextBatch = bq.popNextBatch(parent)
 	default:
-		return nil, false, NewCriticalError(fmt.Errorf("unrecognized batch type: %d", batch.GetBatchType()))
+		return nil, false, NewCriticalError(fmt.Errorf("unrecognized batch type: %d", typ))
 	}
 
 	// If the nextBatch is derived from the span batch, len(bq.nextSpan) == 0 means it's the last batch of the span.
diff --git a/op-node/rollup/derive/batches.go b/op-node/rollup/derive/batches.go
index 24aa8a2daa15..bde228074552 100644
--- a/op-node/rollup/derive/batches.go
+++ b/op-node/rollup/derive/batches.go
@@ -11,8 +11,8 @@ import (
 )
 
 type BatchWithL1InclusionBlock struct {
+	Batch
 	L1InclusionBlock eth.L1BlockRef
-	Batch            Batch
 }
 
 type BatchValidity uint8
@@ -34,23 +34,23 @@ const (
 func CheckBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1Blocks []eth.L1BlockRef,
 	l2SafeHead eth.L2BlockRef, batch *BatchWithL1InclusionBlock, l2Fetcher SafeBlockFetcher,
 ) BatchValidity {
-	switch batch.Batch.GetBatchType() {
+	switch typ := batch.GetBatchType(); typ {
 	case SingularBatchType:
-		singularBatch, ok := batch.Batch.(*SingularBatch)
+		singularBatch, ok := batch.AsSingularBatch()
 		if !ok {
 			log.Error("failed type assertion to SingularBatch")
 			return BatchDrop
 		}
 		return checkSingularBatch(cfg, log, l1Blocks, l2SafeHead, singularBatch, batch.L1InclusionBlock)
 	case SpanBatchType:
-		spanBatch, ok := batch.Batch.(*SpanBatch)
+		spanBatch, ok := batch.AsSpanBatch()
 		if !ok {
 			log.Error("failed type assertion to SpanBatch")
 			return BatchDrop
 		}
 		return checkSpanBatch(ctx, cfg, log, l1Blocks, l2SafeHead, spanBatch, batch.L1InclusionBlock, l2Fetcher)
 	default:
-		log.Warn("Unrecognized batch type: %d", batch.Batch.GetBatchType())
+		log.Warn("Unrecognized batch type: %d", typ)
 		return BatchDrop
 	}
 }
diff --git a/op-node/rollup/derive/channel.go b/op-node/rollup/derive/channel.go
index 58238133081e..48a9f585c4aa 100644
--- a/op-node/rollup/derive/channel.go
+++ b/op-node/rollup/derive/channel.go
@@ -167,6 +167,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func(
 	}
 
 	var zr io.Reader
+	var comprAlgo CompressionAlgo
 	// For zlib, the last 4 bits must be either 8 or 15 (both are reserved value)
 	if compressionType[0]&0x0F == ZlibCM8 || compressionType[0]&0x0F == ZlibCM15 {
 		var err error
@@ -175,6 +176,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func(
 			return nil, err
 		}
 		// If the bits equal to 1, then it is a brotli reader
+		comprAlgo = Zlib
 	} else if compressionType[0] == ChannelVersionBrotli {
 		// If before Fjord, we cannot accept brotli compressed batch
 		if !isFjord {
@@ -186,6 +188,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func(
 			return nil, err
 		}
 		zr = brotli.NewReader(bufReader)
+		comprAlgo = Brotli
 	} else {
 		return nil, fmt.Errorf("cannot distinguish the compression algo used given type byte %v", compressionType[0])
 	}
@@ -194,7 +197,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func(
 	rlpReader := rlp.NewStream(zr, maxRLPBytesPerChannel)
 	// Read each batch iteratively
 	return func() (*BatchData, error) {
-		var batchData BatchData
+		batchData := BatchData{ComprAlgo: comprAlgo}
 		if err := rlpReader.Decode(&batchData); err != nil {
 			return nil, err
 		}
diff --git a/op-node/rollup/derive/channel_in_reader.go b/op-node/rollup/derive/channel_in_reader.go
index cd739e95fb34..f7dde867bc9d 100644
--- a/op-node/rollup/derive/channel_in_reader.go
+++ b/op-node/rollup/derive/channel_in_reader.go
@@ -87,15 +87,17 @@ func (cr *ChannelInReader) NextBatch(ctx context.Context) (Batch, error) {
 		cr.NextChannel()
 		return nil, NotEnoughData
 	}
+
+	batch := batchWithMetadata{comprAlgo: batchData.ComprAlgo}
 	switch batchData.GetBatchType() {
 	case SingularBatchType:
-		singularBatch, err := GetSingularBatch(batchData)
+		batch.Batch, err = GetSingularBatch(batchData)
 		if err != nil {
 			return nil, err
 		}
-		singularBatch.LogContext(cr.log).Debug("decoded singular batch from channel", "stage_origin", cr.Origin())
+		batch.LogContext(cr.log).Debug("decoded singular batch from channel", "stage_origin", cr.Origin())
 		cr.metrics.RecordDerivedBatches("singular")
-		return singularBatch, nil
+		return batch, nil
 	case SpanBatchType:
 		if origin := cr.Origin(); !cr.cfg.IsDelta(origin.Time) {
 			// Check hard fork activation with the L1 inclusion block time instead of the L1 origin block time.
@@ -103,13 +105,13 @@ func (cr *ChannelInReader) NextBatch(ctx context.Context) (Batch, error) {
 			// This is just for early dropping invalid batches as soon as possible.
 			return nil, NewTemporaryError(fmt.Errorf("cannot accept span batch in L1 block %s at time %d", origin, origin.Time))
 		}
-		spanBatch, err := DeriveSpanBatch(batchData, cr.cfg.BlockTime, cr.cfg.Genesis.L2Time, cr.cfg.L2ChainID)
+		batch.Batch, err = DeriveSpanBatch(batchData, cr.cfg.BlockTime, cr.cfg.Genesis.L2Time, cr.cfg.L2ChainID)
 		if err != nil {
 			return nil, err
 		}
-		spanBatch.LogContext(cr.log).Debug("decoded span batch from channel", "stage_origin", cr.Origin())
+		batch.LogContext(cr.log).Debug("decoded span batch from channel", "stage_origin", cr.Origin())
 		cr.metrics.RecordDerivedBatches("span")
-		return spanBatch, nil
+		return batch, nil
 	default:
 		// error is bubbled up to user, but pipeline can skip the batch and continue after.
 		return nil, NewTemporaryError(fmt.Errorf("unrecognized batch type: %d", batchData.GetBatchType()))
diff --git a/op-node/rollup/derive/channel_out_test.go b/op-node/rollup/derive/channel_out_test.go
index 00e0a8b14487..9c5d038c9470 100644
--- a/op-node/rollup/derive/channel_out_test.go
+++ b/op-node/rollup/derive/channel_out_test.go
@@ -15,9 +15,7 @@ import (
 	"github.com/ethereum-optimism/optimism/op-node/rollup"
 )
 
-var (
-	rollupCfg rollup.Config
-)
+var rollupCfg rollup.Config
 
 // basic implementation of the Compressor interface that does no compression
 type nonCompressor struct {
@@ -248,7 +246,7 @@ func TestSpanChannelOut(t *testing.T) {
 	}
 	for _, test := range tests {
 		test := test
-		for _, algo := range CompressionAlgoTypes {
+		for _, algo := range CompressionAlgos {
 			t.Run(test.name+"_"+algo.String(), func(t *testing.T) {
 				test.f(t, algo)
 			})
diff --git a/op-node/rollup/derive/channel_test.go b/op-node/rollup/derive/channel_test.go
index d52fa84e6ca2..e853d622aa04 100644
--- a/op-node/rollup/derive/channel_test.go
+++ b/op-node/rollup/derive/channel_test.go
@@ -55,7 +55,8 @@ func TestFrameValidity(t *testing.T) {
 			name: "double close",
 			frames: []Frame{
 				{ID: id, FrameNumber: 2, IsLast: true, Data: []byte("four")},
-				{ID: id, FrameNumber: 1, IsLast: true}},
+				{ID: id, FrameNumber: 1, IsLast: true},
+			},
 			shouldErr: []bool{false, true},
 			sizes:     []uint64{204, 204},
 		},
@@ -63,7 +64,8 @@ func TestFrameValidity(t *testing.T) {
 			name: "duplicate frame",
 			frames: []Frame{
 				{ID: id, FrameNumber: 2, Data: []byte("four")},
-				{ID: id, FrameNumber: 2, Data: []byte("seven__")}},
+				{ID: id, FrameNumber: 2, Data: []byte("seven__")},
+			},
 			shouldErr: []bool{false, true},
 			sizes:     []uint64{204, 204},
 		},
@@ -71,7 +73,8 @@ func TestFrameValidity(t *testing.T) {
 			name: "duplicate closing frames",
 			frames: []Frame{
 				{ID: id, FrameNumber: 2, IsLast: true, Data: []byte("four")},
-				{ID: id, FrameNumber: 2, IsLast: true, Data: []byte("seven__")}},
+				{ID: id, FrameNumber: 2, IsLast: true, Data: []byte("seven__")},
+			},
 			shouldErr: []bool{false, true},
 			sizes:     []uint64{204, 204},
 		},
@@ -79,7 +82,8 @@ func TestFrameValidity(t *testing.T) {
 			name: "frame past closing",
 			frames: []Frame{
 				{ID: id, FrameNumber: 2, IsLast: true, Data: []byte("four")},
-				{ID: id, FrameNumber: 10, Data: []byte("seven__")}},
+				{ID: id, FrameNumber: 10, Data: []byte("seven__")},
+			},
 			shouldErr: []bool{false, true},
 			sizes:     []uint64{204, 204},
 		},
@@ -87,7 +91,8 @@ func TestFrameValidity(t *testing.T) {
 			name: "prune after close frame",
 			frames: []Frame{
 				{ID: id, FrameNumber: 10, IsLast: false, Data: []byte("seven__")},
-				{ID: id, FrameNumber: 2, IsLast: true, Data: []byte("four")}},
+				{ID: id, FrameNumber: 2, IsLast: true, Data: []byte("four")},
+			},
 			shouldErr: []bool{false, false},
 			sizes:     []uint64{207, 204},
 		},
@@ -95,7 +100,8 @@ func TestFrameValidity(t *testing.T) {
 			name: "multiple valid frames",
 			frames: []Frame{
 				{ID: id, FrameNumber: 10, Data: []byte("seven__")},
-				{ID: id, FrameNumber: 2, Data: []byte("four")}},
+				{ID: id, FrameNumber: 2, Data: []byte("four")},
+			},
 			shouldErr: []bool{false, false},
 			sizes:     []uint64{207, 411},
 		},
@@ -107,103 +113,107 @@ func TestFrameValidity(t *testing.T) {
 }
 
 func TestBatchReader(t *testing.T) {
-	// Get batch data
 	rng := rand.New(rand.NewSource(0x543331))
 	singularBatch := RandomSingularBatch(rng, 20, big.NewInt(333))
 	batchDataInput := NewBatchData(singularBatch)
 
-	encodedBatch := &bytes.Buffer{}
+	encodedBatch := new(bytes.Buffer)
 	err := batchDataInput.EncodeRLP(encodedBatch)
 	require.NoError(t, err)
 
-	var testCases = []struct {
+	const Zstd CompressionAlgo = "zstd" // invalid algo
+	compressor := func(ca CompressionAlgo) func(buf *bytes.Buffer, t *testing.T) {
+		switch {
+		case ca == Zlib:
+			return func(buf *bytes.Buffer, t *testing.T) {
+				writer := zlib.NewWriter(buf)
+				_, err := writer.Write(encodedBatch.Bytes())
+				require.NoError(t, err)
+				require.NoError(t, writer.Close())
+			}
+		case ca.IsBrotli():
+			return func(buf *bytes.Buffer, t *testing.T) {
+				buf.WriteByte(ChannelVersionBrotli)
+				lvl := GetBrotliLevel(ca)
+				writer := brotli.NewWriterLevel(buf, lvl)
+				_, err := writer.Write(encodedBatch.Bytes())
+				require.NoError(t, err)
+				require.NoError(t, writer.Close())
+			}
+		case ca == Zstd: // invalid algo
+			return func(buf *bytes.Buffer, t *testing.T) {
+				buf.WriteByte(0x02) // invalid channel version byte
+				writer := zstd.NewWriter(buf)
+				_, err := writer.Write(encodedBatch.Bytes())
+				require.NoError(t, err)
+				require.NoError(t, writer.Close())
+			}
+		default:
+			panic("unexpected test algo")
+		}
+	}
+
+	testCases := []struct {
 		name      string
-		algo      func(buf *bytes.Buffer, t *testing.T)
+		algo      CompressionAlgo
 		isFjord   bool
 		expectErr bool
 	}{
 		{
-			name: "zlib-post-fjord",
-			algo: func(buf *bytes.Buffer, t *testing.T) {
-				writer := zlib.NewWriter(buf)
-				_, err := writer.Write(encodedBatch.Bytes())
-				require.NoError(t, err)
-				writer.Close()
-			},
+			name:    "zlib-post-fjord",
+			algo:    Zlib,
 			isFjord: true,
 		},
 		{
-			name: "zlib-pre-fjord",
-			algo: func(buf *bytes.Buffer, t *testing.T) {
-				writer := zlib.NewWriter(buf)
-				_, err := writer.Write(encodedBatch.Bytes())
-				require.NoError(t, err)
-				writer.Close()
-			},
+			name:    "zlib-pre-fjord",
+			algo:    Zlib,
 			isFjord: false,
 		},
 		{
-			name: "brotli9-post-fjord",
-			algo: func(buf *bytes.Buffer, t *testing.T) {
-				buf.WriteByte(ChannelVersionBrotli)
-				writer := brotli.NewWriterLevel(buf, 9)
-				_, err := writer.Write(encodedBatch.Bytes())
-				require.NoError(t, err)
-				writer.Close()
-			},
+			name:    "brotli-post-fjord",
+			algo:    Brotli,
 			isFjord: true,
 		},
 		{
-			name: "brotli9-pre-fjord",
-			algo: func(buf *bytes.Buffer, t *testing.T) {
-				buf.WriteByte(ChannelVersionBrotli)
-				writer := brotli.NewWriterLevel(buf, 9)
-				_, err := writer.Write(encodedBatch.Bytes())
-				require.NoError(t, err)
-				writer.Close()
-			},
+			name:      "brotli-pre-fjord",
+			algo:      Brotli,
 			isFjord:   false,
 			expectErr: true, // expect an error because brotli is not supported before Fjord
 		},
 		{
-			name: "brotli10-post-fjord",
-			algo: func(buf *bytes.Buffer, t *testing.T) {
-				buf.WriteByte(ChannelVersionBrotli)
-				writer := brotli.NewWriterLevel(buf, 10)
-				_, err := writer.Write(encodedBatch.Bytes())
-				require.NoError(t, err)
-				writer.Close()
-			},
+			name:    "brotli9-post-fjord",
+			algo:    Brotli9,
 			isFjord: true,
 		},
 		{
-			name: "brotli11-post-fjord",
-			algo: func(buf *bytes.Buffer, t *testing.T) {
-				buf.WriteByte(ChannelVersionBrotli)
-				writer := brotli.NewWriterLevel(buf, 11)
-				_, err := writer.Write(encodedBatch.Bytes())
-				require.NoError(t, err)
-				writer.Close()
-			},
+			name:      "brotli9-pre-fjord",
+			algo:      Brotli9,
+			isFjord:   false,
+			expectErr: true, // expect an error because brotli is not supported before Fjord
+		},
+		{
+			name:    "brotli10-post-fjord",
+			algo:    Brotli10,
 			isFjord: true,
 		},
 		{
-			name: "zstd-post-fjord",
-			algo: func(buf *bytes.Buffer, t *testing.T) {
-				writer := zstd.NewWriter(buf)
-				_, err := writer.Write(encodedBatch.Bytes())
-				require.NoError(t, err)
-				writer.Close()
-			},
+			name:    "brotli11-post-fjord",
+			algo:    Brotli11,
+			isFjord: true,
+		},
+		{
+			name:      "zstd-post-fjord",
+			algo:      Zstd,
 			expectErr: true,
 			isFjord:   true,
-		}}
+		},
+	}
 
 	for _, tc := range testCases {
 		compressed := new(bytes.Buffer)
 		tc := tc
 		t.Run(tc.name, func(t *testing.T) {
-			tc.algo(compressed, t)
+			compressor(tc.algo)(compressed, t)
 			reader, err := BatchReader(bytes.NewReader(compressed.Bytes()), 120000, tc.isFjord)
 			if tc.expectErr {
 				require.Error(t, err)
@@ -215,6 +225,12 @@ func TestBatchReader(t *testing.T) {
 			batchData, err := reader()
 			require.NoError(t, err)
 			require.NotNil(t, batchData)
+			if tc.algo.IsBrotli() {
+				// special case because reader doesn't decode level
+				batchDataInput.ComprAlgo = Brotli
+			} else {
+				batchDataInput.ComprAlgo = tc.algo
+			}
 			require.Equal(t, batchDataInput, batchData)
 		})
 	}
diff --git a/op-node/rollup/derive/singular_batch.go b/op-node/rollup/derive/singular_batch.go
index fb7baeecb9b0..fdb867efbe72 100644
--- a/op-node/rollup/derive/singular_batch.go
+++ b/op-node/rollup/derive/singular_batch.go
@@ -27,6 +27,9 @@ type SingularBatch struct {
 	Transactions []hexutil.Bytes
 }
 
+func (b *SingularBatch) AsSingularBatch() (*SingularBatch, bool) { return b, true }
+func (b *SingularBatch) AsSpanBatch() (*SpanBatch, bool)         { return nil, false }
+
 // GetBatchType returns its batch type (batch_version)
 func (b *SingularBatch) GetBatchType() int {
 	return SingularBatchType
diff --git a/op-node/rollup/derive/span_batch.go b/op-node/rollup/derive/span_batch.go
index 69d2bfa299fb..aa95b3838a7b 100644
--- a/op-node/rollup/derive/span_batch.go
+++ b/op-node/rollup/derive/span_batch.go
@@ -422,6 +422,9 @@ type SpanBatch struct {
 	sbtxs *spanBatchTxs
 }
 
+func (b *SpanBatch) AsSingularBatch() (*SingularBatch, bool) { return nil, false }
+func (b *SpanBatch) AsSpanBatch() (*SpanBatch, bool)         { return b, true }
+
 // spanBatchMarshaling is a helper type used for JSON marshaling.
 type spanBatchMarshaling struct {
 	ParentCheck []hexutil.Bytes `json:"parent_check"`
diff --git a/op-node/rollup/derive/types.go b/op-node/rollup/derive/types.go
index a17c1c9a9a6a..fb28d80a012f 100644
--- a/op-node/rollup/derive/types.go
+++ b/op-node/rollup/derive/types.go
@@ -10,26 +10,28 @@ type CompressionAlgo string
 const (
 	// compression algo types
 	Zlib     CompressionAlgo = "zlib"
+	Brotli   CompressionAlgo = "brotli" // default level
 	Brotli9  CompressionAlgo = "brotli-9"
 	Brotli10 CompressionAlgo = "brotli-10"
 	Brotli11 CompressionAlgo = "brotli-11"
 )
 
-var CompressionAlgoTypes = []CompressionAlgo{
+var CompressionAlgos = []CompressionAlgo{
 	Zlib,
+	Brotli,
 	Brotli9,
 	Brotli10,
 	Brotli11,
 }
 
-var brotliRegexp = regexp.MustCompile(`^brotli-(9|10|11)$`)
+var brotliRegexp = regexp.MustCompile(`^brotli(|-(9|10|11))$`)
 
 func (algo CompressionAlgo) String() string {
 	return string(algo)
 }
 
 func (algo *CompressionAlgo) Set(value string) error {
-	if !ValidCompressionAlgoType(CompressionAlgo(value)) {
+	if !ValidCompressionAlgo(CompressionAlgo(value)) {
 		return fmt.Errorf("unknown compression algo type: %q", value)
 	}
 	*algo = CompressionAlgo(value)
@@ -49,7 +51,7 @@ func GetBrotliLevel(algo CompressionAlgo) int {
 	switch algo {
 	case Brotli9:
 		return 9
-	case Brotli10:
+	case Brotli10, Brotli: // make level 10 the default
 		return 10
 	case Brotli11:
 		return 11
@@ -58,8 +60,8 @@ func GetBrotliLevel(algo CompressionAlgo) int {
 	}
 }
 
-func ValidCompressionAlgoType(value CompressionAlgo) bool {
-	for _, k := range CompressionAlgoTypes {
+func ValidCompressionAlgo(value CompressionAlgo) bool {
+	for _, k := range CompressionAlgos {
 		if k == value {
 			return true
 		}
diff --git a/op-node/rollup/derive/types_test.go b/op-node/rollup/derive/types_test.go
index 5b9c1d94ed50..3b9be6430482 100644
--- a/op-node/rollup/derive/types_test.go
+++ b/op-node/rollup/derive/types_test.go
@@ -10,49 +10,64 @@ func TestCompressionAlgo(t *testing.T) {
 	testCases := []struct {
 		name                       string
 		algo                       CompressionAlgo
-		isBrotli                   bool
 		isValidCompressionAlgoType bool
+		isBrotli                   bool
+		brotliLevel                int
 	}{
 		{
 			name:                       "zlib",
 			algo:                       Zlib,
+			isValidCompressionAlgoType: true,
 			isBrotli:                   false,
+		},
+		{
+			name:                       "brotli",
+			algo:                       Brotli,
 			isValidCompressionAlgoType: true,
+			isBrotli:                   true,
+			brotliLevel:                10,
 		},
 		{
 			name:                       "brotli-9",
			algo:                       Brotli9,
-			isBrotli:                   true,
 			isValidCompressionAlgoType: true,
+			isBrotli:                   true,
+			brotliLevel:                9,
 		},
 		{
 			name:                       "brotli-10",
 			algo:                       Brotli10,
-			isBrotli:                   true,
 			isValidCompressionAlgoType: true,
+			isBrotli:                   true,
+			brotliLevel:                10,
 		},
 		{
 			name:                       "brotli-11",
 			algo:                       Brotli11,
-			isBrotli:                   true,
 			isValidCompressionAlgoType: true,
+			isBrotli:                   true,
+			brotliLevel:                11,
 		},
 		{
 			name:                       "invalid",
 			algo:                       CompressionAlgo("invalid"),
-			isBrotli:                   false,
 			isValidCompressionAlgoType: false,
-		}}
+			isBrotli:                   false,
+		},
+	}
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
 			require.Equal(t, tc.isBrotli, tc.algo.IsBrotli())
 			if tc.isBrotli {
-				require.NotPanics(t, func() { GetBrotliLevel((tc.algo)) })
+				require.NotPanics(t, func() {
+					blvl := GetBrotliLevel(tc.algo)
+					require.Equal(t, tc.brotliLevel, blvl)
+				})
 			} else {
 				require.Panics(t, func() { GetBrotliLevel(tc.algo) })
 			}
-			require.Equal(t, tc.isValidCompressionAlgoType, ValidCompressionAlgoType(tc.algo))
+			require.Equal(t, tc.isValidCompressionAlgoType, ValidCompressionAlgo(tc.algo))
 		})
 	}
 }
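Reviewer note: the net effect of the types.go changes is that a bare "brotli" is now a valid algo that compresses at level 10, alongside the explicit brotli-9/-10/-11 variants, and the Type-suffixed helper names are gone. A short usage sketch of the resulting API (illustrative only; assumes the usual op-node module import path and is not part of this patch):

package main

import (
	"fmt"

	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)

func main() {
	// CompressionAlgos now contains zlib, brotli, brotli-9, brotli-10, brotli-11.
	for _, algo := range derive.CompressionAlgos {
		fmt.Printf("%-10s valid=%t brotli=%t\n",
			algo, derive.ValidCompressionAlgo(algo), algo.IsBrotli())
		if algo.IsBrotli() {
			// GetBrotliLevel maps the suffix-less Brotli to the default level 10.
			fmt.Println("  level:", derive.GetBrotliLevel(algo))
		}
	}
}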