From 0e53c594f9e9147d5a6d590e04f66f62e7ec828e Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Mon, 3 Dec 2018 15:11:31 +0100 Subject: [PATCH 01/44] cmd/swarm/swarm-smoke: timeout flag --- cmd/swarm/swarm-smoke/main.go | 7 +++++++ cmd/swarm/swarm-smoke/upload_and_sync.go | 19 +++++++++++++++++-- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index 845998dc12..06ec20a079 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -35,6 +35,7 @@ var ( from int to int verbosity int + timeout int ) func main() { @@ -91,6 +92,12 @@ func main() { Usage: "verbosity", Destination: &verbosity, }, + cli.IntFlag{ + Name: "timeout", + Value: 120, + Usage: "timeout in seconds after which kill the process", + Destination: &timeout, + }, } app.Commands = []cli.Command{ diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index 3843457dc4..21be544fe3 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -40,11 +40,11 @@ import ( func generateEndpoints(scheme string, cluster string, app string, from int, to int) { if cluster == "prod" { - for port := from; port <= to; port++ { + for port := from; port < to; port++ { endpoints = append(endpoints, fmt.Sprintf("%s://%v.swarm-gateways.net", scheme, port)) } } else { - for port := from; port <= to; port++ { + for port := from; port < to; port++ { endpoints = append(endpoints, fmt.Sprintf("%s://%s-%v-%s.stg.swarm-gateways.net", scheme, app, port, cluster)) } } @@ -58,6 +58,21 @@ func cliUploadAndSync(c *cli.Context) error { log.PrintOrigins(true) log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(true)))) + errc := make(chan error) + go func() { + errc <- uploadAndSync(c) + }() + + select { + case err := <-errc: + return err + case <-time.After(time.Duration(timeout) * time.Second): + return fmt.Errorf("timeout after %v sec", timeout) + } + +} + +func uploadAndSync(c *cli.Context) error { defer func(now time.Time) { log.Info("total time", "time", time.Since(now), "kb", filesize) }(time.Now()) generateEndpoints(scheme, cluster, appName, from, to) From 53905cb7095bdc1b72f9ca650adc6b2740b2342e Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Tue, 4 Dec 2018 12:27:00 +0530 Subject: [PATCH 02/44] added influxdb flags --- cmd/swarm/swarm-smoke/main.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index 06ec20a079..c52e9edc95 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -20,6 +20,8 @@ import ( "os" "sort" + swarmmetrics "github.com/ethereum/go-ethereum/swarm/metrics" + "github.com/ethereum/go-ethereum/log" cli "gopkg.in/urfave/cli.v1" @@ -100,6 +102,9 @@ func main() { }, } + app.Flags = append(app.Flags, swarmmetrics.Flags[0]) + app.Flags = append(app.Flags, swarmmetrics.Flags[1:]...) + app.Commands = []cli.Command{ { Name: "upload_and_sync", From ad3aef73027a1db423ddc18b56ca5cff20a5fb54 Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Tue, 4 Dec 2018 13:28:32 +0530 Subject: [PATCH 03/44] correct metrics flags and add metrics init on smoke run --- cmd/swarm/swarm-smoke/main.go | 39 +++++++++++++++++++++++++++++++++-- swarm/metrics/flags.go | 32 +++++++++++++++------------- 2 files changed, 55 insertions(+), 16 deletions(-) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index c52e9edc95..48b762c076 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -18,8 +18,13 @@ package main import ( "os" + "runtime" "sort" + "time" + "github.com/ethereum/go-ethereum/cmd/utils" + gethmetrics "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/metrics/influxdb" swarmmetrics "github.com/ethereum/go-ethereum/swarm/metrics" "github.com/ethereum/go-ethereum/log" @@ -102,8 +107,14 @@ func main() { }, } - app.Flags = append(app.Flags, swarmmetrics.Flags[0]) - app.Flags = append(app.Flags, swarmmetrics.Flags[1:]...) + app.Flags = append(app.Flags, []cli.Flag{ + utils.MetricsEnabledFlag, + swarmmetrics.MetricsInfluxDBEndpointFlag, + swarmmetrics.MetricsInfluxDBDatabaseFlag, + swarmmetrics.MetricsInfluxDBUsernameFlag, + swarmmetrics.MetricsInfluxDBPasswordFlag, + swarmmetrics.MetricsInfluxDBHostTagFlag, + }...) app.Commands = []cli.Command{ { @@ -122,6 +133,11 @@ func main() { sort.Sort(cli.FlagsByName(app.Flags)) sort.Sort(cli.CommandsByName(app.Commands)) + app.Before = func(ctx *cli.Context) error { + runtime.GOMAXPROCS(runtime.NumCPU()) + setupMetrics(ctx) + return nil + } err := app.Run(os.Args) if err != nil { @@ -129,3 +145,22 @@ func main() { os.Exit(1) } } + +func setupMetrics(ctx *cli.Context) { + if gethmetrics.Enabled { + var ( + endpoint = ctx.GlobalString(swarmmetrics.MetricsInfluxDBEndpointFlag.Name) + database = ctx.GlobalString(swarmmetrics.MetricsInfluxDBDatabaseFlag.Name) + username = ctx.GlobalString(swarmmetrics.MetricsInfluxDBUsernameFlag.Name) + password = ctx.GlobalString(swarmmetrics.MetricsInfluxDBPasswordFlag.Name) + hosttag = ctx.GlobalString(swarmmetrics.MetricsInfluxDBHostTagFlag.Name) + ) + + // Start system runtime metrics collection + go gethmetrics.CollectProcessMetrics(2 * time.Second) + + go influxdb.InfluxDBWithTags(gethmetrics.DefaultRegistry, 10*time.Second, endpoint, database, username, password, "swarm.", map[string]string{ + "host": hosttag, + }) + } +} diff --git a/swarm/metrics/flags.go b/swarm/metrics/flags.go index 79490fd360..7c12120a60 100644 --- a/swarm/metrics/flags.go +++ b/swarm/metrics/flags.go @@ -27,26 +27,26 @@ import ( ) var ( - metricsEnableInfluxDBExportFlag = cli.BoolFlag{ + MetricsEnableInfluxDBExportFlag = cli.BoolFlag{ Name: "metrics.influxdb.export", Usage: "Enable metrics export/push to an external InfluxDB database", } - metricsInfluxDBEndpointFlag = cli.StringFlag{ + MetricsInfluxDBEndpointFlag = cli.StringFlag{ Name: "metrics.influxdb.endpoint", Usage: "Metrics InfluxDB endpoint", Value: "http://127.0.0.1:8086", } - metricsInfluxDBDatabaseFlag = cli.StringFlag{ + MetricsInfluxDBDatabaseFlag = cli.StringFlag{ Name: "metrics.influxdb.database", Usage: "Metrics InfluxDB database", Value: "metrics", } - metricsInfluxDBUsernameFlag = cli.StringFlag{ + MetricsInfluxDBUsernameFlag = cli.StringFlag{ Name: "metrics.influxdb.username", Usage: "Metrics InfluxDB username", Value: "", } - metricsInfluxDBPasswordFlag = cli.StringFlag{ + MetricsInfluxDBPasswordFlag = cli.StringFlag{ Name: "metrics.influxdb.password", Usage: "Metrics InfluxDB password", Value: "", @@ -55,7 +55,7 @@ var ( // It is used so that we can group all nodes and average a measurement across all of them, but also so // that we can select a specific node and inspect its measurements. // https://docs.influxdata.com/influxdb/v1.4/concepts/key_concepts/#tag-key - metricsInfluxDBHostTagFlag = cli.StringFlag{ + MetricsInfluxDBHostTagFlag = cli.StringFlag{ Name: "metrics.influxdb.host.tag", Usage: "Metrics InfluxDB `host` tag attached to all measurements", Value: "localhost", @@ -65,20 +65,24 @@ var ( // Flags holds all command-line flags required for metrics collection. var Flags = []cli.Flag{ utils.MetricsEnabledFlag, - metricsEnableInfluxDBExportFlag, - metricsInfluxDBEndpointFlag, metricsInfluxDBDatabaseFlag, metricsInfluxDBUsernameFlag, metricsInfluxDBPasswordFlag, metricsInfluxDBHostTagFlag, + MetricsEnableInfluxDBExportFlag, + MetricsInfluxDBEndpointFlag, + MetricsInfluxDBDatabaseFlag, + MetricsInfluxDBUsernameFlag, + MetricsInfluxDBPasswordFlag, + MetricsInfluxDBHostTagFlag, } func Setup(ctx *cli.Context) { if gethmetrics.Enabled { log.Info("Enabling swarm metrics collection") var ( - enableExport = ctx.GlobalBool(metricsEnableInfluxDBExportFlag.Name) - endpoint = ctx.GlobalString(metricsInfluxDBEndpointFlag.Name) - database = ctx.GlobalString(metricsInfluxDBDatabaseFlag.Name) - username = ctx.GlobalString(metricsInfluxDBUsernameFlag.Name) - password = ctx.GlobalString(metricsInfluxDBPasswordFlag.Name) - hosttag = ctx.GlobalString(metricsInfluxDBHostTagFlag.Name) + enableExport = ctx.GlobalBool(MetricsEnableInfluxDBExportFlag.Name) + endpoint = ctx.GlobalString(MetricsInfluxDBEndpointFlag.Name) + database = ctx.GlobalString(MetricsInfluxDBDatabaseFlag.Name) + username = ctx.GlobalString(MetricsInfluxDBUsernameFlag.Name) + password = ctx.GlobalString(MetricsInfluxDBPasswordFlag.Name) + hosttag = ctx.GlobalString(MetricsInfluxDBHostTagFlag.Name) ) // Start system runtime metrics collection From ea987e48c118745748df54a56ea4bbb499ac4a6e Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Tue, 4 Dec 2018 14:57:03 +0530 Subject: [PATCH 04/44] add metrics to upload and sync test --- cmd/swarm/swarm-smoke/main.go | 6 ++++++ cmd/swarm/swarm-smoke/upload_and_sync.go | 9 ++++++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index 48b762c076..e14d3a20f9 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -44,6 +44,12 @@ var ( verbosity int timeout int ) +var ( + smokeUploadAndSyncCount = gethmetrics.NewRegisteredCounter("swarm-smoke.upload-and-sync.count", nil) + smokeUploadAndSyncFailCount = gethmetrics.NewRegisteredCounter("swarm-smoke.upload-and-sync.fail.count", nil) + smokeUploadAndSyncRunTime = gethmetrics.NewRegisteredCounter("swarm-smoke.upload-and-sync.time", nil) + smokeUploadAndSyncTimeout = gethmetrics.NewRegisteredCounter("swarm-smoke.upload-and-sync.timeout", nil) +) func main() { diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index 21be544fe3..4b7de7cbe2 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -55,6 +55,7 @@ func generateEndpoints(scheme string, cluster string, app string, from int, to i } func cliUploadAndSync(c *cli.Context) error { + smokeUploadAndSyncCount.Inc(1) log.PrintOrigins(true) log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(true)))) @@ -65,15 +66,21 @@ func cliUploadAndSync(c *cli.Context) error { select { case err := <-errc: + if err != nil { + smokeUploadAndSyncFailCount.Inc(1) + } return err case <-time.After(time.Duration(timeout) * time.Second): + smokeUploadAndSyncTimeout.Inc(1) return fmt.Errorf("timeout after %v sec", timeout) } } func uploadAndSync(c *cli.Context) error { - defer func(now time.Time) { log.Info("total time", "time", time.Since(now), "kb", filesize) }(time.Now()) + defer func(now time.Time) { + log.Info("total time", "time", time.Since(now), "kb", filesize) + }(time.Now()) generateEndpoints(scheme, cluster, appName, from, to) From 84a5fcf8ac4c181586ff05435192122ef27ef6c4 Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Tue, 4 Dec 2018 17:28:44 +0530 Subject: [PATCH 05/44] add feed smoke metrics --- cmd/swarm/swarm-smoke/feed_upload_and_sync.go | 22 +++++++++++++++++-- cmd/swarm/swarm-smoke/main.go | 4 ++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go index 7ec1528263..dec354c06b 100644 --- a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go @@ -26,11 +26,29 @@ const ( feedRandomDataLength = 8 ) -// TODO: retrieve with manifest + extract repeating code func cliFeedUploadAndSync(c *cli.Context) error { - + feedUploadAndSyncCount.Inc(1) log.Root().SetHandler(log.CallerFileHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))) + errc := make(chan error) + go func() { + errc <- feedUploadAndSync(c) + }() + + select { + case err := <-errc: + if err != nil { + feedUploadAndSyncFailCount.Inc(1) + } + return err + case <-time.After(time.Duration(timeout) * time.Second): + feedUploadAndSyncTimeout.Inc(1) + return fmt.Errorf("timeout after %v sec", timeout) + } +} + +// TODO: retrieve with manifest + extract repeating code +func feedUploadAndSync(c *cli.Context) error { defer func(now time.Time) { log.Info("total time", "time", time.Since(now), "size (kb)", filesize) }(time.Now()) generateEndpoints(scheme, cluster, appName, from, to) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index e14d3a20f9..df307ef40d 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -45,6 +45,10 @@ var ( timeout int ) var ( + feedUploadAndSyncCount = gethmetrics.NewRegisteredCounter("swarm-smoke.feed-and-sync.count", nil) + feedUploadAndSyncFailCount = gethmetrics.NewRegisteredCounter("swarm-smoke.feed-and-sync.fail.count", nil) + feedUploadAndSyncRunTime = gethmetrics.NewRegisteredCounter("swarm-smoke.feed-and-sync.time", nil) + feedUploadAndSyncTimeout = gethmetrics.NewRegisteredCounter("swarm-smoke.feed-and-sync.timeout", nil) smokeUploadAndSyncCount = gethmetrics.NewRegisteredCounter("swarm-smoke.upload-and-sync.count", nil) smokeUploadAndSyncFailCount = gethmetrics.NewRegisteredCounter("swarm-smoke.upload-and-sync.fail.count", nil) smokeUploadAndSyncRunTime = gethmetrics.NewRegisteredCounter("swarm-smoke.upload-and-sync.time", nil) From 705876070ad8ef30aa8832da555f259a5fc93085 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Tue, 4 Dec 2018 13:58:36 +0100 Subject: [PATCH 06/44] cmd/swarm/swarm-smoke: fix metrics collection --- cmd/swarm/swarm-smoke/main.go | 32 ++++++++++++------------ cmd/swarm/swarm-smoke/upload_and_sync.go | 16 ++++++++---- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index df307ef40d..bf4a6e6e61 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -17,8 +17,8 @@ package main import ( + "fmt" "os" - "runtime" "sort" "time" @@ -32,6 +32,10 @@ import ( cli "gopkg.in/urfave/cli.v1" ) +const ( + collectionInterval = 5 * time.Second +) + var ( endpoints []string includeLocalhost bool @@ -45,14 +49,9 @@ var ( timeout int ) var ( - feedUploadAndSyncCount = gethmetrics.NewRegisteredCounter("swarm-smoke.feed-and-sync.count", nil) - feedUploadAndSyncFailCount = gethmetrics.NewRegisteredCounter("swarm-smoke.feed-and-sync.fail.count", nil) - feedUploadAndSyncRunTime = gethmetrics.NewRegisteredCounter("swarm-smoke.feed-and-sync.time", nil) - feedUploadAndSyncTimeout = gethmetrics.NewRegisteredCounter("swarm-smoke.feed-and-sync.timeout", nil) - smokeUploadAndSyncCount = gethmetrics.NewRegisteredCounter("swarm-smoke.upload-and-sync.count", nil) - smokeUploadAndSyncFailCount = gethmetrics.NewRegisteredCounter("swarm-smoke.upload-and-sync.fail.count", nil) - smokeUploadAndSyncRunTime = gethmetrics.NewRegisteredCounter("swarm-smoke.upload-and-sync.time", nil) - smokeUploadAndSyncTimeout = gethmetrics.NewRegisteredCounter("swarm-smoke.upload-and-sync.timeout", nil) + feedUploadAndSyncCount = gethmetrics.NewRegisteredCounter("feed-and-sync", nil) + feedUploadAndSyncFailCount = gethmetrics.NewRegisteredCounter("feed-and-sync.fail", nil) + feedUploadAndSyncTimeout = gethmetrics.NewRegisteredCounter("feed-and-sync.timeout", nil) ) func main() { @@ -141,11 +140,14 @@ func main() { }, } + // wait for metrics reporter to push latest measurements + defer func() { + time.Sleep(collectionInterval + 1*time.Second) + }() + sort.Sort(cli.FlagsByName(app.Flags)) sort.Sort(cli.CommandsByName(app.Commands)) app.Before = func(ctx *cli.Context) error { - runtime.GOMAXPROCS(runtime.NumCPU()) - setupMetrics(ctx) return nil } @@ -166,11 +168,9 @@ func setupMetrics(ctx *cli.Context) { hosttag = ctx.GlobalString(swarmmetrics.MetricsInfluxDBHostTagFlag.Name) ) - // Start system runtime metrics collection - go gethmetrics.CollectProcessMetrics(2 * time.Second) - - go influxdb.InfluxDBWithTags(gethmetrics.DefaultRegistry, 10*time.Second, endpoint, database, username, password, "swarm.", map[string]string{ - "host": hosttag, + go influxdb.InfluxDBWithTags(gethmetrics.DefaultRegistry, collectionInterval, endpoint, database, username, password, "swarm-smoke.", map[string]string{ + "host": hosttag, + "filesize": fmt.Sprintf("%v", filesize), }) } } diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index 4b7de7cbe2..9910a82c1e 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -34,6 +34,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/pborman/uuid" + metrics "github.com/rcrowley/go-metrics" cli "gopkg.in/urfave/cli.v1" ) @@ -55,10 +56,13 @@ func generateEndpoints(scheme string, cluster string, app string, from int, to i } func cliUploadAndSync(c *cli.Context) error { - smokeUploadAndSyncCount.Inc(1) log.PrintOrigins(true) log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(true)))) + setupMetrics(c) + + metrics.GetOrRegisterCounter("upload-and-sync", nil).Inc(1) + errc := make(chan error) go func() { errc <- uploadAndSync(c) @@ -67,19 +71,21 @@ func cliUploadAndSync(c *cli.Context) error { select { case err := <-errc: if err != nil { - smokeUploadAndSyncFailCount.Inc(1) + metrics.GetOrRegisterCounter("upload-and-sync.fail", nil).Inc(1) } return err case <-time.After(time.Duration(timeout) * time.Second): - smokeUploadAndSyncTimeout.Inc(1) + metrics.GetOrRegisterCounter("upload-and-sync.timeout", nil).Inc(1) return fmt.Errorf("timeout after %v sec", timeout) } - } func uploadAndSync(c *cli.Context) error { defer func(now time.Time) { - log.Info("total time", "time", time.Since(now), "kb", filesize) + totalTime := time.Since(now) + + log.Info("total time", "time", totalTime, "kb", filesize) + metrics.GetOrRegisterCounter("upload-and-sync.time", nil).Inc(int64(totalTime)) }(time.Now()) generateEndpoints(scheme, cluster, appName, from, to) From 0313009ec950a237f141f89af439afd69bfa0ca2 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Tue, 4 Dec 2018 15:03:21 +0100 Subject: [PATCH 07/44] cmd/swarm: use correct import --- cmd/swarm/swarm-smoke/upload_and_sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index 9910a82c1e..30ed7371ee 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -33,8 +33,8 @@ import ( "time" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/pborman/uuid" - metrics "github.com/rcrowley/go-metrics" cli "gopkg.in/urfave/cli.v1" ) From c05de65111bfaf56ce91880b9f0503bde801a697 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Tue, 4 Dec 2018 15:53:25 +0100 Subject: [PATCH 08/44] add git commit --- cmd/swarm/swarm-smoke/main.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index bf4a6e6e61..b9252be4f6 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -32,6 +32,10 @@ import ( cli "gopkg.in/urfave/cli.v1" ) +var ( + gitCommit string // Git SHA1 commit hash of the release (set via linker flags) +) + const ( collectionInterval = 5 * time.Second ) @@ -170,6 +174,7 @@ func setupMetrics(ctx *cli.Context) { go influxdb.InfluxDBWithTags(gethmetrics.DefaultRegistry, collectionInterval, endpoint, database, username, password, "swarm-smoke.", map[string]string{ "host": hosttag, + "version": gitCommit, "filesize": fmt.Sprintf("%v", filesize), }) } From acb3149c06c43f0f1920ef133d1874938051868c Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Tue, 4 Dec 2018 17:01:54 +0100 Subject: [PATCH 09/44] cmd/swarm: fix wait for reporter --- cmd/swarm/swarm-smoke/main.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index b9252be4f6..21f41af65c 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -144,11 +144,6 @@ func main() { }, } - // wait for metrics reporter to push latest measurements - defer func() { - time.Sleep(collectionInterval + 1*time.Second) - }() - sort.Sort(cli.FlagsByName(app.Flags)) sort.Sort(cli.CommandsByName(app.Commands)) app.Before = func(ctx *cli.Context) error { @@ -158,8 +153,15 @@ func main() { err := app.Run(os.Args) if err != nil { log.Error(err.Error()) + + // wait for metrics reporter to push latest measurements + time.Sleep(collectionInterval + 1*time.Second) + os.Exit(1) } + + // wait for metrics reporter to push latest measurements + time.Sleep(collectionInterval + 1*time.Second) } func setupMetrics(ctx *cli.Context) { From d6ecdfeeba01817ebfde5d9a158fe9f2703ad7cc Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Tue, 4 Dec 2018 17:34:14 +0100 Subject: [PATCH 10/44] measure upload time --- cmd/swarm/swarm-smoke/upload_and_sync.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index 30ed7371ee..f41d80d947 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -85,7 +85,7 @@ func uploadAndSync(c *cli.Context) error { totalTime := time.Since(now) log.Info("total time", "time", totalTime, "kb", filesize) - metrics.GetOrRegisterCounter("upload-and-sync.time", nil).Inc(int64(totalTime)) + metrics.GetOrRegisterCounter("upload-and-sync.total-time", nil).Inc(int64(totalTime)) }(time.Now()) generateEndpoints(scheme, cluster, appName, from, to) @@ -95,11 +95,13 @@ func uploadAndSync(c *cli.Context) error { f, cleanup := generateRandomFile(filesize * 1000) defer cleanup() + t1 := time.Now() hash, err := upload(f, endpoints[0]) if err != nil { log.Error(err.Error()) return err } + metrics.GetOrRegisterCounter("upload-and-sync.upload-time", nil).Inc(int64(time.Since(t1))) fhash, err := digest(f) if err != nil { From 99a1c3095f157b0737d750d27301ca7d0ddf5af4 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Tue, 4 Dec 2018 18:21:06 +0100 Subject: [PATCH 11/44] emit metrics only once --- cmd/swarm/swarm-smoke/main.go | 19 +++++----------- cmd/swarm/swarm-smoke/upload_and_sync.go | 2 -- metrics/influxdb/influxdb.go | 28 ++++++++++++++++++++++++ 3 files changed, 34 insertions(+), 15 deletions(-) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index 21f41af65c..f1e50372ec 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -20,7 +20,6 @@ import ( "fmt" "os" "sort" - "time" "github.com/ethereum/go-ethereum/cmd/utils" gethmetrics "github.com/ethereum/go-ethereum/metrics" @@ -36,10 +35,6 @@ var ( gitCommit string // Git SHA1 commit hash of the release (set via linker flags) ) -const ( - collectionInterval = 5 * time.Second -) - var ( endpoints []string includeLocalhost bool @@ -149,22 +144,20 @@ func main() { app.Before = func(ctx *cli.Context) error { return nil } + app.After = func(ctx *cli.Context) error { + emitMetrics(ctx) + return nil + } err := app.Run(os.Args) if err != nil { log.Error(err.Error()) - // wait for metrics reporter to push latest measurements - time.Sleep(collectionInterval + 1*time.Second) - os.Exit(1) } - - // wait for metrics reporter to push latest measurements - time.Sleep(collectionInterval + 1*time.Second) } -func setupMetrics(ctx *cli.Context) { +func emitMetrics(ctx *cli.Context) { if gethmetrics.Enabled { var ( endpoint = ctx.GlobalString(swarmmetrics.MetricsInfluxDBEndpointFlag.Name) @@ -174,7 +167,7 @@ func setupMetrics(ctx *cli.Context) { hosttag = ctx.GlobalString(swarmmetrics.MetricsInfluxDBHostTagFlag.Name) ) - go influxdb.InfluxDBWithTags(gethmetrics.DefaultRegistry, collectionInterval, endpoint, database, username, password, "swarm-smoke.", map[string]string{ + influxdb.InfluxDBWithTagsOnce(gethmetrics.DefaultRegistry, endpoint, database, username, password, "swarm-smoke.", map[string]string{ "host": hosttag, "version": gitCommit, "filesize": fmt.Sprintf("%v", filesize), diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index f41d80d947..e5699dd50a 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -59,8 +59,6 @@ func cliUploadAndSync(c *cli.Context) error { log.PrintOrigins(true) log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(true)))) - setupMetrics(c) - metrics.GetOrRegisterCounter("upload-and-sync", nil).Inc(1) errc := make(chan error) diff --git a/metrics/influxdb/influxdb.go b/metrics/influxdb/influxdb.go index 31a5c21b5f..5f99bb0146 100644 --- a/metrics/influxdb/influxdb.go +++ b/metrics/influxdb/influxdb.go @@ -58,6 +58,34 @@ func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, userna rep.run() } +// InfluxDBWithTagsOnce runs once an InfluxDB reporter and post the given metrics.Registry with the specified tags +func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, namespace string, tags map[string]string) { + u, err := uurl.Parse(url) + if err != nil { + log.Warn("Unable to parse InfluxDB", "url", url, "err", err) + return + } + + rep := &reporter{ + reg: r, + url: *u, + database: database, + username: username, + password: password, + namespace: namespace, + tags: tags, + cache: make(map[string]int64), + } + if err := rep.makeClient(); err != nil { + log.Warn("Unable to make InfluxDB client", "err", err) + return + } + + if err := rep.send(); err != nil { + log.Warn("Unable to send to InfluxDB", "err", err) + } +} + func (r *reporter) makeClient() (err error) { r.client, err = client.NewClient(client.Config{ URL: r.url, From cb72c5764a97c63e6e932ab35c02e574b20a8f5d Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Wed, 5 Dec 2018 12:32:16 +0530 Subject: [PATCH 12/44] address PR comments --- cmd/swarm/swarm-smoke/main.go | 11 +++-------- metrics/influxdb/influxdb.go | 13 +++++++------ 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index f1e50372ec..ea419e9fac 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -141,12 +141,8 @@ func main() { sort.Sort(cli.FlagsByName(app.Flags)) sort.Sort(cli.CommandsByName(app.Commands)) - app.Before = func(ctx *cli.Context) error { - return nil - } app.After = func(ctx *cli.Context) error { - emitMetrics(ctx) - return nil + return emitMetrics(ctx) } err := app.Run(os.Args) @@ -157,7 +153,7 @@ func main() { } } -func emitMetrics(ctx *cli.Context) { +func emitMetrics(ctx *cli.Context) error { if gethmetrics.Enabled { var ( endpoint = ctx.GlobalString(swarmmetrics.MetricsInfluxDBEndpointFlag.Name) @@ -166,8 +162,7 @@ func emitMetrics(ctx *cli.Context) { password = ctx.GlobalString(swarmmetrics.MetricsInfluxDBPasswordFlag.Name) hosttag = ctx.GlobalString(swarmmetrics.MetricsInfluxDBHostTagFlag.Name) ) - - influxdb.InfluxDBWithTagsOnce(gethmetrics.DefaultRegistry, endpoint, database, username, password, "swarm-smoke.", map[string]string{ + return influxdb.InfluxDBWithTagsOnce(gethmetrics.DefaultRegistry, endpoint, database, username, password, "swarm-smoke.", map[string]string{ "host": hosttag, "version": gitCommit, "filesize": fmt.Sprintf("%v", filesize), diff --git a/metrics/influxdb/influxdb.go b/metrics/influxdb/influxdb.go index 5f99bb0146..1c2b04bf82 100644 --- a/metrics/influxdb/influxdb.go +++ b/metrics/influxdb/influxdb.go @@ -1,6 +1,7 @@ package influxdb import ( + "errors" "fmt" uurl "net/url" "time" @@ -59,11 +60,10 @@ func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, userna } // InfluxDBWithTagsOnce runs once an InfluxDB reporter and post the given metrics.Registry with the specified tags -func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, namespace string, tags map[string]string) { +func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, namespace string, tags map[string]string) error { u, err := uurl.Parse(url) if err != nil { - log.Warn("Unable to parse InfluxDB", "url", url, "err", err) - return + return errors.New(fmt.Sprintf("Unable to parse InfluxDB. url: %s, err: %v", url, err)) } rep := &reporter{ @@ -77,13 +77,14 @@ func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, cache: make(map[string]int64), } if err := rep.makeClient(); err != nil { - log.Warn("Unable to make InfluxDB client", "err", err) - return + return errors.New(fmt.Sprintf("Unable to make InfluxDB client. err: %v", err)) } if err := rep.send(); err != nil { - log.Warn("Unable to send to InfluxDB", "err", err) + return errors.New(fmt.Sprintf("Unable to send to InfluxDB. err: %v", err)) } + + return nil } func (r *reporter) makeClient() (err error) { From 1569d4f55f65d501fb21ef196ab771bd77edab15 Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Wed, 5 Dec 2018 12:36:50 +0530 Subject: [PATCH 13/44] return nil on no error --- cmd/swarm/swarm-smoke/main.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index ea419e9fac..f6e489df8a 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -168,4 +168,6 @@ func emitMetrics(ctx *cli.Context) error { "filesize": fmt.Sprintf("%v", filesize), }) } + + return nil } From 0edb2bbfe12bb9892f2637648405c50d5b40b3e6 Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Wed, 5 Dec 2018 16:47:20 +0530 Subject: [PATCH 14/44] fix linter errors --- metrics/influxdb/influxdb.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/metrics/influxdb/influxdb.go b/metrics/influxdb/influxdb.go index 1c2b04bf82..c4ef927234 100644 --- a/metrics/influxdb/influxdb.go +++ b/metrics/influxdb/influxdb.go @@ -1,7 +1,6 @@ package influxdb import ( - "errors" "fmt" uurl "net/url" "time" @@ -63,7 +62,7 @@ func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, userna func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, namespace string, tags map[string]string) error { u, err := uurl.Parse(url) if err != nil { - return errors.New(fmt.Sprintf("Unable to parse InfluxDB. url: %s, err: %v", url, err)) + return fmt.Errorf("Unable to parse InfluxDB. url: %s, err: %v", url, err) } rep := &reporter{ @@ -77,11 +76,11 @@ func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, cache: make(map[string]int64), } if err := rep.makeClient(); err != nil { - return errors.New(fmt.Sprintf("Unable to make InfluxDB client. err: %v", err)) + return fmt.Errorf("Unable to make InfluxDB client. err: %v", err) } if err := rep.send(); err != nil { - return errors.New(fmt.Sprintf("Unable to send to InfluxDB. err: %v", err)) + return fmt.Errorf("Unable to send to InfluxDB. err: %v", err) } return nil From eac3fdb21873e813024527a490842b44b71cd8c0 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Mon, 3 Dec 2018 15:11:31 +0100 Subject: [PATCH 15/44] cmd/swarm/swarm-smoke: timeout flag --- cmd/swarm/swarm-smoke/main.go | 7 +++++++ cmd/swarm/swarm-smoke/upload_and_sync.go | 19 +++++++++++++++++-- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index 845998dc12..06ec20a079 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -35,6 +35,7 @@ var ( from int to int verbosity int + timeout int ) func main() { @@ -91,6 +92,12 @@ func main() { Usage: "verbosity", Destination: &verbosity, }, + cli.IntFlag{ + Name: "timeout", + Value: 120, + Usage: "timeout in seconds after which kill the process", + Destination: &timeout, + }, } app.Commands = []cli.Command{ diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index 3843457dc4..21be544fe3 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -40,11 +40,11 @@ import ( func generateEndpoints(scheme string, cluster string, app string, from int, to int) { if cluster == "prod" { - for port := from; port <= to; port++ { + for port := from; port < to; port++ { endpoints = append(endpoints, fmt.Sprintf("%s://%v.swarm-gateways.net", scheme, port)) } } else { - for port := from; port <= to; port++ { + for port := from; port < to; port++ { endpoints = append(endpoints, fmt.Sprintf("%s://%s-%v-%s.stg.swarm-gateways.net", scheme, app, port, cluster)) } } @@ -58,6 +58,21 @@ func cliUploadAndSync(c *cli.Context) error { log.PrintOrigins(true) log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(true)))) + errc := make(chan error) + go func() { + errc <- uploadAndSync(c) + }() + + select { + case err := <-errc: + return err + case <-time.After(time.Duration(timeout) * time.Second): + return fmt.Errorf("timeout after %v sec", timeout) + } + +} + +func uploadAndSync(c *cli.Context) error { defer func(now time.Time) { log.Info("total time", "time", time.Since(now), "kb", filesize) }(time.Now()) generateEndpoints(scheme, cluster, appName, from, to) From e176fe5bbd5b3076995e32869a38f5aaf98e4b56 Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Tue, 4 Dec 2018 12:27:00 +0530 Subject: [PATCH 16/44] added influxdb flags --- cmd/swarm/swarm-smoke/main.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index 06ec20a079..c52e9edc95 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -20,6 +20,8 @@ import ( "os" "sort" + swarmmetrics "github.com/ethereum/go-ethereum/swarm/metrics" + "github.com/ethereum/go-ethereum/log" cli "gopkg.in/urfave/cli.v1" @@ -100,6 +102,9 @@ func main() { }, } + app.Flags = append(app.Flags, swarmmetrics.Flags[0]) + app.Flags = append(app.Flags, swarmmetrics.Flags[1:]...) + app.Commands = []cli.Command{ { Name: "upload_and_sync", From 8be02faefaf3c9e0d9aeb929e62ebd0a4cf15624 Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Tue, 4 Dec 2018 13:28:32 +0530 Subject: [PATCH 17/44] correct metrics flags and add metrics init on smoke run --- cmd/swarm/swarm-smoke/main.go | 39 +++++++++++++++++++++++++++++++++-- swarm/metrics/flags.go | 32 +++++++++++++++------------- 2 files changed, 55 insertions(+), 16 deletions(-) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index c52e9edc95..48b762c076 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -18,8 +18,13 @@ package main import ( "os" + "runtime" "sort" + "time" + "github.com/ethereum/go-ethereum/cmd/utils" + gethmetrics "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/metrics/influxdb" swarmmetrics "github.com/ethereum/go-ethereum/swarm/metrics" "github.com/ethereum/go-ethereum/log" @@ -102,8 +107,14 @@ func main() { }, } - app.Flags = append(app.Flags, swarmmetrics.Flags[0]) - app.Flags = append(app.Flags, swarmmetrics.Flags[1:]...) + app.Flags = append(app.Flags, []cli.Flag{ + utils.MetricsEnabledFlag, + swarmmetrics.MetricsInfluxDBEndpointFlag, + swarmmetrics.MetricsInfluxDBDatabaseFlag, + swarmmetrics.MetricsInfluxDBUsernameFlag, + swarmmetrics.MetricsInfluxDBPasswordFlag, + swarmmetrics.MetricsInfluxDBHostTagFlag, + }...) app.Commands = []cli.Command{ { @@ -122,6 +133,11 @@ func main() { sort.Sort(cli.FlagsByName(app.Flags)) sort.Sort(cli.CommandsByName(app.Commands)) + app.Before = func(ctx *cli.Context) error { + runtime.GOMAXPROCS(runtime.NumCPU()) + setupMetrics(ctx) + return nil + } err := app.Run(os.Args) if err != nil { @@ -129,3 +145,22 @@ func main() { os.Exit(1) } } + +func setupMetrics(ctx *cli.Context) { + if gethmetrics.Enabled { + var ( + endpoint = ctx.GlobalString(swarmmetrics.MetricsInfluxDBEndpointFlag.Name) + database = ctx.GlobalString(swarmmetrics.MetricsInfluxDBDatabaseFlag.Name) + username = ctx.GlobalString(swarmmetrics.MetricsInfluxDBUsernameFlag.Name) + password = ctx.GlobalString(swarmmetrics.MetricsInfluxDBPasswordFlag.Name) + hosttag = ctx.GlobalString(swarmmetrics.MetricsInfluxDBHostTagFlag.Name) + ) + + // Start system runtime metrics collection + go gethmetrics.CollectProcessMetrics(2 * time.Second) + + go influxdb.InfluxDBWithTags(gethmetrics.DefaultRegistry, 10*time.Second, endpoint, database, username, password, "swarm.", map[string]string{ + "host": hosttag, + }) + } +} diff --git a/swarm/metrics/flags.go b/swarm/metrics/flags.go index 79490fd360..7c12120a60 100644 --- a/swarm/metrics/flags.go +++ b/swarm/metrics/flags.go @@ -27,26 +27,26 @@ import ( ) var ( - metricsEnableInfluxDBExportFlag = cli.BoolFlag{ + MetricsEnableInfluxDBExportFlag = cli.BoolFlag{ Name: "metrics.influxdb.export", Usage: "Enable metrics export/push to an external InfluxDB database", } - metricsInfluxDBEndpointFlag = cli.StringFlag{ + MetricsInfluxDBEndpointFlag = cli.StringFlag{ Name: "metrics.influxdb.endpoint", Usage: "Metrics InfluxDB endpoint", Value: "http://127.0.0.1:8086", } - metricsInfluxDBDatabaseFlag = cli.StringFlag{ + MetricsInfluxDBDatabaseFlag = cli.StringFlag{ Name: "metrics.influxdb.database", Usage: "Metrics InfluxDB database", Value: "metrics", } - metricsInfluxDBUsernameFlag = cli.StringFlag{ + MetricsInfluxDBUsernameFlag = cli.StringFlag{ Name: "metrics.influxdb.username", Usage: "Metrics InfluxDB username", Value: "", } - metricsInfluxDBPasswordFlag = cli.StringFlag{ + MetricsInfluxDBPasswordFlag = cli.StringFlag{ Name: "metrics.influxdb.password", Usage: "Metrics InfluxDB password", Value: "", @@ -55,7 +55,7 @@ var ( // It is used so that we can group all nodes and average a measurement across all of them, but also so // that we can select a specific node and inspect its measurements. // https://docs.influxdata.com/influxdb/v1.4/concepts/key_concepts/#tag-key - metricsInfluxDBHostTagFlag = cli.StringFlag{ + MetricsInfluxDBHostTagFlag = cli.StringFlag{ Name: "metrics.influxdb.host.tag", Usage: "Metrics InfluxDB `host` tag attached to all measurements", Value: "localhost", @@ -65,20 +65,24 @@ var ( // Flags holds all command-line flags required for metrics collection. var Flags = []cli.Flag{ utils.MetricsEnabledFlag, - metricsEnableInfluxDBExportFlag, - metricsInfluxDBEndpointFlag, metricsInfluxDBDatabaseFlag, metricsInfluxDBUsernameFlag, metricsInfluxDBPasswordFlag, metricsInfluxDBHostTagFlag, + MetricsEnableInfluxDBExportFlag, + MetricsInfluxDBEndpointFlag, + MetricsInfluxDBDatabaseFlag, + MetricsInfluxDBUsernameFlag, + MetricsInfluxDBPasswordFlag, + MetricsInfluxDBHostTagFlag, } func Setup(ctx *cli.Context) { if gethmetrics.Enabled { log.Info("Enabling swarm metrics collection") var ( - enableExport = ctx.GlobalBool(metricsEnableInfluxDBExportFlag.Name) - endpoint = ctx.GlobalString(metricsInfluxDBEndpointFlag.Name) - database = ctx.GlobalString(metricsInfluxDBDatabaseFlag.Name) - username = ctx.GlobalString(metricsInfluxDBUsernameFlag.Name) - password = ctx.GlobalString(metricsInfluxDBPasswordFlag.Name) - hosttag = ctx.GlobalString(metricsInfluxDBHostTagFlag.Name) + enableExport = ctx.GlobalBool(MetricsEnableInfluxDBExportFlag.Name) + endpoint = ctx.GlobalString(MetricsInfluxDBEndpointFlag.Name) + database = ctx.GlobalString(MetricsInfluxDBDatabaseFlag.Name) + username = ctx.GlobalString(MetricsInfluxDBUsernameFlag.Name) + password = ctx.GlobalString(MetricsInfluxDBPasswordFlag.Name) + hosttag = ctx.GlobalString(MetricsInfluxDBHostTagFlag.Name) ) // Start system runtime metrics collection From 238195adbaddbdb116c945421929dfcb3606dfee Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Tue, 4 Dec 2018 14:57:03 +0530 Subject: [PATCH 18/44] add metrics to upload and sync test --- cmd/swarm/swarm-smoke/main.go | 6 ++++++ cmd/swarm/swarm-smoke/upload_and_sync.go | 9 ++++++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index 48b762c076..e14d3a20f9 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -44,6 +44,12 @@ var ( verbosity int timeout int ) +var ( + smokeUploadAndSyncCount = gethmetrics.NewRegisteredCounter("swarm-smoke.upload-and-sync.count", nil) + smokeUploadAndSyncFailCount = gethmetrics.NewRegisteredCounter("swarm-smoke.upload-and-sync.fail.count", nil) + smokeUploadAndSyncRunTime = gethmetrics.NewRegisteredCounter("swarm-smoke.upload-and-sync.time", nil) + smokeUploadAndSyncTimeout = gethmetrics.NewRegisteredCounter("swarm-smoke.upload-and-sync.timeout", nil) +) func main() { diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index 21be544fe3..4b7de7cbe2 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -55,6 +55,7 @@ func generateEndpoints(scheme string, cluster string, app string, from int, to i } func cliUploadAndSync(c *cli.Context) error { + smokeUploadAndSyncCount.Inc(1) log.PrintOrigins(true) log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(true)))) @@ -65,15 +66,21 @@ func cliUploadAndSync(c *cli.Context) error { select { case err := <-errc: + if err != nil { + smokeUploadAndSyncFailCount.Inc(1) + } return err case <-time.After(time.Duration(timeout) * time.Second): + smokeUploadAndSyncTimeout.Inc(1) return fmt.Errorf("timeout after %v sec", timeout) } } func uploadAndSync(c *cli.Context) error { - defer func(now time.Time) { log.Info("total time", "time", time.Since(now), "kb", filesize) }(time.Now()) + defer func(now time.Time) { + log.Info("total time", "time", time.Since(now), "kb", filesize) + }(time.Now()) generateEndpoints(scheme, cluster, appName, from, to) From 5f932263de4a3ce60bf6fa7b7d2d0f9adf4bbf56 Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Tue, 4 Dec 2018 17:28:44 +0530 Subject: [PATCH 19/44] add feed smoke metrics --- cmd/swarm/swarm-smoke/feed_upload_and_sync.go | 22 +++++++++++++++++-- cmd/swarm/swarm-smoke/main.go | 4 ++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go index 7ec1528263..dec354c06b 100644 --- a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go @@ -26,11 +26,29 @@ const ( feedRandomDataLength = 8 ) -// TODO: retrieve with manifest + extract repeating code func cliFeedUploadAndSync(c *cli.Context) error { - + feedUploadAndSyncCount.Inc(1) log.Root().SetHandler(log.CallerFileHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))) + errc := make(chan error) + go func() { + errc <- feedUploadAndSync(c) + }() + + select { + case err := <-errc: + if err != nil { + feedUploadAndSyncFailCount.Inc(1) + } + return err + case <-time.After(time.Duration(timeout) * time.Second): + feedUploadAndSyncTimeout.Inc(1) + return fmt.Errorf("timeout after %v sec", timeout) + } +} + +// TODO: retrieve with manifest + extract repeating code +func feedUploadAndSync(c *cli.Context) error { defer func(now time.Time) { log.Info("total time", "time", time.Since(now), "size (kb)", filesize) }(time.Now()) generateEndpoints(scheme, cluster, appName, from, to) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index e14d3a20f9..df307ef40d 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -45,6 +45,10 @@ var ( timeout int ) var ( + feedUploadAndSyncCount = gethmetrics.NewRegisteredCounter("swarm-smoke.feed-and-sync.count", nil) + feedUploadAndSyncFailCount = gethmetrics.NewRegisteredCounter("swarm-smoke.feed-and-sync.fail.count", nil) + feedUploadAndSyncRunTime = gethmetrics.NewRegisteredCounter("swarm-smoke.feed-and-sync.time", nil) + feedUploadAndSyncTimeout = gethmetrics.NewRegisteredCounter("swarm-smoke.feed-and-sync.timeout", nil) smokeUploadAndSyncCount = gethmetrics.NewRegisteredCounter("swarm-smoke.upload-and-sync.count", nil) smokeUploadAndSyncFailCount = gethmetrics.NewRegisteredCounter("swarm-smoke.upload-and-sync.fail.count", nil) smokeUploadAndSyncRunTime = gethmetrics.NewRegisteredCounter("swarm-smoke.upload-and-sync.time", nil) From f9c231cf7251c17157c5e006bbdb7cbd8689f8dc Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Tue, 4 Dec 2018 13:58:36 +0100 Subject: [PATCH 20/44] cmd/swarm/swarm-smoke: fix metrics collection --- cmd/swarm/swarm-smoke/main.go | 32 ++++++++++++------------ cmd/swarm/swarm-smoke/upload_and_sync.go | 16 ++++++++---- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index df307ef40d..bf4a6e6e61 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -17,8 +17,8 @@ package main import ( + "fmt" "os" - "runtime" "sort" "time" @@ -32,6 +32,10 @@ import ( cli "gopkg.in/urfave/cli.v1" ) +const ( + collectionInterval = 5 * time.Second +) + var ( endpoints []string includeLocalhost bool @@ -45,14 +49,9 @@ var ( timeout int ) var ( - feedUploadAndSyncCount = gethmetrics.NewRegisteredCounter("swarm-smoke.feed-and-sync.count", nil) - feedUploadAndSyncFailCount = gethmetrics.NewRegisteredCounter("swarm-smoke.feed-and-sync.fail.count", nil) - feedUploadAndSyncRunTime = gethmetrics.NewRegisteredCounter("swarm-smoke.feed-and-sync.time", nil) - feedUploadAndSyncTimeout = gethmetrics.NewRegisteredCounter("swarm-smoke.feed-and-sync.timeout", nil) - smokeUploadAndSyncCount = gethmetrics.NewRegisteredCounter("swarm-smoke.upload-and-sync.count", nil) - smokeUploadAndSyncFailCount = gethmetrics.NewRegisteredCounter("swarm-smoke.upload-and-sync.fail.count", nil) - smokeUploadAndSyncRunTime = gethmetrics.NewRegisteredCounter("swarm-smoke.upload-and-sync.time", nil) - smokeUploadAndSyncTimeout = gethmetrics.NewRegisteredCounter("swarm-smoke.upload-and-sync.timeout", nil) + feedUploadAndSyncCount = gethmetrics.NewRegisteredCounter("feed-and-sync", nil) + feedUploadAndSyncFailCount = gethmetrics.NewRegisteredCounter("feed-and-sync.fail", nil) + feedUploadAndSyncTimeout = gethmetrics.NewRegisteredCounter("feed-and-sync.timeout", nil) ) func main() { @@ -141,11 +140,14 @@ func main() { }, } + // wait for metrics reporter to push latest measurements + defer func() { + time.Sleep(collectionInterval + 1*time.Second) + }() + sort.Sort(cli.FlagsByName(app.Flags)) sort.Sort(cli.CommandsByName(app.Commands)) app.Before = func(ctx *cli.Context) error { - runtime.GOMAXPROCS(runtime.NumCPU()) - setupMetrics(ctx) return nil } @@ -166,11 +168,9 @@ func setupMetrics(ctx *cli.Context) { hosttag = ctx.GlobalString(swarmmetrics.MetricsInfluxDBHostTagFlag.Name) ) - // Start system runtime metrics collection - go gethmetrics.CollectProcessMetrics(2 * time.Second) - - go influxdb.InfluxDBWithTags(gethmetrics.DefaultRegistry, 10*time.Second, endpoint, database, username, password, "swarm.", map[string]string{ - "host": hosttag, + go influxdb.InfluxDBWithTags(gethmetrics.DefaultRegistry, collectionInterval, endpoint, database, username, password, "swarm-smoke.", map[string]string{ + "host": hosttag, + "filesize": fmt.Sprintf("%v", filesize), }) } } diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index 4b7de7cbe2..9910a82c1e 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -34,6 +34,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/pborman/uuid" + metrics "github.com/rcrowley/go-metrics" cli "gopkg.in/urfave/cli.v1" ) @@ -55,10 +56,13 @@ func generateEndpoints(scheme string, cluster string, app string, from int, to i } func cliUploadAndSync(c *cli.Context) error { - smokeUploadAndSyncCount.Inc(1) log.PrintOrigins(true) log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(true)))) + setupMetrics(c) + + metrics.GetOrRegisterCounter("upload-and-sync", nil).Inc(1) + errc := make(chan error) go func() { errc <- uploadAndSync(c) @@ -67,19 +71,21 @@ func cliUploadAndSync(c *cli.Context) error { select { case err := <-errc: if err != nil { - smokeUploadAndSyncFailCount.Inc(1) + metrics.GetOrRegisterCounter("upload-and-sync.fail", nil).Inc(1) } return err case <-time.After(time.Duration(timeout) * time.Second): - smokeUploadAndSyncTimeout.Inc(1) + metrics.GetOrRegisterCounter("upload-and-sync.timeout", nil).Inc(1) return fmt.Errorf("timeout after %v sec", timeout) } - } func uploadAndSync(c *cli.Context) error { defer func(now time.Time) { - log.Info("total time", "time", time.Since(now), "kb", filesize) + totalTime := time.Since(now) + + log.Info("total time", "time", totalTime, "kb", filesize) + metrics.GetOrRegisterCounter("upload-and-sync.time", nil).Inc(int64(totalTime)) }(time.Now()) generateEndpoints(scheme, cluster, appName, from, to) From fe3da49a2b85e7ae4682486340df5e8eb9aa1e2b Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Tue, 4 Dec 2018 15:03:21 +0100 Subject: [PATCH 21/44] cmd/swarm: use correct import --- cmd/swarm/swarm-smoke/upload_and_sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index 9910a82c1e..30ed7371ee 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -33,8 +33,8 @@ import ( "time" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/pborman/uuid" - metrics "github.com/rcrowley/go-metrics" cli "gopkg.in/urfave/cli.v1" ) From f7b0db6563338cd87fef2339d02d21a4245bae98 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Tue, 4 Dec 2018 15:53:25 +0100 Subject: [PATCH 22/44] add git commit --- cmd/swarm/swarm-smoke/main.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index bf4a6e6e61..b9252be4f6 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -32,6 +32,10 @@ import ( cli "gopkg.in/urfave/cli.v1" ) +var ( + gitCommit string // Git SHA1 commit hash of the release (set via linker flags) +) + const ( collectionInterval = 5 * time.Second ) @@ -170,6 +174,7 @@ func setupMetrics(ctx *cli.Context) { go influxdb.InfluxDBWithTags(gethmetrics.DefaultRegistry, collectionInterval, endpoint, database, username, password, "swarm-smoke.", map[string]string{ "host": hosttag, + "version": gitCommit, "filesize": fmt.Sprintf("%v", filesize), }) } From f6cd8232fbc697a921698d1156dcafc376ffff7d Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Tue, 4 Dec 2018 17:01:54 +0100 Subject: [PATCH 23/44] cmd/swarm: fix wait for reporter --- cmd/swarm/swarm-smoke/main.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index b9252be4f6..21f41af65c 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -144,11 +144,6 @@ func main() { }, } - // wait for metrics reporter to push latest measurements - defer func() { - time.Sleep(collectionInterval + 1*time.Second) - }() - sort.Sort(cli.FlagsByName(app.Flags)) sort.Sort(cli.CommandsByName(app.Commands)) app.Before = func(ctx *cli.Context) error { @@ -158,8 +153,15 @@ func main() { err := app.Run(os.Args) if err != nil { log.Error(err.Error()) + + // wait for metrics reporter to push latest measurements + time.Sleep(collectionInterval + 1*time.Second) + os.Exit(1) } + + // wait for metrics reporter to push latest measurements + time.Sleep(collectionInterval + 1*time.Second) } func setupMetrics(ctx *cli.Context) { From 77cc885b8b61a9fc821bad724d595ac6bbc44550 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Tue, 4 Dec 2018 17:34:14 +0100 Subject: [PATCH 24/44] measure upload time --- cmd/swarm/swarm-smoke/upload_and_sync.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index 30ed7371ee..f41d80d947 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -85,7 +85,7 @@ func uploadAndSync(c *cli.Context) error { totalTime := time.Since(now) log.Info("total time", "time", totalTime, "kb", filesize) - metrics.GetOrRegisterCounter("upload-and-sync.time", nil).Inc(int64(totalTime)) + metrics.GetOrRegisterCounter("upload-and-sync.total-time", nil).Inc(int64(totalTime)) }(time.Now()) generateEndpoints(scheme, cluster, appName, from, to) @@ -95,11 +95,13 @@ func uploadAndSync(c *cli.Context) error { f, cleanup := generateRandomFile(filesize * 1000) defer cleanup() + t1 := time.Now() hash, err := upload(f, endpoints[0]) if err != nil { log.Error(err.Error()) return err } + metrics.GetOrRegisterCounter("upload-and-sync.upload-time", nil).Inc(int64(time.Since(t1))) fhash, err := digest(f) if err != nil { From 891f92b965f0f49e72e22e6f7d1ad3ae5ea4a560 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Tue, 4 Dec 2018 18:21:06 +0100 Subject: [PATCH 25/44] emit metrics only once --- cmd/swarm/swarm-smoke/main.go | 19 +++++----------- cmd/swarm/swarm-smoke/upload_and_sync.go | 2 -- metrics/influxdb/influxdb.go | 28 ++++++++++++++++++++++++ 3 files changed, 34 insertions(+), 15 deletions(-) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index 21f41af65c..f1e50372ec 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -20,7 +20,6 @@ import ( "fmt" "os" "sort" - "time" "github.com/ethereum/go-ethereum/cmd/utils" gethmetrics "github.com/ethereum/go-ethereum/metrics" @@ -36,10 +35,6 @@ var ( gitCommit string // Git SHA1 commit hash of the release (set via linker flags) ) -const ( - collectionInterval = 5 * time.Second -) - var ( endpoints []string includeLocalhost bool @@ -149,22 +144,20 @@ func main() { app.Before = func(ctx *cli.Context) error { return nil } + app.After = func(ctx *cli.Context) error { + emitMetrics(ctx) + return nil + } err := app.Run(os.Args) if err != nil { log.Error(err.Error()) - // wait for metrics reporter to push latest measurements - time.Sleep(collectionInterval + 1*time.Second) - os.Exit(1) } - - // wait for metrics reporter to push latest measurements - time.Sleep(collectionInterval + 1*time.Second) } -func setupMetrics(ctx *cli.Context) { +func emitMetrics(ctx *cli.Context) { if gethmetrics.Enabled { var ( endpoint = ctx.GlobalString(swarmmetrics.MetricsInfluxDBEndpointFlag.Name) @@ -174,7 +167,7 @@ func setupMetrics(ctx *cli.Context) { hosttag = ctx.GlobalString(swarmmetrics.MetricsInfluxDBHostTagFlag.Name) ) - go influxdb.InfluxDBWithTags(gethmetrics.DefaultRegistry, collectionInterval, endpoint, database, username, password, "swarm-smoke.", map[string]string{ + influxdb.InfluxDBWithTagsOnce(gethmetrics.DefaultRegistry, endpoint, database, username, password, "swarm-smoke.", map[string]string{ "host": hosttag, "version": gitCommit, "filesize": fmt.Sprintf("%v", filesize), diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index f41d80d947..e5699dd50a 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -59,8 +59,6 @@ func cliUploadAndSync(c *cli.Context) error { log.PrintOrigins(true) log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(true)))) - setupMetrics(c) - metrics.GetOrRegisterCounter("upload-and-sync", nil).Inc(1) errc := make(chan error) diff --git a/metrics/influxdb/influxdb.go b/metrics/influxdb/influxdb.go index 31a5c21b5f..5f99bb0146 100644 --- a/metrics/influxdb/influxdb.go +++ b/metrics/influxdb/influxdb.go @@ -58,6 +58,34 @@ func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, userna rep.run() } +// InfluxDBWithTagsOnce runs once an InfluxDB reporter and post the given metrics.Registry with the specified tags +func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, namespace string, tags map[string]string) { + u, err := uurl.Parse(url) + if err != nil { + log.Warn("Unable to parse InfluxDB", "url", url, "err", err) + return + } + + rep := &reporter{ + reg: r, + url: *u, + database: database, + username: username, + password: password, + namespace: namespace, + tags: tags, + cache: make(map[string]int64), + } + if err := rep.makeClient(); err != nil { + log.Warn("Unable to make InfluxDB client", "err", err) + return + } + + if err := rep.send(); err != nil { + log.Warn("Unable to send to InfluxDB", "err", err) + } +} + func (r *reporter) makeClient() (err error) { r.client, err = client.NewClient(client.Config{ URL: r.url, From 24b6b95972f8bae16608811033da8580a584cfc8 Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Wed, 5 Dec 2018 12:32:16 +0530 Subject: [PATCH 26/44] address PR comments --- cmd/swarm/swarm-smoke/main.go | 11 +++-------- metrics/influxdb/influxdb.go | 13 +++++++------ 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index f1e50372ec..ea419e9fac 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -141,12 +141,8 @@ func main() { sort.Sort(cli.FlagsByName(app.Flags)) sort.Sort(cli.CommandsByName(app.Commands)) - app.Before = func(ctx *cli.Context) error { - return nil - } app.After = func(ctx *cli.Context) error { - emitMetrics(ctx) - return nil + return emitMetrics(ctx) } err := app.Run(os.Args) @@ -157,7 +153,7 @@ func main() { } } -func emitMetrics(ctx *cli.Context) { +func emitMetrics(ctx *cli.Context) error { if gethmetrics.Enabled { var ( endpoint = ctx.GlobalString(swarmmetrics.MetricsInfluxDBEndpointFlag.Name) @@ -166,8 +162,7 @@ func emitMetrics(ctx *cli.Context) { password = ctx.GlobalString(swarmmetrics.MetricsInfluxDBPasswordFlag.Name) hosttag = ctx.GlobalString(swarmmetrics.MetricsInfluxDBHostTagFlag.Name) ) - - influxdb.InfluxDBWithTagsOnce(gethmetrics.DefaultRegistry, endpoint, database, username, password, "swarm-smoke.", map[string]string{ + return influxdb.InfluxDBWithTagsOnce(gethmetrics.DefaultRegistry, endpoint, database, username, password, "swarm-smoke.", map[string]string{ "host": hosttag, "version": gitCommit, "filesize": fmt.Sprintf("%v", filesize), diff --git a/metrics/influxdb/influxdb.go b/metrics/influxdb/influxdb.go index 5f99bb0146..1c2b04bf82 100644 --- a/metrics/influxdb/influxdb.go +++ b/metrics/influxdb/influxdb.go @@ -1,6 +1,7 @@ package influxdb import ( + "errors" "fmt" uurl "net/url" "time" @@ -59,11 +60,10 @@ func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, userna } // InfluxDBWithTagsOnce runs once an InfluxDB reporter and post the given metrics.Registry with the specified tags -func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, namespace string, tags map[string]string) { +func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, namespace string, tags map[string]string) error { u, err := uurl.Parse(url) if err != nil { - log.Warn("Unable to parse InfluxDB", "url", url, "err", err) - return + return errors.New(fmt.Sprintf("Unable to parse InfluxDB. url: %s, err: %v", url, err)) } rep := &reporter{ @@ -77,13 +77,14 @@ func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, cache: make(map[string]int64), } if err := rep.makeClient(); err != nil { - log.Warn("Unable to make InfluxDB client", "err", err) - return + return errors.New(fmt.Sprintf("Unable to make InfluxDB client. err: %v", err)) } if err := rep.send(); err != nil { - log.Warn("Unable to send to InfluxDB", "err", err) + return errors.New(fmt.Sprintf("Unable to send to InfluxDB. err: %v", err)) } + + return nil } func (r *reporter) makeClient() (err error) { From a2dfa31334a180e61e6fbafa0851d2778b82b86c Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Wed, 5 Dec 2018 12:36:50 +0530 Subject: [PATCH 27/44] return nil on no error --- cmd/swarm/swarm-smoke/main.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index ea419e9fac..f6e489df8a 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -168,4 +168,6 @@ func emitMetrics(ctx *cli.Context) error { "filesize": fmt.Sprintf("%v", filesize), }) } + + return nil } From de8a8afa229be6b0a0d5228222f6e02e48920941 Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Wed, 5 Dec 2018 16:47:20 +0530 Subject: [PATCH 28/44] fix linter errors --- metrics/influxdb/influxdb.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/metrics/influxdb/influxdb.go b/metrics/influxdb/influxdb.go index 1c2b04bf82..c4ef927234 100644 --- a/metrics/influxdb/influxdb.go +++ b/metrics/influxdb/influxdb.go @@ -1,7 +1,6 @@ package influxdb import ( - "errors" "fmt" uurl "net/url" "time" @@ -63,7 +62,7 @@ func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, userna func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, namespace string, tags map[string]string) error { u, err := uurl.Parse(url) if err != nil { - return errors.New(fmt.Sprintf("Unable to parse InfluxDB. url: %s, err: %v", url, err)) + return fmt.Errorf("Unable to parse InfluxDB. url: %s, err: %v", url, err) } rep := &reporter{ @@ -77,11 +76,11 @@ func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, cache: make(map[string]int64), } if err := rep.makeClient(); err != nil { - return errors.New(fmt.Sprintf("Unable to make InfluxDB client. err: %v", err)) + return fmt.Errorf("Unable to make InfluxDB client. err: %v", err) } if err := rep.send(); err != nil { - return errors.New(fmt.Sprintf("Unable to send to InfluxDB. err: %v", err)) + return fmt.Errorf("Unable to send to InfluxDB. err: %v", err) } return nil From 972e699479c3a1876c95dd8e9d31428c058ddd30 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Wed, 5 Dec 2018 12:34:49 +0100 Subject: [PATCH 29/44] cmd/swarm-smoke: inline metrics --- cmd/swarm/swarm-smoke/feed_upload_and_sync.go | 7 ++++--- cmd/swarm/swarm-smoke/main.go | 5 ----- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go index dec354c06b..9d01c29a6f 100644 --- a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go @@ -16,6 +16,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/storage/feed" colorable "github.com/mattn/go-colorable" "github.com/pborman/uuid" @@ -27,7 +28,7 @@ const ( ) func cliFeedUploadAndSync(c *cli.Context) error { - feedUploadAndSyncCount.Inc(1) + metrics.GetOrRegisterCounter("feed-and-sync", nil).Inc(1) log.Root().SetHandler(log.CallerFileHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))) errc := make(chan error) @@ -38,11 +39,11 @@ func cliFeedUploadAndSync(c *cli.Context) error { select { case err := <-errc: if err != nil { - feedUploadAndSyncFailCount.Inc(1) + metrics.GetOrRegisterCounter("feed-and-sync.fail", nil).Inc(1) } return err case <-time.After(time.Duration(timeout) * time.Second): - feedUploadAndSyncTimeout.Inc(1) + metrics.GetOrRegisterCounter("feed-and-sync.timeout", nil).Inc(1) return fmt.Errorf("timeout after %v sec", timeout) } } diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index f6e489df8a..bf0d4762ed 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -47,11 +47,6 @@ var ( verbosity int timeout int ) -var ( - feedUploadAndSyncCount = gethmetrics.NewRegisteredCounter("feed-and-sync", nil) - feedUploadAndSyncFailCount = gethmetrics.NewRegisteredCounter("feed-and-sync.fail", nil) - feedUploadAndSyncTimeout = gethmetrics.NewRegisteredCounter("feed-and-sync.timeout", nil) -) func main() { From 72a99531e492692f2ba7df1b579c91db3b3215a1 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Wed, 5 Dec 2018 14:47:51 +0100 Subject: [PATCH 30/44] swarm/storage: measure lazy chunk reader chunk get time --- swarm/storage/chunker.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go index 40292e88f9..cbe65372a1 100644 --- a/swarm/storage/chunker.go +++ b/swarm/storage/chunker.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "sync" + "time" "github.com/ethereum/go-ethereum/metrics" ch "github.com/ethereum/go-ethereum/swarm/chunk" @@ -410,10 +411,14 @@ func (r *LazyChunkReader) Size(ctx context.Context, quitC chan bool) (n int64, e log.Debug("lazychunkreader.size", "addr", r.addr) if r.chunkData == nil { + + startTime := time.Now() chunkData, err := r.getter.Get(cctx, Reference(r.addr)) if err != nil { + metrics.GetOrRegisterResettingTimer("lcr.getter.get.err", nil).UpdateSince(startTime) return 0, err } + metrics.GetOrRegisterResettingTimer("lcr.getter.get", nil).UpdateSince(startTime) r.chunkData = chunkData s := r.chunkData.Size() log.Debug("lazychunkreader.size", "key", r.addr, "size", s) @@ -542,8 +547,10 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS wg.Add(1) go func(j int64) { childAddress := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize] + startTime := time.Now() chunkData, err := r.getter.Get(r.ctx, Reference(childAddress)) if err != nil { + metrics.GetOrRegisterResettingTimer("lcr.getter.get.err", nil).UpdateSince(startTime) log.Debug("lazychunkreader.join", "key", fmt.Sprintf("%x", childAddress), "err", err) select { case errC <- fmt.Errorf("chunk %v-%v not found; key: %s", off, off+treeSize, fmt.Sprintf("%x", childAddress)): @@ -551,6 +558,7 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS } return } + metrics.GetOrRegisterResettingTimer("lcr.getter.get", nil).UpdateSince(startTime) if l := len(chunkData); l < 9 { select { case errC <- fmt.Errorf("chunk %v-%v incomplete; key: %s, data length %v", off, off+treeSize, fmt.Sprintf("%x", childAddress), l): From 159d9af395bfb2c9f4382019bac3938d0cd040ae Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Wed, 5 Dec 2018 17:00:35 +0100 Subject: [PATCH 31/44] swarm/api: log and measure the same value for request time --- swarm/api/http/middleware.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/swarm/api/http/middleware.go b/swarm/api/http/middleware.go index 115a008566..3800212c21 100644 --- a/swarm/api/http/middleware.go +++ b/swarm/api/http/middleware.go @@ -74,13 +74,15 @@ func ParseURI(h http.Handler) http.Handler { func InitLoggingResponseWriter(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - startTime := time.Now() - defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.time", r.Method), nil).UpdateSince(startTime) + tn := time.Now() writer := newLoggingResponseWriter(w) h.ServeHTTP(writer, r) - log.Info("request served", "ruid", GetRUID(r.Context()), "code", writer.statusCode) - metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.%d.time", r.Method, writer.statusCode), nil).UpdateSince(startTime) + + ts := time.Since(tn) + log.Info("request served", "ruid", GetRUID(r.Context()), "code", writer.statusCode, "time", ts*time.Millisecond) + metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.time", r.Method), nil).Update(ts) + metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.%d.time", r.Method, writer.statusCode), nil).Update(ts) }) } From 4f589df12fb5cf397d1fe68103bed1aba6507b1f Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Thu, 6 Dec 2018 12:40:41 +0530 Subject: [PATCH 32/44] added loglines to http get requests --- cmd/swarm/swarm-smoke/feed_upload_and_sync.go | 33 ++++++++++++++++++- cmd/swarm/swarm-smoke/upload_and_sync.go | 33 ++++++++++++++++--- 2 files changed, 60 insertions(+), 6 deletions(-) diff --git a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go index dec354c06b..bb919e3776 100644 --- a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go @@ -3,10 +3,12 @@ package main import ( "bytes" "crypto/md5" + "crypto/tls" "fmt" "io" "io/ioutil" "net/http" + "net/http/httptrace" "os" "os/exec" "strings" @@ -306,10 +308,39 @@ func fetchFeed(topic string, user string, endpoint string, original []byte, ruid time.Sleep(3 * time.Second) log.Trace("http get request (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user) - res, err := http.Get(endpoint + "/bzz-feed:/?topic=" + topic + "&user=" + user) + + reqUri := endpoint + "/bzz-feed:/?topic=" + topic + "&user=" + user + req, _ := http.NewRequest("GET", reqUri, nil) + trace := &httptrace.ClientTrace{ + GetConn: func(_ string) { log.Trace("http get request (feed) - GetConn") }, + GotConn: func(_GotConnInfo) { log.Trace("http get request (feed) - GotConn") }, + PutIdleConn: func(err error) { log.Trace("http get request (feed) - PutIdleConn", "err", err) }, + GotFirstResponseByte: func() { log.Trace("http get request (feed) - GotFirstResponseByte") }, + Got100Continue: func() { log.Trace("http get request (feed) - Got100Continue") }, + DNSStart: func(_ DNSStartInfo) { log.Trace("http get request (feed) - DNSStart") }, + DNSDone: func(_ DNSDoneInfo) { log.Trace("http get request (feed) - DNSDone") }, + ConnectStart: func(network, addr string) { + log.Trace("http get request (feed) - ConnectStart", "network", network, "addr", addr) + }, + ConnectDone: func(network, addr string, err error) { + log.Trace("http get request (feed) - ConnectDone", "network", network, "addr", addr, "err", err) + + }, + WroteHeaders: func() { log.Trace("http get request (feed) - WroteHeaders(request)") }, + Wait100Continue: func() { log.Trace("http get request (feed) - Wait100Continue") }, + + WroteRequest: func(_ WroteRequestInfo) { log.Trace("http get request (feed) - WroteRequest") }, + } + req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace)) + transport := http.DefaultTransport + transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + + res, err := transport.RoundTrip(req) if err != nil { + log.Error(err.Error(), "ruid", ruid) return err } + log.Trace("http get response (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user, "code", res.StatusCode, "len", res.ContentLength) if res.StatusCode != 200 { diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index e5699dd50a..af037f1f0d 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -26,6 +26,7 @@ import ( "io" "io/ioutil" "net/http" + "net/http/httptrace" "os" "os/exec" "strings" @@ -139,12 +140,34 @@ func fetch(hash string, endpoint string, original []byte, ruid string) error { time.Sleep(3 * time.Second) log.Trace("http get request", "ruid", ruid, "api", endpoint, "hash", hash) - client := &http.Client{Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - }} - res, err := client.Get(endpoint + "/bzz:/" + hash + "/") + + reqUri := endpoint + "/bzz:/" + hash + "/" + req, _ := http.NewRequest("GET", reqUri, nil) + trace := &httptrace.ClientTrace{ + GetConn: func(_ string) { log.Trace("http get request - GetConn") }, + GotConn: func(_GotConnInfo) { log.Trace("http get request - GotConn") }, + PutIdleConn: func(err error) { log.Trace("http get request - PutIdleConn", "err", err) }, + GotFirstResponseByte: func() { log.Trace("http get request - GotFirstResponseByte") }, + Got100Continue: func() { log.Trace("http get request - Got100Continue") }, + DNSStart: func(_ DNSStartInfo) { log.Trace("http get request - DNSStart") }, + DNSDone: func(_ DNSDoneInfo) { log.Trace("http get request - DNSDone") }, + ConnectStart: func(network, addr string) { + log.Trace("http get request - ConnectStart", "network", network, "addr", addr) + }, + ConnectDone: func(network, addr string, err error) { + log.Trace("http get request - ConnectDone", "network", network, "addr", addr, "err", err) + }, + WroteHeaders: func() { log.Trace("http get request - WroteHeaders(request)") }, + Wait100Continue: func() { log.Trace("http get request - Wait100Continue") }, + + WroteRequest: func(_ WroteRequestInfo) { log.Trace("http get request - WroteRequest") }, + } + req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace)) + transport := http.DefaultTransport + transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + res, err := transport.RoundTrip(req) if err != nil { - log.Warn(err.Error(), "ruid", ruid) + log.Error(err.Error(), "ruid", ruid) return err } log.Trace("http get response", "ruid", ruid, "api", endpoint, "hash", hash, "code", res.StatusCode, "len", res.ContentLength) From 637444cafb71514bf3b4184d97eb02ecc6a8fe8d Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Thu, 6 Dec 2018 15:10:04 +0530 Subject: [PATCH 33/44] fix goimports --- cmd/swarm/swarm-smoke/feed_upload_and_sync.go | 11 +++++------ cmd/swarm/swarm-smoke/upload_and_sync.go | 11 +++++------ 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go index 4ab0b72d36..150a4ada64 100644 --- a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go @@ -3,7 +3,6 @@ package main import ( "bytes" "crypto/md5" - "crypto/tls" "fmt" "io" "io/ioutil" @@ -314,12 +313,12 @@ func fetchFeed(topic string, user string, endpoint string, original []byte, ruid req, _ := http.NewRequest("GET", reqUri, nil) trace := &httptrace.ClientTrace{ GetConn: func(_ string) { log.Trace("http get request (feed) - GetConn") }, - GotConn: func(_GotConnInfo) { log.Trace("http get request (feed) - GotConn") }, + GotConn: func(_ httptrace.GotConnInfo) { log.Trace("http get request (feed) - GotConn") }, PutIdleConn: func(err error) { log.Trace("http get request (feed) - PutIdleConn", "err", err) }, GotFirstResponseByte: func() { log.Trace("http get request (feed) - GotFirstResponseByte") }, Got100Continue: func() { log.Trace("http get request (feed) - Got100Continue") }, - DNSStart: func(_ DNSStartInfo) { log.Trace("http get request (feed) - DNSStart") }, - DNSDone: func(_ DNSDoneInfo) { log.Trace("http get request (feed) - DNSDone") }, + DNSStart: func(_ httptrace.DNSStartInfo) { log.Trace("http get request (feed) - DNSStart") }, + DNSDone: func(_ httptrace.DNSDoneInfo) { log.Trace("http get request (feed) - DNSDone") }, ConnectStart: func(network, addr string) { log.Trace("http get request (feed) - ConnectStart", "network", network, "addr", addr) }, @@ -330,11 +329,11 @@ func fetchFeed(topic string, user string, endpoint string, original []byte, ruid WroteHeaders: func() { log.Trace("http get request (feed) - WroteHeaders(request)") }, Wait100Continue: func() { log.Trace("http get request (feed) - Wait100Continue") }, - WroteRequest: func(_ WroteRequestInfo) { log.Trace("http get request (feed) - WroteRequest") }, + WroteRequest: func(_ httptrace.WroteRequestInfo) { log.Trace("http get request (feed) - WroteRequest") }, } req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace)) transport := http.DefaultTransport - transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + //transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} res, err := transport.RoundTrip(req) if err != nil { diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index af037f1f0d..084716df95 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -20,7 +20,6 @@ import ( "bytes" "crypto/md5" crand "crypto/rand" - "crypto/tls" "errors" "fmt" "io" @@ -145,12 +144,12 @@ func fetch(hash string, endpoint string, original []byte, ruid string) error { req, _ := http.NewRequest("GET", reqUri, nil) trace := &httptrace.ClientTrace{ GetConn: func(_ string) { log.Trace("http get request - GetConn") }, - GotConn: func(_GotConnInfo) { log.Trace("http get request - GotConn") }, + GotConn: func(_ httptrace.GotConnInfo) { log.Trace("http get request - GotConn") }, PutIdleConn: func(err error) { log.Trace("http get request - PutIdleConn", "err", err) }, GotFirstResponseByte: func() { log.Trace("http get request - GotFirstResponseByte") }, Got100Continue: func() { log.Trace("http get request - Got100Continue") }, - DNSStart: func(_ DNSStartInfo) { log.Trace("http get request - DNSStart") }, - DNSDone: func(_ DNSDoneInfo) { log.Trace("http get request - DNSDone") }, + DNSStart: func(_ httptrace.DNSStartInfo) { log.Trace("http get request - DNSStart") }, + DNSDone: func(_ httptrace.DNSDoneInfo) { log.Trace("http get request - DNSDone") }, ConnectStart: func(network, addr string) { log.Trace("http get request - ConnectStart", "network", network, "addr", addr) }, @@ -160,11 +159,11 @@ func fetch(hash string, endpoint string, original []byte, ruid string) error { WroteHeaders: func() { log.Trace("http get request - WroteHeaders(request)") }, Wait100Continue: func() { log.Trace("http get request - Wait100Continue") }, - WroteRequest: func(_ WroteRequestInfo) { log.Trace("http get request - WroteRequest") }, + WroteRequest: func(_ httptrace.WroteRequestInfo) { log.Trace("http get request - WroteRequest") }, } req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace)) transport := http.DefaultTransport - transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + //transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} res, err := transport.RoundTrip(req) if err != nil { log.Error(err.Error(), "ruid", ruid) From 36d7c2f23870069c614e0bc763daae04a3c26351 Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Thu, 6 Dec 2018 15:35:17 +0530 Subject: [PATCH 34/44] added resetting counters --- cmd/swarm/swarm-smoke/feed_upload_and_sync.go | 54 +++++++++++++++---- cmd/swarm/swarm-smoke/upload_and_sync.go | 54 +++++++++++++++---- 2 files changed, 86 insertions(+), 22 deletions(-) diff --git a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go index 150a4ada64..eddce78888 100644 --- a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go @@ -309,27 +309,59 @@ func fetchFeed(topic string, user string, endpoint string, original []byte, ruid log.Trace("http get request (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user) + tn := time.Now() reqUri := endpoint + "/bzz-feed:/?topic=" + topic + "&user=" + user req, _ := http.NewRequest("GET", reqUri, nil) trace := &httptrace.ClientTrace{ - GetConn: func(_ string) { log.Trace("http get request (feed) - GetConn") }, - GotConn: func(_ httptrace.GotConnInfo) { log.Trace("http get request (feed) - GotConn") }, - PutIdleConn: func(err error) { log.Trace("http get request (feed) - PutIdleConn", "err", err) }, - GotFirstResponseByte: func() { log.Trace("http get request (feed) - GotFirstResponseByte") }, - Got100Continue: func() { log.Trace("http get request (feed) - Got100Continue") }, - DNSStart: func(_ httptrace.DNSStartInfo) { log.Trace("http get request (feed) - DNSStart") }, - DNSDone: func(_ httptrace.DNSDoneInfo) { log.Trace("http get request (feed) - DNSDone") }, + GetConn: func(_ string) { + log.Trace("http get request (feed) - GetConn") + metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.getconn", nil).Update(time.Since(tn)) + }, + GotConn: func(_ httptrace.GotConnInfo) { + log.Trace("http get request (feed) - GotConn") + metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.gotconn", nil).Update(time.Since(tn)) + }, + PutIdleConn: func(err error) { + log.Trace("http get request (feed) - PutIdleConn", "err", err) + metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.putidle", nil).Update(time.Since(tn)) + }, + GotFirstResponseByte: func() { + log.Trace("http get request (feed) - GotFirstResponseByte") + metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.firstbyte", nil).Update(time.Since(tn)) + }, + Got100Continue: func() { + log.Trace("http get request (feed) - Got100Continue") + metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.got100continue", nil).Update(time.Since(tn)) + }, + DNSStart: func(_ httptrace.DNSStartInfo) { + log.Trace("http get request (feed) - DNSStart") + metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.dnsstart", nil).Update(time.Since(tn)) + }, + DNSDone: func(_ httptrace.DNSDoneInfo) { + log.Trace("http get request (feed) - DNSDone") + metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.dnsdone", nil).Update(time.Since(tn)) + }, ConnectStart: func(network, addr string) { log.Trace("http get request (feed) - ConnectStart", "network", network, "addr", addr) + metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.connectstart", nil).Update(time.Since(tn)) }, ConnectDone: func(network, addr string, err error) { log.Trace("http get request (feed) - ConnectDone", "network", network, "addr", addr, "err", err) - + metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.connectdone", nil).Update(time.Since(tn)) + }, + WroteHeaders: func() { + log.Trace("http get request (feed) - WroteHeaders(request)") + metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.wroteheaders", nil).Update(time.Since(tn)) + }, + Wait100Continue: func() { + log.Trace("http get request (feed) - Wait100Continue") + metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.wait100continue", nil).Update(time.Since(tn)) }, - WroteHeaders: func() { log.Trace("http get request (feed) - WroteHeaders(request)") }, - Wait100Continue: func() { log.Trace("http get request (feed) - Wait100Continue") }, - WroteRequest: func(_ httptrace.WroteRequestInfo) { log.Trace("http get request (feed) - WroteRequest") }, + WroteRequest: func(_ httptrace.WroteRequestInfo) { + log.Trace("http get request (feed) - WroteRequest") + metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.wroterequest", nil).Update(time.Since(tn)) + }, } req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace)) transport := http.DefaultTransport diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index 084716df95..792f5cfb31 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -140,26 +140,58 @@ func fetch(hash string, endpoint string, original []byte, ruid string) error { log.Trace("http get request", "ruid", ruid, "api", endpoint, "hash", hash) + tn := time.Now() reqUri := endpoint + "/bzz:/" + hash + "/" req, _ := http.NewRequest("GET", reqUri, nil) trace := &httptrace.ClientTrace{ - GetConn: func(_ string) { log.Trace("http get request - GetConn") }, - GotConn: func(_ httptrace.GotConnInfo) { log.Trace("http get request - GotConn") }, - PutIdleConn: func(err error) { log.Trace("http get request - PutIdleConn", "err", err) }, - GotFirstResponseByte: func() { log.Trace("http get request - GotFirstResponseByte") }, - Got100Continue: func() { log.Trace("http get request - Got100Continue") }, - DNSStart: func(_ httptrace.DNSStartInfo) { log.Trace("http get request - DNSStart") }, - DNSDone: func(_ httptrace.DNSDoneInfo) { log.Trace("http get request - DNSDone") }, + GetConn: func(_ string) { + log.Trace("http get request - GetConn") + metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.getconn", nil).Update(time.Since(tn)) + }, + GotConn: func(_ httptrace.GotConnInfo) { + log.Trace("http get request - GotConn") + metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.gotconn", nil).Update(time.Since(tn)) + }, + PutIdleConn: func(err error) { + log.Trace("http get request - PutIdleConn", "err", err) + metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.putidle", nil).Update(time.Since(tn)) + }, + GotFirstResponseByte: func() { + log.Trace("http get request - GotFirstResponseByte") + metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.firstbyte", nil).Update(time.Since(tn)) + }, + Got100Continue: func() { + log.Trace("http get request - Got100Continue") + metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.got100continue", nil).Update(time.Since(tn)) + }, + DNSStart: func(_ httptrace.DNSStartInfo) { + log.Trace("http get request - DNSStart") + metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.dnsstart", nil).Update(time.Since(tn)) + }, + DNSDone: func(_ httptrace.DNSDoneInfo) { + log.Trace("http get request - DNSDone") + metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.dnsdone", nil).Update(time.Since(tn)) + }, ConnectStart: func(network, addr string) { log.Trace("http get request - ConnectStart", "network", network, "addr", addr) + metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.connectstart", nil).Update(time.Since(tn)) }, ConnectDone: func(network, addr string, err error) { log.Trace("http get request - ConnectDone", "network", network, "addr", addr, "err", err) + metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.connectdone", nil).Update(time.Since(tn)) + }, + WroteHeaders: func() { + log.Trace("http get request - WroteHeaders(request)") + metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.wroteheaders", nil).Update(time.Since(tn)) + }, + Wait100Continue: func() { + log.Trace("http get request - Wait100Continue") + metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.wait100continue", nil).Update(time.Since(tn)) + }, + WroteRequest: func(_ httptrace.WroteRequestInfo) { + log.Trace("http get request - WroteRequest") + metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.wroterequest", nil).Update(time.Since(tn)) }, - WroteHeaders: func() { log.Trace("http get request - WroteHeaders(request)") }, - Wait100Continue: func() { log.Trace("http get request - Wait100Continue") }, - - WroteRequest: func(_ httptrace.WroteRequestInfo) { log.Trace("http get request - WroteRequest") }, } req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace)) transport := http.DefaultTransport From ac8b6d5f68213058e6892c6c627c94aa99189889 Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Thu, 6 Dec 2018 17:36:23 +0530 Subject: [PATCH 35/44] added opentracing span injection and extraction over the wire for http requests from swarm-smoke --- cmd/swarm/swarm-smoke/feed_upload_and_sync.go | 15 +++++++++++++++ cmd/swarm/swarm-smoke/upload_and_sync.go | 17 ++++++++++++++++- swarm/api/http/middleware.go | 9 +++++++-- 3 files changed, 38 insertions(+), 3 deletions(-) diff --git a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go index eddce78888..9823dc0542 100644 --- a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "context" "crypto/md5" "fmt" "io" @@ -18,8 +19,10 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage/feed" colorable "github.com/mattn/go-colorable" + opentracing "github.com/opentracing/opentracing-go" "github.com/pborman/uuid" cli "gopkg.in/urfave/cli.v1" ) @@ -304,6 +307,11 @@ func feedUploadAndSync(c *cli.Context) error { } func fetchFeed(topic string, user string, endpoint string, original []byte, ruid string) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ctx, sp := spancontext.StartSpan(ctx, "feed-and-sync.fetch") + defer sp.Finish() + log.Trace("sleeping", "ruid", ruid) time.Sleep(3 * time.Second) @@ -312,6 +320,12 @@ func fetchFeed(topic string, user string, endpoint string, original []byte, ruid tn := time.Now() reqUri := endpoint + "/bzz-feed:/?topic=" + topic + "&user=" + user req, _ := http.NewRequest("GET", reqUri, nil) + + opentracing.GlobalTracer().Inject( + sp.Context(), + opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(req.Header)) + trace := &httptrace.ClientTrace{ GetConn: func(_ string) { log.Trace("http get request (feed) - GetConn") @@ -365,6 +379,7 @@ func fetchFeed(topic string, user string, endpoint string, original []byte, ruid } req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace)) transport := http.DefaultTransport + //transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} res, err := transport.RoundTrip(req) diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index 792f5cfb31..15dfee3cf6 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -18,6 +18,7 @@ package main import ( "bytes" + "context" "crypto/md5" crand "crypto/rand" "errors" @@ -34,6 +35,8 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/swarm/spancontext" + opentracing "github.com/opentracing/opentracing-go" "github.com/pborman/uuid" cli "gopkg.in/urfave/cli.v1" @@ -135,14 +138,24 @@ func uploadAndSync(c *cli.Context) error { // fetch is getting the requested `hash` from the `endpoint` and compares it with the `original` file func fetch(hash string, endpoint string, original []byte, ruid string) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ctx, sp := spancontext.StartSpan(ctx, "upload-and-sync.fetch") + defer sp.Finish() + log.Trace("sleeping", "ruid", ruid) time.Sleep(3 * time.Second) - log.Trace("http get request", "ruid", ruid, "api", endpoint, "hash", hash) tn := time.Now() reqUri := endpoint + "/bzz:/" + hash + "/" req, _ := http.NewRequest("GET", reqUri, nil) + + opentracing.GlobalTracer().Inject( + sp.Context(), + opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(req.Header)) + trace := &httptrace.ClientTrace{ GetConn: func(_ string) { log.Trace("http get request - GetConn") @@ -195,7 +208,9 @@ func fetch(hash string, endpoint string, original []byte, ruid string) error { } req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace)) transport := http.DefaultTransport + //transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + res, err := transport.RoundTrip(req) if err != nil { log.Error(err.Error(), "ruid", ruid) diff --git a/swarm/api/http/middleware.go b/swarm/api/http/middleware.go index 3800212c21..42dd34cf5b 100644 --- a/swarm/api/http/middleware.go +++ b/swarm/api/http/middleware.go @@ -11,7 +11,8 @@ import ( "github.com/ethereum/go-ethereum/swarm/api" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/sctx" - "github.com/ethereum/go-ethereum/swarm/spancontext" + opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" "github.com/pborman/uuid" ) @@ -94,7 +95,11 @@ func InstrumentOpenTracing(h http.Handler) http.Handler { return } spanName := fmt.Sprintf("http.%s.%s", r.Method, uri.Scheme) - ctx, sp := spancontext.StartSpan(r.Context(), spanName) + wireContext, _ := opentracing.GlobalTracer().Extract( + opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(r.Header)) + sp, ctx := opentracing.StartSpanFromContext(r.Context(), spanName, ext.RPCServerOption(wireContext)) + defer sp.Finish() h.ServeHTTP(w, r.WithContext(ctx)) }) From c38d2508de16f1814fff491df2bbbe858d1e4854 Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Thu, 6 Dec 2018 20:37:59 +0530 Subject: [PATCH 36/44] cleanup and address PR comments --- cmd/swarm/swarm-smoke/feed_upload_and_sync.go | 62 ++---------------- cmd/swarm/swarm-smoke/main.go | 65 +++++++++++++++++++ cmd/swarm/swarm-smoke/upload_and_sync.go | 61 ++--------------- 3 files changed, 77 insertions(+), 111 deletions(-) diff --git a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go index 9823dc0542..2bfb3016e5 100644 --- a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go @@ -307,9 +307,7 @@ func feedUploadAndSync(c *cli.Context) error { } func fetchFeed(topic string, user string, endpoint string, original []byte, ruid string) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - ctx, sp := spancontext.StartSpan(ctx, "feed-and-sync.fetch") + ctx, sp := spancontext.StartSpan(context.Background(), "feed-and-sync.fetch") defer sp.Finish() log.Trace("sleeping", "ruid", ruid) @@ -317,7 +315,7 @@ func fetchFeed(topic string, user string, endpoint string, original []byte, ruid log.Trace("http get request (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user) - tn := time.Now() + var tn time.Time reqUri := endpoint + "/bzz-feed:/?topic=" + topic + "&user=" + user req, _ := http.NewRequest("GET", reqUri, nil) @@ -326,62 +324,14 @@ func fetchFeed(topic string, user string, endpoint string, original []byte, ruid opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(req.Header)) - trace := &httptrace.ClientTrace{ - GetConn: func(_ string) { - log.Trace("http get request (feed) - GetConn") - metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.getconn", nil).Update(time.Since(tn)) - }, - GotConn: func(_ httptrace.GotConnInfo) { - log.Trace("http get request (feed) - GotConn") - metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.gotconn", nil).Update(time.Since(tn)) - }, - PutIdleConn: func(err error) { - log.Trace("http get request (feed) - PutIdleConn", "err", err) - metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.putidle", nil).Update(time.Since(tn)) - }, - GotFirstResponseByte: func() { - log.Trace("http get request (feed) - GotFirstResponseByte") - metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.firstbyte", nil).Update(time.Since(tn)) - }, - Got100Continue: func() { - log.Trace("http get request (feed) - Got100Continue") - metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.got100continue", nil).Update(time.Since(tn)) - }, - DNSStart: func(_ httptrace.DNSStartInfo) { - log.Trace("http get request (feed) - DNSStart") - metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.dnsstart", nil).Update(time.Since(tn)) - }, - DNSDone: func(_ httptrace.DNSDoneInfo) { - log.Trace("http get request (feed) - DNSDone") - metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.dnsdone", nil).Update(time.Since(tn)) - }, - ConnectStart: func(network, addr string) { - log.Trace("http get request (feed) - ConnectStart", "network", network, "addr", addr) - metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.connectstart", nil).Update(time.Since(tn)) - }, - ConnectDone: func(network, addr string, err error) { - log.Trace("http get request (feed) - ConnectDone", "network", network, "addr", addr, "err", err) - metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.connectdone", nil).Update(time.Since(tn)) - }, - WroteHeaders: func() { - log.Trace("http get request (feed) - WroteHeaders(request)") - metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.wroteheaders", nil).Update(time.Since(tn)) - }, - Wait100Continue: func() { - log.Trace("http get request (feed) - Wait100Continue") - metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.wait100continue", nil).Update(time.Since(tn)) - }, - - WroteRequest: func(_ httptrace.WroteRequestInfo) { - log.Trace("http get request (feed) - WroteRequest") - metrics.GetOrRegisterResettingTimer("feed-and-sync.fetch.clienttrace.wroterequest", nil).Update(time.Since(tn)) - }, - } - req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace)) + trace := getClientTrace("feed-and-sync", ruid, &tn) + + req = req.WithContext(httptrace.WithClientTrace(ctx, trace)) transport := http.DefaultTransport //transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + tn = time.Now() res, err := transport.RoundTrip(req) if err != nil { log.Error(err.Error(), "ruid", ruid) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index bf0d4762ed..bcd9212c9b 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -18,13 +18,16 @@ package main import ( "fmt" + "net/http/httptrace" "os" "sort" + "time" "github.com/ethereum/go-ethereum/cmd/utils" gethmetrics "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/metrics/influxdb" swarmmetrics "github.com/ethereum/go-ethereum/swarm/metrics" + "github.com/ethereum/go-ethereum/swarm/tracing" "github.com/ethereum/go-ethereum/log" @@ -119,6 +122,8 @@ func main() { swarmmetrics.MetricsInfluxDBHostTagFlag, }...) + app.Flags = append(app.Flags, tracing.Flags...) + app.Commands = []cli.Command{ { Name: "upload_and_sync", @@ -136,6 +141,12 @@ func main() { sort.Sort(cli.FlagsByName(app.Flags)) sort.Sort(cli.CommandsByName(app.Commands)) + + app.Before = func(ctx *cli.Context) error { + tracing.Setup(ctx) + return nil + } + app.After = func(ctx *cli.Context) error { return emitMetrics(ctx) } @@ -166,3 +177,57 @@ func emitMetrics(ctx *cli.Context) error { return nil } + +func getClientTrace(testName, ruid string, tn *time.Time) *httptrace.ClientTrace { + trace := &httptrace.ClientTrace{ + GetConn: func(_ string) { + log.Trace(testName+" - http get", "event", "GetConn", "ruid", ruid) + gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.getconn", nil).Update(time.Since(*tn)) + }, + GotConn: func(_ httptrace.GotConnInfo) { + log.Trace(testName+" - http get", "event", "GotConn", "ruid", ruid) + gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.gotconn", nil).Update(time.Since(*tn)) + }, + PutIdleConn: func(err error) { + log.Trace(testName+" - http get", "event", "PutIdleConn", "ruid", ruid, "err", err) + gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.putidle", nil).Update(time.Since(*tn)) + }, + GotFirstResponseByte: func() { + log.Trace(testName+" - http get", "event", "GotFirstResponseByte", "ruid", ruid) + gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.firstbyte", nil).Update(time.Since(*tn)) + }, + Got100Continue: func() { + log.Trace(testName+" - http get", "event", "Got100Continue", "ruid", ruid) + gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.got100continue", nil).Update(time.Since(*tn)) + }, + DNSStart: func(_ httptrace.DNSStartInfo) { + log.Trace(testName+" - http get", "event", "DNSStart", "ruid", ruid) + gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.dnsstart", nil).Update(time.Since(*tn)) + }, + DNSDone: func(_ httptrace.DNSDoneInfo) { + log.Trace(testName+" - http get", "event", "DNSDone", "ruid", ruid) + gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.dnsdone", nil).Update(time.Since(*tn)) + }, + ConnectStart: func(network, addr string) { + log.Trace(testName+" - http get", "event", "ConnectStart", "ruid", ruid, "network", network, "addr", addr) + gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.connectstart", nil).Update(time.Since(*tn)) + }, + ConnectDone: func(network, addr string, err error) { + log.Trace(testName+" - http get", "event", "ConnectDone", "ruid", ruid, "network", network, "addr", addr, "err", err) + gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.connectdone", nil).Update(time.Since(*tn)) + }, + WroteHeaders: func() { + log.Trace(testName+" - http get", "event", "WroteHeaders(request)", "ruid", ruid) + gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.wroteheaders", nil).Update(time.Since(*tn)) + }, + Wait100Continue: func() { + log.Trace(testName+" - http get", "event", "Wait100Continue", "ruid", ruid) + gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.wait100continue", nil).Update(time.Since(*tn)) + }, + WroteRequest: func(_ httptrace.WroteRequestInfo) { + log.Trace(testName+" - http get", "event", "WroteRequest", "ruid", ruid) + gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.wroterequest", nil).Update(time.Since(*tn)) + }, + } + return trace +} diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index 15dfee3cf6..1524b82a5c 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -138,16 +138,14 @@ func uploadAndSync(c *cli.Context) error { // fetch is getting the requested `hash` from the `endpoint` and compares it with the `original` file func fetch(hash string, endpoint string, original []byte, ruid string) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - ctx, sp := spancontext.StartSpan(ctx, "upload-and-sync.fetch") + ctx, sp := spancontext.StartSpan(context.Background(), "upload-and-sync.fetch") defer sp.Finish() log.Trace("sleeping", "ruid", ruid) time.Sleep(3 * time.Second) log.Trace("http get request", "ruid", ruid, "api", endpoint, "hash", hash) - tn := time.Now() + var tn time.Time reqUri := endpoint + "/bzz:/" + hash + "/" req, _ := http.NewRequest("GET", reqUri, nil) @@ -156,61 +154,14 @@ func fetch(hash string, endpoint string, original []byte, ruid string) error { opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(req.Header)) - trace := &httptrace.ClientTrace{ - GetConn: func(_ string) { - log.Trace("http get request - GetConn") - metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.getconn", nil).Update(time.Since(tn)) - }, - GotConn: func(_ httptrace.GotConnInfo) { - log.Trace("http get request - GotConn") - metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.gotconn", nil).Update(time.Since(tn)) - }, - PutIdleConn: func(err error) { - log.Trace("http get request - PutIdleConn", "err", err) - metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.putidle", nil).Update(time.Since(tn)) - }, - GotFirstResponseByte: func() { - log.Trace("http get request - GotFirstResponseByte") - metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.firstbyte", nil).Update(time.Since(tn)) - }, - Got100Continue: func() { - log.Trace("http get request - Got100Continue") - metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.got100continue", nil).Update(time.Since(tn)) - }, - DNSStart: func(_ httptrace.DNSStartInfo) { - log.Trace("http get request - DNSStart") - metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.dnsstart", nil).Update(time.Since(tn)) - }, - DNSDone: func(_ httptrace.DNSDoneInfo) { - log.Trace("http get request - DNSDone") - metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.dnsdone", nil).Update(time.Since(tn)) - }, - ConnectStart: func(network, addr string) { - log.Trace("http get request - ConnectStart", "network", network, "addr", addr) - metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.connectstart", nil).Update(time.Since(tn)) - }, - ConnectDone: func(network, addr string, err error) { - log.Trace("http get request - ConnectDone", "network", network, "addr", addr, "err", err) - metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.connectdone", nil).Update(time.Since(tn)) - }, - WroteHeaders: func() { - log.Trace("http get request - WroteHeaders(request)") - metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.wroteheaders", nil).Update(time.Since(tn)) - }, - Wait100Continue: func() { - log.Trace("http get request - Wait100Continue") - metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.wait100continue", nil).Update(time.Since(tn)) - }, - WroteRequest: func(_ httptrace.WroteRequestInfo) { - log.Trace("http get request - WroteRequest") - metrics.GetOrRegisterResettingTimer("upload-and-sync.fetch.clienttrace.wroterequest", nil).Update(time.Since(tn)) - }, - } - req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace)) + trace := getClientTrace("upload-and-sync", ruid, &tn) + + req = req.WithContext(httptrace.WithClientTrace(ctx, trace)) transport := http.DefaultTransport //transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + tn = time.Now() res, err := transport.RoundTrip(req) if err != nil { log.Error(err.Error(), "ruid", ruid) From 73fc234634db587c1c8302f2d8e84bc68cc91c2f Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Thu, 6 Dec 2018 17:01:39 +0100 Subject: [PATCH 37/44] add sync-delay and single configurations to swarm smoke tests --- cmd/swarm/swarm-smoke/main.go | 13 +++++++++++++ cmd/swarm/swarm-smoke/upload_and_sync.go | 24 +++++++++++++++++++++--- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index bcd9212c9b..f4ce4400a0 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -45,10 +45,12 @@ var ( appName string scheme string filesize int + syncDelay int from int to int verbosity int timeout int + single bool ) func main() { @@ -99,6 +101,12 @@ func main() { Usage: "file size for generated random file in KB", Destination: &filesize, }, + cli.IntFlag{ + Name: "sync-delay", + Value: 5, + Usage: "duration of delay in seconds to wait for content to be synced", + Destination: &syncDelay, + }, cli.IntFlag{ Name: "verbosity", Value: 1, @@ -111,6 +119,11 @@ func main() { Usage: "timeout in seconds after which kill the process", Destination: &timeout, }, + cli.BoolFlag{ + Name: "single", + Usage: "whether to fetch content from a single node or from all nodes", + Destination: &single, + }, } app.Flags = append(app.Flags, []cli.Flag{ diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index 1524b82a5c..8715a815f3 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -25,6 +25,7 @@ import ( "fmt" "io" "io/ioutil" + "math/rand" "net/http" "net/http/httptrace" "os" @@ -112,10 +113,11 @@ func uploadAndSync(c *cli.Context) error { log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash)) - time.Sleep(3 * time.Second) + time.Sleep(time.Duration(syncDelay) * time.Second) wg := sync.WaitGroup{} - for _, endpoint := range endpoints { + if single { + randIndex := 1 + rand.Intn(len(endpoints)-1) ruid := uuid.New()[:8] wg.Add(1) go func(endpoint string, ruid string) { @@ -128,7 +130,23 @@ func uploadAndSync(c *cli.Context) error { wg.Done() return } - }(endpoint, ruid) + }(endpoints[randIndex], ruid) + } else { + for _, endpoint := range endpoints { + ruid := uuid.New()[:8] + wg.Add(1) + go func(endpoint string, ruid string) { + for { + err := fetch(hash, endpoint, fhash, ruid) + if err != nil { + continue + } + + wg.Done() + return + } + }(endpoint, ruid) + } } wg.Wait() log.Info("all endpoints synced random file successfully") From 964ce76658bf4981377746203a55c959ecf57745 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Thu, 6 Dec 2018 18:11:59 +0100 Subject: [PATCH 38/44] add missing rand seed --- cmd/swarm/swarm-smoke/upload_and_sync.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index 8715a815f3..d0bc2d91d3 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -117,6 +117,7 @@ func uploadAndSync(c *cli.Context) error { wg := sync.WaitGroup{} if single { + rand.Seed(time.Now().UTC().UnixNano()) randIndex := 1 + rand.Intn(len(endpoints)-1) ruid := uuid.New()[:8] wg.Add(1) From 4dbb41b921cc1710738fad786301fdd50fcc5caf Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Fri, 7 Dec 2018 13:42:58 +0530 Subject: [PATCH 39/44] replaced upload from swarm binary by upload from swarm client --- cmd/swarm/swarm-smoke/feed_upload_and_sync.go | 11 ++-- cmd/swarm/swarm-smoke/upload_and_sync.go | 63 +++++++------------ 2 files changed, 27 insertions(+), 47 deletions(-) diff --git a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go index 2bfb3016e5..10c4be815f 100644 --- a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go @@ -21,6 +21,7 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage/feed" + "github.com/ethereum/go-ethereum/swarm/testutil" colorable "github.com/mattn/go-colorable" opentracing "github.com/opentracing/opentracing-go" "github.com/pborman/uuid" @@ -227,12 +228,12 @@ func feedUploadAndSync(c *cli.Context) error { log.Info("all endpoints synced random data successfully") // upload test file - log.Info("uploading to " + endpoints[0] + " and syncing") + seed := int(time.Now().UnixNano() / 1e6) + log.Info("feed uploading to "+endpoints[0]+" and syncing", "seed", seed) - f, cleanup := generateRandomFile(filesize * 1000) - defer cleanup() + randomBytes := testutil.RandomBytes(seed, filesize*1000) - hash, err := upload(f, endpoints[0]) + hash, err := upload(&randomBytes, endpoints[0]) if err != nil { return err } @@ -241,7 +242,7 @@ func feedUploadAndSync(c *cli.Context) error { return err } multihashHex := hexutil.Encode(hashBytes) - fileHash, err := digest(f) + fileHash, err := digest(bytes.NewReader(randomBytes)) if err != nil { return err } diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index d0bc2d91d3..7fd0645281 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -29,14 +29,15 @@ import ( "net/http" "net/http/httptrace" "os" - "os/exec" - "strings" "sync" "time" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/api/client" "github.com/ethereum/go-ethereum/swarm/spancontext" + "github.com/ethereum/go-ethereum/swarm/testutil" opentracing "github.com/opentracing/opentracing-go" "github.com/pborman/uuid" @@ -91,21 +92,20 @@ func uploadAndSync(c *cli.Context) error { }(time.Now()) generateEndpoints(scheme, cluster, appName, from, to) + seed := int(time.Now().UnixNano() / 1e6) + log.Info("uploading to "+endpoints[0]+" and syncing", "seed", seed) - log.Info("uploading to " + endpoints[0] + " and syncing") - - f, cleanup := generateRandomFile(filesize * 1000) - defer cleanup() + randomBytes := testutil.RandomBytes(seed, filesize*1000) t1 := time.Now() - hash, err := upload(f, endpoints[0]) + hash, err := upload(&randomBytes, endpoints[0]) if err != nil { log.Error(err.Error()) return err } metrics.GetOrRegisterCounter("upload-and-sync.upload-time", nil).Inc(int64(time.Since(t1))) - fhash, err := digest(f) + fhash, err := digest(bytes.NewReader(randomBytes)) if err != nil { log.Error(err.Error()) return err @@ -214,16 +214,19 @@ func fetch(hash string, endpoint string, original []byte, ruid string) error { } // upload is uploading a file `f` to `endpoint` via the `swarm up` cmd -func upload(f *os.File, endpoint string) (string, error) { - var out bytes.Buffer - cmd := exec.Command("swarm", "--bzzapi", endpoint, "up", f.Name()) - cmd.Stdout = &out - err := cmd.Run() - if err != nil { - return "", err - } - hash := strings.TrimRight(out.String(), "\r\n") - return hash, nil +func upload(dataBytes *[]byte, endpoint string) (string, error) { + swarm := client.NewClient(endpoint) + f := &client.File{ + ReadCloser: ioutil.NopCloser(bytes.NewReader(*dataBytes)), + ManifestEntry: api.ManifestEntry{ + ContentType: "text/plain", + Mode: 0660, + Size: int64(len(*dataBytes)), + }, + } + + // upload data to bzz:// and retrieve the content-addressed manifest hash, hex-encoded. + return swarm.Upload(f, "", false) } func digest(r io.Reader) ([]byte, error) { @@ -246,27 +249,3 @@ func generateRandomData(datasize int) ([]byte, error) { } return b, nil } - -// generateRandomFile is creating a temporary file with the requested byte size -func generateRandomFile(size int) (f *os.File, teardown func()) { - // create a tmp file - tmp, err := ioutil.TempFile("", "swarm-test") - if err != nil { - panic(err) - } - - // callback for tmp file cleanup - teardown = func() { - tmp.Close() - os.Remove(tmp.Name()) - } - - buf := make([]byte, size) - _, err = crand.Read(buf) - if err != nil { - panic(err) - } - ioutil.WriteFile(tmp.Name(), buf, 0755) - - return tmp, teardown -} From b38fd0c3328ef35c91913155cfa75d2ebfa2b87c Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Fri, 7 Dec 2018 15:12:58 +0530 Subject: [PATCH 40/44] moved http client tracer to api/client. added tracing on uploads --- cmd/swarm/swarm-smoke/feed_upload_and_sync.go | 3 +- cmd/swarm/swarm-smoke/main.go | 56 ------------- cmd/swarm/swarm-smoke/upload_and_sync.go | 2 +- swarm/api/client/client.go | 82 ++++++++++++++++++- 4 files changed, 83 insertions(+), 60 deletions(-) diff --git a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go index 10c4be815f..2c5e3fd235 100644 --- a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go @@ -19,6 +19,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/swarm/api/client" "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage/feed" "github.com/ethereum/go-ethereum/swarm/testutil" @@ -325,7 +326,7 @@ func fetchFeed(topic string, user string, endpoint string, original []byte, ruid opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(req.Header)) - trace := getClientTrace("feed-and-sync", ruid, &tn) + trace := client.GetClientTrace("feed-and-sync - http get", "feed-and-sync", ruid, &tn) req = req.WithContext(httptrace.WithClientTrace(ctx, trace)) transport := http.DefaultTransport diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index f4ce4400a0..66cecdc5c7 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -18,10 +18,8 @@ package main import ( "fmt" - "net/http/httptrace" "os" "sort" - "time" "github.com/ethereum/go-ethereum/cmd/utils" gethmetrics "github.com/ethereum/go-ethereum/metrics" @@ -190,57 +188,3 @@ func emitMetrics(ctx *cli.Context) error { return nil } - -func getClientTrace(testName, ruid string, tn *time.Time) *httptrace.ClientTrace { - trace := &httptrace.ClientTrace{ - GetConn: func(_ string) { - log.Trace(testName+" - http get", "event", "GetConn", "ruid", ruid) - gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.getconn", nil).Update(time.Since(*tn)) - }, - GotConn: func(_ httptrace.GotConnInfo) { - log.Trace(testName+" - http get", "event", "GotConn", "ruid", ruid) - gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.gotconn", nil).Update(time.Since(*tn)) - }, - PutIdleConn: func(err error) { - log.Trace(testName+" - http get", "event", "PutIdleConn", "ruid", ruid, "err", err) - gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.putidle", nil).Update(time.Since(*tn)) - }, - GotFirstResponseByte: func() { - log.Trace(testName+" - http get", "event", "GotFirstResponseByte", "ruid", ruid) - gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.firstbyte", nil).Update(time.Since(*tn)) - }, - Got100Continue: func() { - log.Trace(testName+" - http get", "event", "Got100Continue", "ruid", ruid) - gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.got100continue", nil).Update(time.Since(*tn)) - }, - DNSStart: func(_ httptrace.DNSStartInfo) { - log.Trace(testName+" - http get", "event", "DNSStart", "ruid", ruid) - gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.dnsstart", nil).Update(time.Since(*tn)) - }, - DNSDone: func(_ httptrace.DNSDoneInfo) { - log.Trace(testName+" - http get", "event", "DNSDone", "ruid", ruid) - gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.dnsdone", nil).Update(time.Since(*tn)) - }, - ConnectStart: func(network, addr string) { - log.Trace(testName+" - http get", "event", "ConnectStart", "ruid", ruid, "network", network, "addr", addr) - gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.connectstart", nil).Update(time.Since(*tn)) - }, - ConnectDone: func(network, addr string, err error) { - log.Trace(testName+" - http get", "event", "ConnectDone", "ruid", ruid, "network", network, "addr", addr, "err", err) - gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.connectdone", nil).Update(time.Since(*tn)) - }, - WroteHeaders: func() { - log.Trace(testName+" - http get", "event", "WroteHeaders(request)", "ruid", ruid) - gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.wroteheaders", nil).Update(time.Since(*tn)) - }, - Wait100Continue: func() { - log.Trace(testName+" - http get", "event", "Wait100Continue", "ruid", ruid) - gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.wait100continue", nil).Update(time.Since(*tn)) - }, - WroteRequest: func(_ httptrace.WroteRequestInfo) { - log.Trace(testName+" - http get", "event", "WroteRequest", "ruid", ruid) - gethmetrics.GetOrRegisterResettingTimer(testName+".fetch.clienttrace.wroterequest", nil).Update(time.Since(*tn)) - }, - } - return trace -} diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index 7fd0645281..db74f569aa 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -173,7 +173,7 @@ func fetch(hash string, endpoint string, original []byte, ruid string) error { opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(req.Header)) - trace := getClientTrace("upload-and-sync", ruid, &tn) + trace := client.GetClientTrace("upload-and-sync - http get", "upload-and-sync", ruid, &tn) req = req.WithContext(httptrace.WithClientTrace(ctx, trace)) transport := http.DefaultTransport diff --git a/swarm/api/client/client.go b/swarm/api/client/client.go index d9837ca732..570c580690 100644 --- a/swarm/api/client/client.go +++ b/swarm/api/client/client.go @@ -19,6 +19,7 @@ package client import ( "archive/tar" "bytes" + "context" "encoding/json" "errors" "fmt" @@ -26,6 +27,7 @@ import ( "io/ioutil" "mime/multipart" "net/http" + "net/http/httptrace" "net/textproto" "net/url" "os" @@ -33,9 +35,15 @@ import ( "regexp" "strconv" "strings" + "time" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage/feed" + opentracing "github.com/opentracing/opentracing-go" + "github.com/pborman/uuid" ) var ( @@ -474,6 +482,11 @@ type UploadFn func(file *File) error // TarUpload uses the given Uploader to upload files to swarm as a tar stream, // returning the resulting manifest hash func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, toEncrypt bool) (string, error) { + ctx, sp := spancontext.StartSpan(context.Background(), "swarm.api.client.tarupload") + defer sp.Finish() + + var tn time.Time + reqR, reqW := io.Pipe() defer reqR.Close() addr := hash @@ -489,6 +502,17 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t if err != nil { return "", err } + + opentracing.GlobalTracer().Inject( + sp.Context(), + opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(req.Header)) + + trace := GetClientTrace("swarm api client - upload tar", "swarm.api.client.uploadtar", uuid.New()[:8], &tn) + + req = req.WithContext(httptrace.WithClientTrace(ctx, trace)) + transport := http.DefaultTransport + req.Header.Set("Content-Type", "application/x-tar") if defaultPath != "" { q := req.URL.Query() @@ -529,8 +553,8 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t } reqW.CloseWithError(err) }() - - res, err := http.DefaultClient.Do(req) + tn = time.Now() + res, err := transport.RoundTrip(req) if err != nil { return "", err } @@ -728,3 +752,57 @@ func (c *Client) GetFeedRequest(query *feed.Query, manifestAddressOrDomain strin } return &metadata, nil } + +func GetClientTrace(traceMsg, metricPrefix, ruid string, tn *time.Time) *httptrace.ClientTrace { + trace := &httptrace.ClientTrace{ + GetConn: func(_ string) { + log.Trace(traceMsg+" - http get", "event", "GetConn", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.getconn", nil).Update(time.Since(*tn)) + }, + GotConn: func(_ httptrace.GotConnInfo) { + log.Trace(traceMsg+" - http get", "event", "GotConn", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.gotconn", nil).Update(time.Since(*tn)) + }, + PutIdleConn: func(err error) { + log.Trace(traceMsg+" - http get", "event", "PutIdleConn", "ruid", ruid, "err", err) + metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.putidle", nil).Update(time.Since(*tn)) + }, + GotFirstResponseByte: func() { + log.Trace(traceMsg+" - http get", "event", "GotFirstResponseByte", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.firstbyte", nil).Update(time.Since(*tn)) + }, + Got100Continue: func() { + log.Trace(traceMsg, "event", "Got100Continue", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.got100continue", nil).Update(time.Since(*tn)) + }, + DNSStart: func(_ httptrace.DNSStartInfo) { + log.Trace(traceMsg, "event", "DNSStart", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.dnsstart", nil).Update(time.Since(*tn)) + }, + DNSDone: func(_ httptrace.DNSDoneInfo) { + log.Trace(traceMsg, "event", "DNSDone", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.dnsdone", nil).Update(time.Since(*tn)) + }, + ConnectStart: func(network, addr string) { + log.Trace(traceMsg, "event", "ConnectStart", "ruid", ruid, "network", network, "addr", addr) + metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.connectstart", nil).Update(time.Since(*tn)) + }, + ConnectDone: func(network, addr string, err error) { + log.Trace(traceMsg, "event", "ConnectDone", "ruid", ruid, "network", network, "addr", addr, "err", err) + metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.connectdone", nil).Update(time.Since(*tn)) + }, + WroteHeaders: func() { + log.Trace(traceMsg, "event", "WroteHeaders(request)", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.wroteheaders", nil).Update(time.Since(*tn)) + }, + Wait100Continue: func() { + log.Trace(traceMsg, "event", "Wait100Continue", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.wait100continue", nil).Update(time.Since(*tn)) + }, + WroteRequest: func(_ httptrace.WroteRequestInfo) { + log.Trace(traceMsg, "event", "WroteRequest", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.wroterequest", nil).Update(time.Since(*tn)) + }, + } + return trace +} From a43801c63640f982a90e9f0cde8d57ae86edc565 Mon Sep 17 00:00:00 2001 From: holisticode Date: Fri, 7 Dec 2018 08:03:06 -0500 Subject: [PATCH 41/44] cmd/swarm/swarm-smoke: metrics for each run (#1049) --- cmd/swarm/swarm-smoke/upload_and_sync.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index db74f569aa..d605f79a3a 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -123,11 +123,14 @@ func uploadAndSync(c *cli.Context) error { wg.Add(1) go func(endpoint string, ruid string) { for { + start := time.Now() err := fetch(hash, endpoint, fhash, ruid) + fetchTime := time.Since(start) if err != nil { continue } + metrics.GetOrRegisterMeter("upload-and-sync.single.fetch-time", nil).Mark(int64(fetchTime)) wg.Done() return } @@ -138,11 +141,14 @@ func uploadAndSync(c *cli.Context) error { wg.Add(1) go func(endpoint string, ruid string) { for { + start := time.Now() err := fetch(hash, endpoint, fhash, ruid) + fetchTime := time.Since(start) if err != nil { continue } + metrics.GetOrRegisterMeter("upload-and-sync.each.fetch-time", nil).Mark(int64(fetchTime)) wg.Done() return } From aef8672b6326c02db40498eef63647f408c5f618 Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Mon, 10 Dec 2018 23:14:13 +0530 Subject: [PATCH 42/44] swarm/api/client: address pr comments --- swarm/api/client/client.go | 32 +++++++++++++------------------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/swarm/api/client/client.go b/swarm/api/client/client.go index 570c580690..fad5b652c6 100644 --- a/swarm/api/client/client.go +++ b/swarm/api/client/client.go @@ -42,7 +42,6 @@ import ( "github.com/ethereum/go-ethereum/swarm/api" "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage/feed" - opentracing "github.com/opentracing/opentracing-go" "github.com/pborman/uuid" ) @@ -503,12 +502,7 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t return "", err } - opentracing.GlobalTracer().Inject( - sp.Context(), - opentracing.HTTPHeaders, - opentracing.HTTPHeadersCarrier(req.Header)) - - trace := GetClientTrace("swarm api client - upload tar", "swarm.api.client.uploadtar", uuid.New()[:8], &tn) + trace := GetClientTrace("swarm api client - upload tar", "api.client.uploadtar", uuid.New()[:8], &tn) req = req.WithContext(httptrace.WithClientTrace(ctx, trace)) transport := http.DefaultTransport @@ -757,51 +751,51 @@ func GetClientTrace(traceMsg, metricPrefix, ruid string, tn *time.Time) *httptra trace := &httptrace.ClientTrace{ GetConn: func(_ string) { log.Trace(traceMsg+" - http get", "event", "GetConn", "ruid", ruid) - metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.getconn", nil).Update(time.Since(*tn)) + metrics.GetOrRegisterResettingTimer(metricPrefix+".getconn", nil).Update(time.Since(*tn)) }, GotConn: func(_ httptrace.GotConnInfo) { log.Trace(traceMsg+" - http get", "event", "GotConn", "ruid", ruid) - metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.gotconn", nil).Update(time.Since(*tn)) + metrics.GetOrRegisterResettingTimer(metricPrefix+".gotconn", nil).Update(time.Since(*tn)) }, PutIdleConn: func(err error) { log.Trace(traceMsg+" - http get", "event", "PutIdleConn", "ruid", ruid, "err", err) - metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.putidle", nil).Update(time.Since(*tn)) + metrics.GetOrRegisterResettingTimer(metricPrefix+".putidle", nil).Update(time.Since(*tn)) }, GotFirstResponseByte: func() { log.Trace(traceMsg+" - http get", "event", "GotFirstResponseByte", "ruid", ruid) - metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.firstbyte", nil).Update(time.Since(*tn)) + metrics.GetOrRegisterResettingTimer(metricPrefix+".firstbyte", nil).Update(time.Since(*tn)) }, Got100Continue: func() { log.Trace(traceMsg, "event", "Got100Continue", "ruid", ruid) - metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.got100continue", nil).Update(time.Since(*tn)) + metrics.GetOrRegisterResettingTimer(metricPrefix+".got100continue", nil).Update(time.Since(*tn)) }, DNSStart: func(_ httptrace.DNSStartInfo) { log.Trace(traceMsg, "event", "DNSStart", "ruid", ruid) - metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.dnsstart", nil).Update(time.Since(*tn)) + metrics.GetOrRegisterResettingTimer(metricPrefix+".dnsstart", nil).Update(time.Since(*tn)) }, DNSDone: func(_ httptrace.DNSDoneInfo) { log.Trace(traceMsg, "event", "DNSDone", "ruid", ruid) - metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.dnsdone", nil).Update(time.Since(*tn)) + metrics.GetOrRegisterResettingTimer(metricPrefix+".dnsdone", nil).Update(time.Since(*tn)) }, ConnectStart: func(network, addr string) { log.Trace(traceMsg, "event", "ConnectStart", "ruid", ruid, "network", network, "addr", addr) - metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.connectstart", nil).Update(time.Since(*tn)) + metrics.GetOrRegisterResettingTimer(metricPrefix+".connectstart", nil).Update(time.Since(*tn)) }, ConnectDone: func(network, addr string, err error) { log.Trace(traceMsg, "event", "ConnectDone", "ruid", ruid, "network", network, "addr", addr, "err", err) - metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.connectdone", nil).Update(time.Since(*tn)) + metrics.GetOrRegisterResettingTimer(metricPrefix+".connectdone", nil).Update(time.Since(*tn)) }, WroteHeaders: func() { log.Trace(traceMsg, "event", "WroteHeaders(request)", "ruid", ruid) - metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.wroteheaders", nil).Update(time.Since(*tn)) + metrics.GetOrRegisterResettingTimer(metricPrefix+".wroteheaders", nil).Update(time.Since(*tn)) }, Wait100Continue: func() { log.Trace(traceMsg, "event", "Wait100Continue", "ruid", ruid) - metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.wait100continue", nil).Update(time.Since(*tn)) + metrics.GetOrRegisterResettingTimer(metricPrefix+".wait100continue", nil).Update(time.Since(*tn)) }, WroteRequest: func(_ httptrace.WroteRequestInfo) { log.Trace(traceMsg, "event", "WroteRequest", "ruid", ruid) - metrics.GetOrRegisterResettingTimer(metricPrefix+".fetch.clienttrace.wroterequest", nil).Update(time.Since(*tn)) + metrics.GetOrRegisterResettingTimer(metricPrefix+".wroterequest", nil).Update(time.Since(*tn)) }, } return trace From 07b10926f5a183030cbb57443b5fc47af1b53b61 Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Mon, 10 Dec 2018 23:52:44 +0530 Subject: [PATCH 43/44] swarm/api/client: address pr comments --- swarm/api/client/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/api/client/client.go b/swarm/api/client/client.go index fad5b652c6..f793ca8b89 100644 --- a/swarm/api/client/client.go +++ b/swarm/api/client/client.go @@ -481,7 +481,7 @@ type UploadFn func(file *File) error // TarUpload uses the given Uploader to upload files to swarm as a tar stream, // returning the resulting manifest hash func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, toEncrypt bool) (string, error) { - ctx, sp := spancontext.StartSpan(context.Background(), "swarm.api.client.tarupload") + ctx, sp := spancontext.StartSpan(context.Background(), "api.client.tarupload") defer sp.Finish() var tn time.Time From 73364460cd01a6f9b3123f6dcb501f30076c1d57 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Mon, 10 Dec 2018 19:46:20 +0100 Subject: [PATCH 44/44] swarm/api/http: remove opentracing --- swarm/api/http/middleware.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/swarm/api/http/middleware.go b/swarm/api/http/middleware.go index 42dd34cf5b..f7f819eab5 100644 --- a/swarm/api/http/middleware.go +++ b/swarm/api/http/middleware.go @@ -11,8 +11,7 @@ import ( "github.com/ethereum/go-ethereum/swarm/api" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/sctx" - opentracing "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/ext" + "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/pborman/uuid" ) @@ -95,10 +94,7 @@ func InstrumentOpenTracing(h http.Handler) http.Handler { return } spanName := fmt.Sprintf("http.%s.%s", r.Method, uri.Scheme) - wireContext, _ := opentracing.GlobalTracer().Extract( - opentracing.HTTPHeaders, - opentracing.HTTPHeadersCarrier(r.Header)) - sp, ctx := opentracing.StartSpanFromContext(r.Context(), spanName, ext.RPCServerOption(wireContext)) + ctx, sp := spancontext.StartSpan(r.Context(), spanName) defer sp.Finish() h.ServeHTTP(w, r.WithContext(ctx))