Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create Elasticsearch client for observer only if needed #6407

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/controller/elasticsearch/client/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,14 @@ func versioned(b *baseClient, v version.Version) Client {
}
}

func (c *baseClient) BasicAuthUser() BasicAuth {
return c.User
}

func (c *baseClient) CaCerts() []*x509.Certificate {
return c.caCerts
}

func (c *baseClient) URL() string {
return c.Endpoint
}
4 changes: 4 additions & 0 deletions pkg/controller/elasticsearch/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ type Client interface {
// The Elasticsearch endpoint will be added automatically to the request URL which should therefore just be the path
// with a leading /
Request(ctx context.Context, r *http.Request) (*http.Response, error)
// BasicAuthUser returns the BasicAuthUser configured for this client
BasicAuthUser() BasicAuth
// CaCerts returns the CaCerts configured for this client
CaCerts() []*x509.Certificate
// Version returns the Elasticsearch version this client is constructed for which should equal the minimal version
// in the cluster.
Version() version.Version
Expand Down
13 changes: 13 additions & 0 deletions pkg/controller/elasticsearch/client/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package client

import (
"crypto/x509"
"io"
"net/http"
"strings"
Expand Down Expand Up @@ -33,6 +34,18 @@ func NewMockClientWithUser(v version.Version, u BasicAuth, fn RoundTripFunc) Cli
return versioned(baseClient, v)
}

func NewFullMockClient(v version.Version, u BasicAuth, fn RoundTripFunc, endpoint string, caCerts []*x509.Certificate) Client {
baseClient := &baseClient{
HTTP: &http.Client{
Transport: fn,
},
Endpoint: endpoint,
User: u,
caCerts: caCerts,
}
return versioned(baseClient, v)
}

func NewMockResponse(statusCode int, r *http.Request, body string) *http.Response {
return &http.Response{
StatusCode: statusCode,
Expand Down
33 changes: 32 additions & 1 deletion pkg/controller/elasticsearch/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (d *defaultDriver) Reconcile(ctx context.Context) *reconciler.Results {
observedState := d.Observers.ObservedStateResolver(
ctx,
d.ES,
d.newElasticsearchClient(
d.elasticsearchClientProvider(
ctx,
resourcesState,
controllerUser,
Expand Down Expand Up @@ -391,6 +391,22 @@ func (d *defaultDriver) newElasticsearchClient(
)
}

func (d *defaultDriver) elasticsearchClientProvider(
ctx context.Context,
state *reconcile.ResourcesState,
user esclient.BasicAuth,
v version.Version,
caCerts []*x509.Certificate,
) func(existingEsClient esclient.Client) esclient.Client {
return func(existingEsClient esclient.Client) esclient.Client {
url := services.ElasticsearchURL(d.ES, state.CurrentPodsByPhase[corev1.PodRunning])
if isEsClientUpToDate(existingEsClient, v, user, url, caCerts) {
return existingEsClient
}
return d.newElasticsearchClient(ctx, state, user, v, caCerts)
}
}

// maybeSetServiceAccountsOrchestrationHint attempts to update an orchestration hint to let the association controllers
// know whether all the nodes in the cluster are ready to authenticate service accounts.
func (d *defaultDriver) maybeSetServiceAccountsOrchestrationHint(
Expand Down Expand Up @@ -523,3 +539,18 @@ func esReachableConditionMessage(internalService *corev1.Service, isServiceReady
return fmt.Sprintf("Service %s/%s has endpoints", internalService.Namespace, internalService.Name)
}
}

func isEsClientUpToDate(esClient esclient.Client, version version.Version, user esclient.BasicAuth, url string, caCerts []*x509.Certificate) bool {
if esClient == nil {
return false
}
if len(esClient.CaCerts()) != len(caCerts) {
return false
}
for i := range esClient.CaCerts() {
if !esClient.CaCerts()[i].Equal(caCerts[i]) {
return false
}
}
return esClient.Version().Equals(version) && esClient.BasicAuthUser() == user && esClient.URL() == url
}
84 changes: 84 additions & 0 deletions pkg/controller/elasticsearch/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ package driver

import (
"context"
"crypto/x509"
"net/http"
"testing"

"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/version"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -19,6 +23,86 @@ import (
"github.com/elastic/cloud-on-k8s/v2/pkg/utils/set"
)

func Test_isEsClientUpToDate(t *testing.T) {
defaultVersion := version.MustParse("8.6.1")
defaultUser := esclient.BasicAuth{Name: "foo", Password: "bar"}
defaultURL := "https://foo.bar"
defaultCaCerts := []*x509.Certificate{{Raw: []byte("foo")}}
defaultEsClient := esclient.NewFullMockClient(defaultVersion, defaultUser,
func(req *http.Request) *http.Response { return nil },
defaultURL, defaultCaCerts)
tests := []struct {
name string
esClient esclient.Client
version version.Version
user esclient.BasicAuth
url string
caCerts []*x509.Certificate
want bool
}{
{
name: "A new client is created if none is passed",
esClient: nil,
version: defaultVersion,
user: defaultUser,
url: defaultURL,
caCerts: defaultCaCerts,
want: false,
},
{
name: "A new client is created if the version does not match",
esClient: defaultEsClient,
version: version.MustParse("8.6.0"),
user: defaultUser,
url: defaultURL,
caCerts: defaultCaCerts,
want: false,
},
{
name: "A new client is created if the user does not match",
esClient: defaultEsClient,
version: defaultVersion,
user: esclient.BasicAuth{Name: "foo", Password: "changed"},
url: defaultURL,
caCerts: defaultCaCerts,
want: false,
},
{
name: "A new client is created if the url does not match",
esClient: defaultEsClient,
version: defaultVersion,
user: defaultUser,
url: "https://foo.com",
caCerts: defaultCaCerts,
want: false,
},
{
name: "A new client is created if the caCerts do not match",
esClient: defaultEsClient,
version: defaultVersion,
user: defaultUser,
url: defaultURL,
caCerts: []*x509.Certificate{{Raw: []byte("bar")}},
want: false,
},
{
name: "The client is reused if nothing has changed",
esClient: defaultEsClient,
version: defaultVersion,
user: defaultUser,
url: defaultURL,
caCerts: defaultCaCerts,
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual := isEsClientUpToDate(tt.esClient, tt.version, tt.user, tt.url, tt.caCerts)
assert.Equal(t, tt.want, actual)
})
}
}

func Test_esReachableConditionMessage(t *testing.T) {
type args struct {
internalService *corev1.Service
Expand Down
16 changes: 11 additions & 5 deletions pkg/controller/elasticsearch/observer/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ func NewManager(defaultInterval time.Duration, tracer *apm.Tracer) *Manager {
func (m *Manager) ObservedStateResolver(
ctx context.Context,
cluster esv1.Elasticsearch,
esClient client.Client,
esClientProvider func(client.Client) client.Client,
isServiceReady bool,
) func() esv1.ElasticsearchHealth {
observer := m.Observe(ctx, cluster, esClient, isServiceReady)
observer := m.Observe(ctx, cluster, esClientProvider, isServiceReady)
return func() esv1.ElasticsearchHealth {
return observer.LastHealth()
}
Expand All @@ -67,13 +67,20 @@ func (m *Manager) getObserver(key types.NamespacedName) (*Observer, bool) {

// Observe gets or create a cluster state observer for the given cluster
// In case something has changed in the given esClient (eg. different caCert), the observer is recreated accordingly
func (m *Manager) Observe(ctx context.Context, cluster esv1.Elasticsearch, esClient client.Client, isServiceReady bool) *Observer {
func (m *Manager) Observe(ctx context.Context, cluster esv1.Elasticsearch, esClientProvider func(client.Client) client.Client, isServiceReady bool) *Observer {
defer tracing.Span(&ctx)()
nsName := k8s.ExtractNamespacedName(&cluster)
settings := m.extractObserverSettings(ctx, cluster)

observer, exists := m.getObserver(nsName)

var esClient client.Client
if exists {
esClient = esClientProvider(observer.esClient)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are effectively doing the comparison twice now here and on L88. Not sure if it worth optimising this further Equals is pretty fast.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for readability it is better to carry out the comparison twice.

} else {
esClient = esClientProvider(nil)
}

switch {
case !exists:
// This Elasticsearch resource has not being observed yet, create the observer and maybe do a first observation.
Expand All @@ -85,8 +92,7 @@ func (m *Manager) Observe(ctx context.Context, cluster esv1.Elasticsearch, esCli
// in case asynchronous observation has been disabled ensure at least one observation at reconciliation time.
return m.getAndObserveSynchronously(ctx, nsName)
default:
// No change, close the provided Client and return the existing observer.
esClient.Close()
// No change, return the existing observer.
return observer
}

Expand Down
13 changes: 8 additions & 5 deletions pkg/controller/elasticsearch/observer/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ func TestManager_Observe(t *testing.T) {
if initial, exists := tt.initiallyObserved[tt.clusterToObserve]; exists {
initialCreationTime = initial.creationTime
}
observer := m.Observe(context.Background(), esObject(tt.clusterToObserve), tt.clusterToObserveClient, true)
esClientProvider := func(existingClient client.Client) client.Client { return tt.clusterToObserveClient }
observer := m.Observe(context.Background(), esObject(tt.clusterToObserve), esClientProvider, true)
// returned observer should be the correct one
require.Equal(t, tt.clusterToObserve, observer.cluster)
// list of observers should have been updated
Expand Down Expand Up @@ -180,9 +181,10 @@ func TestManager_ObserveSync(t *testing.T) {
esClient := flappingEsClient()
name := cluster("es1")
cluster := esObject(name)
esClientProvider := func(existingClient client.Client) client.Client { return esClient }
results := []esv1.ElasticsearchHealth{
tt.manager.ObservedStateResolver(context.Background(), cluster, esClient, true)(),
tt.manager.ObservedStateResolver(context.Background(), cluster, esClient, true)(),
tt.manager.ObservedStateResolver(context.Background(), cluster, esClientProvider, true)(),
tt.manager.ObservedStateResolver(context.Background(), cluster, esClientProvider, true)(),
}
require.Equal(t, tt.expectedHealth, results)
tt.manager.StopObserving(name) // let's clean up the go-routines
Expand Down Expand Up @@ -276,10 +278,11 @@ func TestManager_AddObservationListener(t *testing.T) {
<-eventsCluster2
close(doneCh)
}()
esClientProvider := func(existingClient client.Client) client.Client { return fakeEsClient200(client.BasicAuth{}) }
// observe 2 clusters
obs1 := m.Observe(ctx, cluster1, fakeEsClient200(client.BasicAuth{}), true)
obs1 := m.Observe(ctx, cluster1, esClientProvider, true)
defer obs1.Stop()
obs2 := m.Observe(ctx, cluster2, fakeEsClient200(client.BasicAuth{}), true)
obs2 := m.Observe(ctx, cluster2, esClientProvider, true)
defer obs2.Stop()
<-doneCh
}
Expand Down