
Currently references a pre-release version of `code.forgejo.org/forgejo/runner/v11`, pending release of https://code.forgejo.org/forgejo/runner/pulls/1026. Fixes #5914. This PR is quite large, but it can be reviewed commit-by-commit in relatively small, logical chunks. Adds support for workflows with a `concurrency` block, and submembers `group` and `cancel-in-progress`. For example: ``` on: workflow_dispatch: jobs: rust-checks: runs-on: debian-latest steps: - run: sleep 300 concurrency: group: ${{ github.workflow }}-${{ github.ref }} cancel-in-progress: false ``` The concurrency block effectively ends up with four supported behaviors that users will want to choose from: - Backwards compatibility / default -- if omitted completely, the existing Forgejo behavior will be implemented. That behavior is that push and pull request synchronize events will cancel all previous runs on the same repository, branch, and workflow. - Unlimited concurrency -- if the `cancel-in-progress` value is set to `false` and no `group` is provided, then the previously described Forgejo behavior will be disabled and an unlimited number of workflows can be executed simultaneously (to the maximum supported by the Forgejo Runner capacity). - Queue-behind -- if a `group` is provided and `cancel-in-progress: false` is set, then every new action run within the same repository with the same group value will be queued behind previous workflow runs, allowing only one workflow to execute at a time in the group, but allowing all workflows to finish naturally. - Cancel-in-progress -- if a `group` is provided and `cancel-in-progress: true` is set, then every new action run within the same repository with the same group value will cause previously queued or running runs to be cancelled, allowing only one workflow to execute at a time in the group, but preferring execution of the most recent workflow. 
Both the `group` and `cancel-in-progress` values can access values from the `github`, `inputs` and `vars` contexts for dynamic behavior. ## Checklist The [contributor guide](https://forgejo.org/docs/next/contributor/) contains information that will be helpful to first time contributors. There also are a few [conditions for merging Pull Requests in Forgejo repositories](https://codeberg.org/forgejo/governance/src/branch/main/PullRequestsAgreement.md). You are also welcome to join the [Forgejo development chatroom](https://matrix.to/#/#forgejo-development:matrix.org). ### Tests - I added test coverage for Go changes... - [x] in their respective `*_test.go` for unit tests. - [x] in the `tests/integration` directory if it involves interactions with a live Forgejo server. - I added test coverage for JavaScript changes... - [ ] in `web_src/js/*.test.js` if it can be unit tested. - [ ] in `tests/e2e/*.test.e2e.js` if it requires interactions with a live Forgejo server (see also the [developer guide for JavaScript testing](https://codeberg.org/forgejo/forgejo/src/branch/forgejo/tests/e2e/README.md#end-to-end-tests)). ### Documentation - [x] I created a pull request [to the documentation](https://codeberg.org/forgejo/docs) to explain to Forgejo users how to use this change. - https://codeberg.org/forgejo/docs/pulls/1513 - [ ] I did not document these changes and I do not expect someone else to do it. ### Release notes - [ ] I do not want this change to show in the release notes. - [x] I want the title to show in the release notes with a link to this pull request. - [ ] I want the content of the `release-notes/<pull request number>.md` to be used for the release notes instead of the title. 
<!--start release-notes-assistant--> ## Release notes <!--URL:https://codeberg.org/forgejo/forgejo--> - Features - [PR](https://codeberg.org/forgejo/forgejo/pulls/9434): <!--number 9434 --><!--line 0 --><!--description aW1wbGVtZW50ICJjb25jdXJyZW5jeSIgYmxvY2sgaW4gRm9yZ2VqbyBBY3Rpb25zIGF0IHRoZSB3b3JrZmxvdyBsZXZlbA==-->implement "concurrency" block in Forgejo Actions at the workflow level<!--description--> <!--end release-notes-assistant--> Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/9434 Reviewed-by: Earl Warren <earl-warren@noreply.codeberg.org> Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net> Co-committed-by: Mathieu Fenniak <mathieu@fenniak.net>
308 lines
9.1 KiB
Go
308 lines
9.1 KiB
Go
// Copyright 2023 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package actions
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
actions_model "forgejo.org/models/actions"
|
|
"forgejo.org/models/db"
|
|
repo_model "forgejo.org/models/repo"
|
|
"forgejo.org/models/unit"
|
|
"forgejo.org/modules/log"
|
|
"forgejo.org/modules/timeutil"
|
|
webhook_module "forgejo.org/modules/webhook"
|
|
|
|
"code.forgejo.org/forgejo/runner/v11/act/jobparser"
|
|
act_model "code.forgejo.org/forgejo/runner/v11/act/model"
|
|
"github.com/robfig/cron/v3"
|
|
"xorm.io/builder"
|
|
)
|
|
|
|
// StartScheduleTasks start the task
|
|
func StartScheduleTasks(ctx context.Context) error {
|
|
return startTasks(ctx)
|
|
}
|
|
|
|
// startTasks retrieves specifications in pages, creates a schedule task for each specification,
|
|
// and updates the specification's next run time and previous run time.
|
|
// The function returns an error if there's an issue with finding or updating the specifications.
|
|
func startTasks(ctx context.Context) error {
|
|
// Set the page size
|
|
pageSize := 50
|
|
|
|
// Retrieve specs in pages until all specs have been retrieved
|
|
now := time.Now()
|
|
for page := 1; ; page++ {
|
|
// Retrieve the specs for the current page
|
|
specs, _, err := actions_model.FindSpecs(ctx, actions_model.FindSpecOptions{
|
|
ListOptions: db.ListOptions{
|
|
Page: page,
|
|
PageSize: pageSize,
|
|
},
|
|
Next: now.Unix(),
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("find specs: %w", err)
|
|
}
|
|
|
|
if err := specs.LoadRepos(ctx); err != nil {
|
|
return fmt.Errorf("LoadRepos: %w", err)
|
|
}
|
|
|
|
// Loop through each spec and create a schedule task for it
|
|
for _, row := range specs {
|
|
if row.Repo.IsArchived {
|
|
// Skip if the repo is archived
|
|
continue
|
|
}
|
|
|
|
cfg, err := row.Repo.GetUnit(ctx, unit.TypeActions)
|
|
if err != nil {
|
|
if repo_model.IsErrUnitTypeNotExist(err) {
|
|
// Skip the actions unit of this repo is disabled.
|
|
continue
|
|
}
|
|
return fmt.Errorf("GetUnit: %w", err)
|
|
}
|
|
actionConfig := cfg.ActionsConfig()
|
|
if actionConfig.IsWorkflowDisabled(row.Schedule.WorkflowID) {
|
|
continue
|
|
}
|
|
|
|
createAndSchedule := func(row *actions_model.ActionScheduleSpec) (cron.Schedule, error) {
|
|
if err := CreateScheduleTask(ctx, row.Schedule); err != nil {
|
|
return nil, fmt.Errorf("CreateScheduleTask: %v", err)
|
|
}
|
|
|
|
// Parse the spec
|
|
schedule, err := row.Parse()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Parse(Spec=%v): %v", row.Spec, err)
|
|
}
|
|
return schedule, nil
|
|
}
|
|
|
|
schedule, err := createAndSchedule(row)
|
|
if err != nil {
|
|
log.Error("RepoID=%v WorkflowID=%v: %v", row.Schedule.RepoID, row.Schedule.WorkflowID, err)
|
|
actionConfig.DisableWorkflow(row.Schedule.WorkflowID)
|
|
if err := repo_model.UpdateRepoUnit(ctx, cfg); err != nil {
|
|
log.Error("RepoID=%v WorkflowID=%v: CreateScheduleTask: %v", row.Schedule.RepoID, row.Schedule.WorkflowID, err)
|
|
return err
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Update the spec's next run time and previous run time
|
|
row.Prev = row.Next
|
|
row.Next = timeutil.TimeStamp(schedule.Next(now.Add(1 * time.Minute)).Unix())
|
|
if err := actions_model.UpdateScheduleSpec(ctx, row, "prev", "next"); err != nil {
|
|
log.Error("UpdateScheduleSpec: %v", err)
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Stop if all specs have been retrieved
|
|
if len(specs) < pageSize {
|
|
break
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// CreateScheduleTask creates a scheduled task from a cron action schedule.
|
|
// It creates an action run based on the schedule, inserts it into the database, and creates commit statuses for each job.
|
|
func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) error {
|
|
// Create a new action run based on the schedule
|
|
run := &actions_model.ActionRun{
|
|
Title: cron.Title,
|
|
RepoID: cron.RepoID,
|
|
OwnerID: cron.OwnerID,
|
|
WorkflowID: cron.WorkflowID,
|
|
TriggerUserID: cron.TriggerUserID,
|
|
Ref: cron.Ref,
|
|
CommitSHA: cron.CommitSHA,
|
|
Event: cron.Event,
|
|
EventPayload: cron.EventPayload,
|
|
TriggerEvent: string(webhook_module.HookEventSchedule),
|
|
ScheduleID: cron.ID,
|
|
Status: actions_model.StatusWaiting,
|
|
}
|
|
|
|
vars, err := actions_model.GetVariablesOfRun(ctx, run)
|
|
if err != nil {
|
|
log.Error("GetVariablesOfRun: %v", err)
|
|
return err
|
|
}
|
|
|
|
workflow, err := act_model.ReadWorkflow(bytes.NewReader(cron.Content), false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
notifications, err := workflow.Notifications()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
run.NotifyEmail = notifications
|
|
|
|
err = ConfigureActionRunConcurrency(workflow, run, vars, map[string]any{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if run.ConcurrencyType == actions_model.CancelInProgress {
|
|
if err := CancelPreviousWithConcurrencyGroup(
|
|
ctx,
|
|
run.RepoID,
|
|
run.ConcurrencyGroup,
|
|
); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Parse the workflow specification from the cron schedule
|
|
workflows, err := jobParser(cron.Content, jobparser.WithVars(vars))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Insert the action run and its associated jobs into the database
|
|
if err := actions_model.InsertRun(ctx, run, workflows); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Return nil if no errors occurred
|
|
return nil
|
|
}
|
|
|
|
// CancelPreviousJobs cancels all previous jobs of the same repository, reference, workflow, and event.
|
|
// It's useful when a new run is triggered, and all previous runs needn't be continued anymore.
|
|
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
|
|
// Find all runs in the specified repository, reference, and workflow with non-final status
|
|
runs, _, err := db.FindAndCount[actions_model.ActionRun](ctx, actions_model.FindRunOptions{
|
|
RepoID: repoID,
|
|
Ref: ref,
|
|
WorkflowID: workflowID,
|
|
TriggerEvent: event,
|
|
Status: []actions_model.Status{actions_model.StatusRunning, actions_model.StatusWaiting, actions_model.StatusBlocked},
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Iterate over each found run and cancel its associated jobs.
|
|
errorSlice := []error{}
|
|
for _, run := range runs {
|
|
err := cancelJobsForRun(ctx, run)
|
|
errorSlice = append(errorSlice, err)
|
|
}
|
|
err = errors.Join(errorSlice...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Cancels all pending jobs in the same repository with the same concurrency group.
|
|
func CancelPreviousWithConcurrencyGroup(ctx context.Context, repoID int64, concurrencyGroup string) error {
|
|
runs, _, err := db.FindAndCount[actions_model.ActionRun](ctx, actions_model.FindRunOptions{
|
|
RepoID: repoID,
|
|
ConcurrencyGroup: concurrencyGroup,
|
|
Status: []actions_model.Status{actions_model.StatusRunning, actions_model.StatusWaiting, actions_model.StatusBlocked},
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Iterate over each found run and cancel its associated jobs.
|
|
errorSlice := []error{}
|
|
for _, run := range runs {
|
|
err := cancelJobsForRun(ctx, run)
|
|
errorSlice = append(errorSlice, err)
|
|
}
|
|
err = errors.Join(errorSlice...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func cancelJobsForRun(ctx context.Context, run *actions_model.ActionRun) error {
|
|
// Find all jobs associated with the current run.
|
|
jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
|
|
RunID: run.ID,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Iterate over each job and attempt to cancel it.
|
|
errorSlice := []error{}
|
|
for _, job := range jobs {
|
|
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
|
|
status := job.Status
|
|
if status.IsDone() {
|
|
continue
|
|
}
|
|
|
|
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
|
|
if job.TaskID == 0 {
|
|
job.Status = actions_model.StatusCancelled
|
|
job.Stopped = timeutil.TimeStampNow()
|
|
|
|
// Update the job's status and stopped time in the database.
|
|
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
|
|
if err != nil {
|
|
errorSlice = append(errorSlice, err)
|
|
continue
|
|
}
|
|
|
|
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
|
|
if n == 0 {
|
|
errorSlice = append(errorSlice, errors.New("job has changed, try again"))
|
|
continue
|
|
}
|
|
|
|
// Continue with the next job.
|
|
continue
|
|
}
|
|
|
|
// If the job has an associated task, try to stop the task, effectively cancelling the job.
|
|
if err := StopTask(ctx, job.TaskID, actions_model.StatusCancelled); err != nil {
|
|
errorSlice = append(errorSlice, errors.New("job has changed, try again"))
|
|
continue
|
|
}
|
|
}
|
|
|
|
return errors.Join(errorSlice...)
|
|
}
|
|
|
|
func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository, cancelPreviousJobs bool) error {
|
|
// If actions disabled when there is schedule task, this will remove the outdated schedule tasks
|
|
// There is no other place we can do this because the app.ini will be changed manually
|
|
if err := actions_model.DeleteScheduleTaskByRepo(ctx, repo.ID); err != nil {
|
|
return fmt.Errorf("DeleteCronTaskByRepo: %v", err)
|
|
}
|
|
if cancelPreviousJobs {
|
|
// cancel running cron jobs of this repository and delete old schedules
|
|
if err := CancelPreviousJobs(
|
|
ctx,
|
|
repo.ID,
|
|
repo.DefaultBranch,
|
|
"",
|
|
webhook_module.HookEventSchedule,
|
|
); err != nil {
|
|
return fmt.Errorf("CancelPreviousJobs: %v", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|