Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a parameter to control the nice level #621

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions lib/disco/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ class Worker(dict):

The :class:`Worker` base class defines the following parameters:

:type nice: int
:param nice: niceness of the job (default 19).
Niceness values range from -20 (most favorable to the process) to 19 (least favorable to the process).
Values lower than 0 need root.

:type save_results: bool
:param save_results: whether or not to save the output to :ref:`DDFS`.

Expand Down Expand Up @@ -148,6 +153,7 @@ def defaults(self):
:return: dict of default values for the :class:`Worker`.
"""
return {'save_results': False,
'nice': 19,
'profile': False,
'required_files': {},
'required_modules': None}
Expand Down Expand Up @@ -198,6 +204,7 @@ def jobdict(self, job, **jobargs):
:return: :ref:`jobdict` dict.
"""
return {'prefix': self.getitem('name', job, jobargs),
'nice': self.getitem('nice', job, jobargs, 19),
'save_results': self.getitem('save_results', job, jobargs, False),
'save_info': self.getitem('save_info', job, jobargs, "ddfs"),
'scheduler': self.getitem('scheduler', job, jobargs, {}),
Expand Down
2 changes: 2 additions & 0 deletions lib/disco/worker/classic/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ def get(key, default=None):
has_reduce = bool(get('reduce'))
reduce_shuffle = bool(get('reduce_shuffle'))
job_input = get('input', [])
nice = get('nice', 19)
has_save_results = get('save', False) or get('save_results', False)

if not isiterable(job_input):
Expand Down Expand Up @@ -292,6 +293,7 @@ def get(key, default=None):
'reduce?': has_reduce,
'reduce_shuffle?': reduce_shuffle,
'nr_reduces': nr_reduces,
'nice': nice,
'save_results': has_save_results})
return jobdict

Expand Down
2 changes: 2 additions & 0 deletions master/include/pipeline.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
schedule :: task_schedule(),
input :: [input_id()],
all_inputs:: boolean(),
nice :: integer(),
save_outputs :: boolean(),
save_info :: string()}).
-type task_spec() :: #task_spec{}.
Expand Down Expand Up @@ -136,6 +137,7 @@
inputs = [] :: [task_output()],
pipeline = [] :: pipeline(),
schedule :: task_schedule(),
nice = 19 :: integer(),
save_results = false :: boolean(),
save_info :: string()}).
-type jobinfo() :: #jobinfo{}.
4 changes: 2 additions & 2 deletions master/src/disco_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ handle_cast(start, #state{task = Task, master = Master} = State) ->
{stop, {shutdown, {error, E}}, State}
end;
handle_cast(work, #state{task = T, port = none} = State) ->
{#task_spec{jobname = JobName, worker = W, jobenvs = JE}, #task_run{}} = T,
{#task_spec{jobname = JobName, nice = Nice, worker = W, jobenvs = JE}, #task_run{}} = T,
JobHome = jobhome(JobName),
Worker = filename:join(JobHome, binary_to_list(W)),
Command = "nice -n 19 " ++ Worker,
Command = "nice -n " ++ integer_to_list(Nice) ++ " " ++ Worker,
JobEnvs = [{S, false} || S <- disco:settings()] ++ JE,
Options = [{cd, JobHome},
stream,
Expand Down
2 changes: 2 additions & 0 deletions master/src/job_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,7 @@ make_stage_tasks(Stage, _Grouping, [],
make_stage_tasks(Stage, Grouping, [{G, Inputs}|Rest],
#state{jobinfo = #jobinfo{jobname = JN,
jobenvs = JE,
nice = Nice,
save_info = SaveInfo,
save_results = Save,
worker = W},
Expand Down Expand Up @@ -755,6 +756,7 @@ make_stage_tasks(Stage, Grouping, [{G, Inputs}|Rest],
grouping = Grouping,
job_coord = self(),
schedule = Schedule,
nice = Nice,
save_outputs = SaveOutputs,
save_info = SaveInfo},

Expand Down
25 changes: 25 additions & 0 deletions master/src/jobpack.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,13 @@ jobdict(<<?MAGIC:16/big,
-spec core_jobinfo(jobpack(), job_dict()) -> {jobname(), jobinfo()}.
core_jobinfo(JobPack, JobDict) ->
Prefix = find(<<"prefix">>, JobDict),
Nice = find(<<"nice">>, JobDict, 19),
SaveResults = find(<<"save_results">>, JobDict, false),
SaveInfo = find(<<"save_info">>, JobDict, "ddfs"),
JobInfo = #jobinfo{jobenvs = jobenvs(JobPack),
worker = find(<<"worker">>, JobDict),
owner = find(<<"owner">>, JobDict),
nice = validate_nice(Nice),
save_info = validate_save_info(SaveInfo),
save_results = validate_save_results(SaveResults)},
{validate_prefix(Prefix), JobInfo}.
Expand Down Expand Up @@ -220,6 +222,29 @@ validate_inputs(Inputs) ->
|| [L, Sz, Urls] <- Inputs]
end.

-spec validate_nice(term()) -> integer().
validate_nice(S) ->
case json_validator:validate(integer, S) of
{error, E} ->
lager:warning("Invalid nice in jobpack: ~s",
json_validator:error_msg(E)),
throw({error, invalid_job_nice});
_ ->
case S > 19 of
true ->
lager:warning("Invalid nice in jobpack: ~s is too high", S),
throw({error, invalid_job_nice});
false ->
case S < -20 of
true ->
lager:warning("Invalid nice in jobpack: ~s is too low", S),
throw({error, invalid_job_nice});
false ->
S
end
end
end.

-spec validate_save_info(binary() | list()) -> string().
validate_save_info(S) when is_binary(S)->
validate_save_info(binary_to_list(S));
Expand Down