diff --git a/contentcoder.go b/contentcoder.go index c145b5a1..388e5d00 100644 --- a/contentcoder.go +++ b/contentcoder.go @@ -19,6 +19,7 @@ import ( "encoding/binary" "io" "reflect" + "sync/atomic" "github.com/golang/snappy" ) @@ -48,6 +49,9 @@ type chunkedContentCoder struct { chunkMeta []MetaData compressed []byte // temp buf for snappy compression + + // atomic access to this variable + bytesWritten uint64 } // MetaData represents the data information inside a @@ -105,6 +109,16 @@ func (c *chunkedContentCoder) Close() error { return c.flushContents() } +// incrementBytesWritten atomically adds val to the coder's bytesWritten counter. +func (c *chunkedContentCoder) incrementBytesWritten(val uint64) { + atomic.AddUint64(&c.bytesWritten, val) +} + +// getBytesWritten atomically loads the coder's bytesWritten counter. +func (c *chunkedContentCoder) getBytesWritten() uint64 { + return atomic.LoadUint64(&c.bytesWritten) +} + func (c *chunkedContentCoder) flushContents() error { // flush the contents, with meta information at first buf := make([]byte, binary.MaxVarintLen64) @@ -127,6 +141,7 @@ func (c *chunkedContentCoder) flushContents() error { c.final = append(c.final, c.chunkMetaBuf.Bytes()...) // write the compressed data to the final data c.compressed = snappy.Encode(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes()) + c.incrementBytesWritten(uint64(len(c.compressed))) c.final = append(c.final, c.compressed...) 
c.chunkLens[c.currChunk] = uint64(len(c.compressed) + len(metaData)) diff --git a/docvalues.go b/docvalues.go index b7784121..bd3574da 100644 --- a/docvalues.go +++ b/docvalues.go @@ -142,7 +142,7 @@ func (di *docValueReader) BytesRead() uint64 { return atomic.LoadUint64(&di.bytesRead) } -func (di *docValueReader) SetBytesRead(val uint64) { +func (di *docValueReader) ResetBytesRead(val uint64) { atomic.StoreUint64(&di.bytesRead, val) } @@ -150,6 +150,11 @@ func (di *docValueReader) incrementBytesRead(val uint64) { atomic.AddUint64(&di.bytesRead, val) } +// BytesWritten always returns 0 for a docValueReader. +func (di *docValueReader) BytesWritten() uint64 { + return 0 +} + func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *SegmentBase) error { // advance to the chunk where the docValues // reside for the given docNum diff --git a/intcoder.go b/intcoder.go index c3c488fb..5b716b3f 100644 --- a/intcoder.go +++ b/intcoder.go @@ -18,6 +18,7 @@ import ( "bytes" "encoding/binary" "io" + "sync/atomic" ) // We can safely use 0 to represent termNotEncoded since 0 @@ -34,6 +35,9 @@ type chunkedIntCoder struct { currChunk uint64 buf []byte + + // atomic access to this variable + bytesWritten uint64 } // newChunkedIntCoder returns a new chunk int coder which packs data into @@ -73,6 +77,14 @@ func (c *chunkedIntCoder) SetChunkSize(chunkSize uint64, maxDocNum uint64) { } } +func (c *chunkedIntCoder) incrementBytesWritten(val uint64) { + atomic.AddUint64(&c.bytesWritten, val) +} + +func (c *chunkedIntCoder) getBytesWritten() uint64 { + return atomic.LoadUint64(&c.bytesWritten) +} + // Add encodes the provided integers into the correct chunk for the provided // doc num. You MUST call Add() with increasing docNums. 
func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error { @@ -94,6 +106,7 @@ func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error { if err != nil { return err } + c.incrementBytesWritten(uint64(wb)) } return nil diff --git a/new.go b/new.go index 362715d4..edbf45d1 100644 --- a/new.go +++ b/new.go @@ -20,6 +20,7 @@ import ( "math" "sort" "sync" + "sync/atomic" "github.com/RoaringBitmap/roaring" index "github.com/blevesearch/bleve_index_api" @@ -80,6 +81,10 @@ func (*ZapPlugin) newWithChunkMode(results []index.Document, + // capture the byte count before reset() runs, then clear it so a pooled interim starts from zero + totalBytesWritten := s.getBytesWritten() if err == nil && s.reset() == nil { s.lastNumDocs = len(results) s.lastOutSize = len(br.Bytes()) + sb.setBytesWritten(totalBytesWritten) + s.setBytesWritten(0) interimPool.Put(s) } @@ -141,6 +146,9 @@ type interim struct { lastNumDocs int lastOutSize int + + // atomic access to this variable + bytesWritten uint64 } func (s *interim) reset() (err error) { @@ -484,6 +492,14 @@ func (s *interim) processDocument(docNum uint64, } } +func (s *interim) getBytesWritten() uint64 { + return atomic.LoadUint64(&s.bytesWritten) +} + +func (s *interim) incrementBytesWritten(val uint64) { + atomic.AddUint64(&s.bytesWritten, val) +} + func (s *interim) writeStoredFields() ( storedIndexOffset uint64, err error) { varBuf := make([]byte, binary.MaxVarintLen64) @@ -559,7 +575,7 @@ func (s *interim) writeStoredFields() ( metaBytes := s.metaBuf.Bytes() compressed = snappy.Encode(compressed[:cap(compressed)], data) - + s.incrementBytesWritten(uint64(len(compressed))) docStoredOffsets[docNum] = uint64(s.w.Count()) _, err := writeUvarints(s.w, @@ -597,6 +613,10 @@ func (s *interim) writeStoredFields() ( return storedIndexOffset, nil } +func (s *interim) setBytesWritten(val uint64) { + atomic.StoreUint64(&s.bytesWritten, val) +} + func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err error) { dictOffsets = make([]uint64, len(s.FieldsInv)) @@ -682,7 +702,7 @@ func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err if err 
!= nil { return 0, nil, err } - + prevBytesWritten := locEncoder.getBytesWritten() for _, loc := range locs[locOffset : locOffset+freqNorm.numLocs] { err = locEncoder.Add(docNum, uint64(loc.fieldID), loc.pos, loc.start, loc.end, @@ -696,7 +713,9 @@ func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err return 0, nil, err } } - + if delta := locEncoder.getBytesWritten() - prevBytesWritten; delta > 0 { + s.incrementBytesWritten(delta) + } locOffset += freqNorm.numLocs } @@ -750,6 +769,8 @@ func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err return 0, nil, err } + s.incrementBytesWritten(uint64(len(vellumData))) + // reset vellum for reuse s.builderBuf.Reset() @@ -764,6 +785,7 @@ func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err if err != nil { return 0, nil, err } + fdvEncoder := newChunkedContentCoder(chunkSize, uint64(len(s.results)-1), s.w, false) if s.IncludeDocValues[fieldID] { for docNum, docTerms := range docTermMap { @@ -779,6 +801,8 @@ func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err return 0, nil, err } + + s.incrementBytesWritten(fdvEncoder.getBytesWritten()) + fdvOffsetsStart[fieldID] = uint64(s.w.Count()) _, err = fdvEncoder.Write() diff --git a/posting.go b/posting.go index 9d17ef4d..dcae817f 100644 --- a/posting.go +++ b/posting.go @@ -254,7 +254,7 @@ func (p *PostingsList) Count() uint64 { // The purpose of this implementation is to get // the bytes read from the postings lists stored // on disk, while querying -func (p *PostingsList) SetBytesRead(val uint64) { +func (p *PostingsList) ResetBytesRead(val uint64) { atomic.StoreUint64(&p.bytesRead, val) } @@ -266,6 +266,10 @@ func (p *PostingsList) incrementBytesRead(val uint64) { atomic.AddUint64(&p.bytesRead, val) } +func (p *PostingsList) BytesWritten() uint64 { + return 0 +} + func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error { rv.postingsOffset = 
postingsOffset @@ -363,7 +367,7 @@ func (i *PostingsIterator) Size() int { // the bytes read from the disk which includes // the freqNorm and location specific information // of a hit -func (i *PostingsIterator) SetBytesRead(val uint64) { +func (i *PostingsIterator) ResetBytesRead(val uint64) { atomic.StoreUint64(&i.bytesRead, val) } @@ -375,6 +379,11 @@ func (i *PostingsIterator) incrementBytesRead(val uint64) { atomic.AddUint64(&i.bytesRead, val) } +// BytesWritten always returns 0 for a PostingsIterator. +func (i *PostingsIterator) BytesWritten() uint64 { + return 0 +} + func (i *PostingsIterator) loadChunk(chunk int) error { if i.includeFreqNorm { err := i.freqNormReader.loadChunk(chunk) @@ -386,7 +395,7 @@ func (i *PostingsIterator) loadChunk(chunk int) error { // the postingsIterator is tracking only the chunk loaded // and the cumulation is tracked correctly in the downstream // intDecoder - i.SetBytesRead(i.freqNormReader.getBytesRead()) + i.ResetBytesRead(i.freqNormReader.getBytesRead()) } if i.includeLocs { @@ -394,7 +403,7 @@ func (i *PostingsIterator) loadChunk(chunk int) error { if err != nil { return err } - i.SetBytesRead(i.locReader.getBytesRead()) + i.ResetBytesRead(i.locReader.getBytesRead()) } i.currChunk = uint32(chunk) diff --git a/segment.go b/segment.go index e5903e96..f62918c8 100644 --- a/segment.go +++ b/segment.go @@ -104,7 +104,8 @@ type SegmentBase struct { size uint64 - // atomic access to this variable - bytesRead uint64 + // atomic access to these variables + bytesRead uint64 + bytesWritten uint64 m sync.Mutex fieldFSTs map[uint16]*vellum.FST @@ -226,7 +227,7 @@ func (s *Segment) loadConfig() error { // interface, as the intention is to retrieve the bytes // read from the on-disk segment as part of the current // query. 
-func (s *Segment) SetBytesRead(val uint64) { +func (s *Segment) ResetBytesRead(val uint64) { atomic.StoreUint64(&s.SegmentBase.bytesRead, val) } @@ -235,10 +236,29 @@ func (s *Segment) BytesRead() uint64 { atomic.LoadUint64(&s.SegmentBase.bytesRead) } +func (s *Segment) BytesWritten() uint64 { + return 0 +} + func (s *Segment) incrementBytesRead(val uint64) { atomic.AddUint64(&s.bytesRead, val) } +func (s *SegmentBase) BytesWritten() uint64 { + return atomic.LoadUint64(&s.bytesWritten) +} + +// setBytesWritten atomically replaces the bytesWritten counter with val. +func (s *SegmentBase) setBytesWritten(val uint64) { + atomic.StoreUint64(&s.bytesWritten, val) +} + +func (s *SegmentBase) BytesRead() uint64 { + return 0 +} + +func (s *SegmentBase) ResetBytesRead(val uint64) {} + func (s *SegmentBase) incrementBytesRead(val uint64) { atomic.AddUint64(&s.bytesRead, val) }