Skip to content

Commit 356c49f

Browse files
janosnonsense
authored andcommitted
swarm: Shed Index and Uint64Field additions (#18398)
1 parent 428eabe commit 356c49f

File tree

6 files changed

+671
-130
lines changed

6 files changed

+671
-130
lines changed

swarm/shed/db.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
// more complex operations on storage data organized in fields and indexes.
1919
//
2020
// Only type which holds logical information about swarm storage chunks data
21-
// and metadata is IndexItem. This part is not generalized mostly for
21+
// and metadata is Item. This part is not generalized mostly for
2222
// performance reasons.
2323
package shed
2424

swarm/shed/example_store_test.go

+20-20
Original file line numberDiff line numberDiff line change
@@ -71,20 +71,20 @@ func New(path string) (s *Store, err error) {
7171
}
7272
// Index storing actual chunk address, data and store timestamp.
7373
s.retrievalIndex, err = db.NewIndex("Address->StoreTimestamp|Data", shed.IndexFuncs{
74-
EncodeKey: func(fields shed.IndexItem) (key []byte, err error) {
74+
EncodeKey: func(fields shed.Item) (key []byte, err error) {
7575
return fields.Address, nil
7676
},
77-
DecodeKey: func(key []byte) (e shed.IndexItem, err error) {
77+
DecodeKey: func(key []byte) (e shed.Item, err error) {
7878
e.Address = key
7979
return e, nil
8080
},
81-
EncodeValue: func(fields shed.IndexItem) (value []byte, err error) {
81+
EncodeValue: func(fields shed.Item) (value []byte, err error) {
8282
b := make([]byte, 8)
8383
binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp))
8484
value = append(b, fields.Data...)
8585
return value, nil
8686
},
87-
DecodeValue: func(value []byte) (e shed.IndexItem, err error) {
87+
DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
8888
e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
8989
e.Data = value[8:]
9090
return e, nil
@@ -96,19 +96,19 @@ func New(path string) (s *Store, err error) {
9696
// Index storing access timestamp for a particular address.
9797
// It is needed in order to update gc index keys for iteration order.
9898
s.accessIndex, err = db.NewIndex("Address->AccessTimestamp", shed.IndexFuncs{
99-
EncodeKey: func(fields shed.IndexItem) (key []byte, err error) {
99+
EncodeKey: func(fields shed.Item) (key []byte, err error) {
100100
return fields.Address, nil
101101
},
102-
DecodeKey: func(key []byte) (e shed.IndexItem, err error) {
102+
DecodeKey: func(key []byte) (e shed.Item, err error) {
103103
e.Address = key
104104
return e, nil
105105
},
106-
EncodeValue: func(fields shed.IndexItem) (value []byte, err error) {
106+
EncodeValue: func(fields shed.Item) (value []byte, err error) {
107107
b := make([]byte, 8)
108108
binary.BigEndian.PutUint64(b, uint64(fields.AccessTimestamp))
109109
return b, nil
110110
},
111-
DecodeValue: func(value []byte) (e shed.IndexItem, err error) {
111+
DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
112112
e.AccessTimestamp = int64(binary.BigEndian.Uint64(value))
113113
return e, nil
114114
},
@@ -118,23 +118,23 @@ func New(path string) (s *Store, err error) {
118118
}
119119
// Index with keys ordered by access timestamp for garbage collection prioritization.
120120
s.gcIndex, err = db.NewIndex("AccessTimestamp|StoredTimestamp|Address->nil", shed.IndexFuncs{
121-
EncodeKey: func(fields shed.IndexItem) (key []byte, err error) {
121+
EncodeKey: func(fields shed.Item) (key []byte, err error) {
122122
b := make([]byte, 16, 16+len(fields.Address))
123123
binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp))
124124
binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
125125
key = append(b, fields.Address...)
126126
return key, nil
127127
},
128-
DecodeKey: func(key []byte) (e shed.IndexItem, err error) {
128+
DecodeKey: func(key []byte) (e shed.Item, err error) {
129129
e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8]))
130130
e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[8:16]))
131131
e.Address = key[16:]
132132
return e, nil
133133
},
134-
EncodeValue: func(fields shed.IndexItem) (value []byte, err error) {
134+
EncodeValue: func(fields shed.Item) (value []byte, err error) {
135135
return nil, nil
136136
},
137-
DecodeValue: func(value []byte) (e shed.IndexItem, err error) {
137+
DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
138138
return e, nil
139139
},
140140
})
@@ -146,7 +146,7 @@ func New(path string) (s *Store, err error) {
146146

147147
// Put stores the chunk and sets it store timestamp.
148148
func (s *Store) Put(_ context.Context, ch storage.Chunk) (err error) {
149-
return s.retrievalIndex.Put(shed.IndexItem{
149+
return s.retrievalIndex.Put(shed.Item{
150150
Address: ch.Address(),
151151
Data: ch.Data(),
152152
StoreTimestamp: time.Now().UTC().UnixNano(),
@@ -161,7 +161,7 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
161161
batch := new(leveldb.Batch)
162162

163163
// Get the chunk data and storage timestamp.
164-
item, err := s.retrievalIndex.Get(shed.IndexItem{
164+
item, err := s.retrievalIndex.Get(shed.Item{
165165
Address: addr,
166166
})
167167
if err != nil {
@@ -172,13 +172,13 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
172172
}
173173

174174
// Get the chunk access timestamp.
175-
accessItem, err := s.accessIndex.Get(shed.IndexItem{
175+
accessItem, err := s.accessIndex.Get(shed.Item{
176176
Address: addr,
177177
})
178178
switch err {
179179
case nil:
180180
// Remove gc index entry if access timestamp is found.
181-
err = s.gcIndex.DeleteInBatch(batch, shed.IndexItem{
181+
err = s.gcIndex.DeleteInBatch(batch, shed.Item{
182182
Address: item.Address,
183183
StoreTimestamp: accessItem.AccessTimestamp,
184184
AccessTimestamp: item.StoreTimestamp,
@@ -197,7 +197,7 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
197197
accessTimestamp := time.Now().UTC().UnixNano()
198198

199199
// Put new access timestamp in access index.
200-
err = s.accessIndex.PutInBatch(batch, shed.IndexItem{
200+
err = s.accessIndex.PutInBatch(batch, shed.Item{
201201
Address: addr,
202202
AccessTimestamp: accessTimestamp,
203203
})
@@ -206,7 +206,7 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
206206
}
207207

208208
// Put new access timestamp in gc index.
209-
err = s.gcIndex.PutInBatch(batch, shed.IndexItem{
209+
err = s.gcIndex.PutInBatch(batch, shed.Item{
210210
Address: item.Address,
211211
AccessTimestamp: accessTimestamp,
212212
StoreTimestamp: item.StoreTimestamp,
@@ -244,7 +244,7 @@ func (s *Store) CollectGarbage() (err error) {
244244
// New batch for a new cg round.
245245
trash := new(leveldb.Batch)
246246
// Iterate through all index items and break when needed.
247-
err = s.gcIndex.IterateAll(func(item shed.IndexItem) (stop bool, err error) {
247+
err = s.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
248248
// Remove the chunk.
249249
err = s.retrievalIndex.DeleteInBatch(trash, item)
250250
if err != nil {
@@ -265,7 +265,7 @@ func (s *Store) CollectGarbage() (err error) {
265265
return true, nil
266266
}
267267
return false, nil
268-
})
268+
}, nil)
269269
if err != nil {
270270
return err
271271
}

swarm/shed/field_uint64.go

+38
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,44 @@ func (f Uint64Field) IncInBatch(batch *leveldb.Batch) (val uint64, err error) {
9999
return val, nil
100100
}
101101

102+
// Dec decrements a uint64 value in the database.
103+
// This operation is not goroutine save.
104+
// The field is protected from overflow to a negative value.
105+
func (f Uint64Field) Dec() (val uint64, err error) {
106+
val, err = f.Get()
107+
if err != nil {
108+
if err == leveldb.ErrNotFound {
109+
val = 0
110+
} else {
111+
return 0, err
112+
}
113+
}
114+
if val != 0 {
115+
val--
116+
}
117+
return val, f.Put(val)
118+
}
119+
120+
// DecInBatch decrements a uint64 value in the batch
121+
// by retreiving a value from the database, not the same batch.
122+
// This operation is not goroutine save.
123+
// The field is protected from overflow to a negative value.
124+
func (f Uint64Field) DecInBatch(batch *leveldb.Batch) (val uint64, err error) {
125+
val, err = f.Get()
126+
if err != nil {
127+
if err == leveldb.ErrNotFound {
128+
val = 0
129+
} else {
130+
return 0, err
131+
}
132+
}
133+
if val != 0 {
134+
val--
135+
}
136+
f.PutInBatch(batch, val)
137+
return val, nil
138+
}
139+
102140
// encode transforms uint64 to 8 byte long
103141
// slice in big endian encoding.
104142
func encodeUint64(val uint64) (b []byte) {

swarm/shed/field_uint64_test.go

+106
Original file line numberDiff line numberDiff line change
@@ -192,3 +192,109 @@ func TestUint64Field_IncInBatch(t *testing.T) {
192192
t.Errorf("got uint64 %v, want %v", got, want)
193193
}
194194
}
195+
196+
// TestUint64Field_Dec validates Dec operation
197+
// of the Uint64Field.
198+
func TestUint64Field_Dec(t *testing.T) {
199+
db, cleanupFunc := newTestDB(t)
200+
defer cleanupFunc()
201+
202+
counter, err := db.NewUint64Field("counter")
203+
if err != nil {
204+
t.Fatal(err)
205+
}
206+
207+
// test overflow protection
208+
var want uint64
209+
got, err := counter.Dec()
210+
if err != nil {
211+
t.Fatal(err)
212+
}
213+
if got != want {
214+
t.Errorf("got uint64 %v, want %v", got, want)
215+
}
216+
217+
want = 32
218+
err = counter.Put(want)
219+
if err != nil {
220+
t.Fatal(err)
221+
}
222+
223+
want = 31
224+
got, err = counter.Dec()
225+
if err != nil {
226+
t.Fatal(err)
227+
}
228+
if got != want {
229+
t.Errorf("got uint64 %v, want %v", got, want)
230+
}
231+
}
232+
233+
// TestUint64Field_DecInBatch validates DecInBatch operation
234+
// of the Uint64Field.
235+
func TestUint64Field_DecInBatch(t *testing.T) {
236+
db, cleanupFunc := newTestDB(t)
237+
defer cleanupFunc()
238+
239+
counter, err := db.NewUint64Field("counter")
240+
if err != nil {
241+
t.Fatal(err)
242+
}
243+
244+
batch := new(leveldb.Batch)
245+
var want uint64
246+
got, err := counter.DecInBatch(batch)
247+
if err != nil {
248+
t.Fatal(err)
249+
}
250+
if got != want {
251+
t.Errorf("got uint64 %v, want %v", got, want)
252+
}
253+
err = db.WriteBatch(batch)
254+
if err != nil {
255+
t.Fatal(err)
256+
}
257+
got, err = counter.Get()
258+
if err != nil {
259+
t.Fatal(err)
260+
}
261+
if got != want {
262+
t.Errorf("got uint64 %v, want %v", got, want)
263+
}
264+
265+
batch2 := new(leveldb.Batch)
266+
want = 42
267+
counter.PutInBatch(batch2, want)
268+
err = db.WriteBatch(batch2)
269+
if err != nil {
270+
t.Fatal(err)
271+
}
272+
got, err = counter.Get()
273+
if err != nil {
274+
t.Fatal(err)
275+
}
276+
if got != want {
277+
t.Errorf("got uint64 %v, want %v", got, want)
278+
}
279+
280+
batch3 := new(leveldb.Batch)
281+
want = 41
282+
got, err = counter.DecInBatch(batch3)
283+
if err != nil {
284+
t.Fatal(err)
285+
}
286+
if got != want {
287+
t.Errorf("got uint64 %v, want %v", got, want)
288+
}
289+
err = db.WriteBatch(batch3)
290+
if err != nil {
291+
t.Fatal(err)
292+
}
293+
got, err = counter.Get()
294+
if err != nil {
295+
t.Fatal(err)
296+
}
297+
if got != want {
298+
t.Errorf("got uint64 %v, want %v", got, want)
299+
}
300+
}

0 commit comments

Comments
 (0)