Skip to content

Commit

Permalink
Add start-signal flag
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewkroh committed Jan 20, 2021
1 parent b87d737 commit 81bb706
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 35 deletions.
30 changes: 22 additions & 8 deletions command/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/spf13/cobra"
"go.uber.org/zap"

"github.com/andrewkroh/stream/pkg/cmdutil"
"github.com/andrewkroh/stream/pkg/output"
)

Expand All @@ -23,7 +24,7 @@ func newLogRunner(options *output.Options, logger *zap.Logger) *cobra.Command {
cmd: &cobra.Command{
Use: "log [log file to stream]",
Short: "Stream log file lines",
Args: cobra.ExactArgs(1),
Args: cmdutil.ValidateArgs(cobra.MinimumNArgs(1), cmdutil.RegularFiles),
},
}

Expand All @@ -36,17 +37,29 @@ func newLogRunner(options *output.Options, logger *zap.Logger) *cobra.Command {
}

func (r *logRunner) Run(files []string) error {
f, err := os.Open(files[0])
out, err := output.Initialize(r.out, r.logger, r.cmd.Context())
if err != nil {
return err
}
defer f.Close()
defer out.Close()

for _, f := range files {
if err := r.sendLog(f, out); err != nil {
return err
}
}

o, err := output.Initialize(r.out, r.logger, r.cmd.Context())
return nil
}

func (r *logRunner) sendLog(path string, out output.Output) error {
logger := r.logger.With("log", path)

f, err := os.Open(path)
if err != nil {
return err
}
defer o.Close()
defer f.Close()

var totalBytes, totalLines int
s := bufio.NewScanner(bufio.NewReader(f))
Expand All @@ -55,18 +68,19 @@ func (r *logRunner) Run(files []string) error {
break
}

r.logger.Debug("Writing packet")
n, err := o.Write(s.Bytes())
logger.Debugw("Sending log line.", "line_number", totalLines+1)
n, err := out.Write(append(s.Bytes(), '\n'))
if err != nil {
return err
}

totalBytes += n
totalLines++
}
if s.Err() != nil {
return s.Err()
}

r.logger.Infow("Log data sent", "sent_bytes", totalBytes, "sent_lines", totalLines)
logger.Infow("Log data sent.", "total_bytes", totalBytes, "total_lines", totalLines)
return nil
}
30 changes: 21 additions & 9 deletions command/pcap.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/spf13/cobra"
"go.uber.org/zap"

"github.com/andrewkroh/stream/pkg/cmdutil"
"github.com/andrewkroh/stream/pkg/output"
)

Expand All @@ -22,7 +23,7 @@ func newPCAPRunner(options *output.Options, logger *zap.Logger) *cobra.Command {
cmd: &cobra.Command{
Use: "pcap [pcap data to stream]",
Short: "Stream PCAP payload data",
Args: cobra.ExactArgs(1),
Args: cmdutil.ValidateArgs(cobra.MinimumNArgs(1), cmdutil.RegularFiles),
},
}

Expand All @@ -34,18 +35,30 @@ func newPCAPRunner(options *output.Options, logger *zap.Logger) *cobra.Command {
return r.cmd
}

func (r *pcapRunner) Run(pcapFiles []string) error {
f, err := pcap.OpenOffline(pcapFiles[0])
func (r *pcapRunner) Run(files []string) error {
out, err := output.Initialize(r.out, r.logger, r.cmd.Context())
if err != nil {
return err
}
defer f.Close()
defer out.Close()

for _, f := range files {
if err := r.sendPCAP(f, out); err != nil {
return err
}
}

o, err := output.Initialize(r.out, r.logger, r.cmd.Context())
return nil
}

func (r *pcapRunner) sendPCAP(path string, out output.Output) error {
logger := r.logger.With("pcap", path)

f, err := pcap.OpenOffline(path)
if err != nil {
return err
}
defer o.Close()
defer f.Close()

// Process packets in PCAP and get flow records.
var totalBytes, totalPackets int
Expand All @@ -58,15 +71,14 @@ func (r *pcapRunner) Run(pcapFiles []string) error {
payloadData := packet.TransportLayer().LayerPayload()

// TODO: Rate-limit for UDP.
r.logger.Debug("Writing packet")
n, err := o.Write(payloadData)
n, err := out.Write(payloadData)
if err != nil {
return err
}
totalBytes += n
totalPackets++
}

r.logger.Infow("Sent data", "sent_bytes", totalBytes, "sent_packets", totalPackets)
logger.Infow("Sent PCAP payload data", "total_bytes", totalBytes, "total_packets", totalPackets)
return nil
}
58 changes: 51 additions & 7 deletions command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ package command

import (
"context"
"fmt"
"os"
"os/signal"

"github.com/elastic/go-concert/ctxtool/osctx"
"github.com/elastic/go-concert/timed"
"github.com/spf13/cobra"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/sys/unix"

"github.com/andrewkroh/stream/pkg/output"

Expand Down Expand Up @@ -38,16 +43,26 @@ func ExecuteContext(ctx context.Context) error {
rootCmd := &cobra.Command{Use: "stream", SilenceUsage: true}

// Global flags.
var outOpts output.Options
rootCmd.PersistentFlags().StringVar(&outOpts.Addr, "addr", "", "destination address")
rootCmd.PersistentFlags().DurationVar(&outOpts.Delay, "delay", 0, "delay streaming")
rootCmd.PersistentFlags().StringVarP(&outOpts.Protocol, "protocol", "p", "tcp", "protocol (tcp/udp/tls)")
rootCmd.PersistentFlags().IntVar(&outOpts.Retries, "retry", 10, "connection retry attempts")
var opts output.Options
rootCmd.PersistentFlags().StringVar(&opts.Addr, "addr", "", "destination address")
rootCmd.PersistentFlags().DurationVar(&opts.Delay, "delay", 0, "delay start after start-signal")
rootCmd.PersistentFlags().StringVarP(&opts.Protocol, "protocol", "p", "tcp", "protocol (tcp/udp/tls)")
rootCmd.PersistentFlags().IntVar(&opts.Retries, "retry", 10, "connection retry attempts for tcp based protocols")
rootCmd.PersistentFlags().StringVarP(&opts.StartSignal, "start-signal", "s", "", "wait for start signal")

// Sub-commands.
rootCmd.AddCommand(newLogRunner(&outOpts, logger))
rootCmd.AddCommand(newPCAPRunner(&outOpts, logger))
rootCmd.AddCommand(newLogRunner(&opts, logger))
rootCmd.AddCommand(newPCAPRunner(&opts, logger))
rootCmd.AddCommand(versionCmd)

// Add common start-up delay logic.
rootCmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) error {
return multierr.Combine(
waitForStartSignal(&opts, cmd.Context(), logger),
waitForDelay(&opts, cmd.Context(), logger),
)
}

return rootCmd.ExecuteContext(ctx)
}

Expand All @@ -61,3 +76,32 @@ func logger() (*zap.Logger, error) {
}
return log, nil
}

func waitForStartSignal(opts *output.Options, parent context.Context, logger *zap.Logger) error {
if opts.StartSignal == "" {
return nil
}

num := unix.SignalNum(opts.StartSignal)
if num == 0 {
return fmt.Errorf("unknown signal %v", opts.StartSignal)
}

// Wait for the signal or the command context to be done.
logger.Sugar().Infow("Waiting for signal.", "start-signal", opts.StartSignal)
startCtx, _ := osctx.WithSignal(parent, os.Signal(num))
<-startCtx.Done()
return nil
}

func waitForDelay(opts *output.Options, parent context.Context, logger *zap.Logger) error {
if opts.Delay <= 0 {
return nil
}

logger.Sugar().Info("Delaying connection.", "delay", opts.Delay)
if err := timed.Wait(parent, opts.Delay); err != nil {
return fmt.Errorf("delay waiting period was interrupted: %w", err)
}
return nil
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@ require (
github.com/elastic/go-concert v0.0.4
github.com/google/gopacket v1.1.19
github.com/spf13/cobra v1.1.1
go.uber.org/multierr v1.1.0
go.uber.org/zap v1.10.0
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0
)
34 changes: 34 additions & 0 deletions pkg/cmdutil/validate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package cmdutil

import (
"fmt"
"os"

"github.com/spf13/cobra"
)

// ValidateArgs combines PositionalArgs validators and runs them serially.
func ValidateArgs(validators ...cobra.PositionalArgs) cobra.PositionalArgs {
return func(cmd *cobra.Command, args []string) error {
for _, f := range validators {
if err := f(cmd, args); err != nil {
return err
}
}
return nil
}
}

// RegularFiles validates that each arg is a regular file.
func RegularFiles(_ *cobra.Command, args []string) error {
for _, f := range args {
info, err := os.Stat(f)
if err != nil {
return fmt.Errorf("arg %q is not a valid file: %w", f, err)
}
if !info.Mode().IsRegular() {
return fmt.Errorf("arg %q is not a regular file", f)
}
}
return nil
}
9 changes: 5 additions & 4 deletions pkg/output/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package output
import "time"

type Options struct {
Addr string
Protocol string
Delay time.Duration
Retries int
Addr string // Destination address (host:port).
Delay time.Duration // Delay start after start signal.
Protocol string // Protocol (udp/tcp/tls).
Retries int // Number of connection retries for tcp based protocols.
StartSignal string // OS signal to wait on before starting.
}
7 changes: 0 additions & 7 deletions pkg/output/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,6 @@ func Initialize(opts *Options, logger *zap.SugaredLogger, ctx context.Context) (
return nil, err
}

if opts.Delay > 0 {
logger.Debugw("Delaying connection.", "delay", opts.Delay)
if err = timed.Wait(ctx, opts.Delay); err != nil {
return nil, err
}
}

var dialErr error
for i := 0; i < opts.Retries; i++ {
if ctx.Err() != nil {
Expand Down

0 comments on commit 81bb706

Please sign in to comment.