
Commit 4f3d22f

janosnonsense authored and committed
swarm/storage/localstore: new localstore package (ethereum#19015)
1 parent 41597c2 commit 4f3d22f

17 files changed: +4244 -0 lines changed

swarm/storage/localstore/doc.go

+56
@@ -0,0 +1,56 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

/*
Package localstore provides a disk storage layer for Swarm Chunk persistence.
It uses swarm/shed abstractions on top of the github.com/syndtr/goleveldb
LevelDB implementation.

The main type is DB, which manages the storage by providing methods to
access and add Chunks and to manage their status.

Modes are abstractions that perform specific changes to Chunks. There are
three mode types:

 - ModeGet, for Chunk access
 - ModePut, for adding Chunks to the database
 - ModeSet, for changing Chunk statuses

Every mode type has a corresponding type (Getter, Putter and Setter)
that provides an adequate method to perform the operation. That type,
rather than the whole DB, should be injected into localstore consumers,
which gives a clearer insight into which operations a consumer performs
on the database.

Getters, Putters and Setters accept different get, put and set modes
to perform different actions. For example, ModeGet has two variables,
ModeGetRequest and ModeGetSync, and two different Getters can be
constructed with them, to be used when a chunk is requested or when a
chunk is synced, as these two events change the database differently.

Subscription methods are implemented for the specific purpose of
continuous iteration over Chunks that should be provided to
Push and Pull syncing.

DB implements an internal garbage collector that removes only synced
Chunks from the database, based on their most recent access time.

Internally, DB stores Chunk data and any required information, such as
store and access timestamps, in different shed indexes that can be
iterated on by the garbage collector or by subscriptions.
*/
package localstore
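
To make the injection pattern above concrete, here is a minimal sketch of a consumer that is handed only a Getter instead of the whole DB. The constructor and method names (NewGetter, Get, ModeGetRequest) follow this package's description, but treat the exact signatures as assumptions rather than the verbatim exported API.

package consumer

import (
    "github.com/ethereum/go-ethereum/swarm/storage"
    "github.com/ethereum/go-ethereum/swarm/storage/localstore"
)

// fetcher only needs read access, so it holds a *localstore.Getter
// constructed with ModeGetRequest rather than the whole *localstore.DB.
type fetcher struct {
    get *localstore.Getter
}

func newFetcher(db *localstore.DB) *fetcher {
    // ModeGetRequest is the mode for chunks accessed on request; it
    // updates the access time that the garbage collector relies on.
    return &fetcher{get: db.NewGetter(localstore.ModeGetRequest)}
}

func (f *fetcher) chunk(addr storage.Address) (storage.Chunk, error) {
    return f.get.Get(addr)
}

Because fetcher only sees a Getter, a review of its code immediately shows that it can never put chunks or change chunk statuses.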

swarm/storage/localstore/gc.go

+302
@@ -0,0 +1,302 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

/*
Counting the number of items in the garbage collection index

The number of items in the garbage collection index is not the same as the
number of chunks in the retrieval index (the total number of stored chunks). A
chunk can be garbage collected only after it is set to a synced state by
ModeSetSync, and only then is it counted into the garbage collection size,
which determines whether a number of chunks should be removed from the storage
by garbage collection. This opens the possibility that the storage size
exceeds the limit if files are uploaded locally and the node is not connected
to other nodes, or there is a problem with syncing.

Tracking of the garbage collection size (gcSize) is focused on performance.
Key points:

 1. Counting the number of key/value pairs in LevelDB takes around 0.7s per
    1e6 entries on a very fast SSD, which is unacceptably long in practice.
 2. Locking LevelDB batch writes with a global mutex (serial batch writes) is
    not acceptable; locking should be done per chunk address.

Because of point 1, we cannot count the number of items in the garbage
collection index in the New constructor, as it could take very long in
realistic scenarios where the limit is 5e6 and nodes are running on slower HDD
disks or cloud providers with low IOPS.

Point 2 is a performance optimization that allows parallel batch writes by
getters, putters and setters. Every batch they create contains only
information related to a single chunk, with no relations to other chunks or
shared statistical data (like gcSize). This approach avoids race conditions
when writing batches in parallel, but creates the problem of synchronizing
statistical values like gcSize. A global mutex lock would allow any batch to
write any data, but would not utilize the full potential of LevelDB parallel
writes.

To mitigate these two problems, the implementation of counting and persisting
gcSize is split into two parts. One is the in-memory value (gcSize) that is
fast to read and write under a dedicated mutex (gcSizeMu), updated whenever a
batch that adds or removes items from the garbage collection index succeeds.
The second part is the reliable persistence of this value to the LevelDB
database, as the storedGCSize field. This field is saved by the
writeGCSizeWorker and writeGCSize functions when the in-memory gcSize variable
changes, but not too often, to avoid very frequent database writes. These
writes are triggered through writeGCSizeTrigger on every call to incGCSize.
The trigger ensures that database writes happen only when gcSize has changed
(in contrast to simpler periodic writes or checks). A backoff of 10s in
writeGCSizeWorker ensures that batch writes are not too frequent. Saving
storedGCSize in the database Close function ensures that the in-memory gcSize
is persisted when the database is closed.

This persistence must be resilient to failures like panics. For this purpose,
the set of hashes that have been added to the garbage collection index but are
not yet reflected in storedGCSize must be tracked, so that they can be counted
in when the DB is constructed again with the New function after a failure
(a swarm node restart). On every batch write that adds a new item to the
garbage collection index, the same hash is added to gcUncountedHashesIndex.
This persists the information about which hashes were added to the garbage
collection index. When storedGCSize is saved by the writeGCSize function,
these values are removed in the same batch in which storedGCSize is changed,
to ensure consistency. If a panic happens, or the database Close method does
not get to save the value, the database storage still contains all the
information needed to reliably and efficiently recover the correct number of
items in the garbage collection index. This recovery is performed in the New
function: all hashes in gcUncountedHashesIndex are counted, added to
storedGCSize and saved to disk before the database is constructed again. The
gcUncountedHashesIndex acts as a dirty bit for recovery that pinpoints exactly
what needs to be corrected. With a plain dirty bit, the whole garbage
collection index would have to be counted on recovery, instead of only the
items in gcUncountedHashesIndex. Because of the triggering mechanism of
writeGCSizeWorker and the relatively short backoff time, the number of hashes
in gcUncountedHashesIndex should be low, and recovery from a previous failure
should take a very short time. If there was no failure and
gcUncountedHashesIndex is empty, which is the usual case, the New function
takes minimal time to return.
*/

package localstore

import (
    "time"

    "github.com/ethereum/go-ethereum/log"
    "github.com/ethereum/go-ethereum/swarm/shed"
    "github.com/syndtr/goleveldb/leveldb"
)
var (
    // gcTargetRatio defines the target number of items
    // in the garbage collection index that will not be removed
    // on garbage collection. The target number of items
    // is calculated by the gcTarget function. This value must be
    // in the range (0,1]. For example, with a value of 0.9,
    // garbage collection will leave 90% of the defined capacity
    // in the database after its run. This prevents frequent
    // garbage collection runs.
    gcTargetRatio = 0.9
    // gcBatchSize limits the number of chunks in a single
    // leveldb batch on garbage collection.
    gcBatchSize int64 = 1000
)

// collectGarbageWorker is a long running function that waits for
// the collectGarbageTrigger channel to signal a garbage collection
// run. A GC run iterates on gcIndex and removes older items
// from retrieval and other indexes.
func (db *DB) collectGarbageWorker() {
    for {
        select {
        case <-db.collectGarbageTrigger:
            // run a single garbage collection round; if done is
            // false, gcBatchSize was reached and another round
            // is needed
            collectedCount, done, err := db.collectGarbage()
            if err != nil {
                log.Error("localstore collect garbage", "err", err)
            }
            // check if another gc run is needed
            if !done {
                db.triggerGarbageCollection()
            }

            if testHookCollectGarbage != nil {
                testHookCollectGarbage(collectedCount)
            }
        case <-db.close:
            return
        }
    }
}

// collectGarbage removes chunks from retrieval and other
// indexes if the maximum number of chunks in the database is reached.
// This function returns the number of removed chunks. If done
// is false, another call to this function is needed to collect
// the rest of the garbage, as the batch size limit was reached.
// This function is called in collectGarbageWorker.
func (db *DB) collectGarbage() (collectedCount int64, done bool, err error) {
    batch := new(leveldb.Batch)
    target := db.gcTarget()

    done = true
    err = db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
        // protect parallel updates
        unlock, err := db.lockAddr(item.Address)
        if err != nil {
            return false, err
        }
        defer unlock()

        gcSize := db.getGCSize()
        if gcSize-collectedCount <= target {
            return true, nil
        }
        // delete from retrieve, pull, gc
        db.retrievalDataIndex.DeleteInBatch(batch, item)
        db.retrievalAccessIndex.DeleteInBatch(batch, item)
        db.pullIndex.DeleteInBatch(batch, item)
        db.gcIndex.DeleteInBatch(batch, item)
        collectedCount++
        if collectedCount >= gcBatchSize {
            // batch size limit reached,
            // another gc run is needed
            done = false
            return true, nil
        }
        return false, nil
    }, nil)
    if err != nil {
        return 0, false, err
    }

    err = db.shed.WriteBatch(batch)
    if err != nil {
        return 0, false, err
    }
    // batch is written, decrement gcSize
    db.incGCSize(-collectedCount)
    return collectedCount, done, nil
}

// gcTarget returns the absolute value for the garbage collection
// target, calculated from db.capacity and gcTargetRatio.
func (db *DB) gcTarget() (target int64) {
    return int64(float64(db.capacity) * gcTargetRatio)
}
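
For example, with a hypothetical db.capacity of 1,000,000 chunks and the default gcTargetRatio of 0.9, gcTarget returns 900,000: a garbage collection run keeps deleting the oldest gcIndex entries until at most 900,000 remain, instead of stopping exactly at capacity and immediately re-triggering on the next put.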

// incGCSize increments gcSize by the provided number.
// If count is negative, it will decrement gcSize.
func (db *DB) incGCSize(count int64) {
    if count == 0 {
        return
    }

    db.gcSizeMu.Lock()
    newSize := db.gcSize + count
    db.gcSize = newSize
    db.gcSizeMu.Unlock()

    select {
    case db.writeGCSizeTrigger <- struct{}{}:
    default:
    }
    if newSize >= db.capacity {
        db.triggerGarbageCollection()
    }
}
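
Note the select with an empty default: the send to writeGCSizeTrigger is non-blocking, so if a persistence write is already pending, the extra notification is simply dropped and many increments coalesce into a single writeGCSize call.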

// getGCSize returns the gcSize value, guarded
// by the gcSizeMu mutex.
func (db *DB) getGCSize() (count int64) {
    db.gcSizeMu.RLock()
    count = db.gcSize
    db.gcSizeMu.RUnlock()
    return count
}

// triggerGarbageCollection signals collectGarbageWorker
// to call collectGarbage.
func (db *DB) triggerGarbageCollection() {
    select {
    case db.collectGarbageTrigger <- struct{}{}:
    case <-db.close:
    default:
    }
}

// writeGCSizeWorker writes gcSize on trigger event
// and waits 10 seconds after each write to avoid
// very frequent database operations.
func (db *DB) writeGCSizeWorker() {
    for {
        select {
        case <-db.writeGCSizeTrigger:
            err := db.writeGCSize(db.getGCSize())
            if err != nil {
                log.Error("localstore write gc size", "err", err)
            }
            // Wait some time before writing gc size in the next
            // iteration. This prevents frequent I/O operations.
            select {
            case <-time.After(10 * time.Second):
            case <-db.close:
                return
            }
        case <-db.close:
            return
        }
    }
}

// writeGCSize stores the number of items in gcIndex.
// It removes all hashes from gcUncountedHashesIndex
// so that they are not counted again on the next DB
// initialization (in the New function) when gcSize is
// recovered.
func (db *DB) writeGCSize(gcSize int64) (err error) {
    const maxBatchSize = 1000

    batch := new(leveldb.Batch)
    db.storedGCSize.PutInBatch(batch, uint64(gcSize))
    batchSize := 1

    // use only one iterator, as it acquires its snapshot,
    // so hashes added to the index after the stored gc size
    // is read are not removed
    err = db.gcUncountedHashesIndex.Iterate(func(item shed.Item) (stop bool, err error) {
        db.gcUncountedHashesIndex.DeleteInBatch(batch, item)
        batchSize++
        if batchSize >= maxBatchSize {
            err = db.shed.WriteBatch(batch)
            if err != nil {
                return false, err
            }
            batch.Reset()
            batchSize = 0
        }
        return false, nil
    }, nil)
    if err != nil {
        return err
    }
    return db.shed.WriteBatch(batch)
}
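
The recovery side described in the file comment lives in the New constructor, which is not part of this file. The following is a minimal sketch of that idea, reusing the shed index calls seen above; the method name recoverGCSize and the storedGCSize.Get accessor are assumptions for illustration, not the commit's verbatim code.

// Sketch only: recompute gcSize after a restart by adding the hashes
// that were never folded into storedGCSize.
func (db *DB) recoverGCSize() error {
    // start from the last reliably persisted counter
    stored, err := db.storedGCSize.Get()
    if err != nil {
        return err
    }
    // every hash still present in gcUncountedHashesIndex entered
    // gcIndex after storedGCSize was last written
    var uncounted int64
    err = db.gcUncountedHashesIndex.Iterate(func(item shed.Item) (stop bool, err error) {
        uncounted++
        return false, nil
    }, nil)
    if err != nil {
        return err
    }
    gcSize := int64(stored) + uncounted
    db.gcSize = gcSize
    // persist the corrected value and clear the uncounted set in the
    // same batch (writeGCSize does both)
    return db.writeGCSize(gcSize)
}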

// testHookCollectGarbage is a hook that can provide
// information when a garbage collection run is done
// and how many items it removed.
var testHookCollectGarbage func(collectedCount int64)
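
How a test might use this hook (a hypothetical helper, shown only to illustrate the intended use; the commit's own tests may wire it differently):

// setTestHookCollectGarbage installs a hook that reports the removed
// item count of each garbage collection run on the given channel, and
// returns a function that removes the hook again.
func setTestHookCollectGarbage(runs chan int64) (reset func()) {
    testHookCollectGarbage = func(collectedCount int64) {
        runs <- collectedCount
    }
    return func() { testHookCollectGarbage = nil }
}

A test would typically defer the returned reset, push the database over capacity, and then assert on the counts received from runs.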
