feat: implement "concurrency" block in Forgejo Actions at the workflow level (#9434)

Currently references a pre-release version of `code.forgejo.org/forgejo/runner/v11`, pending release of https://code.forgejo.org/forgejo/runner/pulls/1026.

Fixes #5914.

This PR is quite large, but it can be reviewed commit-by-commit in relatively small, logical chunks.

Adds support for workflows with a `concurrency` block, and submembers `group` and `cancel-in-progress`.  For example:
```
on:
  workflow_dispatch:
jobs:
  rust-checks:
    runs-on: debian-latest
    steps:
      - run: sleep 300
concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: false
```

The concurrency block effectively ends up with four supported behaviors that users will want to choose from:
- Backwards compatibility / default -- if omitted completely, the existing Forgejo behavior will be implemented.  That behavior is that push and pull request synchronize events will cancel all previous runs on the same repository, branch, and workflow.
- Unlimited concurrency -- if the `cancel-in-progress` value is set to `false` and no `group` is provided, then the previously described Forgejo behavior will be disabled and an unlimited number of workflows can be executed simultaneously (to the maximum supported by the Forgejo Runner capacity).
- Queue-behind -- if a `group` is provided and `cancel-in-progress: false` is set, then every new action run with in the same repository with the same group value will be queued behind previous workflow runs, allowing only one workflow to execute at a time in the group, but allowing all workflows to finish naturally.
- Cancel-in-progress -- if a `group` is provided and `cancel-in-progress: true` is set, then every new action run with in the same repository with the same group value will cause previously queued or running runs to be cancelled, allowing only one workflow to execute at a time in the group, but preferring execution of the most recent workflow.

Both the `group` and `cancel-in-progress` values can access values from the `github`, `inputs` and `vars` context for dynamic behavior.

## Checklist

The [contributor guide](https://forgejo.org/docs/next/contributor/) contains information that will be helpful to first time contributors. There also are a few [conditions for merging Pull Requests in Forgejo repositories](https://codeberg.org/forgejo/governance/src/branch/main/PullRequestsAgreement.md). You are also welcome to join the [Forgejo development chatroom](https://matrix.to/#/#forgejo-development:matrix.org).

### Tests

- I added test coverage for Go changes...
  - [x] in their respective `*_test.go` for unit tests.
  - [x] in the `tests/integration` directory if it involves interactions with a live Forgejo server.
- I added test coverage for JavaScript changes...
  - [ ] in `web_src/js/*.test.js` if it can be unit tested.
  - [ ] in `tests/e2e/*.test.e2e.js` if it requires interactions with a live Forgejo server (see also the [developer guide for JavaScript testing](https://codeberg.org/forgejo/forgejo/src/branch/forgejo/tests/e2e/README.md#end-to-end-tests)).

### Documentation

- [x] I created a pull request [to the documentation](https://codeberg.org/forgejo/docs) to explain to Forgejo users how to use this change.
  - https://codeberg.org/forgejo/docs/pulls/1513
- [ ] I did not document these changes and I do not expect someone else to do it.

### Release notes

- [ ] I do not want this change to show in the release notes.
- [x] I want the title to show in the release notes with a link to this pull request.
- [ ] I want the content of the `release-notes/<pull request number>.md` to be be used for the release notes instead of the title.

<!--start release-notes-assistant-->

## Release notes
<!--URL:https://codeberg.org/forgejo/forgejo-->
- Features
  - [PR](https://codeberg.org/forgejo/forgejo/pulls/9434): <!--number 9434 --><!--line 0 --><!--description aW1wbGVtZW50ICJjb25jdXJyZW5jeSIgYmxvY2sgaW4gRm9yZ2VqbyBBY3Rpb25zIGF0IHRoZSB3b3JrZmxvdyBsZXZlbA==-->implement "concurrency" block in Forgejo Actions at the workflow level<!--description-->
<!--end release-notes-assistant-->

Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/9434
Reviewed-by: Earl Warren <earl-warren@noreply.codeberg.org>
Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net>
Co-committed-by: Mathieu Fenniak <mathieu@fenniak.net>
This commit is contained in:
Mathieu Fenniak 2025-10-03 18:43:02 +02:00 committed by Earl Warren
commit c434b963b4
36 changed files with 1664 additions and 158 deletions

View file

@ -224,6 +224,9 @@ forgejo.org/services/context
forgejo.org/services/federation
FollowRemoteActor
forgejo.org/services/notify
UnregisterNotifier
forgejo.org/services/repository
IsErrForkAlreadyExist

View file

@ -2782,6 +2782,10 @@ LEVEL = Info
;SKIP_WORKFLOW_STRINGS = [skip ci],[ci skip],[no ci],[skip actions],[actions skip]
;; Limit on inputs for manual / workflow_dispatch triggers, default is 10
;LIMIT_DISPATCH_INPUTS = 10
;; Support queuing workflow jobs, by setting `concurrency.group` & `concurrency.cancel-in-progress: false`, can increase
;; server and database workload due to more complex database queries and more frequent server task querying; this
;; feature can be disabled to reduce performance impact
;CONCURRENCY_GROUP_QUEUE_ENABLED = true
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

2
go.mod
View file

@ -10,7 +10,7 @@ require (
code.forgejo.org/forgejo/go-rpmutils v1.0.0
code.forgejo.org/forgejo/levelqueue v1.0.0
code.forgejo.org/forgejo/reply v1.0.2
code.forgejo.org/forgejo/runner/v11 v11.1.1
code.forgejo.org/forgejo/runner/v11 v11.1.2
code.forgejo.org/go-chi/binding v1.0.1
code.forgejo.org/go-chi/cache v1.0.1
code.forgejo.org/go-chi/captcha v1.0.2

4
go.sum
View file

@ -28,8 +28,8 @@ code.forgejo.org/forgejo/levelqueue v1.0.0 h1:9krYpU6BM+j/1Ntj6m+VCAIu0UNnne1/Uf
code.forgejo.org/forgejo/levelqueue v1.0.0/go.mod h1:fmG6zhVuqim2rxSFOoasgXO8V2W/k9U31VVYqLIRLhQ=
code.forgejo.org/forgejo/reply v1.0.2 h1:dMhQCHV6/O3L5CLWNTol+dNzDAuyCK88z4J/lCdgFuQ=
code.forgejo.org/forgejo/reply v1.0.2/go.mod h1:RyZUfzQLc+fuLIGjTSQWDAJWPiL4WtKXB/FifT5fM7U=
code.forgejo.org/forgejo/runner/v11 v11.1.1 h1:CoSfxBOLKLMJZq5VhKd5ZIUc3tCf04iyFx926s+zaMA=
code.forgejo.org/forgejo/runner/v11 v11.1.1/go.mod h1:9f0D2EG7uabL+cv71SWHKrGgo2vmLpvko0QLmtn3RDE=
code.forgejo.org/forgejo/runner/v11 v11.1.2 h1:jM5YsNmScH11VJEwmvsTUiqGjAqtiUzBhQ65BIo8ZOs=
code.forgejo.org/forgejo/runner/v11 v11.1.2/go.mod h1:9f0D2EG7uabL+cv71SWHKrGgo2vmLpvko0QLmtn3RDE=
code.forgejo.org/forgejo/ssh v0.0.0-20241211213324-5fc306ca0616 h1:kEZL84+02jY9RxXM4zHBWZ3Fml0B09cmP1LGkDsCfIA=
code.forgejo.org/forgejo/ssh v0.0.0-20241211213324-5fc306ca0616/go.mod h1:zpHEXBstFnQYtGnB8k8kQLol82umzn/2/snG7alWVD8=
code.forgejo.org/go-chi/binding v1.0.1 h1:coKNI+X1NzRN7X85LlrpvBRqk0TXpJ+ja28vusQWEuY=

View file

@ -25,11 +25,23 @@ import (
"xorm.io/builder"
)
type ConcurrencyMode int
const (
// Don't enforce concurrency control. Note that you won't find `UnlimitedConcurrency` implemented directly in the
// code; setting it on an `ActionRun` prevents the other limiting behaviors.
UnlimitedConcurrency ConcurrencyMode = iota
// Queue behind other jobs with the same concurrency group
QueueBehind
// Cancel other jobs with the same concurrency group
CancelInProgress
)
// ActionRun represents a run of a workflow file
type ActionRun struct {
ID int64
Title string
RepoID int64 `xorm:"index unique(repo_index)"`
RepoID int64 `xorm:"index unique(repo_index) index(concurrency)"`
Repo *repo_model.Repository `xorm:"-"`
OwnerID int64 `xorm:"index"`
WorkflowID string `xorm:"index"` // the name of workflow file
@ -56,6 +68,9 @@ type ActionRun struct {
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated"`
NotifyEmail bool
ConcurrencyGroup string `xorm:"'concurrency_group' index(concurrency)"`
ConcurrencyType ConcurrencyMode
}
func init() {
@ -163,6 +178,24 @@ func (run *ActionRun) GetPullRequestEventPayload() (*api.PullRequestPayload, err
return nil, fmt.Errorf("event %s is not a pull request event", run.Event)
}
func (run *ActionRun) SetConcurrencyGroup(concurrencyGroup string) {
// Concurrency groups are case insensitive identifiers, implemented by collapsing case here. Unfortunately the
// `ConcurrencyGroup` field can't be made a private field because xorm doesn't map those fields -- using
// `SetConcurrencyGroup` is required for consistency but not enforced at compile-time.
run.ConcurrencyGroup = strings.ToLower(concurrencyGroup)
}
func (run *ActionRun) SetDefaultConcurrencyGroup() {
// Before ConcurrencyGroups were supported, Forgejo would automatically cancel runs with matching git refs, workflow
// IDs, and trigger events. For backwards compatibility we emulate that behavior:
run.SetConcurrencyGroup(fmt.Sprintf(
"%s_%s_%s__auto",
run.Ref,
run.WorkflowID,
run.TriggerEvent,
))
}
func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
_, err := db.GetEngine(ctx).ID(repo.ID).
SetExpr("num_action_runs",

View file

@ -5,6 +5,7 @@ package actions
import (
"context"
"strings"
"forgejo.org/models/db"
repo_model "forgejo.org/models/repo"
@ -64,14 +65,15 @@ func (runs RunList) LoadRepos(ctx context.Context) error {
type FindRunOptions struct {
db.ListOptions
RepoID int64
OwnerID int64
WorkflowID string
Ref string // the commit/tag/… that caused this workflow
TriggerUserID int64
TriggerEvent webhook_module.HookEventType
Approved bool // not util.OptionalBool, it works only when it's true
Status []Status
RepoID int64
OwnerID int64
WorkflowID string
Ref string // the commit/tag/… that caused this workflow
TriggerUserID int64
TriggerEvent webhook_module.HookEventType
Approved bool // not util.OptionalBool, it works only when it's true
Status []Status
ConcurrencyGroup string
}
func (opts FindRunOptions) ToConds() builder.Cond {
@ -100,6 +102,9 @@ func (opts FindRunOptions) ToConds() builder.Cond {
if opts.TriggerEvent != "" {
cond = cond.And(builder.Eq{"trigger_event": opts.TriggerEvent})
}
if opts.ConcurrencyGroup != "" {
cond = cond.And(builder.Eq{"concurrency_group": strings.ToLower(opts.ConcurrencyGroup)})
}
return cond
}

View file

@ -5,7 +5,34 @@ package actions
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestGetRunBefore(t *testing.T) {
}
func TestSetConcurrencyGroup(t *testing.T) {
run := ActionRun{}
run.SetConcurrencyGroup("abc123")
assert.Equal(t, "abc123", run.ConcurrencyGroup)
run.SetConcurrencyGroup("ABC123") // case should collapse in SetConcurrencyGroup
assert.Equal(t, "abc123", run.ConcurrencyGroup)
}
func TestSetDefaultConcurrencyGroup(t *testing.T) {
run := ActionRun{
Ref: "refs/heads/main",
WorkflowID: "testing",
TriggerEvent: "pull_request",
}
run.SetDefaultConcurrencyGroup()
assert.Equal(t, "refs/heads/main_testing_pull_request__auto", run.ConcurrencyGroup)
run = ActionRun{
Ref: "refs/heads/main",
WorkflowID: "TESTING", // case should collapse in SetDefaultConcurrencyGroup
TriggerEvent: "pull_request",
}
run.SetDefaultConcurrencyGroup()
assert.Equal(t, "refs/heads/main_testing_pull_request__auto", run.ConcurrencyGroup)
}

View file

@ -239,6 +239,88 @@ func GetRunningTaskByToken(ctx context.Context, token string) (*ActionTask, erro
return nil, errNotExist
}
func getConcurrencyCondition() builder.Cond {
concurrencyCond := builder.NewCond()
// OK to pick if there's no concurrency_group on the run
concurrencyCond = concurrencyCond.Or(builder.Eq{"concurrency_group": ""})
concurrencyCond = concurrencyCond.Or(builder.IsNull{"concurrency_group"})
// OK to pick if it's not a "QueueBehind" concurrency type
concurrencyCond = concurrencyCond.Or(builder.Neq{"concurrency_type": QueueBehind})
// subQuery ends up representing all the runs that would block a run from executing:
subQuery := builder.Select("id").From("action_run", "inner_run").
// A run can't block itself, so exclude it from this search
Where(builder.Neq{"inner_run.id": builder.Expr("outer_run.id")}).
// Blocking runs must be from the same repo & concurrency group
And(builder.Eq{"inner_run.repo_id": builder.Expr("outer_run.repo_id")}).
And(builder.Eq{"inner_run.concurrency_group": builder.Expr("outer_run.concurrency_group")}).
And(
// Ideally the logic here would be that a blocking run is "not done", and "younger", which allows each run
// to be blocked on the previous runs in the concurrency group and therefore execute in order from oldest to
// newest.
//
// But it's possible for runs to be required to run out-of-order -- for example, if a younger run has
// already completed but then it is re-run. If we only used "not done" and "younger" as logic, then the
// re-run would not be blocked, and therefore would violate the concurrency group's single-run goal.
//
// So we use two conditions to meet both needs:
//
// Blocking runs have a running status...
builder.Eq{"inner_run.status": StatusRunning}.Or(
// Blocking runs don't have a IsDone status & are younger than the outer_run
builder.NotIn("inner_run.status", []Status{StatusSuccess, StatusFailure, StatusCancelled, StatusSkipped}).
And(builder.Lt{"inner_run.`index`": builder.Expr("outer_run.`index`")})))
// OK to pick if there are no blocking runs
concurrencyCond = concurrencyCond.Or(builder.NotExists(subQuery))
return concurrencyCond
}
// Returns all the available jobs that could be executed on `runner`, before label filtering is applied. Note that
// only a single job can actually be run from this result for any given invocation, as multiple runs (in order) from any
// single concurrency group could be returned.
func GetAvailableJobsForRunner(e db.Engine, runner *ActionRunner) ([]*ActionRunJob, error) {
jobCond := builder.NewCond()
if runner.RepoID != 0 {
jobCond = builder.Eq{"repo_id": runner.RepoID}
} else if runner.OwnerID != 0 {
jobCond = builder.In("repo_id", builder.Select("`repository`.id").From("repository").
Join("INNER", "repo_unit", "`repository`.id = `repo_unit`.repo_id").
Where(builder.Eq{"`repository`.owner_id": runner.OwnerID, "`repo_unit`.type": unit.TypeActions}))
}
// Concurrency group checks for queuing one run behind the last run in the concurrency group are more
// computationally expensive on the database. To manage the risk that this might have on large-scale deployments
// When this feature is initially released, it can be disabled in the ini file by setting
// `CONCURRENCY_GROUP_QUEUE_ENABLED = false` in the `[actions]` section. If disabled, then actions with a
// concurrency group and `cancel-in-progress: false` will run simultaneously rather than being queued.
if setting.Actions.ConcurrencyGroupQueueEnabled {
jobCond = jobCond.And(getConcurrencyCondition())
}
if jobCond.IsValid() {
// It is *likely* more efficient to use an EXISTS query here rather than an IN clause, as that allows the
// database's query optimizer to perform partial computation of the subquery rather than complete computation.
// However, database engines can be fickle and difficult to predict. We'll retain the original IN clause
// implementation when ConcurrencyGroupQueueEnabled is disabled, which should maintain the same performance
// characteristics. When ConcurrencyGroupQueueEnabled is enabled, it will switch to the EXISTS clause.
if setting.Actions.ConcurrencyGroupQueueEnabled {
jobCond = builder.Exists(builder.Select("id").From("action_run", "outer_run").
Where(builder.Eq{"outer_run.id": builder.Expr("action_run_job.run_id")}).
And(jobCond))
} else {
jobCond = builder.In("run_id", builder.Select("id").From("action_run", "outer_run").Where(jobCond))
}
}
var jobs []*ActionRunJob
if err := e.Where("task_id=? AND status=?", 0, StatusWaiting).And(jobCond).Asc("updated", "id").Find(&jobs); err != nil {
return nil, err
}
return jobs, nil
}
func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask, bool, error) {
ctx, commiter, err := db.TxContext(ctx)
if err != nil {
@ -248,20 +330,8 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
e := db.GetEngine(ctx)
jobCond := builder.NewCond()
if runner.RepoID != 0 {
jobCond = builder.Eq{"repo_id": runner.RepoID}
} else if runner.OwnerID != 0 {
jobCond = builder.In("repo_id", builder.Select("`repository`.id").From("repository").
Join("INNER", "repo_unit", "`repository`.id = `repo_unit`.repo_id").
Where(builder.Eq{"`repository`.owner_id": runner.OwnerID, "`repo_unit`.type": unit.TypeActions}))
}
if jobCond.IsValid() {
jobCond = builder.In("run_id", builder.Select("id").From("action_run").Where(jobCond))
}
var jobs []*ActionRunJob
if err := e.Where("task_id=? AND status=?", 0, StatusWaiting).And(jobCond).Asc("updated", "id").Find(&jobs); err != nil {
jobs, err := GetAvailableJobsForRunner(e, runner)
if err != nil {
return nil, false, err
}

View file

@ -525,6 +525,7 @@
ref: "refs/heads/main"
commit_sha: "97f29ee599c373c729132a5c46a046978311e0ee"
event: "workflow_dispatch"
trigger_event: "workflow_dispatch"
is_fork_pull_request: 0
status: 6 # running
started: 1683636528

View file

@ -121,6 +121,8 @@ var migrations = []*Migration{
NewMigration("Add index for release sha1", AddIndexForReleaseSha1),
// v40 -> v41
NewMigration("Add foreign keys to stopwatch & tracked_time", AddForeignKeysStopwatchTrackedTime),
// v41 -> v42
NewMigration("Add action_run concurrency fields", AddActionRunConcurrency),
}
// GetCurrentDBVersion returns the current Forgejo database version.

View file

@ -0,0 +1,20 @@
// Copyright 2025 The Forgejo Authors. All rights reserved.
// SPDX-License-Identifier: GPL-3.0-or-later
package forgejo_migrations
import "xorm.io/xorm"
func AddActionRunConcurrency(x *xorm.Engine) error {
type ActionRun struct {
RepoID int64 `xorm:"index unique(repo_index) index(concurrency)"`
Index int64 `xorm:"index unique(repo_index)"`
ConcurrencyGroup string `xorm:"index(concurrency)"`
ConcurrencyType int
}
_, err := x.SyncWithOptions(xorm.SyncOptions{
// Sync drops indexes by default, and this local ActionRun doesn't have all the indexes -- so disable that.
IgnoreDropIndices: true,
}, new(ActionRun))
return err
}

View file

@ -12,23 +12,25 @@ import (
// Actions settings
var (
Actions = struct {
Enabled bool
LogStorage *Storage // how the created logs should be stored
LogRetentionDays int64 `ini:"LOG_RETENTION_DAYS"`
LogCompression logCompression `ini:"LOG_COMPRESSION"`
ArtifactStorage *Storage // how the created artifacts should be stored
ArtifactRetentionDays int64 `ini:"ARTIFACT_RETENTION_DAYS"`
DefaultActionsURL defaultActionsURL `ini:"DEFAULT_ACTIONS_URL"`
ZombieTaskTimeout time.Duration `ini:"ZOMBIE_TASK_TIMEOUT"`
EndlessTaskTimeout time.Duration `ini:"ENDLESS_TASK_TIMEOUT"`
AbandonedJobTimeout time.Duration `ini:"ABANDONED_JOB_TIMEOUT"`
SkipWorkflowStrings []string `ìni:"SKIP_WORKFLOW_STRINGS"`
LimitDispatchInputs int64 `ini:"LIMIT_DISPATCH_INPUTS"`
Enabled bool
LogStorage *Storage // how the created logs should be stored
LogRetentionDays int64 `ini:"LOG_RETENTION_DAYS"`
LogCompression logCompression `ini:"LOG_COMPRESSION"`
ArtifactStorage *Storage // how the created artifacts should be stored
ArtifactRetentionDays int64 `ini:"ARTIFACT_RETENTION_DAYS"`
DefaultActionsURL defaultActionsURL `ini:"DEFAULT_ACTIONS_URL"`
ZombieTaskTimeout time.Duration `ini:"ZOMBIE_TASK_TIMEOUT"`
EndlessTaskTimeout time.Duration `ini:"ENDLESS_TASK_TIMEOUT"`
AbandonedJobTimeout time.Duration `ini:"ABANDONED_JOB_TIMEOUT"`
SkipWorkflowStrings []string `ìni:"SKIP_WORKFLOW_STRINGS"`
LimitDispatchInputs int64 `ini:"LIMIT_DISPATCH_INPUTS"`
ConcurrencyGroupQueueEnabled bool `ini:"CONCURRENCY_GROUP_QUEUE_ENABLED"`
}{
Enabled: true,
DefaultActionsURL: defaultActionsURLForgejo,
SkipWorkflowStrings: []string{"[skip ci]", "[ci skip]", "[no ci]", "[skip actions]", "[actions skip]"},
LimitDispatchInputs: 10,
Enabled: true,
DefaultActionsURL: defaultActionsURLForgejo,
SkipWorkflowStrings: []string{"[skip ci]", "[ci skip]", "[no ci]", "[skip actions]", "[actions skip]"},
LimitDispatchInputs: 10,
ConcurrencyGroupQueueEnabled: true,
}
)

View file

@ -14,6 +14,7 @@ import (
user_model "forgejo.org/models/user"
"forgejo.org/modules/actions"
"forgejo.org/modules/log"
"forgejo.org/modules/setting"
"forgejo.org/modules/util"
actions_service "forgejo.org/services/actions"
@ -224,6 +225,15 @@ func (s *Service) UpdateTask(
if err := actions_service.EmitJobsIfReady(task.Job.RunID); err != nil {
log.Error("Emit ready jobs of run %d: %v", task.Job.RunID, err)
}
// Reaching a finalized result for a task can cause other tasks in the same concurrency group to become
// unblocked. Increasing task version here allows all applicable runners to requery to the DB for that state.
// Because it is only useful for that condition, and it has system performance risks, only enable it when
// concurrency group queuing is enabled.
if setting.Actions.ConcurrencyGroupQueueEnabled {
if err := actions_model.IncreaseTaskVersion(ctx, runner.OwnerID, runner.RepoID); err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("fail to increase task version: %w", err))
}
}
}
return connect.NewResponse(&runnerv1.UpdateTaskResponse{

View file

@ -0,0 +1,14 @@
-
id: 600
run_id: 894
repo_id: 63
owner_id: 2
commit_sha: 97f29ee599c373c729132a5c46a046978311e0ee
is_fork_pull_request: 0
name: job_1
attempt: 0
job_id: job_1
task_id: 0
status: 6 # running
runs_on: '["fedora"]'
started: 1683636528

View file

@ -0,0 +1,38 @@
-
id: 900
title: "running workflow_dispatch run"
repo_id: 63
owner_id: 2
workflow_id: "running.yaml"
index: 4
trigger_user_id: 2
ref: "refs/heads/main"
commit_sha: "97f29ee599c373c729132a5c46a046978311e0ee"
trigger_event: "workflow_dispatch"
is_fork_pull_request: 0
status: 6 # running
started: 1683636528
created: 1683636108
updated: 1683636626
need_approval: 0
approved_by: 0
concurrency_group: abc123
-
id: 901
title: "running workflow_dispatch run"
repo_id: 63
owner_id: 2
workflow_id: "running.yaml"
index: 5
trigger_user_id: 2
ref: "refs/heads/main"
commit_sha: "97f29ee599c373c729132a5c46a046978311e0ee"
trigger_event: "workflow_dispatch"
is_fork_pull_request: 0
status: 6 # running
started: 1683636528
created: 1683636108
updated: 1683636626
need_approval: 0
approved_by: 0
concurrency_group: abc123

View file

@ -0,0 +1,28 @@
-
id: 600
run_id: 900
repo_id: 63
owner_id: 2
commit_sha: 97f29ee599c373c729132a5c46a046978311e0ee
is_fork_pull_request: 0
name: job_1
attempt: 0
job_id: job_1
task_id: 0
status: 6 # running
runs_on: '["fedora"]'
started: 1683636528
-
id: 601
run_id: 901
repo_id: 63
owner_id: 2
commit_sha: 97f29ee599c373c729132a5c46a046978311e0ee
is_fork_pull_request: 0
name: job_1
attempt: 0
job_id: job_1
task_id: 0
status: 6 # running
runs_on: '["fedora"]'
started: 1683636528

View file

@ -14,11 +14,11 @@ import (
"forgejo.org/modules/git"
"forgejo.org/modules/json"
"forgejo.org/modules/setting"
"code.forgejo.org/forgejo/runner/v11/act/model"
)
// GenerateGiteaContext generate the gitea context without token and gitea_runtime_token
// job can be nil when generating a context for parsing workflow-level expressions
func GenerateGiteaContext(run *actions_model.ActionRun, job *actions_model.ActionRunJob) map[string]any {
func generateGiteaContextForRun(run *actions_model.ActionRun) *model.GithubContext {
event := map[string]any{}
_ = json.Unmarshal([]byte(run.EventPayload), &event)
@ -41,45 +41,64 @@ func GenerateGiteaContext(run *actions_model.ActionRun, job *actions_model.Actio
refName := git.RefName(ref)
gitContext := map[string]any{
gitContextObj := &model.GithubContext{
// standard contexts, see https://docs.github.com/en/actions/learn-github-actions/contexts#github-context
"action": "", // string, The name of the action currently running, or the id of a step. GitHub removes special characters, and uses the name __run when the current step runs a script without an id. If you use the same action more than once in the same job, the name will include a suffix with the sequence number with underscore before it. For example, the first script you run will have the name __run, and the second script will be named __run_2. Similarly, the second invocation of actions/checkout will be actionscheckout2.
"action_path": "", // string, The path where an action is located. This property is only supported in composite actions. You can use this path to access files located in the same repository as the action.
"action_ref": "", // string, For a step executing an action, this is the ref of the action being executed. For example, v2.
"action_repository": "", // string, For a step executing an action, this is the owner and repository name of the action. For example, actions/checkout.
"action_status": "", // string, For a composite action, the current result of the composite action.
"actor": run.TriggerUser.Name, // string, The username of the user that triggered the initial workflow run. If the workflow run is a re-run, this value may differ from github.triggering_actor. Any workflow re-runs will use the privileges of github.actor, even if the actor initiating the re-run (github.triggering_actor) has different privileges.
"api_url": setting.AppURL + "api/v1", // string, The URL of the GitHub REST API.
"base_ref": baseRef, // string, The base_ref or target branch of the pull request in a workflow run. This property is only available when the event that triggers a workflow run is either pull_request or pull_request_target.
"env": "", // string, Path on the runner to the file that sets environment variables from workflow commands. This file is unique to the current step and is a different file for each step in a job. For more information, see "Workflow commands for GitHub Actions."
"event": event, // object, The full event webhook payload. You can access individual properties of the event using this context. This object is identical to the webhook payload of the event that triggered the workflow run, and is different for each event. The webhooks for each GitHub Actions event is linked in "Events that trigger workflows." For example, for a workflow run triggered by the push event, this object contains the contents of the push webhook payload.
"event_name": run.TriggerEvent, // string, The name of the event that triggered the workflow run.
"event_path": "", // string, The path to the file on the runner that contains the full event webhook payload.
"graphql_url": "", // string, The URL of the GitHub GraphQL API.
"head_ref": headRef, // string, The head_ref or source branch of the pull request in a workflow run. This property is only available when the event that triggers a workflow run is either pull_request or pull_request_target.
"job": "", // string, The job_id of the current job.
"ref": ref, // string, The fully-formed ref of the branch or tag that triggered the workflow run. For workflows triggered by push, this is the branch or tag ref that was pushed. For workflows triggered by pull_request, this is the pull request merge branch. For workflows triggered by release, this is the release tag created. For other triggers, this is the branch or tag ref that triggered the workflow run. This is only set if a branch or tag is available for the event type. The ref given is fully-formed, meaning that for branches the format is refs/heads/<branch_name>, for pull requests it is refs/pull/<pr_number>/merge, and for tags it is refs/tags/<tag_name>. For example, refs/heads/feature-branch-1.
"ref_name": refName.ShortName(), // string, The short ref name of the branch or tag that triggered the workflow run. This value matches the branch or tag name shown on GitHub. For example, feature-branch-1.
"ref_protected": false, // boolean, true if branch protections are configured for the ref that triggered the workflow run.
"ref_type": refName.RefType(), // string, The type of ref that triggered the workflow run. Valid values are branch or tag.
"path": "", // string, Path on the runner to the file that sets system PATH variables from workflow commands. This file is unique to the current step and is a different file for each step in a job. For more information, see "Workflow commands for GitHub Actions."
"repository": run.Repo.OwnerName + "/" + run.Repo.Name, // string, The owner and repository name. For example, Codertocat/Hello-World.
"repository_owner": run.Repo.OwnerName, // string, The repository owner's name. For example, Codertocat.
"repositoryUrl": run.Repo.HTMLURL(), // string, The Git URL to the repository. For example, git://github.com/codertocat/hello-world.git.
"retention_days": "", // string, The number of days that workflow run logs and artifacts are kept.
"run_id": "", // string, A unique number for each workflow run within a repository. This number does not change if you re-run the workflow run.
"run_number": fmt.Sprint(run.Index), // string, A unique number for each run of a particular workflow in a repository. This number begins at 1 for the workflow's first run, and increments with each new run. This number does not change if you re-run the workflow run.
"run_attempt": "", // string, A unique number for each attempt of a particular workflow run in a repository. This number begins at 1 for the workflow run's first attempt, and increments with each re-run.
"secret_source": "Actions", // string, The source of a secret used in a workflow. Possible values are None, Actions, Dependabot, or Codespaces.
"server_url": setting.AppURL, // string, The URL of the GitHub server. For example: https://github.com.
"sha": sha, // string, The commit SHA that triggered the workflow. The value of this commit SHA depends on the event that triggered the workflow. For more information, see "Events that trigger workflows." For example, ffac537e6cbbf934b08745a378932722df287a53.
"triggering_actor": "", // string, The username of the user that initiated the workflow run. If the workflow run is a re-run, this value may differ from github.actor. Any workflow re-runs will use the privileges of github.actor, even if the actor initiating the re-run (github.triggering_actor) has different privileges.
"workflow": run.WorkflowID, // string, The name of the workflow. If the workflow file doesn't specify a name, the value of this property is the full path of the workflow file in the repository.
"workspace": "", // string, The default working directory on the runner for steps, and the default location of your repository when using the checkout action.
// additional contexts
"gitea_default_actions_url": setting.Actions.DefaultActionsURL.URL(),
APIURL: setting.AppURL + "api/v1", // string, The URL of the GitHub REST API.
Action: "", // string, The name of the action currently running, or the id of a step. GitHub removes special characters, and uses the name __run when the current step runs a script without an id. If you use the same action more than once in the same job, the name will include a suffix with the sequence number with underscore before it. For example, the first script you run will have the name __run, and the second script will be named __run_2. Similarly, the second invocation of actions/checkout will be actionscheckout2.
ActionPath: "", // string, The path where an action is located. This property is only supported in composite actions. You can use this path to access files located in the same repository as the action.
ActionRef: "", // string, For a step executing an action, this is the ref of the action being executed. For example, v2.
ActionRepository: "", // string, For a step executing an action, this is the owner and repository name of the action. For example, actions/checkout.
BaseRef: baseRef, // string, The base_ref or target branch of the pull request in a workflow run. This property is only available when the event that triggers a workflow run is either pull_request or pull_request_target.
Event: event, // object, The full event webhook payload. You can access individual properties of the event using this context. This object is identical to the webhook payload of the event that triggered the workflow run, and is different for each event. The webhooks for each GitHub Actions event is linked in "Events that trigger workflows." For example, for a workflow run triggered by the push event, this object contains the contents of the push webhook payload.
EventName: run.TriggerEvent, // string, The name of the event that triggered the workflow run.
EventPath: "", // string, The path to the file on the runner that contains the full event webhook payload.
GraphQLURL: "", // string, The URL of the GitHub GraphQL API.
HeadRef: headRef, // string, The head_ref or source branch of the pull request in a workflow run. This property is only available when the event that triggers a workflow run is either pull_request or pull_request_target.
Job: "", // string, The job_id of the current job.
Ref: ref, // string, The fully-formed ref of the branch or tag that triggered the workflow run. For workflows triggered by push, this is the branch or tag ref that was pushed. For workflows triggered by pull_request, this is the pull request merge branch. For workflows triggered by release, this is the release tag created. For other triggers, this is the branch or tag ref that triggered the workflow run. This is only set if a branch or tag is available for the event type. The ref given is fully-formed, meaning that for branches the format is refs/heads/<branch_name>, for pull requests it is refs/pull/<pr_number>/merge, and for tags it is refs/tags/<tag_name>. For example, refs/heads/feature-branch-1.
RefName: refName.ShortName(), // string, The short ref name of the branch or tag that triggered the workflow run. This value matches the branch or tag name shown on GitHub. For example, feature-branch-1.
RefType: refName.RefType(), // string, The type of ref that triggered the workflow run. Valid values are branch or tag.
Repository: run.Repo.OwnerName + "/" + run.Repo.Name, // string, The owner and repository name. For example, Codertocat/Hello-World.
RepositoryOwner: run.Repo.OwnerName, // string, The repository owner's name. For example, Codertocat.
RetentionDays: "", // string, The number of days that workflow run logs and artifacts are kept.
RunID: "", // string, A unique number for each workflow run within a repository. This number does not change if you re-run the workflow run.
RunNumber: fmt.Sprint(run.Index), // string, A unique number for each run of a particular workflow in a repository. This number begins at 1 for the workflow's first run, and increments with each new run. This number does not change if you re-run the workflow run.
RunAttempt: "", // string, A unique number for each attempt of a particular workflow run in a repository. This number begins at 1 for the workflow run's first attempt, and increments with each re-run.
ServerURL: setting.AppURL, // string, The URL of the GitHub server. For example: https://github.com.
Sha: sha, // string, The commit SHA that triggered the workflow. The value of this commit SHA depends on the event that triggered the workflow. For more information, see "Events that trigger workflows." For example, ffac537e6cbbf934b08745a378932722df287a53.
Workflow: run.WorkflowID, // string, The name of the workflow. If the workflow file doesn't specify a name, the value of this property is the full path of the workflow file in the repository.
Workspace: "", // string, The default working directory on the runner for steps, and the default location of your repository when using the checkout action.
}
if run.TriggerUser != nil {
gitContextObj.Actor = run.TriggerUser.Name // string, The username of the user that triggered the initial workflow run. If the workflow run is a re-run, this value may differ from github.triggering_actor. Any workflow re-runs will use the privileges of github.actor, even if the actor initiating the re-run (github.triggering_actor) has different privileges.
}
return gitContextObj
}
// GenerateGiteaContext generate the gitea context without token and gitea_runtime_token
// job can be nil when generating a context for parsing workflow-level expressions
func GenerateGiteaContext(run *actions_model.ActionRun, job *actions_model.ActionRunJob) map[string]any {
gitContextObj := generateGiteaContextForRun(run)
gitContext, _ := githubContextToMap(gitContextObj)
// standard contexts, see https://docs.github.com/en/actions/learn-github-actions/contexts#github-context
gitContext["action_status"] = "" // string, For a composite action, the current result of the composite action.
gitContext["actor"] = run.TriggerUser.Name // string, The username of the user that triggered the initial workflow run. If the workflow run is a re-run, this value may differ from github.triggering_actor. Any workflow re-runs will use the privileges of github.actor, even if the actor initiating the re-run (github.triggering_actor) has different privileges.
gitContext["env"] = "" // string, Path on the runner to the file that sets environment variables from workflow commands. This file is unique to the current step and is a different file for each step in a job. For more information, see "Workflow commands for GitHub Actions."
gitContext["path"] = "" // string, Path on the runner to the file that sets system PATH variables from workflow commands. This file is unique to the current step and is a different file for each step in a job. For more information, see "Workflow commands for GitHub Actions."
gitContext["ref_protected"] = false // boolean, true if branch protections are configured for the ref that triggered the workflow run.
gitContext["repository_owner"] = run.Repo.OwnerName // string, The repository owner's name. For example, Codertocat.
gitContext["repository"] = run.Repo.OwnerName + "/" + run.Repo.Name // string, The owner and repository name. For example, Codertocat/Hello-World.
gitContext["repositoryUrl"] = run.Repo.HTMLURL() // string, The Git URL to the repository. For example, git://github.com/codertocat/hello-world.git.
gitContext["secret_source"] = "Actions" // string, The source of a secret used in a workflow. Possible values are None, Actions, Dependabot, or Codespaces.
gitContext["server_url"] = setting.AppURL // string, The URL of the GitHub server. For example: https://github.com.
gitContext["triggering_actor"] = "" // string, The username of the user that initiated the workflow run. If the workflow run is a re-run, this value may differ from github.actor. Any workflow re-runs will use the privileges of github.actor, even if the actor initiating the re-run (github.triggering_actor) has different privileges.
gitContext["workflow"] = run.WorkflowID // string, The name of the workflow. If the workflow file doesn't specify a name, the value of this property is the full path of the workflow file in the repository.
// additional contexts
gitContext["gitea_default_actions_url"] = setting.Actions.DefaultActionsURL.URL()
if job != nil {
gitContext["job"] = job.JobID
@ -90,6 +109,21 @@ func GenerateGiteaContext(run *actions_model.ActionRun, job *actions_model.Actio
return gitContext
}
func githubContextToMap(gitContext *model.GithubContext) (map[string]any, error) {
jsonBytes, err := json.Marshal(gitContext)
if err != nil {
return nil, fmt.Errorf("failed to marshal struct: %w", err)
}
var result map[string]any
err = json.Unmarshal(jsonBytes, &result)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal to map: %w", err)
}
return result, nil
}
type TaskNeed struct {
Result actions_model.Status
Outputs map[string]string

View file

@ -7,7 +7,13 @@ import (
"testing"
actions_model "forgejo.org/models/actions"
"forgejo.org/models/repo"
"forgejo.org/models/unittest"
"forgejo.org/models/user"
actions_module "forgejo.org/modules/actions"
"forgejo.org/modules/json"
"forgejo.org/modules/setting"
webhook_module "forgejo.org/modules/webhook"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -27,3 +33,321 @@ func TestFindTaskNeeds(t *testing.T) {
assert.Equal(t, "abc", ret["job1"].Outputs["output_a"])
assert.Equal(t, "bbb", ret["job1"].Outputs["output_b"])
}
func TestGenerateGiteaContext(t *testing.T) {
testUser := &user.User{
ID: 1,
Name: "testuser",
}
testRepo := &repo.Repository{
ID: 1,
OwnerName: "testowner",
Name: "testrepo",
}
emptyField := func(t *testing.T, context map[string]any, field string) {
v, ok := context[field]
assert.True(t, ok, "expected field %q to be present", field)
assert.Empty(t, v)
}
t.Run("Basic workflow run without job", func(t *testing.T) {
run := &actions_model.ActionRun{
ID: 1,
Index: 42,
TriggerUser: testUser,
Repo: testRepo,
TriggerEvent: "push",
Ref: "refs/heads/main",
CommitSHA: "abc123def456",
WorkflowID: "test-workflow",
EventPayload: `{"repository": {"name": "testrepo"}}`,
}
context := GenerateGiteaContext(run, nil)
assert.Equal(t, "testuser", context["actor"])
assert.Equal(t, setting.AppURL+"api/v1", context["api_url"])
assert.Equal(t, "push", context["event_name"])
assert.Equal(t, "refs/heads/main", context["ref"])
assert.Equal(t, "main", context["ref_name"])
assert.Equal(t, "branch", context["ref_type"])
assert.Equal(t, "testowner/testrepo", context["repository"])
assert.Equal(t, "testowner", context["repository_owner"])
assert.Equal(t, "abc123def456", context["sha"])
assert.Equal(t, "42", context["run_number"])
assert.Equal(t, "test-workflow", context["workflow"])
assert.Equal(t, false, context["ref_protected"])
assert.Equal(t, "Actions", context["secret_source"])
assert.Equal(t, setting.AppURL, context["server_url"])
event, ok := context["event"].(map[string]any)
require.True(t, ok)
assert.Equal(t, "testrepo", event["repository"].(map[string]any)["name"])
emptyField(t, context, "action_path")
emptyField(t, context, "action_ref")
emptyField(t, context, "action_repository")
emptyField(t, context, "action_status")
emptyField(t, context, "action")
emptyField(t, context, "base_ref")
emptyField(t, context, "env")
emptyField(t, context, "event_path")
emptyField(t, context, "graphql_url")
emptyField(t, context, "head_ref")
emptyField(t, context, "job")
emptyField(t, context, "path")
emptyField(t, context, "retention_days")
emptyField(t, context, "run_attempt")
emptyField(t, context, "run_id")
emptyField(t, context, "triggering_actor")
emptyField(t, context, "workspace")
})
t.Run("Workflow run with job", func(t *testing.T) {
run := &actions_model.ActionRun{
ID: 1,
Index: 42,
TriggerUser: testUser,
Repo: testRepo,
TriggerEvent: "push",
Ref: "refs/heads/main",
CommitSHA: "abc123def456",
WorkflowID: "test-workflow",
EventPayload: `{}`,
}
job := &actions_model.ActionRunJob{
ID: 100,
RunID: 1,
JobID: "test-job",
Attempt: 1,
}
context := GenerateGiteaContext(run, job)
assert.Equal(t, "test-job", context["job"])
assert.Equal(t, "1", context["run_id"])
assert.Equal(t, "1", context["run_attempt"])
})
t.Run("Pull request event", func(t *testing.T) {
pullRequestPayload := map[string]any{
"pull_request": map[string]any{
"base": map[string]any{
"ref": "main",
"label": "main",
"sha": "base123sha",
},
"head": map[string]any{
"ref": "feature-branch",
"label": "feature-branch",
"sha": "head456sha",
},
},
}
payloadBytes, _ := json.Marshal(pullRequestPayload)
run := &actions_model.ActionRun{
ID: 1,
Index: 42,
TriggerUser: testUser,
Repo: testRepo,
TriggerEvent: "pull_request",
Ref: "refs/pull/1/merge",
CommitSHA: "merge789sha",
WorkflowID: "test-workflow",
Event: webhook_module.HookEventPullRequest,
EventPayload: string(payloadBytes),
}
context := GenerateGiteaContext(run, nil)
assert.Equal(t, "main", context["base_ref"])
assert.Equal(t, "feature-branch", context["head_ref"])
assert.Equal(t, "refs/pull/1/merge", context["ref"])
assert.Equal(t, "merge789sha", context["sha"])
})
t.Run("Pull request target event", func(t *testing.T) {
pullRequestPayload := map[string]any{
"pull_request": map[string]any{
"base": map[string]any{
"ref": "main",
"label": "main",
"sha": "base123sha",
},
"head": map[string]any{
"ref": "feature-branch",
"label": "feature-branch",
"sha": "head456sha",
},
},
}
payloadBytes, _ := json.Marshal(pullRequestPayload)
run := &actions_model.ActionRun{
ID: 1,
Index: 42,
TriggerUser: testUser,
Repo: testRepo,
TriggerEvent: actions_module.GithubEventPullRequestTarget,
Ref: "refs/pull/1/merge",
CommitSHA: "merge789sha",
WorkflowID: "test-workflow",
Event: webhook_module.HookEventPullRequest,
EventPayload: string(payloadBytes),
}
context := GenerateGiteaContext(run, nil)
assert.Equal(t, "main", context["base_ref"])
assert.Equal(t, "feature-branch", context["head_ref"])
// For pull_request_target, ref and sha should be from base
assert.Equal(t, "refs/heads/main", context["ref"])
assert.Equal(t, "base123sha", context["sha"])
assert.Equal(t, "main", context["ref_name"])
assert.Equal(t, "branch", context["ref_type"])
})
}
func TestGenerateGiteaContextForRun(t *testing.T) {
testUser := &user.User{
ID: 1,
Name: "testuser",
}
testRepo := &repo.Repository{
ID: 1,
OwnerName: "testowner",
Name: "testrepo",
}
t.Run("Basic workflow run", func(t *testing.T) {
run := &actions_model.ActionRun{
ID: 1,
Index: 42,
TriggerUser: testUser,
Repo: testRepo,
TriggerEvent: "push",
Ref: "refs/heads/main",
CommitSHA: "abc123def456",
WorkflowID: "test-workflow",
EventPayload: `{"repository": {"name": "testrepo"}}`,
}
gitContextObj := generateGiteaContextForRun(run)
assert.Equal(t, "testuser", gitContextObj.Actor)
assert.Equal(t, setting.AppURL+"api/v1", gitContextObj.APIURL)
assert.Equal(t, "push", gitContextObj.EventName)
assert.Equal(t, "refs/heads/main", gitContextObj.Ref)
assert.Equal(t, "main", gitContextObj.RefName)
assert.Equal(t, "branch", gitContextObj.RefType)
assert.Equal(t, "testowner/testrepo", gitContextObj.Repository)
assert.Equal(t, "testowner", gitContextObj.RepositoryOwner)
assert.Equal(t, "abc123def456", gitContextObj.Sha)
assert.Equal(t, "42", gitContextObj.RunNumber)
assert.Equal(t, "test-workflow", gitContextObj.Workflow)
assert.Equal(t, "testrepo", gitContextObj.Event["repository"].(map[string]any)["name"])
assert.Empty(t, gitContextObj.ActionPath)
assert.Empty(t, gitContextObj.ActionRef)
assert.Empty(t, gitContextObj.ActionRepository)
assert.Empty(t, gitContextObj.Action)
assert.Empty(t, gitContextObj.BaseRef)
assert.Empty(t, gitContextObj.EventPath)
assert.Empty(t, gitContextObj.GraphQLURL)
assert.Empty(t, gitContextObj.HeadRef)
assert.Empty(t, gitContextObj.Job)
assert.Empty(t, gitContextObj.RetentionDays)
assert.Empty(t, gitContextObj.RunAttempt)
assert.Empty(t, gitContextObj.RunID)
assert.Empty(t, gitContextObj.Workspace)
})
t.Run("Pull request event", func(t *testing.T) {
pullRequestPayload := map[string]any{
"pull_request": map[string]any{
"base": map[string]any{
"ref": "main",
"label": "main",
"sha": "base123sha",
},
"head": map[string]any{
"ref": "feature-branch",
"label": "feature-branch",
"sha": "head456sha",
},
},
}
payloadBytes, _ := json.Marshal(pullRequestPayload)
run := &actions_model.ActionRun{
ID: 1,
Index: 42,
TriggerUser: testUser,
Repo: testRepo,
TriggerEvent: "pull_request",
Ref: "refs/pull/1/merge",
CommitSHA: "merge789sha",
WorkflowID: "test-workflow",
Event: webhook_module.HookEventPullRequest,
EventPayload: string(payloadBytes),
}
gitContextObj := generateGiteaContextForRun(run)
assert.Equal(t, "main", gitContextObj.BaseRef)
assert.Equal(t, "feature-branch", gitContextObj.HeadRef)
assert.Equal(t, "refs/pull/1/merge", gitContextObj.Ref)
assert.Equal(t, "merge789sha", gitContextObj.Sha)
})
t.Run("Pull request target event", func(t *testing.T) {
pullRequestPayload := map[string]any{
"pull_request": map[string]any{
"base": map[string]any{
"ref": "main",
"label": "main",
"sha": "base123sha",
},
"head": map[string]any{
"ref": "feature-branch",
"label": "feature-branch",
"sha": "head456sha",
},
},
}
payloadBytes, _ := json.Marshal(pullRequestPayload)
run := &actions_model.ActionRun{
ID: 1,
Index: 42,
TriggerUser: testUser,
Repo: testRepo,
TriggerEvent: actions_module.GithubEventPullRequestTarget,
Ref: "refs/pull/1/merge",
CommitSHA: "merge789sha",
WorkflowID: "test-workflow",
Event: webhook_module.HookEventPullRequest,
EventPayload: string(payloadBytes),
}
gitContextObj := generateGiteaContextForRun(run)
assert.Equal(t, "main", gitContextObj.BaseRef)
assert.Equal(t, "feature-branch", gitContextObj.HeadRef)
// For pull_request_target, ref and sha should be from base
assert.Equal(t, "refs/heads/main", gitContextObj.Ref)
assert.Equal(t, "base123sha", gitContextObj.Sha)
assert.Equal(t, "main", gitContextObj.RefName)
assert.Equal(t, "branch", gitContextObj.RefType)
})
}

View file

@ -351,14 +351,17 @@ func handleWorkflows(
Status: actions_model.StatusWaiting,
}
if workflow, err := model.ReadWorkflow(bytes.NewReader(dwf.Content), false); err == nil {
notifications, err := workflow.Notifications()
if err != nil {
log.Error("Notifications: %w", err)
}
run.NotifyEmail = notifications
workflow, err := model.ReadWorkflow(bytes.NewReader(dwf.Content), false)
if err != nil {
log.Error("unable to read workflow: %v", err)
}
notifications, err := workflow.Notifications()
if err != nil {
log.Error("Notifications: %w", err)
}
run.NotifyEmail = notifications
need, err := ifNeedApproval(ctx, run, input.Repo, input.Doer)
if err != nil {
log.Error("check if need approval for repo %d with user %d: %v", input.Repo.ID, input.Doer.ID, err)
@ -378,6 +381,11 @@ func handleWorkflows(
continue
}
err = ConfigureActionRunConcurrency(workflow, run, vars, map[string]any{})
if err != nil {
log.Error("ConfigureActionRunConcurrency: %v", err)
}
jobs, err := jobParser(dwf.Content, jobparser.WithVars(vars))
if err != nil {
run.Status = actions_model.StatusFailure
@ -387,17 +395,13 @@ func handleWorkflows(
}}
}
// cancel running jobs if the event is push or pull_request_sync
if run.Event == webhook_module.HookEventPush ||
run.Event == webhook_module.HookEventPullRequestSync {
if err := CancelPreviousJobs(
if run.ConcurrencyType == actions_model.CancelInProgress {
if err := CancelPreviousWithConcurrencyGroup(
ctx,
run.RepoID,
run.Ref,
run.WorkflowID,
run.Event,
run.ConcurrencyGroup,
); err != nil {
log.Error("CancelPreviousJobs: %v", err)
log.Error("CancelPreviousWithConcurrencyGroup: %v", err)
}
}

View file

@ -4,6 +4,7 @@
package actions
import (
"slices"
"testing"
actions_model "forgejo.org/models/actions"
@ -144,3 +145,66 @@ func Test_OpenForkPullRequestEvent(t *testing.T) {
assert.Equal(t, webhook_module.HookEventPullRequest, runs[0].Event)
assert.True(t, runs[0].IsForkPullRequest)
}
func TestActionsNotifierConcurrencyGroup(t *testing.T) {
require.NoError(t, unittest.PrepareTestDatabase())
repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 10})
doer := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 1})
pr := unittest.AssertExistsAndLoadBean(t, &issues_model.PullRequest{ID: 3})
commit := &git.Commit{
ID: git.MustIDFromString("0000000000000000000000000000000000000000"),
CommitMessage: "test",
}
detectedWorkflows := []*actions_module.DetectedWorkflow{
{
EntryName: "test.yml",
TriggerEvent: &jobparser.Event{
Name: "pull_request",
},
Content: []byte("{ on: pull_request, jobs: { j1: {} }}"),
},
}
input := &notifyInput{
Repo: repo,
Doer: doer,
Event: webhook_module.HookEventPullRequestSync,
PullRequest: pr,
Payload: &api.PullRequestPayload{},
}
err := handleWorkflows(db.DefaultContext, detectedWorkflows, commit, input, "refs/head/main")
require.NoError(t, err)
runs, err := db.Find[actions_model.ActionRun](db.DefaultContext, actions_model.FindRunOptions{
RepoID: repo.ID,
})
require.NoError(t, err)
require.Len(t, runs, 1)
firstRun := runs[0]
assert.Equal(t, "refs/head/main_test.yml_pull_request__auto", firstRun.ConcurrencyGroup)
assert.Equal(t, actions_model.CancelInProgress, firstRun.ConcurrencyType)
assert.Equal(t, actions_model.StatusWaiting, firstRun.Status)
// Also... check if CancelPreviousWithConcurrencyGroup is invoked from handleWorkflows by firing off a second
// workflow and checking that the first one gets cancelled:
err = handleWorkflows(db.DefaultContext, detectedWorkflows, commit, input, "refs/head/main")
require.NoError(t, err)
runs, err = db.Find[actions_model.ActionRun](db.DefaultContext, actions_model.FindRunOptions{
RepoID: repo.ID,
})
require.NoError(t, err)
require.Len(t, runs, 2)
firstRunIndex := slices.IndexFunc(runs, func(run *actions_model.ActionRun) bool { return run.ID == firstRun.ID })
require.NotEqual(t, -1, firstRunIndex)
firstRun = runs[firstRunIndex]
assert.Equal(t, "refs/head/main_test.yml_pull_request__auto", firstRun.ConcurrencyGroup)
assert.Equal(t, actions_model.CancelInProgress, firstRun.ConcurrencyType)
assert.Equal(t, actions_model.StatusCancelled, firstRun.Status)
}

View file

@ -57,20 +57,6 @@ func startTasks(ctx context.Context) error {
// Loop through each spec and create a schedule task for it
for _, row := range specs {
// cancel running jobs if the event is push
if row.Schedule.Event == webhook_module.HookEventPush {
// cancel running jobs of the same workflow
if err := CancelPreviousJobs(
ctx,
row.RepoID,
row.Schedule.Ref,
row.Schedule.WorkflowID,
webhook_module.HookEventSchedule,
); err != nil {
log.Error("CancelPreviousJobs: %v", err)
}
}
if row.Repo.IsArchived {
// Skip if the repo is archived
continue
@ -166,6 +152,21 @@ func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule)
}
run.NotifyEmail = notifications
err = ConfigureActionRunConcurrency(workflow, run, vars, map[string]any{})
if err != nil {
return err
}
if run.ConcurrencyType == actions_model.CancelInProgress {
if err := CancelPreviousWithConcurrencyGroup(
ctx,
run.RepoID,
run.ConcurrencyGroup,
); err != nil {
return err
}
}
// Parse the workflow specification from the cron schedule
workflows, err := jobParser(cron.Content, jobparser.WithVars(vars))
if err != nil {
@ -185,7 +186,7 @@ func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule)
// It's useful when a new run is triggered, and all previous runs needn't be continued anymore.
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
// Find all runs in the specified repository, reference, and workflow with non-final status
runs, total, err := db.FindAndCount[actions_model.ActionRun](ctx, actions_model.FindRunOptions{
runs, _, err := db.FindAndCount[actions_model.ActionRun](ctx, actions_model.FindRunOptions{
RepoID: repoID,
Ref: ref,
WorkflowID: workflowID,
@ -196,58 +197,93 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
return err
}
// If there are no runs found, there's no need to proceed with cancellation, so return nil.
if total == 0 {
return nil
// Iterate over each found run and cancel its associated jobs.
errorSlice := []error{}
for _, run := range runs {
err := cancelJobsForRun(ctx, run)
errorSlice = append(errorSlice, err)
}
err = errors.Join(errorSlice...)
if err != nil {
return err
}
return nil
}
// Cancels all pending jobs in the same repository with the same concurrency group.
func CancelPreviousWithConcurrencyGroup(ctx context.Context, repoID int64, concurrencyGroup string) error {
runs, _, err := db.FindAndCount[actions_model.ActionRun](ctx, actions_model.FindRunOptions{
RepoID: repoID,
ConcurrencyGroup: concurrencyGroup,
Status: []actions_model.Status{actions_model.StatusRunning, actions_model.StatusWaiting, actions_model.StatusBlocked},
})
if err != nil {
return err
}
// Iterate over each found run and cancel its associated jobs.
errorSlice := []error{}
for _, run := range runs {
// Find all jobs associated with the current run.
jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
RunID: run.ID,
})
if err != nil {
return err
err := cancelJobsForRun(ctx, run)
errorSlice = append(errorSlice, err)
}
err = errors.Join(errorSlice...)
if err != nil {
return err
}
return nil
}
func cancelJobsForRun(ctx context.Context, run *actions_model.ActionRun) error {
// Find all jobs associated with the current run.
jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
RunID: run.ID,
})
if err != nil {
return err
}
// Iterate over each job and attempt to cancel it.
errorSlice := []error{}
for _, job := range jobs {
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
status := job.Status
if status.IsDone() {
continue
}
// Iterate over each job and attempt to cancel it.
for _, job := range jobs {
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
status := job.Status
if status.IsDone() {
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
if job.TaskID == 0 {
job.Status = actions_model.StatusCancelled
job.Stopped = timeutil.TimeStampNow()
// Update the job's status and stopped time in the database.
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
if err != nil {
errorSlice = append(errorSlice, err)
continue
}
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
if job.TaskID == 0 {
job.Status = actions_model.StatusCancelled
job.Stopped = timeutil.TimeStampNow()
// Update the job's status and stopped time in the database.
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
if err != nil {
return err
}
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
if n == 0 {
return errors.New("job has changed, try again")
}
// Continue with the next job.
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
if n == 0 {
errorSlice = append(errorSlice, errors.New("job has changed, try again"))
continue
}
// If the job has an associated task, try to stop the task, effectively cancelling the job.
if err := StopTask(ctx, job.TaskID, actions_model.StatusCancelled); err != nil {
return err
}
// Continue with the next job.
continue
}
// If the job has an associated task, try to stop the task, effectively cancelling the job.
if err := StopTask(ctx, job.TaskID, actions_model.StatusCancelled); err != nil {
errorSlice = append(errorSlice, errors.New("job has changed, try again"))
continue
}
}
// Return nil to indicate successful cancellation of all running and waiting jobs.
return nil
return errors.Join(errorSlice...)
}
func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository, cancelPreviousJobs bool) error {

View file

@ -11,6 +11,7 @@ import (
repo_model "forgejo.org/models/repo"
"forgejo.org/models/unit"
"forgejo.org/models/unittest"
"forgejo.org/modules/timeutil"
webhook_module "forgejo.org/modules/webhook"
"github.com/stretchr/testify/assert"
@ -86,6 +87,8 @@ func TestCreateScheduleTask(t *testing.T) {
assert.Equal(t, cron.EventPayload, run.EventPayload)
assert.Equal(t, cron.ID, run.ScheduleID)
assert.Equal(t, actions_model.StatusWaiting, run.Status)
assert.Equal(t, "branch_some.yml_schedule__auto", run.ConcurrencyGroup)
assert.Equal(t, actions_model.UnlimitedConcurrency, run.ConcurrencyType)
}
assertMutable := func(t *testing.T, expected, run *actions_model.ActionRun) {
@ -173,3 +176,91 @@ jobs:
})
}
}
func TestCancelPreviousJobs(t *testing.T) {
defer unittest.OverrideFixtures("services/actions/TestCancelPreviousJobs")()
require.NoError(t, unittest.PrepareTestDatabase())
run := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: 894})
assert.Equal(t, actions_model.StatusRunning, run.Status)
assert.EqualValues(t, 1683636626, run.Updated)
runJob := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RunID: 894})
assert.Equal(t, actions_model.StatusRunning, runJob.Status)
assert.EqualValues(t, 1683636528, runJob.Started)
err := CancelPreviousJobs(t.Context(), 63, "refs/heads/main", "running.yaml", webhook_module.HookEventWorkflowDispatch)
require.NoError(t, err)
run = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: 894})
assert.Equal(t, actions_model.StatusCancelled, run.Status)
assert.Greater(t, run.Updated, timeutil.TimeStamp(1683636626))
runJob = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RunID: 894})
assert.Equal(t, actions_model.StatusCancelled, runJob.Status)
assert.Greater(t, runJob.Stopped, timeutil.TimeStamp(1683636528))
}
func TestCancelPreviousWithConcurrencyGroup(t *testing.T) {
for _, tc := range []struct {
name string
updateRun901 map[string]any
}{
// run 900 & 901 in the fixture data have almost the same data and so should both be cancelled by
// TestCancelPreviousWithConcurrencyGroup -- but each test case will vary something different about 601 to
// ensure that only run 600 is targeted by the cancellation
{
name: "only cancels target repo",
updateRun901: map[string]any{"repo_id": 2},
},
{
name: "only cancels target concurrency group",
updateRun901: map[string]any{"concurrency_group": "321cba"},
},
{
name: "only cancels running",
updateRun901: map[string]any{"status": actions_model.StatusSuccess},
},
} {
t.Run(tc.name, func(t *testing.T) {
defer unittest.OverrideFixtures("services/actions/TestCancelPreviousWithConcurrencyGroup")()
require.NoError(t, unittest.PrepareTestDatabase())
e := db.GetEngine(t.Context())
expected901Status := actions_model.StatusRunning
if tc.updateRun901 != nil {
affected, err := e.Table(&actions_model.ActionRun{}).Where("id = ?", 901).Update(tc.updateRun901)
require.NoError(t, err)
require.EqualValues(t, 1, affected)
newStatus, ok := tc.updateRun901["status"]
if ok {
expected901Status = newStatus.(actions_model.Status)
}
}
run := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: 900})
assert.Equal(t, actions_model.StatusRunning, run.Status)
assert.EqualValues(t, 1683636626, run.Updated)
assert.Equal(t, "abc123", run.ConcurrencyGroup)
run = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: 901})
assert.Equal(t, expected901Status, run.Status)
runJob := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RunID: 900})
assert.Equal(t, actions_model.StatusRunning, runJob.Status)
assert.EqualValues(t, 1683636528, runJob.Started)
// Search for concurrency group should be case-insensitive, which we test here by using a different capitalization
// than the fixture data
err := CancelPreviousWithConcurrencyGroup(t.Context(), 63, "ABC123")
require.NoError(t, err)
run = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: 900})
assert.Equal(t, actions_model.StatusCancelled, run.Status)
assert.Greater(t, run.Updated, timeutil.TimeStamp(1683636626))
runJob = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RunID: 900})
assert.Equal(t, actions_model.StatusCancelled, runJob.Status)
assert.Greater(t, runJob.Stopped, timeutil.TimeStamp(1683636528))
run = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: 901})
assert.Equal(t, expected901Status, run.Status)
})
}
}

View file

@ -69,6 +69,7 @@ func (entry *Workflow) Dispatch(ctx context.Context, inputGetter InputValueGette
}
inputs := make(map[string]string)
inputsAny := make(map[string]any)
if workflowDispatch := wf.WorkflowDispatchConfig(); workflowDispatch != nil {
for key, input := range workflowDispatch.Inputs {
val := inputGetter(key)
@ -89,6 +90,7 @@ func (entry *Workflow) Dispatch(ctx context.Context, inputGetter InputValueGette
val = strconv.FormatBool(val == "on")
}
inputs[key] = val
inputsAny[key] = val
}
}
@ -138,6 +140,21 @@ func (entry *Workflow) Dispatch(ctx context.Context, inputGetter InputValueGette
return nil, nil, err
}
err = ConfigureActionRunConcurrency(wf, run, vars, inputsAny)
if err != nil {
return nil, nil, err
}
if run.ConcurrencyType == actions_model.CancelInProgress {
if err := CancelPreviousWithConcurrencyGroup(
ctx,
run.RepoID,
run.ConcurrencyGroup,
); err != nil {
return nil, nil, err
}
}
jobs, err := jobParser(content, jobparser.WithVars(vars))
if err != nil {
return nil, nil, err
@ -180,3 +197,38 @@ func GetWorkflowFromCommit(gitRepo *git.Repository, ref, workflowID string) (*Wo
GitEntry: workflowEntry,
}, nil
}
// Sets the ConcurrencyGroup & ConcurrencyType on the provided ActionRun based upon the Workflow's `concurrency` data,
// or appropriate defaults if not present.
func ConfigureActionRunConcurrency(workflow *act_model.Workflow, run *actions_model.ActionRun, vars map[string]string, inputs map[string]any) error {
concurrencyGroup, cancelInProgress, err := jobparser.EvaluateWorkflowConcurrency(
workflow.RawConcurrency, generateGiteaContextForRun(run), vars, inputs)
if err != nil {
return fmt.Errorf("unable to evaluate workflow `concurrency` block: %w", err)
}
if concurrencyGroup != "" {
run.SetConcurrencyGroup(concurrencyGroup)
} else {
run.SetDefaultConcurrencyGroup()
}
if cancelInProgress == nil {
// Maintain compatible behavior from before concurrency groups were implemented -- if `cancel-in-progress`
// isn't defined in the workflow, cancel on push & PR sync events.
if run.Event == webhook.HookEventPush || run.Event == webhook.HookEventPullRequestSync {
run.ConcurrencyType = actions_model.CancelInProgress
} else {
run.ConcurrencyType = actions_model.UnlimitedConcurrency
}
} else if *cancelInProgress {
run.ConcurrencyType = actions_model.CancelInProgress
} else if concurrencyGroup == "" {
// A workflow has explicitly listed `cancel-in-progress: false`, but has *not* provided a concurrency group. In
// this case we want to trigger a different concurrency behavior -- we won't cancel in-progress builds (we were
// asked not to), we won't queue behind other builds (we weren't given a concurrency group so it's reasonable to
// assume the user doesn't want a concurrency limit).
run.ConcurrencyType = actions_model.UnlimitedConcurrency
} else {
run.ConcurrencyType = actions_model.QueueBehind
}
return nil
}

View file

@ -0,0 +1,123 @@
// Copyright 2025 The Forgejo Authors. All rights reserved.
// SPDX-License-Identifier: GPL-3.0-or-later
package actions
import (
"testing"
actions_model "forgejo.org/models/actions"
"forgejo.org/models/repo"
"forgejo.org/modules/webhook"
act_model "code.forgejo.org/forgejo/runner/v11/act/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestConfigureActionRunConcurrency(t *testing.T) {
for _, tc := range []struct {
name string
concurrency *act_model.RawConcurrency
vars map[string]string
inputs map[string]any
runEvent webhook.HookEventType
expectedConcurrencyGroup string
expectedConcurrencyType actions_model.ConcurrencyMode
}{
// Before the introduction of concurrency groups, push & pull_request_sync would cancel runs on the same repo,
// reference, workflow, and event -- these cases cover undefined concurrency group and backwards compatibility
// checks.
{
name: "backwards compatibility push",
runEvent: webhook.HookEventPush,
expectedConcurrencyGroup: "refs/head/main_testing.yml_push__auto",
expectedConcurrencyType: actions_model.CancelInProgress,
},
{
name: "backwards compatibility pull_request_sync",
runEvent: webhook.HookEventPullRequestSync,
expectedConcurrencyGroup: "refs/head/main_testing.yml_pull_request_sync__auto",
expectedConcurrencyType: actions_model.CancelInProgress,
},
{
name: "backwards compatibility other event",
runEvent: webhook.HookEventWorkflowDispatch,
expectedConcurrencyGroup: "refs/head/main_testing.yml_workflow_dispatch__auto",
expectedConcurrencyType: actions_model.UnlimitedConcurrency,
},
{
name: "fully-specified cancel-in-progress",
concurrency: &act_model.RawConcurrency{
Group: "abc",
CancelInProgress: "true",
},
runEvent: webhook.HookEventPullRequestSync,
expectedConcurrencyGroup: "abc",
expectedConcurrencyType: actions_model.CancelInProgress,
},
{
name: "fully-specified queue-behind",
concurrency: &act_model.RawConcurrency{
Group: "abc",
CancelInProgress: "false",
},
runEvent: webhook.HookEventPullRequestSync,
expectedConcurrencyGroup: "abc",
expectedConcurrencyType: actions_model.QueueBehind,
},
{
name: "no concurrency group, cancel-in-progress: false",
concurrency: &act_model.RawConcurrency{
CancelInProgress: "false",
},
runEvent: webhook.HookEventPullRequestSync,
expectedConcurrencyGroup: "refs/head/main_testing.yml_pull_request_sync__auto",
expectedConcurrencyType: actions_model.UnlimitedConcurrency,
},
{
name: "interpreted values",
concurrency: &act_model.RawConcurrency{
Group: "${{ github.workflow }}-${{ github.ref }}",
CancelInProgress: "${{ !contains(github.ref, 'release/')}}",
},
runEvent: webhook.HookEventPullRequestSync,
expectedConcurrencyGroup: "testing.yml-refs/head/main",
expectedConcurrencyType: actions_model.CancelInProgress,
},
{
name: "interpreted values with inputs and vars",
concurrency: &act_model.RawConcurrency{
Group: "${{ inputs.abc }}-${{ vars.def }}",
},
inputs: map[string]any{"abc": "123"},
vars: map[string]string{"def": "456"},
runEvent: webhook.HookEventPullRequestSync,
expectedConcurrencyGroup: "123-456",
expectedConcurrencyType: actions_model.CancelInProgress,
},
} {
t.Run(tc.name, func(t *testing.T) {
workflow := &act_model.Workflow{RawConcurrency: tc.concurrency}
run := &actions_model.ActionRun{
Ref: "refs/head/main",
WorkflowID: "testing.yml",
Event: tc.runEvent,
TriggerEvent: string(tc.runEvent),
Repo: &repo.Repository{},
}
err := ConfigureActionRunConcurrency(workflow, run, tc.vars, tc.inputs)
require.NoError(t, err)
if tc.expectedConcurrencyGroup == "" {
assert.Empty(t, run.ConcurrencyGroup, "empty ConcurrencyGroup")
} else {
assert.Equal(t, tc.expectedConcurrencyGroup, run.ConcurrencyGroup)
}
assert.Equal(t, tc.expectedConcurrencyType, run.ConcurrencyType)
})
}
}

View file

@ -5,6 +5,7 @@ package notify
import (
"context"
"slices"
actions_model "forgejo.org/models/actions"
issues_model "forgejo.org/models/issues"
@ -24,6 +25,13 @@ func RegisterNotifier(notifier Notifier) {
notifiers = append(notifiers, notifier)
}
// Intended for undoing RegisterNotifier in tests only, not for production usage
func UnregisterNotifier(notifier Notifier) {
notifiers = slices.DeleteFunc(notifiers, func(maybeNotifier Notifier) bool {
return notifier == maybeNotifier
})
}
// NewWikiPage notifies creating new wiki pages to notifiers
func NewWikiPage(ctx context.Context, doer *user_model.User, repo *repo_model.Repository, page, comment string) {
for _, notifier := range notifiers {

View file

@ -0,0 +1,281 @@
// Copyright 2025 The Forgejo Authors. All rights reserved.
// SPDX-License-Identifier: GPL-3.0-or-later
package integration
import (
"net/url"
"strings"
"testing"
actions_model "forgejo.org/models/actions"
"forgejo.org/models/db"
unit_model "forgejo.org/models/unit"
"forgejo.org/models/unittest"
user_model "forgejo.org/models/user"
"forgejo.org/modules/gitrepo"
"forgejo.org/modules/setting"
"forgejo.org/modules/test"
actions_service "forgejo.org/services/actions"
files_service "forgejo.org/services/repository/files"
"forgejo.org/tests"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestActionConcurrencyRunnerFiltering(t *testing.T) {
defer unittest.OverrideFixtures("tests/integration/fixtures/TestActionConcurrencyRunnerFiltering")()
require.NoError(t, unittest.PrepareTestDatabase())
for _, tc := range []struct {
name string
runnerName string
expectedRunIDs []int64
}{
{
// owner id 2
runnerName: "User runner",
expectedRunIDs: []int64{500, 502},
},
{
// owner id 3
runnerName: "Organisation runner",
expectedRunIDs: []int64{501},
},
{
runnerName: "Repository runner",
expectedRunIDs: []int64{502},
},
{
runnerName: "Global runner",
expectedRunIDs: []int64{500, 501, 502},
},
} {
t.Run(tc.runnerName, func(t *testing.T) {
// defer unittest.OverrideFixtures("tests/integration/fixtures/TestActionConcurrencyRunnerFiltering")()
// require.NoError(t, unittest.PrepareTestDatabase())
doTest := func() {
e := db.GetEngine(t.Context())
runner := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunner{Name: tc.runnerName})
jobs, err := actions_model.GetAvailableJobsForRunner(e, runner)
require.NoError(t, err)
ids := []int64{}
for _, job := range jobs {
ids = append(ids, job.ID)
}
assert.ElementsMatch(t, tc.expectedRunIDs, ids)
}
t.Run("ConcurrencyGroupQueueEnabled", func(t *testing.T) {
defer test.MockVariableValue(&setting.Actions.ConcurrencyGroupQueueEnabled, true)()
doTest()
})
t.Run("ConcurrencyGroupQueueDisabled", func(t *testing.T) {
defer test.MockVariableValue(&setting.Actions.ConcurrencyGroupQueueEnabled, false)()
doTest()
})
})
}
}
// These tests are a little more unit-testy than they are integration tests, but they're placed in the integration test
// suite so that they're run on all database engines.
func TestActionConcurrencyGroupQueue(t *testing.T) {
for _, tc := range []struct {
name string
expectedRunIDs []int64
updateRun500 map[string]any
updateRunJob500 map[string]any
updateRun501 map[string]any
updateRunJob501 map[string]any
queuingDisabled bool
}{
{
name: "queuing disabled",
expectedRunIDs: []int64{500, 501, 502},
queuingDisabled: true,
},
{
// Job 501 & 502's data is configured to be queued-behind job 500, so with queuing enabled it shouldn't
// appear.
name: "concurrency blocked",
expectedRunIDs: []int64{500},
},
{
name: "different repo",
updateRun501: map[string]any{"repo_id": 2},
expectedRunIDs: []int64{500, 501},
},
{
name: "different concurrency group",
updateRun501: map[string]any{"concurrency_group": "321bca"},
expectedRunIDs: []int64{500, 501},
},
{
name: "null concurrency group",
updateRun501: map[string]any{"concurrency_group": nil},
expectedRunIDs: []int64{500, 501},
},
{
name: "empty concurrency group",
updateRun501: map[string]any{"concurrency_group": ""},
expectedRunIDs: []int64{500, 501},
},
{
name: "unlimited concurrency",
updateRun501: map[string]any{"concurrency_type": actions_model.UnlimitedConcurrency},
expectedRunIDs: []int64{500, 501},
},
{
name: "cancel-in-progress type",
updateRun501: map[string]any{"concurrency_type": actions_model.CancelInProgress},
expectedRunIDs: []int64{500, 501},
},
{
name: "blocking job done",
updateRun500: map[string]any{"status": actions_model.StatusCancelled},
updateRunJob500: map[string]any{"status": actions_model.StatusCancelled},
expectedRunIDs: []int64{501},
},
{
name: "mid-index job running",
updateRun501: map[string]any{"status": actions_model.StatusRunning},
updateRunJob501: map[string]any{"status": actions_model.StatusRunning},
expectedRunIDs: []int64{},
},
{
// Reflects a case where 500 may be retried -- there's already a later job (index-wise) in the concurrency
// group that is done, but if 500 is waiting it can still be run
name: "mid-index job ran",
updateRun501: map[string]any{"status": actions_model.StatusSuccess},
updateRunJob501: map[string]any{"status": actions_model.StatusSuccess},
expectedRunIDs: []int64{500},
},
{
// If both job 500 & job 501 are in the same workflow run, and one is running, the other can still start
// (this would be conditional on its `needs` as a job, but that isn't evaluated by GetAvailableJobsForRunner
// so isn't in the scope of testing here)
name: "multiple jobs from same run",
updateRun500: map[string]any{"status": actions_model.StatusRunning},
updateRunJob500: map[string]any{"status": actions_model.StatusRunning},
updateRunJob501: map[string]any{"run_id": 500},
expectedRunIDs: []int64{501},
},
} {
t.Run(tc.name, func(t *testing.T) {
defer unittest.OverrideFixtures("tests/integration/fixtures/TestActionConcurrencyGroupQueue")()
require.NoError(t, unittest.PrepareTestDatabase())
runner := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunner{ID: 1004}, "owner_id = 0 AND repo_id = 0")
defer test.MockVariableValue(&setting.Actions.ConcurrencyGroupQueueEnabled, !tc.queuingDisabled)()
e := db.GetEngine(t.Context())
if tc.updateRun500 != nil {
affected, err := e.Table(&actions_model.ActionRun{}).Where("id = ?", 500).Update(tc.updateRun500)
require.NoError(t, err)
require.EqualValues(t, 1, affected)
}
if tc.updateRunJob500 != nil {
affected, err := e.Table(&actions_model.ActionRunJob{}).Where("id = ?", 500).Update(tc.updateRunJob500)
require.NoError(t, err)
require.EqualValues(t, 1, affected)
}
if tc.updateRun501 != nil {
affected, err := e.Table(&actions_model.ActionRun{}).Where("id = ?", 501).Update(tc.updateRun501)
require.NoError(t, err)
require.EqualValues(t, 1, affected)
}
if tc.updateRunJob501 != nil {
affected, err := e.Table(&actions_model.ActionRunJob{}).Where("id = ?", 501).Update(tc.updateRunJob501)
require.NoError(t, err)
require.EqualValues(t, 1, affected)
}
jobs, err := actions_model.GetAvailableJobsForRunner(e, runner)
require.NoError(t, err)
ids := []int64{}
for _, job := range jobs {
ids = append(ids, job.ID)
}
assert.ElementsMatch(t, tc.expectedRunIDs, ids)
})
}
}
func TestActionConcurrencyGroupQueueFetchNext(t *testing.T) {
if !setting.Database.Type.IsSQLite3() {
// mock repo runner only supported on SQLite testing
t.Skip()
}
onGiteaRun(t, func(t *testing.T, u *url.URL) {
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
// create the repo
repo, sha, f := tests.CreateDeclarativeRepo(t, user2, "repo-workflow-dispatch",
[]unit_model.Type{unit_model.TypeActions}, nil,
[]*files_service.ChangeRepoFile{
{
Operation: "create",
TreePath: ".forgejo/workflows/dispatch.yml",
ContentReader: strings.NewReader(
"name: concurrency group workflow\n" +
"on:\n" +
" workflow_dispatch:\n" +
" inputs:\n" +
" ident:\n" +
" type: string\n" +
"concurrency:\n" +
" group: abc\n" +
" cancel-in-progress: false\n" +
"jobs:\n" +
" test:\n" +
" runs-on: ubuntu-latest\n" +
" steps:\n" +
" - run: echo deployment goes here\n"),
},
},
)
defer f()
gitRepo, err := gitrepo.OpenRepository(db.DefaultContext, repo)
require.NoError(t, err)
defer gitRepo.Close()
workflow, err := actions_service.GetWorkflowFromCommit(gitRepo, "main", "dispatch.yml")
require.NoError(t, err)
assert.Equal(t, "refs/heads/main", workflow.Ref)
assert.Equal(t, sha, workflow.Commit.ID.String())
runner := newMockRunner()
runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"})
// first run within the concurrency group
_, _, err = workflow.Dispatch(db.DefaultContext, func(key string) string { return "task1" }, repo, user2)
require.NoError(t, err)
task1 := runner.fetchTask(t)
// dispatch a second run within the same concurrency group
_, _, err = workflow.Dispatch(db.DefaultContext, func(key string) string { return "task2" }, repo, user2)
require.NoError(t, err)
// assert that we can't fetch and start that second task -- it's blocked behind the first
task2 := runner.maybeFetchTask(t)
assert.Nil(t, task2)
// finish the first task
runner.succeedAtTask(t, task1)
// now task2 should be accessible since task1 has completed
task2 = runner.fetchTask(t)
assert.NotNil(t, task2)
runner.succeedAtTask(t, task2)
})
}

View file

@ -75,6 +75,7 @@ func TestActionNowDoneNotification(t *testing.T) {
onGiteaRun(t, func(t *testing.T, u *url.URL) {
notifier := mockNotifier{t: t, testIdx: 0, lastRunID: -1, runID: -1}
notify_service.RegisterNotifier(&notifier)
defer notify_service.UnregisterNotifier(&notifier)
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})

View file

@ -25,7 +25,8 @@ import (
)
type mockRunner struct {
client *mockRunnerClient
client *mockRunnerClient
lastTasksVersion int64
}
type mockRunnerClient struct {
@ -83,8 +84,7 @@ func (r *mockRunner) doRegister(t *testing.T, name, token string, labels []strin
func (r *mockRunner) registerAsRepoRunner(t *testing.T, ownerName, repoName, runnerName string, labels []string) {
if !setting.Database.Type.IsSQLite3() {
// registering a mock runner when using a database other than SQLite leaves leftovers
t.FailNow()
assert.FailNow(t, "registering a mock runner when using a database other than SQLite leaves leftovers")
}
session := loginUser(t, ownerName)
token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository)
@ -152,6 +152,15 @@ func (r *mockRunner) deleteRunner(t *testing.T, ownerName, repoName string, runn
MakeRequest(t, req, http.StatusNoContent)
}
func (r *mockRunner) maybeFetchTask(t *testing.T) *runnerv1.Task {
resp, err := r.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{
TasksVersion: r.lastTasksVersion,
}))
require.NoError(t, err)
r.lastTasksVersion = resp.Msg.TasksVersion
return resp.Msg.Task
}
func (r *mockRunner) fetchTask(t *testing.T, timeout ...time.Duration) *runnerv1.Task {
fetchTimeout := 10 * time.Second
if len(timeout) > 0 {
@ -159,13 +168,10 @@ func (r *mockRunner) fetchTask(t *testing.T, timeout ...time.Duration) *runnerv1
}
var task *runnerv1.Task
assert.Eventually(t, func() bool {
resp, err := r.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{
TasksVersion: 0,
}))
require.NoError(t, err)
if resp.Msg.Task != nil {
task = resp.Msg.Task
require.Eventually(t, func() bool {
maybeTask := r.maybeFetchTask(t)
if maybeTask != nil {
task = maybeTask
return true
}
return false

View file

@ -848,3 +848,61 @@ func TestActionsWorkflowDispatchEvent(t *testing.T) {
assert.Equal(t, "test", j[0])
})
}
func TestActionsWorkflowDispatchConcurrencyGroup(t *testing.T) {
onGiteaRun(t, func(t *testing.T, u *url.URL) {
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
// create the repo
repo, sha, f := tests.CreateDeclarativeRepo(t, user2, "repo-workflow-dispatch",
[]unit_model.Type{unit_model.TypeActions}, nil,
[]*files_service.ChangeRepoFile{
{
Operation: "create",
TreePath: ".gitea/workflows/dispatch.yml",
ContentReader: strings.NewReader(
"name: test\n" +
"on: [workflow_dispatch]\n" +
"jobs:\n" +
" test:\n" +
" runs-on: ubuntu-latest\n" +
" steps:\n" +
" - run: echo helloworld\n" +
"concurrency:\n" +
" group: workflow-magic-group\n" +
" cancel-in-progress: true\n",
),
},
},
)
defer f()
gitRepo, err := gitrepo.OpenRepository(db.DefaultContext, repo)
require.NoError(t, err)
defer gitRepo.Close()
workflow, err := actions_service.GetWorkflowFromCommit(gitRepo, "main", "dispatch.yml")
require.NoError(t, err)
assert.Equal(t, "refs/heads/main", workflow.Ref)
assert.Equal(t, sha, workflow.Commit.ID.String())
inputGetter := func(key string) string {
return ""
}
firstRun, _, err := workflow.Dispatch(db.DefaultContext, inputGetter, repo, user2)
require.NoError(t, err)
assert.Equal(t, 1, unittest.GetCount(t, &actions_model.ActionRun{RepoID: repo.ID}))
assert.Equal(t, "workflow-magic-group", firstRun.ConcurrencyGroup)
assert.Equal(t, actions_model.CancelInProgress, firstRun.ConcurrencyType)
// Dispatch again and verify previous run was cancelled:
secondRun, _, err := workflow.Dispatch(db.DefaultContext, inputGetter, repo, user2)
require.NoError(t, err)
assert.Equal(t, 2, unittest.GetCount(t, &actions_model.ActionRun{RepoID: repo.ID}))
assert.Equal(t, "workflow-magic-group", secondRun.ConcurrencyGroup)
assert.Equal(t, actions_model.CancelInProgress, secondRun.ConcurrencyType)
firstRunReload := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: firstRun.ID})
assert.Equal(t, actions_model.StatusCancelled, firstRunReload.Status)
})
}

View file

@ -0,0 +1,21 @@
-
id: 500
index: 1
status: 5 # StatusWaiting
repo_id: 4
concurrency_group: abc123
concurrency_type: 1 # QueueBehind
-
id: 501
index: 2
status: 5 # StatusWaiting
repo_id: 4
concurrency_group: abc123
concurrency_type: 1 # QueueBehind
-
id: 502
index: 3
status: 5 # StatusWaiting
repo_id: 4
concurrency_group: abc123
concurrency_type: 1 # QueueBehind

View file

@ -0,0 +1,42 @@
-
id: 500
run_id: 500
repo_id: 4
owner_id: 1
commit_sha: 985f0301dba5e7b34be866819cd15ad3d8f508ee
is_fork_pull_request: 0
name: job_1
attempt: 0
job_id: job_1
task_id: 0
status: 5 # StatusWaiting
runs_on: '["fedora"]'
created: 1758848614
-
id: 501
run_id: 501
repo_id: 4
owner_id: 1
commit_sha: 985f0301dba5e7b34be866819cd15ad3d8f508ee
is_fork_pull_request: 0
name: job_1
attempt: 0
job_id: job_1
task_id: 0
status: 5 # StatusWaiting
runs_on: '["fedora"]'
created: 1758848614
-
id: 502
run_id: 502
repo_id: 4
owner_id: 1
commit_sha: 985f0301dba5e7b34be866819cd15ad3d8f508ee
is_fork_pull_request: 0
name: job_1
attempt: 0
job_id: job_1
task_id: 0
status: 5 # StatusWaiting
runs_on: '["fedora"]'
created: 1758848614

View file

@ -0,0 +1,7 @@
-
id: 1004
uuid: "fb857e63-c0ce-4571-a6c9-fde26c128073"
name: "Global runner"
owner_id: 0
repo_id: 0
deleted: 0

View file

@ -0,0 +1,18 @@
-
id: 500
index: 1
status: 5 # StatusWaiting
repo_id: 62
owner_id: 2
-
id: 501
index: 2
status: 5 # StatusWaiting
repo_id: 3
owner_id: 3
-
id: 502
index: 2
status: 5 # StatusWaiting
repo_id: 1
owner_id: 2

View file

@ -0,0 +1,42 @@
-
id: 500
run_id: 500
repo_id: 62
owner_id: 2
commit_sha: 985f0301dba5e7b34be866819cd15ad3d8f508ee
is_fork_pull_request: 0
name: job_1
attempt: 0
job_id: job_1
task_id: 0
status: 5 # StatusWaiting
runs_on: '["fedora"]'
created: 1758848614
-
id: 501
run_id: 501
repo_id: 4
owner_id: 3
commit_sha: 985f0301dba5e7b34be866819cd15ad3d8f508ee
is_fork_pull_request: 0
name: job_1
attempt: 0
job_id: job_1
task_id: 0
status: 5 # StatusWaiting
runs_on: '["fedora"]'
created: 1758848614
-
id: 502
run_id: 502
repo_id: 1
owner_id: 2
commit_sha: 985f0301dba5e7b34be866819cd15ad3d8f508ee
is_fork_pull_request: 0
name: job_1
attempt: 0
job_id: job_1
task_id: 0
status: 5 # StatusWaiting
runs_on: '["fedora"]'
created: 1758848614

View file

@ -0,0 +1,31 @@
-
id: 1001
uuid: "43b5d4d3-401b-42f9-94df-a9d45b447b82"
name: "User runner"
owner_id: 2
repo_id: 0
deleted: 0
-
id: 1002
uuid: "bdc77f4f-2b2b-442d-bd44-e808f4306347"
name: "Organisation runner"
owner_id: 3
repo_id: 0
deleted: 0
-
id: 1003
uuid: "9268bc8c-efbf-4dbe-aeb5-945733cdd098"
name: "Repository runner"
owner_id: 0
repo_id: 1
deleted: 0
-
id: 1004
uuid: "fb857e63-c0ce-4571-a6c9-fde26c128073"
name: "Global runner"
owner_id: 0
repo_id: 0
deleted: 0

View file

@ -0,0 +1,6 @@
-
id: 200
repo_id: 3
type: 10
config: "{}"
created_unix: 946684810