diff --git a/internal/command/dev/dev.go b/internal/command/dev/dev.go
index 6698954..fa2fb0e 100644
--- a/internal/command/dev/dev.go
+++ b/internal/command/dev/dev.go
@@ -54,10 +54,10 @@ func runDev(cmd *cobra.Command, args []string) error {
 	devController, devWorker, err := CreateDevControllerAndWorker(devDataDirPath,
 		fmt.Sprintf(":%d", netconstants.DefaultControllerPort), resources, nil, nil)
-
 	if err != nil {
 		return err
 	}
+	defer devWorker.Close()
 
 	errChan := make(chan error, 2)
diff --git a/internal/command/list/vms.go b/internal/command/list/vms.go
index ac8ac19..0147e13 100644
--- a/internal/command/list/vms.go
+++ b/internal/command/list/vms.go
@@ -40,13 +40,13 @@ func runListVMs(cmd *cobra.Command, args []string) error {
 	table := uitable.New()
-	table.AddRow("Name", "Created", "Image", "Status", "Restart policy")
+	table.AddRow("Name", "Created", "Image", "Status", "Restart policy", "Assigned worker")
 
 	for _, vm := range vms {
 		restartPolicyInfo := fmt.Sprintf("%s (%d restarts)", vm.RestartPolicy, vm.RestartCount)
 		createdAtInfo := humanize.RelTime(vm.CreatedAt, time.Now(), "ago", "in the future")
 
-		table.AddRow(vm.Name, createdAtInfo, vm.Image, vm.Status, restartPolicyInfo)
+		table.AddRow(vm.Name, createdAtInfo, vm.Image, vm.Status, restartPolicyInfo, vm.Worker)
 	}
 
 	fmt.Println(table)
diff --git a/internal/command/worker/run.go b/internal/command/worker/run.go
index 81be0bd..8a91663 100644
--- a/internal/command/worker/run.go
+++ b/internal/command/worker/run.go
@@ -105,6 +105,7 @@ func runWorker(cmd *cobra.Command, args []string) (err error) {
 	if err != nil {
 		return err
 	}
+	defer workerInstance.Close()
 
 	return workerInstance.Run(cmd.Context())
 }
diff --git a/internal/worker/vmmanager/vm.go b/internal/worker/vmmanager/vm.go
index 54ba2d8..8f4b2ec 100644
--- a/internal/worker/vmmanager/vm.go
+++ b/internal/worker/vmmanager/vm.go
@@ -190,22 +190,23 @@ func (vm *VM) IP(ctx context.Context) (string, error) {
 	return strings.TrimSpace(stdout), nil
 }
 
-func (vm *VM) Stop() error {
+func (vm *VM) Stop() {
 	if !vm.cloned {
-		return nil
+		return
 	}
 
 	vm.logger.Debugf("stopping VM")
 
-	_, _, _ = tart.Tart(context.Background(), vm.logger, "stop", vm.id())
+	_, _, err := tart.Tart(context.Background(), vm.logger, "stop", vm.id())
+	if err != nil {
+		vm.logger.Warnf("failed to stop VM: %v", err)
+	}
 
 	vm.logger.Debugf("VM stopped")
 
 	vm.cancel()
 	vm.wg.Wait()
-
-	return nil
 }
 
 func (vm *VM) Delete() error {
diff --git a/internal/worker/worker.go b/internal/worker/worker.go
index 0ed6ed6..7cf1305 100644
--- a/internal/worker/worker.go
+++ b/internal/worker/worker.go
@@ -86,10 +86,7 @@ func (worker *Worker) Run(ctx context.Context) error {
 func (worker *Worker) Close() error {
 	var result error
 	for _, vm := range worker.vmm.List() {
-		err := vm.Stop()
-		if err != nil {
-			result = multierror.Append(result, err)
-		}
+		vm.Stop()
 	}
 	for _, vm := range worker.vmm.List() {
 		err := vm.Delete()
@@ -188,6 +185,7 @@ func (worker *Worker) updateWorker(ctx context.Context) error {
 	return nil
 }
 
+//nolint:nestif // nested "if" complexity is tolerable for now
 func (worker *Worker) syncVMs(ctx context.Context) error {
 	remoteVMs, err := worker.client.VMs().FindForWorker(ctx, worker.name)
 	if err != nil {
 		return err
 	}
 
 	worker.logger.Infof("syncing %d local VMs against %d remote VMs...",
-		len(remoteVMsIndex), worker.vmm.Len())
+		worker.vmm.Len(), len(remoteVMsIndex))
 
+	// It's important to check the remote VMs against the local ones first
+	// to stop the failed VMs before we start the new VMs, otherwise we
+	// risk violating the resource constraints (e.g. a maximum of 2 VMs
+	// per host)
 	for _, vm := range worker.vmm.List() {
 		remoteVM, ok := remoteVMsIndex[vm.OnDiskName()]
 		if !ok {
@@ -211,12 +213,22 @@
 			if err := worker.deleteVM(vm); err != nil {
 				return err
 			}
-		} else if remoteVM.Status != v1.VMStatusFailed && vm.Err() != nil {
-			// Local VM has failed, update remote VM
-			remoteVM.Status = v1.VMStatusFailed
-			remoteVM.StatusMessage = vm.Err().Error()
-			if _, err := worker.client.VMs().Update(ctx, remoteVM); err != nil {
-				return err
+		} else {
+			if remoteVM.Status == v1.VMStatusFailed {
+				// VM has failed on the remote side, stop it locally to keep the
+				// Controller's scheduler from miscalculating the worker's resources
+				vm.Stop()
+			} else if vm.Err() != nil {
+				// VM has failed on the local side, stop it before reporting it as failed
+				// to keep the Controller's scheduler from miscalculating the worker's resources
+				vm.Stop()
+
+				// Report the VM as failed
+				remoteVM.Status = v1.VMStatusFailed
+				remoteVM.StatusMessage = vm.Err().Error()
+				if _, err := worker.client.VMs().Update(ctx, remoteVM); err != nil {
+					return err
+				}
 			}
 		}
 	}
@@ -238,6 +250,7 @@
 	return nil
 }
 
+//nolint:nestif,gocognit // complexity is tolerable for now
 func (worker *Worker) syncOnDiskVMs(ctx context.Context) error {
 	remoteVMs, err := worker.client.VMs().FindForWorker(ctx, worker.name)
 	if err != nil {
 		return err
 	}
 
@@ -256,10 +269,6 @@
 	for _, vmInfo := range vmInfos {
-		if vmInfo.Running {
-			continue
-		}
-
 		onDiskName, err := ondiskname.Parse(vmInfo.Name)
 		if err != nil {
 			if errors.Is(err, ondiskname.ErrNotManagedByOrchard) {
 				continue
 			}
 
 			return err
 		}
 
@@ -269,22 +278,44 @@
+		// VMs that exist in the Worker's VM manager will be handled by syncVMs()
+		if worker.vmm.Exists(onDiskName) {
+			continue
+		}
+
 		remoteVM, ok := remoteVMsIndex[onDiskName]
 		if !ok {
-			// On-disk VM doesn't exist on the controller, delete it
+			// On-disk VM exists neither on the controller nor in the Worker's VM manager,
+			// stop it (if applicable) and delete it
+			if vmInfo.Running {
+				_, _, err := tart.Tart(ctx, worker.logger, "stop", vmInfo.Name)
+				if err != nil {
+					worker.logger.Warnf("failed to stop VM %s: %v", vmInfo.Name, err)
+				}
+			}
+
 			_, _, err := tart.Tart(ctx, worker.logger, "delete", vmInfo.Name)
 			if err != nil {
 				return err
 			}
-		} else if remoteVM.Status == v1.VMStatusRunning && !worker.vmm.Exists(onDiskName) {
-			// On-disk VM exist on the controller,
-			// but we don't know about it, so
-			// mark it as failed
-			remoteVM.Status = v1.VMStatusFailed
-			remoteVM.StatusMessage = "Worker lost track of VM"
-			_, err := worker.client.VMs().Update(ctx, remoteVM)
-			if err != nil {
-				return err
+		} else if remoteVM.Status != v1.VMStatusPending {
+			// On-disk VM exists on the controller and was acted upon,
+			// but we've lost track of it, so shut it down (if applicable)
+			// and report the error (if not failed yet)
+			if vmInfo.Running {
+				_, _, err := tart.Tart(ctx, worker.logger, "stop", vmInfo.Name)
+				if err != nil {
+					worker.logger.Warnf("failed to stop VM %s: %v", vmInfo.Name, err)
+				}
+			}
+
+			if remoteVM.Status != v1.VMStatusFailed {
+				remoteVM.Status = v1.VMStatusFailed
+				remoteVM.StatusMessage = "Worker lost track of VM"
+				_, err := worker.client.VMs().Update(ctx, remoteVM)
+				if err != nil {
+					return err
+				}
 			}
 		}
 	}
@@ -293,9 +324,7 @@ func (worker *Worker) syncOnDiskVMs(ctx context.Context) error {
 }
 
 func (worker *Worker) deleteVM(vm *vmmanager.VM) error {
-	if err := vm.Stop(); err != nil {
-		return err
-	}
+	vm.Stop()
 
 	if err := vm.Delete(); err != nil {
 		return err
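
Both runWorker() and the dev command above now defer Close() right after the worker is created, so the VMs started by the worker are cleaned up on every exit path, including an error from Run() or a cancelled context. The following is a compact, self-contained sketch of that lifecycle; newWorker, workerStub and run are hypothetical stubs, not Orchard code.

    package main

    import (
        "context"
        "fmt"
    )

    type workerStub struct{}

    func newWorker() (*workerStub, error) { return &workerStub{}, nil }

    // Run blocks until the context is cancelled, roughly like the real worker loop.
    func (w *workerStub) Run(ctx context.Context) error {
        <-ctx.Done()
        return ctx.Err()
    }

    // Close stands in for stopping and deleting all managed VMs.
    func (w *workerStub) Close() error {
        fmt.Println("cleaning up VMs")
        return nil
    }

    func run(ctx context.Context) error {
        w, err := newWorker()
        if err != nil {
            return err
        }
        // Deferring Close() right after construction guarantees the cleanup runs
        // on every return below, mirroring the change in run.go and dev.go.
        defer w.Close()

        return w.Run(ctx)
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        cancel() // simulate shutdown
        fmt.Println(run(ctx))
    }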
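
Changing Stop() from returning an error to returning nothing makes stopping a best-effort step: the failure is logged and teardown continues, which is what Close() and deleteVM() need once Stop() sits on their cleanup path. Below is a small sketch of that pattern; the resource type and the stopBackend/deleteBackend helpers are hypothetical, and the standard library stands in for the worker's logger and for multierror.

    package main

    import (
        "errors"
        "fmt"
        "log"
    )

    // resource is a hypothetical stand-in for a VM handle.
    type resource struct {
        name string
    }

    // stop is best-effort: failures are logged, never returned, so callers can
    // keep tearing down the remaining resources.
    func (r *resource) stop() {
        if err := stopBackend(r.name); err != nil {
            log.Printf("failed to stop %s: %v", r.name, err)
        }
    }

    // delete still reports its error, since a leftover resource needs attention.
    func (r *resource) delete() error {
        return deleteBackend(r.name)
    }

    func closeAll(resources []*resource) error {
        var result error
        for _, r := range resources {
            r.stop() // never aborts the loop
        }
        for _, r := range resources {
            result = errors.Join(result, r.delete())
        }
        return result
    }

    // stopBackend/deleteBackend are placeholders for the real "tart stop"/"tart delete" calls.
    func stopBackend(name string) error   { return nil }
    func deleteBackend(name string) error { return nil }

    func main() {
        fmt.Println(closeAll([]*resource{{name: "vm-a"}, {name: "vm-b"}}))
    }

With the old signature the stop error either had to be swallowed entirely (as the old Stop() did) or would abort deleteVM() before its Delete() call; the new shape logs the failure and always reaches the deletion step.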
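
The comment added to syncVMs() carries the key scheduling argument: failed VMs are stopped while walking the existing local VMs, and only afterwards are the missing remote VMs created, so a per-host capacity limit is never exceeded in between. The sketch below is a minimal, self-contained illustration of that two-pass ordering; the localVM/remoteVM types, the maxVMs constant and the helper names are assumptions for the example, not Orchard code.

    package main

    import "fmt"

    // Hypothetical, simplified stand-ins for the worker's local and remote views.
    type localVM struct {
        name   string
        failed bool
    }

    type remoteVM struct {
        failed bool
    }

    const maxVMs = 2 // e.g. the "maximum of 2 VMs per host" constraint from the comment

    func sync(local []localVM, remote map[string]remoteVM) []localVM {
        // Pass 1: reconcile the existing local VMs first, stopping anything that has
        // failed locally or remotely, so its slot is freed before anything new starts.
        kept := make([]localVM, 0, len(local))
        for _, vm := range local {
            r, known := remote[vm.name]
            if !known || r.failed || vm.failed {
                fmt.Println("stopping", vm.name) // placeholder for vm.Stop()
                continue
            }
            kept = append(kept, vm)
        }

        // Pass 2: only now create VMs that exist remotely but not locally,
        // within the capacity that pass 1 just released.
        for name := range remote {
            if len(kept) >= maxVMs {
                break
            }
            if !containsVM(kept, name) {
                fmt.Println("starting", name) // placeholder for creating the VM
                kept = append(kept, localVM{name: name})
            }
        }

        return kept
    }

    func containsVM(vms []localVM, name string) bool {
        for _, vm := range vms {
            if vm.name == name {
                return true
            }
        }
        return false
    }

    func main() {
        local := []localVM{{name: "vm-a", failed: true}, {name: "vm-b"}}
        remote := map[string]remoteVM{"vm-b": {}, "vm-c": {}}
        fmt.Println(sync(local, remote))
    }

Reversing the two passes could briefly run both a failed VM and its replacement, which is exactly the constraint violation the comment warns about.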