From a105c5ed3e49c5ee75d61f8e981902d505c83c4b Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Fri, 21 Aug 2020 14:00:39 +0200 Subject: [PATCH 01/13] Fix duplicate import --- filebeat/input/v2/input-cursor/manager.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/filebeat/input/v2/input-cursor/manager.go b/filebeat/input/v2/input-cursor/manager.go index 2a4310dc778e..766d6f17fa00 100644 --- a/filebeat/input/v2/input-cursor/manager.go +++ b/filebeat/input/v2/input-cursor/manager.go @@ -26,7 +26,6 @@ import ( "github.com/elastic/go-concert/unison" - input "github.com/elastic/beats/v7/filebeat/input/v2" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" @@ -145,7 +144,7 @@ func (cim *InputManager) shutdown() { // Create builds a new v2.Input using the provided Configure function. // The Input will run a go-routine per source that has been configured. -func (cim *InputManager) Create(config *common.Config) (input.Input, error) { +func (cim *InputManager) Create(config *common.Config) (v2.Input, error) { if err := cim.init(); err != nil { return nil, err } @@ -180,7 +179,7 @@ func (cim *InputManager) Create(config *common.Config) (input.Input, error) { // Lock locks a key for exclusive access and returns an resource that can be used to modify // the cursor state and unlock the key. -func (cim *InputManager) lock(ctx input.Context, key string) (*resource, error) { +func (cim *InputManager) lock(ctx v2.Context, key string) (*resource, error) { resource := cim.store.Get(key) err := lockResource(ctx.Logger, resource, ctx.Cancelation) if err != nil { @@ -190,7 +189,7 @@ func (cim *InputManager) lock(ctx input.Context, key string) (*resource, error) return resource, nil } -func lockResource(log *logp.Logger, resource *resource, canceler input.Canceler) error { +func lockResource(log *logp.Logger, resource *resource, canceler v2.Canceler) error { if !resource.lock.TryLock() { log.Infof("Resource '%v' currently in use, waiting...", resource.key) err := resource.lock.LockContext(canceler) From f18d64c17f6f179bf8a0ed2721ccbd939ada9770 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 24 Aug 2020 09:57:12 +0200 Subject: [PATCH 02/13] Move config to its own package --- .../input/httpjson/{ => config}/config.go | 10 +++++----- .../httpjson/{ => config}/config_oauth.go | 5 +++-- .../{ => config}/config_oauth_test.go | 2 +- .../httpjson/{ => config}/config_test.go | 20 +++++++++---------- x-pack/filebeat/input/httpjson/date_cursor.go | 10 ++++++---- x-pack/filebeat/input/httpjson/input.go | 4 ++-- x-pack/filebeat/input/httpjson/pagination.go | 5 +++-- .../filebeat/input/httpjson/rate_limiter.go | 3 ++- x-pack/filebeat/input/httpjson/requester.go | 3 ++- 9 files changed, 34 insertions(+), 28 deletions(-) rename x-pack/filebeat/input/httpjson/{ => config}/config.go (97%) rename x-pack/filebeat/input/httpjson/{ => config}/config_oauth.go (97%) rename x-pack/filebeat/input/httpjson/{ => config}/config_oauth_test.go (99%) rename x-pack/filebeat/input/httpjson/{ => config}/config_test.go (98%) diff --git a/x-pack/filebeat/input/httpjson/config.go b/x-pack/filebeat/input/httpjson/config/config.go similarity index 97% rename from x-pack/filebeat/input/httpjson/config.go rename to x-pack/filebeat/input/httpjson/config/config.go index 95ca205be0d3..b6eddbd00e61 100644 --- a/x-pack/filebeat/input/httpjson/config.go +++ b/x-pack/filebeat/input/httpjson/config/config.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package httpjson +package config import ( "errors" @@ -18,7 +18,7 @@ import ( ) // Config contains information about httpjson configuration -type config struct { +type Config struct { OAuth2 *OAuth2 `config:"oauth2"` APIKey string `config:"api_key"` AuthenticationScheme string `config:"authentication_scheme"` @@ -132,7 +132,7 @@ func (dc *DateCursor) Validate() error { return nil } -func (c *config) Validate() error { +func (c *Config) Validate() error { switch strings.ToUpper(c.HTTPMethod) { case "GET", "POST": break @@ -162,8 +162,8 @@ func (c *config) Validate() error { return nil } -func defaultConfig() config { - var c config +func Default() Config { + var c Config c.HTTPMethod = "GET" c.HTTPClientTimeout = 60 * time.Second c.RetryWaitMin = 1 * time.Second diff --git a/x-pack/filebeat/input/httpjson/config_oauth.go b/x-pack/filebeat/input/httpjson/config/config_oauth.go similarity index 97% rename from x-pack/filebeat/input/httpjson/config_oauth.go rename to x-pack/filebeat/input/httpjson/config/config_oauth.go index 0ff55dcbc334..a02af8b1542d 100644 --- a/x-pack/filebeat/input/httpjson/config_oauth.go +++ b/x-pack/filebeat/input/httpjson/config/config_oauth.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package httpjson +package config import ( "context" @@ -67,7 +67,8 @@ func (o *OAuth2) IsEnabled() bool { // Client wraps the given http.Client and returns a new one that will use the oauth authentication. func (o *OAuth2) Client(ctx context.Context, client *http.Client) (*http.Client, error) { - ctx = context.WithValue(ctx, oauth2.HTTPClient, client) + // only required to let oauth2 library to find our custom client in the context + ctx = context.WithValue(context.Background(), oauth2.HTTPClient, client) switch o.GetProvider() { case OAuth2ProviderAzure, OAuth2ProviderDefault: diff --git a/x-pack/filebeat/input/httpjson/config_oauth_test.go b/x-pack/filebeat/input/httpjson/config/config_oauth_test.go similarity index 99% rename from x-pack/filebeat/input/httpjson/config_oauth_test.go rename to x-pack/filebeat/input/httpjson/config/config_oauth_test.go index 3fa0eed42845..cc0e43bcd58c 100644 --- a/x-pack/filebeat/input/httpjson/config_oauth_test.go +++ b/x-pack/filebeat/input/httpjson/config/config_oauth_test.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package httpjson +package config import ( "reflect" diff --git a/x-pack/filebeat/input/httpjson/config_test.go b/x-pack/filebeat/input/httpjson/config/config_test.go similarity index 98% rename from x-pack/filebeat/input/httpjson/config_test.go rename to x-pack/filebeat/input/httpjson/config/config_test.go index 0de073112390..1629f3c0a0d4 100644 --- a/x-pack/filebeat/input/httpjson/config_test.go +++ b/x-pack/filebeat/input/httpjson/config/config_test.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package httpjson +package config import ( "context" @@ -25,7 +25,7 @@ func TestConfigValidationCase1(t *testing.T) { "url": "localhost", } cfg := common.MustNewConfigFrom(m) - conf := defaultConfig() + conf := Default() if err := cfg.Unpack(&conf); err == nil { t.Fatal("Configuration validation failed. no_http_body and http_request_body cannot coexist.") } @@ -39,7 +39,7 @@ func TestConfigValidationCase2(t *testing.T) { "url": "localhost", } cfg := common.MustNewConfigFrom(m) - conf := defaultConfig() + conf := Default() if err := cfg.Unpack(&conf); err == nil { t.Fatal("Configuration validation failed. no_http_body and pagination.extra_body_content cannot coexist.") } @@ -53,7 +53,7 @@ func TestConfigValidationCase3(t *testing.T) { "url": "localhost", } cfg := common.MustNewConfigFrom(m) - conf := defaultConfig() + conf := Default() if err := cfg.Unpack(&conf); err == nil { t.Fatal("Configuration validation failed. no_http_body and pagination.req_field cannot coexist.") } @@ -66,7 +66,7 @@ func TestConfigValidationCase4(t *testing.T) { "url": "localhost", } cfg := common.MustNewConfigFrom(m) - conf := defaultConfig() + conf := Default() if err := cfg.Unpack(&conf); err == nil { t.Fatal("Configuration validation failed. pagination.header and pagination.req_field cannot coexist.") } @@ -79,7 +79,7 @@ func TestConfigValidationCase5(t *testing.T) { "url": "localhost", } cfg := common.MustNewConfigFrom(m) - conf := defaultConfig() + conf := Default() if err := cfg.Unpack(&conf); err == nil { t.Fatal("Configuration validation failed. pagination.header and pagination.id_field cannot coexist.") } @@ -92,7 +92,7 @@ func TestConfigValidationCase6(t *testing.T) { "url": "localhost", } cfg := common.MustNewConfigFrom(m) - conf := defaultConfig() + conf := Default() if err := cfg.Unpack(&conf); err == nil { t.Fatal("Configuration validation failed. pagination.header and extra_body_content cannot coexist.") } @@ -105,7 +105,7 @@ func TestConfigValidationCase7(t *testing.T) { "url": "localhost", } cfg := common.MustNewConfigFrom(m) - conf := defaultConfig() + conf := Default() if err := cfg.Unpack(&conf); err == nil { t.Fatal("Configuration validation failed. http_method DELETE is not allowed.") } @@ -116,7 +116,7 @@ func TestConfigMustFailWithInvalidURL(t *testing.T) { "url": "::invalid::", } cfg := common.MustNewConfigFrom(m) - conf := defaultConfig() + conf := Default() err := cfg.Unpack(&conf) assert.EqualError(t, err, `parse "::invalid::": missing protocol scheme accessing 'url'`) } @@ -414,7 +414,7 @@ func TestConfigOauth2Validation(t *testing.T) { } cfg := common.MustNewConfigFrom(c.input) - conf := defaultConfig() + conf := Default() err := cfg.Unpack(&conf) switch { diff --git a/x-pack/filebeat/input/httpjson/date_cursor.go b/x-pack/filebeat/input/httpjson/date_cursor.go index 2a9db44bd2a9..ed762985e686 100644 --- a/x-pack/filebeat/input/httpjson/date_cursor.go +++ b/x-pack/filebeat/input/httpjson/date_cursor.go @@ -7,10 +7,12 @@ package httpjson import ( "bytes" "net/url" + "text/template" "time" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/config" ) type dateCursor struct { @@ -23,10 +25,10 @@ type dateCursor struct { dateFormat string value string - valueTpl *Template + valueTpl *template.Template } -func newDateCursorFromConfig(config config, log *logp.Logger) *dateCursor { +func newDateCursorFromConfig(config config.Config, log *logp.Logger) *dateCursor { c := &dateCursor{ enabled: config.DateCursor.IsEnabled(), url: *config.URL.URL, @@ -41,7 +43,7 @@ func newDateCursorFromConfig(config config, log *logp.Logger) *dateCursor { c.urlField = config.DateCursor.URLField c.initialInterval = config.DateCursor.InitialInterval c.dateFormat = config.DateCursor.GetDateFormat() - c.valueTpl = config.DateCursor.ValueTemplate + c.valueTpl = config.DateCursor.ValueTemplate.Template return c } @@ -66,7 +68,7 @@ func (c *dateCursor) getURL() string { value = dateStr } else { buf := new(bytes.Buffer) - if err := c.valueTpl.Template.Execute(buf, dateStr); err != nil { + if err := c.valueTpl.Execute(buf, dateStr); err != nil { return c.url.String() } value = buf.String() diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index 766fa364864d..90477c9d0e37 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -22,6 +22,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/useragent" "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/config" "github.com/elastic/go-concert/ctxtool" "github.com/elastic/go-concert/timed" ) @@ -63,8 +64,7 @@ func (log *retryLogger) Warn(format string, args ...interface{}) { log.log.Warnf(format, args...) } -type httpJSONInput struct { - config config + config config.Config tlsConfig *tlscommon.TLSConfig } diff --git a/x-pack/filebeat/input/httpjson/pagination.go b/x-pack/filebeat/input/httpjson/pagination.go index 9a7bf82b2b4a..f58be4c8de6b 100644 --- a/x-pack/filebeat/input/httpjson/pagination.go +++ b/x-pack/filebeat/input/httpjson/pagination.go @@ -12,18 +12,19 @@ import ( "regexp" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/config" ) type pagination struct { extraBodyContent common.MapStr - header *Header + header *config.Header idField string requestField string urlField string url string } -func newPaginationFromConfig(config config) *pagination { +func newPaginationFromConfig(config config.Config) *pagination { if !config.Pagination.IsEnabled() { return nil } diff --git a/x-pack/filebeat/input/httpjson/rate_limiter.go b/x-pack/filebeat/input/httpjson/rate_limiter.go index 57d206224ac0..fcf5e3ffd8dc 100644 --- a/x-pack/filebeat/input/httpjson/rate_limiter.go +++ b/x-pack/filebeat/input/httpjson/rate_limiter.go @@ -12,6 +12,7 @@ import ( "time" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/config" ) type rateLimiter struct { @@ -22,7 +23,7 @@ type rateLimiter struct { remaining string } -func newRateLimiterFromConfig(config config, log *logp.Logger) *rateLimiter { +func newRateLimiterFromConfig(config config.Config, log *logp.Logger) *rateLimiter { if config.RateLimit == nil { return nil } diff --git a/x-pack/filebeat/input/httpjson/requester.go b/x-pack/filebeat/input/httpjson/requester.go index b5f58179aa0b..7fdef8b315b7 100644 --- a/x-pack/filebeat/input/httpjson/requester.go +++ b/x-pack/filebeat/input/httpjson/requester.go @@ -17,6 +17,7 @@ import ( stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/config" ) type requestInfo struct { @@ -43,7 +44,7 @@ type requester struct { } func newRequester( - config config, + config config.Config, rateLimiter *rateLimiter, dateCursor *dateCursor, pagination *pagination, From 562626af93a5eb406a2f549903608079e9a8b390 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 24 Aug 2020 09:58:53 +0200 Subject: [PATCH 03/13] Minor improvements --- x-pack/filebeat/input/httpjson/pagination_test.go | 2 +- x-pack/filebeat/input/httpjson/rate_limiter.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/pagination_test.go b/x-pack/filebeat/input/httpjson/pagination_test.go index 9b04de75819b..32e3261c1e6f 100644 --- a/x-pack/filebeat/input/httpjson/pagination_test.go +++ b/x-pack/filebeat/input/httpjson/pagination_test.go @@ -42,7 +42,7 @@ func TestCreateRequestInfoFromBody(t *testing.T) { contentMap: common.MapStr{}, headers: common.MapStr{}, } - err := pagination.setRequestInfoFromBody( + _ = pagination.setRequestInfoFromBody( common.MapStr(m), common.MapStr(m), ri, diff --git a/x-pack/filebeat/input/httpjson/rate_limiter.go b/x-pack/filebeat/input/httpjson/rate_limiter.go index fcf5e3ffd8dc..9c2beca1dcb2 100644 --- a/x-pack/filebeat/input/httpjson/rate_limiter.go +++ b/x-pack/filebeat/input/httpjson/rate_limiter.go @@ -123,7 +123,7 @@ func (r *rateLimiter) getRateLimit(header http.Header) (int64, error) { if err != nil { return 0, fmt.Errorf("failed to parse rate-limit reset value: %w", err) } - if time.Unix(epoch, 0).Sub(time.Now()) <= 0 { + if time.Until(time.Unix(epoch, 0)) <= 0 { return 0, nil } From eca0e109fa97585baa2ad5e8c7b723d3dafe1c4c Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 24 Aug 2020 09:59:24 +0200 Subject: [PATCH 04/13] Fix tests --- x-pack/filebeat/input/httpjson/httpjson_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/httpjson_test.go b/x-pack/filebeat/input/httpjson/httpjson_test.go index b541c16002e6..82102731e257 100644 --- a/x-pack/filebeat/input/httpjson/httpjson_test.go +++ b/x-pack/filebeat/input/httpjson/httpjson_test.go @@ -23,7 +23,7 @@ import ( beattest "github.com/elastic/beats/v7/libbeat/publisher/testing" ) -func TestHTTPJSONInput(t *testing.T) { +func TestStatelessHTTPJSONInput(t *testing.T) { testCases := []struct { name string setupServer func(*testing.T, http.HandlerFunc, map[string]interface{}) @@ -224,7 +224,7 @@ func TestHTTPJSONInput(t *testing.T) { cfg := common.MustNewConfigFrom(tc.baseConfig) - input, err := configure(cfg) + input, err := statelessConfigure(cfg) assert.NoError(t, err) assert.Equal(t, "httpjson", input.Name()) From 0cafd4f86b902de13154211edf2dea0f1b49f841 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 24 Aug 2020 09:59:48 +0200 Subject: [PATCH 05/13] Create input manager --- .../filebeat/input/httpjson/input_manager.go | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 x-pack/filebeat/input/httpjson/input_manager.go diff --git a/x-pack/filebeat/input/httpjson/input_manager.go b/x-pack/filebeat/input/httpjson/input_manager.go new file mode 100644 index 000000000000..5c3153658c36 --- /dev/null +++ b/x-pack/filebeat/input/httpjson/input_manager.go @@ -0,0 +1,45 @@ +package httpjson + +import ( + "github.com/elastic/go-concert/unison" + "go.uber.org/multierr" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" + stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/config" +) + +// inputManager wraps one stateless input manager +// and one cursor input manager. It will create one or the other +// based on the config that is passed. +type inputManager struct { + stateless *stateless.InputManager + cursor *cursor.InputManager +} + +var _ v2.InputManager = inputManager{} + +// Init initializes both wrapped input managers. +func (m inputManager) Init(grp unison.Group, mode v2.Mode) error { + return multierr.Append( + m.stateless.Init(grp, mode), + m.cursor.Init(grp, mode), + ) +} + +// Create creates a cursor input manager if the config has a date cursor set up, +// otherwise it creates a stateless input manager. +func (m inputManager) Create(cfg *common.Config) (v2.Input, error) { + var config config.Config + if err := cfg.Unpack(&config); err != nil { + return nil, err + } + + if config.DateCursor != nil { + return m.cursor.Create(cfg) + } + + return m.stateless.Create(cfg) +} From fe2eff09b1f15c72055f6ad4d07ff9175f3222ce Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 24 Aug 2020 10:00:16 +0200 Subject: [PATCH 06/13] Change requester to accept and store a cursor --- x-pack/filebeat/input/httpjson/requester.go | 37 ++++++++++++++++++--- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/requester.go b/x-pack/filebeat/input/httpjson/requester.go index 7fdef8b315b7..8e0aa96e1359 100644 --- a/x-pack/filebeat/input/httpjson/requester.go +++ b/x-pack/filebeat/input/httpjson/requester.go @@ -14,7 +14,7 @@ import ( "net/http" "strings" - stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" + cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/config" @@ -73,7 +73,7 @@ type response struct { } // processHTTPRequest processes HTTP request, and handles pagination if enabled -func (r *requester) processHTTPRequest(ctx context.Context, publisher stateless.Publisher) error { +func (r *requester) processHTTPRequest(ctx context.Context, publisher cursor.Publisher) error { ri := &requestInfo{ url: r.dateCursor.getURL(), contentMap: common.MapStr{}, @@ -211,7 +211,7 @@ func (r *requester) createHTTPRequest(ctx context.Context, ri *requestInfo) (*ht } // processEventArray publishes an event for each object contained in the array. It returns the last object in the array and an error if any. -func (r *requester) processEventArray(publisher stateless.Publisher, events []interface{}) (map[string]interface{}, error) { +func (r *requester) processEventArray(publisher cursor.Publisher, events []interface{}) (map[string]interface{}, error) { var last map[string]interface{} for _, t := range events { switch v := t.(type) { @@ -222,7 +222,9 @@ func (r *requester) processEventArray(publisher stateless.Publisher, events []in if err != nil { return nil, fmt.Errorf("failed to marshal %+v: %w", e, err) } - publisher.Publish(makeEvent(string(d))) + if err := publisher.Publish(makeEvent(string(d)), r.getCursor()); err != nil { + return nil, fmt.Errorf("failed to publish: %w", err) + } } default: return nil, fmt.Errorf("expected only JSON objects in the array but got a %T", v) @@ -274,3 +276,30 @@ func splitEvent(splitKey string, event map[string]interface{}) []map[string]inte return events } + +type cursorState struct { + LastCalledURL config.URL + LastDateCursorValue string +} + +func (r *requester) getCursor() cursorState { + return cursorState{ + LastCalledURL: config.URL{URL: &r.dateCursor.url}, + LastDateCursorValue: r.dateCursor.value, + } +} + +func (r *requester) loadCursor(c *cursor.Cursor, log *logp.Logger) { + if c == nil || c.IsNew() { + return + } + + var cs cursorState + if err := c.Unpack(&cs); err != nil { + log.Errorf("Reset http cursor state. Failed to read from registry: %v", err) + return + } + + r.dateCursor.url = *cs.LastCalledURL.URL + r.dateCursor.value = cs.LastDateCursorValue +} From c368d4044c0e43d5ee19da14efc24e57bfe8f0a6 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 24 Aug 2020 10:01:49 +0200 Subject: [PATCH 07/13] Modify input to be embedded --- x-pack/filebeat/input/httpjson/input.go | 43 +++++++------------------ 1 file changed, 12 insertions(+), 31 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index 90477c9d0e37..3bb3ca0f4573 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -15,6 +15,7 @@ import ( "go.uber.org/zap" v2 "github.com/elastic/beats/v7/filebeat/input/v2" + cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" @@ -64,6 +65,7 @@ func (log *retryLogger) Warn(format string, args ...interface{}) { log.log.Warnf(format, args...) } +type input struct { config config.Config tlsConfig *tlscommon.TLSConfig } @@ -77,34 +79,9 @@ func Plugin() v2.Plugin { } } -func configure(cfg *common.Config) (stateless.Input, error) { - conf := defaultConfig() - if err := cfg.Unpack(&conf); err != nil { - return nil, err - } - - return newHTTPJSONInput(conf) -} - -func newHTTPJSONInput(config config) (*httpJSONInput, error) { - if err := config.Validate(); err != nil { - return nil, err - } - - tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS) - if err != nil { - return nil, err - } - - return &httpJSONInput{ - config: config, - tlsConfig: tlsConfig, - }, nil -} - -func (*httpJSONInput) Name() string { return inputName } +func (*input) Name() string { return inputName } -func (in *httpJSONInput) Test(v2.TestContext) error { +func (in *input) test() error { port := func() string { if in.config.URL.Port() != "" { return in.config.URL.Port() @@ -124,9 +101,11 @@ func (in *httpJSONInput) Test(v2.TestContext) error { return nil } -// Run starts the input and blocks until it ends the execution. -// It will return on context cancellation, any other error will be retried. -func (in *httpJSONInput) Run(ctx v2.Context, publisher stateless.Publisher) error { +func (in *input) run( + ctx v2.Context, + publisher cursor.Publisher, + cursor *cursor.Cursor, +) error { log := ctx.Logger.With("url", in.config.URL) stdCtx := ctxtool.FromCanceller(ctx.Cancelation) @@ -151,6 +130,8 @@ func (in *httpJSONInput) Run(ctx v2.Context, publisher stateless.Publisher) erro log, ) + requester.loadCursor(cursor, log) + // TODO: disallow passing interval = 0 as a mean to run once. if in.config.Interval == 0 { return requester.processHTTPRequest(stdCtx, publisher) @@ -169,7 +150,7 @@ func (in *httpJSONInput) Run(ctx v2.Context, publisher stateless.Publisher) erro return nil } -func (in *httpJSONInput) newHTTPClient(ctx context.Context) (*http.Client, error) { +func (in *input) newHTTPClient(ctx context.Context) (*http.Client, error) { // Make retryable HTTP client client := &retryablehttp.Client{ HTTPClient: &http.Client{ From a0e208a106075f09c30b489c220321ba5420fdae Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 24 Aug 2020 10:02:36 +0200 Subject: [PATCH 08/13] Create stateless and cursor inputs --- .../filebeat/input/httpjson/input_cursor.go | 60 +++++++++++++++++ .../input/httpjson/input_stateless.go | 65 +++++++++++++++++++ 2 files changed, 125 insertions(+) create mode 100644 x-pack/filebeat/input/httpjson/input_cursor.go create mode 100644 x-pack/filebeat/input/httpjson/input_stateless.go diff --git a/x-pack/filebeat/input/httpjson/input_cursor.go b/x-pack/filebeat/input/httpjson/input_cursor.go new file mode 100644 index 000000000000..825b3e74c507 --- /dev/null +++ b/x-pack/filebeat/input/httpjson/input_cursor.go @@ -0,0 +1,60 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package httpjson + +import ( + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/config" +) + +type cursorInput struct { + *input +} + +func cursorConfigure(cfg *common.Config) ([]cursor.Source, cursor.Input, error) { + conf := config.Default() + if err := cfg.Unpack(&conf); err != nil { + return nil, nil, err + } + + return newCursorInput(conf) +} + +func newCursorInput(config config.Config) ([]cursor.Source, cursor.Input, error) { + if err := config.Validate(); err != nil { + return nil, nil, err + } + + tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS) + if err != nil { + return nil, nil, err + } + + return nil, + &cursorInput{ + input: &input{ + config: config, + tlsConfig: tlsConfig, + }, + }, nil +} + +func (in *cursorInput) Test(cursor.Source, v2.TestContext) error { + return in.test() +} + +// Run starts the input and blocks until it ends the execution. +// It will return on context cancellation, any other error will be retried. +func (in *cursorInput) Run( + ctx v2.Context, + _ cursor.Source, + cursor cursor.Cursor, + publisher cursor.Publisher, +) error { + return in.run(ctx, publisher, &cursor) +} diff --git a/x-pack/filebeat/input/httpjson/input_stateless.go b/x-pack/filebeat/input/httpjson/input_stateless.go new file mode 100644 index 000000000000..f89ddb45b624 --- /dev/null +++ b/x-pack/filebeat/input/httpjson/input_stateless.go @@ -0,0 +1,65 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package httpjson + +import ( + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/config" +) + +type statelessInput struct { + *input +} + +func statelessConfigure(cfg *common.Config) (stateless.Input, error) { + conf := config.Default() + if err := cfg.Unpack(&conf); err != nil { + return nil, err + } + + return newStatelessInput(conf) +} + +func newStatelessInput(config config.Config) (*statelessInput, error) { + if err := config.Validate(); err != nil { + return nil, err + } + + tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS) + if err != nil { + return nil, err + } + + return &statelessInput{ + input: &input{ + config: config, + tlsConfig: tlsConfig, + }, + }, nil +} + +func (in *statelessInput) Test(v2.TestContext) error { + return in.test() +} + +type statlessPublisher struct { + wrapped stateless.Publisher +} + +func (pub statlessPublisher) Publish(event beat.Event, _ interface{}) error { + pub.wrapped.Publish(event) + return nil +} + +// Run starts the input and blocks until it ends the execution. +// It will return on context cancellation, any other error will be retried. +func (in *statelessInput) Run(ctx v2.Context, publisher stateless.Publisher) error { + pub := statlessPublisher{wrapped: publisher} + return in.run(ctx, pub, nil) +} From e241c26f68d30c46ae48d7e804c33666b87a7736 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 24 Aug 2020 10:02:51 +0200 Subject: [PATCH 09/13] Initialize new input manager on publish --- x-pack/filebeat/input/default-inputs/inputs.go | 2 +- x-pack/filebeat/input/httpjson/input.go | 13 +++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/default-inputs/inputs.go b/x-pack/filebeat/input/default-inputs/inputs.go index 1fe245b80f70..cd8562560dac 100644 --- a/x-pack/filebeat/input/default-inputs/inputs.go +++ b/x-pack/filebeat/input/default-inputs/inputs.go @@ -27,7 +27,7 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2 return []v2.Plugin{ cloudfoundry.Plugin(), http_endpoint.Plugin(), - httpjson.Plugin(), + httpjson.Plugin(log, store), o365audit.Plugin(log, store), } } diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index 3bb3ca0f4573..867dc9b5788b 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -70,12 +70,21 @@ type input struct { tlsConfig *tlscommon.TLSConfig } -func Plugin() v2.Plugin { +func Plugin(log *logp.Logger, store cursor.StateStore) v2.Plugin { + sim := stateless.NewInputManager(statelessConfigure) return v2.Plugin{ Name: inputName, Stability: feature.Beta, Deprecated: false, - Manager: stateless.NewInputManager(configure), + Manager: inputManager{ + stateless: &sim, + cursor: &cursor.InputManager{ + Logger: log, + StateStore: store, + Type: inputName, + Configure: cursorConfigure, + }, + }, } } From 5ff7ed8737b90e79af89dd4d31709a5ec82d914a Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 24 Aug 2020 11:19:15 +0200 Subject: [PATCH 10/13] Add changelog entry and format files --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/httpjson/input_manager.go | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 58ca2acb35c1..f1faeec4afb3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -596,6 +596,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add type and sub_type to panw panos fileset {pull}20912[20912] - Always attempt community_id processor on zeek module {pull}21155[21155] - Add related.hosts ecs field to all modules {pull}21160[21160] +- Keep cursor state between httpjson input restarts {pull}20751[20751] *Heartbeat* diff --git a/x-pack/filebeat/input/httpjson/input_manager.go b/x-pack/filebeat/input/httpjson/input_manager.go index 5c3153658c36..08ac19d6e651 100644 --- a/x-pack/filebeat/input/httpjson/input_manager.go +++ b/x-pack/filebeat/input/httpjson/input_manager.go @@ -1,9 +1,14 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + package httpjson import ( - "github.com/elastic/go-concert/unison" "go.uber.org/multierr" + "github.com/elastic/go-concert/unison" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" From 3d0120df1ed53399a5f8b03f0a4a110b42b95b7f Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 24 Aug 2020 12:05:07 +0200 Subject: [PATCH 11/13] Move test data folder --- .../input/httpjson/{ => config}/testdata/credentials.json | 0 .../input/httpjson/{ => config}/testdata/invalid_credentials.json | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename x-pack/filebeat/input/httpjson/{ => config}/testdata/credentials.json (100%) rename x-pack/filebeat/input/httpjson/{ => config}/testdata/invalid_credentials.json (100%) diff --git a/x-pack/filebeat/input/httpjson/testdata/credentials.json b/x-pack/filebeat/input/httpjson/config/testdata/credentials.json similarity index 100% rename from x-pack/filebeat/input/httpjson/testdata/credentials.json rename to x-pack/filebeat/input/httpjson/config/testdata/credentials.json diff --git a/x-pack/filebeat/input/httpjson/testdata/invalid_credentials.json b/x-pack/filebeat/input/httpjson/config/testdata/invalid_credentials.json similarity index 100% rename from x-pack/filebeat/input/httpjson/testdata/invalid_credentials.json rename to x-pack/filebeat/input/httpjson/config/testdata/invalid_credentials.json From 9376c54f0d26fd79dbf13e60d0a473d2a6e603f4 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Wed, 26 Aug 2020 10:28:46 +0200 Subject: [PATCH 12/13] Change tests --- x-pack/filebeat/input/httpjson/input.go | 16 ++++++++++++++ .../filebeat/input/httpjson/input_cursor.go | 17 ++------------ .../input/httpjson/input_stateless.go | 22 +++++-------------- .../{httpjson_test.go => input_test.go} | 18 ++++++++++----- 4 files changed, 35 insertions(+), 38 deletions(-) rename x-pack/filebeat/input/httpjson/{httpjson_test.go => input_test.go} (96%) diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index 867dc9b5788b..b6ef653c4ff3 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -88,6 +88,22 @@ func Plugin(log *logp.Logger, store cursor.StateStore) v2.Plugin { } } +func newInput(config config.Config) (*input, error) { + if err := config.Validate(); err != nil { + return nil, err + } + + tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS) + if err != nil { + return nil, err + } + + return &input{ + config: config, + tlsConfig: tlsConfig, + }, nil +} + func (*input) Name() string { return inputName } func (in *input) test() error { diff --git a/x-pack/filebeat/input/httpjson/input_cursor.go b/x-pack/filebeat/input/httpjson/input_cursor.go index 825b3e74c507..f472c27ab04b 100644 --- a/x-pack/filebeat/input/httpjson/input_cursor.go +++ b/x-pack/filebeat/input/httpjson/input_cursor.go @@ -8,7 +8,6 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/config" ) @@ -21,27 +20,15 @@ func cursorConfigure(cfg *common.Config) ([]cursor.Source, cursor.Input, error) if err := cfg.Unpack(&conf); err != nil { return nil, nil, err } - return newCursorInput(conf) } func newCursorInput(config config.Config) ([]cursor.Source, cursor.Input, error) { - if err := config.Validate(); err != nil { - return nil, nil, err - } - - tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS) + input, err := newInput(config) if err != nil { return nil, nil, err } - - return nil, - &cursorInput{ - input: &input{ - config: config, - tlsConfig: tlsConfig, - }, - }, nil + return nil, &cursorInput{input: input}, nil } func (in *cursorInput) Test(cursor.Source, v2.TestContext) error { diff --git a/x-pack/filebeat/input/httpjson/input_stateless.go b/x-pack/filebeat/input/httpjson/input_stateless.go index f89ddb45b624..c1f81cace596 100644 --- a/x-pack/filebeat/input/httpjson/input_stateless.go +++ b/x-pack/filebeat/input/httpjson/input_stateless.go @@ -9,7 +9,6 @@ import ( stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/config" ) @@ -22,37 +21,26 @@ func statelessConfigure(cfg *common.Config) (stateless.Input, error) { if err := cfg.Unpack(&conf); err != nil { return nil, err } - return newStatelessInput(conf) } func newStatelessInput(config config.Config) (*statelessInput, error) { - if err := config.Validate(); err != nil { - return nil, err - } - - tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS) + input, err := newInput(config) if err != nil { return nil, err } - - return &statelessInput{ - input: &input{ - config: config, - tlsConfig: tlsConfig, - }, - }, nil + return &statelessInput{input: input}, nil } func (in *statelessInput) Test(v2.TestContext) error { return in.test() } -type statlessPublisher struct { +type statelessPublisher struct { wrapped stateless.Publisher } -func (pub statlessPublisher) Publish(event beat.Event, _ interface{}) error { +func (pub statelessPublisher) Publish(event beat.Event, _ interface{}) error { pub.wrapped.Publish(event) return nil } @@ -60,6 +48,6 @@ func (pub statlessPublisher) Publish(event beat.Event, _ interface{}) error { // Run starts the input and blocks until it ends the execution. // It will return on context cancellation, any other error will be retried. func (in *statelessInput) Run(ctx v2.Context, publisher stateless.Publisher) error { - pub := statlessPublisher{wrapped: publisher} + pub := statelessPublisher{wrapped: publisher} return in.run(ctx, pub, nil) } diff --git a/x-pack/filebeat/input/httpjson/httpjson_test.go b/x-pack/filebeat/input/httpjson/input_test.go similarity index 96% rename from x-pack/filebeat/input/httpjson/httpjson_test.go rename to x-pack/filebeat/input/httpjson/input_test.go index 82102731e257..fe169b0250f4 100644 --- a/x-pack/filebeat/input/httpjson/httpjson_test.go +++ b/x-pack/filebeat/input/httpjson/input_test.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" beattest "github.com/elastic/beats/v7/libbeat/publisher/testing" + "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/config" ) func TestStatelessHTTPJSONInput(t *testing.T) { @@ -224,20 +225,25 @@ func TestStatelessHTTPJSONInput(t *testing.T) { cfg := common.MustNewConfigFrom(tc.baseConfig) - input, err := statelessConfigure(cfg) + conf := config.Default() + assert.NoError(t, cfg.Unpack(&conf)) + + input, err := newInput(conf) assert.NoError(t, err) assert.Equal(t, "httpjson", input.Name()) - assert.NoError(t, input.Test(v2.TestContext{})) + assert.NoError(t, input.test()) + + chanClient := beattest.NewChanClient(len(tc.expected)) + t.Cleanup(func() { _ = chanClient.Close() }) - pub := beattest.NewChanClient(len(tc.expected)) - t.Cleanup(func() { _ = pub.Close() }) + pub := statelessPublisher{wrapped: chanClient} ctx, cancel := newV2Context() t.Cleanup(cancel) var g errgroup.Group - g.Go(func() error { return input.Run(ctx, pub) }) + g.Go(func() error { return input.run(ctx, pub, nil) }) timeout := time.NewTimer(5 * time.Second) t.Cleanup(func() { _ = timeout.Stop() }) @@ -249,7 +255,7 @@ func TestStatelessHTTPJSONInput(t *testing.T) { case <-timeout.C: t.Errorf("timed out waiting for %d events", len(tc.expected)) return - case got := <-pub.Channel: + case got := <-chanClient.Channel: val, err := got.Fields.GetValue("message") assert.NoError(t, err) assert.JSONEq(t, tc.expected[receivedCount], val.(string)) From ba92901498880ae64deb7e2fbacf4cd0170034b8 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Wed, 16 Sep 2020 14:43:57 +0200 Subject: [PATCH 13/13] Apply requested changes --- .../input/httpjson/{config => }/config.go | 66 ++++++++--------- .../httpjson/{config => }/config_oauth.go | 72 +++++++++---------- .../{config => }/config_oauth_test.go | 40 +++++------ .../httpjson/{config => }/config_test.go | 20 +++--- x-pack/filebeat/input/httpjson/date_cursor.go | 37 +++++----- x-pack/filebeat/input/httpjson/input.go | 66 ++++++++--------- .../filebeat/input/httpjson/input_cursor.go | 42 ++++++++--- .../filebeat/input/httpjson/input_manager.go | 3 +- .../input/httpjson/input_stateless.go | 21 +++--- x-pack/filebeat/input/httpjson/input_test.go | 13 ++-- x-pack/filebeat/input/httpjson/pagination.go | 7 +- .../filebeat/input/httpjson/rate_limiter.go | 3 +- x-pack/filebeat/input/httpjson/requester.go | 28 +++----- .../{config => }/testdata/credentials.json | 0 .../testdata/invalid_credentials.json | 0 15 files changed, 211 insertions(+), 207 deletions(-) rename x-pack/filebeat/input/httpjson/{config => }/config.go (74%) rename x-pack/filebeat/input/httpjson/{config => }/config_oauth.go (78%) rename x-pack/filebeat/input/httpjson/{config => }/config_oauth_test.go (67%) rename x-pack/filebeat/input/httpjson/{config => }/config_test.go (98%) rename x-pack/filebeat/input/httpjson/{config => }/testdata/credentials.json (100%) rename x-pack/filebeat/input/httpjson/{config => }/testdata/invalid_credentials.json (100%) diff --git a/x-pack/filebeat/input/httpjson/config/config.go b/x-pack/filebeat/input/httpjson/config.go similarity index 74% rename from x-pack/filebeat/input/httpjson/config/config.go rename to x-pack/filebeat/input/httpjson/config.go index b6eddbd00e61..ee1445b8a3d8 100644 --- a/x-pack/filebeat/input/httpjson/config/config.go +++ b/x-pack/filebeat/input/httpjson/config.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package config +package httpjson import ( "errors" @@ -17,9 +17,9 @@ import ( "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" ) -// Config contains information about httpjson configuration -type Config struct { - OAuth2 *OAuth2 `config:"oauth2"` +// config contains information about httpjson configuration +type config struct { + OAuth2 *oauth2Config `config:"oauth2"` APIKey string `config:"api_key"` AuthenticationScheme string `config:"authentication_scheme"` HTTPClientTimeout time.Duration `config:"http_client_timeout"` @@ -30,21 +30,21 @@ type Config struct { JSONObjects string `config:"json_objects_array"` SplitEventsBy string `config:"split_events_by"` NoHTTPBody bool `config:"no_http_body"` - Pagination *Pagination `config:"pagination"` - RateLimit *RateLimit `config:"rate_limit"` + Pagination *paginationConfig `config:"pagination"` + RateLimit *rateLimitConfig `config:"rate_limit"` RetryMax int `config:"retry.max_attempts"` RetryWaitMin time.Duration `config:"retry.wait_min"` RetryWaitMax time.Duration `config:"retry.wait_max"` TLS *tlscommon.Config `config:"ssl"` - URL *URL `config:"url" validate:"required"` - DateCursor *DateCursor `config:"date_cursor"` + URL *urlConfig `config:"url" validate:"required"` + DateCursor *dateCursorConfig `config:"date_cursor"` } // Pagination contains information about httpjson pagination settings -type Pagination struct { +type paginationConfig struct { Enabled *bool `config:"enabled"` ExtraBodyContent common.MapStr `config:"extra_body_content"` - Header *Header `config:"header"` + Header *headerConfig `config:"header"` IDField string `config:"id_field"` RequestField string `config:"req_field"` URLField string `config:"url_field"` @@ -52,76 +52,76 @@ type Pagination struct { } // IsEnabled returns true if the `enable` field is set to true in the yaml. -func (p *Pagination) IsEnabled() bool { +func (p *paginationConfig) isEnabled() bool { return p != nil && (p.Enabled == nil || *p.Enabled) } // HTTP Header information for pagination -type Header struct { +type headerConfig struct { FieldName string `config:"field_name" validate:"required"` RegexPattern *regexp.Regexp `config:"regex_pattern" validate:"required"` } // HTTP Header Rate Limit information -type RateLimit struct { +type rateLimitConfig struct { Limit string `config:"limit"` Reset string `config:"reset"` Remaining string `config:"remaining"` } -type DateCursor struct { - Enabled *bool `config:"enabled"` - Field string `config:"field"` - URLField string `config:"url_field" validate:"required"` - ValueTemplate *Template `config:"value_template"` - DateFormat string `config:"date_format"` - InitialInterval time.Duration `config:"initial_interval"` +type dateCursorConfig struct { + Enabled *bool `config:"enabled"` + Field string `config:"field"` + URLField string `config:"url_field" validate:"required"` + ValueTemplate *templateConfig `config:"value_template"` + DateFormat string `config:"date_format"` + InitialInterval time.Duration `config:"initial_interval"` } -type Template struct { +type templateConfig struct { *template.Template } -func (t *Template) Unpack(in string) error { +func (t *templateConfig) Unpack(in string) error { tpl, err := template.New("tpl").Parse(in) if err != nil { return err } - *t = Template{Template: tpl} + *t = templateConfig{Template: tpl} return nil } -type URL struct { +type urlConfig struct { *url.URL } -func (u *URL) Unpack(in string) error { +func (u *urlConfig) Unpack(in string) error { parsed, err := url.Parse(in) if err != nil { return err } - *u = URL{URL: parsed} + *u = urlConfig{URL: parsed} return nil } // IsEnabled returns true if the `enable` field is set to true in the yaml. -func (dc *DateCursor) IsEnabled() bool { +func (dc *dateCursorConfig) isEnabled() bool { return dc != nil && (dc.Enabled == nil || *dc.Enabled) } // IsEnabled returns true if the `enable` field is set to true in the yaml. -func (dc *DateCursor) GetDateFormat() string { +func (dc *dateCursorConfig) getDateFormat() string { if dc.DateFormat == "" { return time.RFC3339 } return dc.DateFormat } -func (dc *DateCursor) Validate() error { +func (dc *dateCursorConfig) Validate() error { if dc.DateFormat == "" { return nil } @@ -132,7 +132,7 @@ func (dc *DateCursor) Validate() error { return nil } -func (c *Config) Validate() error { +func (c *config) Validate() error { switch strings.ToUpper(c.HTTPMethod) { case "GET", "POST": break @@ -154,7 +154,7 @@ func (c *Config) Validate() error { } } } - if c.OAuth2.IsEnabled() { + if c.OAuth2.isEnabled() { if c.APIKey != "" || c.AuthenticationScheme != "" { return errors.New("invalid configuration: oauth2 and api_key or authentication_scheme cannot be set simultaneously") } @@ -162,8 +162,8 @@ func (c *Config) Validate() error { return nil } -func Default() Config { - var c Config +func newDefaultConfig() config { + var c config c.HTTPMethod = "GET" c.HTTPClientTimeout = 60 * time.Second c.RetryWaitMin = 1 * time.Second diff --git a/x-pack/filebeat/input/httpjson/config/config_oauth.go b/x-pack/filebeat/input/httpjson/config_oauth.go similarity index 78% rename from x-pack/filebeat/input/httpjson/config/config_oauth.go rename to x-pack/filebeat/input/httpjson/config_oauth.go index a02af8b1542d..d7412fd0ba80 100644 --- a/x-pack/filebeat/input/httpjson/config/config_oauth.go +++ b/x-pack/filebeat/input/httpjson/config_oauth.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package config +package httpjson import ( "context" @@ -20,32 +20,32 @@ import ( "golang.org/x/oauth2/google" ) -// An OAuth2Provider represents a supported oauth provider. -type OAuth2Provider string +// An oauth2Provider represents a supported oauth provider. +type oauth2Provider string const ( - OAuth2ProviderDefault OAuth2Provider = "" // OAuth2ProviderDefault means no specific provider is set. - OAuth2ProviderAzure OAuth2Provider = "azure" // OAuth2ProviderAzure AzureAD. - OAuth2ProviderGoogle OAuth2Provider = "google" // OAuth2ProviderGoogle Google. + oauth2ProviderDefault oauth2Provider = "" // OAuth2ProviderDefault means no specific provider is set. + oauth2ProviderAzure oauth2Provider = "azure" // OAuth2ProviderAzure AzureAD. + oauth2ProviderGoogle oauth2Provider = "google" // OAuth2ProviderGoogle Google. ) -func (p *OAuth2Provider) Unpack(in string) error { - *p = OAuth2Provider(in) +func (p *oauth2Provider) Unpack(in string) error { + *p = oauth2Provider(in) return nil } -func (p OAuth2Provider) canonical() OAuth2Provider { - return OAuth2Provider(strings.ToLower(string(p))) +func (p oauth2Provider) canonical() oauth2Provider { + return oauth2Provider(strings.ToLower(string(p))) } -// OAuth2 contains information about oauth2 authentication settings. -type OAuth2 struct { +// oauth2Config contains information about oauth2 authentication settings. +type oauth2Config struct { // common oauth fields ClientID string `config:"client.id"` ClientSecret string `config:"client.secret"` Enabled *bool `config:"enabled"` EndpointParams map[string][]string `config:"endpoint_params"` - Provider OAuth2Provider `config:"provider"` + Provider oauth2Provider `config:"provider"` Scopes []string `config:"scopes"` TokenURL string `config:"token_url"` @@ -61,26 +61,26 @@ type OAuth2 struct { } // IsEnabled returns true if the `enable` field is set to true in the yaml. -func (o *OAuth2) IsEnabled() bool { +func (o *oauth2Config) isEnabled() bool { return o != nil && (o.Enabled == nil || *o.Enabled) } // Client wraps the given http.Client and returns a new one that will use the oauth authentication. -func (o *OAuth2) Client(ctx context.Context, client *http.Client) (*http.Client, error) { +func (o *oauth2Config) client(ctx context.Context, client *http.Client) (*http.Client, error) { // only required to let oauth2 library to find our custom client in the context ctx = context.WithValue(context.Background(), oauth2.HTTPClient, client) - switch o.GetProvider() { - case OAuth2ProviderAzure, OAuth2ProviderDefault: + switch o.getProvider() { + case oauth2ProviderAzure, oauth2ProviderDefault: creds := clientcredentials.Config{ ClientID: o.ClientID, ClientSecret: o.ClientSecret, - TokenURL: o.GetTokenURL(), + TokenURL: o.getTokenURL(), Scopes: o.Scopes, - EndpointParams: o.GetEndpointParams(), + EndpointParams: o.getEndpointParams(), } return creds.Client(ctx), nil - case OAuth2ProviderGoogle: + case oauth2ProviderGoogle: if o.GoogleJWTFile != "" { cfg, err := google.JWTConfigFromJSON(o.GoogleCredentialsJSON, o.Scopes...) if err != nil { @@ -101,9 +101,9 @@ func (o *OAuth2) Client(ctx context.Context, client *http.Client) (*http.Client, } // GetTokenURL returns the TokenURL. -func (o *OAuth2) GetTokenURL() string { - switch o.GetProvider() { - case OAuth2ProviderAzure: +func (o *oauth2Config) getTokenURL() string { + switch o.getProvider() { + case oauth2ProviderAzure: if o.TokenURL == "" { return endpoints.AzureAD(o.AzureTenantID).TokenURL } @@ -113,14 +113,14 @@ func (o *OAuth2) GetTokenURL() string { } // GetProvider returns provider in its canonical form. -func (o OAuth2) GetProvider() OAuth2Provider { +func (o oauth2Config) getProvider() oauth2Provider { return o.Provider.canonical() } // GetEndpointParams returns endpoint params with any provider ones combined. -func (o OAuth2) GetEndpointParams() map[string][]string { - switch o.GetProvider() { - case OAuth2ProviderAzure: +func (o oauth2Config) getEndpointParams() map[string][]string { + switch o.getProvider() { + case oauth2ProviderAzure: if o.AzureResource != "" { if o.EndpointParams == nil { o.EndpointParams = map[string][]string{} @@ -133,18 +133,18 @@ func (o OAuth2) GetEndpointParams() map[string][]string { } // Validate checks if oauth2 config is valid. -func (o *OAuth2) Validate() error { - switch o.GetProvider() { - case OAuth2ProviderAzure: +func (o *oauth2Config) Validate() error { + switch o.getProvider() { + case oauth2ProviderAzure: return o.validateAzureProvider() - case OAuth2ProviderGoogle: + case oauth2ProviderGoogle: return o.validateGoogleProvider() - case OAuth2ProviderDefault: + case oauth2ProviderDefault: if o.TokenURL == "" || o.ClientID == "" || o.ClientSecret == "" { return errors.New("invalid configuration: both token_url and client credentials must be provided") } default: - return fmt.Errorf("invalid configuration: unknown provider %q", o.GetProvider()) + return fmt.Errorf("invalid configuration: unknown provider %q", o.getProvider()) } return nil } @@ -152,7 +152,7 @@ func (o *OAuth2) Validate() error { // findDefaultGoogleCredentials will default to google.FindDefaultCredentials and will only be changed for testing purposes var findDefaultGoogleCredentials = google.FindDefaultCredentials -func (o *OAuth2) validateGoogleProvider() error { +func (o *oauth2Config) validateGoogleProvider() error { if o.TokenURL != "" || o.ClientID != "" || o.ClientSecret != "" || o.AzureTenantID != "" || o.AzureResource != "" || len(o.EndpointParams) > 0 { return errors.New("invalid configuration: none of token_url and client credentials can be used, use google.credentials_file, google.jwt_file, google.credentials_json or ADC instead") @@ -192,7 +192,7 @@ func (o *OAuth2) validateGoogleProvider() error { return fmt.Errorf("invalid configuration: no authentication credentials were configured or detected (ADC)") } -func (o *OAuth2) populateCredentialsJSONFromFile(file string) error { +func (o *oauth2Config) populateCredentialsJSONFromFile(file string) error { if _, err := os.Stat(file); os.IsNotExist(err) { return fmt.Errorf("invalid configuration: the file %q cannot be found", file) } @@ -211,7 +211,7 @@ func (o *OAuth2) populateCredentialsJSONFromFile(file string) error { return nil } -func (o *OAuth2) validateAzureProvider() error { +func (o *oauth2Config) validateAzureProvider() error { if o.TokenURL == "" && o.AzureTenantID == "" { return errors.New("invalid configuration: at least one of token_url or tenant_id must be provided") } diff --git a/x-pack/filebeat/input/httpjson/config/config_oauth_test.go b/x-pack/filebeat/input/httpjson/config_oauth_test.go similarity index 67% rename from x-pack/filebeat/input/httpjson/config/config_oauth_test.go rename to x-pack/filebeat/input/httpjson/config_oauth_test.go index cc0e43bcd58c..67ec63b6650f 100644 --- a/x-pack/filebeat/input/httpjson/config/config_oauth_test.go +++ b/x-pack/filebeat/input/httpjson/config_oauth_test.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package config +package httpjson import ( "reflect" @@ -11,8 +11,8 @@ import ( func TestProviderCanonical(t *testing.T) { const ( - a OAuth2Provider = "gOoGle" - b OAuth2Provider = "google" + a oauth2Provider = "gOoGle" + b oauth2Provider = "google" ) if a.canonical() != b.canonical() { @@ -21,74 +21,74 @@ func TestProviderCanonical(t *testing.T) { } func TestGetProviderIsCanonical(t *testing.T) { - const expected OAuth2Provider = "google" + const expected oauth2Provider = "google" - oauth2 := OAuth2{Provider: "GOogle"} - if oauth2.GetProvider() != expected { + oauth2 := oauth2Config{Provider: "GOogle"} + if oauth2.getProvider() != expected { t.Fatal("GetProvider should return canonical provider") } } func TestIsEnabled(t *testing.T) { - oauth2 := OAuth2{} - if !oauth2.IsEnabled() { + oauth2 := oauth2Config{} + if !oauth2.isEnabled() { t.Fatal("OAuth2 should be enabled by default") } var enabled = false oauth2.Enabled = &enabled - if oauth2.IsEnabled() { + if oauth2.isEnabled() { t.Fatal("OAuth2 should be disabled") } enabled = true - if !oauth2.IsEnabled() { + if !oauth2.isEnabled() { t.Fatal("OAuth2 should be enabled") } } func TestGetTokenURL(t *testing.T) { const expected = "http://localhost" - oauth2 := OAuth2{TokenURL: "http://localhost"} - if got := oauth2.GetTokenURL(); got != expected { + oauth2 := oauth2Config{TokenURL: "http://localhost"} + if got := oauth2.getTokenURL(); got != expected { t.Fatalf("GetTokenURL should return the provided TokenURL but got %q", got) } } func TestGetTokenURLWithAzure(t *testing.T) { const expectedWithoutTenantID = "http://localhost" - oauth2 := OAuth2{TokenURL: "http://localhost", Provider: "azure"} - if got := oauth2.GetTokenURL(); got != expectedWithoutTenantID { + oauth2 := oauth2Config{TokenURL: "http://localhost", Provider: "azure"} + if got := oauth2.getTokenURL(); got != expectedWithoutTenantID { t.Fatalf("GetTokenURL should return the provided TokenURL but got %q", got) } oauth2.TokenURL = "" oauth2.AzureTenantID = "a_tenant_id" const expectedWithTenantID = "https://login.microsoftonline.com/a_tenant_id/oauth2/v2.0/token" - if got := oauth2.GetTokenURL(); got != expectedWithTenantID { + if got := oauth2.getTokenURL(); got != expectedWithTenantID { t.Fatalf("GetTokenURL should return the generated TokenURL but got %q", got) } } func TestGetEndpointParams(t *testing.T) { var expected = map[string][]string{"foo": {"bar"}} - oauth2 := OAuth2{EndpointParams: map[string][]string{"foo": {"bar"}}} - if got := oauth2.GetEndpointParams(); !reflect.DeepEqual(got, expected) { + oauth2 := oauth2Config{EndpointParams: map[string][]string{"foo": {"bar"}}} + if got := oauth2.getEndpointParams(); !reflect.DeepEqual(got, expected) { t.Fatalf("GetEndpointParams should return the provided EndpointParams but got %q", got) } } func TestGetEndpointParamsWithAzure(t *testing.T) { var expectedWithoutResource = map[string][]string{"foo": {"bar"}} - oauth2 := OAuth2{Provider: "azure", EndpointParams: map[string][]string{"foo": {"bar"}}} - if got := oauth2.GetEndpointParams(); !reflect.DeepEqual(got, expectedWithoutResource) { + oauth2 := oauth2Config{Provider: "azure", EndpointParams: map[string][]string{"foo": {"bar"}}} + if got := oauth2.getEndpointParams(); !reflect.DeepEqual(got, expectedWithoutResource) { t.Fatalf("GetEndpointParams should return the provided EndpointParams but got %q", got) } oauth2.AzureResource = "baz" var expectedWithResource = map[string][]string{"foo": {"bar"}, "resource": {"baz"}} - if got := oauth2.GetEndpointParams(); !reflect.DeepEqual(got, expectedWithResource) { + if got := oauth2.getEndpointParams(); !reflect.DeepEqual(got, expectedWithResource) { t.Fatalf("GetEndpointParams should return the provided EndpointParams but got %q", got) } } diff --git a/x-pack/filebeat/input/httpjson/config/config_test.go b/x-pack/filebeat/input/httpjson/config_test.go similarity index 98% rename from x-pack/filebeat/input/httpjson/config/config_test.go rename to x-pack/filebeat/input/httpjson/config_test.go index 1629f3c0a0d4..85c7c64848df 100644 --- a/x-pack/filebeat/input/httpjson/config/config_test.go +++ b/x-pack/filebeat/input/httpjson/config_test.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package config +package httpjson import ( "context" @@ -25,7 +25,7 @@ func TestConfigValidationCase1(t *testing.T) { "url": "localhost", } cfg := common.MustNewConfigFrom(m) - conf := Default() + conf := newDefaultConfig() if err := cfg.Unpack(&conf); err == nil { t.Fatal("Configuration validation failed. no_http_body and http_request_body cannot coexist.") } @@ -39,7 +39,7 @@ func TestConfigValidationCase2(t *testing.T) { "url": "localhost", } cfg := common.MustNewConfigFrom(m) - conf := Default() + conf := newDefaultConfig() if err := cfg.Unpack(&conf); err == nil { t.Fatal("Configuration validation failed. no_http_body and pagination.extra_body_content cannot coexist.") } @@ -53,7 +53,7 @@ func TestConfigValidationCase3(t *testing.T) { "url": "localhost", } cfg := common.MustNewConfigFrom(m) - conf := Default() + conf := newDefaultConfig() if err := cfg.Unpack(&conf); err == nil { t.Fatal("Configuration validation failed. no_http_body and pagination.req_field cannot coexist.") } @@ -66,7 +66,7 @@ func TestConfigValidationCase4(t *testing.T) { "url": "localhost", } cfg := common.MustNewConfigFrom(m) - conf := Default() + conf := newDefaultConfig() if err := cfg.Unpack(&conf); err == nil { t.Fatal("Configuration validation failed. pagination.header and pagination.req_field cannot coexist.") } @@ -79,7 +79,7 @@ func TestConfigValidationCase5(t *testing.T) { "url": "localhost", } cfg := common.MustNewConfigFrom(m) - conf := Default() + conf := newDefaultConfig() if err := cfg.Unpack(&conf); err == nil { t.Fatal("Configuration validation failed. pagination.header and pagination.id_field cannot coexist.") } @@ -92,7 +92,7 @@ func TestConfigValidationCase6(t *testing.T) { "url": "localhost", } cfg := common.MustNewConfigFrom(m) - conf := Default() + conf := newDefaultConfig() if err := cfg.Unpack(&conf); err == nil { t.Fatal("Configuration validation failed. pagination.header and extra_body_content cannot coexist.") } @@ -105,7 +105,7 @@ func TestConfigValidationCase7(t *testing.T) { "url": "localhost", } cfg := common.MustNewConfigFrom(m) - conf := Default() + conf := newDefaultConfig() if err := cfg.Unpack(&conf); err == nil { t.Fatal("Configuration validation failed. http_method DELETE is not allowed.") } @@ -116,7 +116,7 @@ func TestConfigMustFailWithInvalidURL(t *testing.T) { "url": "::invalid::", } cfg := common.MustNewConfigFrom(m) - conf := Default() + conf := newDefaultConfig() err := cfg.Unpack(&conf) assert.EqualError(t, err, `parse "::invalid::": missing protocol scheme accessing 'url'`) } @@ -414,7 +414,7 @@ func TestConfigOauth2Validation(t *testing.T) { } cfg := common.MustNewConfigFrom(c.input) - conf := Default() + conf := newDefaultConfig() err := cfg.Unpack(&conf) switch { diff --git a/x-pack/filebeat/input/httpjson/date_cursor.go b/x-pack/filebeat/input/httpjson/date_cursor.go index ed762985e686..66ca659de788 100644 --- a/x-pack/filebeat/input/httpjson/date_cursor.go +++ b/x-pack/filebeat/input/httpjson/date_cursor.go @@ -12,7 +12,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/config" ) type dateCursor struct { @@ -24,13 +23,12 @@ type dateCursor struct { initialInterval time.Duration dateFormat string - value string valueTpl *template.Template } -func newDateCursorFromConfig(config config.Config, log *logp.Logger) *dateCursor { +func newDateCursorFromConfig(config config, log *logp.Logger) *dateCursor { c := &dateCursor{ - enabled: config.DateCursor.IsEnabled(), + enabled: config.DateCursor.isEnabled(), url: *config.URL.URL, } @@ -42,23 +40,23 @@ func newDateCursorFromConfig(config config.Config, log *logp.Logger) *dateCursor c.field = config.DateCursor.Field c.urlField = config.DateCursor.URLField c.initialInterval = config.DateCursor.InitialInterval - c.dateFormat = config.DateCursor.GetDateFormat() + c.dateFormat = config.DateCursor.getDateFormat() c.valueTpl = config.DateCursor.ValueTemplate.Template return c } -func (c *dateCursor) getURL() string { +func (c *dateCursor) getURL(prevValue string) string { if !c.enabled { return c.url.String() } var dateStr string - if c.value == "" { + if prevValue == "" { t := timeNow().UTC().Add(-c.initialInterval) dateStr = t.Format(c.dateFormat) } else { - dateStr = c.value + dateStr = prevValue } q := c.url.Query() @@ -76,32 +74,33 @@ func (c *dateCursor) getURL() string { q.Set(c.urlField, value) - c.url.RawQuery = q.Encode() + url := c.url + url.RawQuery = q.Encode() - return c.url.String() + return url.String() } -func (c *dateCursor) advance(m common.MapStr) { +func (c *dateCursor) getNextValue(m common.MapStr) string { if c.field == "" { - c.value = time.Now().UTC().Format(c.dateFormat) - return + return time.Now().UTC().Format(c.dateFormat) } v, err := m.GetValue(c.field) if err != nil { c.log.Warnf("date_cursor field: %q", err) - return + return "" } + switch t := v.(type) { case string: _, err := time.Parse(c.dateFormat, t) if err != nil { c.log.Warn("date_cursor field does not have the expected layout") - return + return "" } - c.value = t - default: - c.log.Warn("date_cursor field must be a string, cursor will not advance") - return + return t } + + c.log.Warn("date_cursor field must be a string, cursor will not advance") + return "" } diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index b6ef653c4ff3..5445197f5630 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -9,6 +9,7 @@ import ( "fmt" "net" "net/http" + "net/url" "time" "github.com/hashicorp/go-retryablehttp" @@ -23,7 +24,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common/useragent" "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/config" "github.com/elastic/go-concert/ctxtool" "github.com/elastic/go-concert/timed" ) @@ -65,11 +65,6 @@ func (log *retryLogger) Warn(format string, args ...interface{}) { log.log.Warnf(format, args...) } -type input struct { - config config.Config - tlsConfig *tlscommon.TLSConfig -} - func Plugin(log *logp.Logger, store cursor.StateStore) v2.Plugin { sim := stateless.NewInputManager(statelessConfigure) return v2.Plugin{ @@ -88,7 +83,7 @@ func Plugin(log *logp.Logger, store cursor.StateStore) v2.Plugin { } } -func newInput(config config.Config) (*input, error) { +func newTLSConfig(config config) (*tlscommon.TLSConfig, error) { if err := config.Validate(); err != nil { return nil, err } @@ -98,56 +93,53 @@ func newInput(config config.Config) (*input, error) { return nil, err } - return &input{ - config: config, - tlsConfig: tlsConfig, - }, nil + return tlsConfig, nil } -func (*input) Name() string { return inputName } - -func (in *input) test() error { +func test(url *url.URL) error { port := func() string { - if in.config.URL.Port() != "" { - return in.config.URL.Port() + if url.Port() != "" { + return url.Port() } - switch in.config.URL.Scheme { + switch url.Scheme { case "https": return "443" } return "80" }() - _, err := net.DialTimeout("tcp", net.JoinHostPort(in.config.URL.Hostname(), port), time.Second) + _, err := net.DialTimeout("tcp", net.JoinHostPort(url.Hostname(), port), time.Second) if err != nil { - return fmt.Errorf("url %q is unreachable", in.config.URL) + return fmt.Errorf("url %q is unreachable", url) } return nil } -func (in *input) run( +func run( ctx v2.Context, + config config, + tlsConfig *tlscommon.TLSConfig, publisher cursor.Publisher, cursor *cursor.Cursor, ) error { - log := ctx.Logger.With("url", in.config.URL) + log := ctx.Logger.With("url", config.URL) stdCtx := ctxtool.FromCanceller(ctx.Cancelation) - httpClient, err := in.newHTTPClient(stdCtx) + httpClient, err := newHTTPClient(stdCtx, config, tlsConfig) if err != nil { return err } - dateCursor := newDateCursorFromConfig(in.config, log) + dateCursor := newDateCursorFromConfig(config, log) - rateLimiter := newRateLimiterFromConfig(in.config, log) + rateLimiter := newRateLimiterFromConfig(config, log) - pagination := newPaginationFromConfig(in.config) + pagination := newPaginationFromConfig(config) requester := newRequester( - in.config, + config, rateLimiter, dateCursor, pagination, @@ -158,11 +150,11 @@ func (in *input) run( requester.loadCursor(cursor, log) // TODO: disallow passing interval = 0 as a mean to run once. - if in.config.Interval == 0 { + if config.Interval == 0 { return requester.processHTTPRequest(stdCtx, publisher) } - err = timed.Periodic(stdCtx, in.config.Interval, func() error { + err = timed.Periodic(stdCtx, config.Interval, func() error { log.Info("Process another repeated request.") if err := requester.processHTTPRequest(stdCtx, publisher); err != nil { log.Error(err) @@ -175,29 +167,29 @@ func (in *input) run( return nil } -func (in *input) newHTTPClient(ctx context.Context) (*http.Client, error) { +func newHTTPClient(ctx context.Context, config config, tlsConfig *tlscommon.TLSConfig) (*http.Client, error) { // Make retryable HTTP client client := &retryablehttp.Client{ HTTPClient: &http.Client{ Transport: &http.Transport{ DialContext: (&net.Dialer{ - Timeout: in.config.HTTPClientTimeout, + Timeout: config.HTTPClientTimeout, }).DialContext, - TLSClientConfig: in.tlsConfig.ToConfig(), + TLSClientConfig: tlsConfig.ToConfig(), DisableKeepAlives: true, }, - Timeout: in.config.HTTPClientTimeout, + Timeout: config.HTTPClientTimeout, }, Logger: newRetryLogger(), - RetryWaitMin: in.config.RetryWaitMin, - RetryWaitMax: in.config.RetryWaitMax, - RetryMax: in.config.RetryMax, + RetryWaitMin: config.RetryWaitMin, + RetryWaitMax: config.RetryWaitMax, + RetryMax: config.RetryMax, CheckRetry: retryablehttp.DefaultRetryPolicy, Backoff: retryablehttp.DefaultBackoff, } - if in.config.OAuth2.IsEnabled() { - return in.config.OAuth2.Client(ctx, client.StandardClient()) + if config.OAuth2.isEnabled() { + return config.OAuth2.client(ctx, client.StandardClient()) } return client.StandardClient(), nil diff --git a/x-pack/filebeat/input/httpjson/input_cursor.go b/x-pack/filebeat/input/httpjson/input_cursor.go index f472c27ab04b..d18a91f39183 100644 --- a/x-pack/filebeat/input/httpjson/input_cursor.go +++ b/x-pack/filebeat/input/httpjson/input_cursor.go @@ -8,40 +8,60 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/config" + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" ) -type cursorInput struct { - *input +type cursorInput struct{} + +func (cursorInput) Name() string { + return "httpjson-cursor" +} + +type source struct { + config config + tlsConfig *tlscommon.TLSConfig +} + +func (src source) Name() string { + return src.config.URL.String() } func cursorConfigure(cfg *common.Config) ([]cursor.Source, cursor.Input, error) { - conf := config.Default() + conf := newDefaultConfig() if err := cfg.Unpack(&conf); err != nil { return nil, nil, err } return newCursorInput(conf) } -func newCursorInput(config config.Config) ([]cursor.Source, cursor.Input, error) { - input, err := newInput(config) +func newCursorInput(config config) ([]cursor.Source, cursor.Input, error) { + tlsConfig, err := newTLSConfig(config) if err != nil { return nil, nil, err } - return nil, &cursorInput{input: input}, nil + // we only allow one url per config, if we wanted to allow more than one + // each source should hold only one url + return []cursor.Source{ + &source{config: config, + tlsConfig: tlsConfig, + }, + }, + &cursorInput{}, + nil } -func (in *cursorInput) Test(cursor.Source, v2.TestContext) error { - return in.test() +func (in *cursorInput) Test(src cursor.Source, _ v2.TestContext) error { + return test((src.(*source)).config.URL.URL) } // Run starts the input and blocks until it ends the execution. // It will return on context cancellation, any other error will be retried. func (in *cursorInput) Run( ctx v2.Context, - _ cursor.Source, + src cursor.Source, cursor cursor.Cursor, publisher cursor.Publisher, ) error { - return in.run(ctx, publisher, &cursor) + s := src.(*source) + return run(ctx, s.config, s.tlsConfig, publisher, &cursor) } diff --git a/x-pack/filebeat/input/httpjson/input_manager.go b/x-pack/filebeat/input/httpjson/input_manager.go index 08ac19d6e651..21f5066dc052 100644 --- a/x-pack/filebeat/input/httpjson/input_manager.go +++ b/x-pack/filebeat/input/httpjson/input_manager.go @@ -13,7 +13,6 @@ import ( cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/config" ) // inputManager wraps one stateless input manager @@ -37,7 +36,7 @@ func (m inputManager) Init(grp unison.Group, mode v2.Mode) error { // Create creates a cursor input manager if the config has a date cursor set up, // otherwise it creates a stateless input manager. func (m inputManager) Create(cfg *common.Config) (v2.Input, error) { - var config config.Config + var config config if err := cfg.Unpack(&config); err != nil { return nil, err } diff --git a/x-pack/filebeat/input/httpjson/input_stateless.go b/x-pack/filebeat/input/httpjson/input_stateless.go index c1f81cace596..c7ebf6c3d4c0 100644 --- a/x-pack/filebeat/input/httpjson/input_stateless.go +++ b/x-pack/filebeat/input/httpjson/input_stateless.go @@ -9,31 +9,36 @@ import ( stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/config" + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" ) type statelessInput struct { - *input + config config + tlsConfig *tlscommon.TLSConfig +} + +func (statelessInput) Name() string { + return "httpjson-stateless" } func statelessConfigure(cfg *common.Config) (stateless.Input, error) { - conf := config.Default() + conf := newDefaultConfig() if err := cfg.Unpack(&conf); err != nil { return nil, err } return newStatelessInput(conf) } -func newStatelessInput(config config.Config) (*statelessInput, error) { - input, err := newInput(config) +func newStatelessInput(config config) (*statelessInput, error) { + tlsConfig, err := newTLSConfig(config) if err != nil { return nil, err } - return &statelessInput{input: input}, nil + return &statelessInput{config: config, tlsConfig: tlsConfig}, nil } func (in *statelessInput) Test(v2.TestContext) error { - return in.test() + return test(in.config.URL.URL) } type statelessPublisher struct { @@ -49,5 +54,5 @@ func (pub statelessPublisher) Publish(event beat.Event, _ interface{}) error { // It will return on context cancellation, any other error will be retried. func (in *statelessInput) Run(ctx v2.Context, publisher stateless.Publisher) error { pub := statelessPublisher{wrapped: publisher} - return in.run(ctx, pub, nil) + return run(ctx, in.config, in.tlsConfig, pub, nil) } diff --git a/x-pack/filebeat/input/httpjson/input_test.go b/x-pack/filebeat/input/httpjson/input_test.go index fe169b0250f4..242811d27953 100644 --- a/x-pack/filebeat/input/httpjson/input_test.go +++ b/x-pack/filebeat/input/httpjson/input_test.go @@ -21,7 +21,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" beattest "github.com/elastic/beats/v7/libbeat/publisher/testing" - "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/config" ) func TestStatelessHTTPJSONInput(t *testing.T) { @@ -225,25 +224,23 @@ func TestStatelessHTTPJSONInput(t *testing.T) { cfg := common.MustNewConfigFrom(tc.baseConfig) - conf := config.Default() + conf := newDefaultConfig() assert.NoError(t, cfg.Unpack(&conf)) - input, err := newInput(conf) + input, err := newStatelessInput(conf) assert.NoError(t, err) - assert.Equal(t, "httpjson", input.Name()) - assert.NoError(t, input.test()) + assert.Equal(t, "httpjson-stateless", input.Name()) + assert.NoError(t, input.Test(v2.TestContext{})) chanClient := beattest.NewChanClient(len(tc.expected)) t.Cleanup(func() { _ = chanClient.Close() }) - pub := statelessPublisher{wrapped: chanClient} - ctx, cancel := newV2Context() t.Cleanup(cancel) var g errgroup.Group - g.Go(func() error { return input.run(ctx, pub, nil) }) + g.Go(func() error { return input.Run(ctx, chanClient) }) timeout := time.NewTimer(5 * time.Second) t.Cleanup(func() { _ = timeout.Stop() }) diff --git a/x-pack/filebeat/input/httpjson/pagination.go b/x-pack/filebeat/input/httpjson/pagination.go index f58be4c8de6b..020bc783055c 100644 --- a/x-pack/filebeat/input/httpjson/pagination.go +++ b/x-pack/filebeat/input/httpjson/pagination.go @@ -12,20 +12,19 @@ import ( "regexp" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/config" ) type pagination struct { extraBodyContent common.MapStr - header *config.Header + header *headerConfig idField string requestField string urlField string url string } -func newPaginationFromConfig(config config.Config) *pagination { - if !config.Pagination.IsEnabled() { +func newPaginationFromConfig(config config) *pagination { + if !config.Pagination.isEnabled() { return nil } return &pagination{ diff --git a/x-pack/filebeat/input/httpjson/rate_limiter.go b/x-pack/filebeat/input/httpjson/rate_limiter.go index 9c2beca1dcb2..93c2b4a3fe7d 100644 --- a/x-pack/filebeat/input/httpjson/rate_limiter.go +++ b/x-pack/filebeat/input/httpjson/rate_limiter.go @@ -12,7 +12,6 @@ import ( "time" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/config" ) type rateLimiter struct { @@ -23,7 +22,7 @@ type rateLimiter struct { remaining string } -func newRateLimiterFromConfig(config config.Config, log *logp.Logger) *rateLimiter { +func newRateLimiterFromConfig(config config, log *logp.Logger) *rateLimiter { if config.RateLimit == nil { return nil } diff --git a/x-pack/filebeat/input/httpjson/requester.go b/x-pack/filebeat/input/httpjson/requester.go index 8e0aa96e1359..df0a1efb1ebf 100644 --- a/x-pack/filebeat/input/httpjson/requester.go +++ b/x-pack/filebeat/input/httpjson/requester.go @@ -17,7 +17,6 @@ import ( cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/config" ) type requestInfo struct { @@ -41,10 +40,12 @@ type requester struct { authScheme string jsonObjects string splitEventsBy string + + cursorState cursorState } func newRequester( - config config.Config, + config config, rateLimiter *rateLimiter, dateCursor *dateCursor, pagination *pagination, @@ -75,7 +76,7 @@ type response struct { // processHTTPRequest processes HTTP request, and handles pagination if enabled func (r *requester) processHTTPRequest(ctx context.Context, publisher cursor.Publisher) error { ri := &requestInfo{ - url: r.dateCursor.getURL(), + url: r.dateCursor.getURL(r.cursorState.LastDateCursorValue), contentMap: common.MapStr{}, headers: r.headers, } @@ -167,7 +168,7 @@ func (r *requester) processHTTPRequest(ctx context.Context, publisher cursor.Pub } if lastObj != nil && r.dateCursor.enabled { - r.dateCursor.advance(common.MapStr(lastObj)) + r.updateCursorState(ri.url, r.dateCursor.getNextValue(common.MapStr(lastObj))) } return nil @@ -222,7 +223,7 @@ func (r *requester) processEventArray(publisher cursor.Publisher, events []inter if err != nil { return nil, fmt.Errorf("failed to marshal %+v: %w", e, err) } - if err := publisher.Publish(makeEvent(string(d)), r.getCursor()); err != nil { + if err := publisher.Publish(makeEvent(string(d)), r.cursorState); err != nil { return nil, fmt.Errorf("failed to publish: %w", err) } } @@ -278,15 +279,13 @@ func splitEvent(splitKey string, event map[string]interface{}) []map[string]inte } type cursorState struct { - LastCalledURL config.URL + LastCalledURL string LastDateCursorValue string } -func (r *requester) getCursor() cursorState { - return cursorState{ - LastCalledURL: config.URL{URL: &r.dateCursor.url}, - LastDateCursorValue: r.dateCursor.value, - } +func (r *requester) updateCursorState(url, value string) { + r.cursorState.LastCalledURL = url + r.cursorState.LastDateCursorValue = value } func (r *requester) loadCursor(c *cursor.Cursor, log *logp.Logger) { @@ -294,12 +293,7 @@ func (r *requester) loadCursor(c *cursor.Cursor, log *logp.Logger) { return } - var cs cursorState - if err := c.Unpack(&cs); err != nil { + if err := c.Unpack(&r.cursorState); err != nil { log.Errorf("Reset http cursor state. Failed to read from registry: %v", err) - return } - - r.dateCursor.url = *cs.LastCalledURL.URL - r.dateCursor.value = cs.LastDateCursorValue } diff --git a/x-pack/filebeat/input/httpjson/config/testdata/credentials.json b/x-pack/filebeat/input/httpjson/testdata/credentials.json similarity index 100% rename from x-pack/filebeat/input/httpjson/config/testdata/credentials.json rename to x-pack/filebeat/input/httpjson/testdata/credentials.json diff --git a/x-pack/filebeat/input/httpjson/config/testdata/invalid_credentials.json b/x-pack/filebeat/input/httpjson/testdata/invalid_credentials.json similarity index 100% rename from x-pack/filebeat/input/httpjson/config/testdata/invalid_credentials.json rename to x-pack/filebeat/input/httpjson/testdata/invalid_credentials.json