@@ -18,6 +18,7 @@ package main
18
18
19
19
import (
20
20
"context"
21
+ "errors"
21
22
"fmt"
22
23
"io"
23
24
"net/http"
@@ -191,85 +192,126 @@ func getRemoteDataStream(url string, hint string) (result io.ReadCloser, ctxCanc
191
192
return
192
193
}
193
194
194
- func downloadCatchpoint (addr string , round int ) (tarName string , err error ) {
195
+ func doDownloadCatchpoint (url string , wdReader util.WatchdogStreamReader , out io.Writer ) error {
196
+ writeChunkSize := 64 * 1024
197
+
198
+ var totalBytes int
199
+ tempBytes := make ([]byte , writeChunkSize )
200
+ lastProgressUpdate := time .Now ()
201
+ progress := - 25
202
+ printDownloadProgressLine (progress , 50 , url , 0 )
203
+
204
+ for {
205
+ n , err := wdReader .Read (tempBytes )
206
+ if err == io .EOF {
207
+ return nil
208
+ }
209
+ if err != nil {
210
+ return err
211
+ }
212
+ totalBytes += n
213
+ _ , err = out .Write (tempBytes [:n ])
214
+ if err != nil {
215
+ return err
216
+ }
217
+
218
+ err = wdReader .Reset ()
219
+ if err == io .EOF {
220
+ return nil
221
+ }
222
+ if err != nil {
223
+ return err
224
+ }
225
+ if time .Since (lastProgressUpdate ) > 50 * time .Millisecond {
226
+ lastProgressUpdate = time .Now ()
227
+ printDownloadProgressLine (progress , 50 , url , int64 (totalBytes ))
228
+ progress ++
229
+ }
230
+ }
231
+ }
232
+
233
+ // Downloads a catchpoint tar file and returns the path to the tar file.
234
+ func downloadCatchpoint (addr string , round int ) (string , error ) {
195
235
genesisID := strings .Split (networkName , "." )[0 ] + "-v1.0"
196
236
urlTemplate := "http://" + addr + "/v1/" + genesisID + "/%s/" + strconv .FormatUint (uint64 (round ), 36 )
197
237
catchpointURL := fmt .Sprintf (urlTemplate , "ledger" )
198
238
199
239
catchpointStream , catchpointCtxCancel , err := getRemoteDataStream (catchpointURL , "catchpoint" )
200
240
defer catchpointCtxCancel ()
201
241
if err != nil {
202
- return
242
+ return "" , err
203
243
}
204
244
defer catchpointStream .Close ()
205
245
206
246
dirName := "./" + strings .Split (networkName , "." )[0 ] + "/" + strings .Split (addr , "." )[0 ]
207
247
os .RemoveAll (dirName )
208
248
err = os .MkdirAll (dirName , 0777 )
209
249
if err != nil && ! os .IsExist (err ) {
210
- return
250
+ return "" , err
211
251
}
212
- tarName = dirName + "/" + strconv .FormatUint (uint64 (round ), 10 ) + ".tar"
213
- file , err2 := os .Create (tarName ) // will create a file with 0666 permission.
214
- if err2 != nil {
215
- return tarName , err2
252
+ tarName : = dirName + "/" + strconv .FormatUint (uint64 (round ), 10 ) + ".tar"
253
+ file , err := os .Create (tarName ) // will create a file with 0666 permission.
254
+ if err != nil {
255
+ return "" , err
216
256
}
217
- defer func () {
218
- err = file .Close ()
219
- }()
220
- writeChunkSize := 64 * 1024
257
+ defer file .Close ()
221
258
222
- wdReader := util .MakeWatchdogStreamReader (catchpointStream , 4096 , 4096 , 2 * time .Second )
223
- var totalBytes int
224
- tempBytes := make ([]byte , writeChunkSize )
225
- lastProgressUpdate := time .Now ()
226
- progress := - 25
227
- printDownloadProgressLine (progress , 50 , catchpointURL , 0 )
228
- defer printDownloadProgressLine (0 , 0 , catchpointURL , 0 )
229
- var n int
230
- for {
231
- n , err = wdReader .Read (tempBytes )
232
- if err != nil && err != io .EOF {
233
- return
234
- }
235
- totalBytes += n
236
- writtenBytes , err2 := file .Write (tempBytes [:n ])
237
- if err2 != nil || n != writtenBytes {
238
- return tarName , err2
239
- }
259
+ wdReader := util .MakeWatchdogStreamReader (catchpointStream , 4096 , 4096 , 5 * time .Second )
260
+ defer wdReader .Close ()
240
261
241
- err = wdReader .Reset ()
242
- if err != nil {
243
- if err == io .EOF {
244
- return tarName , nil
245
- }
246
- return
262
+ err = doDownloadCatchpoint (catchpointURL , wdReader , file )
263
+ if err != nil {
264
+ return "" , err
265
+ }
266
+
267
+ printDownloadProgressLine (0 , 0 , catchpointURL , 0 )
268
+
269
+ err = file .Close ()
270
+ if err != nil {
271
+ return "" , err
272
+ }
273
+
274
+ err = catchpointStream .Close ()
275
+ if err != nil {
276
+ return "" , err
277
+ }
278
+
279
+ return tarName , nil
280
+ }
281
+
282
+ func deleteLedgerFiles (deleteTracker bool ) error {
283
+ paths := []string {
284
+ "./ledger.block.sqlite" ,
285
+ "./ledger.block.sqlite-shm" ,
286
+ "./ledger.block.sqlite-wal" ,
287
+ }
288
+ if deleteTracker {
289
+ trackerPaths := []string {
290
+ "./ledger.tracker.sqlite" ,
291
+ "./ledger.tracker.sqlite-shm" ,
292
+ "./ledger.tracker.sqlite-wal" ,
247
293
}
248
- if time .Since (lastProgressUpdate ) > 50 * time .Millisecond {
249
- lastProgressUpdate = time .Now ()
250
- printDownloadProgressLine (progress , 50 , catchpointURL , int64 (totalBytes ))
251
- progress ++
294
+ paths = append (paths , trackerPaths ... )
295
+ }
296
+
297
+ for _ , path := range paths {
298
+ err := os .Remove (path )
299
+ if (err != nil ) && ! errors .Is (err , os .ErrNotExist ) {
300
+ return err
252
301
}
253
302
}
303
+
304
+ return nil
254
305
}
255
306
256
307
func loadAndDump (addr string , tarFile string , genesisInitState ledgercore.InitState ) error {
257
- deleteLedgerFiles := func (deleteTracker bool ) {
258
- os .Remove ("./ledger.block.sqlite" )
259
- os .Remove ("./ledger.block.sqlite-shm" )
260
- os .Remove ("./ledger.block.sqlite-wal" )
261
- if deleteTracker {
262
- os .Remove ("./ledger.tracker.sqlite" )
263
- os .Remove ("./ledger.tracker.sqlite-shm" )
264
- os .Remove ("./ledger.tracker.sqlite-wal" )
265
- }
266
- }
267
308
// delete current ledger files.
268
309
deleteLedgerFiles (true )
269
310
cfg := config .GetDefaultLocal ()
270
311
l , err := ledger .OpenLedger (logging .Base (), "./ledger" , false , genesisInitState , cfg )
271
312
if err != nil {
272
313
reportErrorf ("Unable to open ledger : %v" , err )
314
+ return err
273
315
}
274
316
275
317
defer deleteLedgerFiles (! loadOnly )
@@ -279,6 +321,7 @@ func loadAndDump(addr string, tarFile string, genesisInitState ledgercore.InitSt
279
321
err = catchupAccessor .ResetStagingBalances (context .Background (), true )
280
322
if err != nil {
281
323
reportErrorf ("Unable to initialize catchup database : %v" , err )
324
+ return err
282
325
}
283
326
284
327
stats , err := os .Stat (tarFile )
@@ -297,6 +340,7 @@ func loadAndDump(addr string, tarFile string, genesisInitState ledgercore.InitSt
297
340
fileHeader , err = loadCatchpointIntoDatabase (context .Background (), catchupAccessor , reader , tarSize )
298
341
if err != nil {
299
342
reportErrorf ("Unable to load catchpoint file into in-memory database : %v" , err )
343
+ return err
300
344
}
301
345
302
346
if ! loadOnly {
0 commit comments