diff --git a/pkg/fwdservice/fwdservice.go b/pkg/fwdservice/fwdservice.go index 5f8e8be2..a9c3c619 100644 --- a/pkg/fwdservice/fwdservice.go +++ b/pkg/fwdservice/fwdservice.go @@ -2,11 +2,12 @@ package fwdservice import ( "fmt" - "k8s.io/apimachinery/pkg/api/errors" "strconv" "sync" "time" + "k8s.io/apimachinery/pkg/api/errors" + "github.com/txn2/kubefwd/pkg/fwdnet" "github.com/txn2/kubefwd/pkg/fwdport" "github.com/txn2/kubefwd/pkg/fwdpub" @@ -49,16 +50,16 @@ func (svcFwd *ServiceFWD) String() string { // GetPodsForService queries k8s and returns all pods backing this service // which are eligible for portforwarding; exclude some pods which are in final/failure state. -func (svcfwd *ServiceFWD) GetPodsForService() []v1.Pod { - listOpts := metav1.ListOptions{LabelSelector: svcfwd.PodLabelSelector} +func (svcFwd *ServiceFWD) GetPodsForService() []v1.Pod { + listOpts := metav1.ListOptions{LabelSelector: svcFwd.PodLabelSelector} - pods, err := svcfwd.ClientSet.CoreV1().Pods(svcfwd.Svc.Namespace).List(listOpts) + pods, err := svcFwd.ClientSet.CoreV1().Pods(svcFwd.Svc.Namespace).List(listOpts) if err != nil { if errors.IsNotFound(err) { - log.Warnf("WARNING: No Pods found for service %s: %s\n", svcfwd, err.Error()) + log.Warnf("WARNING: No Pods found for service %s: %s\n", svcFwd, err.Error()) } else { - log.Warnf("WARNING: Error in List pods for %s: %s\n", svcfwd, err.Error()) + log.Warnf("WARNING: Error in List pods for %s: %s\n", svcFwd, err.Error()) } return nil } @@ -76,25 +77,25 @@ func (svcfwd *ServiceFWD) GetPodsForService() []v1.Pod { // SyncPodForwards selects one or all pods behind a service, and invokes the forwarding setup for that or those pod(s). // It will remove pods in-mem that are no longer returned by k8s, should these not be correctly deleted. -func (svcfwd *ServiceFWD) SyncPodForwards(force bool) { +func (svcFwd *ServiceFWD) SyncPodForwards(force bool) { // When a whole set of pods gets deleted at once, they all will trigger a SyncPodForwards() call. This would hammer k8s with load needlessly. // Therefore keep a timestamp from when this was last called and only allow call if the previous one was not too recent. - if !force && time.Since(svcfwd.LastSyncedAt) < 10*time.Minute { - log.Debugf("Skipping pods refresh for %s due to rate limiting", svcfwd) + if !force && time.Since(svcFwd.LastSyncedAt) < 10*time.Minute { + log.Debugf("Skipping pods refresh for %s due to rate limiting", svcFwd) return } - defer func() { svcfwd.LastSyncedAt = time.Now() }() + defer func() { svcFwd.LastSyncedAt = time.Now() }() - k8sPods := svcfwd.GetPodsForService() + k8sPods := svcFwd.GetPodsForService() // If no pods are found currently. Will try again next resync period if len(k8sPods) == 0 { - log.Warnf("WARNING: No Running Pods returned for service %s", svcfwd) + log.Warnf("WARNING: No Running Pods returned for service %s", svcFwd) return } // Check if the pods currently being forwarded still exist in k8s and if they are not in a (pre-)running state, if not: remove them - for _, podName := range svcfwd.ListPodNames() { + for _, podName := range svcFwd.ListPodNames() { keep := false for _, pod := range k8sPods { if podName == pod.Name && (pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning) { @@ -103,73 +104,78 @@ func (svcfwd *ServiceFWD) SyncPodForwards(force bool) { } } if !keep { - svcfwd.RemovePod(podName) + svcFwd.RemovePod(podName) } } // Set up portforwarding for one or all of these pods // normal service portforward the first pod as service name. headless service not only forward first Pod as service name, but also portforward all pods. if len(k8sPods) != 0 { - if svcfwd.Headless { - svcfwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, true) - svcfwd.LoopPodsToForward(k8sPods, false) - } else { - // Check if currently we are forwarding a pod which is good to keep using - podNameToKeep := "" - for _, podName := range svcfwd.ListPodNames() { - if podNameToKeep != "" { + + // if this is a headless service forward the first pod from the + // service name, then subsequent pods from their pod name + if svcFwd.Headless { + svcFwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, false) + svcFwd.LoopPodsToForward(k8sPods, true) + return + } + + // Check if currently we are forwarding a pod which is good to keep using + podNameToKeep := "" + for _, podName := range svcFwd.ListPodNames() { + if podNameToKeep != "" { + break + } + for _, pod := range k8sPods { + if podName == pod.Name && (pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning) { + podNameToKeep = pod.Name break } - for _, pod := range k8sPods { - if podName == pod.Name && (pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning) { - podNameToKeep = pod.Name - break - } - } } + } - // Stop forwarding others, should there be. In case none of the currently forwarded pods are good to keep, - // podNameToKeep will be the empty string, and the comparison will mean we will remove all pods, which is the desired behaviour. - for _, podName := range svcfwd.ListPodNames() { - if podName != podNameToKeep { - svcfwd.RemovePod(podName) - } + // Stop forwarding others, should there be. In case none of the currently forwarded pods are good to keep, + // podNameToKeep will be the empty string, and the comparison will mean we will remove all pods, which is the desired behaviour. + for _, podName := range svcFwd.ListPodNames() { + if podName != podNameToKeep { + svcFwd.RemovePod(podName) } + } - // If no good pod was being forwarded already, start one - if podNameToKeep == "" { - svcfwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, false) - } + // If no good pod was being forwarded already, start one + if podNameToKeep == "" { + svcFwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, false) } + } } // LoopPodsToForward starts the portforwarding for each pod in the given list -func (svcfwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost bool) { +func (svcFwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost bool) { publisher := &fwdpub.Publisher{ PublisherName: "Services", Output: false, } // Ip address handout is a critical section for synchronization, use a lock which synchronizes inside each namespace. - svcfwd.NamespaceIPLock.Lock() - defer svcfwd.NamespaceIPLock.Unlock() + svcFwd.NamespaceIPLock.Lock() + defer svcFwd.NamespaceIPLock.Unlock() for _, pod := range pods { // If pod is already configured to be forwarded, skip it - if _, found := svcfwd.PortForwards[pod.Name]; found { + if _, found := svcFwd.PortForwards[pod.Name]; found { continue } podPort := "" svcName := "" - localIp, dInc, err := fwdnet.ReadyInterface(127, 1, svcfwd.IpC, *svcfwd.IpD, podPort) + localIp, dInc, err := fwdnet.ReadyInterface(127, 1, svcFwd.IpC, *svcFwd.IpD, podPort) if err != nil { log.Warnf("WARNING: error readying interface: %s\n", err) } - *svcfwd.IpD = dInc + *svcFwd.IpD = dInc - for _, port := range svcfwd.Svc.Spec.Ports { + for _, port := range svcFwd.Svc.Spec.Ports { podPort = port.TargetPort.String() localPort := strconv.Itoa(int(port.Port)) @@ -181,33 +187,34 @@ func (svcfwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost } } - serviceHostName := svcfwd.Svc.Name + serviceHostName := svcFwd.Svc.Name if includePodNameInHost { serviceHostName = pod.Name + "." + serviceHostName } - svcName = serviceHostName - - if !svcfwd.ShortName { + if !svcFwd.ShortName { serviceHostName = serviceHostName + "." + pod.Namespace } - if svcfwd.Domain != "" { - serviceHostName = serviceHostName + "." + svcfwd.Domain + if svcFwd.Domain != "" { + serviceHostName = serviceHostName + "." + svcFwd.Domain } - if svcfwd.Remote { - serviceHostName = fmt.Sprintf("%s.svc.cluster.%s", serviceHostName, svcfwd.Context) + if svcFwd.Remote { + serviceHostName = fmt.Sprintf("%s.svc.cluster.%s", serviceHostName, svcFwd.Context) } + svcName = serviceHostName + log.Debugf("Resolving: %s to %s\n", - serviceHostName, + svcName, localIp.String(), ) - log.Printf("Port-Forward: %s:%d to pod %s:%s\n", - serviceHostName, + log.Printf("Port-Forward: %s %s:%d to pod %s:%s\n", + localIp.String(), + svcName, port.Port, pod.Name, podPort, @@ -215,21 +222,21 @@ func (svcfwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost pfo := &fwdport.PortForwardOpts{ Out: publisher, - Config: svcfwd.ClientConfig, - ClientSet: svcfwd.ClientSet, - RESTClient: svcfwd.RESTClient, - Context: svcfwd.Context, + Config: svcFwd.ClientConfig, + ClientSet: svcFwd.ClientSet, + RESTClient: svcFwd.RESTClient, + Context: svcFwd.Context, Namespace: pod.Namespace, Service: svcName, - ServiceFwd: svcfwd, + ServiceFwd: svcFwd, PodName: pod.Name, PodPort: podPort, LocalIp: localIp, LocalPort: localPort, - Hostfile: svcfwd.Hostfile, - ShortName: svcfwd.ShortName, - Remote: svcfwd.Remote, - Domain: svcfwd.Domain, + Hostfile: svcFwd.Hostfile, + ShortName: svcFwd.ShortName, + Remote: svcFwd.Remote, + Domain: svcFwd.Domain, ManualStopChan: make(chan struct{}), DoneChan: make(chan struct{}), @@ -237,7 +244,7 @@ func (svcfwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost // Fire and forget. The stopping is done in the service.Shutdown() method. go func() { - svcfwd.AddPod(pfo) + svcFwd.AddPod(pfo) if err := pfo.PortForward(); err != nil { select { case <-pfo.ManualStopChan: // if shutdown was given, we don't bother with the error. @@ -248,7 +255,7 @@ func (svcfwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost select { case <-pfo.ManualStopChan: // if shutdown was given, don't log a warning as it's an intented stopping. default: - log.Warnf("Stopped forwarding pod %s for %s", pfo.PodName, svcfwd) + log.Warnf("Stopped forwarding pod %s for %s", pfo.PodName, svcFwd) } } }() @@ -258,31 +265,31 @@ func (svcfwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost } } -func (svcfwd *ServiceFWD) AddPod(pfo *fwdport.PortForwardOpts) { - svcfwd.NamespaceIPLock.Lock() - if _, found := svcfwd.PortForwards[pfo.PodName]; !found { - svcfwd.PortForwards[pfo.PodName] = pfo +func (svcFwd *ServiceFWD) AddPod(pfo *fwdport.PortForwardOpts) { + svcFwd.NamespaceIPLock.Lock() + if _, found := svcFwd.PortForwards[pfo.PodName]; !found { + svcFwd.PortForwards[pfo.PodName] = pfo } - svcfwd.NamespaceIPLock.Unlock() + svcFwd.NamespaceIPLock.Unlock() } -func (svcfwd *ServiceFWD) ListPodNames() []string { - svcfwd.NamespaceIPLock.Lock() - currentPodNames := make([]string, 0, len(svcfwd.PortForwards)) - for podName := range svcfwd.PortForwards { +func (svcFwd *ServiceFWD) ListPodNames() []string { + svcFwd.NamespaceIPLock.Lock() + currentPodNames := make([]string, 0, len(svcFwd.PortForwards)) + for podName := range svcFwd.PortForwards { currentPodNames = append(currentPodNames, podName) } - svcfwd.NamespaceIPLock.Unlock() + svcFwd.NamespaceIPLock.Unlock() return currentPodNames } -func (svcfwd *ServiceFWD) RemovePod(podName string) { - if pod, found := svcfwd.PortForwards[podName]; found { +func (svcFwd *ServiceFWD) RemovePod(podName string) { + if pod, found := svcFwd.PortForwards[podName]; found { pod.Stop() <-pod.DoneChan - svcfwd.NamespaceIPLock.Lock() - delete(svcfwd.PortForwards, podName) - svcfwd.NamespaceIPLock.Unlock() + svcFwd.NamespaceIPLock.Lock() + delete(svcFwd.PortForwards, podName) + svcFwd.NamespaceIPLock.Unlock() } }