Skip to content

Commit df102bd

Browse files
authored
[Filebeat][httpjson] Make httpjson use cursor input when using date cursor (#20751) (#21384)
* Fix duplicate import
* Move config to its own package
* Minor improvements
* Fix tests
* Create input manager
* Change requester to accept and store a cursor
* Modify input to be embedded
* Create stateless and cursor inputs
* Initialize new input manager on publish
* Add changelog entry and format files
* Move test data folder
* Change tests
* Apply requested changes

(cherry picked from commit 8f9d54b)
1 parent 250c4d5 commit df102bd

17 files changed

+385
-184
lines changed

CHANGELOG.next.asciidoc

+1
Original file line numberDiff line numberDiff line change
@@ -685,6 +685,7 @@ field. You can revert this change by configuring tags for the module and omittin
685685
- Add type and sub_type to panw panos fileset {pull}20912[20912]
686686
- Always attempt community_id processor on zeek module {pull}21155[21155]
687687
- Add related.hosts ecs field to all modules {pull}21160[21160]
688+
- Keep cursor state between httpjson input restarts {pull}20751[20751]
688689

689690
*Heartbeat*
690691

filebeat/input/v2/input-cursor/manager.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626

2727
"github.com/elastic/go-concert/unison"
2828

29-
input "github.com/elastic/beats/v7/filebeat/input/v2"
3029
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
3130
"github.com/elastic/beats/v7/libbeat/common"
3231
"github.com/elastic/beats/v7/libbeat/logp"
@@ -145,7 +144,7 @@ func (cim *InputManager) shutdown() {
145144

146145
// Create builds a new v2.Input using the provided Configure function.
147146
// The Input will run a go-routine per source that has been configured.
148-
func (cim *InputManager) Create(config *common.Config) (input.Input, error) {
147+
func (cim *InputManager) Create(config *common.Config) (v2.Input, error) {
149148
if err := cim.init(); err != nil {
150149
return nil, err
151150
}
@@ -180,7 +179,7 @@ func (cim *InputManager) Create(config *common.Config) (input.Input, error) {
180179

181180
// lock locks a key for exclusive access and returns a resource that can be used to modify
182181
// the cursor state and unlock the key.
183-
func (cim *InputManager) lock(ctx input.Context, key string) (*resource, error) {
182+
func (cim *InputManager) lock(ctx v2.Context, key string) (*resource, error) {
184183
resource := cim.store.Get(key)
185184
err := lockResource(ctx.Logger, resource, ctx.Cancelation)
186185
if err != nil {
@@ -190,7 +189,7 @@ func (cim *InputManager) lock(ctx input.Context, key string) (*resource, error)
190189
return resource, nil
191190
}
192191

193-
func lockResource(log *logp.Logger, resource *resource, canceler input.Canceler) error {
192+
func lockResource(log *logp.Logger, resource *resource, canceler v2.Canceler) error {
194193
if !resource.lock.TryLock() {
195194
log.Infof("Resource '%v' currently in use, waiting...", resource.key)
196195
err := resource.lock.LockContext(canceler)

x-pack/filebeat/input/default-inputs/inputs.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2
2727
return []v2.Plugin{
2828
cloudfoundry.Plugin(),
2929
http_endpoint.Plugin(),
30-
httpjson.Plugin(),
30+
httpjson.Plugin(log, store),
3131
o365audit.Plugin(log, store),
3232
}
3333
}

x-pack/filebeat/input/httpjson/config.go

+29-29
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ import (
1717
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
1818
)
1919

20-
// Config contains information about httpjson configuration
20+
// config contains information about httpjson configuration
2121
type config struct {
22-
OAuth2 *OAuth2 `config:"oauth2"`
22+
OAuth2 *oauth2Config `config:"oauth2"`
2323
APIKey string `config:"api_key"`
2424
AuthenticationScheme string `config:"authentication_scheme"`
2525
HTTPClientTimeout time.Duration `config:"http_client_timeout"`
@@ -30,98 +30,98 @@ type config struct {
3030
JSONObjects string `config:"json_objects_array"`
3131
SplitEventsBy string `config:"split_events_by"`
3232
NoHTTPBody bool `config:"no_http_body"`
33-
Pagination *Pagination `config:"pagination"`
34-
RateLimit *RateLimit `config:"rate_limit"`
33+
Pagination *paginationConfig `config:"pagination"`
34+
RateLimit *rateLimitConfig `config:"rate_limit"`
3535
RetryMax int `config:"retry.max_attempts"`
3636
RetryWaitMin time.Duration `config:"retry.wait_min"`
3737
RetryWaitMax time.Duration `config:"retry.wait_max"`
3838
TLS *tlscommon.Config `config:"ssl"`
39-
URL *URL `config:"url" validate:"required"`
40-
DateCursor *DateCursor `config:"date_cursor"`
39+
URL *urlConfig `config:"url" validate:"required"`
40+
DateCursor *dateCursorConfig `config:"date_cursor"`
4141
}
4242

4343
// paginationConfig contains information about httpjson pagination settings
44-
type Pagination struct {
44+
type paginationConfig struct {
4545
Enabled *bool `config:"enabled"`
4646
ExtraBodyContent common.MapStr `config:"extra_body_content"`
47-
Header *Header `config:"header"`
47+
Header *headerConfig `config:"header"`
4848
IDField string `config:"id_field"`
4949
RequestField string `config:"req_field"`
5050
URLField string `config:"url_field"`
5151
URL string `config:"url"`
5252
}
5353

5454
// isEnabled returns true if pagination is configured and its `enabled` field is unset or set to true in the yaml.
55-
func (p *Pagination) IsEnabled() bool {
55+
func (p *paginationConfig) isEnabled() bool {
5656
return p != nil && (p.Enabled == nil || *p.Enabled)
5757
}
5858

5959
// HTTP Header information for pagination
60-
type Header struct {
60+
type headerConfig struct {
6161
FieldName string `config:"field_name" validate:"required"`
6262
RegexPattern *regexp.Regexp `config:"regex_pattern" validate:"required"`
6363
}
6464

6565
// HTTP Header Rate Limit information
66-
type RateLimit struct {
66+
type rateLimitConfig struct {
6767
Limit string `config:"limit"`
6868
Reset string `config:"reset"`
6969
Remaining string `config:"remaining"`
7070
}
7171

72-
type DateCursor struct {
73-
Enabled *bool `config:"enabled"`
74-
Field string `config:"field"`
75-
URLField string `config:"url_field" validate:"required"`
76-
ValueTemplate *Template `config:"value_template"`
77-
DateFormat string `config:"date_format"`
78-
InitialInterval time.Duration `config:"initial_interval"`
72+
type dateCursorConfig struct {
73+
Enabled *bool `config:"enabled"`
74+
Field string `config:"field"`
75+
URLField string `config:"url_field" validate:"required"`
76+
ValueTemplate *templateConfig `config:"value_template"`
77+
DateFormat string `config:"date_format"`
78+
InitialInterval time.Duration `config:"initial_interval"`
7979
}
8080

81-
type Template struct {
81+
type templateConfig struct {
8282
*template.Template
8383
}
8484

85-
func (t *Template) Unpack(in string) error {
85+
func (t *templateConfig) Unpack(in string) error {
8686
tpl, err := template.New("tpl").Parse(in)
8787
if err != nil {
8888
return err
8989
}
9090

91-
*t = Template{Template: tpl}
91+
*t = templateConfig{Template: tpl}
9292

9393
return nil
9494
}
9595

96-
type URL struct {
96+
type urlConfig struct {
9797
*url.URL
9898
}
9999

100-
func (u *URL) Unpack(in string) error {
100+
func (u *urlConfig) Unpack(in string) error {
101101
parsed, err := url.Parse(in)
102102
if err != nil {
103103
return err
104104
}
105105

106-
*u = URL{URL: parsed}
106+
*u = urlConfig{URL: parsed}
107107

108108
return nil
109109
}
110110

111111
// isEnabled returns true if the date cursor is configured and its `enabled` field is unset or set to true in the yaml.
112-
func (dc *DateCursor) IsEnabled() bool {
112+
func (dc *dateCursorConfig) isEnabled() bool {
113113
return dc != nil && (dc.Enabled == nil || *dc.Enabled)
114114
}
115115

116116
// getDateFormat returns the configured date format, falling back to time.RFC3339 when none is set.
117-
func (dc *DateCursor) GetDateFormat() string {
117+
func (dc *dateCursorConfig) getDateFormat() string {
118118
if dc.DateFormat == "" {
119119
return time.RFC3339
120120
}
121121
return dc.DateFormat
122122
}
123123

124-
func (dc *DateCursor) Validate() error {
124+
func (dc *dateCursorConfig) Validate() error {
125125
if dc.DateFormat == "" {
126126
return nil
127127
}
@@ -154,15 +154,15 @@ func (c *config) Validate() error {
154154
}
155155
}
156156
}
157-
if c.OAuth2.IsEnabled() {
157+
if c.OAuth2.isEnabled() {
158158
if c.APIKey != "" || c.AuthenticationScheme != "" {
159159
return errors.New("invalid configuration: oauth2 and api_key or authentication_scheme cannot be set simultaneously")
160160
}
161161
}
162162
return nil
163163
}
164164

165-
func defaultConfig() config {
165+
func newDefaultConfig() config {
166166
var c config
167167
c.HTTPMethod = "GET"
168168
c.HTTPClientTimeout = 60 * time.Second

x-pack/filebeat/input/httpjson/config_oauth.go

+37-36
Original file line numberDiff line numberDiff line change
@@ -20,32 +20,32 @@ import (
2020
"golang.org/x/oauth2/google"
2121
)
2222

23-
// An OAuth2Provider represents a supported oauth provider.
24-
type OAuth2Provider string
23+
// An oauth2Provider represents a supported oauth provider.
24+
type oauth2Provider string
2525

2626
const (
27-
OAuth2ProviderDefault OAuth2Provider = "" // OAuth2ProviderDefault means no specific provider is set.
28-
OAuth2ProviderAzure OAuth2Provider = "azure" // OAuth2ProviderAzure AzureAD.
29-
OAuth2ProviderGoogle OAuth2Provider = "google" // OAuth2ProviderGoogle Google.
27+
oauth2ProviderDefault oauth2Provider = "" // OAuth2ProviderDefault means no specific provider is set.
28+
oauth2ProviderAzure oauth2Provider = "azure" // OAuth2ProviderAzure AzureAD.
29+
oauth2ProviderGoogle oauth2Provider = "google" // OAuth2ProviderGoogle Google.
3030
)
3131

32-
func (p *OAuth2Provider) Unpack(in string) error {
33-
*p = OAuth2Provider(in)
32+
func (p *oauth2Provider) Unpack(in string) error {
33+
*p = oauth2Provider(in)
3434
return nil
3535
}
3636

37-
func (p OAuth2Provider) canonical() OAuth2Provider {
38-
return OAuth2Provider(strings.ToLower(string(p)))
37+
func (p oauth2Provider) canonical() oauth2Provider {
38+
return oauth2Provider(strings.ToLower(string(p)))
3939
}
4040

41-
// OAuth2 contains information about oauth2 authentication settings.
42-
type OAuth2 struct {
41+
// oauth2Config contains information about oauth2 authentication settings.
42+
type oauth2Config struct {
4343
// common oauth fields
4444
ClientID string `config:"client.id"`
4545
ClientSecret string `config:"client.secret"`
4646
Enabled *bool `config:"enabled"`
4747
EndpointParams map[string][]string `config:"endpoint_params"`
48-
Provider OAuth2Provider `config:"provider"`
48+
Provider oauth2Provider `config:"provider"`
4949
Scopes []string `config:"scopes"`
5050
TokenURL string `config:"token_url"`
5151

@@ -61,25 +61,26 @@ type OAuth2 struct {
6161
}
6262

6363
// isEnabled returns true if oauth2 is configured and its `enabled` field is unset or set to true in the yaml.
64-
func (o *OAuth2) IsEnabled() bool {
64+
func (o *oauth2Config) isEnabled() bool {
6565
return o != nil && (o.Enabled == nil || *o.Enabled)
6666
}
6767

6868
// client wraps the given http.Client and returns a new one that will use the oauth authentication.
69-
func (o *OAuth2) Client(ctx context.Context, client *http.Client) (*http.Client, error) {
70-
ctx = context.WithValue(ctx, oauth2.HTTPClient, client)
69+
func (o *oauth2Config) client(ctx context.Context, client *http.Client) (*http.Client, error) {
70+
// only required to let the oauth2 library find our custom client in the context
71+
ctx = context.WithValue(context.Background(), oauth2.HTTPClient, client)
7172

72-
switch o.GetProvider() {
73-
case OAuth2ProviderAzure, OAuth2ProviderDefault:
73+
switch o.getProvider() {
74+
case oauth2ProviderAzure, oauth2ProviderDefault:
7475
creds := clientcredentials.Config{
7576
ClientID: o.ClientID,
7677
ClientSecret: o.ClientSecret,
77-
TokenURL: o.GetTokenURL(),
78+
TokenURL: o.getTokenURL(),
7879
Scopes: o.Scopes,
79-
EndpointParams: o.GetEndpointParams(),
80+
EndpointParams: o.getEndpointParams(),
8081
}
8182
return creds.Client(ctx), nil
82-
case OAuth2ProviderGoogle:
83+
case oauth2ProviderGoogle:
8384
if o.GoogleJWTFile != "" {
8485
cfg, err := google.JWTConfigFromJSON(o.GoogleCredentialsJSON, o.Scopes...)
8586
if err != nil {
@@ -100,9 +101,9 @@ func (o *OAuth2) Client(ctx context.Context, client *http.Client) (*http.Client,
100101
}
101102

102103
// getTokenURL returns the TokenURL.
103-
func (o *OAuth2) GetTokenURL() string {
104-
switch o.GetProvider() {
105-
case OAuth2ProviderAzure:
104+
func (o *oauth2Config) getTokenURL() string {
105+
switch o.getProvider() {
106+
case oauth2ProviderAzure:
106107
if o.TokenURL == "" {
107108
return endpoints.AzureAD(o.AzureTenantID).TokenURL
108109
}
@@ -112,14 +113,14 @@ func (o *OAuth2) GetTokenURL() string {
112113
}
113114

114115
// getProvider returns provider in its canonical form.
115-
func (o OAuth2) GetProvider() OAuth2Provider {
116+
func (o oauth2Config) getProvider() oauth2Provider {
116117
return o.Provider.canonical()
117118
}
118119

119120
// getEndpointParams returns endpoint params with any provider ones combined.
120-
func (o OAuth2) GetEndpointParams() map[string][]string {
121-
switch o.GetProvider() {
122-
case OAuth2ProviderAzure:
121+
func (o oauth2Config) getEndpointParams() map[string][]string {
122+
switch o.getProvider() {
123+
case oauth2ProviderAzure:
123124
if o.AzureResource != "" {
124125
if o.EndpointParams == nil {
125126
o.EndpointParams = map[string][]string{}
@@ -132,26 +133,26 @@ func (o OAuth2) GetEndpointParams() map[string][]string {
132133
}
133134

134135
// Validate checks if oauth2 config is valid.
135-
func (o *OAuth2) Validate() error {
136-
switch o.GetProvider() {
137-
case OAuth2ProviderAzure:
136+
func (o *oauth2Config) Validate() error {
137+
switch o.getProvider() {
138+
case oauth2ProviderAzure:
138139
return o.validateAzureProvider()
139-
case OAuth2ProviderGoogle:
140+
case oauth2ProviderGoogle:
140141
return o.validateGoogleProvider()
141-
case OAuth2ProviderDefault:
142+
case oauth2ProviderDefault:
142143
if o.TokenURL == "" || o.ClientID == "" || o.ClientSecret == "" {
143144
return errors.New("invalid configuration: both token_url and client credentials must be provided")
144145
}
145146
default:
146-
return fmt.Errorf("invalid configuration: unknown provider %q", o.GetProvider())
147+
return fmt.Errorf("invalid configuration: unknown provider %q", o.getProvider())
147148
}
148149
return nil
149150
}
150151

151152
// findDefaultGoogleCredentials will default to google.FindDefaultCredentials and will only be changed for testing purposes
152153
var findDefaultGoogleCredentials = google.FindDefaultCredentials
153154

154-
func (o *OAuth2) validateGoogleProvider() error {
155+
func (o *oauth2Config) validateGoogleProvider() error {
155156
if o.TokenURL != "" || o.ClientID != "" || o.ClientSecret != "" ||
156157
o.AzureTenantID != "" || o.AzureResource != "" || len(o.EndpointParams) > 0 {
157158
return errors.New("invalid configuration: none of token_url and client credentials can be used, use google.credentials_file, google.jwt_file, google.credentials_json or ADC instead")
@@ -191,7 +192,7 @@ func (o *OAuth2) validateGoogleProvider() error {
191192
return fmt.Errorf("invalid configuration: no authentication credentials were configured or detected (ADC)")
192193
}
193194

194-
func (o *OAuth2) populateCredentialsJSONFromFile(file string) error {
195+
func (o *oauth2Config) populateCredentialsJSONFromFile(file string) error {
195196
if _, err := os.Stat(file); os.IsNotExist(err) {
196197
return fmt.Errorf("invalid configuration: the file %q cannot be found", file)
197198
}
@@ -210,7 +211,7 @@ func (o *OAuth2) populateCredentialsJSONFromFile(file string) error {
210211
return nil
211212
}
212213

213-
func (o *OAuth2) validateAzureProvider() error {
214+
func (o *oauth2Config) validateAzureProvider() error {
214215
if o.TokenURL == "" && o.AzureTenantID == "" {
215216
return errors.New("invalid configuration: at least one of token_url or tenant_id must be provided")
216217
}

0 commit comments

Comments
 (0)