chore(runner): return errors created by connect
(#7222)
- Instead of creating errors via `google.golang.org/grpc`, use `connectrpc.com/connect`. - This _avoids_ another dependency (still indirectly referenced in testing). Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/7222 Reviewed-by: Otto <otto@codeberg.org> Co-authored-by: Gusted <postmaster@gusted.xyz> Co-committed-by: Gusted <postmaster@gusted.xyz>
This commit is contained in:
parent
511148dbc3
commit
ccd87001c8
4 changed files with 26 additions and 39 deletions
10
assets/go-licenses.json
generated
10
assets/go-licenses.json
generated
File diff suppressed because one or more lines are too long
2
go.mod
2
go.mod
|
@ -108,7 +108,6 @@ require (
|
|||
golang.org/x/sync v0.12.0
|
||||
golang.org/x/sys v0.31.0
|
||||
golang.org/x/text v0.23.0
|
||||
google.golang.org/grpc v1.71.0
|
||||
google.golang.org/protobuf v1.36.4
|
||||
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
|
||||
gopkg.in/ini.v1 v1.67.0
|
||||
|
@ -276,6 +275,7 @@ require (
|
|||
google.golang.org/genproto v0.0.0-20241015192408-796eee8c2d53 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
|
||||
google.golang.org/grpc v1.71.0 // indirect
|
||||
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
|
||||
gopkg.in/warnings.v0 v0.1.2 // indirect
|
||||
)
|
||||
|
|
|
@ -16,8 +16,6 @@ import (
|
|||
"code.gitea.io/gitea/modules/util"
|
||||
|
||||
"connectrpc.com/connect"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -37,12 +35,12 @@ var withRunner = connect.WithInterceptors(connect.UnaryInterceptorFunc(func(unar
|
|||
runner, err := actions_model.GetRunnerByUUID(ctx, uuid)
|
||||
if err != nil {
|
||||
if errors.Is(err, util.ErrNotExist) {
|
||||
return nil, status.Error(codes.Unauthenticated, "unregistered runner")
|
||||
return nil, connect.NewError(connect.CodeUnauthenticated, errors.New("unregistered runner"))
|
||||
}
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
return nil, connect.NewError(connect.CodeInternal, err)
|
||||
}
|
||||
if subtle.ConstantTimeCompare([]byte(runner.TokenHash), []byte(auth_model.HashToken(token, runner.TokenSalt))) != 1 {
|
||||
return nil, status.Error(codes.Unauthenticated, "unregistered runner")
|
||||
return nil, connect.NewError(connect.CodeUnauthenticated, errors.New("unregistered runner"))
|
||||
}
|
||||
|
||||
cols := []string{"last_online"}
|
||||
|
|
|
@ -6,6 +6,7 @@ package runner
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
actions_model "code.gitea.io/gitea/models/actions"
|
||||
|
@ -20,8 +21,6 @@ import (
|
|||
"code.gitea.io/actions-proto-go/runner/v1/runnerv1connect"
|
||||
"connectrpc.com/connect"
|
||||
gouuid "github.com/google/uuid"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func NewRunnerServiceHandler() (string, http.Handler) {
|
||||
|
@ -44,27 +43,27 @@ func (s *Service) Register(
|
|||
req *connect.Request[runnerv1.RegisterRequest],
|
||||
) (*connect.Response[runnerv1.RegisterResponse], error) {
|
||||
if req.Msg.Token == "" || req.Msg.Name == "" {
|
||||
return nil, errors.New("missing runner token, name")
|
||||
return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("missing runner token, name"))
|
||||
}
|
||||
|
||||
runnerToken, err := actions_model.GetRunnerToken(ctx, req.Msg.Token)
|
||||
if err != nil {
|
||||
return nil, errors.New("runner registration token not found")
|
||||
return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("runner registration token not found"))
|
||||
}
|
||||
|
||||
if !runnerToken.IsActive {
|
||||
return nil, errors.New("runner registration token has been invalidated, please use the latest one")
|
||||
return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("runner registration token has been invalidated, please use the latest one"))
|
||||
}
|
||||
|
||||
if runnerToken.OwnerID > 0 {
|
||||
if _, err := user_model.GetUserByID(ctx, runnerToken.OwnerID); err != nil {
|
||||
return nil, errors.New("owner of the token not found")
|
||||
return nil, connect.NewError(connect.CodeInternal, errors.New("owner of the token not found"))
|
||||
}
|
||||
}
|
||||
|
||||
if runnerToken.RepoID > 0 {
|
||||
if _, err := repo_model.GetRepositoryByID(ctx, runnerToken.RepoID); err != nil {
|
||||
return nil, errors.New("repository of the token not found")
|
||||
return nil, connect.NewError(connect.CodeInternal, errors.New("repository of the token not found"))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -81,18 +80,18 @@ func (s *Service) Register(
|
|||
AgentLabels: labels,
|
||||
}
|
||||
if err := runner.GenerateToken(); err != nil {
|
||||
return nil, errors.New("can't generate token")
|
||||
return nil, connect.NewError(connect.CodeInternal, errors.New("can't generate token"))
|
||||
}
|
||||
|
||||
// create new runner
|
||||
if err := actions_model.CreateRunner(ctx, runner); err != nil {
|
||||
return nil, errors.New("can't create new runner")
|
||||
return nil, connect.NewError(connect.CodeInternal, errors.New("can't create new runner"))
|
||||
}
|
||||
|
||||
// update token status
|
||||
runnerToken.IsActive = true
|
||||
if err := actions_model.UpdateRunnerToken(ctx, runnerToken, "is_active"); err != nil {
|
||||
return nil, errors.New("can't update runner token status")
|
||||
return nil, connect.NewError(connect.CodeInternal, errors.New("can't update runner token status"))
|
||||
}
|
||||
|
||||
res := connect.NewResponse(&runnerv1.RegisterResponse{
|
||||
|
@ -117,7 +116,7 @@ func (s *Service) Declare(
|
|||
runner.AgentLabels = req.Msg.Labels
|
||||
runner.Version = req.Msg.Version
|
||||
if err := actions_model.UpdateRunner(ctx, runner, "agent_labels", "version"); err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "update runner: %v", err)
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("update runner: %w", err))
|
||||
}
|
||||
|
||||
return connect.NewResponse(&runnerv1.DeclareResponse{
|
||||
|
@ -143,10 +142,10 @@ func (s *Service) FetchTask(
|
|||
tasksVersion := req.Msg.TasksVersion // task version from runner
|
||||
latestVersion, err := actions_model.GetTasksVersionByScope(ctx, runner.OwnerID, runner.RepoID)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "query tasks version failed: %v", err)
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("query tasks version failed: %w", err))
|
||||
} else if latestVersion == 0 {
|
||||
if err := actions_model.IncreaseTaskVersion(ctx, runner.OwnerID, runner.RepoID); err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "fail to increase task version: %v", err)
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("fail to increase task version: %w", err))
|
||||
}
|
||||
// if we don't increase the value of `latestVersion` here,
|
||||
// the response of FetchTask will return tasksVersion as zero.
|
||||
|
@ -160,7 +159,7 @@ func (s *Service) FetchTask(
|
|||
// try to pick a task for the runner that send the request.
|
||||
if t, ok, err := actions_service.PickTask(ctx, runner); err != nil {
|
||||
log.Error("pick task failed: %v", err)
|
||||
return nil, status.Errorf(codes.Internal, "pick task: %v", err)
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("pick task: %w", err))
|
||||
} else if ok {
|
||||
task = t
|
||||
}
|
||||
|
@ -181,7 +180,7 @@ func (s *Service) UpdateTask(
|
|||
|
||||
task, err := actions_model.UpdateTaskByState(ctx, runner.ID, req.Msg.State)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "update task: %v", err)
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("update task: %w", err))
|
||||
}
|
||||
|
||||
for k, v := range req.Msg.Outputs {
|
||||
|
@ -210,10 +209,10 @@ func (s *Service) UpdateTask(
|
|||
}
|
||||
|
||||
if err := task.LoadJob(ctx); err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "load job: %v", err)
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("load job: %w", err))
|
||||
}
|
||||
if err := task.Job.LoadRun(ctx); err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "load run: %v", err)
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("load run: %w", err))
|
||||
}
|
||||
|
||||
// don't create commit status for cron job
|
||||
|
@ -247,9 +246,9 @@ func (s *Service) UpdateLog(
|
|||
|
||||
task, err := actions_model.GetTaskByID(ctx, req.Msg.TaskId)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "get task: %v", err)
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("get task: %w", err))
|
||||
} else if runner.ID != task.RunnerID {
|
||||
return nil, status.Errorf(codes.Internal, "invalid runner for task")
|
||||
return nil, connect.NewError(connect.CodeInternal, errors.New("invalid runner for task"))
|
||||
}
|
||||
ack := task.LogLength
|
||||
|
||||
|
@ -259,13 +258,13 @@ func (s *Service) UpdateLog(
|
|||
}
|
||||
|
||||
if task.LogInStorage {
|
||||
return nil, status.Errorf(codes.AlreadyExists, "log file has been archived")
|
||||
return nil, connect.NewError(connect.CodeAlreadyExists, errors.New("log file has been archived"))
|
||||
}
|
||||
|
||||
rows := req.Msg.Rows[ack-req.Msg.Index:]
|
||||
ns, err := actions.WriteLogs(ctx, task.LogFilename, task.LogSize, rows)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "write logs: %v", err)
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("write logs: %w", err))
|
||||
}
|
||||
task.LogLength += int64(len(rows))
|
||||
for _, n := range ns {
|
||||
|
@ -280,12 +279,12 @@ func (s *Service) UpdateLog(
|
|||
task.LogInStorage = true
|
||||
remove, err = actions.TransferLogs(ctx, task.LogFilename)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "transfer logs: %v", err)
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("transfer logs: %w", err))
|
||||
}
|
||||
}
|
||||
|
||||
if err := actions_model.UpdateTask(ctx, task, "log_indexes", "log_length", "log_size", "log_in_storage"); err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "update task: %v", err)
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("update task: %w", err))
|
||||
}
|
||||
if remove != nil {
|
||||
remove()
|
||||
|
|
Loading…
Add table
Reference in a new issue