Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support linting python wheel tasks #1821

Merged
merged 38 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
c3dbe02
Add integration test for python wheel task
JCZuurmond Jun 3, 2024
e8c5fc5
Implement simple version to register python wheel task
JCZuurmond Jun 3, 2024
190e588
Format
JCZuurmond Jun 3, 2024
1c69253
Overwrite path lookup to avoid finding library in current working dir…
JCZuurmond Jun 3, 2024
4544ba9
Remove pylint ignore comment
JCZuurmond Jun 3, 2024
3b66da6
Mock white list
JCZuurmond Jun 3, 2024
00e90b8
Change module reference
JCZuurmond Jun 3, 2024
1ebdfb7
Move libraries
JCZuurmond Jun 3, 2024
803eedd
Fix mock whitelist
JCZuurmond Jun 3, 2024
e544caa
Register python wheel task using dist info
JCZuurmond Jun 3, 2024
c2fdadf
Update test asserts
JCZuurmond Jun 3, 2024
f4f48a3
Move existence and directory check into library root
JCZuurmond Jun 4, 2024
3652b26
Rewrite based on PR feedback
JCZuurmond Jun 4, 2024
277b5ad
Use legacy normalized name for distribution name
JCZuurmond Jun 4, 2024
bfdaa9f
Match distribution using a function
JCZuurmond Jun 4, 2024
cb65bf2
Format
JCZuurmond Jun 4, 2024
e88a93b
Fix returns
JCZuurmond Jun 4, 2024
c5c2bc9
Change import
JCZuurmond Jun 4, 2024
07e367d
Use glob to select dist info
JCZuurmond Jun 4, 2024
c04445b
Change mypy ignore to type ignore
JCZuurmond Jun 4, 2024
a3eaa96
Fix tests
JCZuurmond Jun 4, 2024
2e171ce
Handle permission error in library root
JCZuurmond Jun 4, 2024
794545d
Add unit test for distribution not found
JCZuurmond Jun 4, 2024
b515633
Test for entry point not found
JCZuurmond Jun 4, 2024
e05d22f
Import Callable from collections
JCZuurmond Jun 4, 2024
c2443e4
Make _find_distribution a static method
JCZuurmond Jun 4, 2024
3f7aadc
Update name to more accurate
JCZuurmond Jun 4, 2024
c1ccb93
Fix comment
JCZuurmond Jun 4, 2024
642b7c5
Fix test
JCZuurmond Jun 4, 2024
2672c22
Add type ignore
JCZuurmond Jun 4, 2024
f39ef9e
Move legacy normalize into separate function
JCZuurmond Jun 4, 2024
8b920d8
Move name check into function
JCZuurmond Jun 4, 2024
210fc49
Move return into try
JCZuurmond Jun 4, 2024
b8e7846
Use non-legacy normalize
JCZuurmond Jun 4, 2024
0958fe5
Fix reference to testing wheel
JCZuurmond Jun 4, 2024
57e8976
Add happy path test
JCZuurmond Jun 4, 2024
5f1f8e3
Normalize name once
JCZuurmond Jun 4, 2024
1df714d
Move return out of try-except
JCZuurmond Jun 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions src/databricks/labs/ucx/source_code/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import tempfile
from collections.abc import Iterable
from dataclasses import dataclass
from importlib import metadata
from pathlib import Path

from databricks.labs.blueprint.parallel import Threads
Expand Down Expand Up @@ -139,11 +140,37 @@ def _register_spark_python_task(self, graph: DependencyGraph):
path = WorkspacePath(self._ws, notebook_path)
return graph.register_notebook(path)

def _register_python_wheel_task(self, graph: DependencyGraph): # pylint: disable=unused-argument
@staticmethod
def _find_first_matching_distribution(path_lookup: PathLookup, name: str) -> metadata.Distribution | None:
# Prepared exists in importlib.metadata.__init__pyi, but is not defined in importlib.metadata.__init__.py
normalize_name = metadata.Prepared.normalize # type: ignore
normalized_name = normalize_name(name)
for library_root in path_lookup.library_roots:
for path in library_root.glob("*.dist-info"):
distribution = metadata.Distribution.at(path)
if normalize_name(distribution.name) == normalized_name:
return distribution
return None

def _register_python_wheel_task(self, graph: DependencyGraph) -> Iterable[DependencyProblem]:
if not self._task.python_wheel_task:
return
# TODO: https://github.com/databrickslabs/ucx/issues/1640
yield DependencyProblem('not-yet-implemented', 'Python wheel task is not yet implemented')
return []

distribution_name = self._task.python_wheel_task.package_name
distribution = self._find_first_matching_distribution(graph.path_lookup, distribution_name)
if distribution is None:
return [DependencyProblem("distribution-not-found", f"Could not find distribution for {distribution_name}")]
entry_point_name = self._task.python_wheel_task.entry_point
try:
entry_point = distribution.entry_points[entry_point_name]
except KeyError:
return [
DependencyProblem(
"distribution-entry-point-not-found",
f"Could not find distribution entry point for {distribution_name}.{entry_point_name}",
)
]
return graph.register_import(entry_point.module)

def _register_spark_jar_task(self, graph: DependencyGraph): # pylint: disable=unused-argument
if not self._task.spark_jar_task:
Expand Down
10 changes: 9 additions & 1 deletion src/databricks/labs/ucx/source_code/path_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,15 @@ def remove_path(self, index: int):
@property
def library_roots(self) -> list[Path]:
# we may return a combination of WorkspacePath and PosixPath here
return [self._cwd] + self._sys_paths
library_roots = []
for library_root in [self._cwd] + self._sys_paths:
try:
is_existing_directory = library_root.exists() and library_root.is_dir()
except PermissionError:
continue
if is_existing_directory:
library_roots.append(library_root)
return library_roots

@property
def cwd(self):
Expand Down
35 changes: 35 additions & 0 deletions tests/integration/source_code/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,3 +341,38 @@ def test_job_spark_python_task_linter_unhappy_path(

problems = simple_ctx.workflow_linter.lint_job(j.job_id)
assert len([problem for problem in problems if problem.message == "Could not locate import: greenlet"]) == 1


def test_workflow_linter_lints_python_wheel_task(simple_ctx, ws, make_job, make_random):
whitelist = create_autospec(Whitelist) # databricks is in default list
whitelist.module_compatibility.return_value = UNKNOWN
whitelist.distribution_compatibility.return_value = UNKNOWN

simple_ctx = simple_ctx.replace(
whitelist=whitelist,
path_lookup=PathLookup(Path("/non/existing/path"), []), # Avoid finding current project
)

simple_ctx.workspace_installation.run() # Creates ucx wheel
wheels = [file for file in simple_ctx.installation.files() if file.path.endswith(".whl")]
library = compute.Library(whl=wheels[0].path)

python_wheel_task = jobs.PythonWheelTask("databricks_labs_ucx", "runtime")
task = jobs.Task(
task_key=make_random(4),
python_wheel_task=python_wheel_task,
new_cluster=compute.ClusterSpec(
num_workers=1,
node_type_id=ws.clusters.select_node_type(local_disk=True, min_memory_gb=16),
spark_version=ws.clusters.select_spark_version(latest=True),
),
timeout_seconds=0,
libraries=[library],
)
job_with_ucx_library = make_job(tasks=[task])

problems = simple_ctx.workflow_linter.lint_job(job_with_ucx_library.job_id)

assert len([problem for problem in problems if problem.code == "library-dist-info-not-found"]) == 0
assert len([problem for problem in problems if problem.code == "library-entrypoint-not-found"]) == 0
whitelist.distribution_compatibility.assert_called_once_with(Path(wheels[0].path).name)
55 changes: 55 additions & 0 deletions tests/unit/source_code/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,58 @@ def test_workflow_task_container_builds_dependency_graph_with_known_egg_library(
assert len(problems) == 0
assert graph.path_lookup.resolve(Path("thingy")) is not None
ws.workspace.download.assert_called_once_with(egg_file.as_posix(), format=ExportFormat.AUTO)


def test_workflow_task_container_builds_dependency_graph_with_missing_distribution_in_python_wheel_task(
mock_path_lookup,
graph,
):
ws = create_autospec(WorkspaceClient)
python_wheel_task = jobs.PythonWheelTask(package_name="databricks_labs_ucx", entry_point="runtime")
task = jobs.Task(task_key="test", python_wheel_task=python_wheel_task)

workflow_task_container = WorkflowTaskContainer(ws, task)
problems = workflow_task_container.build_dependency_graph(graph)

assert len(problems) == 1
assert problems[0].code == "distribution-not-found"
assert problems[0].message == "Could not find distribution for databricks_labs_ucx"
ws.assert_not_called()


def test_workflow_task_container_builds_dependency_graph_with_missing_entrypoint_in_python_wheel_task(graph):
ws = create_autospec(WorkspaceClient)

whl_file = Path(__file__).parent / "samples/distribution/dist/thingy-0.0.1-py2.py3-none-any.whl"
with whl_file.open("rb") as f:
ws.workspace.download.return_value = io.BytesIO(f.read())

python_wheel_task = jobs.PythonWheelTask(package_name="thingy", entry_point="non_existing_entrypoint")
libraries = [compute.Library(whl=whl_file.as_posix())]
task = jobs.Task(task_key="test", libraries=libraries, python_wheel_task=python_wheel_task)

workflow_task_container = WorkflowTaskContainer(ws, task)
problems = workflow_task_container.build_dependency_graph(graph)

assert len(problems) == 1
assert problems[0].code == "distribution-entry-point-not-found"
assert problems[0].message == "Could not find distribution entry point for thingy.non_existing_entrypoint"
ws.workspace.download.assert_called_once_with(whl_file.as_posix(), format=ExportFormat.AUTO)


def test_workflow_task_container_builds_dependency_graph_for_python_wheel_task(graph):
ws = create_autospec(WorkspaceClient)

whl_file = Path(__file__).parent / "samples/distribution/dist/thingy-0.0.1-py2.py3-none-any.whl"
with whl_file.open("rb") as f:
ws.workspace.download.return_value = io.BytesIO(f.read())

python_wheel_task = jobs.PythonWheelTask(package_name="thingy", entry_point="runtime")
libraries = [compute.Library(whl=whl_file.as_posix())]
task = jobs.Task(task_key="test", libraries=libraries, python_wheel_task=python_wheel_task)

workflow_task_container = WorkflowTaskContainer(ws, task)
problems = workflow_task_container.build_dependency_graph(graph)

assert len(problems) == 0
ws.workspace.download.assert_called_once_with(whl_file.as_posix(), format=ExportFormat.AUTO)
47 changes: 34 additions & 13 deletions tests/unit/source_code/test_path_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,43 @@ def test_lookup_is_initialized_with_syspath():
assert len(filtered) > 0


def test_lookup_is_initialized_with_handmade_string():
provider = PathLookup.from_pathlike_string(Path.cwd(), "what:on:earth")
def test_lookup_is_initialized_with_handmade_string(tmp_path):
directories, sys_paths = ("what", "on", "earth"), []
for directory in directories:
path = tmp_path / directory
path.mkdir()
sys_paths.append(path)

provider = PathLookup.from_pathlike_string(Path.cwd(), ":".join([p.as_posix() for p in sys_paths]))

assert provider is not None
paths = provider.library_roots[1:]
assert paths == [Path("what"), Path("on"), Path("earth")]
assert provider.library_roots[1:] == sys_paths


def test_lookup_inserts_path(tmp_path):
directories, sys_paths = ("what", "on", "earth"), []
for directory in directories:
path = tmp_path / directory
path.mkdir()
sys_paths.append(path)

provider = PathLookup.from_pathlike_string(Path.cwd(), ":".join([p.as_posix() for p in sys_paths]))

new_sys_path = tmp_path / Path("is")
new_sys_path.mkdir()
provider.insert_path(1, new_sys_path)

assert provider.library_roots[1:] == [sys_paths[0]] + [new_sys_path] + sys_paths[1:]

def test_lookup_inserts_path():
provider = PathLookup.from_pathlike_string(Path.cwd(), "what:on:earth")
provider.insert_path(1, Path("is"))
paths = provider.library_roots[1:]
assert paths == [Path("what"), Path("is"), Path("on"), Path("earth")]

def test_lookup_removes_path(tmp_path):
directories, sys_paths = ("what", "is", "on", "earth"), []
for directory in directories:
path = tmp_path / directory
path.mkdir()
sys_paths.append(path)

def test_lookup_removes_path():
provider = PathLookup.from_pathlike_string(Path.cwd(), "what:is:on:earth")
provider = PathLookup.from_pathlike_string(Path.cwd(), ":".join([p.as_posix() for p in sys_paths]))
provider.remove_path(1)
paths = provider.library_roots[1:]
assert paths == [Path("what"), Path("on"), Path("earth")]
sys_paths.pop(1)
assert provider.library_roots[1:] == sys_paths
Loading