Skip to content

Commit

Permalink
variable name housekeeping
Browse files Browse the repository at this point in the history
  • Loading branch information
cjimti committed Oct 17, 2020
1 parent 67188ae commit cc42c05
Showing 1 changed file with 89 additions and 82 deletions.
171 changes: 89 additions & 82 deletions pkg/fwdservice/fwdservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package fwdservice

import (
"fmt"
"k8s.io/apimachinery/pkg/api/errors"
"strconv"
"sync"
"time"

"k8s.io/apimachinery/pkg/api/errors"

"github.com/txn2/kubefwd/pkg/fwdnet"
"github.com/txn2/kubefwd/pkg/fwdport"
"github.com/txn2/kubefwd/pkg/fwdpub"
Expand Down Expand Up @@ -49,16 +50,16 @@ func (svcFwd *ServiceFWD) String() string {

// GetPodsForService queries k8s and returns all pods backing this service
// which are eligible for portforwarding; exclude some pods which are in final/failure state.
func (svcfwd *ServiceFWD) GetPodsForService() []v1.Pod {
listOpts := metav1.ListOptions{LabelSelector: svcfwd.PodLabelSelector}
func (svcFwd *ServiceFWD) GetPodsForService() []v1.Pod {
listOpts := metav1.ListOptions{LabelSelector: svcFwd.PodLabelSelector}

pods, err := svcfwd.ClientSet.CoreV1().Pods(svcfwd.Svc.Namespace).List(listOpts)
pods, err := svcFwd.ClientSet.CoreV1().Pods(svcFwd.Svc.Namespace).List(listOpts)

if err != nil {
if errors.IsNotFound(err) {
log.Warnf("WARNING: No Pods found for service %s: %s\n", svcfwd, err.Error())
log.Warnf("WARNING: No Pods found for service %s: %s\n", svcFwd, err.Error())
} else {
log.Warnf("WARNING: Error in List pods for %s: %s\n", svcfwd, err.Error())
log.Warnf("WARNING: Error in List pods for %s: %s\n", svcFwd, err.Error())
}
return nil
}
Expand All @@ -76,25 +77,25 @@ func (svcfwd *ServiceFWD) GetPodsForService() []v1.Pod {

// SyncPodForwards selects one or all pods behind a service, and invokes the forwarding setup for that or those pod(s).
// It will remove pods in-mem that are no longer returned by k8s, should these not be correctly deleted.
func (svcfwd *ServiceFWD) SyncPodForwards(force bool) {
func (svcFwd *ServiceFWD) SyncPodForwards(force bool) {
// When a whole set of pods gets deleted at once, they all will trigger a SyncPodForwards() call. This would hammer k8s with load needlessly.
// Therefore keep a timestamp from when this was last called and only allow call if the previous one was not too recent.
if !force && time.Since(svcfwd.LastSyncedAt) < 10*time.Minute {
log.Debugf("Skipping pods refresh for %s due to rate limiting", svcfwd)
if !force && time.Since(svcFwd.LastSyncedAt) < 10*time.Minute {
log.Debugf("Skipping pods refresh for %s due to rate limiting", svcFwd)
return
}
defer func() { svcfwd.LastSyncedAt = time.Now() }()
defer func() { svcFwd.LastSyncedAt = time.Now() }()

k8sPods := svcfwd.GetPodsForService()
k8sPods := svcFwd.GetPodsForService()

// If no pods are found currently. Will try again next resync period
if len(k8sPods) == 0 {
log.Warnf("WARNING: No Running Pods returned for service %s", svcfwd)
log.Warnf("WARNING: No Running Pods returned for service %s", svcFwd)
return
}

// Check if the pods currently being forwarded still exist in k8s and if they are not in a (pre-)running state, if not: remove them
for _, podName := range svcfwd.ListPodNames() {
for _, podName := range svcFwd.ListPodNames() {
keep := false
for _, pod := range k8sPods {
if podName == pod.Name && (pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning) {
Expand All @@ -103,73 +104,78 @@ func (svcfwd *ServiceFWD) SyncPodForwards(force bool) {
}
}
if !keep {
svcfwd.RemovePod(podName)
svcFwd.RemovePod(podName)
}
}
// Set up portforwarding for one or all of these pods
// normal service portforward the first pod as service name. headless service not only forward first Pod as service name, but also portforward all pods.
if len(k8sPods) != 0 {
if svcfwd.Headless {
svcfwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, true)
svcfwd.LoopPodsToForward(k8sPods, false)
} else {
// Check if currently we are forwarding a pod which is good to keep using
podNameToKeep := ""
for _, podName := range svcfwd.ListPodNames() {
if podNameToKeep != "" {

// if this is a headless service forward the first pod from the
// service name, then subsequent pods from their pod name
if svcFwd.Headless {
svcFwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, false)
svcFwd.LoopPodsToForward(k8sPods, true)
return
}

// Check if currently we are forwarding a pod which is good to keep using
podNameToKeep := ""
for _, podName := range svcFwd.ListPodNames() {
if podNameToKeep != "" {
break
}
for _, pod := range k8sPods {
if podName == pod.Name && (pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning) {
podNameToKeep = pod.Name
break
}
for _, pod := range k8sPods {
if podName == pod.Name && (pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning) {
podNameToKeep = pod.Name
break
}
}
}
}

// Stop forwarding others, should there be. In case none of the currently forwarded pods are good to keep,
// podNameToKeep will be the empty string, and the comparison will mean we will remove all pods, which is the desired behaviour.
for _, podName := range svcfwd.ListPodNames() {
if podName != podNameToKeep {
svcfwd.RemovePod(podName)
}
// Stop forwarding others, should there be. In case none of the currently forwarded pods are good to keep,
// podNameToKeep will be the empty string, and the comparison will mean we will remove all pods, which is the desired behaviour.
for _, podName := range svcFwd.ListPodNames() {
if podName != podNameToKeep {
svcFwd.RemovePod(podName)
}
}

// If no good pod was being forwarded already, start one
if podNameToKeep == "" {
svcfwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, false)
}
// If no good pod was being forwarded already, start one
if podNameToKeep == "" {
svcFwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, false)
}

}
}

// LoopPodsToForward starts the portforwarding for each pod in the given list
func (svcfwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost bool) {
func (svcFwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost bool) {
publisher := &fwdpub.Publisher{
PublisherName: "Services",
Output: false,
}

// Ip address handout is a critical section for synchronization, use a lock which synchronizes inside each namespace.
svcfwd.NamespaceIPLock.Lock()
defer svcfwd.NamespaceIPLock.Unlock()
svcFwd.NamespaceIPLock.Lock()
defer svcFwd.NamespaceIPLock.Unlock()

for _, pod := range pods {
// If pod is already configured to be forwarded, skip it
if _, found := svcfwd.PortForwards[pod.Name]; found {
if _, found := svcFwd.PortForwards[pod.Name]; found {
continue
}

podPort := ""
svcName := ""

localIp, dInc, err := fwdnet.ReadyInterface(127, 1, svcfwd.IpC, *svcfwd.IpD, podPort)
localIp, dInc, err := fwdnet.ReadyInterface(127, 1, svcFwd.IpC, *svcFwd.IpD, podPort)
if err != nil {
log.Warnf("WARNING: error readying interface: %s\n", err)
}
*svcfwd.IpD = dInc
*svcFwd.IpD = dInc

for _, port := range svcfwd.Svc.Spec.Ports {
for _, port := range svcFwd.Svc.Spec.Ports {

podPort = port.TargetPort.String()
localPort := strconv.Itoa(int(port.Port))
Expand All @@ -181,63 +187,64 @@ func (svcfwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost
}
}

serviceHostName := svcfwd.Svc.Name
serviceHostName := svcFwd.Svc.Name

if includePodNameInHost {
serviceHostName = pod.Name + "." + serviceHostName
}

svcName = serviceHostName

if !svcfwd.ShortName {
if !svcFwd.ShortName {
serviceHostName = serviceHostName + "." + pod.Namespace
}

if svcfwd.Domain != "" {
serviceHostName = serviceHostName + "." + svcfwd.Domain
if svcFwd.Domain != "" {
serviceHostName = serviceHostName + "." + svcFwd.Domain
}

if svcfwd.Remote {
serviceHostName = fmt.Sprintf("%s.svc.cluster.%s", serviceHostName, svcfwd.Context)
if svcFwd.Remote {
serviceHostName = fmt.Sprintf("%s.svc.cluster.%s", serviceHostName, svcFwd.Context)
}

svcName = serviceHostName

log.Debugf("Resolving: %s to %s\n",
serviceHostName,
svcName,
localIp.String(),
)

log.Printf("Port-Forward: %s:%d to pod %s:%s\n",
serviceHostName,
log.Printf("Port-Forward: %s %s:%d to pod %s:%s\n",
localIp.String(),
svcName,
port.Port,
pod.Name,
podPort,
)

pfo := &fwdport.PortForwardOpts{
Out: publisher,
Config: svcfwd.ClientConfig,
ClientSet: svcfwd.ClientSet,
RESTClient: svcfwd.RESTClient,
Context: svcfwd.Context,
Config: svcFwd.ClientConfig,
ClientSet: svcFwd.ClientSet,
RESTClient: svcFwd.RESTClient,
Context: svcFwd.Context,
Namespace: pod.Namespace,
Service: svcName,
ServiceFwd: svcfwd,
ServiceFwd: svcFwd,
PodName: pod.Name,
PodPort: podPort,
LocalIp: localIp,
LocalPort: localPort,
Hostfile: svcfwd.Hostfile,
ShortName: svcfwd.ShortName,
Remote: svcfwd.Remote,
Domain: svcfwd.Domain,
Hostfile: svcFwd.Hostfile,
ShortName: svcFwd.ShortName,
Remote: svcFwd.Remote,
Domain: svcFwd.Domain,

ManualStopChan: make(chan struct{}),
DoneChan: make(chan struct{}),
}

// Fire and forget. The stopping is done in the service.Shutdown() method.
go func() {
svcfwd.AddPod(pfo)
svcFwd.AddPod(pfo)
if err := pfo.PortForward(); err != nil {
select {
case <-pfo.ManualStopChan: // if shutdown was given, we don't bother with the error.
Expand All @@ -248,7 +255,7 @@ func (svcfwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost
select {
case <-pfo.ManualStopChan: // if shutdown was given, don't log a warning as it's an intented stopping.
default:
log.Warnf("Stopped forwarding pod %s for %s", pfo.PodName, svcfwd)
log.Warnf("Stopped forwarding pod %s for %s", pfo.PodName, svcFwd)
}
}
}()
Expand All @@ -258,31 +265,31 @@ func (svcfwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost
}
}

func (svcfwd *ServiceFWD) AddPod(pfo *fwdport.PortForwardOpts) {
svcfwd.NamespaceIPLock.Lock()
if _, found := svcfwd.PortForwards[pfo.PodName]; !found {
svcfwd.PortForwards[pfo.PodName] = pfo
func (svcFwd *ServiceFWD) AddPod(pfo *fwdport.PortForwardOpts) {
svcFwd.NamespaceIPLock.Lock()
if _, found := svcFwd.PortForwards[pfo.PodName]; !found {
svcFwd.PortForwards[pfo.PodName] = pfo
}
svcfwd.NamespaceIPLock.Unlock()
svcFwd.NamespaceIPLock.Unlock()
}

func (svcfwd *ServiceFWD) ListPodNames() []string {
svcfwd.NamespaceIPLock.Lock()
currentPodNames := make([]string, 0, len(svcfwd.PortForwards))
for podName := range svcfwd.PortForwards {
func (svcFwd *ServiceFWD) ListPodNames() []string {
svcFwd.NamespaceIPLock.Lock()
currentPodNames := make([]string, 0, len(svcFwd.PortForwards))
for podName := range svcFwd.PortForwards {
currentPodNames = append(currentPodNames, podName)
}
svcfwd.NamespaceIPLock.Unlock()
svcFwd.NamespaceIPLock.Unlock()
return currentPodNames
}

func (svcfwd *ServiceFWD) RemovePod(podName string) {
if pod, found := svcfwd.PortForwards[podName]; found {
func (svcFwd *ServiceFWD) RemovePod(podName string) {
if pod, found := svcFwd.PortForwards[podName]; found {
pod.Stop()
<-pod.DoneChan
svcfwd.NamespaceIPLock.Lock()
delete(svcfwd.PortForwards, podName)
svcfwd.NamespaceIPLock.Unlock()
svcFwd.NamespaceIPLock.Lock()
delete(svcFwd.PortForwards, podName)
svcFwd.NamespaceIPLock.Unlock()
}
}

Expand Down

0 comments on commit cc42c05

Please sign in to comment.