Skip to content

Commit b91bf08

Browse files
acudnonsense
authored andcommitted
cmd/swarm/swarm-smoke: sliding window test (ethereum#18967)
1 parent d884410 commit b91bf08

File tree

6 files changed

+416
-298
lines changed

6 files changed

+416
-298
lines changed

cmd/swarm/swarm-smoke/feed_upload_and_sync.go

+5-93
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,11 @@ package main
22

33
import (
44
"bytes"
5-
"context"
65
"crypto/md5"
6+
crand "crypto/rand"
77
"fmt"
88
"io"
99
"io/ioutil"
10-
"net/http"
11-
"net/http/httptrace"
1210
"os"
1311
"os/exec"
1412
"strings"
@@ -18,13 +16,7 @@ import (
1816
"github.com/ethereum/go-ethereum/common/hexutil"
1917
"github.com/ethereum/go-ethereum/crypto"
2018
"github.com/ethereum/go-ethereum/log"
21-
"github.com/ethereum/go-ethereum/metrics"
22-
"github.com/ethereum/go-ethereum/swarm/api/client"
23-
"github.com/ethereum/go-ethereum/swarm/spancontext"
2419
"github.com/ethereum/go-ethereum/swarm/storage/feed"
25-
"github.com/ethereum/go-ethereum/swarm/testutil"
26-
colorable "github.com/mattn/go-colorable"
27-
opentracing "github.com/opentracing/opentracing-go"
2820
"github.com/pborman/uuid"
2921
cli "gopkg.in/urfave/cli.v1"
3022
)
@@ -33,27 +25,6 @@ const (
3325
feedRandomDataLength = 8
3426
)
3527

36-
func cliFeedUploadAndSync(c *cli.Context) error {
37-
metrics.GetOrRegisterCounter("feed-and-sync", nil).Inc(1)
38-
log.Root().SetHandler(log.CallerFileHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true)))))
39-
40-
errc := make(chan error)
41-
go func() {
42-
errc <- feedUploadAndSync(c)
43-
}()
44-
45-
select {
46-
case err := <-errc:
47-
if err != nil {
48-
metrics.GetOrRegisterCounter("feed-and-sync.fail", nil).Inc(1)
49-
}
50-
return err
51-
case <-time.After(time.Duration(timeout) * time.Second):
52-
metrics.GetOrRegisterCounter("feed-and-sync.timeout", nil).Inc(1)
53-
return fmt.Errorf("timeout after %v sec", timeout)
54-
}
55-
}
56-
5728
// TODO: retrieve with manifest + extract repeating code
5829
func feedUploadAndSync(c *cli.Context) error {
5930
defer func(now time.Time) { log.Info("total time", "time", time.Since(now), "size (kb)", filesize) }(time.Now())
@@ -232,9 +203,10 @@ func feedUploadAndSync(c *cli.Context) error {
232203
seed := int(time.Now().UnixNano() / 1e6)
233204
log.Info("feed uploading to "+endpoints[0]+" and syncing", "seed", seed)
234205

235-
randomBytes := testutil.RandomBytes(seed, filesize*1000)
206+
h = md5.New()
207+
r := io.TeeReader(io.LimitReader(crand.Reader, int64(filesize*1000)), h)
236208

237-
hash, err := upload(&randomBytes, endpoints[0])
209+
hash, err := upload(r, filesize*1000, endpoints[0])
238210
if err != nil {
239211
return err
240212
}
@@ -243,10 +215,7 @@ func feedUploadAndSync(c *cli.Context) error {
243215
return err
244216
}
245217
multihashHex := hexutil.Encode(hashBytes)
246-
fileHash, err := digest(bytes.NewReader(randomBytes))
247-
if err != nil {
248-
return err
249-
}
218+
fileHash := h.Sum(nil)
250219

251220
log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fileHash))
252221

@@ -307,60 +276,3 @@ func feedUploadAndSync(c *cli.Context) error {
307276

308277
return nil
309278
}
310-
311-
func fetchFeed(topic string, user string, endpoint string, original []byte, ruid string) error {
312-
ctx, sp := spancontext.StartSpan(context.Background(), "feed-and-sync.fetch")
313-
defer sp.Finish()
314-
315-
log.Trace("sleeping", "ruid", ruid)
316-
time.Sleep(3 * time.Second)
317-
318-
log.Trace("http get request (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user)
319-
320-
var tn time.Time
321-
reqUri := endpoint + "/bzz-feed:/?topic=" + topic + "&user=" + user
322-
req, _ := http.NewRequest("GET", reqUri, nil)
323-
324-
opentracing.GlobalTracer().Inject(
325-
sp.Context(),
326-
opentracing.HTTPHeaders,
327-
opentracing.HTTPHeadersCarrier(req.Header))
328-
329-
trace := client.GetClientTrace("feed-and-sync - http get", "feed-and-sync", ruid, &tn)
330-
331-
req = req.WithContext(httptrace.WithClientTrace(ctx, trace))
332-
transport := http.DefaultTransport
333-
334-
//transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
335-
336-
tn = time.Now()
337-
res, err := transport.RoundTrip(req)
338-
if err != nil {
339-
log.Error(err.Error(), "ruid", ruid)
340-
return err
341-
}
342-
343-
log.Trace("http get response (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user, "code", res.StatusCode, "len", res.ContentLength)
344-
345-
if res.StatusCode != 200 {
346-
return fmt.Errorf("expected status code %d, got %v (ruid %v)", 200, res.StatusCode, ruid)
347-
}
348-
349-
defer res.Body.Close()
350-
351-
rdigest, err := digest(res.Body)
352-
if err != nil {
353-
log.Warn(err.Error(), "ruid", ruid)
354-
return err
355-
}
356-
357-
if !bytes.Equal(rdigest, original) {
358-
err := fmt.Errorf("downloaded imported file md5=%x is not the same as the generated one=%x", rdigest, original)
359-
log.Warn(err.Error(), "ruid", ruid)
360-
return err
361-
}
362-
363-
log.Trace("downloaded file matches random file", "ruid", ruid, "len", res.ContentLength)
364-
365-
return nil
366-
}

cmd/swarm/swarm-smoke/main.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -140,19 +140,25 @@ func main() {
140140
Name: "upload_and_sync",
141141
Aliases: []string{"c"},
142142
Usage: "upload and sync",
143-
Action: cliUploadAndSync,
143+
Action: wrapCliCommand("upload-and-sync", true, uploadAndSync),
144144
},
145145
{
146146
Name: "feed_sync",
147147
Aliases: []string{"f"},
148148
Usage: "feed update generate, upload and sync",
149-
Action: cliFeedUploadAndSync,
149+
Action: wrapCliCommand("feed-and-sync", true, feedUploadAndSync),
150150
},
151151
{
152152
Name: "upload_speed",
153153
Aliases: []string{"u"},
154154
Usage: "measure upload speed",
155-
Action: cliUploadSpeed,
155+
Action: wrapCliCommand("upload-speed", true, uploadSpeed),
156+
},
157+
{
158+
Name: "sliding_window",
159+
Aliases: []string{"s"},
160+
Usage: "measure network aggregate capacity",
161+
Action: wrapCliCommand("sliding-window", false, slidingWindow),
156162
},
157163
}
158164

+122
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
// Copyright 2018 The go-ethereum Authors
2+
// This file is part of go-ethereum.
3+
//
4+
// go-ethereum is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// go-ethereum is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU General Public License
15+
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package main
18+
19+
import (
20+
"crypto/md5"
21+
crand "crypto/rand"
22+
"fmt"
23+
"io"
24+
"math/rand"
25+
"time"
26+
27+
"github.com/ethereum/go-ethereum/log"
28+
"github.com/ethereum/go-ethereum/metrics"
29+
"github.com/pborman/uuid"
30+
31+
cli "gopkg.in/urfave/cli.v1"
32+
)
33+
34+
var seed = time.Now().UTC().UnixNano()
35+
36+
func init() {
37+
rand.Seed(seed)
38+
}
39+
40+
type uploadResult struct {
41+
hash string
42+
digest []byte
43+
}
44+
45+
func slidingWindow(c *cli.Context) error {
46+
defer func(now time.Time) {
47+
totalTime := time.Since(now)
48+
49+
log.Info("total time", "time", totalTime)
50+
metrics.GetOrRegisterCounter("sliding-window.total-time", nil).Inc(int64(totalTime))
51+
}(time.Now())
52+
53+
generateEndpoints(scheme, cluster, appName, from, to)
54+
hashes := []uploadResult{} //swarm hashes of the uploads
55+
nodes := to - from
56+
const iterationTimeout = 30 * time.Second
57+
log.Info("sliding window test started", "nodes", nodes, "filesize(kb)", filesize, "timeout", timeout)
58+
uploadedBytes := 0
59+
networkDepth := 0
60+
errored := false
61+
62+
outer:
63+
for {
64+
log.Info("uploading to "+endpoints[0]+" and syncing", "seed", seed)
65+
66+
h := md5.New()
67+
r := io.TeeReader(io.LimitReader(crand.Reader, int64(filesize*1000)), h)
68+
t1 := time.Now()
69+
70+
hash, err := upload(r, filesize*1000, endpoints[0])
71+
if err != nil {
72+
log.Error(err.Error())
73+
return err
74+
}
75+
76+
metrics.GetOrRegisterResettingTimer("sliding-window.upload-time", nil).UpdateSince(t1)
77+
78+
fhash := h.Sum(nil)
79+
80+
log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash), "sleeping", syncDelay)
81+
hashes = append(hashes, uploadResult{hash: hash, digest: fhash})
82+
time.Sleep(time.Duration(syncDelay) * time.Second)
83+
uploadedBytes += filesize * 1000
84+
85+
for i, v := range hashes {
86+
timeout := time.After(time.Duration(timeout) * time.Second)
87+
errored = false
88+
89+
inner:
90+
for {
91+
select {
92+
case <-timeout:
93+
errored = true
94+
log.Error("error retrieving hash. timeout", "hash idx", i, "err", err)
95+
metrics.GetOrRegisterCounter("sliding-window.single.error", nil).Inc(1)
96+
break inner
97+
default:
98+
randIndex := 1 + rand.Intn(len(endpoints)-1)
99+
ruid := uuid.New()[:8]
100+
start := time.Now()
101+
err := fetch(v.hash, endpoints[randIndex], v.digest, ruid)
102+
if err != nil {
103+
continue inner
104+
}
105+
metrics.GetOrRegisterResettingTimer("sliding-window.single.fetch-time", nil).UpdateSince(start)
106+
break inner
107+
}
108+
}
109+
110+
if errored {
111+
break outer
112+
}
113+
networkDepth = i
114+
metrics.GetOrRegisterGauge("sliding-window.network-depth", nil).Update(int64(networkDepth))
115+
}
116+
}
117+
118+
log.Info("sliding window test finished", "errored?", errored, "networkDepth", networkDepth, "networkDepth(kb)", networkDepth*filesize)
119+
log.Info("stats", "uploadedFiles", len(hashes), "uploadedKb", uploadedBytes/1000, "filesizeKb", filesize)
120+
121+
return nil
122+
}

0 commit comments

Comments
 (0)