diff --git a/.deadcode-out b/.deadcode-out index 31b04687dc..87dc416ff1 100644 --- a/.deadcode-out +++ b/.deadcode-out @@ -59,16 +59,12 @@ forgejo.org/models/user IsErrExternalLoginUserAlreadyExist IsErrExternalLoginUserNotExist NewFederatedUser - NewFederatedUserFollower IsErrUserSettingIsNotExist GetUserAllSettings DeleteUserSetting GetFederatedUser GetFederatedUserByUserID GetFollowersForUser - AddFollower - RemoveFollower - IsFollowingAp forgejo.org/modules/activitypub NewContext @@ -99,11 +95,7 @@ forgejo.org/modules/eventsource Event.String forgejo.org/modules/forgefed - NewForgeFollowFromAp NewForgeFollow - ForgeFollow.MarshalJSON - ForgeFollow.UnmarshalJSON - ForgeFollow.Validate NewForgeUndoLike ForgeUndoLike.UnmarshalJSON ForgeUndoLike.Validate @@ -228,7 +220,6 @@ forgejo.org/modules/util/filebuffer forgejo.org/modules/validation IsErrNotValid - ValidateIDExists forgejo.org/modules/web RouteMock @@ -245,11 +236,8 @@ forgejo.org/routers/web/org forgejo.org/services/context GetPrivateContext -forgejo.org/services/federation - NewErrInternalf - ErrInternal.Error - Init - NewServiceResultWithBytes +forgejo.org/services/convert + ToActivityPubPersonFeedItem forgejo.org/services/repository IsErrForkAlreadyExist diff --git a/routers/api/v1/activitypub/person.go b/routers/api/v1/activitypub/person.go index 1da7933418..6120a078af 100644 --- a/routers/api/v1/activitypub/person.go +++ b/routers/api/v1/activitypub/person.go @@ -4,14 +4,14 @@ package activitypub import ( - "fmt" "net/http" - "strings" "forgejo.org/modules/activitypub" "forgejo.org/modules/log" - "forgejo.org/modules/setting" + "forgejo.org/modules/web" "forgejo.org/services/context" + "forgejo.org/services/convert" + "forgejo.org/services/federation" ap "github.com/go-ap/activitypub" "github.com/go-ap/jsonld" @@ -34,45 +34,12 @@ func Person(ctx *context.APIContext) { // "200": // "$ref": "#/responses/ActivityPub" - // TODO: the setting.AppURL during the test doesn't follow the definition: "It always has a '/' suffix" - link := fmt.Sprintf("%s/api/v1/activitypub/user-id/%d", strings.TrimSuffix(setting.AppURL, "/"), ctx.ContextUser.ID) - person := ap.PersonNew(ap.IRI(link)) - - person.Name = ap.NaturalLanguageValuesNew() - err := person.Name.Set("en", ap.Content(ctx.ContextUser.FullName)) + person, err := convert.ToActivityPubPerson(ctx, ctx.ContextUser) if err != nil { - ctx.ServerError("Set Name", err) + ctx.ServerError("convert.ToActivityPubPerson", err) return } - person.PreferredUsername = ap.NaturalLanguageValuesNew() - err = person.PreferredUsername.Set("en", ap.Content(ctx.ContextUser.Name)) - if err != nil { - ctx.ServerError("Set PreferredUsername", err) - return - } - - person.URL = ap.IRI(ctx.ContextUser.HTMLURL()) - - person.Icon = ap.Image{ - Type: ap.ImageType, - MediaType: "image/png", - URL: ap.IRI(ctx.ContextUser.AvatarLink(ctx)), - } - - person.Inbox = ap.IRI(link + "/inbox") - person.Outbox = ap.IRI(link + "/outbox") - - person.PublicKey.ID = ap.IRI(link + "#main-key") - person.PublicKey.Owner = ap.IRI(link) - - publicKeyPem, err := activitypub.GetPublicKey(ctx, ctx.ContextUser) - if err != nil { - ctx.ServerError("GetPublicKey", err) - return - } - person.PublicKey.PublicKeyPem = publicKeyPem - binary, err := jsonld.WithContext(jsonld.IRI(ap.ActivityBaseURI), jsonld.IRI(ap.SecurityContextURI)).Marshal(person) if err != nil { ctx.ServerError("MarshalJSON", err) @@ -99,8 +66,15 @@ func PersonInbox(ctx *context.APIContext) { // type: integer // required: true // responses: - // "204": + // "202": // "$ref": "#/responses/empty" - ctx.Status(http.StatusNoContent) + form := web.GetForm(ctx) + activity := form.(*ap.Activity) + result, err := federation.ProcessPersonInbox(ctx, ctx.ContextUser, activity) + if err != nil { + ctx.Error(federation.HTTPStatus(err), "PersonInbox", err) + return + } + responseServiceResult(ctx, result) } diff --git a/routers/init.go b/routers/init.go index 90a1cb1e89..9a304527fa 100644 --- a/routers/init.go +++ b/routers/init.go @@ -38,6 +38,7 @@ import ( "forgejo.org/services/auth/source/oauth2" "forgejo.org/services/automerge" "forgejo.org/services/cron" + federation_service "forgejo.org/services/federation" feed_service "forgejo.org/services/feed" indexer_service "forgejo.org/services/indexer" "forgejo.org/services/mailer" @@ -122,6 +123,7 @@ func InitWebInstalled(ctx context.Context) { mailer.NewContext(ctx) mustInit(cache.Init) mustInit(feed_service.Init) + mustInit(federation_service.Init) mustInit(uinotification.Init) mustInitCtx(ctx, archiver.Init) diff --git a/services/convert/activitypub_person.go b/services/convert/activitypub_person.go new file mode 100644 index 0000000000..2c05f8c1c0 --- /dev/null +++ b/services/convert/activitypub_person.go @@ -0,0 +1,62 @@ +// Copyright 2024 The Forgejo Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package convert + +import ( + "context" + + "forgejo.org/models/activities" + user_model "forgejo.org/models/user" + "forgejo.org/modules/activitypub" + + ap "github.com/go-ap/activitypub" +) + +func ToActivityPubPersonFeedItem(item *activities.FederatedUserActivity) ap.Note { + return ap.Note{ + AttributedTo: ap.IRI(item.ActorURI), + Content: ap.NaturalLanguageValues{{Value: ap.Content(item.NoteContent), Ref: ap.NilLangRef}}, + ID: ap.IRI(item.NoteURL), + URL: ap.IRI(item.OriginalNote), + } +} + +func ToActivityPubPerson(ctx context.Context, user *user_model.User) (*ap.Person, error) { + link := user.APActorID() + person := ap.PersonNew(ap.IRI(link)) + + person.Name = ap.NaturalLanguageValuesNew() + err := person.Name.Set("en", ap.Content(user.FullName)) + if err != nil { + return nil, err + } + + person.PreferredUsername = ap.NaturalLanguageValuesNew() + err = person.PreferredUsername.Set("en", ap.Content(user.Name)) + if err != nil { + return nil, err + } + + person.URL = ap.IRI(user.HTMLURL()) + + person.Icon = ap.Image{ + Type: ap.ImageType, + MediaType: "image/png", + URL: ap.IRI(user.AvatarLink(ctx)), + } + + person.Inbox = ap.IRI(link + "/inbox") + person.Outbox = ap.IRI(link + "/outbox") + + person.PublicKey.ID = ap.IRI(link + "#main-key") + person.PublicKey.Owner = ap.IRI(link) + + publicKeyPem, err := activitypub.GetPublicKey(ctx, user) + if err != nil { + return nil, err + } + person.PublicKey.PublicKeyPem = publicKeyPem + + return person, nil +} diff --git a/services/federation/delivery_queue.go b/services/federation/delivery_queue.go new file mode 100644 index 0000000000..f71467e9f0 --- /dev/null +++ b/services/federation/delivery_queue.go @@ -0,0 +1,76 @@ +// Copyright 2024 The Forgejo Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package federation + +import ( + "fmt" + "io" + + "forgejo.org/models/user" + "forgejo.org/modules/activitypub" + "forgejo.org/modules/graceful" + "forgejo.org/modules/log" + "forgejo.org/modules/process" + "forgejo.org/modules/queue" +) + +type deliveryQueueItem struct { + Doer *user.User + InboxURL string + Payload []byte + DeliveryCount int +} + +var deliveryQueue *queue.WorkerPoolQueue[deliveryQueueItem] + +func initDeliveryQueue() error { + deliveryQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "activitypub_inbox_delivery", deliveryQueueHandler) + if deliveryQueue == nil { + return fmt.Errorf("unable to create activitypub_inbox_delivery queue") + } + go graceful.GetManager().RunWithCancel(deliveryQueue) + + return nil +} + +func deliveryQueueHandler(items ...deliveryQueueItem) (unhandled []deliveryQueueItem) { + for _, item := range items { + item.DeliveryCount++ + err := deliverToInbox(item) + if err != nil && item.DeliveryCount < 10 { + unhandled = append(unhandled, item) + } + } + return unhandled +} + +func deliverToInbox(item deliveryQueueItem) error { + ctx, _, finished := process.GetManager().AddContext(graceful.GetManager().HammerContext(), + fmt.Sprintf("Delivering an Activity via user[%d] (%s), to %s", item.Doer.ID, item.Doer.Name, item.InboxURL)) + defer finished() + + clientFactory, err := activitypub.GetClientFactory(ctx) + if err != nil { + return err + } + apclient, err := clientFactory.WithKeys(ctx, item.Doer, item.Doer.APActorID()+"#main-key") + if err != nil { + return err + } + + log.Debug("Delivering %s to %s", item.Payload, item.InboxURL) + res, err := apclient.Post(item.Payload, item.InboxURL) + if err != nil { + return err + } + if res.StatusCode >= 400 { + defer res.Body.Close() + body, _ := io.ReadAll(io.LimitReader(res.Body, 16*1024)) + + log.Warn("Delivering to %s failed: %d %s, %v times", item.InboxURL, res.StatusCode, string(body), item.DeliveryCount) + return fmt.Errorf("delivery failed") + } + + return nil +} diff --git a/services/federation/federation_service.go b/services/federation/federation_service.go index 36788e725a..ccdb9bbab0 100644 --- a/services/federation/federation_service.go +++ b/services/federation/federation_service.go @@ -23,7 +23,10 @@ import ( ) func Init() error { - return nil + if !setting.Federation.Enabled { + return nil + } + return initDeliveryQueue() } func FindOrCreateFederationHost(ctx context.Context, actorURI string) (*forgefed.FederationHost, error) { diff --git a/services/federation/person_inbox_follow.go b/services/federation/person_inbox_follow.go new file mode 100644 index 0000000000..baa7934ad5 --- /dev/null +++ b/services/federation/person_inbox_follow.go @@ -0,0 +1,73 @@ +// Copyright 2024 The Forgejo Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package federation + +import ( + "context" + "fmt" + "net/http" + + "forgejo.org/models/user" + "forgejo.org/modules/forgefed" + "forgejo.org/modules/log" + + ap "github.com/go-ap/activitypub" + "github.com/go-ap/jsonld" +) + +func processPersonFollow(ctx context.Context, ctxUser *user.User, activity *ap.Activity) (ServiceResult, error) { + follow, err := forgefed.NewForgeFollowFromAp(*activity) + if err != nil { + log.Error("Invalid follow activity: %s", err) + return ServiceResult{}, NewErrNotAcceptablef("Invalid follow activity: %v", err) + } + + actorURI := follow.Actor.GetLink().String() + _, federatedUser, federationHost, err := FindOrCreateFederatedUser(ctx, actorURI) + if err != nil { + log.Error("Error finding or creating federated user (%s): %v", actorURI, err) + return ServiceResult{}, NewErrNotAcceptablef("Federated user not found: %v", err) + } + + following, err := user.IsFollowingAp(ctx, ctxUser, federatedUser) + if err != nil { + log.Error("forgefed.IsFollowing: %v", err) + return ServiceResult{}, NewErrNotAcceptablef("forgefed.IsFollowing: %v", err) + } + if following { + // If the user is already following, we're good, nothing to do. + log.Trace("Local user[%d] is already following federated user[%d]", ctxUser.ID, federatedUser.ID) + return NewServiceResultStatusOnly(http.StatusNoContent), nil + } + + follower, err := user.AddFollower(ctx, ctxUser, federatedUser) + if err != nil { + log.Error("Unable to add follower: %v", err) + return ServiceResult{}, NewErrNotAcceptablef("Unable to add follower: %v", err) + } + + accept := ap.AcceptNew(ap.IRI(fmt.Sprintf( + "%s#accepts/follow/%d", ctxUser.APActorID(), follower.ID, + )), follow) + accept.Actor = ap.IRI(ctxUser.APActorID()) + payload, err := jsonld.WithContext(jsonld.IRI(ap.ActivityBaseURI)).Marshal(accept) + if err != nil { + log.Error("Unable to Marshal JSON: %v", err) + return ServiceResult{}, NewErrInternalf("MarshalJSON: %v", err) + } + + hostURL := federationHost.AsURL() + if err := deliveryQueue.Push(deliveryQueueItem{ + InboxURL: hostURL.JoinPath(federatedUser.InboxPath).String(), + Doer: ctxUser, + Payload: payload, + }); err != nil { + log.Error("Unable to push to pending queue: %v", err) + return ServiceResult{}, NewErrInternalf("Unable to push to pending queue: %v", err) + } + + // Respond back with an accept + result := NewServiceResultWithBytes(http.StatusAccepted, []byte(`{"status":"Accepted"}`)) + return result, nil +} diff --git a/services/federation/person_inbox_undo.go b/services/federation/person_inbox_undo.go new file mode 100644 index 0000000000..4379cf242a --- /dev/null +++ b/services/federation/person_inbox_undo.go @@ -0,0 +1,47 @@ +// Copyright 2024 The Forgejo Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package federation + +import ( + "context" + "net/http" + + "forgejo.org/models/user" + "forgejo.org/modules/log" + + ap "github.com/go-ap/activitypub" +) + +func processPersonInboxUndo(ctx context.Context, ctxUser *user.User, activity *ap.Activity) (ServiceResult, error) { + if activity.Object.GetType() != ap.FollowType { + log.Error("Invalid object type for Undo activity: %v", activity.Object.GetType()) + return ServiceResult{}, NewErrNotAcceptablef("Invalid object type for Undo activity: %v", activity.Object.GetType()) + } + + actorURI := activity.Actor.GetLink().String() + _, federatedUser, _, err := findFederatedUser(ctx, actorURI) + if err != nil { + log.Error("User not found: %v", err) + return ServiceResult{}, NewErrInternalf("User not found: %v", err) + } + + if federatedUser != nil { + following, err := user.IsFollowingAp(ctx, ctxUser, federatedUser) + if err != nil { + log.Error("forgefed.IsFollowing: %v", err) + return ServiceResult{}, NewErrInternalf("forgefed.IsFollowing: %v", err) + } + if !following { + // The local user is not following the federated one, nothing to do. + log.Trace("Local user[%d] is not following federated user[%d]", ctxUser.ID, federatedUser.ID) + return NewServiceResultStatusOnly(http.StatusNoContent), nil + } + if err := user.RemoveFollower(ctx, ctxUser, federatedUser); err != nil { + log.Error("Unable to remove follower", err) + return ServiceResult{}, NewErrInternalf("Unable to remove follower: %v", err) + } + } + + return NewServiceResultStatusOnly(http.StatusNoContent), nil +} diff --git a/services/federation/person_service.go b/services/federation/person_service.go new file mode 100644 index 0000000000..f67d2b492d --- /dev/null +++ b/services/federation/person_service.go @@ -0,0 +1,25 @@ +// Copyright 2024 The Forgejo Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package federation + +import ( + "context" + + "forgejo.org/models/user" + "forgejo.org/modules/log" + + ap "github.com/go-ap/activitypub" +) + +func ProcessPersonInbox(ctx context.Context, user *user.User, activity *ap.Activity) (ServiceResult, error) { + switch activity.Type { + case ap.FollowType: + return processPersonFollow(ctx, user, activity) + case ap.UndoType: + return processPersonInboxUndo(ctx, user, activity) + } + + log.Error("Unsupported PersonInbox activity: %v", activity.Type) + return ServiceResult{}, NewErrNotAcceptablef("unsupported activity: %v", activity.Type) +} diff --git a/templates/swagger/v1_json.tmpl b/templates/swagger/v1_json.tmpl index a610620eab..492f9487f3 100644 --- a/templates/swagger/v1_json.tmpl +++ b/templates/swagger/v1_json.tmpl @@ -162,7 +162,7 @@ } ], "responses": { - "204": { + "202": { "$ref": "#/responses/empty" } } diff --git a/tests/integration/activitypub_client_test.go b/tests/integration/activitypub_client_test.go index 2adb8304c2..67482a7277 100644 --- a/tests/integration/activitypub_client_test.go +++ b/tests/integration/activitypub_client_test.go @@ -6,14 +6,15 @@ package integration import ( "net/url" "testing" + "time" - "forgejo.org/models/db" "forgejo.org/models/unittest" user_model "forgejo.org/models/user" "forgejo.org/modules/activitypub" "forgejo.org/modules/setting" "forgejo.org/modules/test" "forgejo.org/routers" + "forgejo.org/services/contexttest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -25,15 +26,15 @@ func TestActivityPubClientBodySize(t *testing.T) { onGiteaRun(t, func(t *testing.T, u *url.URL) { user1 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 1}) - - clientFactory, err := activitypub.GetClientFactory(db.DefaultContext) - require.NoError(t, err) - - apClient, err := clientFactory.WithKeys(db.DefaultContext, user1, user1.KeyID()) - require.NoError(t, err) - url := u.JoinPath("/api/v1/nodeinfo").String() + ctx, _ := contexttest.MockAPIContext(t, url) + clientFactory, err := activitypub.NewClientFactoryWithTimeout(60 * time.Second) + require.NoError(t, err) + + apClient, err := clientFactory.WithKeys(ctx, user1, user1.KeyID()) + require.NoError(t, err) + // Request with normal MaxSize t.Run("NormalMaxSize", func(t *testing.T) { resp, err := apClient.GetBody(url) diff --git a/tests/integration/api_activitypub_person_inbox_follow_test.go b/tests/integration/api_activitypub_person_inbox_follow_test.go new file mode 100644 index 0000000000..f171b8951f --- /dev/null +++ b/tests/integration/api_activitypub_person_inbox_follow_test.go @@ -0,0 +1,105 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package integration + +import ( + "fmt" + "net/http" + "net/url" + "testing" + "time" + + "forgejo.org/models/unittest" + user_model "forgejo.org/models/user" + "forgejo.org/modules/activitypub" + "forgejo.org/modules/setting" + "forgejo.org/modules/test" + "forgejo.org/routers" + "forgejo.org/services/contexttest" + "forgejo.org/services/federation" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Flow of this test is documented at: https://codeberg.org/forgejo-contrib/federation/src/branch/main/doc/user-activity-following.md +func TestActivityPubPersonInboxFollow(t *testing.T) { + defer test.MockVariableValue(&setting.Federation.Enabled, true)() + defer test.MockVariableValue(&setting.Federation.SignatureEnforced, false)() + defer test.MockVariableValue(&testWebRoutes, routers.NormalRoutes())() + + federation.Init() + + mock := test.NewFederationServerMock() + federatedSrv := mock.DistantServer(t) + defer federatedSrv.Close() + + onGiteaRun(t, func(t *testing.T, localUrl *url.URL) { + defer test.MockVariableValue(&setting.AppURL, localUrl.String())() + + distantURL := federatedSrv.URL + distantUser15URL := fmt.Sprintf("%s/api/v1/activitypub/user-id/15", distantURL) + + localUser := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + localUser2URL := localUrl.JoinPath("/api/v1/activitypub/user-id/2").String() + localUser2Inbox := localUrl.JoinPath("/api/v1/activitypub/user-id/2/inbox").String() + + ctx, _ := contexttest.MockAPIContext(t, localUser2Inbox) + + // distant follows local + followActivity := []byte(fmt.Sprintf( + `{"type":"Follow",`+ + `"actor":"%s",`+ + `"object":"%s"}`, + distantUser15URL, + localUser2URL, + )) + cf, err := activitypub.NewClientFactoryWithTimeout(60 * time.Second) + require.NoError(t, err) + c, err := cf.WithKeysDirect(ctx, mock.ApActor.PrivKey, + mock.ApActor.KeyID(federatedSrv.URL)) + require.NoError(t, err) + resp, err := c.Post(followActivity, localUser2Inbox) + require.NoError(t, err) + assert.Equal(t, http.StatusAccepted, resp.StatusCode) + + // local follow exists + distantFederatedUser := unittest.AssertExistsAndLoadBean(t, &user_model.FederatedUser{ExternalID: "15"}) + unittest.AssertExistsAndLoadBean(t, + &user_model.FederatedUserFollower{ + FollowedUserID: localUser.ID, + FollowingUserID: distantFederatedUser.UserID, + }, + ) + + // distant is informed about accepting follow + assert.Contains(t, mock.LastPost, "\"type\":\"Accept\"") + + // distant undoes follow + undoFollowActivity := []byte(fmt.Sprintf( + `{"type":"Undo",`+ + `"actor":"%s",`+ + `"object":{"type":"Follow",`+ + `"actor":"%s",`+ + `"object":"%s"}}`, + distantUser15URL, + distantUser15URL, + localUser2URL, + )) + c, err = cf.WithKeysDirect(ctx, mock.ApActor.PrivKey, + mock.ApActor.KeyID(federatedSrv.URL)) + require.NoError(t, err) + resp, err = c.Post(undoFollowActivity, localUser2Inbox) + require.NoError(t, err) + assert.Equal(t, http.StatusNoContent, resp.StatusCode) + + // local follow removed + unittest.AssertNotExistsBean(t, + &user_model.FederatedUserFollower{ + FollowedUserID: localUser.ID, + FollowingUserID: distantFederatedUser.UserID, + }, + ) + }) +} diff --git a/tests/integration/api_activitypub_person_test.go b/tests/integration/api_activitypub_person_test.go index 277b150a1e..04d1fb1648 100644 --- a/tests/integration/api_activitypub_person_test.go +++ b/tests/integration/api_activitypub_person_test.go @@ -105,9 +105,9 @@ func TestActivityPubPersonInbox(t *testing.T) { c, err := cf.WithKeys(ctx, user1, user1url) require.NoError(t, err) - // Signed request "succeeds" + // invalid request is rejected resp, err := c.Post([]byte{}, user2inboxurl) require.NoError(t, err) - assert.Equal(t, http.StatusNoContent, resp.StatusCode) + assert.Equal(t, http.StatusNotAcceptable, resp.StatusCode) }) }