From 82cbd0924002909585662c6d5420416d47b1cf43 Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Tue, 23 May 2023 19:09:12 +0800 Subject: [PATCH 01/24] fetch task with index --- models/actions/run.go | 40 +++++++++++++++++++++++++++- routers/api/actions/runner/runner.go | 24 +++++++++++------ routers/init.go | 4 +++ 3 files changed, 59 insertions(+), 9 deletions(-) diff --git a/models/actions/run.go b/models/actions/run.go index b58683dd36b35..1f9afb105649b 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "strings" + "sync" "time" "code.gitea.io/gitea/models/db" @@ -14,6 +15,7 @@ import ( user_model "code.gitea.io/gitea/models/user" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/json" + "code.gitea.io/gitea/modules/log" api "code.gitea.io/gitea/modules/structs" "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" @@ -53,6 +55,35 @@ func init() { db.RegisterModel(new(ActionRunIndex)) } +var ActionsTaskIndexCache taskIndexCache + +type taskIndexCache struct { + index int64 + lock sync.RWMutex +} + +func (c *taskIndexCache) Get() int64 { + c.lock.RLock() + defer c.lock.RUnlock() + return c.index +} + +func (c *taskIndexCache) Increase(num int64) int64 { + c.lock.Lock() + defer c.lock.Unlock() + c.index += num + return c.index +} + +func (c *taskIndexCache) Init() { + // Using the count of tasks that not be assgined to any runner as the initial task index, when Gitea starts. + if count, err := db.GetEngine(db.DefaultContext).Where("task_id=? AND status=?", 0, StatusWaiting).Count(new(ActionRunJob)); err != nil { + log.Fatal("Init task index cache error: %v", err) + } else { + c.index = count + } +} + func (run *ActionRun) HTMLURL() string { if run.Repo == nil { return "" @@ -224,7 +255,14 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork return err } - return commiter.Commit() + if err := commiter.Commit(); err != nil { + return err + } + + // increase index in memory cache. + ActionsTaskIndexCache.Increase(int64(len(runJobs))) + + return nil } func GetRunByID(ctx context.Context, id int64) (*ActionRun, error) { diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index 895b281725da6..b32ab0313d900 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -96,20 +96,28 @@ func (s *Service) Register( // FetchTask assigns a task to the runner func (s *Service) FetchTask( ctx context.Context, - _ *connect.Request[runnerv1.FetchTaskRequest], + req *connect.Request[runnerv1.FetchTaskRequest], ) (*connect.Response[runnerv1.FetchTaskResponse], error) { runner := GetRunner(ctx) var task *runnerv1.Task - if t, ok, err := pickTask(ctx, runner); err != nil { - log.Error("pick task failed: %v", err) - return nil, status.Errorf(codes.Internal, "pick task: %v", err) - } else if ok { - task = t + taskIndex := req.Msg.TaskIndex + cacheIndex := actions_model.ActionsTaskIndexCache.Get() + if req.Msg.TaskIndex != cacheIndex { + // if the task index in request is not equal to the index in cache, + // it means there are some tasks still not be assgined. + // try to pick a task for the runner that send the request. + if t, ok, err := pickTask(ctx, runner); err != nil { + log.Error("pick task failed: %v", err) + return nil, status.Errorf(codes.Internal, "pick task: %v", err) + } else if ok { + task = t + taskIndex = cacheIndex + } } - res := connect.NewResponse(&runnerv1.FetchTaskResponse{ - Task: task, + Task: task, + TaskIndex: taskIndex, }) return res, nil } diff --git a/routers/init.go b/routers/init.go index 5737ef3dc06a4..6854c50256264 100644 --- a/routers/init.go +++ b/routers/init.go @@ -9,6 +9,7 @@ import ( "runtime" "code.gitea.io/gitea/models" + actions_model "code.gitea.io/gitea/models/actions" asymkey_model "code.gitea.io/gitea/models/asymkey" "code.gitea.io/gitea/modules/cache" "code.gitea.io/gitea/modules/eventsource" @@ -199,6 +200,9 @@ func NormalRoutes(ctx context.Context) *web.Route { // TODO: this prefix should be generated with a token string with runner ? prefix = "/api/actions_pipeline" r.Mount(prefix, actions_router.ArtifactsRoutes(prefix)) + + // init task index + actions_model.ActionsTaskIndexCache.Init() } return r From da135c46ddabfb8ff054a94fecc6119ebd412117 Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Wed, 24 May 2023 10:43:12 +0800 Subject: [PATCH 02/24] add test --- models/actions/run.go | 40 +++++++++++++++++++++++++++- routers/api/actions/runner/runner.go | 24 +++++++++++------ routers/init.go | 4 +++ 3 files changed, 59 insertions(+), 9 deletions(-) diff --git a/models/actions/run.go b/models/actions/run.go index b58683dd36b35..1f9afb105649b 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "strings" + "sync" "time" "code.gitea.io/gitea/models/db" @@ -14,6 +15,7 @@ import ( user_model "code.gitea.io/gitea/models/user" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/json" + "code.gitea.io/gitea/modules/log" api "code.gitea.io/gitea/modules/structs" "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" @@ -53,6 +55,35 @@ func init() { db.RegisterModel(new(ActionRunIndex)) } +var ActionsTaskIndexCache taskIndexCache + +type taskIndexCache struct { + index int64 + lock sync.RWMutex +} + +func (c *taskIndexCache) Get() int64 { + c.lock.RLock() + defer c.lock.RUnlock() + return c.index +} + +func (c *taskIndexCache) Increase(num int64) int64 { + c.lock.Lock() + defer c.lock.Unlock() + c.index += num + return c.index +} + +func (c *taskIndexCache) Init() { + // Using the count of tasks that not be assgined to any runner as the initial task index, when Gitea starts. + if count, err := db.GetEngine(db.DefaultContext).Where("task_id=? AND status=?", 0, StatusWaiting).Count(new(ActionRunJob)); err != nil { + log.Fatal("Init task index cache error: %v", err) + } else { + c.index = count + } +} + func (run *ActionRun) HTMLURL() string { if run.Repo == nil { return "" @@ -224,7 +255,14 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork return err } - return commiter.Commit() + if err := commiter.Commit(); err != nil { + return err + } + + // increase index in memory cache. + ActionsTaskIndexCache.Increase(int64(len(runJobs))) + + return nil } func GetRunByID(ctx context.Context, id int64) (*ActionRun, error) { diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index 895b281725da6..b32ab0313d900 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -96,20 +96,28 @@ func (s *Service) Register( // FetchTask assigns a task to the runner func (s *Service) FetchTask( ctx context.Context, - _ *connect.Request[runnerv1.FetchTaskRequest], + req *connect.Request[runnerv1.FetchTaskRequest], ) (*connect.Response[runnerv1.FetchTaskResponse], error) { runner := GetRunner(ctx) var task *runnerv1.Task - if t, ok, err := pickTask(ctx, runner); err != nil { - log.Error("pick task failed: %v", err) - return nil, status.Errorf(codes.Internal, "pick task: %v", err) - } else if ok { - task = t + taskIndex := req.Msg.TaskIndex + cacheIndex := actions_model.ActionsTaskIndexCache.Get() + if req.Msg.TaskIndex != cacheIndex { + // if the task index in request is not equal to the index in cache, + // it means there are some tasks still not be assgined. + // try to pick a task for the runner that send the request. + if t, ok, err := pickTask(ctx, runner); err != nil { + log.Error("pick task failed: %v", err) + return nil, status.Errorf(codes.Internal, "pick task: %v", err) + } else if ok { + task = t + taskIndex = cacheIndex + } } - res := connect.NewResponse(&runnerv1.FetchTaskResponse{ - Task: task, + Task: task, + TaskIndex: taskIndex, }) return res, nil } diff --git a/routers/init.go b/routers/init.go index 5737ef3dc06a4..6854c50256264 100644 --- a/routers/init.go +++ b/routers/init.go @@ -9,6 +9,7 @@ import ( "runtime" "code.gitea.io/gitea/models" + actions_model "code.gitea.io/gitea/models/actions" asymkey_model "code.gitea.io/gitea/models/asymkey" "code.gitea.io/gitea/modules/cache" "code.gitea.io/gitea/modules/eventsource" @@ -199,6 +200,9 @@ func NormalRoutes(ctx context.Context) *web.Route { // TODO: this prefix should be generated with a token string with runner ? prefix = "/api/actions_pipeline" r.Mount(prefix, actions_router.ArtifactsRoutes(prefix)) + + // init task index + actions_model.ActionsTaskIndexCache.Init() } return r From 65395a11e495b996710d504e35c40a7b85734e58 Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Tue, 30 May 2023 14:38:28 +0800 Subject: [PATCH 03/24] Increase function no need return --- models/actions/run.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/models/actions/run.go b/models/actions/run.go index 1f9afb105649b..b9ad06bd5edd2 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -68,11 +68,10 @@ func (c *taskIndexCache) Get() int64 { return c.index } -func (c *taskIndexCache) Increase(num int64) int64 { +func (c *taskIndexCache) Increase(num int64) { c.lock.Lock() defer c.lock.Unlock() c.index += num - return c.index } func (c *taskIndexCache) Init() { From 148606ceefaec5746cf5aaaea7c01a67975b3de4 Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Wed, 7 Jun 2023 10:21:19 +0800 Subject: [PATCH 04/24] use version --- models/actions/run.go | 16 ++++++++-------- routers/api/actions/runner/runner.go | 12 ++++++------ routers/init.go | 2 +- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/models/actions/run.go b/models/actions/run.go index ffdfac8f4dc44..b45562870396d 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -55,27 +55,27 @@ func init() { db.RegisterModel(new(ActionRunIndex)) } -var ActionsTaskIndexCache taskIndexCache +var ActionsTaskVersionCache taskVersionCache -type taskIndexCache struct { +type taskVersionCache struct { index int64 lock sync.RWMutex } -func (c *taskIndexCache) Get() int64 { +func (c *taskVersionCache) Get() int64 { c.lock.RLock() defer c.lock.RUnlock() return c.index } -func (c *taskIndexCache) Increase(num int64) { +func (c *taskVersionCache) Increase(num int64) { c.lock.Lock() defer c.lock.Unlock() c.index += num } -func (c *taskIndexCache) Init() { - // Using the count of tasks that not be assgined to any runner as the initial task index, when Gitea starts. +func (c *taskVersionCache) Init() { + // Using the count of tasks that not be assgined to any runner as the initial task version when Gitea starts. if count, err := db.GetEngine(db.DefaultContext).Where("task_id=? AND status=?", 0, StatusWaiting).Count(new(ActionRunJob)); err != nil { log.Fatal("Init task index cache error: %v", err) } else { @@ -258,8 +258,8 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork return err } - // increase index in memory cache. - ActionsTaskIndexCache.Increase(int64(len(runJobs))) + // increase version in memory cache. + ActionsTaskVersionCache.Increase(int64(len(runJobs))) return nil } diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index b32ab0313d900..819eb48939e8f 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -101,9 +101,9 @@ func (s *Service) FetchTask( runner := GetRunner(ctx) var task *runnerv1.Task - taskIndex := req.Msg.TaskIndex - cacheIndex := actions_model.ActionsTaskIndexCache.Get() - if req.Msg.TaskIndex != cacheIndex { + taskVersion := req.Msg.TaskVersion + cacheVersion := actions_model.ActionsTaskVersionCache.Get() + if req.Msg.TaskVersion != cacheVersion { // if the task index in request is not equal to the index in cache, // it means there are some tasks still not be assgined. // try to pick a task for the runner that send the request. @@ -112,12 +112,12 @@ func (s *Service) FetchTask( return nil, status.Errorf(codes.Internal, "pick task: %v", err) } else if ok { task = t - taskIndex = cacheIndex + taskVersion = cacheVersion } } res := connect.NewResponse(&runnerv1.FetchTaskResponse{ - Task: task, - TaskIndex: taskIndex, + Task: task, + TaskVersion: taskVersion, }) return res, nil } diff --git a/routers/init.go b/routers/init.go index 6854c50256264..44cc5a62c4a15 100644 --- a/routers/init.go +++ b/routers/init.go @@ -202,7 +202,7 @@ func NormalRoutes(ctx context.Context) *web.Route { r.Mount(prefix, actions_router.ArtifactsRoutes(prefix)) // init task index - actions_model.ActionsTaskIndexCache.Init() + actions_model.ActionsTaskVersionCache.Init() } return r From 7caf9d24b651ff6f1bd941b0f03c221bdad07f2f Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Wed, 7 Jun 2023 12:12:20 +0800 Subject: [PATCH 05/24] update --- go.mod | 2 ++ models/actions/run.go | 10 +++++----- routers/api/actions/runner/runner.go | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index b77e350f0ba1a..81e09b61d8665 100644 --- a/go.mod +++ b/go.mod @@ -309,3 +309,5 @@ exclude github.com/gofrs/uuid v4.0.0+incompatible exclude github.com/goccy/go-json v0.4.11 exclude github.com/satori/go.uuid v1.2.0 + +replace code.gitea.io/actions-proto-go => ../actions-proto-go diff --git a/models/actions/run.go b/models/actions/run.go index b45562870396d..7aa83ab5058bf 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -58,20 +58,20 @@ func init() { var ActionsTaskVersionCache taskVersionCache type taskVersionCache struct { - index int64 - lock sync.RWMutex + version int64 + lock sync.RWMutex } func (c *taskVersionCache) Get() int64 { c.lock.RLock() defer c.lock.RUnlock() - return c.index + return c.version } func (c *taskVersionCache) Increase(num int64) { c.lock.Lock() defer c.lock.Unlock() - c.index += num + c.version += num } func (c *taskVersionCache) Init() { @@ -79,7 +79,7 @@ func (c *taskVersionCache) Init() { if count, err := db.GetEngine(db.DefaultContext).Where("task_id=? AND status=?", 0, StatusWaiting).Count(new(ActionRunJob)); err != nil { log.Fatal("Init task index cache error: %v", err) } else { - c.index = count + c.version = count } } diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index 819eb48939e8f..d6b42822cc75f 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -104,7 +104,7 @@ func (s *Service) FetchTask( taskVersion := req.Msg.TaskVersion cacheVersion := actions_model.ActionsTaskVersionCache.Get() if req.Msg.TaskVersion != cacheVersion { - // if the task index in request is not equal to the index in cache, + // if the task version in request is not equal to the version in cache, // it means there are some tasks still not be assgined. // try to pick a task for the runner that send the request. if t, ok, err := pickTask(ctx, runner); err != nil { From b718d8b022813d76bebd8298f35afd96a947e2d0 Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Wed, 7 Jun 2023 12:14:07 +0800 Subject: [PATCH 06/24] go mod --- go.mod | 2 -- 1 file changed, 2 deletions(-) diff --git a/go.mod b/go.mod index 81e09b61d8665..b77e350f0ba1a 100644 --- a/go.mod +++ b/go.mod @@ -309,5 +309,3 @@ exclude github.com/gofrs/uuid v4.0.0+incompatible exclude github.com/goccy/go-json v0.4.11 exclude github.com/satori/go.uuid v1.2.0 - -replace code.gitea.io/actions-proto-go => ../actions-proto-go From d59c383568da503e3b4f41e8f2de0d9a71240017 Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Wed, 28 Jun 2023 15:50:46 +0800 Subject: [PATCH 07/24] go mod --- go.mod | 2 +- go.sum | 4 ++-- models/actions/run.go | 12 ++++++------ routers/api/actions/runner/runner.go | 12 ++++++------ routers/init.go | 2 +- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index 885bb34220469..859a95437e8ee 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module code.gitea.io/gitea go 1.20 require ( - code.gitea.io/actions-proto-go v0.3.0 + code.gitea.io/actions-proto-go v0.3.1 code.gitea.io/gitea-vet v0.2.2 code.gitea.io/sdk/gitea v0.15.1 codeberg.org/gusted/mcaptcha v0.0.0-20220723083913-4f3072e1d570 diff --git a/go.sum b/go.sum index 9b4538bc65bc5..09bf8a65fa579 100644 --- a/go.sum +++ b/go.sum @@ -40,8 +40,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= -code.gitea.io/actions-proto-go v0.3.0 h1:9Tvg8+TaaCXPKi6EnWl9vVgs2VZsj1Cs5afnsHa4AmM= -code.gitea.io/actions-proto-go v0.3.0/go.mod h1:00ys5QDo1iHN1tHNvvddAcy2W/g+425hQya1cCSvq9A= +code.gitea.io/actions-proto-go v0.3.1 h1:PMyiQtBKb8dNnpEO2R5rcZdXSis+UQZVo/SciMtR1aU= +code.gitea.io/actions-proto-go v0.3.1/go.mod h1:00ys5QDo1iHN1tHNvvddAcy2W/g+425hQya1cCSvq9A= code.gitea.io/gitea-vet v0.2.1/go.mod h1:zcNbT/aJEmivCAhfmkHOlT645KNOf9W2KnkLgFjGGfE= code.gitea.io/gitea-vet v0.2.2 h1:TEOV/Glf38iGmKzKP0EB++Z5OSL4zGg3RrAvlwaMuvk= code.gitea.io/gitea-vet v0.2.2/go.mod h1:zcNbT/aJEmivCAhfmkHOlT645KNOf9W2KnkLgFjGGfE= diff --git a/models/actions/run.go b/models/actions/run.go index d20378c8085ad..d5bae965e7d74 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -56,26 +56,26 @@ func init() { db.RegisterModel(new(ActionRunIndex)) } -var ActionsTaskVersionCache taskVersionCache +var ActionsTasksVersionCache tasksVersionCache -type taskVersionCache struct { +type tasksVersionCache struct { version int64 lock sync.RWMutex } -func (c *taskVersionCache) Get() int64 { +func (c *tasksVersionCache) Get() int64 { c.lock.RLock() defer c.lock.RUnlock() return c.version } -func (c *taskVersionCache) Increase(num int64) { +func (c *tasksVersionCache) Increase(num int64) { c.lock.Lock() defer c.lock.Unlock() c.version += num } -func (c *taskVersionCache) Init() { +func (c *tasksVersionCache) Init() { // Using the count of tasks that not be assgined to any runner as the initial task version when Gitea starts. if count, err := db.GetEngine(db.DefaultContext).Where("task_id=? AND status=?", 0, StatusWaiting).Count(new(ActionRunJob)); err != nil { log.Fatal("Init task index cache error: %v", err) @@ -260,7 +260,7 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork } // increase version in memory cache. - ActionsTaskVersionCache.Increase(int64(len(runJobs))) + ActionsTasksVersionCache.Increase(int64(len(runJobs))) return nil } diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index 8590f779835d6..2170a3ee88f56 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -132,9 +132,9 @@ func (s *Service) FetchTask( runner := GetRunner(ctx) var task *runnerv1.Task - taskVersion := req.Msg.TaskVersion - cacheVersion := actions_model.ActionsTaskVersionCache.Get() - if req.Msg.TaskVersion != cacheVersion { + tasksVersion := req.Msg.TasksVersion + cacheVersion := actions_model.ActionsTasksVersionCache.Get() + if req.Msg.TasksVersion != cacheVersion { // if the task version in request is not equal to the version in cache, // it means there are some tasks still not be assgined. // try to pick a task for the runner that send the request. @@ -143,12 +143,12 @@ func (s *Service) FetchTask( return nil, status.Errorf(codes.Internal, "pick task: %v", err) } else if ok { task = t - taskVersion = cacheVersion + tasksVersion = cacheVersion } } res := connect.NewResponse(&runnerv1.FetchTaskResponse{ - Task: task, - TaskVersion: taskVersion, + Task: task, + TasksVersion: tasksVersion, }) return res, nil } diff --git a/routers/init.go b/routers/init.go index d9b80692f7468..04d921fd69c8a 100644 --- a/routers/init.go +++ b/routers/init.go @@ -198,7 +198,7 @@ func NormalRoutes() *web.Route { r.Mount(prefix, actions_router.ArtifactsRoutes(prefix)) // init task index - actions_model.ActionsTaskVersionCache.Init() + actions_model.ActionsTasksVersionCache.Init() } return r From 72ea6ac51a02ad6d3967841f0fb0d07ccbe8ec6c Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Thu, 29 Jun 2023 17:57:54 +0800 Subject: [PATCH 08/24] use db to store tasks version --- models/actions/run.go | 39 +----------------- models/actions/tasks_version.go | 60 +++++++++++++++++++++++++++ models/migrations/migrations.go | 2 + models/migrations/v1_21/v264.go | 61 ++++++++++++++++++++++++++++ routers/api/actions/runner/runner.go | 27 +++++++++--- routers/init.go | 4 -- services/actions/notifier_helper.go | 17 +++++++- 7 files changed, 161 insertions(+), 49 deletions(-) create mode 100644 models/actions/tasks_version.go create mode 100644 models/migrations/v1_21/v264.go diff --git a/models/actions/run.go b/models/actions/run.go index d5bae965e7d74..7b62ff884f4ca 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -7,7 +7,6 @@ import ( "context" "fmt" "strings" - "sync" "time" "code.gitea.io/gitea/models/db" @@ -15,7 +14,6 @@ import ( user_model "code.gitea.io/gitea/models/user" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/json" - "code.gitea.io/gitea/modules/log" api "code.gitea.io/gitea/modules/structs" "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" @@ -56,34 +54,6 @@ func init() { db.RegisterModel(new(ActionRunIndex)) } -var ActionsTasksVersionCache tasksVersionCache - -type tasksVersionCache struct { - version int64 - lock sync.RWMutex -} - -func (c *tasksVersionCache) Get() int64 { - c.lock.RLock() - defer c.lock.RUnlock() - return c.version -} - -func (c *tasksVersionCache) Increase(num int64) { - c.lock.Lock() - defer c.lock.Unlock() - c.version += num -} - -func (c *tasksVersionCache) Init() { - // Using the count of tasks that not be assgined to any runner as the initial task version when Gitea starts. - if count, err := db.GetEngine(db.DefaultContext).Where("task_id=? AND status=?", 0, StatusWaiting).Count(new(ActionRunJob)); err != nil { - log.Fatal("Init task index cache error: %v", err) - } else { - c.version = count - } -} - func (run *ActionRun) HTMLURL() string { if run.Repo == nil { return "" @@ -255,14 +225,7 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork return err } - if err := commiter.Commit(); err != nil { - return err - } - - // increase version in memory cache. - ActionsTasksVersionCache.Increase(int64(len(runJobs))) - - return nil + return commiter.Commit() } func GetRunByID(ctx context.Context, id int64) (*ActionRun, error) { diff --git a/models/actions/tasks_version.go b/models/actions/tasks_version.go new file mode 100644 index 0000000000000..5123d1660e1d6 --- /dev/null +++ b/models/actions/tasks_version.go @@ -0,0 +1,60 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "fmt" + + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/modules/util" +) + +type ActionTasksVersion struct { + ID int64 `xorm:"pk autoincr"` + OwnerID int64 `xorm:"UNIQUE(owner_repo)"` + RepoID int64 `xorm:"INDEX UNIQUE(owner_repo)"` + Version int64 + CreatedUnix timeutil.TimeStamp `xorm:"created NOT NULL"` + UpdatedUnix timeutil.TimeStamp `xorm:"updated"` +} + +func init() { + db.RegisterModel(new(ActionTasksVersion)) +} + +func GetTasksVersionByScope(ctx context.Context, ownerID, repoID int64) (*ActionTasksVersion, error) { + var tasksVersion ActionTasksVersion + has, err := db.GetEngine(ctx).Where("owner_id = ? AND repo_id = ?", ownerID, repoID).Get(&tasksVersion) + if err != nil { + return nil, err + } else if !has { + return nil, fmt.Errorf("tasks version with owner id %d repo id %d: %w", ownerID, repoID, util.ErrNotExist) + } + return &tasksVersion, err +} + +func InsertTasksVersion(ctx context.Context, ownerID, repoID int64) (*ActionTasksVersion, error) { + tasksVersion := &ActionTasksVersion{ + OwnerID: ownerID, + RepoID: repoID, + // Set the default value of version to 1, so that the first fetch request after the runner starts will definitely query the database. + Version: 1, + } + return tasksVersion, db.Insert(ctx, tasksVersion) +} + +func IncreaseTasksVersionByScope(ctx context.Context, ownerID, repoID int64) (bool, error) { + result, err := db.GetEngine(ctx).Exec("UPDATE action_tasks_version SET version = version + 1 WHERE owner_id = ? AND repo_id = ?", ownerID, repoID) + if err != nil { + return false, err + } + affected, err := result.RowsAffected() + if err != nil { + return false, err + } + + return affected != 0, err +} diff --git a/models/migrations/migrations.go b/models/migrations/migrations.go index 3e9b348e63cf4..962d1fa137180 100644 --- a/models/migrations/migrations.go +++ b/models/migrations/migrations.go @@ -509,6 +509,8 @@ var migrations = []Migration{ NewMigration("Add TriggerEvent to action_run table", v1_21.AddTriggerEventToActionRun), // v263 -> v264 NewMigration("Add git_size and lfs_size columns to repository table", v1_21.AddGitSizeAndLFSSizeToRepositoryTable), + // v264 -> v265 + NewMigration("Add action_tasks_version table", v1_21.CreateActionTasksVersionTable), } // GetCurrentDBVersion returns the current db version diff --git a/models/migrations/v1_21/v264.go b/models/migrations/v1_21/v264.go new file mode 100644 index 0000000000000..ee5bc14ec1825 --- /dev/null +++ b/models/migrations/v1_21/v264.go @@ -0,0 +1,61 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package v1_21 //nolint + +import ( + "code.gitea.io/gitea/modules/timeutil" + + "xorm.io/xorm" +) + +func CreateActionTasksVersionTable(x *xorm.Engine) error { + sess := x.NewSession() + defer sess.Close() + + if err := sess.Begin(); err != nil { + return err + } + + type ActionTasksVersion struct { + ID int64 `xorm:"pk autoincr"` + OwnerID int64 `xorm:"UNIQUE(owner_repo)"` + RepoID int64 `xorm:"INDEX UNIQUE(owner_repo)"` + Version int64 + CreatedUnix timeutil.TimeStamp `xorm:"created NOT NULL"` + UpdatedUnix timeutil.TimeStamp `xorm:"updated"` + } + + // cerate action_tasks_version table. + if err := x.Sync(new(ActionTasksVersion)); err != nil { + return err + } + + // initialize data + type ScopeItem struct { + OwnerID int64 + RepoID int64 + } + scopes := []ScopeItem{} + if err := sess.Distinct("owner_id", "repo_id").Table("action_runner").Where("deleted is null").Find(&scopes); err != nil { + return err + } + + if len(scopes) > 0 { + versions := make([]ActionTasksVersion, 0, len(scopes)) + for _, scope := range scopes { + versions = append(versions, ActionTasksVersion{ + OwnerID: scope.OwnerID, + RepoID: scope.RepoID, + // Set the default value of version to 1, so that the first fetch request after the runner starts will definitely query the database. + Version: 1, + }) + } + + if _, err := sess.Insert(&versions); err != nil { + return err + } + } + + return sess.Commit() +} diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index 2170a3ee88f56..2f933b4d36646 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -87,6 +87,16 @@ func (s *Service) Register( return nil, errors.New("can't update runner token status") } + if _, err := actions_model.GetTasksVersionByScope(ctx, runner.OwnerID, runner.RepoID); err != nil { + if !errors.Is(err, util.ErrNotExist) { + return nil, errors.New("query tasks version failure") + } + // create a row of action_tasks_version if not exists yet. + if _, err := actions_model.InsertTasksVersion(ctx, runner.OwnerID, runner.RepoID); err != nil { + return nil, errors.New("can't insert tasks version") + } + } + res := connect.NewResponse(&runnerv1.RegisterResponse{ Runner: &runnerv1.Runner{ Id: runner.ID, @@ -132,19 +142,24 @@ func (s *Service) FetchTask( runner := GetRunner(ctx) var task *runnerv1.Task - tasksVersion := req.Msg.TasksVersion - cacheVersion := actions_model.ActionsTasksVersionCache.Get() - if req.Msg.TasksVersion != cacheVersion { - // if the task version in request is not equal to the version in cache, - // it means there are some tasks still not be assgined. + tasksVersion := req.Msg.TasksVersion // task version from runner + atv, err := actions_model.GetTasksVersionByScope(ctx, runner.OwnerID, runner.RepoID) + if err != nil { + log.Error("query tasks version failed: %v", err) + return nil, status.Errorf(codes.Internal, "query tasks version: %v", err) + } + latestVersion := atv.Version + if req.Msg.TasksVersion != latestVersion { + // if the task version in request is not equal to the version in db, + // it means there may still be some tasks not be assgined. // try to pick a task for the runner that send the request. if t, ok, err := pickTask(ctx, runner); err != nil { log.Error("pick task failed: %v", err) return nil, status.Errorf(codes.Internal, "pick task: %v", err) } else if ok { task = t - tasksVersion = cacheVersion } + tasksVersion = latestVersion } res := connect.NewResponse(&runnerv1.FetchTaskResponse{ Task: task, diff --git a/routers/init.go b/routers/init.go index 04d921fd69c8a..ddbabcc397447 100644 --- a/routers/init.go +++ b/routers/init.go @@ -9,7 +9,6 @@ import ( "runtime" "code.gitea.io/gitea/models" - actions_model "code.gitea.io/gitea/models/actions" asymkey_model "code.gitea.io/gitea/models/asymkey" "code.gitea.io/gitea/modules/cache" "code.gitea.io/gitea/modules/eventsource" @@ -196,9 +195,6 @@ func NormalRoutes() *web.Route { // TODO: this prefix should be generated with a token string with runner ? prefix = "/api/actions_pipeline" r.Mount(prefix, actions_router.ArtifactsRoutes(prefix)) - - // init task index - actions_model.ActionsTasksVersionCache.Init() } return r diff --git a/services/actions/notifier_helper.go b/services/actions/notifier_helper.go index 8e6cdcf680d1b..0634843ebe246 100644 --- a/services/actions/notifier_helper.go +++ b/services/actions/notifier_helper.go @@ -241,7 +241,22 @@ func notify(ctx context.Context, input *notifyInput) error { } else { CreateCommitStatus(ctx, jobs...) } - + } + // increase tasks version + // 1. increase global + if _, err := actions_model.IncreaseTasksVersionByScope(ctx, 0, 0); err != nil { + log.Error("IncreaseTasksVersionByScope(Global): %v", err) + return err + } + // 2. increase owner + if _, err := actions_model.IncreaseTasksVersionByScope(ctx, input.Repo.OwnerID, 0); err != nil { + log.Error("IncreaseTasksVersionByScope(Owner): %v", err) + return err + } + // 3. increase repo + if _, err := actions_model.IncreaseTasksVersionByScope(ctx, 0, input.Repo.ID); err != nil { + log.Error("IncreaseTasksVersionByScope(Repo): %v", err) + return err } return nil } From e905fede0692f508b638f92d10780f101df50713 Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Thu, 29 Jun 2023 18:35:41 +0800 Subject: [PATCH 09/24] support re-run job --- models/actions/tasks_version.go | 23 ++++++++++++++++++++++- routers/web/repo/actions/view.go | 10 ++++++++++ services/actions/notifier_helper.go | 17 +++-------------- 3 files changed, 35 insertions(+), 15 deletions(-) diff --git a/models/actions/tasks_version.go b/models/actions/tasks_version.go index 5123d1660e1d6..1d2d54e586550 100644 --- a/models/actions/tasks_version.go +++ b/models/actions/tasks_version.go @@ -8,6 +8,7 @@ import ( "fmt" "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" ) @@ -46,7 +47,7 @@ func InsertTasksVersion(ctx context.Context, ownerID, repoID int64) (*ActionTask return tasksVersion, db.Insert(ctx, tasksVersion) } -func IncreaseTasksVersionByScope(ctx context.Context, ownerID, repoID int64) (bool, error) { +func increaseTasksVersionByScope(ctx context.Context, ownerID, repoID int64) (bool, error) { result, err := db.GetEngine(ctx).Exec("UPDATE action_tasks_version SET version = version + 1 WHERE owner_id = ? AND repo_id = ?", ownerID, repoID) if err != nil { return false, err @@ -58,3 +59,23 @@ func IncreaseTasksVersionByScope(ctx context.Context, ownerID, repoID int64) (bo return affected != 0, err } + +func IncreaseTaskVersion(ctx context.Context, ownerID, repoID int64) error { + // increase tasks version + // 1. increase global + if _, err := increaseTasksVersionByScope(ctx, 0, 0); err != nil { + log.Error("IncreaseTasksVersionByScope(Global): %v", err) + return err + } + // 2. increase owner + if _, err := increaseTasksVersionByScope(ctx, ownerID, 0); err != nil { + log.Error("IncreaseTasksVersionByScope(Owner): %v", err) + return err + } + // 3. increase repo + if _, err := increaseTasksVersionByScope(ctx, 0, repoID); err != nil { + log.Error("IncreaseTasksVersionByScope(Repo): %v", err) + return err + } + return nil +} diff --git a/routers/web/repo/actions/view.go b/routers/web/repo/actions/view.go index 7c2e9d63d6d32..721614930a1ef 100644 --- a/routers/web/repo/actions/view.go +++ b/routers/web/repo/actions/view.go @@ -267,6 +267,11 @@ func RerunOne(ctx *context_module.Context) { return } + if err := actions_model.IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil { + ctx.Error(http.StatusInternalServerError, err.Error()) + return + } + ctx.JSON(http.StatusOK, struct{}{}) } @@ -285,6 +290,11 @@ func RerunAll(ctx *context_module.Context) { } } + if err := actions_model.IncreaseTaskVersion(ctx, jobs[0].OwnerID, jobs[0].RepoID); err != nil { + ctx.Error(http.StatusInternalServerError, err.Error()) + return + } + ctx.JSON(http.StatusOK, struct{}{}) } diff --git a/services/actions/notifier_helper.go b/services/actions/notifier_helper.go index 0634843ebe246..c05ee05da476d 100644 --- a/services/actions/notifier_helper.go +++ b/services/actions/notifier_helper.go @@ -242,22 +242,11 @@ func notify(ctx context.Context, input *notifyInput) error { CreateCommitStatus(ctx, jobs...) } } - // increase tasks version - // 1. increase global - if _, err := actions_model.IncreaseTasksVersionByScope(ctx, 0, 0); err != nil { - log.Error("IncreaseTasksVersionByScope(Global): %v", err) - return err - } - // 2. increase owner - if _, err := actions_model.IncreaseTasksVersionByScope(ctx, input.Repo.OwnerID, 0); err != nil { - log.Error("IncreaseTasksVersionByScope(Owner): %v", err) - return err - } - // 3. increase repo - if _, err := actions_model.IncreaseTasksVersionByScope(ctx, 0, input.Repo.ID); err != nil { - log.Error("IncreaseTasksVersionByScope(Repo): %v", err) + + if err := actions_model.IncreaseTaskVersion(ctx, input.Repo.OwnerID, input.Repo.ID); err != nil { return err } + return nil } From c430b286dc2d5f0937719e747fdf8450c5383bd6 Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Fri, 30 Jun 2023 11:17:07 +0800 Subject: [PATCH 10/24] log error --- services/actions/notifier_helper.go | 1 + 1 file changed, 1 insertion(+) diff --git a/services/actions/notifier_helper.go b/services/actions/notifier_helper.go index c05ee05da476d..6fae6dad18f1c 100644 --- a/services/actions/notifier_helper.go +++ b/services/actions/notifier_helper.go @@ -244,6 +244,7 @@ func notify(ctx context.Context, input *notifyInput) error { } if err := actions_model.IncreaseTaskVersion(ctx, input.Repo.OwnerID, input.Repo.ID); err != nil { + log.Error("IncreaseTaskVersion: %v", err) return err } From 96915c66711640c3be210690dc1180bf515bf9c4 Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Fri, 30 Jun 2023 11:22:31 +0800 Subject: [PATCH 11/24] typo --- models/migrations/migrations.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/models/migrations/migrations.go b/models/migrations/migrations.go index 74dc30ac586d5..1654ad3cc32f9 100644 --- a/models/migrations/migrations.go +++ b/models/migrations/migrations.go @@ -511,7 +511,7 @@ var migrations = []Migration{ NewMigration("Add git_size and lfs_size columns to repository table", v1_21.AddGitSizeAndLFSSizeToRepositoryTable), // v264 -> v265 NewMigration("Add branch table", v1_21.AddBranchTable), - // v26 5-> v266 + // v265-> v266 NewMigration("Add action_tasks_version table", v1_21.CreateActionTasksVersionTable), } From 1861d2562a21415e5279f3b5b85bd79b9af9ab79 Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Fri, 30 Jun 2023 14:31:56 +0800 Subject: [PATCH 12/24] fix migrations --- models/actions/tasks_version.go | 2 +- models/migrations/migrations.go | 2 +- models/migrations/v1_21/v265.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/models/actions/tasks_version.go b/models/actions/tasks_version.go index 1d2d54e586550..54b88e1694aba 100644 --- a/models/actions/tasks_version.go +++ b/models/actions/tasks_version.go @@ -18,7 +18,7 @@ type ActionTasksVersion struct { OwnerID int64 `xorm:"UNIQUE(owner_repo)"` RepoID int64 `xorm:"INDEX UNIQUE(owner_repo)"` Version int64 - CreatedUnix timeutil.TimeStamp `xorm:"created NOT NULL"` + CreatedUnix timeutil.TimeStamp `xorm:"created"` UpdatedUnix timeutil.TimeStamp `xorm:"updated"` } diff --git a/models/migrations/migrations.go b/models/migrations/migrations.go index 1654ad3cc32f9..9bda1e529116e 100644 --- a/models/migrations/migrations.go +++ b/models/migrations/migrations.go @@ -511,7 +511,7 @@ var migrations = []Migration{ NewMigration("Add git_size and lfs_size columns to repository table", v1_21.AddGitSizeAndLFSSizeToRepositoryTable), // v264 -> v265 NewMigration("Add branch table", v1_21.AddBranchTable), - // v265-> v266 + // v265 -> v266 NewMigration("Add action_tasks_version table", v1_21.CreateActionTasksVersionTable), } diff --git a/models/migrations/v1_21/v265.go b/models/migrations/v1_21/v265.go index ee5bc14ec1825..106ba2db4ff65 100644 --- a/models/migrations/v1_21/v265.go +++ b/models/migrations/v1_21/v265.go @@ -22,12 +22,12 @@ func CreateActionTasksVersionTable(x *xorm.Engine) error { OwnerID int64 `xorm:"UNIQUE(owner_repo)"` RepoID int64 `xorm:"INDEX UNIQUE(owner_repo)"` Version int64 - CreatedUnix timeutil.TimeStamp `xorm:"created NOT NULL"` + CreatedUnix timeutil.TimeStamp `xorm:"created"` UpdatedUnix timeutil.TimeStamp `xorm:"updated"` } // cerate action_tasks_version table. - if err := x.Sync(new(ActionTasksVersion)); err != nil { + if err := sess.Sync(new(ActionTasksVersion)); err != nil { return err } From 0771112e14897e5943306b18180026cf630581d3 Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Thu, 6 Jul 2023 17:08:28 +0800 Subject: [PATCH 13/24] fix review --- models/actions/run.go | 5 +++ models/actions/run_job.go | 6 ++++ models/actions/tasks_version.go | 53 ++++++++++++++++++++-------- models/migrations/v1_21/v265.go | 40 +-------------------- routers/api/actions/runner/runner.go | 29 +++++++-------- routers/web/repo/actions/view.go | 10 ------ services/actions/notifier_helper.go | 4 --- 7 files changed, 62 insertions(+), 85 deletions(-) diff --git a/models/actions/run.go b/models/actions/run.go index 7b62ff884f4ca..64cc15c3fdcc6 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -225,6 +225,11 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork return err } + // if new action jobs are triggered, increase tasks version. + if err := increaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil { + return err + } + return commiter.Commit() } diff --git a/models/actions/run_job.go b/models/actions/run_job.go index 0002e507704d5..155b05831ec7d 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -131,6 +131,12 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col } if runStatus.IsDone() { run.Stopped = timeutil.TimeStampNow() + } else if !runStatus.IsRunning() { + fmt.Println("run status: ", runStatus.String()) + // other status changed, increase tasks version + if err := increaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil { + return affected, err + } } return affected, UpdateRun(ctx, run) } diff --git a/models/actions/tasks_version.go b/models/actions/tasks_version.go index 54b88e1694aba..85da374f20868 100644 --- a/models/actions/tasks_version.go +++ b/models/actions/tasks_version.go @@ -13,6 +13,10 @@ import ( "code.gitea.io/gitea/modules/util" ) +// ActionTasksVersion +// If both ownerID and repoID is zero, its scope is global. +// If ownerID is not zero and repoID is zero, its scope is org (there is no user-level runner currrently). +// If ownerID is zero and repoID is not zero, its scope is repo. type ActionTasksVersion struct { ID int64 `xorm:"pk autoincr"` OwnerID int64 `xorm:"UNIQUE(owner_repo)"` @@ -26,15 +30,15 @@ func init() { db.RegisterModel(new(ActionTasksVersion)) } -func GetTasksVersionByScope(ctx context.Context, ownerID, repoID int64) (*ActionTasksVersion, error) { +func GetTasksVersionByScope(ctx context.Context, ownerID, repoID int64) (int64, error) { var tasksVersion ActionTasksVersion has, err := db.GetEngine(ctx).Where("owner_id = ? AND repo_id = ?", ownerID, repoID).Get(&tasksVersion) if err != nil { - return nil, err + return 0, err } else if !has { - return nil, fmt.Errorf("tasks version with owner id %d repo id %d: %w", ownerID, repoID, util.ErrNotExist) + return 0, fmt.Errorf("tasks version with owner id %d repo id %d: %w", ownerID, repoID, util.ErrNotExist) } - return &tasksVersion, err + return tasksVersion.Version, err } func InsertTasksVersion(ctx context.Context, ownerID, repoID int64) (*ActionTasksVersion, error) { @@ -44,38 +48,57 @@ func InsertTasksVersion(ctx context.Context, ownerID, repoID int64) (*ActionTask // Set the default value of version to 1, so that the first fetch request after the runner starts will definitely query the database. Version: 1, } - return tasksVersion, db.Insert(ctx, tasksVersion) + if _, err := db.GetEngine(ctx).Insert(tasksVersion); err != nil { + return nil, err + } + return tasksVersion, nil } -func increaseTasksVersionByScope(ctx context.Context, ownerID, repoID int64) (bool, error) { +func increaseTasksVersionByScope(ctx context.Context, ownerID, repoID int64) error { result, err := db.GetEngine(ctx).Exec("UPDATE action_tasks_version SET version = version + 1 WHERE owner_id = ? AND repo_id = ?", ownerID, repoID) if err != nil { - return false, err + return err } affected, err := result.RowsAffected() if err != nil { - return false, err + return err + } + + if affected == 0 { + // if update sql does not affect any rows, the database may be broken, + // so re-insert the row of version data here. + if _, err := InsertTasksVersion(ctx, ownerID, repoID); err != nil { + return err + } } - return affected != 0, err + return nil } -func IncreaseTaskVersion(ctx context.Context, ownerID, repoID int64) error { - // increase tasks version +func increaseTaskVersion(ctx context.Context, ownerID, repoID int64) error { + dbCtx, commiter, err := db.TxContext(ctx) + if err != nil { + return err + } + defer commiter.Close() + + ctx = dbCtx.WithContext(ctx) + // 1. increase global - if _, err := increaseTasksVersionByScope(ctx, 0, 0); err != nil { + if err := increaseTasksVersionByScope(ctx, 0, 0); err != nil { log.Error("IncreaseTasksVersionByScope(Global): %v", err) return err } // 2. increase owner - if _, err := increaseTasksVersionByScope(ctx, ownerID, 0); err != nil { + if err := increaseTasksVersionByScope(ctx, ownerID, 0); err != nil { log.Error("IncreaseTasksVersionByScope(Owner): %v", err) return err } // 3. increase repo - if _, err := increaseTasksVersionByScope(ctx, 0, repoID); err != nil { + if err := increaseTasksVersionByScope(ctx, 0, repoID); err != nil { log.Error("IncreaseTasksVersionByScope(Repo): %v", err) return err } - return nil + + return commiter.Commit() } diff --git a/models/migrations/v1_21/v265.go b/models/migrations/v1_21/v265.go index 106ba2db4ff65..bc0e954bdcc93 100644 --- a/models/migrations/v1_21/v265.go +++ b/models/migrations/v1_21/v265.go @@ -10,13 +10,6 @@ import ( ) func CreateActionTasksVersionTable(x *xorm.Engine) error { - sess := x.NewSession() - defer sess.Close() - - if err := sess.Begin(); err != nil { - return err - } - type ActionTasksVersion struct { ID int64 `xorm:"pk autoincr"` OwnerID int64 `xorm:"UNIQUE(owner_repo)"` @@ -26,36 +19,5 @@ func CreateActionTasksVersionTable(x *xorm.Engine) error { UpdatedUnix timeutil.TimeStamp `xorm:"updated"` } - // cerate action_tasks_version table. - if err := sess.Sync(new(ActionTasksVersion)); err != nil { - return err - } - - // initialize data - type ScopeItem struct { - OwnerID int64 - RepoID int64 - } - scopes := []ScopeItem{} - if err := sess.Distinct("owner_id", "repo_id").Table("action_runner").Where("deleted is null").Find(&scopes); err != nil { - return err - } - - if len(scopes) > 0 { - versions := make([]ActionTasksVersion, 0, len(scopes)) - for _, scope := range scopes { - versions = append(versions, ActionTasksVersion{ - OwnerID: scope.OwnerID, - RepoID: scope.RepoID, - // Set the default value of version to 1, so that the first fetch request after the runner starts will definitely query the database. - Version: 1, - }) - } - - if _, err := sess.Insert(&versions); err != nil { - return err - } - } - - return sess.Commit() + return x.Sync(new(ActionTasksVersion)) } diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index 2f933b4d36646..d1d54348e4bdc 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -87,16 +87,6 @@ func (s *Service) Register( return nil, errors.New("can't update runner token status") } - if _, err := actions_model.GetTasksVersionByScope(ctx, runner.OwnerID, runner.RepoID); err != nil { - if !errors.Is(err, util.ErrNotExist) { - return nil, errors.New("query tasks version failure") - } - // create a row of action_tasks_version if not exists yet. - if _, err := actions_model.InsertTasksVersion(ctx, runner.OwnerID, runner.RepoID); err != nil { - return nil, errors.New("can't insert tasks version") - } - } - res := connect.NewResponse(&runnerv1.RegisterResponse{ Runner: &runnerv1.Runner{ Id: runner.ID, @@ -143,13 +133,19 @@ func (s *Service) FetchTask( var task *runnerv1.Task tasksVersion := req.Msg.TasksVersion // task version from runner - atv, err := actions_model.GetTasksVersionByScope(ctx, runner.OwnerID, runner.RepoID) + latestVersion, err := actions_model.GetTasksVersionByScope(ctx, runner.OwnerID, runner.RepoID) if err != nil { - log.Error("query tasks version failed: %v", err) - return nil, status.Errorf(codes.Internal, "query tasks version: %v", err) + if !errors.Is(err, util.ErrNotExist) { + log.Error("query tasks version failed: %v", err) + return nil, status.Errorf(codes.Internal, "query tasks version failed: %v", err) + } + // create a row of action_tasks_version if not exists yet. + if _, err := actions_model.InsertTasksVersion(ctx, runner.OwnerID, runner.RepoID); err != nil { + return nil, status.Errorf(codes.Internal, "insert tasks version failed: %v", err) + } } - latestVersion := atv.Version - if req.Msg.TasksVersion != latestVersion { + + if tasksVersion != latestVersion { // if the task version in request is not equal to the version in db, // it means there may still be some tasks not be assgined. // try to pick a task for the runner that send the request. @@ -159,11 +155,10 @@ func (s *Service) FetchTask( } else if ok { task = t } - tasksVersion = latestVersion } res := connect.NewResponse(&runnerv1.FetchTaskResponse{ Task: task, - TasksVersion: tasksVersion, + TasksVersion: latestVersion, }) return res, nil } diff --git a/routers/web/repo/actions/view.go b/routers/web/repo/actions/view.go index c32015213ccc1..537bc618075c4 100644 --- a/routers/web/repo/actions/view.go +++ b/routers/web/repo/actions/view.go @@ -268,11 +268,6 @@ func RerunOne(ctx *context_module.Context) { return } - if err := actions_model.IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil { - ctx.Error(http.StatusInternalServerError, err.Error()) - return - } - ctx.JSON(http.StatusOK, struct{}{}) } @@ -291,11 +286,6 @@ func RerunAll(ctx *context_module.Context) { } } - if err := actions_model.IncreaseTaskVersion(ctx, jobs[0].OwnerID, jobs[0].RepoID); err != nil { - ctx.Error(http.StatusInternalServerError, err.Error()) - return - } - ctx.JSON(http.StatusOK, struct{}{}) } diff --git a/services/actions/notifier_helper.go b/services/actions/notifier_helper.go index 6fae6dad18f1c..c071475eeeb84 100644 --- a/services/actions/notifier_helper.go +++ b/services/actions/notifier_helper.go @@ -241,11 +241,7 @@ func notify(ctx context.Context, input *notifyInput) error { } else { CreateCommitStatus(ctx, jobs...) } - } - if err := actions_model.IncreaseTaskVersion(ctx, input.Repo.OwnerID, input.Repo.ID); err != nil { - log.Error("IncreaseTaskVersion: %v", err) - return err } return nil From b61682be610b0b7b9d280ae7e0e19a123e2941d4 Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Thu, 6 Jul 2023 17:13:39 +0800 Subject: [PATCH 14/24] delete fmt --- models/actions/run_job.go | 1 - 1 file changed, 1 deletion(-) diff --git a/models/actions/run_job.go b/models/actions/run_job.go index 155b05831ec7d..27f8d2298037c 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -132,7 +132,6 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col if runStatus.IsDone() { run.Stopped = timeutil.TimeStampNow() } else if !runStatus.IsRunning() { - fmt.Println("run status: ", runStatus.String()) // other status changed, increase tasks version if err := increaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil { return affected, err From 67a42164119c245fa924a5e2ebf1b93254b33439 Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Thu, 6 Jul 2023 17:40:08 +0800 Subject: [PATCH 15/24] fix if cond --- models/actions/run_job.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/models/actions/run_job.go b/models/actions/run_job.go index 27f8d2298037c..3aec626290b10 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -131,8 +131,8 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col } if runStatus.IsDone() { run.Stopped = timeutil.TimeStampNow() - } else if !runStatus.IsRunning() { - // other status changed, increase tasks version + } else if runStatus.IsWaiting() { + // if the status of job changes to waiting again, increase tasks version. if err := increaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil { return affected, err } From 0d5b9170a6cb1c1fb58dee63dba886982e2f9eb4 Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Fri, 7 Jul 2023 10:01:12 +0800 Subject: [PATCH 16/24] delete empty line --- services/actions/notifier_helper.go | 1 - 1 file changed, 1 deletion(-) diff --git a/services/actions/notifier_helper.go b/services/actions/notifier_helper.go index c071475eeeb84..8e6cdcf680d1b 100644 --- a/services/actions/notifier_helper.go +++ b/services/actions/notifier_helper.go @@ -243,7 +243,6 @@ func notify(ctx context.Context, input *notifyInput) error { } } - return nil } From 5285ed4e4d1706bff4d3cc7b6fc207431f8cc33d Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Fri, 7 Jul 2023 10:16:00 +0800 Subject: [PATCH 17/24] update --- models/actions/run.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/models/actions/run.go b/models/actions/run.go index 64cc15c3fdcc6..85d0625754672 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -195,6 +195,7 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork } runJobs := make([]*ActionRunJob, 0, len(jobs)) + var hasWaiting bool for _, v := range jobs { id, job := v.Job() needs := job.Needs() @@ -205,6 +206,8 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork status := StatusWaiting if len(needs) > 0 || run.NeedApproval { status = StatusBlocked + } else { + hasWaiting = true } job.Name, _ = util.SplitStringAtByteN(job.Name, 255) runJobs = append(runJobs, &ActionRunJob{ @@ -225,9 +228,11 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork return err } - // if new action jobs are triggered, increase tasks version. - if err := increaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil { - return err + // if there is a job in the waiting status, increase tasks version. + if hasWaiting { + if err := increaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil { + return err + } } return commiter.Commit() From 3e12fd39e7d874357c5f7b10afd72486911d9b1e Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Fri, 7 Jul 2023 16:01:50 +0800 Subject: [PATCH 18/24] adjust some logic --- models/actions/run.go | 2 +- models/actions/run_job.go | 7 +++-- models/actions/tasks_version.go | 39 ++++++++++++++++------------ routers/api/actions/runner/runner.go | 11 +++----- 4 files changed, 33 insertions(+), 26 deletions(-) diff --git a/models/actions/run.go b/models/actions/run.go index 85d0625754672..5396c612f6e38 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -230,7 +230,7 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork // if there is a job in the waiting status, increase tasks version. if hasWaiting { - if err := increaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil { + if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil { return err } } diff --git a/models/actions/run_job.go b/models/actions/run_job.go index 3aec626290b10..9d762aa68bc11 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -131,12 +131,15 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col } if runStatus.IsDone() { run.Stopped = timeutil.TimeStampNow() - } else if runStatus.IsWaiting() { + } + + if affected != 0 && util.SliceContains(cols, "status") && job.Status.IsWaiting() { // if the status of job changes to waiting again, increase tasks version. - if err := increaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil { + if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil { return affected, err } } + return affected, UpdateRun(ctx, run) } diff --git a/models/actions/tasks_version.go b/models/actions/tasks_version.go index 85da374f20868..62f31d6340887 100644 --- a/models/actions/tasks_version.go +++ b/models/actions/tasks_version.go @@ -5,12 +5,10 @@ package actions import ( "context" - "fmt" "code.gitea.io/gitea/models/db" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/timeutil" - "code.gitea.io/gitea/modules/util" ) // ActionTasksVersion @@ -36,16 +34,15 @@ func GetTasksVersionByScope(ctx context.Context, ownerID, repoID int64) (int64, if err != nil { return 0, err } else if !has { - return 0, fmt.Errorf("tasks version with owner id %d repo id %d: %w", ownerID, repoID, util.ErrNotExist) + return 0, nil } return tasksVersion.Version, err } -func InsertTasksVersion(ctx context.Context, ownerID, repoID int64) (*ActionTasksVersion, error) { +func insertTasksVersion(ctx context.Context, ownerID, repoID int64) (*ActionTasksVersion, error) { tasksVersion := &ActionTasksVersion{ OwnerID: ownerID, RepoID: repoID, - // Set the default value of version to 1, so that the first fetch request after the runner starts will definitely query the database. Version: 1, } if _, err := db.GetEngine(ctx).Insert(tasksVersion); err != nil { @@ -67,7 +64,7 @@ func increaseTasksVersionByScope(ctx context.Context, ownerID, repoID int64) err if affected == 0 { // if update sql does not affect any rows, the database may be broken, // so re-insert the row of version data here. - if _, err := InsertTasksVersion(ctx, ownerID, repoID); err != nil { + if _, err := insertTasksVersion(ctx, ownerID, repoID); err != nil { return err } } @@ -75,7 +72,7 @@ func increaseTasksVersionByScope(ctx context.Context, ownerID, repoID int64) err return nil } -func increaseTaskVersion(ctx context.Context, ownerID, repoID int64) error { +func IncreaseTaskVersion(ctx context.Context, ownerID, repoID int64) error { dbCtx, commiter, err := db.TxContext(ctx) if err != nil { return err @@ -84,20 +81,30 @@ func increaseTaskVersion(ctx context.Context, ownerID, repoID int64) error { ctx = dbCtx.WithContext(ctx) + allScopes := ownerID > 0 && repoID > 0 + // 1. increase global - if err := increaseTasksVersionByScope(ctx, 0, 0); err != nil { - log.Error("IncreaseTasksVersionByScope(Global): %v", err) - return err + if allScopes || (ownerID == 0 && repoID == 0) { + if err := increaseTasksVersionByScope(ctx, 0, 0); err != nil { + log.Error("IncreaseTasksVersionByScope(Global): %v", err) + return err + } } + // 2. increase owner - if err := increaseTasksVersionByScope(ctx, ownerID, 0); err != nil { - log.Error("IncreaseTasksVersionByScope(Owner): %v", err) - return err + if allScopes || ownerID > 0 { + if err := increaseTasksVersionByScope(ctx, ownerID, 0); err != nil { + log.Error("IncreaseTasksVersionByScope(Owner): %v", err) + return err + } } + // 3. increase repo - if err := increaseTasksVersionByScope(ctx, 0, repoID); err != nil { - log.Error("IncreaseTasksVersionByScope(Repo): %v", err) - return err + if allScopes || repoID > 0 { + if err := increaseTasksVersionByScope(ctx, 0, repoID); err != nil { + log.Error("IncreaseTasksVersionByScope(Repo): %v", err) + return err + } } return commiter.Commit() diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index d1d54348e4bdc..d5b0398ff66e2 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -135,13 +135,10 @@ func (s *Service) FetchTask( tasksVersion := req.Msg.TasksVersion // task version from runner latestVersion, err := actions_model.GetTasksVersionByScope(ctx, runner.OwnerID, runner.RepoID) if err != nil { - if !errors.Is(err, util.ErrNotExist) { - log.Error("query tasks version failed: %v", err) - return nil, status.Errorf(codes.Internal, "query tasks version failed: %v", err) - } - // create a row of action_tasks_version if not exists yet. - if _, err := actions_model.InsertTasksVersion(ctx, runner.OwnerID, runner.RepoID); err != nil { - return nil, status.Errorf(codes.Internal, "insert tasks version failed: %v", err) + return nil, status.Errorf(codes.Internal, "query tasks version failed: %v", err) + } else if latestVersion == 0 { + if err := actions_model.IncreaseTaskVersion(ctx, runner.OwnerID, runner.RepoID); err != nil { + return nil, status.Errorf(codes.Internal, "fail to increase task version: %v", err) } } From 7d66700ae355e393a9e2f3df0f7185ae71f60a20 Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Fri, 7 Jul 2023 16:59:51 +0800 Subject: [PATCH 19/24] fix --- models/actions/run_job.go | 14 +++++++------- models/actions/tasks_version.go | 14 +++++--------- routers/api/actions/runner/runner.go | 1 + 3 files changed, 13 insertions(+), 16 deletions(-) diff --git a/models/actions/run_job.go b/models/actions/run_job.go index 9d762aa68bc11..a0471e83c24ec 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -111,6 +111,13 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col return affected, nil } + if affected != 0 && util.SliceContains(cols, "status") && job.Status.IsWaiting() { + // if the status of job changes to waiting again, increase tasks version. + if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil { + return affected, err + } + } + if job.RunID == 0 { var err error if job, err = GetRunJobByID(ctx, job.ID); err != nil { @@ -133,13 +140,6 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col run.Stopped = timeutil.TimeStampNow() } - if affected != 0 && util.SliceContains(cols, "status") && job.Status.IsWaiting() { - // if the status of job changes to waiting again, increase tasks version. - if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil { - return affected, err - } - } - return affected, UpdateRun(ctx, run) } diff --git a/models/actions/tasks_version.go b/models/actions/tasks_version.go index 62f31d6340887..562950ac83d00 100644 --- a/models/actions/tasks_version.go +++ b/models/actions/tasks_version.go @@ -81,18 +81,14 @@ func IncreaseTaskVersion(ctx context.Context, ownerID, repoID int64) error { ctx = dbCtx.WithContext(ctx) - allScopes := ownerID > 0 && repoID > 0 - // 1. increase global - if allScopes || (ownerID == 0 && repoID == 0) { - if err := increaseTasksVersionByScope(ctx, 0, 0); err != nil { - log.Error("IncreaseTasksVersionByScope(Global): %v", err) - return err - } + if err := increaseTasksVersionByScope(ctx, 0, 0); err != nil { + log.Error("IncreaseTasksVersionByScope(Global): %v", err) + return err } // 2. increase owner - if allScopes || ownerID > 0 { + if ownerID > 0 { if err := increaseTasksVersionByScope(ctx, ownerID, 0); err != nil { log.Error("IncreaseTasksVersionByScope(Owner): %v", err) return err @@ -100,7 +96,7 @@ func IncreaseTaskVersion(ctx context.Context, ownerID, repoID int64) error { } // 3. increase repo - if allScopes || repoID > 0 { + if repoID > 0 { if err := increaseTasksVersionByScope(ctx, 0, repoID); err != nil { log.Error("IncreaseTasksVersionByScope(Repo): %v", err) return err diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index d5b0398ff66e2..d5a92fcb78f09 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -140,6 +140,7 @@ func (s *Service) FetchTask( if err := actions_model.IncreaseTaskVersion(ctx, runner.OwnerID, runner.RepoID); err != nil { return nil, status.Errorf(codes.Internal, "fail to increase task version: %v", err) } + latestVersion++ } if tasksVersion != latestVersion { From 9b025bc11eb99850cf366c209cfd8715b37dc59c Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Fri, 7 Jul 2023 18:35:11 +0800 Subject: [PATCH 20/24] comment --- models/actions/run_job.go | 1 - routers/api/actions/runner/runner.go | 3 +++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/models/actions/run_job.go b/models/actions/run_job.go index a0471e83c24ec..c7620cd8bca2f 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -139,7 +139,6 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col if runStatus.IsDone() { run.Stopped = timeutil.TimeStampNow() } - return affected, UpdateRun(ctx, run) } diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index d5a92fcb78f09..8bba940ef6d50 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -140,6 +140,9 @@ func (s *Service) FetchTask( if err := actions_model.IncreaseTaskVersion(ctx, runner.OwnerID, runner.RepoID); err != nil { return nil, status.Errorf(codes.Internal, "fail to increase task version: %v", err) } + // if we don't increase the value of `latestVersion` here, + // the reponse of FetchTask will return tasksVersion as zero. + // and the runner will treat it as an old version of Gitea. latestVersion++ } From 8c7051a3847369d9187b334cb3baed94f1334dff Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Mon, 10 Jul 2023 11:56:58 +0800 Subject: [PATCH 21/24] typo --- routers/api/actions/runner/runner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index 8bba940ef6d50..6de5964cb77bc 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -141,7 +141,7 @@ func (s *Service) FetchTask( return nil, status.Errorf(codes.Internal, "fail to increase task version: %v", err) } // if we don't increase the value of `latestVersion` here, - // the reponse of FetchTask will return tasksVersion as zero. + // the response of FetchTask will return tasksVersion as zero. // and the runner will treat it as an old version of Gitea. latestVersion++ } From be3576b26e3eb439daac37aac0f0c8d5abba66bc Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Fri, 21 Jul 2023 15:15:25 +0800 Subject: [PATCH 22/24] context --- models/actions/tasks_version.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/models/actions/tasks_version.go b/models/actions/tasks_version.go index 562950ac83d00..e12d55fdfb4e7 100644 --- a/models/actions/tasks_version.go +++ b/models/actions/tasks_version.go @@ -79,17 +79,15 @@ func IncreaseTaskVersion(ctx context.Context, ownerID, repoID int64) error { } defer commiter.Close() - ctx = dbCtx.WithContext(ctx) - // 1. increase global - if err := increaseTasksVersionByScope(ctx, 0, 0); err != nil { + if err := increaseTasksVersionByScope(dbCtx, 0, 0); err != nil { log.Error("IncreaseTasksVersionByScope(Global): %v", err) return err } // 2. increase owner if ownerID > 0 { - if err := increaseTasksVersionByScope(ctx, ownerID, 0); err != nil { + if err := increaseTasksVersionByScope(dbCtx, ownerID, 0); err != nil { log.Error("IncreaseTasksVersionByScope(Owner): %v", err) return err } @@ -97,7 +95,7 @@ func IncreaseTaskVersion(ctx context.Context, ownerID, repoID int64) error { // 3. increase repo if repoID > 0 { - if err := increaseTasksVersionByScope(ctx, 0, repoID); err != nil { + if err := increaseTasksVersionByScope(dbCtx, 0, repoID); err != nil { log.Error("IncreaseTasksVersionByScope(Repo): %v", err) return err } From 4283ebedbec1d02f9ff8158e02bc7de27888e098 Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Fri, 21 Jul 2023 15:43:23 +0800 Subject: [PATCH 23/24] another context --- models/actions/task.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/models/actions/task.go b/models/actions/task.go index 719fd193657f3..8d526e9d2c481 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -220,9 +220,8 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask return nil, false, err } defer commiter.Close() - ctx = dbCtx.WithContext(ctx) - e := db.GetEngine(ctx) + e := db.GetEngine(dbCtx) jobCond := builder.NewCond() if runner.RepoID != 0 { From 98cec83c742edab8788c6d33954989fc9047160d Mon Sep 17 00:00:00 2001 From: sillyguodong Date: Fri, 21 Jul 2023 15:58:37 +0800 Subject: [PATCH 24/24] no introduce new ctx any more --- models/actions/task.go | 4 ++-- models/actions/tasks_version.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/models/actions/task.go b/models/actions/task.go index aae5c7086a843..9cc0fd0df83db 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -215,13 +215,13 @@ func GetRunningTaskByToken(ctx context.Context, token string) (*ActionTask, erro } func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask, bool, error) { - dbCtx, commiter, err := db.TxContext(ctx) + ctx, commiter, err := db.TxContext(ctx) if err != nil { return nil, false, err } defer commiter.Close() - e := db.GetEngine(dbCtx) + e := db.GetEngine(ctx) jobCond := builder.NewCond() if runner.RepoID != 0 { diff --git a/models/actions/tasks_version.go b/models/actions/tasks_version.go index e12d55fdfb4e7..5c0a86538d570 100644 --- a/models/actions/tasks_version.go +++ b/models/actions/tasks_version.go @@ -73,21 +73,21 @@ func increaseTasksVersionByScope(ctx context.Context, ownerID, repoID int64) err } func IncreaseTaskVersion(ctx context.Context, ownerID, repoID int64) error { - dbCtx, commiter, err := db.TxContext(ctx) + ctx, commiter, err := db.TxContext(ctx) if err != nil { return err } defer commiter.Close() // 1. increase global - if err := increaseTasksVersionByScope(dbCtx, 0, 0); err != nil { + if err := increaseTasksVersionByScope(ctx, 0, 0); err != nil { log.Error("IncreaseTasksVersionByScope(Global): %v", err) return err } // 2. increase owner if ownerID > 0 { - if err := increaseTasksVersionByScope(dbCtx, ownerID, 0); err != nil { + if err := increaseTasksVersionByScope(ctx, ownerID, 0); err != nil { log.Error("IncreaseTasksVersionByScope(Owner): %v", err) return err } @@ -95,7 +95,7 @@ func IncreaseTaskVersion(ctx context.Context, ownerID, repoID int64) error { // 3. increase repo if repoID > 0 { - if err := increaseTasksVersionByScope(dbCtx, 0, repoID); err != nil { + if err := increaseTasksVersionByScope(ctx, 0, repoID); err != nil { log.Error("IncreaseTasksVersionByScope(Repo): %v", err) return err }