Skip to content

Commit

Permalink
fix the way endpoint resources get created to make expose tests pass.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmurret committed Nov 7, 2023
1 parent a2f3e89 commit 43ced61
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 49 deletions.
18 changes: 7 additions & 11 deletions agent/xds/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func TestAllResourcesFromSnapshot(t *testing.T) {
if tt.alsoRunTestForV2 {
generator := xdsv2.NewResourceGenerator(testutil.Logger(t))

converter := proxystateconverter.NewConverter(testutil.Logger(t), &mockCfgFetcher{addressLan: "10.10.10.10"})
converter := proxystateconverter.NewConverter(testutil.Logger(t), &mockCfgFetcher{addressLan: "192.0.2.1"})
proxyState, err := converter.ProxyStateFromSnapshot(snap)
require.NoError(t, err)

Expand Down Expand Up @@ -805,8 +805,7 @@ func getExposePathGoldenTestCases() []goldenTestCase {
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshotExposeConfig(t, nil)
},
// TODO(jm): enable with https://github.com/hashicorp/consul/pull/19459
alsoRunTestForV2: false,
alsoRunTestForV2: true,
},
{
name: "downstream-service-with-unix-sockets",
Expand All @@ -833,8 +832,7 @@ func getExposePathGoldenTestCases() []goldenTestCase {
}
})
},
// TODO(jm): enable with https://github.com/hashicorp/consul/pull/19459
alsoRunTestForV2: false,
alsoRunTestForV2: true,
},
{
name: "expose-checks",
Expand All @@ -844,14 +842,12 @@ func getExposePathGoldenTestCases() []goldenTestCase {
return "192.0.2.1"
})
},
// TODO(jm): enable with https://github.com/hashicorp/consul/pull/19459
alsoRunTestForV2: false,
alsoRunTestForV2: true,
},
{
name: "expose-paths-grpc-new-cluster-http1",
create: proxycfg.TestConfigSnapshotGRPCExposeHTTP1,
// TODO(jm): enable with https://github.com/hashicorp/consul/pull/19459
alsoRunTestForV2: false,
name: "expose-paths-grpc-new-cluster-http1",
create: proxycfg.TestConfigSnapshotGRPCExposeHTTP1,
alsoRunTestForV2: true,
},
{
// NOTE: if IPv6 is not supported in the kernel per
Expand Down
90 changes: 59 additions & 31 deletions agent/xdsv2/cluster_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
envoy_aggregate_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/clusters/aggregate/v3"
envoy_upstreams_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/upstreams/http/v3"
envoy_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
Expand All @@ -18,39 +19,47 @@ import (
"github.com/hashicorp/consul/proto-public/pbmesh/v2beta1/pbproxystate"
)

func (pr *ProxyResources) makeClusters(name string) (map[string]proto.Message, error) {
func (pr *ProxyResources) makeClustersAndEndpoints(name string) (map[string]proto.Message, map[string]proto.Message, error) {
envoyClusters := make(map[string]proto.Message)
envoyEndpoints := make(map[string]proto.Message)
proxyStateCluster, ok := pr.proxyState.Clusters[name]
if !ok {
return nil, fmt.Errorf("cluster %q not found", name)
return nil, nil, fmt.Errorf("cluster %q not found", name)
}

switch proxyStateCluster.Group.(type) {
case *pbproxystate.Cluster_FailoverGroup:
fg := proxyStateCluster.GetFailoverGroup()
clusters, err := pr.makeEnvoyAggregateCluster(name, proxyStateCluster.Protocol, fg)
clusters, eps, err := pr.makeEnvoyAggregateCluster(name, proxyStateCluster.Protocol, fg)
if err != nil {
return nil, err
return nil, nil, err
}
for _, c := range clusters {
envoyClusters[c.Name] = c
if ep, ok := eps[c.Name]; ok {
envoyEndpoints[c.Name] = ep
}
}

case *pbproxystate.Cluster_EndpointGroup:
eg := proxyStateCluster.GetEndpointGroup()
cluster, err := pr.makeEnvoyCluster(name, proxyStateCluster.Protocol, eg)
cluster, eps, err := pr.makeEnvoyCluster(name, proxyStateCluster.Protocol, eg)
if err != nil {
return nil, err
return nil, nil, err
}
envoyClusters[cluster.Name] = cluster
if ep, ok := eps[cluster.Name]; ok {
envoyEndpoints[cluster.Name] = ep
}

default:
return nil, errors.New("cluster group type should be Endpoint Group or Failover Group")
return nil, nil, errors.New("cluster group type should be Endpoint Group or Failover Group")
}
return envoyClusters, nil
return envoyClusters, envoyEndpoints, nil
}

func (pr *ProxyResources) makeEnvoyCluster(name string, protocol pbproxystate.Protocol, eg *pbproxystate.EndpointGroup) (*envoy_cluster_v3.Cluster, error) {
func (pr *ProxyResources) makeEnvoyCluster(name string, protocol pbproxystate.Protocol,
eg *pbproxystate.EndpointGroup) (*envoy_cluster_v3.Cluster, map[string]*envoy_endpoint_v3.ClusterLoadAssignment, error) {
if eg != nil {
switch t := eg.Group.(type) {
case *pbproxystate.EndpointGroup_Dynamic:
Expand All @@ -66,13 +75,14 @@ func (pr *ProxyResources) makeEnvoyCluster(name string, protocol pbproxystate.Pr
passthrough := eg.GetPassthrough()
return pr.makeEnvoyPassthroughCluster(name, protocol, passthrough)
default:
return nil, fmt.Errorf("unsupported endpoint group type: %s", t)
return nil, nil, fmt.Errorf("unsupported endpoint group type: %s", t)
}
}
return nil, fmt.Errorf("no endpoint group")
return nil, nil, fmt.Errorf("no endpoint group")
}

func (pr *ProxyResources) makeEnvoyDynamicCluster(name string, protocol pbproxystate.Protocol, dynamic *pbproxystate.DynamicEndpointGroup) (*envoy_cluster_v3.Cluster, error) {
func (pr *ProxyResources) makeEnvoyDynamicCluster(name string, protocol pbproxystate.Protocol,
dynamic *pbproxystate.DynamicEndpointGroup) (*envoy_cluster_v3.Cluster, map[string]*envoy_endpoint_v3.ClusterLoadAssignment, error) {
cluster := &envoy_cluster_v3.Cluster{
Name: name,
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{Type: envoy_cluster_v3.Cluster_EDS},
Expand All @@ -87,7 +97,7 @@ func (pr *ProxyResources) makeEnvoyDynamicCluster(name string, protocol pbproxys
}
err := addHttpProtocolOptions(protocol, cluster)
if err != nil {
return nil, err
return nil, nil, err
}
if dynamic.Config != nil {
if dynamic.Config.UseAltStatName {
Expand All @@ -106,23 +116,32 @@ func (pr *ProxyResources) makeEnvoyDynamicCluster(name string, protocol pbproxys

err := addEnvoyLBToCluster(dynamic.Config, cluster)
if err != nil {
return nil, err
return nil, nil, err
}
}

if dynamic.OutboundTls != nil {
envoyTransportSocket, err := pr.makeEnvoyTransportSocket(dynamic.OutboundTls)
if err != nil {
return nil, err
return nil, nil, err
}
cluster.TransportSocket = envoyTransportSocket
}

return cluster, nil
endpointResources := make(map[string]*envoy_endpoint_v3.ClusterLoadAssignment)
if cluster.Name != xdscommon.LocalAppClusterName {
if endpointList, ok := pr.proxyState.Endpoints[cluster.Name]; ok {
protoEndpoint := makeEnvoyClusterLoadAssignment(cluster.Name, endpointList.Endpoints)
endpointResources[cluster.Name] = protoEndpoint
}
}

return cluster, endpointResources, nil

}

func (pr *ProxyResources) makeEnvoyStaticCluster(name string, protocol pbproxystate.Protocol, static *pbproxystate.StaticEndpointGroup) (*envoy_cluster_v3.Cluster, error) {
func (pr *ProxyResources) makeEnvoyStaticCluster(name string, protocol pbproxystate.Protocol,
static *pbproxystate.StaticEndpointGroup) (*envoy_cluster_v3.Cluster, map[string]*envoy_endpoint_v3.ClusterLoadAssignment, error) {
cluster := &envoy_cluster_v3.Cluster{
Name: name,
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{Type: envoy_cluster_v3.Cluster_STATIC},
Expand All @@ -141,21 +160,23 @@ func (pr *ProxyResources) makeEnvoyStaticCluster(name string, protocol pbproxyst
err = addHttpProtocolOptions(protocol, cluster)
}
if err != nil {
return nil, err
return nil, nil, err
}

if static.Config != nil {
cluster.ConnectTimeout = static.Config.ConnectTimeout
addEnvoyCircuitBreakers(static.GetConfig().CircuitBreakers, cluster)
}
return cluster, nil
return cluster, nil, nil
}

func (pr *ProxyResources) makeEnvoyDnsCluster(name string, protocol pbproxystate.Protocol, dns *pbproxystate.DNSEndpointGroup) (*envoy_cluster_v3.Cluster, error) {
return nil, nil
func (pr *ProxyResources) makeEnvoyDnsCluster(name string, protocol pbproxystate.Protocol,
dns *pbproxystate.DNSEndpointGroup) (*envoy_cluster_v3.Cluster, map[string]*envoy_endpoint_v3.ClusterLoadAssignment, error) {
return nil, nil, nil
}

func (pr *ProxyResources) makeEnvoyPassthroughCluster(name string, protocol pbproxystate.Protocol, passthrough *pbproxystate.PassthroughEndpointGroup) (*envoy_cluster_v3.Cluster, error) {
func (pr *ProxyResources) makeEnvoyPassthroughCluster(name string, protocol pbproxystate.Protocol,
passthrough *pbproxystate.PassthroughEndpointGroup) (*envoy_cluster_v3.Cluster, map[string]*envoy_endpoint_v3.ClusterLoadAssignment, error) {
cluster := &envoy_cluster_v3.Cluster{
Name: name,
ConnectTimeout: passthrough.Config.ConnectTimeout,
Expand All @@ -165,35 +186,42 @@ func (pr *ProxyResources) makeEnvoyPassthroughCluster(name string, protocol pbpr
if passthrough.OutboundTls != nil {
envoyTransportSocket, err := pr.makeEnvoyTransportSocket(passthrough.OutboundTls)
if err != nil {
return nil, err
return nil, nil, err
}
cluster.TransportSocket = envoyTransportSocket
}
err := addHttpProtocolOptions(protocol, cluster)
if err != nil {
return nil, err
return nil, nil, err
}
return cluster, nil
return cluster, nil, nil
}

func (pr *ProxyResources) makeEnvoyAggregateCluster(name string, protocol pbproxystate.Protocol, fg *pbproxystate.FailoverGroup) (map[string]*envoy_cluster_v3.Cluster, error) {
func (pr *ProxyResources) makeEnvoyAggregateCluster(name string, protocol pbproxystate.Protocol,
fg *pbproxystate.FailoverGroup) (map[string]*envoy_cluster_v3.Cluster, map[string]*envoy_endpoint_v3.ClusterLoadAssignment, error) {
clusters := make(map[string]*envoy_cluster_v3.Cluster)
endpointResources := make(map[string]*envoy_endpoint_v3.ClusterLoadAssignment)
if fg != nil {
var egNames []string
for _, eg := range fg.EndpointGroups {
cluster, err := pr.makeEnvoyCluster(eg.Name, protocol, eg)
cluster, _, err := pr.makeEnvoyCluster(eg.Name, protocol, eg)
if err != nil {
return nil, err
return nil, nil, err
}
egNames = append(egNames, cluster.Name)
clusters[cluster.Name] = cluster

if endpointList, ok := pr.proxyState.Endpoints[cluster.Name]; ok {
protoEndpoint := makeEnvoyClusterLoadAssignment(cluster.Name, endpointList.Endpoints)
endpointResources[cluster.Name] = protoEndpoint
}
}
aggregateClusterConfig, err := anypb.New(&envoy_aggregate_cluster_v3.ClusterConfig{
Clusters: egNames,
})

if err != nil {
return nil, err
return nil, nil, err
}

c := &envoy_cluster_v3.Cluster{
Expand All @@ -212,11 +240,11 @@ func (pr *ProxyResources) makeEnvoyAggregateCluster(name string, protocol pbprox
}
err = addHttpProtocolOptions(protocol, c)
if err != nil {
return nil, err
return nil, nil, err
}
clusters[c.Name] = c
}
return clusters, nil
return clusters, endpointResources, nil
}

func addLocalAppHttpProtocolOptions(protocol pbproxystate.Protocol, c *envoy_cluster_v3.Cluster) error {
Expand Down
11 changes: 4 additions & 7 deletions agent/xdsv2/route_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,16 +235,13 @@ func makeEnvoyQueryParamFromProxystateQueryMatch(psMatch *pbproxystate.QueryPara
}

func (pr *ProxyResources) addEnvoyClustersAndEndpointsToEnvoyResources(clusterName string) {
clusters, _ := pr.makeClusters(clusterName)
clusters, endpoints, _ := pr.makeClustersAndEndpoints(clusterName)
for name, cluster := range clusters {
pr.envoyResources[xdscommon.ClusterType][name] = cluster
}

if name != xdscommon.LocalAppClusterName {
if endpointList, ok := pr.proxyState.Endpoints[name]; ok {
protoEndpoint := makeEnvoyClusterLoadAssignment(name, endpointList.Endpoints)
pr.envoyResources[xdscommon.EndpointType][name] = protoEndpoint
}
}
for name, ep := range endpoints {
pr.envoyResources[xdscommon.EndpointType][name] = ep
}
}

Expand Down

0 comments on commit 43ced61

Please sign in to comment.