This repository was archived by the owner on Dec 4, 2024. It is now read-only.

[DCOS-58437] Deploy workloads under role-enforced group. #550

Open · wants to merge 37 commits into master

Commits (37)
ef3c10d
Add visual separations for the script's sections.
mpereira Sep 11, 2019
70579ca
Make container name be dependent on test parameter file name.
mpereira Sep 11, 2019
9564118
Check package repos every time, create Marathon group.
mpereira Sep 11, 2019
287a9b9
Use container name for docker image tag and run directory.
mpereira Sep 12, 2019
04717bd
Automatically format some files with Black.
mpereira Oct 7, 2019
9fbca5b
Parameterize group file name, run commands in container.
mpereira Oct 7, 2019
0e856a2
Run commands in the container.
mpereira Oct 7, 2019
d9a7e96
Quote this.
mpereira Oct 7, 2019
585beac
Make deploy-dispatchers.py script support group roles.
mpereira Oct 7, 2019
c4d870f
Show CLUSTER_URL in pre-run report.
mpereira Oct 7, 2019
cd95b27
Add group_role support to streaming workload deploy script.
mpereira Oct 7, 2019
5c1c912
Make sure service name is prefixed with a slash.
mpereira Oct 7, 2019
f40351a
Add group_role support to the batch_test.py script.
mpereira Oct 7, 2019
0b25ba5
Add roles and permissions required for group role enforcement.
mpereira Oct 7, 2019
0a2f2d1
Add spark-options.json file.
mpereira Oct 7, 2019
2bd9407
Typo.
mpereira Oct 8, 2019
73b5824
Add CLI parameter description.
mpereira Oct 9, 2019
bf0fcf0
Remove failing jobs stuff.
mpereira Oct 9, 2019
0025ef4
Fix shellcheck warning.
mpereira Oct 9, 2019
50b56ef
Variable renames, shellcheck fixes, DSEngine, total cpu/mem/gpu.
mpereira Oct 9, 2019
b849f5d
GROUP_NAME should be coming with no slash prefix.
mpereira Oct 9, 2019
2f1075b
Create quota if it doesn't exist.
mpereira Oct 9, 2019
222cbe2
Install recent DC/OS CLI.
mpereira Oct 10, 2019
464a17b
Fix quoting.
mpereira Oct 10, 2019
af9129e
Should be group name here.
mpereira Oct 10, 2019
c2daf5e
Make revoke_permissions() also take role list.
mpereira Oct 10, 2019
65b3fd9
Fix indentation.
mpereira Oct 10, 2019
bec6c9f
Make DC/OS CLI binary a parameter.
mpereira Oct 10, 2019
5b34f05
Don't break out of loop, just skip to the next element.
mpereira Oct 10, 2019
3cd62ec
This was breaking.
mpereira Oct 10, 2019
b9fa2ec
Add script to list service tasks.
mpereira Oct 10, 2019
1931abb
service_options was being used before being set.
mpereira Oct 10, 2019
956200c
Fix environment variable name.
mpereira Oct 10, 2019
976a775
Install Jupyter.
alexeygorobets Oct 10, 2019
5d25c89
Improve DSEngine workload deployment script and options.
mpereira Oct 10, 2019
78904b7
rename dsengine options to dsengine-options.jso
alexeygorobets Oct 11, 2019
275da0f
rename dsengine options to dsengine-options.json
alexeygorobets Oct 11, 2019
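Net effect of the commit series, per the commit list above: the scale-test entry points (`batch_test.py`, `deploy-dispatchers.py`, and the streaming/DSEngine deploy scripts) gain a `--group-role` option so workloads can be deployed under a single role-enforced Marathon group governed by a quota, instead of per-dispatcher roles. The core decision rule from the `batch_test.py` diff below, restated as a standalone sketch (the wrapper function itself is hypothetical, not code from the PR):

```python
def pick_driver_role(group_role, dispatcher):
    # When a role-enforced group is in play, leave the driver role unset so
    # the dispatcher's enforced group role applies; otherwise keep the
    # pre-existing per-dispatcher executor role.
    return None if group_role else dispatcher["roles"]["executors"]
```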
scale-tests/batch_test.py: 149 changes (107 additions, 42 deletions)
@@ -10,6 +10,7 @@

Options:
--docker-image <img> docker image to run on executors
--group-role <group-role> root-level group to apply quotas against (e.g. '/dev') [default: None]
--max-num-dispatchers <n> maximum number of dispatchers to use from dispatchers file
--submits-per-min <n> number of jobs to submit per minute [default: 1]
--spark-cores-max <n> max executor cores per job [default: 1]
@@ -24,13 +25,15 @@
"""


from docopt import docopt
from threading import Thread

import json
import logging
import os
import random
import sys
import time
from docopt import docopt
from threading import Thread
import typing

import sdk_utils
@@ -48,9 +51,10 @@


logging.basicConfig(
format='[%(asctime)s|%(name)s|%(levelname)s]: %(message)s',
format="[%(asctime)s|%(name)s|%(levelname)s]: %(message)s",
level=logging.INFO,
stream=sys.stdout)
stream=sys.stdout,
)

log = logging.getLogger(__name__)
MONTE_CARLO_APP_URL = "https://raw.githubusercontent.com/mesosphere/spark-build/master/scale-tests/apps/monte-carlo-portfolio.py"
@@ -76,62 +80,99 @@ def _get_duration() -> int:

def _get_gpu_user_conf(args):
def _verify_required_args():
if not (args["--spark-mesos-max-gpus"] and
args["--spark-mesos-executor-gpus"] and
args["--docker-image"]):
log.error("""
if not (
args["--spark-mesos-max-gpus"]
and args["--spark-mesos-executor-gpus"]
and args["--docker-image"]
):
log.error(
"""
Missing required arguments for running gpu jobs. Please include:
--spark-mesos-max-gpus
--spark-mesos-executor-gpus
--docker-image
""")
"""
)

_verify_required_args()

# Based on testing, 20gb per GPU is needed to run the job successfully.
# This is due to memory being divvied up and allocated to each GPU device.
memory_multiplier = 20
memory = int(args["--spark-mesos-executor-gpus"]) * memory_multiplier
return ["--conf", "spark.driver.memory={}g".format(str(memory)),
"--conf", "spark.executor.memory={}g".format(str(memory)),
"--conf", "spark.mesos.gpus.max={}".format(args["--spark-mesos-max-gpus"]),
"--conf", "spark.mesos.executor.gpus={}".format(args["--spark-mesos-executor-gpus"]),
"--conf", "spark.mesos.executor.docker.image={}".format(args["--docker-image"]),
"--conf", "spark.mesos.executor.docker.forcePullImage=false"
]


def submit_job(app_url: str, app_args: str, dispatcher: typing.Dict, duration: int, config: typing.List[str]):
return [
"--conf",
"spark.driver.memory={}g".format(str(memory)),
"--conf",
"spark.executor.memory={}g".format(str(memory)),
"--conf",
"spark.mesos.gpus.max={}".format(args["--spark-mesos-max-gpus"]),
"--conf",
"spark.mesos.executor.gpus={}".format(args["--spark-mesos-executor-gpus"]),
"--conf",
"spark.mesos.executor.docker.image={}".format(args["--docker-image"]),
"--conf",
"spark.mesos.executor.docker.forcePullImage=false",
]


def submit_job(
app_url: str,
app_args: str,
dispatcher: typing.Dict,
duration: int,
config: typing.List[str],
group_role: str,
):
dispatcher_name = dispatcher["service"]["name"]
log.info("Submitting job to dispatcher: %s, with duration: %s min.", dispatcher_name, duration)

driver_role = None if group_role else dispatcher["roles"]["executors"]

spark_utils.submit_job(
service_name=dispatcher_name,
app_url=app_url,
app_args=app_args,
verbose=False,
args=config,
driver_role=dispatcher["roles"]["executors"],
driver_role=driver_role,
spark_user=dispatcher["service"]["user"] if sdk_utils.is_strict_mode() else None,
principal=dispatcher["service"]["service_account"] if sdk_utils.is_strict_mode() else None)
principal=dispatcher["service"]["service_account"] if sdk_utils.is_strict_mode() else None,
)


def submit_loop(app_url: str, submits_per_min: int, dispatchers: typing.List[typing.Dict], user_conf: typing.List[str]):
def submit_loop(
app_url: str,
submits_per_min: int,
dispatchers: typing.List[typing.Dict],
user_conf: typing.List[str],
group_role: str,
):
sec_between_submits = 60 / submits_per_min
log.info("sec_between_submits: %s", sec_between_submits)
num_dispatchers = len(dispatchers)
log.info("num_dispatchers: %s", num_dispatchers)

dispatcher_index = 0
while(True):
while True:
duration = _get_duration()

if app_url == MONTE_CARLO_APP_URL:
app_args = "100000 {}".format(str(duration * 30)) # about 30 iterations per min.
else:
app_args = "550 3" # 550 images in 3 batches

t = Thread(target=submit_job, args=(app_url, app_args, dispatchers[dispatcher_index], duration, user_conf))
t = Thread(
target=submit_job,
args=(
app_url,
app_args,
dispatchers[dispatcher_index],
duration,
user_conf,
group_role,
),
)
t.start()
dispatcher_index = (dispatcher_index + 1) % num_dispatchers
log.info("sleeping %s sec.", sec_between_submits)
@@ -151,35 +192,57 @@ def submit_loop(app_url: str, submits_per_min: int, dispatchers: typing.List[typ
if end <= len(dispatchers):
dispatchers = dispatchers[0:end]
else:
log.warning("""
log.warning(
"""
Specified --max-num-dispatchers is greater than actual dispatcher count in {}.
Using list of dispatchers from file instead.
""".format(args["<dispatcher_file>"]))

user_conf = ["--conf", "spark.cores.max={}".format(args["--spark-cores-max"]),
"--conf", "spark.executor.cores={}".format(args["--spark-executor-cores"]),
"--conf", "spark.mesos.containerizer={}".format(args["--spark-mesos-containerizer"]),
"--conf", "spark.port.maxRetries={}".format(args["--spark-port-max-retries"]),
"--conf", "spark.mesos.driver.failoverTimeout={}".format(args["--spark-mesos-driver-failover-timeout"])
]
""".format(
args["<dispatcher_file>"]
)
)

user_conf = [
"--conf",
"spark.cores.max={}".format(args["--spark-cores-max"]),
"--conf",
"spark.executor.cores={}".format(args["--spark-executor-cores"]),
"--conf",
"spark.mesos.containerizer={}".format(args["--spark-mesos-containerizer"]),
"--conf",
"spark.port.maxRetries={}".format(args["--spark-port-max-retries"]),
"--conf",
"spark.mesos.driver.failoverTimeout={}".format(
args["--spark-mesos-driver-failover-timeout"]
),
]

if args["--spark-mesos-executor-gpus"]:
user_conf += _get_gpu_user_conf(args)
MEMORY_MULTIPLIER = 20
memory = int(args["--spark-mesos-executor-gpus"]) * MEMORY_MULTIPLIER
user_conf += ["--conf", "spark.driver.memory={}g".format(str(memory)),
"--conf", "spark.executor.memory={}g".format(str(memory)),
"--conf", "spark.mesos.gpus.max={}".format(args["--spark-mesos-max-gpus"]),
"--conf", "spark.mesos.executor.gpus={}".format(args["--spark-mesos-executor-gpus"]),
"--conf", "spark.mesos.executor.docker.image={}".format(args["--docker-image"]),
"--conf", "spark.mesos.executor.docker.forcePullImage=false"
]
user_conf += [
"--conf",
"spark.driver.memory={}g".format(str(memory)),
"--conf",
"spark.executor.memory={}g".format(str(memory)),
"--conf",
"spark.mesos.gpus.max={}".format(args["--spark-mesos-max-gpus"]),
"--conf",
"spark.mesos.executor.gpus={}".format(args["--spark-mesos-executor-gpus"]),
"--conf",
"spark.mesos.executor.docker.image={}".format(args["--docker-image"]),
"--conf",
"spark.mesos.executor.docker.forcePullImage=false",
]
app_url = GPU_IMAGE_RECOGNITION_APP_URL
else:
app_url = MONTE_CARLO_APP_URL

if args["--spark-mesos-driver-labels"] is not None:
user_conf += ["--conf", "spark.mesos.driver.labels={}".format(args["--spark-mesos-driver-labels"])]
user_conf += [
"--conf",
"spark.mesos.driver.labels={}".format(args["--spark-mesos-driver-labels"]),
]

if not args["--no-supervise"]:
user_conf += ["--supervise"]
@@ -188,4 +251,6 @@ def submit_loop(app_url: str, submits_per_min: int, dispatchers: typing.List[typ
end = int(args["--max-num-dispatchers"])
dispatchers = dispatchers[0:end]

submit_loop(app_url, int(args["--submits-per-min"]), dispatchers, user_conf)
group_role = args["--group-role"]

submit_loop(app_url, int(args["--submits-per-min"]), dispatchers, user_conf, group_role)
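One docopt subtlety worth flagging against the diff above: `[default: None]` in a usage string produces the literal string `"None"`, not Python's `None`, so `args["--group-role"]` is truthy even when the flag is omitted. A defensive normalization (a hypothetical helper, not part of this PR) would look like:

```python
def normalize_group_role(raw):
    # docopt parses "[default: None]" into the string "None"; treat it and
    # the empty string as "no group role requested".
    return None if raw in (None, "", "None") else raw

group_role = normalize_group_role(args["--group-role"])
```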
scale-tests/configs/dsengine-options.json: 17 changes (17 additions, 0 deletions)
@@ -0,0 +1,17 @@
{
"service": {
"name": "data-services/jupyter",
"service_account": "data_services__jupyter",
"service_account_secret": "data_services__jupyter-secret",
"gpu": {
"enabled": true
},
"virtual_network_enabled": true,
"virtual_network_name": "dcos"
},
"spark": {
"spark_mesos_role": "data_services__jupyter",
"spark_mesos_principal": "data_services__jupyter",
"spark_mesos_secret": "data_services__jupyter-secret"
}
}
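The account, secret, and role names above all follow one pattern: the service path with dashes mapped to underscores and the path separator doubled, so `data-services/jupyter` becomes `data_services__jupyter`. A sketch of that convention; it is inferred from this file, not stated anywhere in the PR:

```python
def role_for_service(service_path: str) -> str:
    # Inferred from dsengine-options.json:
    # "data-services/jupyter" -> "data_services__jupyter".
    return service_path.strip("/").replace("-", "_").replace("/", "__")

assert role_for_service("data-services/jupyter") == "data_services__jupyter"
```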
scale-tests/configs/spark-options.json: 7 changes (7 additions, 0 deletions)
@@ -0,0 +1,7 @@
{
"service": {
"role": "data-services",
"enforce_role": true,
"virtual_network_enabled": true
}
}
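`enforce_role: true` ties the service to its top-level group's role, which is what lets a single quota on the `data-services` group govern everything deployed beneath it (compare the "Check package repos every time, create Marathon group" and "Create quota if it doesn't exist" commits). Creating the matching role-enforced Marathon group could look like the sketch below; the cluster URL, token handling, and group id are assumptions, and `enforceRole` requires a recent Marathon:

```python
import json
import urllib.request

CLUSTER_URL = "https://dcos.example.com"  # assumption: your cluster URL
ACS_TOKEN = "..."                         # assumption: a valid DC/OS auth token

# Top-level Marathon groups with enforceRole=true force every service in the
# group to run under the group's role ("data-services" here), so one Mesos
# quota on that role caps the whole group.
group = {"id": "/data-services", "enforceRole": True}

request = urllib.request.Request(
    CLUSTER_URL + "/marathon/v2/groups",
    data=json.dumps(group).encode("utf-8"),
    headers={
        "Content-Type": "application/json",
        "Authorization": "token=" + ACS_TOKEN,
    },
    method="POST",
)
urllib.request.urlopen(request)
```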