# [RFC] Workflow execution concurrency

**Authors:**

- @eapolinario
- @katrogan

## 1 Executive Summary

This is a proposal to implement workflow execution concurrency, defined at the launch plan level. Concurrency applies to all versions of a launch plan.

## 2 Motivation

See the following issues:

1. https://github.com/flyteorg/flyte/issues/267
2. https://github.com/flyteorg/flyte/issues/420
3. https://github.com/flyteorg/flyte/discussions/3754
4. https://github.com/flyteorg/flyte/issues/5125

## 3 Proposed Implementation

Introduce a new attribute in [LaunchPlan.get_or_create](https://github.com/flyteorg/flytekit/blob/bc2e000cc8d710ed3d135cdbf3cbf257c5da8100/flytekit/core/launch_plan.py#L195) to allow specifying execution concurrency.

e.g.
```python
my_lp = LaunchPlan.get_or_create(
    name="my_serial_lp",
    workflow=my_wf,
    ...
    concurrency=Concurrency(
        max=1,  # defines how many executions with this launch plan can run in parallel
        policy=ConcurrencyPolicy.WAIT,  # defines the policy to apply when the max concurrency is reached
    )
)
```

### FlyteIDL

We propose adding a new IDL message to capture concurrency behavior at CreateExecution time and embedding it in the existing [Schedule](https://github.com/flyteorg/flyte/blob/master/flyteidl/protos/flyteidl/admin/schedule.proto) message.

```protobuf
message SchedulerPolicy {
    // Defines how many executions with this launch plan can run in parallel
    uint32 max = 1;

    // Defines how to handle the execution when the max concurrency is reached.
    ConcurrencyPolicy policy = 2;
}

enum ConcurrencyPolicy {
    UNSPECIFIED = 0;

    // wait for previous executions to terminate before starting a new one
    WAIT = 1;

    // fail the CreateExecution request and do not permit the execution to start
    ABORT = 2;

    // terminate the oldest execution when the concurrency limit is reached and immediately begin proceeding with the new execution
    REPLACE = 3;
}

message Schedule {
    ...

    SchedulerPolicy scheduler_policy = X;
}

// embedded in the ExecutionClosure
message ExecutionStateChangeDetails {
    ...

    // Includes the reason for the `PENDING` phase
    string description = X;
}

// Can also add to ExecutionSpec to specify execution time overrides
```

### Control Plane

At a broad level, we'll follow the precedent of the [scheduler](https://github.com/flyteorg/flyte/tree/master/flyteadmin/scheduler) defined in FlyteAdmin and define a singleton to manage concurrency across all launch plans.

1. At CreateExecution time, if the active version of the launch plan in the ExecutionSpec has a concurrency policy:
   1. Create the execution in the database with a new `PENDING` execution phase and reason populated in `ExecutionStateChangeDetails`,
      1. or fail the request when the concurrency policy is set to `ABORT`.
   2. Let the concurrency controller manage scheduling:
      1. Do not create the workflow CRD.

### Concurrency Controller Singleton

Introduce the Concurrency Controller to poll for all pending executions (a sketch of the worker loop follows this list):
1. Upon start-up, initialize a launch plan informer and a worker pool and spawn N worker threads.
   1. The launch plan informer will be responsible for keeping a map of launch plans, by [NamedEntityIdentifier](https://github.com/flyteorg/flyte/blob/25cfe16940f10f9bbef02e288c823db16eb37609/flyteidl/protos/flyteidl/admin/common.proto) (for each active launch plan version), and their concurrency policy: `map[admin.NamedEntityIdentifier]admin.SchedulerPolicy`
1. Periodically query the DB for non-terminal executions: `SELECT * FROM executions WHERE phase not in ('SUCCEEDED', 'FAILED', 'ABORTED', 'TIMED_OUT');`
   1. For each `PENDING` execution returned by the above query, `Add()` the pending execution to a [workqueue](https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go). We can fine-tune this in the future to include differentiated priority.
   1. For each non-`PENDING` execution returned by the above query, update the map value for the specific launch plan named entity using a thread-safe map of type `rawActiveLaunchPlanExecutions map[admin.NamedEntityIdentifier]util.Set[admin.Execution]` (e.g. using this [set]("k8s.io/apimachinery/pkg/util/sets") library).
   1. After processing the complete set of non-terminal executions, transform the `rawActiveLaunchPlanExecutions` map into a thread-safe, ordered list of executions by creation time: `activeLaunchPlanExecutions map[admin.NamedEntityIdentifier][]*core.WorkflowExecutionIdentifier`, using an implementation where different keys can be accessed concurrently.
1. For each item a worker pulls from the workqueue:
   1. Check the in-memory map populated by the launch plan informer to see:
      1. If the launch plan no longer has a concurrency policy, proceed to create the execution (see below).
      1. If the launch plan has an active concurrency policy and max executions has been reached, respect the concurrency policy:
         1. `WAIT`: do nothing.
         1. `ABORT`: mark the execution as `FAILED` in the db with a sensible error explaining the concurrency policy was violated.
         1. `REPLACE`: terminate the oldest execution for the execution's launch plan in `activeLaunchPlanExecutions`. If this succeeds, or it's already terminated, proceed to create the new execution (see below).
      1. If the launch plan has an active concurrency policy and max executions have not been reached, proceed to create the execution (see below).
   1. Finally, always mark the queue item as [Done()](https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L33).
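
To make the worker behavior concrete, here is a minimal sketch of the worker loop in Go. It leans on the client-go workqueue referenced above, but every other name (`pendingExecution`, `Controller`, `createExecution`, `markFailed`, `terminateOldest`, the policy constants) is a hypothetical stand-in for the flyteidl types and FlyteAdmin repository calls, not the actual implementation.

```go
package concurrency

import (
	"k8s.io/client-go/util/workqueue"
)

// Hypothetical stand-ins for the flyteidl identifiers referenced in this RFC.
type launchPlanKey struct{ Project, Domain, Name string }

type executionID struct{ Project, Domain, Name string }

// pendingExecution is the unit of work placed on the queue by the poll loop.
type pendingExecution struct {
	ID         executionID
	LaunchPlan launchPlanKey
}

type concurrencyPolicy int

const (
	policyWait concurrencyPolicy = iota + 1
	policyAbort
	policyReplace
)

type schedulerPolicy struct {
	Max    uint32
	Policy concurrencyPolicy
}

// Controller sketch: the launch plan informer keeps `policies` fresh, the poll
// loop enqueues PENDING executions, and workers apply the concurrency policy.
type Controller struct {
	queue    workqueue.Interface
	policies map[launchPlanKey]schedulerPolicy // populated by the launch plan informer
	active   map[launchPlanKey][]executionID   // ordered by creation time, oldest first

	createExecution func(executionID) error         // creates the workflow CRD and flips PENDING -> QUEUED
	markFailed      func(executionID, string) error // records a terminal FAILED phase with a reason
	terminateOldest func(launchPlanKey) error       // aborts the oldest active execution for a launch plan
}

func (c *Controller) worker() {
	for {
		item, shutdown := c.queue.Get()
		if shutdown {
			return
		}
		pending := item.(pendingExecution)
		policy, hasPolicy := c.policies[pending.LaunchPlan]

		switch {
		case !hasPolicy:
			// The launch plan no longer has a concurrency policy: just launch it.
			_ = c.createExecution(pending.ID)
		case uint32(len(c.active[pending.LaunchPlan])) < policy.Max:
			// Below the limit: launch it and record it as active.
			if err := c.createExecution(pending.ID); err == nil {
				c.active[pending.LaunchPlan] = append(c.active[pending.LaunchPlan], pending.ID)
			}
		default:
			// Limit reached: apply the configured policy.
			switch policy.Policy {
			case policyWait:
				// WAIT: do nothing; the next poll re-enqueues this execution.
			case policyAbort:
				// ABORT: fail the execution with a descriptive reason.
				_ = c.markFailed(pending.ID, "max concurrency reached for launch plan")
			case policyReplace:
				// REPLACE: terminate the oldest active execution, then launch the new one.
				if err := c.terminateOldest(pending.LaunchPlan); err == nil {
					if err := c.createExecution(pending.ID); err == nil {
						actives := c.active[pending.LaunchPlan]
						if len(actives) > 0 {
							actives = actives[1:] // drop the terminated (oldest) execution
						}
						c.active[pending.LaunchPlan] = append(actives, pending.ID)
					}
				}
			}
		}

		// Always mark the item Done so the workqueue will accept it again later.
		c.queue.Done(item)
	}
}
```

The `WAIT` case relies on the poll loop re-adding the execution on its next pass, and a real controller would guard `policies` and `active` with the thread-safe structures described above.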

Creating an execution:
1. Create the workflow CRD.
   1. If the CRD already exists because we've previously processed this pending execution before we had a chance to update the DB state, swallow the already-exists error gracefully.
   1. Conditionally mark the execution as `QUEUED` in the db if it's still in `PENDING` or `QUEUED`, so as not to overwrite any events should the execution have already reported progress in the interim.
      1. "Conditionally" meaning: use compare-and-swap semantics within a transaction.
   1. If creating the workflow CRD fails:
      1. Use some form of retries (perhaps with back-off) to create the CRD.
      1. If the CRD creation still fails, mark the execution as `FAILED` in the db if it's currently in `PENDING` or `QUEUED`. This removes its eligibility from the pending loop.
      1. If the CRD creation has failed, but the execution has moved beyond `QUEUED` and has reported progress from flytepropeller in the interim (due to a network partition, such that the CRD was created but not successfully reported as such), do not update the execution status in the DB and allow the execution to progress to a terminal state.
1. Upon successful creation of the workflow CRD, **or** failure in the last sub-step above to mark the execution as `FAILED`, append the execution identifier to `activeLaunchPlanExecutions` for the launch plan named entity. A sketch of this flow follows.
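
A minimal sketch of this create-execution flow, assuming hypothetical `executionStore` and `crdClient` interfaces in place of the real FlyteAdmin repository and Kubernetes client; the retry count, back-off, and phase strings are illustrative.

```go
package concurrency

import (
	"errors"
	"time"
)

// Hypothetical error value and interfaces standing in for the Kubernetes client
// and the FlyteAdmin repository layer; all names here are illustrative only.
var errAlreadyExists = errors.New("workflow CRD already exists")

type workflowExecutionID struct{ Project, Domain, Name string }

type executionStore interface {
	// CompareAndSwapPhase sets the phase to `to` only if the current phase is one
	// of `from`, inside a single transaction (compare-and-swap semantics).
	CompareAndSwapPhase(id workflowExecutionID, from []string, to string) (swapped bool, err error)
}

type crdClient interface {
	CreateWorkflowCRD(id workflowExecutionID) error
}

func launchPendingExecution(store executionStore, k8s crdClient, id workflowExecutionID) error {
	// Create the workflow CRD, retrying with exponential back-off.
	var err error
	for attempt, backoff := 0, time.Second; attempt < 3; attempt, backoff = attempt+1, backoff*2 {
		err = k8s.CreateWorkflowCRD(id)
		if err == nil || errors.Is(err, errAlreadyExists) {
			// Swallow already-exists gracefully: we processed this pending execution
			// before we had a chance to update the DB state.
			err = nil
			break
		}
		time.Sleep(backoff)
	}

	if err != nil {
		// CRD creation still failing: mark FAILED only if the execution is still in
		// PENDING or QUEUED; if it has reported progress in the interim, leave it alone.
		if _, casErr := store.CompareAndSwapPhase(id, []string{"PENDING", "QUEUED"}, "FAILED"); casErr != nil {
			return casErr
		}
		return err
	}

	// CRD created (or already existed): conditionally move the execution to QUEUED
	// without overwriting any progress flytepropeller may already have reported.
	_, casErr := store.CompareAndSwapPhase(id, []string{"PENDING", "QUEUED"}, "QUEUED")
	return casErr
}
```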

#### Launch Plan informer
This is an async process we run in the Concurrency Controller to ensure we have an eventually consistent view of launch plans.

Upon Concurrency Controller start-up, we'll query the DB for all active launch plans and populate a map of active launch plans: `map[admin.NamedEntityIdentifier]admin.SchedulerPolicy`

Periodically, the informer will re-issue the query, optionally filtering by [UpdatedAt](https://github.com/flyteorg/flyte/blob/master/datacatalog/pkg/repositories/models/base.go#L7) to only fetch launch plans that have been updated since the last query, and repopulate the map. If all versions of a launch plan have been deactivated since the last time the query ran, we'll want to update the in-memory map to empty out the launch plan's scheduler policy. A minimal sketch of this refresh follows.
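
A minimal sketch of the informer refresh, assuming a hypothetical `launchPlanRepo` interface over the launch plan table; the `nil`-policy convention for fully deactivated launch plans is an illustrative choice, not part of the proposal.

```go
package concurrency

import (
	"sync"
	"time"
)

// Hypothetical key and repository types standing in for admin.NamedEntityIdentifier
// and the FlyteAdmin launch plan repository; names are illustrative only.
type namedEntity struct{ Project, Domain, Name string }

type launchPlanSchedulerPolicy struct {
	Max    uint32
	Policy int
}

type launchPlanRepo interface {
	// ListSchedulerPolicies returns the scheduler policy of every launch plan
	// updated since `since` (zero time means "all"); a nil policy signals that
	// every version of that launch plan has been deactivated.
	ListSchedulerPolicies(since time.Time) (map[namedEntity]*launchPlanSchedulerPolicy, error)
}

type launchPlanInformer struct {
	repo     launchPlanRepo
	mu       sync.RWMutex
	policies map[namedEntity]*launchPlanSchedulerPolicy
	lastSync time.Time
}

// refresh re-issues the query, filtering by UpdatedAt, and repopulates the map,
// clearing entries whose launch plans have been fully deactivated.
func (i *launchPlanInformer) refresh() error {
	updated, err := i.repo.ListSchedulerPolicies(i.lastSync)
	if err != nil {
		return err
	}
	i.mu.Lock()
	defer i.mu.Unlock()
	for key, policy := range updated {
		if policy == nil {
			delete(i.policies, key)
			continue
		}
		i.policies[key] = policy
	}
	i.lastSync = time.Now()
	return nil
}

// get is used by workers to look up the concurrency policy for a launch plan.
func (i *launchPlanInformer) get(key namedEntity) (*launchPlanSchedulerPolicy, bool) {
	i.mu.RLock()
	defer i.mu.RUnlock()
	p, ok := i.policies[key]
	return p, ok
}
```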

### Flyte Admin changes
#### Execution Manager
Because we fetch the launch plan to reconcile execution inputs at CreateExecution time, we'll have the concurrency policy available to us at the time of execution creation.
If there is no concurrency policy defined, we'll proceed as [normal](https://github.com/flyteorg/flyte/blob/f14348165ccdfb26f8509c0f1ef380a360e59c4d/flyteadmin/pkg/manager/impl/execution_manager.go#L1169-L1173) and create the workflow execution CRD and then create a database entry for the execution with phase `UNKNOWN`. This way, we don't incur any penalty for executions launched without a concurrency policy.

If there is a concurrency policy defined and it's set to `ABORT`, immediately fail the execution. Otherwise, we'll create the execution in the database with a new `PENDING` execution phase and reason populated in `ExecutionStateChangeDetails`, _but will not create a workflow CRD_.

#### Database
For performance, we can introduce new fields to denormalize the launch plan named entity the execution was launched by.
In [models/execution.go](https://github.com/flyteorg/flyte/blob/25cfe16940f10f9bbef02e288c823db16eb37609/flyteadmin/pkg/repositories/models/execution.go):
```go
type Execution struct {
    ...
    LaunchPlanProject string
    LaunchPlanDomain  string
    LaunchPlanName    string
}
```

We should consider adding an index to the executions table that covers
- phase in (`PENDING`, `QUEUED`, `RUNNING`) only (in order to safeguard well-populated flyteadmin instances with lots of completed, historical executions); a sketch of such a partial index follows.
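
For illustration, this could be a Postgres partial index over the denormalized launch plan columns, restricted to the non-terminal phases; a sketch, assuming the column names above (the index and function names are made up):

```go
package migrations

import (
	"database/sql"
)

// addPendingExecutionsIndex creates a partial index covering only executions in
// non-terminal phases, keyed by the denormalized launch plan named entity.
func addPendingExecutionsIndex(db *sql.DB) error {
	_, err := db.Exec(`
		CREATE INDEX IF NOT EXISTS idx_executions_active_by_launch_plan
		ON executions (launch_plan_project, launch_plan_domain, launch_plan_name, created_at)
		WHERE phase IN ('PENDING', 'QUEUED', 'RUNNING');`)
	return err
}
```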

##### Concurrency by specified launch plan versions
Executions are always tied to the versioned launch plan that triggered them (see [here](https://github.com/flyteorg/flyte/blob/38883c721dac2875bdd2333f4cd56e757e81ea5f/flyteadmin/pkg/repositories/models/execution.go#L26)).
However, this proposal only applies concurrency at the launch plan Named Entity level, that is, by (project, domain, name) and across all versions. The currently active launch plan version will determine the concurrency policy that gets applied for all executions created with the launch plan NamedEntity.

Non-goal, but future proposal: if we wanted to support concurrency by launch plan version, we'd introduce `LaunchPlanVersion` to the execution model and add duplicates of the in-memory maps, but keyed by versioned launch plan rather than NamedEntityIdentifier.

We could update usage like so:

```python
my_lp = LaunchPlan.get_or_create(
    name="my_serial_lp",
    workflow=my_wf,
    ...
    concurrency=Concurrency(
        max=1,  # defines how many executions with this launch plan can run in parallel
        policy=ConcurrencyPolicy.WAIT,  # defines the policy to apply when the max concurrency is reached
        precision=ConcurrencyPrecision.LAUNCH_PLAN_VERSION
    )
)
```

and by default, when the precision is omitted, the SDK could register the launch plan using `ConcurrencyPrecision.LAUNCH_PLAN`.

We could update the concurrency protobuf definition like so:
```protobuf
message SchedulerPolicy {
    // Defines how many executions with this launch plan can run in parallel
    uint32 max = 1;

    // Defines how to handle the execution when the max concurrency is reached.
    ConcurrencyPolicy policy = 2;

    ConcurrencyLevel level = 3;
}

enum ConcurrencyLevel {
    // Applies concurrency limits across all launch plan versions.
    LAUNCH_PLAN = 0;

    // Applies concurrency at the versioned launch plan level
    LAUNCH_PLAN_VERSION = 1;
}
```

Note, in this proposal, registering a new version of the launch plan and setting it to active will determine the concurrency policy across all launch plan versions.

#### Prior Art
The flyteadmin native scheduler (https://github.com/flyteorg/flyte/tree/master/flyteadmin/scheduler) already implements a reconciliation loop to catch up on any missed schedules.

## 4 Metrics & Dashboards
- Time spent in `PENDING`: it's useful to understand the duration an execution spends in `PENDING` before it transitions to `RUNNING`. A sketch of such a metric follows.
- It may also be useful for Flyte platform operators to configure alerts if an execution stays in `PENDING` beyond a configurable threshold.
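
A minimal sketch of the PENDING-duration metric using Prometheus, assuming the controller tracks when each execution entered `PENDING`; the metric name and buckets are illustrative.

```go
package concurrency

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// pendingDuration records how long an execution sits in PENDING before the
// concurrency controller launches (or fails) it.
var pendingDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
	Namespace: "flyteadmin",
	Subsystem: "concurrency",
	Name:      "execution_pending_seconds",
	Help:      "Time an execution spends in PENDING before it is launched.",
	Buckets:   prometheus.ExponentialBuckets(1, 2, 12), // 1s .. ~1h
})

func init() {
	prometheus.MustRegister(pendingDuration)
}

// observePendingExit is called when a PENDING execution leaves the pending state.
func observePendingExit(enteredPendingAt time.Time) {
	pendingDuration.Observe(time.Since(enteredPendingAt).Seconds())
}
```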

## 5 Drawbacks

The [executions model](https://github.com/flyteorg/flyte/blob/38883c721dac2875bdd2333f4cd56e757e81ea5f/flyteadmin/pkg/repositories/models/execution.go) already has indices on:
- primary key index
- launch plan id
- workflow id
- task id (for single task executions)
- execution created at
- error kind (code)
- user who launched the execution
- state

Database performance suffers as new indices are added (ref [[1](https://use-the-index-luke.com/sql/dml/insert)] [[2](https://www.timescale.com/learn/postgresql-performance-tuning-optimizing-database-indexes)]).

## 6 Alternatives

### Scheduling
This proposal purposefully uses random scheduling, but this does not preclude defining other scheduling orders or catch-up policies in the future.

To accomplish this, we can extend the `ConcurrencyPolicy` proto message to encapsulate scheduling behavior:

```protobuf
message Concurrency {
    // Defines how many executions with this launch plan can run in parallel
    uint32 max = 1;

    // Defines how to handle the execution when the max concurrency is reached.
    ConcurrencyPolicy policy = 2;

    ConcurrencyScheduling scheduling = 3;
}

enum ConcurrencyScheduling {
    FIFO = 0;
    FILO = 1;
    ...
}
```

When we process the pending executions in the Concurrency Controller, we can sort the pending executions by creation time in ascending or descending order based on the scheduling policy, as in the sketch below.
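
A minimal sketch of that ordering step, with an illustrative `pendingRow` struct and scheduling enum standing in for the DB model and the proposed `ConcurrencyScheduling` field:

```go
package concurrency

import (
	"sort"
	"time"
)

// pendingRow is an illustrative stand-in for a pending execution row pulled from the DB.
type pendingRow struct {
	ID        string
	CreatedAt time.Time
}

type concurrencyScheduling int

const (
	schedulingFIFO concurrencyScheduling = iota // oldest first
	schedulingFILO                              // newest first
)

// sortPending orders the batch of pending executions before they are enqueued.
func sortPending(executions []pendingRow, scheduling concurrencyScheduling) {
	sort.Slice(executions, func(i, j int) bool {
		if scheduling == schedulingFILO {
			return executions[i].CreatedAt.After(executions[j].CreatedAt)
		}
		return executions[i].CreatedAt.Before(executions[j].CreatedAt)
	})
}
```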

Furthermore, we may want to introduce a max pending period to fail executions that have been in `PENDING` for too long.

## 7 Potential Impact and Dependencies

*Here, we aim to be mindful of our environment and generate empathy towards others who may be impacted by our decisions.*

- *What other systems or teams are affected by this proposal?*
- *How could this be exploited by malicious attackers?*

## 8 Unresolved questions

- Should we always attempt to schedule pending executions in ascending order of creation time?
  - Decision: We'll use FIFO scheduling by default but can extend scheduling behavior with an enum going forward.
- Should we propagate concurrency policies to child executions?
  - Decision: No. Child executions can define concurrency at the child launch plan level if necessary.

## 9 Conclusion

This is a simple and lightweight means of limiting execution concurrency that we can build upon, both for flexible scheduling policies and, eventually, for limiting task execution concurrency.

**Checklist:**

- [x] Copy template
- [x] Draft RFC (think of it as a wireframe)
- [x] Share as WIP with folks you trust to gut-check
- [x] Send pull request when comfortable
- [ ] Label accordingly
- [ ] Assign reviewers
- [ ] Merge PR

**Recommendations**

- Tag RFC title with [WIP] if you're still ironing out details.
- Tag RFC title with [Newbie] if you're trying out something experimental or you're not entirely convinced of what you're proposing.
- Tag RFC title with [RR] if you'd like to schedule a review request to discuss the RFC.
- If there are areas that you're not convinced on, tag people who you consider may know about this and ask for their input.
- If you have doubts, ask on [#feature-discussions](https://slack.com/app_redirect?channel=CPQ3ZFQ84&team=TN89P6GGK) for help moving something forward.
