From 715f9d51d02302bcbcd9c16f762f973f0b163102 Mon Sep 17 00:00:00 2001 From: Amal Shaji Date: Tue, 28 Feb 2023 11:40:57 +0530 Subject: [PATCH] refactor(server): Optimize connection pools (#31) * Use a map to track pool for subdomain --- internal/server/tunnel/pool.go | 2 +- internal/server/tunnel/server.go | 77 ++++++++++++-------------------- 2 files changed, 29 insertions(+), 50 deletions(-) diff --git a/internal/server/tunnel/pool.go b/internal/server/tunnel/pool.go index 258b389..2e2aa1a 100644 --- a/internal/server/tunnel/pool.go +++ b/internal/server/tunnel/pool.go @@ -78,7 +78,7 @@ func (pool *Pool) Clean() { if idle > pool.size { // We have enough idle connections in the pool. // Terminate the connection if it is idle since more that IdleTimeout - if int(time.Now().Sub(connection.idleSince).Seconds())*1000 > pool.server.Config.IdleTimeout { + if int(time.Since(connection.idleSince).Seconds())*1000 > pool.server.Config.IdleTimeout { connection.close() } } diff --git a/internal/server/tunnel/server.go b/internal/server/tunnel/server.go index 88d88fd..12b7a31 100644 --- a/internal/server/tunnel/server.go +++ b/internal/server/tunnel/server.go @@ -6,7 +6,6 @@ import ( "log" "math/rand" "net/url" - "reflect" "strings" "sync" "time" @@ -23,7 +22,7 @@ type Server struct { Upgrader websocket.Upgrader // In Pools, keep connections with WebSocket peers. - Pools []*Pool + Pools map[string]*Pool // A RWMutex is a reader/writer mutual exclusion Lock, // and it is for exclusive control with pools operation. @@ -72,6 +71,7 @@ func NewServer(configFile string) (server *Server) { server = new(Server) server.Config = config server.Upgrader = websocket.Upgrader{} + server.Pools = make(map[string]*Pool) server.done = make(chan struct{}) server.Dispatcher = make(chan *ConnectionRequest) @@ -111,13 +111,13 @@ func (s *Server) clean() { idle := 0 busy := 0 - var pools []*Pool - for _, pool := range s.Pools { + pools := make(map[string]*Pool) + for subdomain, pool := range s.Pools { if pool.IsEmpty() { log.Printf("Removing empty connection pool : %s", pool.ID) pool.Shutdown() } else { - pools = append(pools, pool) + pools[subdomain] = pool } ps := pool.Size() @@ -156,36 +156,25 @@ func (s *Server) DispatchConnections() { } s.Lock.RLock() - if len(s.Pools) == 0 { + pool, ok := s.Pools[request.Subdomain] + if !ok { // No connection pool available s.Lock.RUnlock() break } - // [1]: Select a pool which has an idle connection - // Build a select statement dynamically to handle an arbitrary number of pools. - cases := make([]reflect.SelectCase, len(s.Pools)+1) - for i, ch := range s.Pools { - // Pick connection for the requested subdomain - if ch.Subdomain != request.Subdomain { - cases[i] = reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(nil)} - continue + var connection *Connection + for _, conn := range pool.connections { + if conn.status == Idle { + connection = conn + break } - cases[i] = reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(ch.idle)} } - cases[len(cases)-1] = reflect.SelectCase{ - Dir: reflect.SelectDefault} s.Lock.RUnlock() - _, value, ok := reflect.Select(cases) - if !ok { - continue // a pool has been removed, try again + if connection == nil { + continue } - connection, _ := value.Interface().(*Connection) // [2]: Verify that we can use this connection and take it. if connection.Take() { @@ -233,37 +222,27 @@ func (s *Server) Shutdown() { } func (s *Server) GetOrCreatePoolForUser(subdomain, localServer, userIdentifier string, id PoolID) (*Pool, error) { - var pool *Pool // There is no need to create a new pool, // if it is already registered in current pools. - for _, p := range s.Pools { - if p.Subdomain == subdomain { - if p.ID == id { - pool = p - break - } else { - // Pool exist for the subdomain, but for different user - return nil, fmt.Errorf("subdomain already in use") - } - } + p, ok := s.Pools[subdomain] + if !ok { + pool := NewPool(s, id, subdomain, localServer, userIdentifier) + s.Pools[subdomain] = pool + return pool, nil } - if pool == nil { - // Create new pool, if no pools exist for the user - pool = NewPool(s, id, subdomain, localServer, userIdentifier) - s.Pools = append(s.Pools, pool) + + if p.ID != id { + return nil, fmt.Errorf("subdomain already in use") } - return pool, nil + + return p, nil } func (s *Server) GetDestinationURL(subdomain string) string { - var dstURL string - - for _, p := range s.Pools { - if p.Subdomain == subdomain { - dstURL = p.LocalServer - break - } + p, ok := s.Pools[subdomain] + if !ok { + return "" } - return dstURL + return p.LocalServer }