From 7f9319d5eb985cb16c989e50cc6878d63d3d3859 Mon Sep 17 00:00:00 2001 From: tizhou86 Date: Thu, 11 Jun 2020 20:56:07 +0800 Subject: [PATCH] Add the allocate logic for gpu sharing. --- pkg/scheduler/api/node_info.go | 37 ++++++++++++ pkg/scheduler/api/pod_info.go | 13 ++++ .../plugins/predicates/predicates.go | 59 +++++++++++++++---- 3 files changed, 99 insertions(+), 10 deletions(-) diff --git a/pkg/scheduler/api/node_info.go b/pkg/scheduler/api/node_info.go index 7b035022d7b..a489553daa0 100644 --- a/pkg/scheduler/api/node_info.go +++ b/pkg/scheduler/api/node_info.go @@ -403,3 +403,40 @@ func (ni *NodeInfo) GetDevicesAllGPUMemory() map[int]uint { } return res } + +func (ni *NodeInfo) GetDeviceCoreId(pod *v1.Pod) (int, bool) { + + resId := -1 + resOk := false + cMem := uint(0) + rMems := ni.GetDevicesRemainGPUMemory() + + gpuReq := uint(0) + + if len(pod.ObjectMeta.Annotations) > 0 { + req, ok := pod.ObjectMeta.Annotations["volcano.sh/pod-gpu-memory"] + if ok { + s, _ := strconv.Atoi(req) + gpuReq = uint(s) + } + } + + if gpuReq > uint(0) { + if len(rMems) > 0 { + for i := 0; i < len(ni.Devices); i++ { + rMem, ok := rMems[i] + if ok { + if rMem >= gpuReq { + if resId == -1 || cMem > rMem { + resId = i + cMem = rMem + } + resOk = true + } + } + } + } + } + + return resId, resOk +} diff --git a/pkg/scheduler/api/pod_info.go b/pkg/scheduler/api/pod_info.go index 56a94034bb9..0a0e68ad417 100644 --- a/pkg/scheduler/api/pod_info.go +++ b/pkg/scheduler/api/pod_info.go @@ -17,6 +17,8 @@ limitations under the License. package api import ( + "fmt" + v1 "k8s.io/api/core/v1" ) @@ -71,3 +73,14 @@ func GetPodResourceWithoutInitContainers(pod *v1.Pod) *Resource { return result } + +func UpdateGPUPod(oldPod *v1.Pod, coreId int, memoryPerCore int) (newPod *v1.Pod) { + newPod = oldPod.DeepCopy() + if len(newPod.ObjectMeta.Annotations) == 0 { + newPod.ObjectMeta.Annotations = map[string]string{} + } + + newPod.ObjectMeta.Annotations["volcano.sh/gpu-core-id"] = fmt.Sprintf("%d", coreId) + + return newPod +} diff --git a/pkg/scheduler/plugins/predicates/predicates.go b/pkg/scheduler/plugins/predicates/predicates.go index e5cdda25d6f..527c94d71e3 100644 --- a/pkg/scheduler/plugins/predicates/predicates.go +++ b/pkg/scheduler/plugins/predicates/predicates.go @@ -19,6 +19,8 @@ package predicates import ( "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/api/core/v1" "k8s.io/klog" "k8s.io/kubernetes/pkg/scheduler/algorithm" @@ -118,6 +120,11 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { nodeMap, _ = util.GenerateNodeMapAndSlice(ssn.Nodes) + predicate := enablePredicate(pp.pluginArguments) + + //TODO: (tizhou86) zhonghu will pass the client set into this function + clientSet := NewClientSetOrClientSetInstance() + // Register event handlers to update task info in PodLister & nodeMap ssn.AddEventHandler(&framework.EventHandler{ AllocateFunc: func(event *framework.Event) { @@ -125,23 +132,57 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { nodeName := event.Task.NodeName node, found := nodeMap[nodeName] - if !found { - klog.Warningf("predicates, update pod %s/%s allocate to NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName) + nodeInfo, foundInfo := ssn.Nodes[nodeName] + + if predicate.gpuSharingEnable { + if !foundInfo { + klog.Warningf("predicates with gpu sharing, update pod %s/%s allocate to NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName) + } else { + coreId, found := nodeInfo.GetDeviceCoreId(pod) + if found { + var err error + pod, err = clientSet.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{}) + newPod := api.UpdateGPUPod(pod, coreId, nodeInfo.GPUTotalMemory/nodeInfo.GPUTotalCore) + _, err = clientSet.CoreV1().Pods(newPod.Namespace).Update(newPod) + } else { + klog.Errorf("The node %s can't place the pod %s in ns %s", pod.Spec.NodeName, pod.Name, pod.Namespace) + } + + dev, found := nodeInfo.Devices[coreId] + if !found { + + } else { + dev.PodMap[newPod.UID] = newPod + node.AddPod(pod) + } + + klog.V(4).Infof("predicates with gpu sharing, update pod %s/%s allocate to node [%s]", pod.Namespace, pod.Name, nodeName) + } } else { - node.AddPod(pod) - klog.V(4).Infof("predicates, update pod %s/%s allocate to node [%s]", pod.Namespace, pod.Name, nodeName) + if !found { + klog.Warningf("predicates, update pod %s/%s allocate to NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName) + } else { + node.AddPod(pod) + klog.V(4).Infof("predicates, update pod %s/%s allocate to node [%s]", pod.Namespace, pod.Name, nodeName) + } } + }, DeallocateFunc: func(event *framework.Event) { pod := pl.UpdateTask(event.Task, "") nodeName := event.Task.NodeName node, found := nodeMap[nodeName] - if !found { - klog.Warningf("predicates, update pod %s/%s allocate from NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName) + + if predicate.gpuSharingEnable { + //TODO: (tizhou86) add deallocate logic } else { - node.RemovePod(pod) - klog.V(4).Infof("predicates, update pod %s/%s deallocate from node [%s]", pod.Namespace, pod.Name, nodeName) + if !found { + klog.Warningf("predicates, update pod %s/%s allocate from NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName) + } else { + node.RemovePod(pod) + klog.V(4).Infof("predicates, update pod %s/%s deallocate from node [%s]", pod.Namespace, pod.Name, nodeName) + } } }, }) @@ -150,8 +191,6 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { Session: ssn, } - predicate := enablePredicate(pp.pluginArguments) - ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error { nodeInfo, found := nodeMap[node.Name] if !found {