diff --git a/.deadcode-out b/.deadcode-out index 24facdf12e..fd2bd56e44 100644 --- a/.deadcode-out +++ b/.deadcode-out @@ -224,6 +224,9 @@ forgejo.org/services/context forgejo.org/services/federation FollowRemoteActor +forgejo.org/services/notify + UnregisterNotifier + forgejo.org/services/repository IsErrForkAlreadyExist diff --git a/custom/conf/app.example.ini b/custom/conf/app.example.ini index 824cf0b21f..56c3634ff2 100644 --- a/custom/conf/app.example.ini +++ b/custom/conf/app.example.ini @@ -2782,6 +2782,10 @@ LEVEL = Info ;SKIP_WORKFLOW_STRINGS = [skip ci],[ci skip],[no ci],[skip actions],[actions skip] ;; Limit on inputs for manual / workflow_dispatch triggers, default is 10 ;LIMIT_DISPATCH_INPUTS = 10 +;; Support queuing workflow jobs, by setting `concurrency.group` & `concurrency.cancel-in-progress: false`, can increase +;; server and database workload due to more complex database queries and more frequent server task querying; this +;; feature can be disabled to reduce performance impact +;CONCURRENCY_GROUP_QUEUE_ENABLED = true ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; diff --git a/go.mod b/go.mod index 79e0bd7310..dfeeb30451 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( code.forgejo.org/forgejo/go-rpmutils v1.0.0 code.forgejo.org/forgejo/levelqueue v1.0.0 code.forgejo.org/forgejo/reply v1.0.2 - code.forgejo.org/forgejo/runner/v11 v11.1.1 + code.forgejo.org/forgejo/runner/v11 v11.1.2 code.forgejo.org/go-chi/binding v1.0.1 code.forgejo.org/go-chi/cache v1.0.1 code.forgejo.org/go-chi/captcha v1.0.2 diff --git a/go.sum b/go.sum index 75e72a226b..f24e204f07 100644 --- a/go.sum +++ b/go.sum @@ -28,8 +28,8 @@ code.forgejo.org/forgejo/levelqueue v1.0.0 h1:9krYpU6BM+j/1Ntj6m+VCAIu0UNnne1/Uf code.forgejo.org/forgejo/levelqueue v1.0.0/go.mod h1:fmG6zhVuqim2rxSFOoasgXO8V2W/k9U31VVYqLIRLhQ= code.forgejo.org/forgejo/reply v1.0.2 h1:dMhQCHV6/O3L5CLWNTol+dNzDAuyCK88z4J/lCdgFuQ= code.forgejo.org/forgejo/reply v1.0.2/go.mod h1:RyZUfzQLc+fuLIGjTSQWDAJWPiL4WtKXB/FifT5fM7U= -code.forgejo.org/forgejo/runner/v11 v11.1.1 h1:CoSfxBOLKLMJZq5VhKd5ZIUc3tCf04iyFx926s+zaMA= -code.forgejo.org/forgejo/runner/v11 v11.1.1/go.mod h1:9f0D2EG7uabL+cv71SWHKrGgo2vmLpvko0QLmtn3RDE= +code.forgejo.org/forgejo/runner/v11 v11.1.2 h1:jM5YsNmScH11VJEwmvsTUiqGjAqtiUzBhQ65BIo8ZOs= +code.forgejo.org/forgejo/runner/v11 v11.1.2/go.mod h1:9f0D2EG7uabL+cv71SWHKrGgo2vmLpvko0QLmtn3RDE= code.forgejo.org/forgejo/ssh v0.0.0-20241211213324-5fc306ca0616 h1:kEZL84+02jY9RxXM4zHBWZ3Fml0B09cmP1LGkDsCfIA= code.forgejo.org/forgejo/ssh v0.0.0-20241211213324-5fc306ca0616/go.mod h1:zpHEXBstFnQYtGnB8k8kQLol82umzn/2/snG7alWVD8= code.forgejo.org/go-chi/binding v1.0.1 h1:coKNI+X1NzRN7X85LlrpvBRqk0TXpJ+ja28vusQWEuY= diff --git a/models/actions/run.go b/models/actions/run.go index b5f79a0cb3..29fab29770 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -25,11 +25,23 @@ import ( "xorm.io/builder" ) +type ConcurrencyMode int + +const ( + // Don't enforce concurrency control. Note that you won't find `UnlimitedConcurrency` implemented directly in the + // code; setting it on an `ActionRun` prevents the other limiting behaviors. + UnlimitedConcurrency ConcurrencyMode = iota + // Queue behind other jobs with the same concurrency group + QueueBehind + // Cancel other jobs with the same concurrency group + CancelInProgress +) + // ActionRun represents a run of a workflow file type ActionRun struct { ID int64 Title string - RepoID int64 `xorm:"index unique(repo_index)"` + RepoID int64 `xorm:"index unique(repo_index) index(concurrency)"` Repo *repo_model.Repository `xorm:"-"` OwnerID int64 `xorm:"index"` WorkflowID string `xorm:"index"` // the name of workflow file @@ -56,6 +68,9 @@ type ActionRun struct { Created timeutil.TimeStamp `xorm:"created"` Updated timeutil.TimeStamp `xorm:"updated"` NotifyEmail bool + + ConcurrencyGroup string `xorm:"'concurrency_group' index(concurrency)"` + ConcurrencyType ConcurrencyMode } func init() { @@ -163,6 +178,24 @@ func (run *ActionRun) GetPullRequestEventPayload() (*api.PullRequestPayload, err return nil, fmt.Errorf("event %s is not a pull request event", run.Event) } +func (run *ActionRun) SetConcurrencyGroup(concurrencyGroup string) { + // Concurrency groups are case insensitive identifiers, implemented by collapsing case here. Unfortunately the + // `ConcurrencyGroup` field can't be made a private field because xorm doesn't map those fields -- using + // `SetConcurrencyGroup` is required for consistency but not enforced at compile-time. + run.ConcurrencyGroup = strings.ToLower(concurrencyGroup) +} + +func (run *ActionRun) SetDefaultConcurrencyGroup() { + // Before ConcurrencyGroups were supported, Forgejo would automatically cancel runs with matching git refs, workflow + // IDs, and trigger events. For backwards compatibility we emulate that behavior: + run.SetConcurrencyGroup(fmt.Sprintf( + "%s_%s_%s__auto", + run.Ref, + run.WorkflowID, + run.TriggerEvent, + )) +} + func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error { _, err := db.GetEngine(ctx).ID(repo.ID). SetExpr("num_action_runs", diff --git a/models/actions/run_list.go b/models/actions/run_list.go index 92be510569..174d2aa70c 100644 --- a/models/actions/run_list.go +++ b/models/actions/run_list.go @@ -5,6 +5,7 @@ package actions import ( "context" + "strings" "forgejo.org/models/db" repo_model "forgejo.org/models/repo" @@ -64,14 +65,15 @@ func (runs RunList) LoadRepos(ctx context.Context) error { type FindRunOptions struct { db.ListOptions - RepoID int64 - OwnerID int64 - WorkflowID string - Ref string // the commit/tag/… that caused this workflow - TriggerUserID int64 - TriggerEvent webhook_module.HookEventType - Approved bool // not util.OptionalBool, it works only when it's true - Status []Status + RepoID int64 + OwnerID int64 + WorkflowID string + Ref string // the commit/tag/… that caused this workflow + TriggerUserID int64 + TriggerEvent webhook_module.HookEventType + Approved bool // not util.OptionalBool, it works only when it's true + Status []Status + ConcurrencyGroup string } func (opts FindRunOptions) ToConds() builder.Cond { @@ -100,6 +102,9 @@ func (opts FindRunOptions) ToConds() builder.Cond { if opts.TriggerEvent != "" { cond = cond.And(builder.Eq{"trigger_event": opts.TriggerEvent}) } + if opts.ConcurrencyGroup != "" { + cond = cond.And(builder.Eq{"concurrency_group": strings.ToLower(opts.ConcurrencyGroup)}) + } return cond } diff --git a/models/actions/run_test.go b/models/actions/run_test.go index c9a552a2b2..a305dcf2fc 100644 --- a/models/actions/run_test.go +++ b/models/actions/run_test.go @@ -5,7 +5,34 @@ package actions import ( "testing" + + "github.com/stretchr/testify/assert" ) func TestGetRunBefore(t *testing.T) { } + +func TestSetConcurrencyGroup(t *testing.T) { + run := ActionRun{} + run.SetConcurrencyGroup("abc123") + assert.Equal(t, "abc123", run.ConcurrencyGroup) + run.SetConcurrencyGroup("ABC123") // case should collapse in SetConcurrencyGroup + assert.Equal(t, "abc123", run.ConcurrencyGroup) +} + +func TestSetDefaultConcurrencyGroup(t *testing.T) { + run := ActionRun{ + Ref: "refs/heads/main", + WorkflowID: "testing", + TriggerEvent: "pull_request", + } + run.SetDefaultConcurrencyGroup() + assert.Equal(t, "refs/heads/main_testing_pull_request__auto", run.ConcurrencyGroup) + run = ActionRun{ + Ref: "refs/heads/main", + WorkflowID: "TESTING", // case should collapse in SetDefaultConcurrencyGroup + TriggerEvent: "pull_request", + } + run.SetDefaultConcurrencyGroup() + assert.Equal(t, "refs/heads/main_testing_pull_request__auto", run.ConcurrencyGroup) +} diff --git a/models/actions/task.go b/models/actions/task.go index 6e77db1b18..569fc2bb33 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -239,6 +239,88 @@ func GetRunningTaskByToken(ctx context.Context, token string) (*ActionTask, erro return nil, errNotExist } +func getConcurrencyCondition() builder.Cond { + concurrencyCond := builder.NewCond() + + // OK to pick if there's no concurrency_group on the run + concurrencyCond = concurrencyCond.Or(builder.Eq{"concurrency_group": ""}) + concurrencyCond = concurrencyCond.Or(builder.IsNull{"concurrency_group"}) + + // OK to pick if it's not a "QueueBehind" concurrency type + concurrencyCond = concurrencyCond.Or(builder.Neq{"concurrency_type": QueueBehind}) + + // subQuery ends up representing all the runs that would block a run from executing: + subQuery := builder.Select("id").From("action_run", "inner_run"). + // A run can't block itself, so exclude it from this search + Where(builder.Neq{"inner_run.id": builder.Expr("outer_run.id")}). + // Blocking runs must be from the same repo & concurrency group + And(builder.Eq{"inner_run.repo_id": builder.Expr("outer_run.repo_id")}). + And(builder.Eq{"inner_run.concurrency_group": builder.Expr("outer_run.concurrency_group")}). + And( + // Ideally the logic here would be that a blocking run is "not done", and "younger", which allows each run + // to be blocked on the previous runs in the concurrency group and therefore execute in order from oldest to + // newest. + // + // But it's possible for runs to be required to run out-of-order -- for example, if a younger run has + // already completed but then it is re-run. If we only used "not done" and "younger" as logic, then the + // re-run would not be blocked, and therefore would violate the concurrency group's single-run goal. + // + // So we use two conditions to meet both needs: + // + // Blocking runs have a running status... + builder.Eq{"inner_run.status": StatusRunning}.Or( + // Blocking runs don't have a IsDone status & are younger than the outer_run + builder.NotIn("inner_run.status", []Status{StatusSuccess, StatusFailure, StatusCancelled, StatusSkipped}). + And(builder.Lt{"inner_run.`index`": builder.Expr("outer_run.`index`")}))) + + // OK to pick if there are no blocking runs + concurrencyCond = concurrencyCond.Or(builder.NotExists(subQuery)) + + return concurrencyCond +} + +// Returns all the available jobs that could be executed on `runner`, before label filtering is applied. Note that +// only a single job can actually be run from this result for any given invocation, as multiple runs (in order) from any +// single concurrency group could be returned. +func GetAvailableJobsForRunner(e db.Engine, runner *ActionRunner) ([]*ActionRunJob, error) { + jobCond := builder.NewCond() + if runner.RepoID != 0 { + jobCond = builder.Eq{"repo_id": runner.RepoID} + } else if runner.OwnerID != 0 { + jobCond = builder.In("repo_id", builder.Select("`repository`.id").From("repository"). + Join("INNER", "repo_unit", "`repository`.id = `repo_unit`.repo_id"). + Where(builder.Eq{"`repository`.owner_id": runner.OwnerID, "`repo_unit`.type": unit.TypeActions})) + } + // Concurrency group checks for queuing one run behind the last run in the concurrency group are more + // computationally expensive on the database. To manage the risk that this might have on large-scale deployments + // When this feature is initially released, it can be disabled in the ini file by setting + // `CONCURRENCY_GROUP_QUEUE_ENABLED = false` in the `[actions]` section. If disabled, then actions with a + // concurrency group and `cancel-in-progress: false` will run simultaneously rather than being queued. + if setting.Actions.ConcurrencyGroupQueueEnabled { + jobCond = jobCond.And(getConcurrencyCondition()) + } + if jobCond.IsValid() { + // It is *likely* more efficient to use an EXISTS query here rather than an IN clause, as that allows the + // database's query optimizer to perform partial computation of the subquery rather than complete computation. + // However, database engines can be fickle and difficult to predict. We'll retain the original IN clause + // implementation when ConcurrencyGroupQueueEnabled is disabled, which should maintain the same performance + // characteristics. When ConcurrencyGroupQueueEnabled is enabled, it will switch to the EXISTS clause. + if setting.Actions.ConcurrencyGroupQueueEnabled { + jobCond = builder.Exists(builder.Select("id").From("action_run", "outer_run"). + Where(builder.Eq{"outer_run.id": builder.Expr("action_run_job.run_id")}). + And(jobCond)) + } else { + jobCond = builder.In("run_id", builder.Select("id").From("action_run", "outer_run").Where(jobCond)) + } + } + + var jobs []*ActionRunJob + if err := e.Where("task_id=? AND status=?", 0, StatusWaiting).And(jobCond).Asc("updated", "id").Find(&jobs); err != nil { + return nil, err + } + return jobs, nil +} + func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask, bool, error) { ctx, commiter, err := db.TxContext(ctx) if err != nil { @@ -248,20 +330,8 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask e := db.GetEngine(ctx) - jobCond := builder.NewCond() - if runner.RepoID != 0 { - jobCond = builder.Eq{"repo_id": runner.RepoID} - } else if runner.OwnerID != 0 { - jobCond = builder.In("repo_id", builder.Select("`repository`.id").From("repository"). - Join("INNER", "repo_unit", "`repository`.id = `repo_unit`.repo_id"). - Where(builder.Eq{"`repository`.owner_id": runner.OwnerID, "`repo_unit`.type": unit.TypeActions})) - } - if jobCond.IsValid() { - jobCond = builder.In("run_id", builder.Select("id").From("action_run").Where(jobCond)) - } - - var jobs []*ActionRunJob - if err := e.Where("task_id=? AND status=?", 0, StatusWaiting).And(jobCond).Asc("updated", "id").Find(&jobs); err != nil { + jobs, err := GetAvailableJobsForRunner(e, runner) + if err != nil { return nil, false, err } diff --git a/models/fixtures/action_run.yml b/models/fixtures/action_run.yml index 5b6f89ae0e..4e5af28166 100644 --- a/models/fixtures/action_run.yml +++ b/models/fixtures/action_run.yml @@ -525,6 +525,7 @@ ref: "refs/heads/main" commit_sha: "97f29ee599c373c729132a5c46a046978311e0ee" event: "workflow_dispatch" + trigger_event: "workflow_dispatch" is_fork_pull_request: 0 status: 6 # running started: 1683636528 diff --git a/models/forgejo_migrations/migrate.go b/models/forgejo_migrations/migrate.go index 39e8084958..2c34422e91 100644 --- a/models/forgejo_migrations/migrate.go +++ b/models/forgejo_migrations/migrate.go @@ -121,6 +121,8 @@ var migrations = []*Migration{ NewMigration("Add index for release sha1", AddIndexForReleaseSha1), // v40 -> v41 NewMigration("Add foreign keys to stopwatch & tracked_time", AddForeignKeysStopwatchTrackedTime), + // v41 -> v42 + NewMigration("Add action_run concurrency fields", AddActionRunConcurrency), } // GetCurrentDBVersion returns the current Forgejo database version. diff --git a/models/forgejo_migrations/v42.go b/models/forgejo_migrations/v42.go new file mode 100644 index 0000000000..0eb267e93c --- /dev/null +++ b/models/forgejo_migrations/v42.go @@ -0,0 +1,20 @@ +// Copyright 2025 The Forgejo Authors. All rights reserved. +// SPDX-License-Identifier: GPL-3.0-or-later + +package forgejo_migrations + +import "xorm.io/xorm" + +func AddActionRunConcurrency(x *xorm.Engine) error { + type ActionRun struct { + RepoID int64 `xorm:"index unique(repo_index) index(concurrency)"` + Index int64 `xorm:"index unique(repo_index)"` + ConcurrencyGroup string `xorm:"index(concurrency)"` + ConcurrencyType int + } + _, err := x.SyncWithOptions(xorm.SyncOptions{ + // Sync drops indexes by default, and this local ActionRun doesn't have all the indexes -- so disable that. + IgnoreDropIndices: true, + }, new(ActionRun)) + return err +} diff --git a/modules/setting/actions.go b/modules/setting/actions.go index 52a3ad5309..303fd6363b 100644 --- a/modules/setting/actions.go +++ b/modules/setting/actions.go @@ -12,23 +12,25 @@ import ( // Actions settings var ( Actions = struct { - Enabled bool - LogStorage *Storage // how the created logs should be stored - LogRetentionDays int64 `ini:"LOG_RETENTION_DAYS"` - LogCompression logCompression `ini:"LOG_COMPRESSION"` - ArtifactStorage *Storage // how the created artifacts should be stored - ArtifactRetentionDays int64 `ini:"ARTIFACT_RETENTION_DAYS"` - DefaultActionsURL defaultActionsURL `ini:"DEFAULT_ACTIONS_URL"` - ZombieTaskTimeout time.Duration `ini:"ZOMBIE_TASK_TIMEOUT"` - EndlessTaskTimeout time.Duration `ini:"ENDLESS_TASK_TIMEOUT"` - AbandonedJobTimeout time.Duration `ini:"ABANDONED_JOB_TIMEOUT"` - SkipWorkflowStrings []string `ìni:"SKIP_WORKFLOW_STRINGS"` - LimitDispatchInputs int64 `ini:"LIMIT_DISPATCH_INPUTS"` + Enabled bool + LogStorage *Storage // how the created logs should be stored + LogRetentionDays int64 `ini:"LOG_RETENTION_DAYS"` + LogCompression logCompression `ini:"LOG_COMPRESSION"` + ArtifactStorage *Storage // how the created artifacts should be stored + ArtifactRetentionDays int64 `ini:"ARTIFACT_RETENTION_DAYS"` + DefaultActionsURL defaultActionsURL `ini:"DEFAULT_ACTIONS_URL"` + ZombieTaskTimeout time.Duration `ini:"ZOMBIE_TASK_TIMEOUT"` + EndlessTaskTimeout time.Duration `ini:"ENDLESS_TASK_TIMEOUT"` + AbandonedJobTimeout time.Duration `ini:"ABANDONED_JOB_TIMEOUT"` + SkipWorkflowStrings []string `ìni:"SKIP_WORKFLOW_STRINGS"` + LimitDispatchInputs int64 `ini:"LIMIT_DISPATCH_INPUTS"` + ConcurrencyGroupQueueEnabled bool `ini:"CONCURRENCY_GROUP_QUEUE_ENABLED"` }{ - Enabled: true, - DefaultActionsURL: defaultActionsURLForgejo, - SkipWorkflowStrings: []string{"[skip ci]", "[ci skip]", "[no ci]", "[skip actions]", "[actions skip]"}, - LimitDispatchInputs: 10, + Enabled: true, + DefaultActionsURL: defaultActionsURLForgejo, + SkipWorkflowStrings: []string{"[skip ci]", "[ci skip]", "[no ci]", "[skip actions]", "[actions skip]"}, + LimitDispatchInputs: 10, + ConcurrencyGroupQueueEnabled: true, } ) diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index a971cd3fbf..2b3f0db1e5 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -14,6 +14,7 @@ import ( user_model "forgejo.org/models/user" "forgejo.org/modules/actions" "forgejo.org/modules/log" + "forgejo.org/modules/setting" "forgejo.org/modules/util" actions_service "forgejo.org/services/actions" @@ -224,6 +225,15 @@ func (s *Service) UpdateTask( if err := actions_service.EmitJobsIfReady(task.Job.RunID); err != nil { log.Error("Emit ready jobs of run %d: %v", task.Job.RunID, err) } + // Reaching a finalized result for a task can cause other tasks in the same concurrency group to become + // unblocked. Increasing task version here allows all applicable runners to requery to the DB for that state. + // Because it is only useful for that condition, and it has system performance risks, only enable it when + // concurrency group queuing is enabled. + if setting.Actions.ConcurrencyGroupQueueEnabled { + if err := actions_model.IncreaseTaskVersion(ctx, runner.OwnerID, runner.RepoID); err != nil { + return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("fail to increase task version: %w", err)) + } + } } return connect.NewResponse(&runnerv1.UpdateTaskResponse{ diff --git a/services/actions/TestCancelPreviousJobs/action_run_job.yml b/services/actions/TestCancelPreviousJobs/action_run_job.yml new file mode 100644 index 0000000000..98a39bbe4e --- /dev/null +++ b/services/actions/TestCancelPreviousJobs/action_run_job.yml @@ -0,0 +1,14 @@ +- + id: 600 + run_id: 894 + repo_id: 63 + owner_id: 2 + commit_sha: 97f29ee599c373c729132a5c46a046978311e0ee + is_fork_pull_request: 0 + name: job_1 + attempt: 0 + job_id: job_1 + task_id: 0 + status: 6 # running + runs_on: '["fedora"]' + started: 1683636528 diff --git a/services/actions/TestCancelPreviousWithConcurrencyGroup/action_run.yml b/services/actions/TestCancelPreviousWithConcurrencyGroup/action_run.yml new file mode 100644 index 0000000000..09555ec356 --- /dev/null +++ b/services/actions/TestCancelPreviousWithConcurrencyGroup/action_run.yml @@ -0,0 +1,38 @@ +- + id: 900 + title: "running workflow_dispatch run" + repo_id: 63 + owner_id: 2 + workflow_id: "running.yaml" + index: 4 + trigger_user_id: 2 + ref: "refs/heads/main" + commit_sha: "97f29ee599c373c729132a5c46a046978311e0ee" + trigger_event: "workflow_dispatch" + is_fork_pull_request: 0 + status: 6 # running + started: 1683636528 + created: 1683636108 + updated: 1683636626 + need_approval: 0 + approved_by: 0 + concurrency_group: abc123 +- + id: 901 + title: "running workflow_dispatch run" + repo_id: 63 + owner_id: 2 + workflow_id: "running.yaml" + index: 5 + trigger_user_id: 2 + ref: "refs/heads/main" + commit_sha: "97f29ee599c373c729132a5c46a046978311e0ee" + trigger_event: "workflow_dispatch" + is_fork_pull_request: 0 + status: 6 # running + started: 1683636528 + created: 1683636108 + updated: 1683636626 + need_approval: 0 + approved_by: 0 + concurrency_group: abc123 diff --git a/services/actions/TestCancelPreviousWithConcurrencyGroup/action_run_job.yml b/services/actions/TestCancelPreviousWithConcurrencyGroup/action_run_job.yml new file mode 100644 index 0000000000..9e4fd930b6 --- /dev/null +++ b/services/actions/TestCancelPreviousWithConcurrencyGroup/action_run_job.yml @@ -0,0 +1,28 @@ +- + id: 600 + run_id: 900 + repo_id: 63 + owner_id: 2 + commit_sha: 97f29ee599c373c729132a5c46a046978311e0ee + is_fork_pull_request: 0 + name: job_1 + attempt: 0 + job_id: job_1 + task_id: 0 + status: 6 # running + runs_on: '["fedora"]' + started: 1683636528 +- + id: 601 + run_id: 901 + repo_id: 63 + owner_id: 2 + commit_sha: 97f29ee599c373c729132a5c46a046978311e0ee + is_fork_pull_request: 0 + name: job_1 + attempt: 0 + job_id: job_1 + task_id: 0 + status: 6 # running + runs_on: '["fedora"]' + started: 1683636528 diff --git a/services/actions/context.go b/services/actions/context.go index bf187c56bf..f79d5affeb 100644 --- a/services/actions/context.go +++ b/services/actions/context.go @@ -14,11 +14,11 @@ import ( "forgejo.org/modules/git" "forgejo.org/modules/json" "forgejo.org/modules/setting" + + "code.forgejo.org/forgejo/runner/v11/act/model" ) -// GenerateGiteaContext generate the gitea context without token and gitea_runtime_token -// job can be nil when generating a context for parsing workflow-level expressions -func GenerateGiteaContext(run *actions_model.ActionRun, job *actions_model.ActionRunJob) map[string]any { +func generateGiteaContextForRun(run *actions_model.ActionRun) *model.GithubContext { event := map[string]any{} _ = json.Unmarshal([]byte(run.EventPayload), &event) @@ -41,45 +41,64 @@ func GenerateGiteaContext(run *actions_model.ActionRun, job *actions_model.Actio refName := git.RefName(ref) - gitContext := map[string]any{ + gitContextObj := &model.GithubContext{ // standard contexts, see https://docs.github.com/en/actions/learn-github-actions/contexts#github-context - "action": "", // string, The name of the action currently running, or the id of a step. GitHub removes special characters, and uses the name __run when the current step runs a script without an id. If you use the same action more than once in the same job, the name will include a suffix with the sequence number with underscore before it. For example, the first script you run will have the name __run, and the second script will be named __run_2. Similarly, the second invocation of actions/checkout will be actionscheckout2. - "action_path": "", // string, The path where an action is located. This property is only supported in composite actions. You can use this path to access files located in the same repository as the action. - "action_ref": "", // string, For a step executing an action, this is the ref of the action being executed. For example, v2. - "action_repository": "", // string, For a step executing an action, this is the owner and repository name of the action. For example, actions/checkout. - "action_status": "", // string, For a composite action, the current result of the composite action. - "actor": run.TriggerUser.Name, // string, The username of the user that triggered the initial workflow run. If the workflow run is a re-run, this value may differ from github.triggering_actor. Any workflow re-runs will use the privileges of github.actor, even if the actor initiating the re-run (github.triggering_actor) has different privileges. - "api_url": setting.AppURL + "api/v1", // string, The URL of the GitHub REST API. - "base_ref": baseRef, // string, The base_ref or target branch of the pull request in a workflow run. This property is only available when the event that triggers a workflow run is either pull_request or pull_request_target. - "env": "", // string, Path on the runner to the file that sets environment variables from workflow commands. This file is unique to the current step and is a different file for each step in a job. For more information, see "Workflow commands for GitHub Actions." - "event": event, // object, The full event webhook payload. You can access individual properties of the event using this context. This object is identical to the webhook payload of the event that triggered the workflow run, and is different for each event. The webhooks for each GitHub Actions event is linked in "Events that trigger workflows." For example, for a workflow run triggered by the push event, this object contains the contents of the push webhook payload. - "event_name": run.TriggerEvent, // string, The name of the event that triggered the workflow run. - "event_path": "", // string, The path to the file on the runner that contains the full event webhook payload. - "graphql_url": "", // string, The URL of the GitHub GraphQL API. - "head_ref": headRef, // string, The head_ref or source branch of the pull request in a workflow run. This property is only available when the event that triggers a workflow run is either pull_request or pull_request_target. - "job": "", // string, The job_id of the current job. - "ref": ref, // string, The fully-formed ref of the branch or tag that triggered the workflow run. For workflows triggered by push, this is the branch or tag ref that was pushed. For workflows triggered by pull_request, this is the pull request merge branch. For workflows triggered by release, this is the release tag created. For other triggers, this is the branch or tag ref that triggered the workflow run. This is only set if a branch or tag is available for the event type. The ref given is fully-formed, meaning that for branches the format is refs/heads/, for pull requests it is refs/pull//merge, and for tags it is refs/tags/. For example, refs/heads/feature-branch-1. - "ref_name": refName.ShortName(), // string, The short ref name of the branch or tag that triggered the workflow run. This value matches the branch or tag name shown on GitHub. For example, feature-branch-1. - "ref_protected": false, // boolean, true if branch protections are configured for the ref that triggered the workflow run. - "ref_type": refName.RefType(), // string, The type of ref that triggered the workflow run. Valid values are branch or tag. - "path": "", // string, Path on the runner to the file that sets system PATH variables from workflow commands. This file is unique to the current step and is a different file for each step in a job. For more information, see "Workflow commands for GitHub Actions." - "repository": run.Repo.OwnerName + "/" + run.Repo.Name, // string, The owner and repository name. For example, Codertocat/Hello-World. - "repository_owner": run.Repo.OwnerName, // string, The repository owner's name. For example, Codertocat. - "repositoryUrl": run.Repo.HTMLURL(), // string, The Git URL to the repository. For example, git://github.com/codertocat/hello-world.git. - "retention_days": "", // string, The number of days that workflow run logs and artifacts are kept. - "run_id": "", // string, A unique number for each workflow run within a repository. This number does not change if you re-run the workflow run. - "run_number": fmt.Sprint(run.Index), // string, A unique number for each run of a particular workflow in a repository. This number begins at 1 for the workflow's first run, and increments with each new run. This number does not change if you re-run the workflow run. - "run_attempt": "", // string, A unique number for each attempt of a particular workflow run in a repository. This number begins at 1 for the workflow run's first attempt, and increments with each re-run. - "secret_source": "Actions", // string, The source of a secret used in a workflow. Possible values are None, Actions, Dependabot, or Codespaces. - "server_url": setting.AppURL, // string, The URL of the GitHub server. For example: https://github.com. - "sha": sha, // string, The commit SHA that triggered the workflow. The value of this commit SHA depends on the event that triggered the workflow. For more information, see "Events that trigger workflows." For example, ffac537e6cbbf934b08745a378932722df287a53. - "triggering_actor": "", // string, The username of the user that initiated the workflow run. If the workflow run is a re-run, this value may differ from github.actor. Any workflow re-runs will use the privileges of github.actor, even if the actor initiating the re-run (github.triggering_actor) has different privileges. - "workflow": run.WorkflowID, // string, The name of the workflow. If the workflow file doesn't specify a name, the value of this property is the full path of the workflow file in the repository. - "workspace": "", // string, The default working directory on the runner for steps, and the default location of your repository when using the checkout action. - - // additional contexts - "gitea_default_actions_url": setting.Actions.DefaultActionsURL.URL(), + APIURL: setting.AppURL + "api/v1", // string, The URL of the GitHub REST API. + Action: "", // string, The name of the action currently running, or the id of a step. GitHub removes special characters, and uses the name __run when the current step runs a script without an id. If you use the same action more than once in the same job, the name will include a suffix with the sequence number with underscore before it. For example, the first script you run will have the name __run, and the second script will be named __run_2. Similarly, the second invocation of actions/checkout will be actionscheckout2. + ActionPath: "", // string, The path where an action is located. This property is only supported in composite actions. You can use this path to access files located in the same repository as the action. + ActionRef: "", // string, For a step executing an action, this is the ref of the action being executed. For example, v2. + ActionRepository: "", // string, For a step executing an action, this is the owner and repository name of the action. For example, actions/checkout. + BaseRef: baseRef, // string, The base_ref or target branch of the pull request in a workflow run. This property is only available when the event that triggers a workflow run is either pull_request or pull_request_target. + Event: event, // object, The full event webhook payload. You can access individual properties of the event using this context. This object is identical to the webhook payload of the event that triggered the workflow run, and is different for each event. The webhooks for each GitHub Actions event is linked in "Events that trigger workflows." For example, for a workflow run triggered by the push event, this object contains the contents of the push webhook payload. + EventName: run.TriggerEvent, // string, The name of the event that triggered the workflow run. + EventPath: "", // string, The path to the file on the runner that contains the full event webhook payload. + GraphQLURL: "", // string, The URL of the GitHub GraphQL API. + HeadRef: headRef, // string, The head_ref or source branch of the pull request in a workflow run. This property is only available when the event that triggers a workflow run is either pull_request or pull_request_target. + Job: "", // string, The job_id of the current job. + Ref: ref, // string, The fully-formed ref of the branch or tag that triggered the workflow run. For workflows triggered by push, this is the branch or tag ref that was pushed. For workflows triggered by pull_request, this is the pull request merge branch. For workflows triggered by release, this is the release tag created. For other triggers, this is the branch or tag ref that triggered the workflow run. This is only set if a branch or tag is available for the event type. The ref given is fully-formed, meaning that for branches the format is refs/heads/, for pull requests it is refs/pull//merge, and for tags it is refs/tags/. For example, refs/heads/feature-branch-1. + RefName: refName.ShortName(), // string, The short ref name of the branch or tag that triggered the workflow run. This value matches the branch or tag name shown on GitHub. For example, feature-branch-1. + RefType: refName.RefType(), // string, The type of ref that triggered the workflow run. Valid values are branch or tag. + Repository: run.Repo.OwnerName + "/" + run.Repo.Name, // string, The owner and repository name. For example, Codertocat/Hello-World. + RepositoryOwner: run.Repo.OwnerName, // string, The repository owner's name. For example, Codertocat. + RetentionDays: "", // string, The number of days that workflow run logs and artifacts are kept. + RunID: "", // string, A unique number for each workflow run within a repository. This number does not change if you re-run the workflow run. + RunNumber: fmt.Sprint(run.Index), // string, A unique number for each run of a particular workflow in a repository. This number begins at 1 for the workflow's first run, and increments with each new run. This number does not change if you re-run the workflow run. + RunAttempt: "", // string, A unique number for each attempt of a particular workflow run in a repository. This number begins at 1 for the workflow run's first attempt, and increments with each re-run. + ServerURL: setting.AppURL, // string, The URL of the GitHub server. For example: https://github.com. + Sha: sha, // string, The commit SHA that triggered the workflow. The value of this commit SHA depends on the event that triggered the workflow. For more information, see "Events that trigger workflows." For example, ffac537e6cbbf934b08745a378932722df287a53. + Workflow: run.WorkflowID, // string, The name of the workflow. If the workflow file doesn't specify a name, the value of this property is the full path of the workflow file in the repository. + Workspace: "", // string, The default working directory on the runner for steps, and the default location of your repository when using the checkout action. } + if run.TriggerUser != nil { + gitContextObj.Actor = run.TriggerUser.Name // string, The username of the user that triggered the initial workflow run. If the workflow run is a re-run, this value may differ from github.triggering_actor. Any workflow re-runs will use the privileges of github.actor, even if the actor initiating the re-run (github.triggering_actor) has different privileges. + } + + return gitContextObj +} + +// GenerateGiteaContext generate the gitea context without token and gitea_runtime_token +// job can be nil when generating a context for parsing workflow-level expressions +func GenerateGiteaContext(run *actions_model.ActionRun, job *actions_model.ActionRunJob) map[string]any { + gitContextObj := generateGiteaContextForRun(run) + + gitContext, _ := githubContextToMap(gitContextObj) + + // standard contexts, see https://docs.github.com/en/actions/learn-github-actions/contexts#github-context + gitContext["action_status"] = "" // string, For a composite action, the current result of the composite action. + gitContext["actor"] = run.TriggerUser.Name // string, The username of the user that triggered the initial workflow run. If the workflow run is a re-run, this value may differ from github.triggering_actor. Any workflow re-runs will use the privileges of github.actor, even if the actor initiating the re-run (github.triggering_actor) has different privileges. + gitContext["env"] = "" // string, Path on the runner to the file that sets environment variables from workflow commands. This file is unique to the current step and is a different file for each step in a job. For more information, see "Workflow commands for GitHub Actions." + gitContext["path"] = "" // string, Path on the runner to the file that sets system PATH variables from workflow commands. This file is unique to the current step and is a different file for each step in a job. For more information, see "Workflow commands for GitHub Actions." + gitContext["ref_protected"] = false // boolean, true if branch protections are configured for the ref that triggered the workflow run. + gitContext["repository_owner"] = run.Repo.OwnerName // string, The repository owner's name. For example, Codertocat. + gitContext["repository"] = run.Repo.OwnerName + "/" + run.Repo.Name // string, The owner and repository name. For example, Codertocat/Hello-World. + gitContext["repositoryUrl"] = run.Repo.HTMLURL() // string, The Git URL to the repository. For example, git://github.com/codertocat/hello-world.git. + gitContext["secret_source"] = "Actions" // string, The source of a secret used in a workflow. Possible values are None, Actions, Dependabot, or Codespaces. + gitContext["server_url"] = setting.AppURL // string, The URL of the GitHub server. For example: https://github.com. + gitContext["triggering_actor"] = "" // string, The username of the user that initiated the workflow run. If the workflow run is a re-run, this value may differ from github.actor. Any workflow re-runs will use the privileges of github.actor, even if the actor initiating the re-run (github.triggering_actor) has different privileges. + gitContext["workflow"] = run.WorkflowID // string, The name of the workflow. If the workflow file doesn't specify a name, the value of this property is the full path of the workflow file in the repository. + + // additional contexts + gitContext["gitea_default_actions_url"] = setting.Actions.DefaultActionsURL.URL() if job != nil { gitContext["job"] = job.JobID @@ -90,6 +109,21 @@ func GenerateGiteaContext(run *actions_model.ActionRun, job *actions_model.Actio return gitContext } +func githubContextToMap(gitContext *model.GithubContext) (map[string]any, error) { + jsonBytes, err := json.Marshal(gitContext) + if err != nil { + return nil, fmt.Errorf("failed to marshal struct: %w", err) + } + + var result map[string]any + err = json.Unmarshal(jsonBytes, &result) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal to map: %w", err) + } + + return result, nil +} + type TaskNeed struct { Result actions_model.Status Outputs map[string]string diff --git a/services/actions/context_test.go b/services/actions/context_test.go index c96094ade8..99a13f909f 100644 --- a/services/actions/context_test.go +++ b/services/actions/context_test.go @@ -7,7 +7,13 @@ import ( "testing" actions_model "forgejo.org/models/actions" + "forgejo.org/models/repo" "forgejo.org/models/unittest" + "forgejo.org/models/user" + actions_module "forgejo.org/modules/actions" + "forgejo.org/modules/json" + "forgejo.org/modules/setting" + webhook_module "forgejo.org/modules/webhook" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -27,3 +33,321 @@ func TestFindTaskNeeds(t *testing.T) { assert.Equal(t, "abc", ret["job1"].Outputs["output_a"]) assert.Equal(t, "bbb", ret["job1"].Outputs["output_b"]) } + +func TestGenerateGiteaContext(t *testing.T) { + testUser := &user.User{ + ID: 1, + Name: "testuser", + } + + testRepo := &repo.Repository{ + ID: 1, + OwnerName: "testowner", + Name: "testrepo", + } + + emptyField := func(t *testing.T, context map[string]any, field string) { + v, ok := context[field] + assert.True(t, ok, "expected field %q to be present", field) + assert.Empty(t, v) + } + + t.Run("Basic workflow run without job", func(t *testing.T) { + run := &actions_model.ActionRun{ + ID: 1, + Index: 42, + TriggerUser: testUser, + Repo: testRepo, + TriggerEvent: "push", + Ref: "refs/heads/main", + CommitSHA: "abc123def456", + WorkflowID: "test-workflow", + EventPayload: `{"repository": {"name": "testrepo"}}`, + } + + context := GenerateGiteaContext(run, nil) + + assert.Equal(t, "testuser", context["actor"]) + assert.Equal(t, setting.AppURL+"api/v1", context["api_url"]) + assert.Equal(t, "push", context["event_name"]) + assert.Equal(t, "refs/heads/main", context["ref"]) + assert.Equal(t, "main", context["ref_name"]) + assert.Equal(t, "branch", context["ref_type"]) + assert.Equal(t, "testowner/testrepo", context["repository"]) + assert.Equal(t, "testowner", context["repository_owner"]) + assert.Equal(t, "abc123def456", context["sha"]) + assert.Equal(t, "42", context["run_number"]) + assert.Equal(t, "test-workflow", context["workflow"]) + assert.Equal(t, false, context["ref_protected"]) + assert.Equal(t, "Actions", context["secret_source"]) + assert.Equal(t, setting.AppURL, context["server_url"]) + + event, ok := context["event"].(map[string]any) + require.True(t, ok) + assert.Equal(t, "testrepo", event["repository"].(map[string]any)["name"]) + + emptyField(t, context, "action_path") + emptyField(t, context, "action_ref") + emptyField(t, context, "action_repository") + emptyField(t, context, "action_status") + emptyField(t, context, "action") + emptyField(t, context, "base_ref") + emptyField(t, context, "env") + emptyField(t, context, "event_path") + emptyField(t, context, "graphql_url") + emptyField(t, context, "head_ref") + emptyField(t, context, "job") + emptyField(t, context, "path") + emptyField(t, context, "retention_days") + emptyField(t, context, "run_attempt") + emptyField(t, context, "run_id") + emptyField(t, context, "triggering_actor") + emptyField(t, context, "workspace") + }) + + t.Run("Workflow run with job", func(t *testing.T) { + run := &actions_model.ActionRun{ + ID: 1, + Index: 42, + TriggerUser: testUser, + Repo: testRepo, + TriggerEvent: "push", + Ref: "refs/heads/main", + CommitSHA: "abc123def456", + WorkflowID: "test-workflow", + EventPayload: `{}`, + } + + job := &actions_model.ActionRunJob{ + ID: 100, + RunID: 1, + JobID: "test-job", + Attempt: 1, + } + + context := GenerateGiteaContext(run, job) + + assert.Equal(t, "test-job", context["job"]) + assert.Equal(t, "1", context["run_id"]) + assert.Equal(t, "1", context["run_attempt"]) + }) + + t.Run("Pull request event", func(t *testing.T) { + pullRequestPayload := map[string]any{ + "pull_request": map[string]any{ + "base": map[string]any{ + "ref": "main", + "label": "main", + "sha": "base123sha", + }, + "head": map[string]any{ + "ref": "feature-branch", + "label": "feature-branch", + "sha": "head456sha", + }, + }, + } + + payloadBytes, _ := json.Marshal(pullRequestPayload) + + run := &actions_model.ActionRun{ + ID: 1, + Index: 42, + TriggerUser: testUser, + Repo: testRepo, + TriggerEvent: "pull_request", + Ref: "refs/pull/1/merge", + CommitSHA: "merge789sha", + WorkflowID: "test-workflow", + Event: webhook_module.HookEventPullRequest, + EventPayload: string(payloadBytes), + } + + context := GenerateGiteaContext(run, nil) + + assert.Equal(t, "main", context["base_ref"]) + assert.Equal(t, "feature-branch", context["head_ref"]) + assert.Equal(t, "refs/pull/1/merge", context["ref"]) + assert.Equal(t, "merge789sha", context["sha"]) + }) + + t.Run("Pull request target event", func(t *testing.T) { + pullRequestPayload := map[string]any{ + "pull_request": map[string]any{ + "base": map[string]any{ + "ref": "main", + "label": "main", + "sha": "base123sha", + }, + "head": map[string]any{ + "ref": "feature-branch", + "label": "feature-branch", + "sha": "head456sha", + }, + }, + } + + payloadBytes, _ := json.Marshal(pullRequestPayload) + + run := &actions_model.ActionRun{ + ID: 1, + Index: 42, + TriggerUser: testUser, + Repo: testRepo, + TriggerEvent: actions_module.GithubEventPullRequestTarget, + Ref: "refs/pull/1/merge", + CommitSHA: "merge789sha", + WorkflowID: "test-workflow", + Event: webhook_module.HookEventPullRequest, + EventPayload: string(payloadBytes), + } + + context := GenerateGiteaContext(run, nil) + + assert.Equal(t, "main", context["base_ref"]) + assert.Equal(t, "feature-branch", context["head_ref"]) + // For pull_request_target, ref and sha should be from base + assert.Equal(t, "refs/heads/main", context["ref"]) + assert.Equal(t, "base123sha", context["sha"]) + assert.Equal(t, "main", context["ref_name"]) + assert.Equal(t, "branch", context["ref_type"]) + }) +} + +func TestGenerateGiteaContextForRun(t *testing.T) { + testUser := &user.User{ + ID: 1, + Name: "testuser", + } + + testRepo := &repo.Repository{ + ID: 1, + OwnerName: "testowner", + Name: "testrepo", + } + + t.Run("Basic workflow run", func(t *testing.T) { + run := &actions_model.ActionRun{ + ID: 1, + Index: 42, + TriggerUser: testUser, + Repo: testRepo, + TriggerEvent: "push", + Ref: "refs/heads/main", + CommitSHA: "abc123def456", + WorkflowID: "test-workflow", + EventPayload: `{"repository": {"name": "testrepo"}}`, + } + + gitContextObj := generateGiteaContextForRun(run) + + assert.Equal(t, "testuser", gitContextObj.Actor) + assert.Equal(t, setting.AppURL+"api/v1", gitContextObj.APIURL) + assert.Equal(t, "push", gitContextObj.EventName) + assert.Equal(t, "refs/heads/main", gitContextObj.Ref) + assert.Equal(t, "main", gitContextObj.RefName) + assert.Equal(t, "branch", gitContextObj.RefType) + assert.Equal(t, "testowner/testrepo", gitContextObj.Repository) + assert.Equal(t, "testowner", gitContextObj.RepositoryOwner) + assert.Equal(t, "abc123def456", gitContextObj.Sha) + assert.Equal(t, "42", gitContextObj.RunNumber) + assert.Equal(t, "test-workflow", gitContextObj.Workflow) + + assert.Equal(t, "testrepo", gitContextObj.Event["repository"].(map[string]any)["name"]) + + assert.Empty(t, gitContextObj.ActionPath) + assert.Empty(t, gitContextObj.ActionRef) + assert.Empty(t, gitContextObj.ActionRepository) + assert.Empty(t, gitContextObj.Action) + assert.Empty(t, gitContextObj.BaseRef) + assert.Empty(t, gitContextObj.EventPath) + assert.Empty(t, gitContextObj.GraphQLURL) + assert.Empty(t, gitContextObj.HeadRef) + assert.Empty(t, gitContextObj.Job) + assert.Empty(t, gitContextObj.RetentionDays) + assert.Empty(t, gitContextObj.RunAttempt) + assert.Empty(t, gitContextObj.RunID) + assert.Empty(t, gitContextObj.Workspace) + }) + + t.Run("Pull request event", func(t *testing.T) { + pullRequestPayload := map[string]any{ + "pull_request": map[string]any{ + "base": map[string]any{ + "ref": "main", + "label": "main", + "sha": "base123sha", + }, + "head": map[string]any{ + "ref": "feature-branch", + "label": "feature-branch", + "sha": "head456sha", + }, + }, + } + + payloadBytes, _ := json.Marshal(pullRequestPayload) + + run := &actions_model.ActionRun{ + ID: 1, + Index: 42, + TriggerUser: testUser, + Repo: testRepo, + TriggerEvent: "pull_request", + Ref: "refs/pull/1/merge", + CommitSHA: "merge789sha", + WorkflowID: "test-workflow", + Event: webhook_module.HookEventPullRequest, + EventPayload: string(payloadBytes), + } + + gitContextObj := generateGiteaContextForRun(run) + + assert.Equal(t, "main", gitContextObj.BaseRef) + assert.Equal(t, "feature-branch", gitContextObj.HeadRef) + assert.Equal(t, "refs/pull/1/merge", gitContextObj.Ref) + assert.Equal(t, "merge789sha", gitContextObj.Sha) + }) + + t.Run("Pull request target event", func(t *testing.T) { + pullRequestPayload := map[string]any{ + "pull_request": map[string]any{ + "base": map[string]any{ + "ref": "main", + "label": "main", + "sha": "base123sha", + }, + "head": map[string]any{ + "ref": "feature-branch", + "label": "feature-branch", + "sha": "head456sha", + }, + }, + } + + payloadBytes, _ := json.Marshal(pullRequestPayload) + + run := &actions_model.ActionRun{ + ID: 1, + Index: 42, + TriggerUser: testUser, + Repo: testRepo, + TriggerEvent: actions_module.GithubEventPullRequestTarget, + Ref: "refs/pull/1/merge", + CommitSHA: "merge789sha", + WorkflowID: "test-workflow", + Event: webhook_module.HookEventPullRequest, + EventPayload: string(payloadBytes), + } + + gitContextObj := generateGiteaContextForRun(run) + + assert.Equal(t, "main", gitContextObj.BaseRef) + assert.Equal(t, "feature-branch", gitContextObj.HeadRef) + // For pull_request_target, ref and sha should be from base + assert.Equal(t, "refs/heads/main", gitContextObj.Ref) + assert.Equal(t, "base123sha", gitContextObj.Sha) + assert.Equal(t, "main", gitContextObj.RefName) + assert.Equal(t, "branch", gitContextObj.RefType) + }) +} diff --git a/services/actions/notifier_helper.go b/services/actions/notifier_helper.go index b24090cab4..553455a332 100644 --- a/services/actions/notifier_helper.go +++ b/services/actions/notifier_helper.go @@ -351,14 +351,17 @@ func handleWorkflows( Status: actions_model.StatusWaiting, } - if workflow, err := model.ReadWorkflow(bytes.NewReader(dwf.Content), false); err == nil { - notifications, err := workflow.Notifications() - if err != nil { - log.Error("Notifications: %w", err) - } - run.NotifyEmail = notifications + workflow, err := model.ReadWorkflow(bytes.NewReader(dwf.Content), false) + if err != nil { + log.Error("unable to read workflow: %v", err) } + notifications, err := workflow.Notifications() + if err != nil { + log.Error("Notifications: %w", err) + } + run.NotifyEmail = notifications + need, err := ifNeedApproval(ctx, run, input.Repo, input.Doer) if err != nil { log.Error("check if need approval for repo %d with user %d: %v", input.Repo.ID, input.Doer.ID, err) @@ -378,6 +381,11 @@ func handleWorkflows( continue } + err = ConfigureActionRunConcurrency(workflow, run, vars, map[string]any{}) + if err != nil { + log.Error("ConfigureActionRunConcurrency: %v", err) + } + jobs, err := jobParser(dwf.Content, jobparser.WithVars(vars)) if err != nil { run.Status = actions_model.StatusFailure @@ -387,17 +395,13 @@ func handleWorkflows( }} } - // cancel running jobs if the event is push or pull_request_sync - if run.Event == webhook_module.HookEventPush || - run.Event == webhook_module.HookEventPullRequestSync { - if err := CancelPreviousJobs( + if run.ConcurrencyType == actions_model.CancelInProgress { + if err := CancelPreviousWithConcurrencyGroup( ctx, run.RepoID, - run.Ref, - run.WorkflowID, - run.Event, + run.ConcurrencyGroup, ); err != nil { - log.Error("CancelPreviousJobs: %v", err) + log.Error("CancelPreviousWithConcurrencyGroup: %v", err) } } diff --git a/services/actions/notifier_helper_test.go b/services/actions/notifier_helper_test.go index 525103927b..0c928524aa 100644 --- a/services/actions/notifier_helper_test.go +++ b/services/actions/notifier_helper_test.go @@ -4,6 +4,7 @@ package actions import ( + "slices" "testing" actions_model "forgejo.org/models/actions" @@ -144,3 +145,66 @@ func Test_OpenForkPullRequestEvent(t *testing.T) { assert.Equal(t, webhook_module.HookEventPullRequest, runs[0].Event) assert.True(t, runs[0].IsForkPullRequest) } + +func TestActionsNotifierConcurrencyGroup(t *testing.T) { + require.NoError(t, unittest.PrepareTestDatabase()) + + repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 10}) + doer := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 1}) + pr := unittest.AssertExistsAndLoadBean(t, &issues_model.PullRequest{ID: 3}) + + commit := &git.Commit{ + ID: git.MustIDFromString("0000000000000000000000000000000000000000"), + CommitMessage: "test", + } + detectedWorkflows := []*actions_module.DetectedWorkflow{ + { + EntryName: "test.yml", + TriggerEvent: &jobparser.Event{ + Name: "pull_request", + }, + Content: []byte("{ on: pull_request, jobs: { j1: {} }}"), + }, + } + input := ¬ifyInput{ + Repo: repo, + Doer: doer, + Event: webhook_module.HookEventPullRequestSync, + PullRequest: pr, + Payload: &api.PullRequestPayload{}, + } + + err := handleWorkflows(db.DefaultContext, detectedWorkflows, commit, input, "refs/head/main") + require.NoError(t, err) + + runs, err := db.Find[actions_model.ActionRun](db.DefaultContext, actions_model.FindRunOptions{ + RepoID: repo.ID, + }) + require.NoError(t, err) + require.Len(t, runs, 1) + firstRun := runs[0] + + assert.Equal(t, "refs/head/main_test.yml_pull_request__auto", firstRun.ConcurrencyGroup) + assert.Equal(t, actions_model.CancelInProgress, firstRun.ConcurrencyType) + assert.Equal(t, actions_model.StatusWaiting, firstRun.Status) + + // Also... check if CancelPreviousWithConcurrencyGroup is invoked from handleWorkflows by firing off a second + // workflow and checking that the first one gets cancelled: + + err = handleWorkflows(db.DefaultContext, detectedWorkflows, commit, input, "refs/head/main") + require.NoError(t, err) + + runs, err = db.Find[actions_model.ActionRun](db.DefaultContext, actions_model.FindRunOptions{ + RepoID: repo.ID, + }) + require.NoError(t, err) + require.Len(t, runs, 2) + + firstRunIndex := slices.IndexFunc(runs, func(run *actions_model.ActionRun) bool { return run.ID == firstRun.ID }) + require.NotEqual(t, -1, firstRunIndex) + firstRun = runs[firstRunIndex] + + assert.Equal(t, "refs/head/main_test.yml_pull_request__auto", firstRun.ConcurrencyGroup) + assert.Equal(t, actions_model.CancelInProgress, firstRun.ConcurrencyType) + assert.Equal(t, actions_model.StatusCancelled, firstRun.Status) +} diff --git a/services/actions/schedule_tasks.go b/services/actions/schedule_tasks.go index 07ff7b3187..e7dad2d948 100644 --- a/services/actions/schedule_tasks.go +++ b/services/actions/schedule_tasks.go @@ -57,20 +57,6 @@ func startTasks(ctx context.Context) error { // Loop through each spec and create a schedule task for it for _, row := range specs { - // cancel running jobs if the event is push - if row.Schedule.Event == webhook_module.HookEventPush { - // cancel running jobs of the same workflow - if err := CancelPreviousJobs( - ctx, - row.RepoID, - row.Schedule.Ref, - row.Schedule.WorkflowID, - webhook_module.HookEventSchedule, - ); err != nil { - log.Error("CancelPreviousJobs: %v", err) - } - } - if row.Repo.IsArchived { // Skip if the repo is archived continue @@ -166,6 +152,21 @@ func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) } run.NotifyEmail = notifications + err = ConfigureActionRunConcurrency(workflow, run, vars, map[string]any{}) + if err != nil { + return err + } + + if run.ConcurrencyType == actions_model.CancelInProgress { + if err := CancelPreviousWithConcurrencyGroup( + ctx, + run.RepoID, + run.ConcurrencyGroup, + ); err != nil { + return err + } + } + // Parse the workflow specification from the cron schedule workflows, err := jobParser(cron.Content, jobparser.WithVars(vars)) if err != nil { @@ -185,7 +186,7 @@ func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) // It's useful when a new run is triggered, and all previous runs needn't be continued anymore. func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error { // Find all runs in the specified repository, reference, and workflow with non-final status - runs, total, err := db.FindAndCount[actions_model.ActionRun](ctx, actions_model.FindRunOptions{ + runs, _, err := db.FindAndCount[actions_model.ActionRun](ctx, actions_model.FindRunOptions{ RepoID: repoID, Ref: ref, WorkflowID: workflowID, @@ -196,58 +197,93 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin return err } - // If there are no runs found, there's no need to proceed with cancellation, so return nil. - if total == 0 { - return nil + // Iterate over each found run and cancel its associated jobs. + errorSlice := []error{} + for _, run := range runs { + err := cancelJobsForRun(ctx, run) + errorSlice = append(errorSlice, err) + } + err = errors.Join(errorSlice...) + if err != nil { + return err + } + + return nil +} + +// Cancels all pending jobs in the same repository with the same concurrency group. +func CancelPreviousWithConcurrencyGroup(ctx context.Context, repoID int64, concurrencyGroup string) error { + runs, _, err := db.FindAndCount[actions_model.ActionRun](ctx, actions_model.FindRunOptions{ + RepoID: repoID, + ConcurrencyGroup: concurrencyGroup, + Status: []actions_model.Status{actions_model.StatusRunning, actions_model.StatusWaiting, actions_model.StatusBlocked}, + }) + if err != nil { + return err } // Iterate over each found run and cancel its associated jobs. + errorSlice := []error{} for _, run := range runs { - // Find all jobs associated with the current run. - jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{ - RunID: run.ID, - }) - if err != nil { - return err + err := cancelJobsForRun(ctx, run) + errorSlice = append(errorSlice, err) + } + err = errors.Join(errorSlice...) + if err != nil { + return err + } + + return nil +} + +func cancelJobsForRun(ctx context.Context, run *actions_model.ActionRun) error { + // Find all jobs associated with the current run. + jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{ + RunID: run.ID, + }) + if err != nil { + return err + } + + // Iterate over each job and attempt to cancel it. + errorSlice := []error{} + for _, job := range jobs { + // Skip jobs that are already in a terminal state (completed, cancelled, etc.). + status := job.Status + if status.IsDone() { + continue } - // Iterate over each job and attempt to cancel it. - for _, job := range jobs { - // Skip jobs that are already in a terminal state (completed, cancelled, etc.). - status := job.Status - if status.IsDone() { + // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it. + if job.TaskID == 0 { + job.Status = actions_model.StatusCancelled + job.Stopped = timeutil.TimeStampNow() + + // Update the job's status and stopped time in the database. + n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped") + if err != nil { + errorSlice = append(errorSlice, err) continue } - // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it. - if job.TaskID == 0 { - job.Status = actions_model.StatusCancelled - job.Stopped = timeutil.TimeStampNow() - - // Update the job's status and stopped time in the database. - n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped") - if err != nil { - return err - } - - // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again. - if n == 0 { - return errors.New("job has changed, try again") - } - - // Continue with the next job. + // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again. + if n == 0 { + errorSlice = append(errorSlice, errors.New("job has changed, try again")) continue } - // If the job has an associated task, try to stop the task, effectively cancelling the job. - if err := StopTask(ctx, job.TaskID, actions_model.StatusCancelled); err != nil { - return err - } + // Continue with the next job. + continue + } + + // If the job has an associated task, try to stop the task, effectively cancelling the job. + if err := StopTask(ctx, job.TaskID, actions_model.StatusCancelled); err != nil { + errorSlice = append(errorSlice, errors.New("job has changed, try again")) + continue } } - // Return nil to indicate successful cancellation of all running and waiting jobs. - return nil + return errors.Join(errorSlice...) } func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository, cancelPreviousJobs bool) error { diff --git a/services/actions/schedule_tasks_test.go b/services/actions/schedule_tasks_test.go index 31ed5ec813..7601b39ebd 100644 --- a/services/actions/schedule_tasks_test.go +++ b/services/actions/schedule_tasks_test.go @@ -11,6 +11,7 @@ import ( repo_model "forgejo.org/models/repo" "forgejo.org/models/unit" "forgejo.org/models/unittest" + "forgejo.org/modules/timeutil" webhook_module "forgejo.org/modules/webhook" "github.com/stretchr/testify/assert" @@ -86,6 +87,8 @@ func TestCreateScheduleTask(t *testing.T) { assert.Equal(t, cron.EventPayload, run.EventPayload) assert.Equal(t, cron.ID, run.ScheduleID) assert.Equal(t, actions_model.StatusWaiting, run.Status) + assert.Equal(t, "branch_some.yml_schedule__auto", run.ConcurrencyGroup) + assert.Equal(t, actions_model.UnlimitedConcurrency, run.ConcurrencyType) } assertMutable := func(t *testing.T, expected, run *actions_model.ActionRun) { @@ -173,3 +176,91 @@ jobs: }) } } + +func TestCancelPreviousJobs(t *testing.T) { + defer unittest.OverrideFixtures("services/actions/TestCancelPreviousJobs")() + require.NoError(t, unittest.PrepareTestDatabase()) + + run := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: 894}) + assert.Equal(t, actions_model.StatusRunning, run.Status) + assert.EqualValues(t, 1683636626, run.Updated) + runJob := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RunID: 894}) + assert.Equal(t, actions_model.StatusRunning, runJob.Status) + assert.EqualValues(t, 1683636528, runJob.Started) + + err := CancelPreviousJobs(t.Context(), 63, "refs/heads/main", "running.yaml", webhook_module.HookEventWorkflowDispatch) + require.NoError(t, err) + + run = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: 894}) + assert.Equal(t, actions_model.StatusCancelled, run.Status) + assert.Greater(t, run.Updated, timeutil.TimeStamp(1683636626)) + runJob = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RunID: 894}) + assert.Equal(t, actions_model.StatusCancelled, runJob.Status) + assert.Greater(t, runJob.Stopped, timeutil.TimeStamp(1683636528)) +} + +func TestCancelPreviousWithConcurrencyGroup(t *testing.T) { + for _, tc := range []struct { + name string + updateRun901 map[string]any + }{ + // run 900 & 901 in the fixture data have almost the same data and so should both be cancelled by + // TestCancelPreviousWithConcurrencyGroup -- but each test case will vary something different about 601 to + // ensure that only run 600 is targeted by the cancellation + { + name: "only cancels target repo", + updateRun901: map[string]any{"repo_id": 2}, + }, + { + name: "only cancels target concurrency group", + updateRun901: map[string]any{"concurrency_group": "321cba"}, + }, + { + name: "only cancels running", + updateRun901: map[string]any{"status": actions_model.StatusSuccess}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + defer unittest.OverrideFixtures("services/actions/TestCancelPreviousWithConcurrencyGroup")() + require.NoError(t, unittest.PrepareTestDatabase()) + + e := db.GetEngine(t.Context()) + + expected901Status := actions_model.StatusRunning + if tc.updateRun901 != nil { + affected, err := e.Table(&actions_model.ActionRun{}).Where("id = ?", 901).Update(tc.updateRun901) + require.NoError(t, err) + require.EqualValues(t, 1, affected) + newStatus, ok := tc.updateRun901["status"] + if ok { + expected901Status = newStatus.(actions_model.Status) + } + } + + run := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: 900}) + assert.Equal(t, actions_model.StatusRunning, run.Status) + assert.EqualValues(t, 1683636626, run.Updated) + assert.Equal(t, "abc123", run.ConcurrencyGroup) + run = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: 901}) + assert.Equal(t, expected901Status, run.Status) + runJob := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RunID: 900}) + assert.Equal(t, actions_model.StatusRunning, runJob.Status) + assert.EqualValues(t, 1683636528, runJob.Started) + + // Search for concurrency group should be case-insensitive, which we test here by using a different capitalization + // than the fixture data + err := CancelPreviousWithConcurrencyGroup(t.Context(), 63, "ABC123") + require.NoError(t, err) + + run = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: 900}) + assert.Equal(t, actions_model.StatusCancelled, run.Status) + assert.Greater(t, run.Updated, timeutil.TimeStamp(1683636626)) + runJob = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RunID: 900}) + assert.Equal(t, actions_model.StatusCancelled, runJob.Status) + assert.Greater(t, runJob.Stopped, timeutil.TimeStamp(1683636528)) + + run = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: 901}) + assert.Equal(t, expected901Status, run.Status) + }) + } +} diff --git a/services/actions/workflows.go b/services/actions/workflows.go index 27d05c9043..bd429e77ab 100644 --- a/services/actions/workflows.go +++ b/services/actions/workflows.go @@ -69,6 +69,7 @@ func (entry *Workflow) Dispatch(ctx context.Context, inputGetter InputValueGette } inputs := make(map[string]string) + inputsAny := make(map[string]any) if workflowDispatch := wf.WorkflowDispatchConfig(); workflowDispatch != nil { for key, input := range workflowDispatch.Inputs { val := inputGetter(key) @@ -89,6 +90,7 @@ func (entry *Workflow) Dispatch(ctx context.Context, inputGetter InputValueGette val = strconv.FormatBool(val == "on") } inputs[key] = val + inputsAny[key] = val } } @@ -138,6 +140,21 @@ func (entry *Workflow) Dispatch(ctx context.Context, inputGetter InputValueGette return nil, nil, err } + err = ConfigureActionRunConcurrency(wf, run, vars, inputsAny) + if err != nil { + return nil, nil, err + } + + if run.ConcurrencyType == actions_model.CancelInProgress { + if err := CancelPreviousWithConcurrencyGroup( + ctx, + run.RepoID, + run.ConcurrencyGroup, + ); err != nil { + return nil, nil, err + } + } + jobs, err := jobParser(content, jobparser.WithVars(vars)) if err != nil { return nil, nil, err @@ -180,3 +197,38 @@ func GetWorkflowFromCommit(gitRepo *git.Repository, ref, workflowID string) (*Wo GitEntry: workflowEntry, }, nil } + +// Sets the ConcurrencyGroup & ConcurrencyType on the provided ActionRun based upon the Workflow's `concurrency` data, +// or appropriate defaults if not present. +func ConfigureActionRunConcurrency(workflow *act_model.Workflow, run *actions_model.ActionRun, vars map[string]string, inputs map[string]any) error { + concurrencyGroup, cancelInProgress, err := jobparser.EvaluateWorkflowConcurrency( + workflow.RawConcurrency, generateGiteaContextForRun(run), vars, inputs) + if err != nil { + return fmt.Errorf("unable to evaluate workflow `concurrency` block: %w", err) + } + if concurrencyGroup != "" { + run.SetConcurrencyGroup(concurrencyGroup) + } else { + run.SetDefaultConcurrencyGroup() + } + if cancelInProgress == nil { + // Maintain compatible behavior from before concurrency groups were implemented -- if `cancel-in-progress` + // isn't defined in the workflow, cancel on push & PR sync events. + if run.Event == webhook.HookEventPush || run.Event == webhook.HookEventPullRequestSync { + run.ConcurrencyType = actions_model.CancelInProgress + } else { + run.ConcurrencyType = actions_model.UnlimitedConcurrency + } + } else if *cancelInProgress { + run.ConcurrencyType = actions_model.CancelInProgress + } else if concurrencyGroup == "" { + // A workflow has explicitly listed `cancel-in-progress: false`, but has *not* provided a concurrency group. In + // this case we want to trigger a different concurrency behavior -- we won't cancel in-progress builds (we were + // asked not to), we won't queue behind other builds (we weren't given a concurrency group so it's reasonable to + // assume the user doesn't want a concurrency limit). + run.ConcurrencyType = actions_model.UnlimitedConcurrency + } else { + run.ConcurrencyType = actions_model.QueueBehind + } + return nil +} diff --git a/services/actions/workflows_test.go b/services/actions/workflows_test.go new file mode 100644 index 0000000000..7f50d7f982 --- /dev/null +++ b/services/actions/workflows_test.go @@ -0,0 +1,123 @@ +// Copyright 2025 The Forgejo Authors. All rights reserved. +// SPDX-License-Identifier: GPL-3.0-or-later + +package actions + +import ( + "testing" + + actions_model "forgejo.org/models/actions" + "forgejo.org/models/repo" + "forgejo.org/modules/webhook" + + act_model "code.forgejo.org/forgejo/runner/v11/act/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConfigureActionRunConcurrency(t *testing.T) { + for _, tc := range []struct { + name string + concurrency *act_model.RawConcurrency + vars map[string]string + inputs map[string]any + runEvent webhook.HookEventType + expectedConcurrencyGroup string + expectedConcurrencyType actions_model.ConcurrencyMode + }{ + // Before the introduction of concurrency groups, push & pull_request_sync would cancel runs on the same repo, + // reference, workflow, and event -- these cases cover undefined concurrency group and backwards compatibility + // checks. + { + name: "backwards compatibility push", + runEvent: webhook.HookEventPush, + expectedConcurrencyGroup: "refs/head/main_testing.yml_push__auto", + expectedConcurrencyType: actions_model.CancelInProgress, + }, + { + name: "backwards compatibility pull_request_sync", + runEvent: webhook.HookEventPullRequestSync, + expectedConcurrencyGroup: "refs/head/main_testing.yml_pull_request_sync__auto", + expectedConcurrencyType: actions_model.CancelInProgress, + }, + { + name: "backwards compatibility other event", + runEvent: webhook.HookEventWorkflowDispatch, + expectedConcurrencyGroup: "refs/head/main_testing.yml_workflow_dispatch__auto", + expectedConcurrencyType: actions_model.UnlimitedConcurrency, + }, + + { + name: "fully-specified cancel-in-progress", + concurrency: &act_model.RawConcurrency{ + Group: "abc", + CancelInProgress: "true", + }, + runEvent: webhook.HookEventPullRequestSync, + expectedConcurrencyGroup: "abc", + expectedConcurrencyType: actions_model.CancelInProgress, + }, + { + name: "fully-specified queue-behind", + concurrency: &act_model.RawConcurrency{ + Group: "abc", + CancelInProgress: "false", + }, + runEvent: webhook.HookEventPullRequestSync, + expectedConcurrencyGroup: "abc", + expectedConcurrencyType: actions_model.QueueBehind, + }, + { + name: "no concurrency group, cancel-in-progress: false", + concurrency: &act_model.RawConcurrency{ + CancelInProgress: "false", + }, + runEvent: webhook.HookEventPullRequestSync, + expectedConcurrencyGroup: "refs/head/main_testing.yml_pull_request_sync__auto", + expectedConcurrencyType: actions_model.UnlimitedConcurrency, + }, + + { + name: "interpreted values", + concurrency: &act_model.RawConcurrency{ + Group: "${{ github.workflow }}-${{ github.ref }}", + CancelInProgress: "${{ !contains(github.ref, 'release/')}}", + }, + runEvent: webhook.HookEventPullRequestSync, + expectedConcurrencyGroup: "testing.yml-refs/head/main", + expectedConcurrencyType: actions_model.CancelInProgress, + }, + { + name: "interpreted values with inputs and vars", + concurrency: &act_model.RawConcurrency{ + Group: "${{ inputs.abc }}-${{ vars.def }}", + }, + inputs: map[string]any{"abc": "123"}, + vars: map[string]string{"def": "456"}, + runEvent: webhook.HookEventPullRequestSync, + expectedConcurrencyGroup: "123-456", + expectedConcurrencyType: actions_model.CancelInProgress, + }, + } { + t.Run(tc.name, func(t *testing.T) { + workflow := &act_model.Workflow{RawConcurrency: tc.concurrency} + run := &actions_model.ActionRun{ + Ref: "refs/head/main", + WorkflowID: "testing.yml", + Event: tc.runEvent, + TriggerEvent: string(tc.runEvent), + Repo: &repo.Repository{}, + } + + err := ConfigureActionRunConcurrency(workflow, run, tc.vars, tc.inputs) + require.NoError(t, err) + + if tc.expectedConcurrencyGroup == "" { + assert.Empty(t, run.ConcurrencyGroup, "empty ConcurrencyGroup") + } else { + assert.Equal(t, tc.expectedConcurrencyGroup, run.ConcurrencyGroup) + } + assert.Equal(t, tc.expectedConcurrencyType, run.ConcurrencyType) + }) + } +} diff --git a/services/notify/notify.go b/services/notify/notify.go index 02c18272cb..3ca704f717 100644 --- a/services/notify/notify.go +++ b/services/notify/notify.go @@ -5,6 +5,7 @@ package notify import ( "context" + "slices" actions_model "forgejo.org/models/actions" issues_model "forgejo.org/models/issues" @@ -24,6 +25,13 @@ func RegisterNotifier(notifier Notifier) { notifiers = append(notifiers, notifier) } +// Intended for undoing RegisterNotifier in tests only, not for production usage +func UnregisterNotifier(notifier Notifier) { + notifiers = slices.DeleteFunc(notifiers, func(maybeNotifier Notifier) bool { + return notifier == maybeNotifier + }) +} + // NewWikiPage notifies creating new wiki pages to notifiers func NewWikiPage(ctx context.Context, doer *user_model.User, repo *repo_model.Repository, page, comment string) { for _, notifier := range notifiers { diff --git a/tests/integration/actions_concurrency_group_queue_test.go b/tests/integration/actions_concurrency_group_queue_test.go new file mode 100644 index 0000000000..1780b1ad70 --- /dev/null +++ b/tests/integration/actions_concurrency_group_queue_test.go @@ -0,0 +1,281 @@ +// Copyright 2025 The Forgejo Authors. All rights reserved. +// SPDX-License-Identifier: GPL-3.0-or-later + +package integration + +import ( + "net/url" + "strings" + "testing" + + actions_model "forgejo.org/models/actions" + "forgejo.org/models/db" + unit_model "forgejo.org/models/unit" + "forgejo.org/models/unittest" + user_model "forgejo.org/models/user" + "forgejo.org/modules/gitrepo" + "forgejo.org/modules/setting" + "forgejo.org/modules/test" + actions_service "forgejo.org/services/actions" + files_service "forgejo.org/services/repository/files" + "forgejo.org/tests" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestActionConcurrencyRunnerFiltering(t *testing.T) { + defer unittest.OverrideFixtures("tests/integration/fixtures/TestActionConcurrencyRunnerFiltering")() + require.NoError(t, unittest.PrepareTestDatabase()) + + for _, tc := range []struct { + name string + runnerName string + expectedRunIDs []int64 + }{ + { + // owner id 2 + runnerName: "User runner", + expectedRunIDs: []int64{500, 502}, + }, + { + // owner id 3 + runnerName: "Organisation runner", + expectedRunIDs: []int64{501}, + }, + { + runnerName: "Repository runner", + expectedRunIDs: []int64{502}, + }, + { + runnerName: "Global runner", + expectedRunIDs: []int64{500, 501, 502}, + }, + } { + t.Run(tc.runnerName, func(t *testing.T) { + // defer unittest.OverrideFixtures("tests/integration/fixtures/TestActionConcurrencyRunnerFiltering")() + // require.NoError(t, unittest.PrepareTestDatabase()) + + doTest := func() { + e := db.GetEngine(t.Context()) + + runner := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunner{Name: tc.runnerName}) + jobs, err := actions_model.GetAvailableJobsForRunner(e, runner) + require.NoError(t, err) + + ids := []int64{} + for _, job := range jobs { + ids = append(ids, job.ID) + } + assert.ElementsMatch(t, tc.expectedRunIDs, ids) + } + + t.Run("ConcurrencyGroupQueueEnabled", func(t *testing.T) { + defer test.MockVariableValue(&setting.Actions.ConcurrencyGroupQueueEnabled, true)() + doTest() + }) + + t.Run("ConcurrencyGroupQueueDisabled", func(t *testing.T) { + defer test.MockVariableValue(&setting.Actions.ConcurrencyGroupQueueEnabled, false)() + doTest() + }) + }) + } +} + +// These tests are a little more unit-testy than they are integration tests, but they're placed in the integration test +// suite so that they're run on all database engines. +func TestActionConcurrencyGroupQueue(t *testing.T) { + for _, tc := range []struct { + name string + expectedRunIDs []int64 + updateRun500 map[string]any + updateRunJob500 map[string]any + updateRun501 map[string]any + updateRunJob501 map[string]any + queuingDisabled bool + }{ + { + name: "queuing disabled", + expectedRunIDs: []int64{500, 501, 502}, + queuingDisabled: true, + }, + { + // Job 501 & 502's data is configured to be queued-behind job 500, so with queuing enabled it shouldn't + // appear. + name: "concurrency blocked", + expectedRunIDs: []int64{500}, + }, + { + name: "different repo", + updateRun501: map[string]any{"repo_id": 2}, + expectedRunIDs: []int64{500, 501}, + }, + { + name: "different concurrency group", + updateRun501: map[string]any{"concurrency_group": "321bca"}, + expectedRunIDs: []int64{500, 501}, + }, + { + name: "null concurrency group", + updateRun501: map[string]any{"concurrency_group": nil}, + expectedRunIDs: []int64{500, 501}, + }, + { + name: "empty concurrency group", + updateRun501: map[string]any{"concurrency_group": ""}, + expectedRunIDs: []int64{500, 501}, + }, + { + name: "unlimited concurrency", + updateRun501: map[string]any{"concurrency_type": actions_model.UnlimitedConcurrency}, + expectedRunIDs: []int64{500, 501}, + }, + { + name: "cancel-in-progress type", + updateRun501: map[string]any{"concurrency_type": actions_model.CancelInProgress}, + expectedRunIDs: []int64{500, 501}, + }, + { + name: "blocking job done", + updateRun500: map[string]any{"status": actions_model.StatusCancelled}, + updateRunJob500: map[string]any{"status": actions_model.StatusCancelled}, + expectedRunIDs: []int64{501}, + }, + { + name: "mid-index job running", + updateRun501: map[string]any{"status": actions_model.StatusRunning}, + updateRunJob501: map[string]any{"status": actions_model.StatusRunning}, + expectedRunIDs: []int64{}, + }, + { + // Reflects a case where 500 may be retried -- there's already a later job (index-wise) in the concurrency + // group that is done, but if 500 is waiting it can still be run + name: "mid-index job ran", + updateRun501: map[string]any{"status": actions_model.StatusSuccess}, + updateRunJob501: map[string]any{"status": actions_model.StatusSuccess}, + expectedRunIDs: []int64{500}, + }, + { + // If both job 500 & job 501 are in the same workflow run, and one is running, the other can still start + // (this would be conditional on its `needs` as a job, but that isn't evaluated by GetAvailableJobsForRunner + // so isn't in the scope of testing here) + name: "multiple jobs from same run", + updateRun500: map[string]any{"status": actions_model.StatusRunning}, + updateRunJob500: map[string]any{"status": actions_model.StatusRunning}, + updateRunJob501: map[string]any{"run_id": 500}, + expectedRunIDs: []int64{501}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + defer unittest.OverrideFixtures("tests/integration/fixtures/TestActionConcurrencyGroupQueue")() + require.NoError(t, unittest.PrepareTestDatabase()) + runner := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunner{ID: 1004}, "owner_id = 0 AND repo_id = 0") + + defer test.MockVariableValue(&setting.Actions.ConcurrencyGroupQueueEnabled, !tc.queuingDisabled)() + + e := db.GetEngine(t.Context()) + + if tc.updateRun500 != nil { + affected, err := e.Table(&actions_model.ActionRun{}).Where("id = ?", 500).Update(tc.updateRun500) + require.NoError(t, err) + require.EqualValues(t, 1, affected) + } + if tc.updateRunJob500 != nil { + affected, err := e.Table(&actions_model.ActionRunJob{}).Where("id = ?", 500).Update(tc.updateRunJob500) + require.NoError(t, err) + require.EqualValues(t, 1, affected) + } + if tc.updateRun501 != nil { + affected, err := e.Table(&actions_model.ActionRun{}).Where("id = ?", 501).Update(tc.updateRun501) + require.NoError(t, err) + require.EqualValues(t, 1, affected) + } + if tc.updateRunJob501 != nil { + affected, err := e.Table(&actions_model.ActionRunJob{}).Where("id = ?", 501).Update(tc.updateRunJob501) + require.NoError(t, err) + require.EqualValues(t, 1, affected) + } + + jobs, err := actions_model.GetAvailableJobsForRunner(e, runner) + require.NoError(t, err) + + ids := []int64{} + for _, job := range jobs { + ids = append(ids, job.ID) + } + assert.ElementsMatch(t, tc.expectedRunIDs, ids) + }) + } +} + +func TestActionConcurrencyGroupQueueFetchNext(t *testing.T) { + if !setting.Database.Type.IsSQLite3() { + // mock repo runner only supported on SQLite testing + t.Skip() + } + + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + + // create the repo + repo, sha, f := tests.CreateDeclarativeRepo(t, user2, "repo-workflow-dispatch", + []unit_model.Type{unit_model.TypeActions}, nil, + []*files_service.ChangeRepoFile{ + { + Operation: "create", + TreePath: ".forgejo/workflows/dispatch.yml", + ContentReader: strings.NewReader( + "name: concurrency group workflow\n" + + "on:\n" + + " workflow_dispatch:\n" + + " inputs:\n" + + " ident:\n" + + " type: string\n" + + "concurrency:\n" + + " group: abc\n" + + " cancel-in-progress: false\n" + + "jobs:\n" + + " test:\n" + + " runs-on: ubuntu-latest\n" + + " steps:\n" + + " - run: echo deployment goes here\n"), + }, + }, + ) + defer f() + + gitRepo, err := gitrepo.OpenRepository(db.DefaultContext, repo) + require.NoError(t, err) + defer gitRepo.Close() + + workflow, err := actions_service.GetWorkflowFromCommit(gitRepo, "main", "dispatch.yml") + require.NoError(t, err) + assert.Equal(t, "refs/heads/main", workflow.Ref) + assert.Equal(t, sha, workflow.Commit.ID.String()) + + runner := newMockRunner() + runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"}) + + // first run within the concurrency group + _, _, err = workflow.Dispatch(db.DefaultContext, func(key string) string { return "task1" }, repo, user2) + require.NoError(t, err) + task1 := runner.fetchTask(t) + + // dispatch a second run within the same concurrency group + _, _, err = workflow.Dispatch(db.DefaultContext, func(key string) string { return "task2" }, repo, user2) + require.NoError(t, err) + + // assert that we can't fetch and start that second task -- it's blocked behind the first + task2 := runner.maybeFetchTask(t) + assert.Nil(t, task2) + + // finish the first task + runner.succeedAtTask(t, task1) + + // now task2 should be accessible since task1 has completed + task2 = runner.fetchTask(t) + assert.NotNil(t, task2) + runner.succeedAtTask(t, task2) + }) +} diff --git a/tests/integration/actions_run_now_done_notification_test.go b/tests/integration/actions_run_now_done_notification_test.go index 480d67a73d..1f462a9874 100644 --- a/tests/integration/actions_run_now_done_notification_test.go +++ b/tests/integration/actions_run_now_done_notification_test.go @@ -75,6 +75,7 @@ func TestActionNowDoneNotification(t *testing.T) { onGiteaRun(t, func(t *testing.T, u *url.URL) { notifier := mockNotifier{t: t, testIdx: 0, lastRunID: -1, runID: -1} notify_service.RegisterNotifier(¬ifier) + defer notify_service.UnregisterNotifier(¬ifier) user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) diff --git a/tests/integration/actions_runner_test.go b/tests/integration/actions_runner_test.go index 2dda089d87..a9bb122680 100644 --- a/tests/integration/actions_runner_test.go +++ b/tests/integration/actions_runner_test.go @@ -25,7 +25,8 @@ import ( ) type mockRunner struct { - client *mockRunnerClient + client *mockRunnerClient + lastTasksVersion int64 } type mockRunnerClient struct { @@ -83,8 +84,7 @@ func (r *mockRunner) doRegister(t *testing.T, name, token string, labels []strin func (r *mockRunner) registerAsRepoRunner(t *testing.T, ownerName, repoName, runnerName string, labels []string) { if !setting.Database.Type.IsSQLite3() { - // registering a mock runner when using a database other than SQLite leaves leftovers - t.FailNow() + assert.FailNow(t, "registering a mock runner when using a database other than SQLite leaves leftovers") } session := loginUser(t, ownerName) token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository) @@ -152,6 +152,15 @@ func (r *mockRunner) deleteRunner(t *testing.T, ownerName, repoName string, runn MakeRequest(t, req, http.StatusNoContent) } +func (r *mockRunner) maybeFetchTask(t *testing.T) *runnerv1.Task { + resp, err := r.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{ + TasksVersion: r.lastTasksVersion, + })) + require.NoError(t, err) + r.lastTasksVersion = resp.Msg.TasksVersion + return resp.Msg.Task +} + func (r *mockRunner) fetchTask(t *testing.T, timeout ...time.Duration) *runnerv1.Task { fetchTimeout := 10 * time.Second if len(timeout) > 0 { @@ -159,13 +168,10 @@ func (r *mockRunner) fetchTask(t *testing.T, timeout ...time.Duration) *runnerv1 } var task *runnerv1.Task - assert.Eventually(t, func() bool { - resp, err := r.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{ - TasksVersion: 0, - })) - require.NoError(t, err) - if resp.Msg.Task != nil { - task = resp.Msg.Task + require.Eventually(t, func() bool { + maybeTask := r.maybeFetchTask(t) + if maybeTask != nil { + task = maybeTask return true } return false diff --git a/tests/integration/actions_trigger_test.go b/tests/integration/actions_trigger_test.go index d13c666020..a1c5b90362 100644 --- a/tests/integration/actions_trigger_test.go +++ b/tests/integration/actions_trigger_test.go @@ -848,3 +848,61 @@ func TestActionsWorkflowDispatchEvent(t *testing.T) { assert.Equal(t, "test", j[0]) }) } + +func TestActionsWorkflowDispatchConcurrencyGroup(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + + // create the repo + repo, sha, f := tests.CreateDeclarativeRepo(t, user2, "repo-workflow-dispatch", + []unit_model.Type{unit_model.TypeActions}, nil, + []*files_service.ChangeRepoFile{ + { + Operation: "create", + TreePath: ".gitea/workflows/dispatch.yml", + ContentReader: strings.NewReader( + "name: test\n" + + "on: [workflow_dispatch]\n" + + "jobs:\n" + + " test:\n" + + " runs-on: ubuntu-latest\n" + + " steps:\n" + + " - run: echo helloworld\n" + + "concurrency:\n" + + " group: workflow-magic-group\n" + + " cancel-in-progress: true\n", + ), + }, + }, + ) + defer f() + + gitRepo, err := gitrepo.OpenRepository(db.DefaultContext, repo) + require.NoError(t, err) + defer gitRepo.Close() + + workflow, err := actions_service.GetWorkflowFromCommit(gitRepo, "main", "dispatch.yml") + require.NoError(t, err) + assert.Equal(t, "refs/heads/main", workflow.Ref) + assert.Equal(t, sha, workflow.Commit.ID.String()) + + inputGetter := func(key string) string { + return "" + } + + firstRun, _, err := workflow.Dispatch(db.DefaultContext, inputGetter, repo, user2) + require.NoError(t, err) + assert.Equal(t, 1, unittest.GetCount(t, &actions_model.ActionRun{RepoID: repo.ID})) + assert.Equal(t, "workflow-magic-group", firstRun.ConcurrencyGroup) + assert.Equal(t, actions_model.CancelInProgress, firstRun.ConcurrencyType) + + // Dispatch again and verify previous run was cancelled: + secondRun, _, err := workflow.Dispatch(db.DefaultContext, inputGetter, repo, user2) + require.NoError(t, err) + assert.Equal(t, 2, unittest.GetCount(t, &actions_model.ActionRun{RepoID: repo.ID})) + assert.Equal(t, "workflow-magic-group", secondRun.ConcurrencyGroup) + assert.Equal(t, actions_model.CancelInProgress, secondRun.ConcurrencyType) + firstRunReload := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: firstRun.ID}) + assert.Equal(t, actions_model.StatusCancelled, firstRunReload.Status) + }) +} diff --git a/tests/integration/fixtures/TestActionConcurrencyGroupQueue/action_run.yml b/tests/integration/fixtures/TestActionConcurrencyGroupQueue/action_run.yml new file mode 100644 index 0000000000..f2cc456b8d --- /dev/null +++ b/tests/integration/fixtures/TestActionConcurrencyGroupQueue/action_run.yml @@ -0,0 +1,21 @@ +- + id: 500 + index: 1 + status: 5 # StatusWaiting + repo_id: 4 + concurrency_group: abc123 + concurrency_type: 1 # QueueBehind +- + id: 501 + index: 2 + status: 5 # StatusWaiting + repo_id: 4 + concurrency_group: abc123 + concurrency_type: 1 # QueueBehind +- + id: 502 + index: 3 + status: 5 # StatusWaiting + repo_id: 4 + concurrency_group: abc123 + concurrency_type: 1 # QueueBehind diff --git a/tests/integration/fixtures/TestActionConcurrencyGroupQueue/action_run_job.yml b/tests/integration/fixtures/TestActionConcurrencyGroupQueue/action_run_job.yml new file mode 100644 index 0000000000..c8c12085d3 --- /dev/null +++ b/tests/integration/fixtures/TestActionConcurrencyGroupQueue/action_run_job.yml @@ -0,0 +1,42 @@ +- + id: 500 + run_id: 500 + repo_id: 4 + owner_id: 1 + commit_sha: 985f0301dba5e7b34be866819cd15ad3d8f508ee + is_fork_pull_request: 0 + name: job_1 + attempt: 0 + job_id: job_1 + task_id: 0 + status: 5 # StatusWaiting + runs_on: '["fedora"]' + created: 1758848614 +- + id: 501 + run_id: 501 + repo_id: 4 + owner_id: 1 + commit_sha: 985f0301dba5e7b34be866819cd15ad3d8f508ee + is_fork_pull_request: 0 + name: job_1 + attempt: 0 + job_id: job_1 + task_id: 0 + status: 5 # StatusWaiting + runs_on: '["fedora"]' + created: 1758848614 +- + id: 502 + run_id: 502 + repo_id: 4 + owner_id: 1 + commit_sha: 985f0301dba5e7b34be866819cd15ad3d8f508ee + is_fork_pull_request: 0 + name: job_1 + attempt: 0 + job_id: job_1 + task_id: 0 + status: 5 # StatusWaiting + runs_on: '["fedora"]' + created: 1758848614 diff --git a/tests/integration/fixtures/TestActionConcurrencyGroupQueue/action_runner.yml b/tests/integration/fixtures/TestActionConcurrencyGroupQueue/action_runner.yml new file mode 100644 index 0000000000..d783f83110 --- /dev/null +++ b/tests/integration/fixtures/TestActionConcurrencyGroupQueue/action_runner.yml @@ -0,0 +1,7 @@ +- + id: 1004 + uuid: "fb857e63-c0ce-4571-a6c9-fde26c128073" + name: "Global runner" + owner_id: 0 + repo_id: 0 + deleted: 0 diff --git a/tests/integration/fixtures/TestActionConcurrencyRunnerFiltering/action_run.yml b/tests/integration/fixtures/TestActionConcurrencyRunnerFiltering/action_run.yml new file mode 100644 index 0000000000..07e88f868c --- /dev/null +++ b/tests/integration/fixtures/TestActionConcurrencyRunnerFiltering/action_run.yml @@ -0,0 +1,18 @@ +- + id: 500 + index: 1 + status: 5 # StatusWaiting + repo_id: 62 + owner_id: 2 +- + id: 501 + index: 2 + status: 5 # StatusWaiting + repo_id: 3 + owner_id: 3 +- + id: 502 + index: 2 + status: 5 # StatusWaiting + repo_id: 1 + owner_id: 2 diff --git a/tests/integration/fixtures/TestActionConcurrencyRunnerFiltering/action_run_job.yml b/tests/integration/fixtures/TestActionConcurrencyRunnerFiltering/action_run_job.yml new file mode 100644 index 0000000000..17d5b6426a --- /dev/null +++ b/tests/integration/fixtures/TestActionConcurrencyRunnerFiltering/action_run_job.yml @@ -0,0 +1,42 @@ +- + id: 500 + run_id: 500 + repo_id: 62 + owner_id: 2 + commit_sha: 985f0301dba5e7b34be866819cd15ad3d8f508ee + is_fork_pull_request: 0 + name: job_1 + attempt: 0 + job_id: job_1 + task_id: 0 + status: 5 # StatusWaiting + runs_on: '["fedora"]' + created: 1758848614 +- + id: 501 + run_id: 501 + repo_id: 4 + owner_id: 3 + commit_sha: 985f0301dba5e7b34be866819cd15ad3d8f508ee + is_fork_pull_request: 0 + name: job_1 + attempt: 0 + job_id: job_1 + task_id: 0 + status: 5 # StatusWaiting + runs_on: '["fedora"]' + created: 1758848614 +- + id: 502 + run_id: 502 + repo_id: 1 + owner_id: 2 + commit_sha: 985f0301dba5e7b34be866819cd15ad3d8f508ee + is_fork_pull_request: 0 + name: job_1 + attempt: 0 + job_id: job_1 + task_id: 0 + status: 5 # StatusWaiting + runs_on: '["fedora"]' + created: 1758848614 diff --git a/tests/integration/fixtures/TestActionConcurrencyRunnerFiltering/action_runner.yml b/tests/integration/fixtures/TestActionConcurrencyRunnerFiltering/action_runner.yml new file mode 100644 index 0000000000..95599b19bd --- /dev/null +++ b/tests/integration/fixtures/TestActionConcurrencyRunnerFiltering/action_runner.yml @@ -0,0 +1,31 @@ +- + id: 1001 + uuid: "43b5d4d3-401b-42f9-94df-a9d45b447b82" + name: "User runner" + owner_id: 2 + repo_id: 0 + deleted: 0 + +- + id: 1002 + uuid: "bdc77f4f-2b2b-442d-bd44-e808f4306347" + name: "Organisation runner" + owner_id: 3 + repo_id: 0 + deleted: 0 + +- + id: 1003 + uuid: "9268bc8c-efbf-4dbe-aeb5-945733cdd098" + name: "Repository runner" + owner_id: 0 + repo_id: 1 + deleted: 0 + +- + id: 1004 + uuid: "fb857e63-c0ce-4571-a6c9-fde26c128073" + name: "Global runner" + owner_id: 0 + repo_id: 0 + deleted: 0 diff --git a/tests/integration/fixtures/TestActionConcurrencyRunnerFiltering/repo_unit.yml b/tests/integration/fixtures/TestActionConcurrencyRunnerFiltering/repo_unit.yml new file mode 100644 index 0000000000..84c2b7ad86 --- /dev/null +++ b/tests/integration/fixtures/TestActionConcurrencyRunnerFiltering/repo_unit.yml @@ -0,0 +1,6 @@ +- + id: 200 + repo_id: 3 + type: 10 + config: "{}" + created_unix: 946684810