Skip to content

Commit

Permalink
feat: plumb through contexts (#119)
Browse files Browse the repository at this point in the history
  • Loading branch information
guseggert authored Oct 27, 2021
1 parent 209d604 commit f461792
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 155 deletions.
59 changes: 30 additions & 29 deletions datastore.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package badger

import (
"context"
"errors"
"fmt"
"runtime"
Expand Down Expand Up @@ -210,7 +211,7 @@ func (d *Datastore) periodicGC() {
// NewTransaction starts a new transaction. The resulting transaction object
// can be mutated without incurring changes to the underlying Datastore until
// the transaction is Committed.
func (d *Datastore) NewTransaction(readOnly bool) (ds.Txn, error) {
func (d *Datastore) NewTransaction(ctx context.Context, readOnly bool) (ds.Txn, error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -226,7 +227,7 @@ func (d *Datastore) newImplicitTransaction(readOnly bool) *txn {
return &txn{d, d.DB.NewTransaction(!readOnly), true}
}

func (d *Datastore) Put(key ds.Key, value []byte) error {
func (d *Datastore) Put(ctx context.Context, key ds.Key, value []byte) error {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -243,7 +244,7 @@ func (d *Datastore) Put(key ds.Key, value []byte) error {
return txn.commit()
}

func (d *Datastore) Sync(prefix ds.Key) error {
func (d *Datastore) Sync(ctx context.Context, prefix ds.Key) error {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -257,7 +258,7 @@ func (d *Datastore) Sync(prefix ds.Key) error {
return d.DB.Sync()
}

func (d *Datastore) PutWithTTL(key ds.Key, value []byte, ttl time.Duration) error {
func (d *Datastore) PutWithTTL(ctx context.Context, key ds.Key, value []byte, ttl time.Duration) error {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -274,7 +275,7 @@ func (d *Datastore) PutWithTTL(key ds.Key, value []byte, ttl time.Duration) erro
return txn.commit()
}

func (d *Datastore) SetTTL(key ds.Key, ttl time.Duration) error {
func (d *Datastore) SetTTL(ctx context.Context, key ds.Key, ttl time.Duration) error {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -291,7 +292,7 @@ func (d *Datastore) SetTTL(key ds.Key, ttl time.Duration) error {
return txn.commit()
}

func (d *Datastore) GetExpiration(key ds.Key) (time.Time, error) {
func (d *Datastore) GetExpiration(ctx context.Context, key ds.Key) (time.Time, error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -304,7 +305,7 @@ func (d *Datastore) GetExpiration(key ds.Key) (time.Time, error) {
return txn.getExpiration(key)
}

func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
func (d *Datastore) Get(ctx context.Context, key ds.Key) (value []byte, err error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -317,7 +318,7 @@ func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
return txn.get(key)
}

func (d *Datastore) Has(key ds.Key) (bool, error) {
func (d *Datastore) Has(ctx context.Context, key ds.Key) (bool, error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -330,7 +331,7 @@ func (d *Datastore) Has(key ds.Key) (bool, error) {
return txn.has(key)
}

func (d *Datastore) GetSize(key ds.Key) (size int, err error) {
func (d *Datastore) GetSize(ctx context.Context, key ds.Key) (size int, err error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -343,7 +344,7 @@ func (d *Datastore) GetSize(key ds.Key) (size int, err error) {
return txn.getSize(key)
}

func (d *Datastore) Delete(key ds.Key) error {
func (d *Datastore) Delete(ctx context.Context, key ds.Key) error {
d.closeLk.RLock()
defer d.closeLk.RUnlock()

Expand All @@ -358,7 +359,7 @@ func (d *Datastore) Delete(key ds.Key) error {
return txn.commit()
}

func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
func (d *Datastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -374,7 +375,7 @@ func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {

// DiskUsage implements the PersistentDatastore interface.
// It returns the sum of lsm and value log files sizes in bytes.
func (d *Datastore) DiskUsage() (uint64, error) {
func (d *Datastore) DiskUsage(ctx context.Context) (uint64, error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -399,7 +400,7 @@ func (d *Datastore) Close() error {

// Batch creats a new Batch object. This provides a way to do many writes, when
// there may be too many to fit into a single transaction.
func (d *Datastore) Batch() (ds.Batch, error) {
func (d *Datastore) Batch(ctx context.Context) (ds.Batch, error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -417,7 +418,7 @@ func (d *Datastore) Batch() (ds.Batch, error) {
return b, nil
}

func (d *Datastore) CollectGarbage() (err error) {
func (d *Datastore) CollectGarbage(ctx context.Context) (err error) {
// The idea is to keep calling DB.RunValueLogGC() till Badger no longer has any log files
// to GC(which would be indicated by an error, please refer to Badger GC docs).
for err == nil {
Expand All @@ -444,7 +445,7 @@ func (d *Datastore) gcOnce() error {

var _ ds.Batch = (*batch)(nil)

func (b *batch) Put(key ds.Key, value []byte) error {
func (b *batch) Put(ctx context.Context, key ds.Key, value []byte) error {
b.ds.closeLk.RLock()
defer b.ds.closeLk.RUnlock()
if b.ds.closed {
Expand All @@ -457,7 +458,7 @@ func (b *batch) put(key ds.Key, value []byte) error {
return b.writeBatch.Set(key.Bytes(), value)
}

func (b *batch) Delete(key ds.Key) error {
func (b *batch) Delete(ctx context.Context, key ds.Key) error {
b.ds.closeLk.RLock()
defer b.ds.closeLk.RUnlock()
if b.ds.closed {
Expand All @@ -471,7 +472,7 @@ func (b *batch) delete(key ds.Key) error {
return b.writeBatch.Delete(key.Bytes())
}

func (b *batch) Commit() error {
func (b *batch) Commit(ctx context.Context) error {
b.ds.closeLk.RLock()
defer b.ds.closeLk.RUnlock()
if b.ds.closed {
Expand Down Expand Up @@ -511,7 +512,7 @@ func (b *batch) cancel() {
var _ ds.Datastore = (*txn)(nil)
var _ ds.TTLDatastore = (*txn)(nil)

func (t *txn) Put(key ds.Key, value []byte) error {
func (t *txn) Put(ctx context.Context, key ds.Key, value []byte) error {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -524,7 +525,7 @@ func (t *txn) put(key ds.Key, value []byte) error {
return t.txn.Set(key.Bytes(), value)
}

func (t *txn) Sync(prefix ds.Key) error {
func (t *txn) Sync(ctx context.Context, prefix ds.Key) error {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -534,7 +535,7 @@ func (t *txn) Sync(prefix ds.Key) error {
return nil
}

func (t *txn) PutWithTTL(key ds.Key, value []byte, ttl time.Duration) error {
func (t *txn) PutWithTTL(ctx context.Context, key ds.Key, value []byte, ttl time.Duration) error {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -547,7 +548,7 @@ func (t *txn) putWithTTL(key ds.Key, value []byte, ttl time.Duration) error {
return t.txn.SetEntry(badger.NewEntry(key.Bytes(), value).WithTTL(ttl))
}

func (t *txn) GetExpiration(key ds.Key) (time.Time, error) {
func (t *txn) GetExpiration(ctx context.Context, key ds.Key) (time.Time, error) {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -567,7 +568,7 @@ func (t *txn) getExpiration(key ds.Key) (time.Time, error) {
return time.Unix(int64(item.ExpiresAt()), 0), nil
}

func (t *txn) SetTTL(key ds.Key, ttl time.Duration) error {
func (t *txn) SetTTL(ctx context.Context, key ds.Key, ttl time.Duration) error {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -588,7 +589,7 @@ func (t *txn) setTTL(key ds.Key, ttl time.Duration) error {

}

func (t *txn) Get(key ds.Key) ([]byte, error) {
func (t *txn) Get(ctx context.Context, key ds.Key) ([]byte, error) {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -610,7 +611,7 @@ func (t *txn) get(key ds.Key) ([]byte, error) {
return item.ValueCopy(nil)
}

func (t *txn) Has(key ds.Key) (bool, error) {
func (t *txn) Has(ctx context.Context, key ds.Key) (bool, error) {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -632,7 +633,7 @@ func (t *txn) has(key ds.Key) (bool, error) {
}
}

func (t *txn) GetSize(key ds.Key) (int, error) {
func (t *txn) GetSize(ctx context.Context, key ds.Key) (int, error) {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -654,7 +655,7 @@ func (t *txn) getSize(key ds.Key) (int, error) {
}
}

func (t *txn) Delete(key ds.Key) error {
func (t *txn) Delete(ctx context.Context, key ds.Key) error {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -668,7 +669,7 @@ func (t *txn) delete(key ds.Key) error {
return t.txn.Delete(key.Bytes())
}

func (t *txn) Query(q dsq.Query) (dsq.Results, error) {
func (t *txn) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand Down Expand Up @@ -857,7 +858,7 @@ func (t *txn) query(q dsq.Query) (dsq.Results, error) {
return qrb.Results(), nil
}

func (t *txn) Commit() error {
func (t *txn) Commit(ctx context.Context) error {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand Down Expand Up @@ -885,7 +886,7 @@ func (t *txn) close() error {
return t.txn.Commit()
}

func (t *txn) Discard() {
func (t *txn) Discard(ctx context.Context) {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand Down
Loading

0 comments on commit f461792

Please sign in to comment.