Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add workflow concurrency controller #6309

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
541 changes: 541 additions & 0 deletions flyteadmin/concurrency/concurrency_controller.go

Large diffs are not rendered by default.

25 changes: 25 additions & 0 deletions flyteadmin/concurrency/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package concurrency

import (
"github.com/flyteorg/flyte/flytestdlib/config"
)

// configSectionKey is the name of the configuration section for this package.
// NOTE(review): presumably the key under which Config is registered with
// flytestdlib's config package; the registration is not shown here — confirm.
const configSectionKey = "concurrency"

// Config contains the settings for the concurrency controller.
// No default values are declared in this file; the zero value leaves the
// controller disabled with zero-valued intervals, workers and retries.
type Config struct {
// Enable determines whether the concurrency controller is enabled.
Enable bool `json:"enable" pflag:",Enable the execution concurrency controller"`

// ProcessingInterval defines how often pending executions are checked.
ProcessingInterval config.Duration `json:"processingInterval" pflag:",How often to process pending executions"`

// Workers defines the number of worker goroutines used to process executions.
Workers int `json:"workers" pflag:",Number of worker goroutines for processing executions"`

// MaxRetries defines the maximum number of retries for processing a single execution.
MaxRetries int `json:"maxRetries" pflag:",Maximum number of retries for processing an execution"`

// LaunchPlanRefreshInterval defines how often launch plan concurrency information is refreshed.
LaunchPlanRefreshInterval config.Duration `json:"launchPlanRefreshInterval" pflag:",How often to refresh launch plan concurrency information"`
}
52 changes: 52 additions & 0 deletions flyteadmin/concurrency/core/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package core

import (
"context"
"time"

"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
)

// ConcurrencyController is the main interface for controlling execution concurrency.
// It manages the lifecycle of executions based on concurrency policies.
//
// NOTE(review): several methods take core.Identifier and
// core.WorkflowExecutionIdentifier by value. These appear to be generated
// protobuf messages (imported from flyteidl/gen/pb-go); if so, passing them by
// value is flagged by `go vet` (copylocks) — confirm whether pointers are intended.
type ConcurrencyController interface {
// Initialize initializes the controller from a previous state if available.
Initialize(ctx context.Context) error

// ProcessPendingExecutions processes all pending executions according to concurrency policies.
// It is intended to run in a background process and be scheduled periodically.
ProcessPendingExecutions(ctx context.Context) error

// CheckConcurrencyConstraints checks whether an execution of the given launch plan
// can run under its concurrency constraints.
// It returns true if the execution can proceed, false otherwise.
CheckConcurrencyConstraints(ctx context.Context, launchPlanID core.Identifier) (bool, error)

// GetConcurrencyPolicy returns the concurrency policy for a given launch plan.
GetConcurrencyPolicy(ctx context.Context, launchPlanID core.Identifier) (*admin.SchedulerPolicy, error)

// TrackExecution adds a running execution to the concurrency tracking for the given launch plan.
TrackExecution(ctx context.Context, execution models.Execution, launchPlanID core.Identifier) error

// UntrackExecution removes a completed execution from concurrency tracking.
UntrackExecution(ctx context.Context, executionID core.WorkflowExecutionIdentifier) error

// GetExecutionCounts returns the current count of executions for a launch plan.
GetExecutionCounts(ctx context.Context, launchPlanID core.Identifier) (int, error)

// Close gracefully shuts down the controller.
Close() error
}

// ConcurrencyMetrics defines the metrics reported by the concurrency controller.
type ConcurrencyMetrics interface {
// RecordPendingExecutions records the number of pending executions.
RecordPendingExecutions(count int64)

// RecordProcessedExecution records that an execution was processed under the
// named policy, and whether it was allowed to proceed.
RecordProcessedExecution(policy string, allowed bool)

// RecordLatency records the latency of the named operation, given the time at
// which the operation started.
RecordLatency(operation string, startTime time.Time)
}
107 changes: 107 additions & 0 deletions flyteadmin/concurrency/core/policies.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package core

import (
"context"

"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
)

// Policy determines what happens when the concurrency limit is reached.
// The zero value is Wait.
type Policy int

const (
// Wait queues the execution until other executions complete.
Wait Policy = iota

// Abort fails the execution immediately.
Abort

// Replace terminates the oldest running execution.
Replace
)

// PolicyFromProto converts a proto concurrency policy to a core Policy.
// Any value other than ABORT or REPLACE, including WAIT and unrecognized
// values, maps to Wait.
func PolicyFromProto(policy admin.ConcurrencyPolicy) Policy {
	if policy == admin.ConcurrencyPolicy_ABORT {
		return Abort
	}
	if policy == admin.ConcurrencyPolicy_REPLACE {
		return Replace
	}
	// WAIT and every unrecognized value fall back to the Wait policy.
	return Wait
}

// ToString returns the upper-case name of the policy ("WAIT", "ABORT" or
// "REPLACE"), or "UNKNOWN" for any value outside the declared constants.
func (p Policy) ToString() string {
	// Lookup table keyed by the declared Policy constants.
	names := [...]string{
		Wait:    "WAIT",
		Abort:   "ABORT",
		Replace: "REPLACE",
	}
	if p >= 0 && int(p) < len(names) {
		return names[p]
	}
	return "UNKNOWN"
}

// ConcurrencyLevel determines the scope of concurrency limits.
// The zero value is LaunchPlan.
type ConcurrencyLevel int

const (
// LaunchPlan applies the limit across all versions of a launch plan.
LaunchPlan ConcurrencyLevel = iota

// LaunchPlanVersion applies the limit only to a specific version.
LaunchPlanVersion
)

// LevelFromProto converts a proto concurrency level to a core ConcurrencyLevel.
// Only LAUNCH_PLAN_VERSION maps to LaunchPlanVersion; LAUNCH_PLAN and any
// unrecognized value map to LaunchPlan.
func LevelFromProto(level admin.ConcurrencyLevel) ConcurrencyLevel {
	if level == admin.ConcurrencyLevel_LAUNCH_PLAN_VERSION {
		return LaunchPlanVersion
	}
	// Launch-plan scope is the default.
	return LaunchPlan
}

// PolicyEvaluator evaluates whether an execution can proceed based on concurrency policies.
type PolicyEvaluator interface {
// CanExecute determines whether an execution can proceed given the policy
// and the number of executions currently running.
// It returns whether the execution may proceed, a human-readable message
// describing the decision (may be empty), and an error.
CanExecute(ctx context.Context, execution models.Execution, policy *admin.SchedulerPolicy, runningCount int) (bool, string, error)
}

// DefaultPolicyEvaluator is the default implementation of PolicyEvaluator.
// It carries no state.
type DefaultPolicyEvaluator struct{}

// CanExecute implements the PolicyEvaluator interface.
//
// The execution may proceed with an empty message when policy is nil or when
// runningCount is below policy.Max. Otherwise the decision follows the policy:
// Wait and Abort deny the execution, Replace allows it, and each returns a
// descriptive message. The returned error is always nil. ctx and execution are
// not consulted by this implementation.
func (d *DefaultPolicyEvaluator) CanExecute(
	ctx context.Context,
	execution models.Execution,
	policy *admin.SchedulerPolicy,
	runningCount int,
) (bool, string, error) {
	// Without a policy there is nothing to enforce.
	if policy == nil {
		return true, "", nil
	}
	// Still below the configured maximum.
	if limit := int(policy.Max); runningCount < limit {
		return true, "", nil
	}

	var (
		allowed bool
		message string
	)
	switch PolicyFromProto(policy.Policy) {
	case Wait:
		message = "Execution queued - waiting for running executions to complete"
	case Abort:
		message = "Execution aborted - maximum concurrency limit reached"
	case Replace:
		// Replace lets the new execution through; the caller is expected to
		// act on the message.
		allowed = true
		message = "Execution will replace oldest running execution"
	default:
		message = "Unknown concurrency policy"
	}
	return allowed, message, nil
}
97 changes: 97 additions & 0 deletions flyteadmin/concurrency/executor/workflow_executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package executor

import (
	"context"
	"time"

	"github.com/flyteorg/flyte/flyteadmin/pkg/manager/interfaces"
	"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/models"
	"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
	"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
	"github.com/flyteorg/flyte/flytestdlib/logger"
	"github.com/flyteorg/flyte/flytestdlib/promutils"
	"github.com/prometheus/client_golang/prometheus"
)

// WorkflowExecutor handles creating and aborting workflow executions through
// the execution manager, and counts the outcomes in its metrics.
type WorkflowExecutor struct {
// executionManager is the manager that execution create/terminate requests are delegated to.
executionManager interfaces.ExecutionInterface
// scope is the "workflow_executor" metrics sub-scope created in NewWorkflowExecutor.
scope promutils.Scope
// metrics holds the counters incremented by CreateExecution and AbortExecution.
metrics *workflowExecutorMetrics
}

// workflowExecutorMetrics groups the Prometheus counters reported by WorkflowExecutor.
type workflowExecutorMetrics struct {
// successfulExecutions counts execution creations that succeeded.
successfulExecutions prometheus.Counter
// failedExecutions counts execution creations that returned an error.
failedExecutions prometheus.Counter
// abortedExecutions counts executions that were successfully aborted.
abortedExecutions prometheus.Counter
}

// NewWorkflowExecutor creates a WorkflowExecutor that delegates to
// executionManager and registers its counters under the "workflow_executor"
// sub-scope of scope.
func NewWorkflowExecutor(
	executionManager interfaces.ExecutionInterface,
	scope promutils.Scope,
) *WorkflowExecutor {
	subScope := scope.NewSubScope("workflow_executor")

	// Register the counters up front, in the same order as before.
	counters := &workflowExecutorMetrics{
		successfulExecutions: subScope.MustNewCounter("successful_executions", "Count of successful execution creations"),
		failedExecutions:     subScope.MustNewCounter("failed_executions", "Count of failed execution creations"),
		abortedExecutions:    subScope.MustNewCounter("aborted_executions", "Count of aborted executions"),
	}

	return &WorkflowExecutor{
		executionManager: executionManager,
		scope:            subScope,
		metrics:          counters,
	}
}

// CreateExecution creates a workflow execution in the system by submitting an
// ExecutionCreateRequest, built from the given execution model, to the
// execution manager.
//
// It increments the failed counter, logs, and returns the manager's error
// unchanged when creation fails. On success it increments the successful
// counter and returns nil.
func (w *WorkflowExecutor) CreateExecution(ctx context.Context, execution models.Execution) error {
	// NOTE(review): the launch plan identifier reuses the execution's own name
	// and sets no version — confirm this resolves to the intended launch plan.
	launchPlanIdentifier := &core.Identifier{
		ResourceType: core.ResourceType_LAUNCH_PLAN,
		Project:      execution.Project,
		Domain:       execution.Domain,
		Name:         execution.Name,
	}

	// Create execution metadata.
	metadata := &admin.ExecutionMetadata{
		Mode: admin.ExecutionMetadata_SCHEDULED, // Or whatever mode is appropriate
	}

	// Create the execution request.
	executionRequest := &admin.ExecutionCreateRequest{
		Domain:  execution.Domain,
		Project: execution.Project,
		Name:    execution.Name,
		Spec: &admin.ExecutionSpec{
			LaunchPlan: launchPlanIdentifier,
			Metadata:   metadata,
		},
	}

	// The manager's third argument is the request time, a time.Time value;
	// nil is not assignable to it, so pass the current time.
	_, err := w.executionManager.CreateExecution(ctx, executionRequest, time.Now())
	if err != nil {
		w.metrics.failedExecutions.Inc()
		logger.Errorf(ctx, "Failed to create execution: %v", err)
		return err
	}

	w.metrics.successfulExecutions.Inc()
	return nil
}

// AbortExecution aborts an existing workflow execution by sending a terminate
// request with the given cause to the execution manager.
//
// On failure it logs and returns the manager's error unchanged. On success it
// increments the aborted counter and returns nil.
func (w *WorkflowExecutor) AbortExecution(ctx context.Context, executionID core.WorkflowExecutionIdentifier, cause string) error {
	terminateRequest := &admin.ExecutionTerminateRequest{
		Id:    &executionID,
		Cause: cause,
	}

	if _, err := w.executionManager.TerminateExecution(ctx, terminateRequest); err != nil {
		logger.Errorf(ctx, "Failed to abort execution %v: %v", executionID, err)
		return err
	}

	w.metrics.abortedExecutions.Inc()
	return nil
}
Loading