Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce unnecessary DB queries for Actions tasks #25199

Merged
merged 48 commits into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
82cbd09
fetch task with index
sillyguodong May 23, 2023
da135c4
add test
sillyguodong May 24, 2023
65395a1
Increase function no need return
sillyguodong May 30, 2023
2f460ab
Merge branch 'feature/fetch_with_task_index' of github.com:sillyguodo…
sillyguodong May 30, 2023
964e860
Merge branch 'main' into feature/fetch_with_task_index
sillyguodong May 30, 2023
148606c
use version
sillyguodong Jun 7, 2023
7caf9d2
update
sillyguodong Jun 7, 2023
b718d8b
go mod
sillyguodong Jun 7, 2023
94c3bee
Merge branch 'main' into feature/fetch_with_task_index
sillyguodong Jun 20, 2023
6b4fbad
Merge branch 'main' into feature/fetch_with_task_index
sillyguodong Jun 28, 2023
d59c383
go mod
sillyguodong Jun 28, 2023
23f8721
Merge branch 'main' into feature/fetch_with_task_index
sillyguodong Jun 29, 2023
72ea6ac
use db to store tasks version
sillyguodong Jun 29, 2023
e905fed
support re-run job
sillyguodong Jun 29, 2023
c430b28
log error
sillyguodong Jun 30, 2023
483a01a
Merge branch 'main' into feature/fetch_with_task_index
sillyguodong Jun 30, 2023
96915c6
typo
sillyguodong Jun 30, 2023
1861d25
fix migrations
sillyguodong Jun 30, 2023
46edcba
Merge branch 'main' into feature/fetch_with_task_index
silverwind Jun 30, 2023
ff179fd
Merge branch 'main' into feature/fetch_with_task_index
sillyguodong Jul 4, 2023
0771112
fix review
sillyguodong Jul 6, 2023
b61682b
delete fmt
sillyguodong Jul 6, 2023
67a4216
fix if cond
sillyguodong Jul 6, 2023
0d5b917
delete empty line
sillyguodong Jul 7, 2023
5285ed4
update
sillyguodong Jul 7, 2023
c8e9d93
Merge branch 'main' into feature/fetch_with_task_index
sillyguodong Jul 7, 2023
3e12fd3
adjust some logic
sillyguodong Jul 7, 2023
7d66700
fix
sillyguodong Jul 7, 2023
9b025bc
comment
sillyguodong Jul 7, 2023
8c7051a
typo
sillyguodong Jul 10, 2023
be3576b
context
sillyguodong Jul 21, 2023
4283ebe
another context
sillyguodong Jul 21, 2023
9837fb5
Merge branch 'main' into feature/fetch_with_task_index
sillyguodong Jul 21, 2023
98cec83
no introduce new ctx any more
sillyguodong Jul 21, 2023
32a4778
Merge branch 'main' into feature/fetch_with_task_index
sillyguodong Jul 21, 2023
1e8be9a
Merge branch 'main' into feature/fetch_with_task_index
GiteaBot Jul 21, 2023
75d7109
Merge branch 'main' into feature/fetch_with_task_index
GiteaBot Jul 21, 2023
23ec84e
Merge branch 'main' into feature/fetch_with_task_index
GiteaBot Jul 21, 2023
56fe8de
Merge branch 'main' into feature/fetch_with_task_index
GiteaBot Jul 21, 2023
900d1e8
Merge branch 'main' into feature/fetch_with_task_index
GiteaBot Jul 22, 2023
06ad0cc
Merge branch 'main' into feature/fetch_with_task_index
GiteaBot Jul 22, 2023
43f076f
Merge branch 'main' into feature/fetch_with_task_index
GiteaBot Jul 22, 2023
8590ce4
Merge branch 'main' into feature/fetch_with_task_index
GiteaBot Jul 22, 2023
f82fffb
Merge branch 'main' into feature/fetch_with_task_index
GiteaBot Jul 22, 2023
dd94ee7
Merge branch 'main' into feature/fetch_with_task_index
GiteaBot Jul 22, 2023
fc22e0c
Merge branch 'main' into feature/fetch_with_task_index
GiteaBot Jul 23, 2023
f3a43f9
Merge branch 'main' into feature/fetch_with_task_index
GiteaBot Jul 24, 2023
f840c1b
Merge branch 'main' into feature/fetch_with_task_index
GiteaBot Jul 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions models/actions/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork
}

runJobs := make([]*ActionRunJob, 0, len(jobs))
var hasWaiting bool
for _, v := range jobs {
id, job := v.Job()
needs := job.Needs()
Expand All @@ -205,6 +206,8 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork
status := StatusWaiting
if len(needs) > 0 || run.NeedApproval {
status = StatusBlocked
} else {
hasWaiting = true
}
job.Name, _ = util.SplitStringAtByteN(job.Name, 255)
runJobs = append(runJobs, &ActionRunJob{
Expand All @@ -225,6 +228,13 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork
return err
}

// if there is a job in the waiting status, increase tasks version.
if hasWaiting {
if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
return err
}
}

return commiter.Commit()
}

Expand Down
7 changes: 7 additions & 0 deletions models/actions/run_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col
return affected, nil
}

if affected != 0 && util.SliceContains(cols, "status") && job.Status.IsWaiting() {
// if the status of job changes to waiting again, increase tasks version.
if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil {
return affected, err
}
}

if job.RunID == 0 {
var err error
if job, err = GetRunJobByID(ctx, job.ID); err != nil {
Expand Down
3 changes: 1 addition & 2 deletions models/actions/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,11 @@ func GetRunningTaskByToken(ctx context.Context, token string) (*ActionTask, erro
}

func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask, bool, error) {
dbCtx, commiter, err := db.TxContext(ctx)
ctx, commiter, err := db.TxContext(ctx)
if err != nil {
return nil, false, err
}
defer commiter.Close()
ctx = dbCtx.WithContext(ctx)

e := db.GetEngine(ctx)

Expand Down
105 changes: 105 additions & 0 deletions models/actions/tasks_version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package actions

import (
"context"

"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/timeutil"
)

// ActionTasksVersion
// If both ownerID and repoID is zero, its scope is global.
// If ownerID is not zero and repoID is zero, its scope is org (there is no user-level runner currrently).
// If ownerID is zero and repoID is not zero, its scope is repo.
type ActionTasksVersion struct {
ID int64 `xorm:"pk autoincr"`
OwnerID int64 `xorm:"UNIQUE(owner_repo)"`
RepoID int64 `xorm:"INDEX UNIQUE(owner_repo)"`
Version int64
CreatedUnix timeutil.TimeStamp `xorm:"created"`
UpdatedUnix timeutil.TimeStamp `xorm:"updated"`
}

func init() {
db.RegisterModel(new(ActionTasksVersion))
}

func GetTasksVersionByScope(ctx context.Context, ownerID, repoID int64) (int64, error) {
var tasksVersion ActionTasksVersion
has, err := db.GetEngine(ctx).Where("owner_id = ? AND repo_id = ?", ownerID, repoID).Get(&tasksVersion)
if err != nil {
return 0, err
} else if !has {
return 0, nil
}
return tasksVersion.Version, err
}

func insertTasksVersion(ctx context.Context, ownerID, repoID int64) (*ActionTasksVersion, error) {
tasksVersion := &ActionTasksVersion{
OwnerID: ownerID,
RepoID: repoID,
Version: 1,
}
if _, err := db.GetEngine(ctx).Insert(tasksVersion); err != nil {
return nil, err
}
return tasksVersion, nil
}

func increaseTasksVersionByScope(ctx context.Context, ownerID, repoID int64) error {
result, err := db.GetEngine(ctx).Exec("UPDATE action_tasks_version SET version = version + 1 WHERE owner_id = ? AND repo_id = ?", ownerID, repoID)
if err != nil {
return err
}
affected, err := result.RowsAffected()
if err != nil {
return err
}

if affected == 0 {
// if update sql does not affect any rows, the database may be broken,
// so re-insert the row of version data here.
if _, err := insertTasksVersion(ctx, ownerID, repoID); err != nil {
return err
}
}

return nil
}

func IncreaseTaskVersion(ctx context.Context, ownerID, repoID int64) error {
ctx, commiter, err := db.TxContext(ctx)
if err != nil {
return err
}
defer commiter.Close()

// 1. increase global
if err := increaseTasksVersionByScope(ctx, 0, 0); err != nil {
log.Error("IncreaseTasksVersionByScope(Global): %v", err)
return err
}

// 2. increase owner
if ownerID > 0 {
if err := increaseTasksVersionByScope(ctx, ownerID, 0); err != nil {
log.Error("IncreaseTasksVersionByScope(Owner): %v", err)
return err
}
}

// 3. increase repo
if repoID > 0 {
if err := increaseTasksVersionByScope(ctx, 0, repoID); err != nil {
log.Error("IncreaseTasksVersionByScope(Repo): %v", err)
return err
}
}

return commiter.Commit()
}
2 changes: 2 additions & 0 deletions models/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,8 @@ var migrations = []Migration{
NewMigration("Alter Actions Artifact table", v1_21.AlterActionArtifactTable),
// v266 -> v267
NewMigration("Reduce commit status", v1_21.ReduceCommitStatus),
// v267 -> v268
NewMigration("Add action_tasks_version table", v1_21.CreateActionTasksVersionTable),
}

// GetCurrentDBVersion returns the current db version
Expand Down
23 changes: 23 additions & 0 deletions models/migrations/v1_21/v267.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package v1_21 //nolint

import (
"code.gitea.io/gitea/modules/timeutil"

"xorm.io/xorm"
)

func CreateActionTasksVersionTable(x *xorm.Engine) error {
type ActionTasksVersion struct {
ID int64 `xorm:"pk autoincr"`
OwnerID int64 `xorm:"UNIQUE(owner_repo)"`
RepoID int64 `xorm:"INDEX UNIQUE(owner_repo)"`
Version int64
CreatedUnix timeutil.TimeStamp `xorm:"created"`
UpdatedUnix timeutil.TimeStamp `xorm:"updated"`
}

return x.Sync(new(ActionTasksVersion))
}
33 changes: 26 additions & 7 deletions routers/api/actions/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,39 @@ func (s *Service) Declare(
// FetchTask assigns a task to the runner
func (s *Service) FetchTask(
ctx context.Context,
_ *connect.Request[runnerv1.FetchTaskRequest],
req *connect.Request[runnerv1.FetchTaskRequest],
) (*connect.Response[runnerv1.FetchTaskResponse], error) {
runner := GetRunner(ctx)

var task *runnerv1.Task
if t, ok, err := pickTask(ctx, runner); err != nil {
log.Error("pick task failed: %v", err)
return nil, status.Errorf(codes.Internal, "pick task: %v", err)
} else if ok {
task = t
tasksVersion := req.Msg.TasksVersion // task version from runner
latestVersion, err := actions_model.GetTasksVersionByScope(ctx, runner.OwnerID, runner.RepoID)
if err != nil {
return nil, status.Errorf(codes.Internal, "query tasks version failed: %v", err)
} else if latestVersion == 0 {
if err := actions_model.IncreaseTaskVersion(ctx, runner.OwnerID, runner.RepoID); err != nil {
return nil, status.Errorf(codes.Internal, "fail to increase task version: %v", err)
}
// if we don't increase the value of `latestVersion` here,
// the response of FetchTask will return tasksVersion as zero.
// and the runner will treat it as an old version of Gitea.
latestVersion++
}

if tasksVersion != latestVersion {
// if the task version in request is not equal to the version in db,
// it means there may still be some tasks not be assgined.
// try to pick a task for the runner that send the request.
if t, ok, err := pickTask(ctx, runner); err != nil {
log.Error("pick task failed: %v", err)
return nil, status.Errorf(codes.Internal, "pick task: %v", err)
} else if ok {
task = t
}
}
res := connect.NewResponse(&runnerv1.FetchTaskResponse{
Task: task,
Task: task,
TasksVersion: latestVersion,
})
return res, nil
}
Expand Down