Skip to content

Commit f08eb47

Browse files
pmahindrakar-ossEraYaNSovietacedMortalHappinesssamhita-alla
authored
Upstream revert revert auth token fix (#5407)
* Revert "Revert "Ensure token is refreshed on Unauthenticated (#5388)" (#5404)" This reverts commit 7d2f0d0. Signed-off-by: pmahindrakar-oss <[email protected]> * Using same mutex for condition variable Signed-off-by: pmahindrakar-oss <[email protected]> * Lock the locker in the wait to adher to cond.Wait() semantics Signed-off-by: pmahindrakar-oss <[email protected]> * comments Signed-off-by: pmahindrakar-oss <[email protected]> * using noop locker as waitlist add is atomic operation Signed-off-by: pmahindrakar-oss <[email protected]> * Replace Azure AD OIDC URL with correct one (#4075) Signed-off-by: Erwin de Haan <[email protected]> Signed-off-by: pmahindrakar-oss <[email protected]> * Update the example Dockerfile to run on k8s (#5412) Signed-off-by: Jason Parraga <[email protected]> Signed-off-by: pmahindrakar-oss <[email protected]> * docs(kubeflow): Fix kubeflow webhook error (#5410) Signed-off-by: Chi-Sheng Liu <[email protected]> Signed-off-by: pmahindrakar-oss <[email protected]> * update flytekit version to 1.12.1b2 in monodocs requirements (#5411) Signed-off-by: Samhita Alla <[email protected]> Signed-off-by: pmahindrakar-oss <[email protected]> * Add supported task types to agent service config and rename (#5402) Signed-off-by: Jason Parraga <[email protected]> Signed-off-by: pmahindrakar-oss <[email protected]> * update lock file (#5416) Signed-off-by: Samhita Alla <[email protected]> Signed-off-by: pmahindrakar-oss <[email protected]> * [monorepo] Fix flytectl install script (#5405) Signed-off-by: pmahindrakar-oss <[email protected]> * bring in changes for flytecl keyring from PR flytectl/pull/488 Signed-off-by: pmahindrakar-oss <[email protected]> * typo fix Signed-off-by: pmahindrakar-oss <[email protected]> --------- Signed-off-by: pmahindrakar-oss <[email protected]> Signed-off-by: Erwin de Haan <[email protected]> Signed-off-by: Jason Parraga <[email protected]> Signed-off-by: Chi-Sheng Liu <[email protected]> Signed-off-by: Samhita Alla <[email protected]> Co-authored-by: Erwin de Haan <[email protected]> Co-authored-by: Jason Parraga <[email protected]> Co-authored-by: Chi-Sheng Liu <[email protected]> Co-authored-by: Samhita Alla <[email protected]> Co-authored-by: Eduardo Apolinario <[email protected]>
1 parent 1e61f4e commit f08eb47

16 files changed

+455
-92
lines changed

flytectl/cmd/core/cmd.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,10 @@ func generateCommandFunc(cmdEntry CommandEntry) func(cmd *cobra.Command, args []
7373
cmdCtx := NewCommandContextNoClient(cmd.OutOrStdout())
7474
if !cmdEntry.DisableFlyteClient {
7575
clientSet, err := admin.ClientSetBuilder().WithConfig(admin.GetConfig(ctx)).
76-
WithTokenCache(pkce.TokenCacheKeyringProvider{
77-
ServiceUser: fmt.Sprintf("%s:%s", adminCfg.Endpoint.String(), pkce.KeyRingServiceUser),
78-
ServiceName: pkce.KeyRingServiceName,
79-
}).Build(ctx)
76+
WithTokenCache(pkce.NewTokenCacheKeyringProvider(
77+
pkce.KeyRingServiceName,
78+
fmt.Sprintf("%s:%s", adminCfg.Endpoint.String(), pkce.KeyRingServiceUser),
79+
)).Build(ctx)
8080
if err != nil {
8181
return err
8282
}

flytectl/pkg/pkce/token_cache_keyring.go

+80-6
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,88 @@
11
package pkce
22

33
import (
4+
"context"
45
"encoding/json"
56
"fmt"
7+
"sync"
8+
9+
"github.com/flyteorg/flyte/flyteidl/clients/go/admin/cache"
10+
"github.com/flyteorg/flyte/flytestdlib/logger"
611

712
"github.com/zalando/go-keyring"
813
"golang.org/x/oauth2"
914
)
1015

16+
const (
17+
KeyRingServiceUser = "flytectl-user"
18+
KeyRingServiceName = "flytectl"
19+
)
20+
1121
// TokenCacheKeyringProvider wraps the logic to save and retrieve tokens from the OS's keyring implementation.
1222
type TokenCacheKeyringProvider struct {
1323
ServiceName string
1424
ServiceUser string
25+
mu *sync.Mutex
26+
condLocker *cache.NoopLocker
27+
cond *sync.Cond
1528
}
1629

17-
const (
18-
KeyRingServiceUser = "flytectl-user"
19-
KeyRingServiceName = "flytectl"
20-
)
30+
func (t *TokenCacheKeyringProvider) PurgeIfEquals(existing *oauth2.Token) (bool, error) {
31+
if existingBytes, err := json.Marshal(existing); err != nil {
32+
return false, fmt.Errorf("unable to marshal token to save in cache due to %w", err)
33+
} else if tokenJSON, err := keyring.Get(t.ServiceName, t.ServiceUser); err != nil {
34+
logger.Warnf(context.Background(), "unable to read token from cache but not failing the purge as the token might not have been saved at all. Error: %v", err)
35+
return true, nil
36+
} else if tokenJSON != string(existingBytes) {
37+
return false, nil
38+
}
39+
40+
_ = keyring.Delete(t.ServiceName, t.ServiceUser)
41+
return true, nil
42+
}
2143

22-
func (t TokenCacheKeyringProvider) SaveToken(token *oauth2.Token) error {
44+
func (t *TokenCacheKeyringProvider) Lock() {
45+
t.mu.Lock()
46+
}
47+
48+
func (t *TokenCacheKeyringProvider) Unlock() {
49+
t.mu.Unlock()
50+
}
51+
52+
// TryLock the cache.
53+
func (t *TokenCacheKeyringProvider) TryLock() bool {
54+
return t.mu.TryLock()
55+
}
56+
57+
// CondWait adds the current go routine to the condition waitlist and waits for another go routine to notify using CondBroadcast
58+
// The current usage is that one who was able to acquire the lock using TryLock is the one who gets a valid token and notifies all the waitlist requesters so that they can use the new valid token.
59+
// It also locks the Locker in the condition variable as the semantics of Wait is that it unlocks the Locker after adding
60+
// the consumer to the waitlist and before blocking on notification.
61+
// We use the condLocker which is noOp locker to get added to waitlist for notifications.
62+
// The underlying notifcationList doesn't need to be guarded as it implementation is atomic and is thread safe
63+
// Refer https://go.dev/src/runtime/sema.go
64+
// Following is the function and its comments
65+
// notifyListAdd adds the caller to a notify list such that it can receive
66+
// notifications. The caller must eventually call notifyListWait to wait for
67+
// such a notification, passing the returned ticket number.
68+
//
69+
// func notifyListAdd(l *notifyList) uint32 {
70+
// // This may be called concurrently, for example, when called from
71+
// // sync.Cond.Wait while holding a RWMutex in read mode.
72+
// return l.wait.Add(1) - 1
73+
// }
74+
func (t *TokenCacheKeyringProvider) CondWait() {
75+
t.condLocker.Lock()
76+
t.cond.Wait()
77+
t.condLocker.Unlock()
78+
}
79+
80+
// CondBroadcast broadcasts the condition.
81+
func (t *TokenCacheKeyringProvider) CondBroadcast() {
82+
t.cond.Broadcast()
83+
}
84+
85+
func (t *TokenCacheKeyringProvider) SaveToken(token *oauth2.Token) error {
2386
var tokenBytes []byte
2487
if token.AccessToken == "" {
2588
return fmt.Errorf("cannot save empty token with expiration %v", token.Expiry)
@@ -38,7 +101,7 @@ func (t TokenCacheKeyringProvider) SaveToken(token *oauth2.Token) error {
38101
return nil
39102
}
40103

41-
func (t TokenCacheKeyringProvider) GetToken() (*oauth2.Token, error) {
104+
func (t *TokenCacheKeyringProvider) GetToken() (*oauth2.Token, error) {
42105
// get saved token
43106
tokenJSON, err := keyring.Get(t.ServiceName, t.ServiceUser)
44107
if len(tokenJSON) == 0 {
@@ -56,3 +119,14 @@ func (t TokenCacheKeyringProvider) GetToken() (*oauth2.Token, error) {
56119

57120
return &token, nil
58121
}
122+
123+
func NewTokenCacheKeyringProvider(serviceName, serviceUser string) *TokenCacheKeyringProvider {
124+
condLocker := &cache.NoopLocker{}
125+
return &TokenCacheKeyringProvider{
126+
mu: &sync.Mutex{},
127+
condLocker: condLocker,
128+
cond: sync.NewCond(condLocker),
129+
ServiceName: serviceName,
130+
ServiceUser: serviceUser,
131+
}
132+
}

flyteidl/clients/go/admin/auth_interceptor.go

+45-7
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ const ProxyAuthorizationHeader = "proxy-authorization"
2020

2121
// MaterializeCredentials will attempt to build a TokenSource given the anonymously available information exposed by the server.
2222
// Once established, it'll invoke PerRPCCredentialsFuture.Store() on perRPCCredentials to populate it with the appropriate values.
23-
func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, perRPCCredentials *PerRPCCredentialsFuture, proxyCredentialsFuture *PerRPCCredentialsFuture) error {
23+
func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.TokenCache,
24+
perRPCCredentials *PerRPCCredentialsFuture, proxyCredentialsFuture *PerRPCCredentialsFuture) error {
2425
authMetadataClient, err := InitializeAuthMetadataClient(ctx, cfg, proxyCredentialsFuture)
2526
if err != nil {
2627
return fmt.Errorf("failed to initialized Auth Metadata Client. Error: %w", err)
@@ -42,11 +43,17 @@ func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.T
4243

4344
tokenSource, err := tokenSourceProvider.GetTokenSource(ctx)
4445
if err != nil {
45-
return err
46+
return fmt.Errorf("failed to get token source. Error: %w", err)
47+
}
48+
49+
_, err = tokenSource.Token()
50+
if err != nil {
51+
return fmt.Errorf("failed to issue token. Error: %w", err)
4652
}
4753

4854
wrappedTokenSource := NewCustomHeaderTokenSource(tokenSource, cfg.UseInsecureConnection, authorizationMetadataKey)
4955
perRPCCredentials.Store(wrappedTokenSource)
56+
5057
return nil
5158
}
5259

@@ -134,19 +141,50 @@ func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, credentialsFut
134141
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
135142
ctx = setHTTPClientContext(ctx, cfg, proxyCredentialsFuture)
136143

144+
// If there is already a token in the cache (e.g. key-ring), we should use it immediately...
145+
t, _ := tokenCache.GetToken()
146+
if t != nil {
147+
err := MaterializeCredentials(ctx, cfg, tokenCache, credentialsFuture, proxyCredentialsFuture)
148+
if err != nil {
149+
return fmt.Errorf("failed to materialize credentials. Error: %v", err)
150+
}
151+
}
152+
137153
err := invoker(ctx, method, req, reply, cc, opts...)
138154
if err != nil {
139155
logger.Debugf(ctx, "Request failed due to [%v]. If it's an unauthenticated error, we will attempt to establish an authenticated context.", err)
140156

141157
if st, ok := status.FromError(err); ok {
142158
// If the error we receive from executing the request expects
143159
if shouldAttemptToAuthenticate(st.Code()) {
144-
logger.Debugf(ctx, "Request failed due to [%v]. Attempting to establish an authenticated connection and trying again.", st.Code())
145-
newErr := MaterializeCredentials(ctx, cfg, tokenCache, credentialsFuture, proxyCredentialsFuture)
146-
if newErr != nil {
147-
return fmt.Errorf("authentication error! Original Error: %v, Auth Error: %w", err, newErr)
160+
err = func() error {
161+
if !tokenCache.TryLock() {
162+
tokenCache.CondWait()
163+
return nil
164+
}
165+
166+
defer tokenCache.Unlock()
167+
_, err := tokenCache.PurgeIfEquals(t)
168+
if err != nil && !errors.Is(err, cache.ErrNotFound) {
169+
logger.Errorf(ctx, "Failed to purge cache. Error [%v]", err)
170+
return fmt.Errorf("failed to purge cache. Error: %w", err)
171+
}
172+
173+
logger.Debugf(ctx, "Request failed due to [%v]. Attempting to establish an authenticated connection and trying again.", st.Code())
174+
newErr := MaterializeCredentials(ctx, cfg, tokenCache, credentialsFuture, proxyCredentialsFuture)
175+
if newErr != nil {
176+
errString := fmt.Sprintf("authentication error! Original Error: %v, Auth Error: %v", err, newErr)
177+
logger.Errorf(ctx, errString)
178+
return fmt.Errorf(errString)
179+
}
180+
181+
tokenCache.CondBroadcast()
182+
return nil
183+
}()
184+
185+
if err != nil {
186+
return err
148187
}
149-
150188
return invoker(ctx, method, req, reply, cc, opts...)
151189
}
152190
}

0 commit comments

Comments
 (0)