@@ -70,6 +70,19 @@ func (i *indexEntry) marshallBinary() []byte {
 	return b
 }
 
+// bounds returns the start- and end- offsets, and the file number of where to
+// read the data item marked by the two index entries. The two entries are
+// assumed to be sequential.
+func (start *indexEntry) bounds(end *indexEntry) (startOffset, endOffset, fileId uint32) {
+	if start.filenum != end.filenum {
+		// If a piece of data 'crosses' a data-file,
+		// it's actually in one piece on the second data-file.
+		// We return a zero-indexEntry for the second file as start
+		return 0, end.offset, end.filenum
+	}
+	return start.offset, end.offset, end.filenum
+}
+
 // freezerTable represents a single chained data table within the freezer (e.g. blocks).
 // It consists of a data file (snappy encoded arbitrary data blobs) and an indexEntry
 // file (uncompressed 64 bit indices into the data file).
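
A quick sketch of the new helper's contract (illustrative values only, not part of the change):

```go
// start is the index entry written before the item, end the one written after it.
start := &indexEntry{filenum: 3, offset: 4090}
end := &indexEntry{filenum: 4, offset: 120}

startOffset, endOffset, fileId := start.bounds(end)
// The item 'crossed' from file 3 into file 4, so it is stored wholly at the
// head of file 4: startOffset == 0, endOffset == 120, fileId == 4.
```
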
@@ -546,84 +559,183 @@ func (t *freezerTable) append(item uint64, encodedBlob []byte, wlock bool) (bool
 	return false, nil
 }
 
-// getBounds returns the indexes for the item
-// returns start, end, filenumber and error
-func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) {
-	buffer := make([]byte, indexEntrySize)
-	var startIdx, endIdx indexEntry
-	// Read second index
-	if _, err := t.index.ReadAt(buffer, int64((item+1)*indexEntrySize)); err != nil {
-		return 0, 0, 0, err
-	}
-	endIdx.unmarshalBinary(buffer)
-	// Read first index (unless it's the very first item)
-	if item != 0 {
-		if _, err := t.index.ReadAt(buffer, int64(item*indexEntrySize)); err != nil {
-			return 0, 0, 0, err
-		}
-		startIdx.unmarshalBinary(buffer)
-	} else {
+// getIndices returns the index entries for the given from-item, covering 'count' items.
+// N.B: The actual number of returned indices for N items will always be N+1 (unless an
+// error is returned).
+// OBS: This method assumes that the caller has already verified (and/or trimmed) the range
+// so that the items are within bounds. If this method is used to read out of bounds,
+// it will return an error.
+func (t *freezerTable) getIndices(from, count uint64) ([]*indexEntry, error) {
+	// Apply the table-offset
+	from = from - uint64(t.itemOffset)
+	// For reading N items, we need N+1 indices.
+	buffer := make([]byte, (count+1)*indexEntrySize)
+	if _, err := t.index.ReadAt(buffer, int64(from*indexEntrySize)); err != nil {
+		return nil, err
+	}
+	var (
+		indices []*indexEntry
+		offset  int
+	)
+	for i := from; i <= from+count; i++ {
+		index := new(indexEntry)
+		index.unmarshalBinary(buffer[offset:])
+		offset += indexEntrySize
+		indices = append(indices, index)
+	}
+	if from == 0 {
 		// Special case if we're reading the first item in the freezer. We assume that
 		// the first item always starts from zero (regarding the deletion, we
 		// only support deletion by files, so that the assumption holds).
 		// This means we can use the first item metadata to carry information about
 		// the 'global' offset, for the deletion-case
-		return 0, endIdx.offset, endIdx.filenum, nil
+		indices[0].offset = 0
+		indices[0].filenum = indices[1].filenum
 	}
-	if startIdx.filenum != endIdx.filenum {
-		// If a piece of data 'crosses' a data-file,
-		// it's actually in one piece on the second data-file.
-		// We return a zero-indexEntry for the second file as start
-		return 0, endIdx.offset, endIdx.filenum, nil
-	}
-	return startIdx.offset, endIdx.offset, endIdx.filenum, nil
+	return indices, nil
 }
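
The N+1 contract in practice: item i is delimited by entries i and i+1, so a hypothetical caller (`t`, `from`, and `count` are assumed to be in scope; illustrative only) can recover every item's extent like this:

```go
// Pairing up the count+1 entries returned by getIndices yields the byte
// range and data file of each of the count items.
indices, err := t.getIndices(from, count)
if err != nil {
	return err
}
for i := 0; i < len(indices)-1; i++ {
	startOffset, endOffset, fileId := indices[i].bounds(indices[i+1])
	fmt.Printf("item %d: file %d, bytes [%d, %d)\n", from+uint64(i), fileId, startOffset, endOffset)
}
```
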
 
 // Retrieve looks up the data offset of an item with the given number and retrieves
 // the raw binary blob from the data file.
 func (t *freezerTable) Retrieve(item uint64) ([]byte, error) {
-	blob, err := t.retrieve(item)
+	items, err := t.RetrieveItems(item, 1, 0)
 	if err != nil {
 		return nil, err
 	}
-	if t.noCompression {
-		return blob, nil
+	return items[0], nil
+}
+
+// RetrieveItems returns multiple items in sequence, starting from the index 'start'.
+// It will return at most 'count' items, but will abort earlier to respect the
+// 'maxBytes' argument. However, if 'maxBytes' is smaller than the size of one
+// item, it _will_ return one element and possibly overflow maxBytes.
+func (t *freezerTable) RetrieveItems(start, count, maxBytes uint64) ([][]byte, error) {
+	// First we read the 'raw' data, which might be compressed.
+	diskData, sizes, err := t.retrieveItems(start, count, maxBytes)
+	if err != nil {
+		return nil, err
 	}
-	return snappy.Decode(nil, blob)
+	var (
+		output     = make([][]byte, 0, count)
+		offset     int // offset for reading
+		outputSize int // size of uncompressed data
+	)
+	// Now slice up the data and decompress.
+	for i, diskSize := range sizes {
+		item := diskData[offset : offset+diskSize]
+		offset += diskSize
+		decompressedSize := diskSize
+		if !t.noCompression {
+			decompressedSize, _ = snappy.DecodedLen(item)
+		}
+		if i > 0 && uint64(outputSize+decompressedSize) > maxBytes {
+			break
+		}
+		if !t.noCompression {
+			data, err := snappy.Decode(nil, item)
+			if err != nil {
+				return nil, err
+			}
+			output = append(output, data)
+		} else {
+			output = append(output, item)
+		}
+		outputSize += decompressedSize
+	}
+	return output, nil
 }
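
For context, a hypothetical caller of the new batch API might look like this (`table`, `start`, and `process` are illustrative, not from the change):

```go
// Ask for up to 100 sequential items, but stop once ~1 MiB has been returned.
items, err := table.RetrieveItems(start, 100, 1024*1024)
if err != nil {
	return err
}
// len(items) can be anywhere in [1, 100]: the byte budget may cut the batch
// short, but at least one item is always returned on success.
for _, blob := range items {
	process(blob) // 'process' is a placeholder
}
```
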
 
-// retrieve looks up the data offset of an item with the given number and retrieves
-// the raw binary blob from the data file. OBS! This method does not decode
-// compressed data.
-func (t *freezerTable) retrieve(item uint64) ([]byte, error) {
+// retrieveItems reads up to 'count' items from the table. It reads at least
+// one item, but otherwise avoids reading more than maxBytes bytes.
+// It returns the (potentially compressed) data, and the sizes.
+func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []int, error) {
 	t.lock.RLock()
 	defer t.lock.RUnlock()
 	// Ensure the table and the item are accessible
 	if t.index == nil || t.head == nil {
-		return nil, errClosed
+		return nil, nil, errClosed
 	}
-	if atomic.LoadUint64(&t.items) <= item {
-		return nil, errOutOfBounds
+	itemCount := atomic.LoadUint64(&t.items) // the current number of items
+	// Ensure the start is written, not deleted from the tail, and that the
+	// caller actually wants something
+	if itemCount <= start || uint64(t.itemOffset) > start || count == 0 {
+		return nil, nil, errOutOfBounds
 	}
-	// Ensure the item was not deleted from the tail either
-	if uint64(t.itemOffset) > item {
-		return nil, errOutOfBounds
+	if start+count > itemCount {
+		count = itemCount - start
 	}
-	startOffset, endOffset, filenum, err := t.getBounds(item - uint64(t.itemOffset))
-	if err != nil {
-		return nil, err
+	var (
+		output     = make([]byte, maxBytes) // Buffer to read data into
+		outputSize int                      // Used size of that buffer
+	)
+	// readData is a helper method to read a single data item from disk.
+	readData := func(fileId, start uint32, length int) error {
+		// In case a small limit is used and the elements are large, we may need
+		// to realloc the read-buffer when reading the first (and only) item.
+		if len(output) < length {
+			output = make([]byte, length)
+		}
+		dataFile, exist := t.files[fileId]
+		if !exist {
+			return fmt.Errorf("missing data file %d", fileId)
+		}
+		if _, err := dataFile.ReadAt(output[outputSize:outputSize+length], int64(start)); err != nil {
+			return err
+		}
+		outputSize += length
+		return nil
 	}
-	dataFile, exist := t.files[filenum]
-	if !exist {
-		return nil, fmt.Errorf("missing data file %d", filenum)
+	// Read all the indexes in one go
+	indices, err := t.getIndices(start, count)
+	if err != nil {
+		return nil, nil, err
 	}
-	// Retrieve the data itself, decompress and return
-	blob := make([]byte, endOffset-startOffset)
-	if _, err := dataFile.ReadAt(blob, int64(startOffset)); err != nil {
-		return nil, err
+	var (
+		sizes      []int               // The sizes for each element
+		totalSize  = 0                 // The total size of all data read so far
+		readStart  = indices[0].offset // Where, in the file, to start reading
+		unreadSize = 0                 // The size of the as-yet-unread data
+	)
+
+	for i, firstIndex := range indices[:len(indices)-1] {
+		secondIndex := indices[i+1]
+		// Determine the size of the item.
+		offset1, offset2, _ := firstIndex.bounds(secondIndex)
+		size := int(offset2 - offset1)
+		// Crossing a file boundary?
+		if secondIndex.filenum != firstIndex.filenum {
+			// If we have unread data in the first file, we need to do that read now.
+			if unreadSize > 0 {
+				if err := readData(firstIndex.filenum, readStart, unreadSize); err != nil {
+					return nil, nil, err
+				}
+				unreadSize = 0
+			}
+			readStart = 0
+		}
+		if i > 0 && uint64(totalSize+size) > maxBytes {
+			// About to break out due to byte limit being exceeded. We don't
+			// read this last item, but we need to do the deferred reads now.
+			if unreadSize > 0 {
+				if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil {
+					return nil, nil, err
+				}
+			}
+			break
+		}
+		// Defer the read for later
+		unreadSize += size
+		totalSize += size
+		sizes = append(sizes, size)
+		if i == len(indices)-2 || uint64(totalSize) > maxBytes {
+			// Last item, need to do the read now
+			if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil {
+				return nil, nil, err
+			}
+			break
+		}
 	}
-	t.readMeter.Mark(int64(len(blob) + 2*indexEntrySize))
-	return blob, nil
+	return output[:outputSize], sizes, nil
 }
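
To see why the deferred reads pay off, consider a made-up layout (illustrative numbers only, not from the change):

```go
// Hypothetical index entries for four items: items 0-2 live at the tail of
// file 7, item 3 spills over into file 8.
//
//   indices: {7, 100} {7, 300} {7, 450} {7, 500} {8, 80}
//
// Loop behaviour, assuming a generous maxBytes:
//   i=0..2: sizes (200, 150, 50) are deferred, not read yet
//   i=3:    filenum changes -> readData(7, 100, 400), readStart = 0
//           last index pair -> readData(8, 0, 80)
//
// Net effect: four items, two ReadAt calls instead of four.
```
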
 // has returns an indicator whether the specified number data