Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful shutdown done in pretty ugly way :) #95

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ _cgo_export.*
_testmain.go

*.exe

# Other Go stuff
go.mod
go.sum
219 changes: 219 additions & 0 deletions admin/admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/* Borrowed from: github.com/draxil/gearman_admin */
package admin

import (
"bufio"
"fmt"
"io"
"net"
"strings"
"strconv"
"regexp"
)

// Connection to a gearman server
type Connection struct {
net.Conn
}

var colrx *regexp.Regexp

func init(){
colrx = regexp.MustCompile("[ \t]+")
}

// Connect to a gearman server
// gearadmin, err := gearman_admin.Connect("tcp", "gearman:4730")
func Connect(network, address string) (connection *Connection, err error) {
connection = &Connection{}
connection.Conn, err = net.Dial(network, address)

if err != nil {
err = fmt.Errorf("Error connecting to gearman server: %s", err.Error())
return
}

return
}

// Query connected workers list
func (c *Connection) Workers() (workers []Worker, err error) {
_, err = c.Write([]byte("workers\n"))
if err != nil {
return nil, err
}

var lines []string
lines, err = read_response(c)
if err != nil {
return nil, err
}

workers, err = workers_from_lines(lines)

return workers, err
}

// Query known tasks and their statuses
func (c *Connection) Status() (statuses []FunctionStatus, err error) {
_, err = c.Write([]byte("status\n"))
if err != nil {
err = fmt.Errorf("Error requesting function status list: %s", err.Error())
return
}

var lines []string
lines, err = read_response(c)

if err != nil {
err = fmt.Errorf("Error getting function status list: %s", err.Error())
}

statuses, err = func_statuses_from_lines(lines)

if err != nil {
err = fmt.Errorf("Error getting function status list: %s", err.Error())
statuses = []FunctionStatus{}
}

return
}

func process_line( line string ) []string{
parts := colrx.Split(line, -1)

return parts
}

func func_statuses_from_lines(lines []string) (funcs []FunctionStatus, err error){
funcs = make([]FunctionStatus, 0, len(lines))

for _, line := range lines {
parts := process_line(line)

if len(parts) < 3 {
err = ProtocolError("Incomplete status entry only " + strconv.Itoa(len(parts)) + " fields found: " + line)
return
}

var fs FunctionStatus

fs.Name = parts[0]

var unfinished, running, workers int

unfinished, err = strconv.Atoi(parts[1])
if err != nil {
err = ProtocolError("bad unfinished count format: " + err.Error())
return
}

running, err = strconv.Atoi(parts[2])

if err != nil {
err = ProtocolError("bad running count format: " + err.Error())
return
}

workers, err = strconv.Atoi(parts[3])
if err != nil {
err = ProtocolError("bad worker count format: " + err.Error())
return
}

fs.UnfinishedJobs = unfinished
fs.RunningJobs = running
fs.Workers = workers

funcs = append(funcs, fs)
}

return

}

func workers_from_lines(lines []string) (workers []Worker, err error) {
workers = make([]Worker, 0, len(lines))

for _, line := range lines {
parts := process_line(line)

if len(parts) < 4 {
err = ProtocolError("Incomplete worker entry")
return
}

if parts[3] != `:` {
err = ProtocolError("Malformed worker entry '" + parts[3] + "'")
return
}

worker := Worker {
Fd : parts[0],
Addr : parts[1],
ClientId : parts[2],
}

if len(parts) > 4 {
worker.Functions = parts[4:]
}

workers = append(workers, worker)
}

return
}


// Decoded description of a gearman worker
type Worker struct {
Fd string // File descriptor
Addr string // IP address
ClientId string // Client ID
Functions []string // List of functions
}

// Check a worker for a particular function
func (w *Worker) HasFunction(funcname string) bool {
for _, v := range w.Functions {
if v == funcname {
return true
}
}
return false
}

func read_response(r io.Reader) (lines []string, err error) {
rdr := bufio.NewReader(r)
lines = make([]string, 0, 0)
for {
line := ""
if line, err = rdr.ReadString('\n'); err != nil {
return nil, err

} else if line == ".\n" {
return lines, nil

} else {
lines = append(lines, strings.TrimRight(line, "\n"))

}
}

return lines, nil
}

// Decoded description of a functions current status
type FunctionStatus struct {
Name string // Function name
UnfinishedJobs int // Number of unfinished jobs
RunningJobs int // Number of running jobs
Workers int // Number of workers available
}

// Protocol error
type ProtocolError string

func (p ProtocolError) Error() string {
return "ProtoFail: " + string(p)
}
4 changes: 4 additions & 0 deletions worker/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func (a *agent) work() {
var err error
var data, leftdata []byte
for {
if a.worker.stopped {
return
}

if data, err = a.read(); err != nil {
if opErr, ok := err.(*net.OpError); ok {
if opErr.Temporary() {
Expand Down
28 changes: 25 additions & 3 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ type Worker struct {
funcs jobFuncs
in chan *inPack
running bool
stopped bool
ready bool
active sync.WaitGroup

Id string
ErrorHandler ErrorHandler
Expand All @@ -39,9 +41,10 @@ type Worker struct {
// OneByOne(=1), there will be only one job executed in a time.
func New(limit int) (worker *Worker) {
worker = &Worker{
agents: make([]*agent, 0, limit),
funcs: make(jobFuncs),
in: make(chan *inPack, queueSize),
agents: make([]*agent, 0, limit),
funcs: make(jobFuncs),
in: make(chan *inPack, queueSize),
stopped: false,
}
if limit != Unlimited {
worker.limit = make(chan bool, limit-1)
Expand Down Expand Up @@ -219,6 +222,20 @@ func (worker *Worker) customeHandler(inpack *inPack) {
}
}

// Stop serving
func (worker *Worker) Stop() {
// Set stopped flag
worker.stopped = true
}

// Wait for completeness serving
func (worker *Worker) WaitRunning() {
// Wait for all the running activities has stopped
if worker.stopped {
worker.active.Wait()
}
}

// Close connection and exit main loop
func (worker *Worker) Close() {
worker.Lock()
Expand Down Expand Up @@ -261,6 +278,8 @@ func (worker *Worker) SetId(id string) {
// inner job executing
func (worker *Worker) exec(inpack *inPack) (err error) {
defer func() {
worker.active.Done()

if worker.limit != nil {
<-worker.limit
}
Expand All @@ -276,6 +295,9 @@ func (worker *Worker) exec(inpack *inPack) (err error) {
if !ok {
return fmt.Errorf("The function does not exist: %s", inpack.fn)
}

worker.active.Add(1)

var r *result
if f.timeout == 0 {
d, e := f.f(inpack)
Expand Down