Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve batch job and restart behavior #30

Merged
merged 1 commit into from
Jul 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"os"
"os/exec"
"strings"
"syscall"
"time"

"github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -295,7 +297,8 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
return fmt.Errorf("unable to recover a container that is not running")
} else {
se.containerPid = alive
completeName := handle.Config.JobName + handle.Config.Name + "_" + handle.Config.AllocID
parts := strings.Split(handle.Config.ID, "/")
completeName := parts[1] + "_" + parts[2] + "_" + parts[0]
Sout, err := se.Stdout()
if err != nil {
d.logger.Error("Error setting stdout with", "err", err)
Expand Down Expand Up @@ -433,7 +436,8 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
}
} else {
se.containerPid = alive
completeName := cfg.JobName + cfg.Name + "_" + cfg.AllocID
parts := strings.Split(cfg.ID, "/")
completeName := parts[1] + "_" + parts[2] + "_" + parts[0]

se.cmd = &exec.Cmd{
Args: []string{"/usr/local/bin/pot", "start", completeName},
Expand Down Expand Up @@ -505,16 +509,45 @@ OuterLoop:

handle, _ := d.tasks.Get(id)
handle.procState = drivers.TaskStateExited

last_run, err := se.getContainerLastRunStats(handle.taskConfig)
if err != nil {
d.logger.Error("Error getting container last-run-stats with err: ", err)
handle.exitResult.ExitCode = defaultFailedCode
} else {
handle.exitResult.ExitCode = last_run.ExitCode
}

err = se.destroyContainer(handle.taskConfig)
if err != nil {
d.logger.Error("Error destroying container with err: ", err)
}
}

// potWait blocks until the pot command held in se.cmd terminates, records
// the resulting exit code on the task handle registered under taskID, marks
// the task as exited and finally destroys the container.
//
// When the pot command itself exits with status 125 (the enclosed process
// exited with an error), the exit code of the enclosed process is looked up
// through getContainerLastRunStats and reported instead of the generic
// defaultFailedCode.
func (d *Driver) potWait(taskID string, se syexec) {
	handle, _ := d.tasks.Get(taskID)
	waitErr := se.cmd.Wait()

	// Without a handle there is nowhere to record the result and no task
	// config to destroy the container with; bail out instead of panicking.
	if handle == nil {
		d.logger.Error("Task handle not found in potWait", "task_id", taskID, "Err", waitErr)
		return
	}

	if waitErr != nil {
		d.logger.Error("Error exiting se.cmd.Wait in potWait", "Err", waitErr)
		handle.exitResult.ExitCode = defaultFailedCode
		if exitError, isExit := waitErr.(*exec.ExitError); isExit {
			// Checked assertion: Sys() is only a syscall.WaitStatus on Unix-like systems.
			if ws, isStatus := exitError.Sys().(syscall.WaitStatus); isStatus && ws.ExitStatus() == 125 { // enclosed process exited with error
				lastRun, statsErr := se.getContainerLastRunStats(handle.taskConfig)
				if statsErr != nil {
					d.logger.Error("Error getting container last-run-stats with err: ", "err", statsErr)
				} else {
					handle.exitResult.ExitCode = lastRun.ExitCode
				}
			}
		}
	}

	// Flip the state only after the exit code has been recorded, so that the
	// task is never observable as exited with a stale exit code.
	handle.procState = drivers.TaskStateExited

	if destroyErr := se.destroyContainer(handle.taskConfig); destroyErr != nil {
		d.logger.Error("Error destroying container with err: ", "err", destroyErr)
	}
}

// WaitTask waits for task completion
Expand Down
1 change: 1 addition & 0 deletions driver/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (h *taskHandle) run() {

if h.syexec.ExitError != nil {
h.exitResult.Err = h.syexec.ExitError
h.exitResult.ExitCode = h.syexec.exitCode
h.procState = drivers.TaskStateUnknown
h.completedAt = time.Now()
return
Expand Down
63 changes: 59 additions & 4 deletions driver/pot.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type syexec struct {
argvStart []string
argvStop []string
argvStats []string
argvLastRunStats []string
argvDestroy []string
cmd *exec.Cmd
cachedir string
Expand Down Expand Up @@ -67,6 +68,10 @@ type potStats struct {
} `json:"ResourceUsage"`
}

// lastRunStats is the decoding target for the JSON document printed by the
// pot "last-run-stats" command (see getContainerLastRunStats).
type lastRunStats struct {
	// ExitCode is decoded from the "ExitCode" key of the JSON output.
	ExitCode int `json:"ExitCode"`
}

var potStatistics map[string]potStats

func init() {
Expand Down Expand Up @@ -364,8 +369,8 @@ func (s *syexec) containerStats(commandCfg *drivers.TaskConfig) (stats potStats,

func (s *syexec) checkContainerAlive(commandCfg *drivers.TaskConfig) int {
s.logger.Trace("Checking if pot is alive", "Checking")
completeName := commandCfg.JobName + commandCfg.Name
potName := completeName + "_" + commandCfg.AllocID
parts := strings.Split(commandCfg.ID, "/")
potName := parts[1] + "_" + parts[2] + "_" + parts[0]
s.logger.Trace("Allocation name beeing check for liveness", "alive", potName)

psCommand := "/bin/sh /usr/local/bin/pot start " + potName
Expand Down Expand Up @@ -395,8 +400,8 @@ func (s *syexec) checkContainerAlive(commandCfg *drivers.TaskConfig) int {

func (s *syexec) checkContainerExists(commandCfg *drivers.TaskConfig) int {
s.logger.Debug("Checking if pot is alive")
completeName := commandCfg.JobName + commandCfg.Name
potName := completeName + "_" + commandCfg.AllocID
parts := strings.Split(commandCfg.ID, "/")
potName := parts[1] + "_" + parts[2] + "_" + parts[0]
s.logger.Trace("Allocation name beeing check for liveness", "alive", potName)

pidCommand := "/usr/local/bin/pot ls -q | grep " + potName
Expand All @@ -420,3 +425,53 @@ func (s *syexec) checkContainerExists(commandCfg *drivers.TaskConfig) int {

return 0
}

// getContainerLastRunStats runs the pot command described by
// s.argvLastRunStats inside the task directory of commandCfg and decodes its
// JSON standard output into a lastRunStats value.
//
// Side effects: s.exitCode, s.cmd and s.state are overwritten with the
// outcome of the last-run-stats invocation.
//
// It returns an error when the command exits with a non-zero status, cannot
// be started at all, or prints output that is not valid JSON for lastRunStats.
func (s *syexec) getContainerLastRunStats(commandCfg *drivers.TaskConfig) (stats lastRunStats, err error) {
	s.logger.Debug("launching LastRunStatsContainer command", "args", strings.Join(s.argvLastRunStats, " "))

	cmd := exec.Command(potBIN, s.argvLastRunStats...)

	// set the task dir as the working directory for the command
	cmd.Dir = commandCfg.TaskDir().Dir
	cmd.Path = potBIN
	cmd.Args = append([]string{cmd.Path}, s.argvLastRunStats...)

	var outb, errb bytes.Buffer
	cmd.Stdout = &outb
	cmd.Stderr = &errb

	// Run the command to completion and derive its exit code.
	s.exitCode = defaultFailedCode
	switch runErr := cmd.Run(); e := runErr.(type) {
	case nil:
		// Run only returns nil when the command exited with status 0.
		s.exitCode = 0
	case *exec.ExitError:
		// Checked assertion: Sys() is only a syscall.WaitStatus on Unix-like systems.
		if ws, ok := e.Sys().(syscall.WaitStatus); ok {
			s.exitCode = ws.ExitStatus()
		}
	default:
		s.logger.Error("Could not get exit code for container last-run-stats ", "pot", s.argvLastRunStats, "err", runErr)
	}

	s.cmd = cmd

	// cmd.Process stays nil when the command could not be started, so the
	// pid must not be read unconditionally.
	pid := 0
	if cmd.Process != nil {
		pid = cmd.Process.Pid
	}
	s.state = &psState{Pid: pid, ExitCode: s.exitCode, Time: time.Now()}

	if s.exitCode != 0 {
		if errb.Len() > 0 {
			s.logger.Error("Container last-run-stats failed", "stderr", errb.String())
		}
		return lastRunStats{}, errors.New("Pot exit code different than 0")
	}

	if err = json.Unmarshal(outb.Bytes(), &stats); err != nil {
		s.logger.Error("Error unmarshaling json with err: ", "err", err)
		return lastRunStats{}, err
	}

	return stats, nil
}
33 changes: 20 additions & 13 deletions driver/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ func prepareContainer(cfg *drivers.TaskConfig, taskCfg TaskConfig) (syexec, erro

argv = append(argv, "prepare", "-U", taskCfg.Image, "-p", taskCfg.Pot, "-t", taskCfg.Tag)

if cfg.AllocID != "" {
argv = append(argv, "-a", cfg.AllocID)
}

if len(taskCfg.Args) > 0 {
if taskCfg.Command == "" {
err := errors.New("command can not be empty if arguments are provided")
Expand Down Expand Up @@ -59,12 +55,17 @@ func prepareContainer(cfg *drivers.TaskConfig, taskCfg TaskConfig) (syexec, erro
}
}

completeName := cfg.JobName + cfg.Name
argv = append(argv, "-n", completeName, "-v")
parts := strings.Split(cfg.ID, "/")
baseName := parts[1]
jobIDAllocID := parts[2] + "_" + parts[0]
if jobIDAllocID != "" {
argv = append(argv, "-a", jobIDAllocID)
}
argv = append(argv, "-n", baseName, "-v")

se.argvCreate = argv

potName := completeName + "_" + cfg.AllocID
potName := baseName + "_" + jobIDAllocID

//Mount local
commandLocal := "mount-in -p " + potName + " -d " + cfg.TaskDir().LocalDir + " -m /local"
Expand Down Expand Up @@ -140,14 +141,18 @@ func prepareContainer(cfg *drivers.TaskConfig, taskCfg TaskConfig) (syexec, erro
argvStop = append(argvStop, "stop", potName)
se.argvStop = argvStop

argvDestroy := make([]string, 0, 50)
argvDestroy = append(argvDestroy, "destroy", "-p", potName)
se.argvDestroy = argvDestroy

argvStats := make([]string, 0, 50)
argvStats = append(argvStats, "get-rss", "-p", potName, "-J")
se.argvStats = argvStats

argvLastRunStats := make([]string, 0, 50)
argvLastRunStats = append(argvLastRunStats, "last-run-stats", "-p", potName)
se.argvLastRunStats = argvLastRunStats

argvDestroy := make([]string, 0, 50)
argvDestroy = append(argvDestroy, "destroy", "-p", potName)
se.argvDestroy = argvDestroy

return se, nil
}

Expand Down Expand Up @@ -209,7 +214,8 @@ func prepareStop(cfg *drivers.TaskConfig, taskCfg TaskConfig) syexec {

argv = append(argv, "stop")

completeName := cfg.JobName + cfg.Name + "_" + cfg.AllocID
parts := strings.Split(cfg.ID, "/")
completeName := parts[1] + "_" + parts[2] + "_" + parts[0]

argv = append(argv, completeName)

Expand All @@ -229,7 +235,8 @@ func prepareDestroy(cfg *drivers.TaskConfig, taskCfg TaskConfig) syexec {

argv = append(argv, "destroy")

completeName := cfg.JobName + cfg.Name + "_" + cfg.AllocID
parts := strings.Split(cfg.ID, "/")
completeName := parts[1] + "_" + parts[2] + "_" + parts[0]

argv = append(argv, "-p", completeName)

Expand Down