Skip to content

Commit

Permalink
updated search command
Browse files Browse the repository at this point in the history
  • Loading branch information
danielamkaer committed Mar 18, 2020
1 parent cbffd34 commit 7150a10
Show file tree
Hide file tree
Showing 8 changed files with 259 additions and 44 deletions.
14 changes: 7 additions & 7 deletions api/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ type QueryJobs struct {
func (c *Client) QueryJobs() *QueryJobs { return &QueryJobs{client: c} }

type Query struct {
QueryString string `json:"queryString"`
Start string `json:"start,omitempty"`
End string `json:"end,omitempty"`
Live bool `json:"isLive,omitempty"`
TimezoneOffset *int `json:"timeZoneOffsetMinutes,omitempty"`
Arguments map[string]string `json:"arguments,omitempty"`
QueryString string `json:"queryString"`
Start string `json:"start,omitempty"`
End string `json:"end,omitempty"`
Live bool `json:"isLive,omitempty"`
TimezoneOffset *int `json:"timeZoneOffsetMinutes,omitempty"`
Arguments map[string]string `json:"arguments,omitempty"`
ShowQueryEventDistribution bool `json:"showQueryEventDistribution,omitempty"`
}

type QueryResultMetadata struct {
Expand Down Expand Up @@ -96,7 +97,6 @@ func (q *QueryJobs) PollContext(ctx context.Context, repository string, id strin
var result QueryResult

err = json.NewDecoder(resp.Body).Decode(&result)
//err = json.NewDecoder(io.TeeReader(resp.Body, os.Stderr)).Decode(&result)

return result, err
}
Expand Down
4 changes: 4 additions & 0 deletions api/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ type StatusResponse struct {
Version string
}

func (s StatusResponse) IsDown() bool {
return s.Status != "OK" && s.Status != "WARN"
}

func (c *Client) Status() (*StatusResponse, error) {
resp, err := c.HTTPRequest(http.MethodGet, "api/v1/status", nil)

Expand Down
2 changes: 1 addition & 1 deletion cmd/profiles_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func collectProfileInfo(cmd *cobra.Command) (*login, error) {
continue
}

if status.Status != "OK" {
if status.IsDown() {
cmd.Println(prompt.Colorize("[[red]Failed[reset]]"))
cmd.Println(fmt.Errorf("The server reported that is is malfunctioning, status: %s", status.Status))
os.Exit(1)
Expand Down
100 changes: 66 additions & 34 deletions cmd/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"context"
"fmt"
"github.com/humio/cli/api"
"github.com/humio/cli/prompt"
"github.com/olekukonko/tablewriter"
"github.com/schollz/progressbar/v2"
"github.com/spf13/cobra"
"io"
"math"
"os"
"os/signal"
"regexp"
Expand Down Expand Up @@ -39,7 +40,7 @@ func newSearchCmd() *cobra.Command {

// run in lambda func to be able to defer and delete the query job
err := func() error {
var progress queryResultProgressBar
var progress *queryResultProgressBar
if !noProgress {
progress = newQueryResultProgressBar()
}
Expand All @@ -49,6 +50,7 @@ func newSearchCmd() *cobra.Command {
Start: start,
End: end,
Live: live,
ShowQueryEventDistribution: true,
})

if err != nil {
Expand Down Expand Up @@ -82,16 +84,20 @@ func newSearchCmd() *cobra.Command {
printer = newEventListPrinter(cmd.OutOrStdout(), fmtStr)
}


for !result.Done {
progress.Update(result)
if progress != nil {
progress.Update(result)
}
result, err = poller.WaitAndPollContext(ctx)
if err != nil {
return err
}
}

progress.Finish()
if progress != nil {
progress.Update(result)
progress.Finish()
}

printer.print(result)

Expand All @@ -117,11 +123,11 @@ func newSearchCmd() *cobra.Command {
},
}

cmd.Flags().StringVarP(&start, "start", "s", "10m", "Query start time [default 10m]")
cmd.Flags().StringVarP(&start, "start", "s", "10m", "Query start time")
cmd.Flags().StringVarP(&end, "end", "e", "", "Query end time")
cmd.Flags().BoolVarP(&live, "live", "l", false, "Run a live search and keep outputting until interrupted.")
cmd.Flags().StringVarP(&fmtStr, "fmt", "f", "{@timestamp} {@rawstring}", "Format string if the result is an event list\n"+
"Insert fields by wrapping field names in brackets, e.g. {@timestamp} [default: '{@timestamp} {@rawstring}']\n"+
"Insert fields by wrapping field names in brackets, e.g. {@timestamp}\n"+
"Limited format modifiers are supported such as {@timestamp:40} which will right align and left pad @timestamp to 40 characters.\n"+
"{@timestamp:-40} left aligns and right pads to 40 characters.")
cmd.Flags().BoolVar(&noProgress, "no-progress", false, "Do not should progress information.")
Expand All @@ -144,37 +150,59 @@ func contextCancelledOnInterrupt(ctx context.Context) context.Context {
}

type queryResultProgressBar struct {
bar *progressbar.ProgressBar
bar *prompt.ProgressBar
epsValue float64
bpsValue float64
hits uint64
}

func newQueryResultProgressBar() queryResultProgressBar {
b := queryResultProgressBar{
bar: progressbar.NewOptions(
0,
progressbar.OptionSetPredictTime(false),
progressbar.OptionSetDescription("Searching..."),
progressbar.OptionClearOnFinish(),
),
}
func newQueryResultProgressBar() *queryResultProgressBar {
b := &queryResultProgressBar{}
b.bar = prompt.NewProgressBar(
prompt.ProgressOptionDescription("Searching..."),
prompt.ProgressOptionAppendAdditionalInfo(b.additionalInfoBps),
prompt.ProgressOptionAppendAdditionalInfo(b.additionalInfoEps),
prompt.ProgressOptionAppendAdditionalInfo(b.additionalInfoHits),
)
b.epsValue = math.NaN()
b.bpsValue = math.NaN()
b.bar.Start()
return b
}

func (b queryResultProgressBar) Update(result api.QueryResult) {
if b.bar == nil {
return
func (b *queryResultProgressBar) Update(result api.QueryResult) {
if result.Metadata.TimeMillis > 0 {
b.epsValue = float64(result.Metadata.ProcessedEvents) / float64(result.Metadata.TimeMillis) * 1000
b.bpsValue = float64(result.Metadata.ProcessedBytes) / float64(result.Metadata.TimeMillis) * 1000
}

if result.Metadata.TotalWork > 0 {
b.bar.ChangeMax64(int64(result.Metadata.TotalWork))
b.bar.Set64(int64(result.Metadata.WorkDone))
b.hits = result.Metadata.EventCount

b.bar.Set(result.Metadata.WorkDone, result.Metadata.TotalWork)
}

func (b *queryResultProgressBar) additionalInfoEps() string {
if !math.IsNaN(b.epsValue) {
v, suffix := prompt.AddSISuffix(b.epsValue, false)
return fmt.Sprintf("%.1f %sEPS", v, suffix)
}
return ""
}

func (b queryResultProgressBar) Finish() {
if b.bar == nil {
return
func (b *queryResultProgressBar) additionalInfoBps() string {
if !math.IsNaN(b.bpsValue) {
v, suffix := prompt.AddSISuffix(b.bpsValue, true)
return fmt.Sprintf("%.1f %sB/s", v, suffix)
}
return ""
}

func (b *queryResultProgressBar) additionalInfoHits() string {
v, suffix := prompt.AddSISuffix(float64(b.hits), false)
return fmt.Sprintf("%.1f %s events", v, suffix)
}

func (b *queryResultProgressBar) Finish() {
b.bar.Finish()
}

Expand Down Expand Up @@ -319,17 +347,21 @@ func newAggregatePrinter(w io.Writer) *aggregatePrinter {
}

func (p *aggregatePrinter) print(result api.QueryResult) {
f := p.columns
m := map[string]bool{}
for _, e := range result.Events {
for k := range e {
if !m[k] {
f = append(f, k)
m[k] = true
if len(result.Metadata.FieldOrder) > 0 {
p.columns = result.Metadata.FieldOrder
} else {
f := p.columns
m := map[string]bool{}
for _, e := range result.Events {
for k := range e {
if !m[k] {
f = append(f, k)
m[k] = true
}
}
}
p.columns = f
}
p.columns = f

if len(p.columns) == 0 {
return
Expand Down
8 changes: 6 additions & 2 deletions cmd/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,12 @@ func newStatusCmd() *cobra.Command {
}

func formatStatusText(statusText string) string {
if statusText == "OK" {
switch statusText {
case "OK":
return prompt.Colorize("[green]OK[reset]")
case "WARN":
return prompt.Colorize("[yellow]WARN[reset]")
default:
return prompt.Colorize(fmt.Sprintf("[red]%s[reset]",statusText))
}
return prompt.Colorize(fmt.Sprintf("[red]%s[reset]", statusText))
}
156 changes: 156 additions & 0 deletions prompt/progress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package prompt

import (
"fmt"
"io"
"os"
"time"
)

type ProgressBar struct {
w io.Writer
description string
cur uint64
max uint64
barSegments int
tickInterval time.Duration
close chan struct{}
update chan struct{}
running chan struct{}
additionalInfo []func() string
}

type ProgressOption func(*ProgressBar)

func ProgressOptionDescription(description string) ProgressOption {
return func(bar *ProgressBar) {
bar.description = description
}
}

func ProgressOptionBarSegments(segments int) ProgressOption {
return func(bar *ProgressBar) {
bar.barSegments = segments
}
}

func ProgressOptionTickInterval(interval time.Duration) ProgressOption {
return func(bar *ProgressBar) {
bar.tickInterval = interval
}
}

func ProgressOptionAppendAdditionalInfo(f func() string) ProgressOption {
return func(bar *ProgressBar) {
bar.additionalInfo = append(bar.additionalInfo, f)
}
}

func NewProgressBar(opts ...ProgressOption) *ProgressBar {
bar := &ProgressBar{
max: 100,
barSegments: 30,
close: make(chan struct{}),
update: make(chan struct{}),
running: make(chan struct{}),
}

for _, o := range opts {
o(bar)
}

return bar
}

func (p *ProgressBar) percentage() float64 {
if p.max == 0 {
return 0
}

return float64(p.cur) / float64(p.max)
}

func (p *ProgressBar) bar() string {
segments := int(float64(p.barSegments) * p.percentage())

if p.percentage() > 0 && segments == 0 {
segments = 1
}

bar := make([]byte, p.barSegments+2)
bar[0] = '['
bar[p.barSegments+1] = ']'
b := bar[1 : len(bar)-1]
for i := range b {
switch {
case i == segments-1:
b[i] = '>'
case i < segments:
b[i] = '='
default:
b[i] = ' '
}
}

return string(bar)
}

func (ProgressBar) clearLine() {
fmt.Fprint(os.Stderr, "\r")
}

func (p *ProgressBar) print() {
d := p.description
if len(d) > 0 {
d = " " + d
}
fmt.Fprintf(os.Stderr, "%s %.1f %% %s", d, p.percentage()*100, p.bar())
for _, f := range p.additionalInfo {
fmt.Fprintf(os.Stderr, " %s", f())
}
}

func (p *ProgressBar) run() {
defer close(p.running)
for {
p.clearLine()
p.print()
var tick <-chan time.Time
if p.tickInterval > 0 {
tick = time.After(p.tickInterval)
}
select {
case <-p.close:
return
case <-tick:
case <-p.update:
}
}
}

func (p *ProgressBar) Update(cur uint64) {
p.Set(cur, p.max)
}

func (p *ProgressBar) Set(cur, max uint64) {
p.cur, p.max = cur, max
select {
case p.update <- struct{}{}:
default:
}
}

func (p *ProgressBar) Finish() {
p.Set(p.max, p.max)
p.Stop()
<-p.running
fmt.Fprintln(os.Stderr)
}

func (p *ProgressBar) Start() {
go p.run()
}

func (p *ProgressBar) Stop() {
close(p.close)
}
Loading

0 comments on commit 7150a10

Please sign in to comment.