From f85702084ec526b82740f25f584a7e1141814e19 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 22 Aug 2024 16:23:44 -0700 Subject: [PATCH] cherry-pick #7523 to v1.66.x branch (#7547) --- balancer_wrapper.go | 10 +++ clientconn.go | 25 +++---- producer_ext_test.go | 125 ++++++++++++++++++++++++++++++++++ resolver_balancer_ext_test.go | 1 - 4 files changed, 146 insertions(+), 15 deletions(-) create mode 100644 producer_ext_test.go diff --git a/balancer_wrapper.go b/balancer_wrapper.go index 5877b71533bc..6561b769ebf7 100644 --- a/balancer_wrapper.go +++ b/balancer_wrapper.go @@ -275,6 +275,16 @@ func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolve setConnectedAddress(&scs, curAddr) } acbw.stateListener(scs) + acbw.ac.mu.Lock() + defer acbw.ac.mu.Unlock() + if s == connectivity.Ready { + // When changing states to READY, reset stateReadyChan. Wait until + // after we notify the LB policy's listener(s) in order to prevent + // ac.getTransport() from unblocking before the LB policy starts + // tracking the subchannel as READY. + close(acbw.ac.stateReadyChan) + acbw.ac.stateReadyChan = make(chan struct{}) + } }) } diff --git a/clientconn.go b/clientconn.go index 7f6c24ca22ee..9c8850e3fdd5 100644 --- a/clientconn.go +++ b/clientconn.go @@ -825,14 +825,14 @@ func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer. } ac := &addrConn{ - state: connectivity.Idle, - cc: cc, - addrs: copyAddresses(addrs), - scopts: opts, - dopts: cc.dopts, - channelz: channelz.RegisterSubChannel(cc.channelz, ""), - resetBackoff: make(chan struct{}), - stateChan: make(chan struct{}), + state: connectivity.Idle, + cc: cc, + addrs: copyAddresses(addrs), + scopts: opts, + dopts: cc.dopts, + channelz: channelz.RegisterSubChannel(cc.channelz, ""), + resetBackoff: make(chan struct{}), + stateReadyChan: make(chan struct{}), } ac.ctx, ac.cancel = context.WithCancel(cc.ctx) // Start with our address set to the first address; this may be updated if @@ -1179,8 +1179,8 @@ type addrConn struct { addrs []resolver.Address // All addresses that the resolver resolved to. // Use updateConnectivityState for updating addrConn's connectivity state. - state connectivity.State - stateChan chan struct{} // closed and recreated on every state change. + state connectivity.State + stateReadyChan chan struct{} // closed and recreated on every READY state change. backoffIdx int // Needs to be stateful for resetConnectBackoff. resetBackoff chan struct{} @@ -1193,9 +1193,6 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) if ac.state == s { return } - // When changing states, reset the state change channel. - close(ac.stateChan) - ac.stateChan = make(chan struct{}) ac.state = s ac.channelz.ChannelMetrics.State.Store(&s) if lastErr == nil { @@ -1513,7 +1510,7 @@ func (ac *addrConn) getReadyTransport() transport.ClientTransport { func (ac *addrConn) getTransport(ctx context.Context) (transport.ClientTransport, error) { for ctx.Err() == nil { ac.mu.Lock() - t, state, sc := ac.transport, ac.state, ac.stateChan + t, state, sc := ac.transport, ac.state, ac.stateReadyChan ac.mu.Unlock() if state == connectivity.Ready { return t, nil diff --git a/producer_ext_test.go b/producer_ext_test.go new file mode 100644 index 000000000000..2da36d8d8e9f --- /dev/null +++ b/producer_ext_test.go @@ -0,0 +1,125 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc_test + +import ( + "context" + "strings" + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/balancer/stub" + "google.golang.org/grpc/internal/stubserver" + testgrpc "google.golang.org/grpc/interop/grpc_testing" +) + +type producerBuilder struct{} + +type producer struct { + client testgrpc.TestServiceClient + stopped chan struct{} +} + +// Build constructs and returns a producer and its cleanup function +func (*producerBuilder) Build(cci any) (balancer.Producer, func()) { + p := &producer{ + client: testgrpc.NewTestServiceClient(cci.(grpc.ClientConnInterface)), + stopped: make(chan struct{}), + } + return p, func() { + <-p.stopped + } +} + +func (p *producer) TestStreamStart(t *testing.T, streamStarted chan<- struct{}) { + go func() { + defer close(p.stopped) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := p.client.FullDuplexCall(ctx); err != nil { + t.Errorf("Unexpected error starting stream: %v", err) + } + close(streamStarted) + }() +} + +var producerBuilderSingleton = &producerBuilder{} + +// TestProducerStreamStartsAfterReady ensures producer streams only start after +// the subchannel reports as READY to the LB policy. +func (s) TestProducerStreamStartsAfterReady(t *testing.T) { + name := strings.ReplaceAll(strings.ToLower(t.Name()), "/", "") + producerCh := make(chan balancer.Producer) + streamStarted := make(chan struct{}) + done := make(chan struct{}) + bf := stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + sc, err := bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{ + StateListener: func(scs balancer.SubConnState) { + if scs.ConnectivityState == connectivity.Ready { + timer := time.NewTimer(5 * time.Millisecond) + select { + case <-streamStarted: + t.Errorf("Producer stream started before Ready listener returned") + case <-timer.C: + } + close(done) + } + }, + }) + if err != nil { + return err + } + producer, _ := sc.GetOrBuildProducer(producerBuilderSingleton) + producerCh <- producer + sc.Connect() + return nil + }, + } + stub.Register(name, bf) + + ss := stubserver.StubServer{ + FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { + return nil + }, + } + if err := ss.StartServer(); err != nil { + t.Fatal("Error starting server:", err) + } + defer ss.Stop() + + cc, err := grpc.NewClient("dns:///"+ss.Address, + grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"`+name+`":{}}]}`), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + t.Fatalf("Error creating client: %v", err) + } + defer cc.Close() + + go cc.Connect() + p := <-producerCh + p.(*producer).TestStreamStart(t, streamStarted) + + <-done +} diff --git a/resolver_balancer_ext_test.go b/resolver_balancer_ext_test.go index e3ad94ea6447..a2e35ccf3822 100644 --- a/resolver_balancer_ext_test.go +++ b/resolver_balancer_ext_test.go @@ -46,7 +46,6 @@ import ( // 5. resolver.Resolver.ResolveNow() -> func (s) TestResolverBalancerInteraction(t *testing.T) { name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1) - fmt.Println(name) bf := stub.BalancerFuncs{ UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { bd.ClientConn.ResolveNow(resolver.ResolveNowOptions{})