Skip to content

Commit ba8b4c6

Browse files
committed
Refactor register_job to register_jobs
1 parent d26916b commit ba8b4c6

File tree

3 files changed

+42
-33
lines changed

3 files changed

+42
-33
lines changed

src/databricks/labs/ucx/assessment/sequencing.py

+20-11
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
from databricks.sdk import WorkspaceClient
1010
from databricks.sdk.errors import DatabricksError
11-
from databricks.sdk.service import jobs
11+
from databricks.sdk.service.jobs import Job, JobCluster, Task
1212

1313
from databricks.labs.ucx.assessment.clusters import ClusterOwnership, ClusterInfo
1414
from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo
@@ -156,20 +156,29 @@ def __init__(self, ws: WorkspaceClient, administrator_locator: AdministratorLoca
156156

157157
# Outgoing references contains edges in the graph pointing from a node to a set of nodes that the node
158158
# references. These references follow the API references, e.g. a job contains tasks in the
159-
# `jobs.Job.settings.tasks`, thus a job has an outgoing reference to each of those tasks.
159+
# `Job.settings.tasks`, thus a job has an outgoing reference to each of those tasks.
160160
self._outgoing_references: dict[MigrationNodeKey, set[MigrationNode]] = defaultdict(set)
161161

162-
def register_job(self, job: jobs.Job) -> MaybeMigrationNode:
162+
def register_jobs(self, *jobs: Job) -> list[MaybeMigrationNode]:
163163
"""Register a job.
164164
165165
Args:
166-
job (jobs.Job) : The job to register.
166+
jobs (Job) : The jobs to register.
167167
168168
Returns:
169-
MaybeMigrationNode : A maybe migration node, which has the migration node if no problems occurred during
170-
registering. Otherwise, the maybe migration node contains the dependency problems occurring during
171-
registering the job.
169+
list[MaybeMigrationNode] : Each element contains a maybe migration node for each job respectively. If no
170+
problems occurred during registering the job, the maybe migration node contains the migration node.
171+
Otherwise, the maybe migration node contains the dependency problems occurring during registering the
172+
job.
172173
"""
174+
nodes: list[MaybeMigrationNode] = []
175+
for job in jobs:
176+
node = self._register_job(job)
177+
nodes.append(node)
178+
return nodes
179+
180+
def _register_job(self, job: Job) -> MaybeMigrationNode:
181+
"""Register a single job."""
173182
problems: list[DependencyProblem] = []
174183
job_node = self._nodes.get(("JOB", str(job.job_id)), None)
175184
if job_node:
@@ -211,11 +220,11 @@ def register_job(self, job: jobs.Job) -> MaybeMigrationNode:
211220
problems.append(problem)
212221
return MaybeMigrationNode(job_node, problems)
213222

214-
def _register_workflow_task(self, task: jobs.Task, parent: MigrationNode) -> MaybeMigrationNode:
223+
def _register_workflow_task(self, task: Task, parent: MigrationNode) -> MaybeMigrationNode:
215224
"""Register a workflow task.
216225
217226
TODO:
218-
Handle following jobs.Task attributes:
227+
Handle following Task attributes:
219228
- for_each_task
220229
- libraries
221230
- notebook_task
@@ -255,7 +264,7 @@ def _register_workflow_task(self, task: jobs.Task, parent: MigrationNode) -> May
255264
problems.append(problem)
256265
return MaybeMigrationNode(task_node, problems)
257266

258-
def _register_job_cluster(self, cluster: jobs.JobCluster, parent: MigrationNode) -> MaybeMigrationNode:
267+
def _register_job_cluster(self, cluster: JobCluster, parent: MigrationNode) -> MaybeMigrationNode:
259268
"""Register a job cluster.
260269
261270
A job cluster is defined within a job and therefore is found when defined on the job by definition.
@@ -276,7 +285,7 @@ def _register_cluster(self, cluster_id: str) -> MaybeMigrationNode:
276285
"""Register a cluster.
277286
278287
TODO
279-
Handle following jobs.Task attributes:
288+
Handle following Task attributes:
280289
- init_scripts
281290
- instance_pool_id (maybe_not)
282291
- policy_id

tests/integration/assessment/test_sequencing.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ def test_migration_sequencing_simple_job(make_job, runtime_ctx) -> None:
77
"""Sequence a simple job"""
88
job = make_job()
99

10-
maybe_job_node = runtime_ctx.migration_sequencer.register_job(job)
10+
maybe_job_node = runtime_ctx.migration_sequencer.register_jobs(job)[0]
1111
assert not maybe_job_node.failed
1212

1313
steps = runtime_ctx.migration_sequencer.generate_steps()
@@ -30,7 +30,7 @@ def test_migration_sequencing_job_with_task_referencing_cluster(
3030
)
3131
job = make_job(tasks=[task])
3232

33-
maybe_job_node = runtime_ctx.migration_sequencer.register_job(job)
33+
maybe_job_node = runtime_ctx.migration_sequencer.register_jobs(job)[0]
3434
assert not maybe_job_node.failed
3535

3636
steps = runtime_ctx.migration_sequencer.generate_steps()
@@ -45,7 +45,7 @@ def test_migration_sequencing_job_with_task_referencing_non_existing_cluster(run
4545
settings = jobs.JobSettings(name="test-job", tasks=[task])
4646
job = jobs.Job(job_id=1234, settings=settings)
4747

48-
maybe_node = runtime_ctx.migration_sequencer.register_job(job)
48+
maybe_node = runtime_ctx.migration_sequencer.register_jobs(job)[0]
4949
assert maybe_node.failed
5050
assert maybe_node.problems == [
5151
DependencyProblem(

tests/unit/assessment/test_sequencing.py

+19-19
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def admin_locator(ws):
1919
return AdministratorLocator(ws, finders=[lambda _ws: admin_finder])
2020

2121

22-
def test_register_job_with_existing_cluster(ws, admin_locator) -> None:
22+
def test_register_jobs_with_existing_cluster(ws, admin_locator) -> None:
2323
"""Register a job with a task referencing an existing cluster."""
2424
task = jobs.Task(task_key="test-task", existing_cluster_id="cluster-123")
2525
settings = jobs.JobSettings(name="test-job", tasks=[task])
@@ -33,12 +33,12 @@ def get_cluster(cluster_id: str) -> ClusterDetails:
3333
ws.clusters.get.side_effect = get_cluster
3434
sequencer = MigrationSequencer(ws, admin_locator)
3535

36-
maybe_node = sequencer.register_job(job)
36+
maybe_node = sequencer.register_jobs(job)[0]
3737

3838
assert not maybe_node.failed
3939

4040

41-
def test_register_job_with_non_existing_cluster(ws, admin_locator) -> None:
41+
def test_register_jobs_with_non_existing_cluster(ws, admin_locator) -> None:
4242
"""Register a job with a task referencing a non-existing cluster."""
4343
task = jobs.Task(task_key="test-task", existing_cluster_id="non-existing-id")
4444
settings = jobs.JobSettings(name="test-job", tasks=[task])
@@ -47,7 +47,7 @@ def test_register_job_with_non_existing_cluster(ws, admin_locator) -> None:
4747
ws.clusters.get.side_effect = ResourceDoesNotExist("Unknown cluster")
4848
sequencer = MigrationSequencer(ws, admin_locator)
4949

50-
maybe_node = sequencer.register_job(job)
50+
maybe_node = sequencer.register_jobs(job)[0]
5151

5252
assert maybe_node.failed
5353
assert maybe_node.problems == [
@@ -58,28 +58,28 @@ def test_register_job_with_non_existing_cluster(ws, admin_locator) -> None:
5858
]
5959

6060

61-
def test_register_job_with_existing_job_cluster_key(ws, admin_locator) -> None:
61+
def test_register_jobs_with_existing_job_cluster_key(ws, admin_locator) -> None:
6262
"""Register a job with a task referencing a existing job cluster."""
6363
job_cluster = jobs.JobCluster("existing-id", ClusterSpec())
6464
task = jobs.Task(task_key="test-task", job_cluster_key="existing-id")
6565
settings = jobs.JobSettings(name="test-job", tasks=[task], job_clusters=[job_cluster])
6666
job = jobs.Job(job_id=1234, settings=settings)
6767
sequencer = MigrationSequencer(ws, admin_locator)
6868

69-
maybe_node = sequencer.register_job(job)
69+
maybe_node = sequencer.register_jobs(job)[0]
7070

7171
assert not maybe_node.failed
7272

7373

74-
def test_register_job_with_non_existing_job_cluster_key(ws, admin_locator) -> None:
74+
def test_register_jobs_with_non_existing_job_cluster_key(ws, admin_locator) -> None:
7575
"""Register a job with a task referencing a non-existing job cluster."""
7676
task = jobs.Task(task_key="test-task", job_cluster_key="non-existing-id")
7777
settings = jobs.JobSettings(name="test-job", tasks=[task])
7878
job = jobs.Job(job_id=1234, settings=settings)
7979

8080
sequencer = MigrationSequencer(ws, admin_locator)
8181

82-
maybe_node = sequencer.register_job(job)
82+
maybe_node = sequencer.register_jobs(job)[0]
8383

8484
assert maybe_node.failed
8585
assert maybe_node.problems == [
@@ -90,20 +90,20 @@ def test_register_job_with_non_existing_job_cluster_key(ws, admin_locator) -> No
9090
]
9191

9292

93-
def test_register_job_with_new_cluster(ws, admin_locator) -> None:
93+
def test_register_jobs_with_new_cluster(ws, admin_locator) -> None:
9494
"""Register a job with a task with a new cluster definition."""
9595
task = jobs.Task(task_key="test-task", new_cluster=ClusterSpec())
9696
settings = jobs.JobSettings(name="test-job", tasks=[task])
9797
job = jobs.Job(job_id=1234, settings=settings)
9898
ws.jobs.get.return_value = job
9999
sequencer = MigrationSequencer(ws, admin_locator)
100100

101-
maybe_node = sequencer.register_job(job)
101+
maybe_node = sequencer.register_jobs(job)[0]
102102

103103
assert not maybe_node.failed
104104

105105

106-
def test_register_job_with_task_dependency(ws, admin_locator) -> None:
106+
def test_register_jobs_with_task_dependency(ws, admin_locator) -> None:
107107
"""Register a job with two tasks having a dependency."""
108108
task1 = jobs.Task(task_key="task1")
109109
task_dependency = jobs.TaskDependency(task1.task_key)
@@ -113,20 +113,20 @@ def test_register_job_with_task_dependency(ws, admin_locator) -> None:
113113
job = jobs.Job(job_id=1234, settings=settings)
114114
sequencer = MigrationSequencer(ws, admin_locator)
115115

116-
maybe_node = sequencer.register_job(job)
116+
maybe_node = sequencer.register_jobs(job)[0]
117117

118118
assert not maybe_node.failed
119119

120120

121-
def test_register_job_with_non_existing_task_dependency(ws, admin_locator) -> None:
121+
def test_register_jobs_with_non_existing_task_dependency(ws, admin_locator) -> None:
122122
"""Register a job with a non-existing task dependency."""
123123
task_dependency = jobs.TaskDependency("non-existing-id")
124124
task = jobs.Task(task_key="task2", depends_on=[task_dependency])
125125
settings = jobs.JobSettings(name="job", tasks=[task])
126126
job = jobs.Job(job_id=1234, settings=settings)
127127
sequencer = MigrationSequencer(ws, admin_locator)
128128

129-
maybe_node = sequencer.register_job(job)
129+
maybe_node = sequencer.register_jobs(job)[0]
130130

131131
assert maybe_node.failed
132132
assert maybe_node.problems == [
@@ -160,7 +160,7 @@ def get_cluster(cluster_id: str) -> ClusterDetails:
160160
ws.clusters.get.side_effect = get_cluster
161161

162162
sequencer = MigrationSequencer(ws, admin_locator)
163-
sequencer.register_job(job)
163+
sequencer.register_jobs(job)
164164

165165
steps = list(sequencer.generate_steps())
166166

@@ -208,7 +208,7 @@ def test_sequence_steps_from_job_task_with_existing_job_cluster_key(ws, admin_lo
208208
settings = jobs.JobSettings(name="test-job", tasks=[task], job_clusters=[job_cluster])
209209
job = jobs.Job(job_id=1234, settings=settings)
210210
sequencer = MigrationSequencer(ws, admin_locator)
211-
sequencer.register_job(job)
211+
sequencer.register_jobs(job)
212212

213213
steps = list(sequencer.generate_steps())
214214

@@ -254,7 +254,7 @@ def test_sequence_steps_from_job_task_with_new_cluster(ws, admin_locator) -> Non
254254
settings = jobs.JobSettings(name="test-job", tasks=[task])
255255
job = jobs.Job(job_id=1234, settings=settings)
256256
sequencer = MigrationSequencer(ws, admin_locator)
257-
sequencer.register_job(job)
257+
sequencer.register_jobs(job)
258258

259259
steps = list(sequencer.generate_steps())
260260

@@ -292,7 +292,7 @@ def test_sequence_steps_from_job_task_with_non_existing_cluster(ws, admin_locato
292292
settings = jobs.JobSettings(name="test-job", tasks=[task])
293293
job = jobs.Job(job_id=1234, settings=settings)
294294
sequencer = MigrationSequencer(ws, admin_locator)
295-
sequencer.register_job(job)
295+
sequencer.register_jobs(job)
296296

297297
steps = list(sequencer.generate_steps())
298298

@@ -334,7 +334,7 @@ def test_sequence_steps_from_job_task_referencing_other_task(ws, admin_locator)
334334
job = jobs.Job(job_id=1234, settings=settings)
335335
sequencer = MigrationSequencer(ws, admin_locator)
336336

337-
maybe_job_node = sequencer.register_job(job)
337+
maybe_job_node = sequencer.register_jobs(job)[0]
338338
assert not maybe_job_node.failed
339339

340340
steps = list(sequencer.generate_steps())

0 commit comments

Comments
 (0)