Monorepo for Tangled
tangled.org
1package repo
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "net/url"
11 "slices"
12 "strings"
13 "time"
14
15 "tangled.org/core/appview/cloudflare"
16
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/knotacl"
21 "tangled.org/core/appview/knotcompat"
22 "tangled.org/core/appview/models"
23 "tangled.org/core/appview/notify"
24 "tangled.org/core/appview/oauth"
25 "tangled.org/core/appview/pages"
26 "tangled.org/core/appview/pagination"
27 "tangled.org/core/appview/reporesolver"
28 "tangled.org/core/appview/sites"
29 "tangled.org/core/appview/validator"
30 xrpcclient "tangled.org/core/appview/xrpcclient"
31 "tangled.org/core/consts"
32 "tangled.org/core/eventconsumer"
33 "tangled.org/core/idresolver"
34 "tangled.org/core/ogre"
35 "tangled.org/core/orm"
36 "tangled.org/core/rbac"
37 "tangled.org/core/tid"
38 "tangled.org/core/xrpc/serviceauth"
39
40 comatproto "github.com/bluesky-social/indigo/api/atproto"
41 "github.com/bluesky-social/indigo/atproto/atclient"
42 "github.com/bluesky-social/indigo/atproto/syntax"
43 lexutil "github.com/bluesky-social/indigo/lex/util"
44
45 "github.com/go-chi/chi/v5"
46)
47
48type Repo struct {
49 repoResolver *reporesolver.RepoResolver
50 idResolver *idresolver.Resolver
51 config *config.Config
52 oauth *oauth.OAuth
53 pages *pages.Pages
54 spindlestream *eventconsumer.Consumer
55 db *db.DB
56 enforcer *rbac.Enforcer
57 acl *knotacl.Service
58 notifier notify.Notifier
59 logger *slog.Logger
60 serviceAuth *serviceauth.ServiceAuth
61 validator *validator.Validator
62 cfClient *cloudflare.Client
63 ogreClient *ogre.Client
64}
65
66func New(
67 oauth *oauth.OAuth,
68 repoResolver *reporesolver.RepoResolver,
69 pages *pages.Pages,
70 spindlestream *eventconsumer.Consumer,
71 idResolver *idresolver.Resolver,
72 db *db.DB,
73 config *config.Config,
74 notifier notify.Notifier,
75 enforcer *rbac.Enforcer,
76 acl *knotacl.Service,
77 logger *slog.Logger,
78 validator *validator.Validator,
79 cfClient *cloudflare.Client,
80) *Repo {
81 return &Repo{
82 oauth: oauth,
83 repoResolver: repoResolver,
84 pages: pages,
85 idResolver: idResolver,
86 config: config,
87 spindlestream: spindlestream,
88 db: db,
89 notifier: notifier,
90 enforcer: enforcer,
91 acl: acl,
92 logger: logger,
93 validator: validator,
94 cfClient: cfClient,
95 ogreClient: ogre.NewClient(config.Ogre.Host),
96 }
97}
98
99// modify the spindle configured for this repo
100func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) {
101 user := rp.oauth.GetMultiAccountUser(r)
102 l := rp.logger.With("handler", "EditSpindle")
103 l = l.With("did", user.Did)
104
105 errorId := "operation-error"
106 fail := func(msg string, err error) {
107 l.Error(msg, "err", err)
108 rp.pages.Notice(w, errorId, msg)
109 }
110
111 f, err := rp.repoResolver.Resolve(r)
112 if err != nil {
113 fail("Failed to resolve repo. Try again later", err)
114 return
115 }
116
117 newSpindle := r.FormValue("spindle")
118 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value
119 client, err := rp.oauth.AuthorizedClient(r)
120 if err != nil {
121 fail("Failed to authorize. Try again later.", err)
122 return
123 }
124
125 if !removingSpindle {
126 // ensure that this is a valid spindle for this user
127 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Did)
128 if err != nil {
129 fail("Failed to find spindles. Try again later.", err)
130 return
131 }
132
133 if !slices.Contains(validSpindles, newSpindle) {
134 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles))
135 return
136 }
137 }
138
139 newRepo := *f
140 newRepo.Spindle = newSpindle
141 record := newRepo.AsRecord()
142
143 spindlePtr := &newSpindle
144 if removingSpindle {
145 spindlePtr = nil
146 newRepo.Spindle = ""
147 }
148
149 // optimistic update
150 err = db.UpdateSpindle(rp.db, newRepo.RepoDid, spindlePtr)
151 if err != nil {
152 fail("Failed to update spindle. Try again later.", err)
153 return
154 }
155
156 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
157 if err != nil {
158 fail("Failed to update spindle, no record found on PDS.", err)
159 return
160 }
161 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
162 Collection: tangled.RepoNSID,
163 Repo: newRepo.Did,
164 Rkey: newRepo.Rkey,
165 SwapRecord: ex.Cid,
166 Record: &lexutil.LexiconTypeDecoder{
167 Val: &record,
168 },
169 })
170
171 if err != nil {
172 fail("Failed to update spindle, unable to save to PDS.", err)
173 return
174 }
175
176 oldSpindle := f.Spindle
177 if oldSpindle != "" && oldSpindle != newSpindle {
178 remaining, qErr := db.GetRepos(rp.db, orm.FilterEq("spindle", oldSpindle))
179 if qErr != nil {
180 l.Warn("failed to count repos using old spindle", "err", qErr)
181 } else if len(remaining) == 0 {
182 rp.spindlestream.RemoveSource(eventconsumer.NewSpindleSource(oldSpindle))
183 }
184 }
185
186 if !removingSpindle {
187 rp.spindlestream.AddSource(
188 context.Background(),
189 eventconsumer.NewSpindleSource(newSpindle),
190 )
191 }
192
193 rp.pages.HxRefresh(w)
194}
195
196func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) {
197 user := rp.oauth.GetMultiAccountUser(r)
198 l := rp.logger.With("handler", "AddLabel")
199 l = l.With("did", user.Did)
200
201 f, err := rp.repoResolver.Resolve(r)
202 if err != nil {
203 l.Error("failed to get repo and knot", "err", err)
204 return
205 }
206
207 errorId := "add-label-error"
208 fail := func(msg string, err error) {
209 l.Error(msg, "err", err)
210 rp.pages.Notice(w, errorId, msg)
211 }
212
213 // get form values for label definition
214 name := r.FormValue("name")
215 concreteType := r.FormValue("valueType")
216 valueFormat := r.FormValue("valueFormat")
217 enumValues := r.FormValue("enumValues")
218 scope := r.Form["scope"]
219 color := r.FormValue("color")
220 multiple := r.FormValue("multiple") == "true"
221
222 var variants []string
223 for part := range strings.SplitSeq(enumValues, ",") {
224 if part = strings.TrimSpace(part); part != "" {
225 variants = append(variants, part)
226 }
227 }
228
229 if concreteType == "" {
230 concreteType = "null"
231 }
232
233 format := models.ValueTypeFormatAny
234 if valueFormat == "did" {
235 format = models.ValueTypeFormatDid
236 }
237
238 valueType := models.ValueType{
239 Type: models.ConcreteType(concreteType),
240 Format: format,
241 Enum: variants,
242 }
243
244 label := models.LabelDefinition{
245 Did: user.Did,
246 Rkey: tid.TID(),
247 Name: name,
248 ValueType: valueType,
249 Scope: scope,
250 Color: &color,
251 Multiple: multiple,
252 Created: time.Now(),
253 }
254 if err := rp.validator.ValidateLabelDefinition(&label); err != nil {
255 fail(err.Error(), err)
256 return
257 }
258
259 // announce this relation into the firehose, store into owners' pds
260 client, err := rp.oauth.AuthorizedClient(r)
261 if err != nil {
262 fail(err.Error(), err)
263 return
264 }
265
266 // emit a labelRecord
267 labelRecord := label.AsRecord()
268 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
269 Collection: tangled.LabelDefinitionNSID,
270 Repo: label.Did,
271 Rkey: label.Rkey,
272 Record: &lexutil.LexiconTypeDecoder{
273 Val: &labelRecord,
274 },
275 })
276 // invalid record
277 if err != nil {
278 fail("Failed to write record to PDS.", err)
279 return
280 }
281
282 aturi := resp.Uri
283 l = l.With("at-uri", aturi)
284 l.Info("wrote label record to PDS")
285
286 // update the repo to subscribe to this label
287 newRepo := *f
288 newRepo.Labels = append(newRepo.Labels, aturi)
289 repoRecord := newRepo.AsRecord()
290
291 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
292 if err != nil {
293 fail("Failed to update labels, no record found on PDS.", err)
294 return
295 }
296 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
297 Collection: tangled.RepoNSID,
298 Repo: newRepo.Did,
299 Rkey: newRepo.Rkey,
300 SwapRecord: ex.Cid,
301 Record: &lexutil.LexiconTypeDecoder{
302 Val: &repoRecord,
303 },
304 })
305 if err != nil {
306 fail("Failed to update labels for repo.", err)
307 return
308 }
309
310 tx, err := rp.db.BeginTx(r.Context(), nil)
311 if err != nil {
312 fail("Failed to add label.", err)
313 return
314 }
315
316 rollback := func() {
317 err1 := tx.Rollback()
318 err2 := rollbackRecord(context.Background(), aturi, client)
319
320 // ignore txn complete errors, this is okay
321 if errors.Is(err1, sql.ErrTxDone) {
322 err1 = nil
323 }
324
325 if errs := errors.Join(err1, err2); errs != nil {
326 l.Error("failed to rollback changes", "errs", errs)
327 return
328 }
329 }
330 defer rollback()
331
332 _, err = db.AddLabelDefinition(tx, &label)
333 if err != nil {
334 fail("Failed to add label.", err)
335 return
336 }
337
338 if err = db.SubscribeLabel(tx, &models.RepoLabel{
339 RepoDid: syntax.DID(f.RepoDid),
340 LabelAt: label.AtUri(),
341 }); err != nil {
342 fail("Failed to subscribe to label.", err)
343 return
344 }
345
346 err = tx.Commit()
347 if err != nil {
348 fail("Failed to add label.", err)
349 return
350 }
351
352 // clear aturi when everything is successful
353 aturi = ""
354
355 rp.pages.HxRefresh(w)
356}
357
358func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) {
359 user := rp.oauth.GetMultiAccountUser(r)
360 l := rp.logger.With("handler", "DeleteLabel")
361 l = l.With("did", user.Did)
362
363 f, err := rp.repoResolver.Resolve(r)
364 if err != nil {
365 l.Error("failed to get repo and knot", "err", err)
366 return
367 }
368
369 errorId := "label-operation"
370 fail := func(msg string, err error) {
371 l.Error(msg, "err", err)
372 rp.pages.Notice(w, errorId, msg)
373 }
374
375 // get form values
376 labelId := r.FormValue("label-id")
377
378 label, err := db.GetLabelDefinition(rp.db, orm.FilterEq("id", labelId))
379 if err != nil {
380 fail("Failed to find label definition.", err)
381 return
382 }
383
384 client, err := rp.oauth.AuthorizedClient(r)
385 if err != nil {
386 fail(err.Error(), err)
387 return
388 }
389
390 // delete label record from PDS
391 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{
392 Collection: tangled.LabelDefinitionNSID,
393 Repo: label.Did,
394 Rkey: label.Rkey,
395 })
396 if err != nil {
397 fail("Failed to delete label record from PDS.", err)
398 return
399 }
400
401 // update repo record to remove the label reference
402 newRepo := *f
403 var updated []string
404 removedAt := label.AtUri().String()
405 for _, l := range newRepo.Labels {
406 if l != removedAt {
407 updated = append(updated, l)
408 }
409 }
410 newRepo.Labels = updated
411 repoRecord := newRepo.AsRecord()
412
413 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
414 if err != nil {
415 fail("Failed to update labels, no record found on PDS.", err)
416 return
417 }
418 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
419 Collection: tangled.RepoNSID,
420 Repo: newRepo.Did,
421 Rkey: newRepo.Rkey,
422 SwapRecord: ex.Cid,
423 Record: &lexutil.LexiconTypeDecoder{
424 Val: &repoRecord,
425 },
426 })
427 if err != nil {
428 fail("Failed to update repo record.", err)
429 return
430 }
431
432 // transaction for DB changes
433 tx, err := rp.db.BeginTx(r.Context(), nil)
434 if err != nil {
435 fail("Failed to delete label.", err)
436 return
437 }
438 defer tx.Rollback()
439
440 err = db.UnsubscribeLabel(
441 tx,
442 orm.FilterEq("repo_did", f.RepoDid),
443 orm.FilterEq("label_at", removedAt),
444 )
445 if err != nil {
446 fail("Failed to unsubscribe label.", err)
447 return
448 }
449
450 err = db.DeleteLabelDefinition(tx, orm.FilterEq("id", label.Id))
451 if err != nil {
452 fail("Failed to delete label definition.", err)
453 return
454 }
455
456 err = tx.Commit()
457 if err != nil {
458 fail("Failed to delete label.", err)
459 return
460 }
461
462 // everything succeeded
463 rp.pages.HxRefresh(w)
464}
465
466func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) {
467 user := rp.oauth.GetMultiAccountUser(r)
468 l := rp.logger.With("handler", "SubscribeLabel")
469 l = l.With("did", user.Did)
470
471 f, err := rp.repoResolver.Resolve(r)
472 if err != nil {
473 l.Error("failed to get repo and knot", "err", err)
474 return
475 }
476
477 if err := r.ParseForm(); err != nil {
478 l.Error("invalid form", "err", err)
479 return
480 }
481
482 errorId := "default-label-operation"
483 fail := func(msg string, err error) {
484 l.Error(msg, "err", err)
485 rp.pages.Notice(w, errorId, msg)
486 }
487
488 labelAts := r.Form["label"]
489 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts))
490 if err != nil {
491 fail("Failed to subscribe to label.", err)
492 return
493 }
494
495 newRepo := *f
496 newRepo.Labels = append(newRepo.Labels, labelAts...)
497
498 // dedup
499 slices.Sort(newRepo.Labels)
500 newRepo.Labels = slices.Compact(newRepo.Labels)
501
502 repoRecord := newRepo.AsRecord()
503
504 client, err := rp.oauth.AuthorizedClient(r)
505 if err != nil {
506 fail(err.Error(), err)
507 return
508 }
509
510 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey)
511 if err != nil {
512 fail("Failed to update labels, no record found on PDS.", err)
513 return
514 }
515 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
516 Collection: tangled.RepoNSID,
517 Repo: newRepo.Did,
518 Rkey: newRepo.Rkey,
519 SwapRecord: ex.Cid,
520 Record: &lexutil.LexiconTypeDecoder{
521 Val: &repoRecord,
522 },
523 })
524
525 tx, err := rp.db.Begin()
526 if err != nil {
527 fail("Failed to subscribe to label.", err)
528 return
529 }
530 defer tx.Rollback()
531
532 for _, l := range labelAts {
533 err = db.SubscribeLabel(tx, &models.RepoLabel{
534 RepoDid: syntax.DID(f.RepoDid),
535 LabelAt: syntax.ATURI(l),
536 })
537 if err != nil {
538 fail("Failed to subscribe to label.", err)
539 return
540 }
541 }
542
543 if err := tx.Commit(); err != nil {
544 fail("Failed to subscribe to label.", err)
545 return
546 }
547
548 // everything succeeded
549 rp.pages.HxRefresh(w)
550}
551
552func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) {
553 user := rp.oauth.GetMultiAccountUser(r)
554 l := rp.logger.With("handler", "UnsubscribeLabel")
555 l = l.With("did", user.Did)
556
557 f, err := rp.repoResolver.Resolve(r)
558 if err != nil {
559 l.Error("failed to get repo and knot", "err", err)
560 return
561 }
562
563 if err := r.ParseForm(); err != nil {
564 l.Error("invalid form", "err", err)
565 return
566 }
567
568 errorId := "default-label-operation"
569 fail := func(msg string, err error) {
570 l.Error(msg, "err", err)
571 rp.pages.Notice(w, errorId, msg)
572 }
573
574 labelAts := r.Form["label"]
575 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts))
576 if err != nil {
577 fail("Failed to unsubscribe to label.", err)
578 return
579 }
580
581 // update repo record to remove the label reference
582 newRepo := *f
583 var updated []string
584 for _, l := range newRepo.Labels {
585 if !slices.Contains(labelAts, l) {
586 updated = append(updated, l)
587 }
588 }
589 newRepo.Labels = updated
590 repoRecord := newRepo.AsRecord()
591
592 client, err := rp.oauth.AuthorizedClient(r)
593 if err != nil {
594 fail(err.Error(), err)
595 return
596 }
597
598 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey)
599 if err != nil {
600 fail("Failed to update labels, no record found on PDS.", err)
601 return
602 }
603 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
604 Collection: tangled.RepoNSID,
605 Repo: newRepo.Did,
606 Rkey: newRepo.Rkey,
607 SwapRecord: ex.Cid,
608 Record: &lexutil.LexiconTypeDecoder{
609 Val: &repoRecord,
610 },
611 })
612
613 err = db.UnsubscribeLabel(
614 rp.db,
615 orm.FilterEq("repo_did", f.RepoDid),
616 orm.FilterIn("label_at", labelAts),
617 )
618 if err != nil {
619 fail("Failed to unsubscribe label.", err)
620 return
621 }
622
623 // everything succeeded
624 rp.pages.HxRefresh(w)
625}
626
627func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) {
628 l := rp.logger.With("handler", "LabelPanel")
629
630 f, err := rp.repoResolver.Resolve(r)
631 if err != nil {
632 l.Error("failed to get repo and knot", "err", err)
633 return
634 }
635
636 subjectStr := r.FormValue("subject")
637 subject, err := syntax.ParseATURI(subjectStr)
638 if err != nil {
639 l.Error("failed to get repo and knot", "err", err)
640 return
641 }
642
643 labelDefs, err := db.GetLabelDefinitions(
644 rp.db,
645 orm.FilterIn("at_uri", f.Labels),
646 orm.FilterContains("scope", subject.Collection().String()),
647 )
648 if err != nil {
649 l.Error("failed to fetch label defs", "err", err)
650 return
651 }
652
653 defs := make(map[string]*models.LabelDefinition)
654 for _, l := range labelDefs {
655 defs[l.AtUri().String()] = &l
656 }
657
658 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject))
659 if err != nil {
660 l.Error("failed to build label state", "err", err)
661 return
662 }
663 state := states[subject]
664
665 user := rp.oauth.GetMultiAccountUser(r)
666 rp.pages.LabelPanel(w, pages.LabelPanelParams{
667 BaseParams: pages.BaseParamsFromContext(r.Context()),
668 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
669 Defs: defs,
670 Subject: subject.String(),
671 State: state,
672 })
673}
674
675func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) {
676 l := rp.logger.With("handler", "EditLabelPanel")
677
678 f, err := rp.repoResolver.Resolve(r)
679 if err != nil {
680 l.Error("failed to get repo and knot", "err", err)
681 return
682 }
683
684 subjectStr := r.FormValue("subject")
685 subject, err := syntax.ParseATURI(subjectStr)
686 if err != nil {
687 l.Error("failed to get repo and knot", "err", err)
688 return
689 }
690
691 labelDefs, err := db.GetLabelDefinitions(
692 rp.db,
693 orm.FilterIn("at_uri", f.Labels),
694 orm.FilterContains("scope", subject.Collection().String()),
695 )
696 if err != nil {
697 l.Error("failed to fetch labels", "err", err)
698 return
699 }
700
701 defs := make(map[string]*models.LabelDefinition)
702 for _, l := range labelDefs {
703 defs[l.AtUri().String()] = &l
704 }
705
706 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject))
707 if err != nil {
708 l.Error("failed to build label state", "err", err)
709 return
710 }
711 state := states[subject]
712
713 user := rp.oauth.GetMultiAccountUser(r)
714 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{
715 BaseParams: pages.BaseParamsFromContext(r.Context()),
716 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
717 Defs: defs,
718 Subject: subject.String(),
719 State: state,
720 })
721}
722
723func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) {
724 user := rp.oauth.GetMultiAccountUser(r)
725 l := rp.logger.With("handler", "AddCollaborator")
726 l = l.With("did", user.Did)
727
728 f, err := rp.repoResolver.Resolve(r)
729 if err != nil {
730 l.Error("failed to get repo and knot", "err", err)
731 return
732 }
733
734 errorId := "add-collaborator-error"
735 fail := func(msg string, err error) {
736 l.Error(msg, "err", err)
737 rp.pages.Notice(w, errorId, msg)
738 }
739
740 collaborator := r.FormValue("collaborator")
741 if collaborator == "" {
742 fail("Invalid form.", nil)
743 return
744 }
745
746 // remove a single leading `@`, to make @handle work with ResolveIdent
747 collaborator = strings.TrimPrefix(collaborator, "@")
748
749 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator)
750 if err != nil {
751 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err)
752 return
753 }
754
755 if collaboratorIdent.DID.String() == user.Did {
756 fail("You seem to be adding yourself as a collaborator.", nil)
757 return
758 }
759 l = l.With("collaborator", collaboratorIdent.Handle)
760 l = l.With("knot", f.Knot)
761
762 capStatus := knotcompat.KnotCapability(r.Context(), f.Knot, rp.config.Core.Dev, consts.CapKnotACL)
763 if capStatus == knotcompat.CapUnknown {
764 fail("Could not reach the knot to add the collaborator. Try again later.", nil)
765 return
766 }
767 if capStatus == knotcompat.CapPresent {
768 if f.RepoDid == "" {
769 fail("This repository is missing its DID and cannot manage collaborators.", nil)
770 return
771 }
772
773 client, err := rp.oauth.ServiceClient(
774 r,
775 oauth.WithService(f.Knot),
776 oauth.WithLxm(tangled.RepoAddCollaboratorNSID),
777 oauth.WithDev(rp.config.Core.Dev),
778 )
779 if err != nil {
780 fail("Failed to connect to knot server.", err)
781 return
782 }
783
784 err = tangled.RepoAddCollaborator(r.Context(), client, &tangled.RepoAddCollaborator_Input{
785 Repo: f.RepoDid,
786 Subject: collaboratorIdent.DID.String(),
787 })
788 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
789 l.Error("failed to call XRPC repo.addCollaborator", "xrpcerr", xrpcerr, "err", err)
790 rp.pages.Notice(w, errorId, xrpcerr.Error())
791 return
792 }
793
794 rp.acl.InvalidateCollaborators(f.Knot, f.RepoDid)
795
796 rp.pages.HxRefresh(w)
797 return
798 }
799
800 existing, err := db.GetCollaborators(rp.db,
801 orm.FilterEq("repo_did", f.RepoDid),
802 orm.FilterEq("subject_did", collaboratorIdent.DID.String()),
803 )
804 if err != nil {
805 fail("Failed to check existing collaborators.", err)
806 return
807 }
808 if len(existing) > 0 {
809 fail(fmt.Sprintf("%s is already a collaborator.", collaboratorIdent.Handle), nil)
810 return
811 }
812
813 // announce this relation into the firehose, store into owners' pds
814 client, err := rp.oauth.AuthorizedClient(r)
815 if err != nil {
816 fail("Failed to write to PDS.", err)
817 return
818 }
819
820 // emit a record
821 currentUser := rp.oauth.GetMultiAccountUser(r)
822 rkey := tid.TID()
823 createdAt := time.Now()
824 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
825 Collection: tangled.RepoCollaboratorNSID,
826 Repo: currentUser.Did,
827 Rkey: rkey,
828 Record: knotcompat.Collaborator(repoCollaboratorRecord(f, collaboratorIdent.DID.String(), createdAt)),
829 })
830 // invalid record
831 if err != nil {
832 fail("Failed to write record to PDS.", err)
833 return
834 }
835
836 aturi := resp.Uri
837 l = l.With("at-uri", aturi)
838 l.Info("wrote record to PDS")
839
840 tx, err := rp.db.BeginTx(r.Context(), nil)
841 if err != nil {
842 fail("Failed to add collaborator.", err)
843 return
844 }
845
846 rollback := func() {
847 err1 := tx.Rollback()
848 err2 := rp.enforcer.E.LoadPolicy()
849 err3 := rollbackRecord(context.Background(), aturi, client)
850
851 // ignore txn complete errors, this is okay
852 if errors.Is(err1, sql.ErrTxDone) {
853 err1 = nil
854 }
855
856 if errs := errors.Join(err1, err2, err3); errs != nil {
857 l.Error("failed to rollback changes", "errs", errs)
858 return
859 }
860 }
861 defer rollback()
862
863 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.RepoIdentifier())
864 if err != nil {
865 fail("Failed to add collaborator permissions.", err)
866 return
867 }
868
869 err = db.AddCollaborator(tx, models.Collaborator{
870 Did: syntax.DID(currentUser.Did),
871 Rkey: sql.NullString{String: rkey, Valid: true},
872 SubjectDid: collaboratorIdent.DID,
873 RepoDid: syntax.DID(f.RepoDid),
874 Created: createdAt,
875 })
876 if err != nil {
877 fail("Failed to add collaborator.", err)
878 return
879 }
880
881 err = tx.Commit()
882 if err != nil {
883 fail("Failed to add collaborator.", err)
884 return
885 }
886
887 err = rp.enforcer.E.SavePolicy()
888 if err != nil {
889 fail("Failed to update collaborator permissions.", err)
890 return
891 }
892
893 // clear aturi to when everything is successful
894 aturi = ""
895
896 rp.pages.HxRefresh(w)
897}
898
899func (rp *Repo) RemoveCollaborator(w http.ResponseWriter, r *http.Request) {
900 user := rp.oauth.GetMultiAccountUser(r)
901 l := rp.logger.With("handler", "RemoveCollaborator")
902 l = l.With("did", user.Did)
903
904 f, err := rp.repoResolver.Resolve(r)
905 if err != nil {
906 l.Error("failed to get repo and knot", "err", err)
907 return
908 }
909
910 errorId := "collaborator-error"
911 fail := func(msg string, err error) {
912 l.Error(msg, "err", err)
913 rp.pages.Notice(w, errorId, msg)
914 }
915
916 collaborator := r.FormValue("collaborator")
917 if collaborator == "" {
918 fail("Invalid form.", nil)
919 return
920 }
921 collaborator = strings.TrimPrefix(collaborator, "@")
922
923 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator)
924 if err != nil {
925 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err)
926 return
927 }
928 l = l.With("collaborator", collaboratorIdent.Handle, "knot", f.Knot)
929
930 if collaboratorIdent.DID.String() == f.Did {
931 fail("Cannot remove the repository owner.", nil)
932 return
933 }
934
935 capStatus := knotcompat.KnotCapability(r.Context(), f.Knot, rp.config.Core.Dev, consts.CapKnotACL)
936 if capStatus == knotcompat.CapUnknown {
937 fail("Could not reach the knot to remove the collaborator. Try again later.", nil)
938 return
939 }
940 if capStatus == knotcompat.CapPresent {
941 if f.RepoDid == "" {
942 fail("This repository is missing its DID and cannot manage collaborators.", nil)
943 return
944 }
945
946 client, err := rp.oauth.ServiceClient(
947 r,
948 oauth.WithService(f.Knot),
949 oauth.WithLxm(tangled.RepoRemoveCollaboratorNSID),
950 oauth.WithDev(rp.config.Core.Dev),
951 )
952 if err != nil {
953 fail("Failed to connect to knot server.", err)
954 return
955 }
956
957 err = tangled.RepoRemoveCollaborator(r.Context(), client, &tangled.RepoRemoveCollaborator_Input{
958 Repo: f.RepoDid,
959 Subject: collaboratorIdent.DID.String(),
960 })
961 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
962 l.Error("failed to call XRPC repo.removeCollaborator", "xrpcerr", xrpcerr, "err", err)
963 rp.pages.Notice(w, errorId, xrpcerr.Error())
964 return
965 }
966
967 rp.acl.InvalidateCollaborators(f.Knot, f.RepoDid)
968
969 rp.pages.HxRefresh(w)
970 return
971 }
972
973 existing, err := db.GetCollaborators(rp.db,
974 orm.FilterEq("repo_did", f.RepoDid),
975 orm.FilterEq("subject_did", collaboratorIdent.DID.String()),
976 )
977 if err != nil {
978 fail("Failed to look up collaborator.", err)
979 return
980 }
981 if len(existing) == 0 {
982 fail(fmt.Sprintf("%s is not a collaborator.", collaboratorIdent.Handle), nil)
983 return
984 }
985 row := existing[0]
986
987 client, err := rp.oauth.AuthorizedClient(r)
988 if err != nil {
989 fail("Failed to write to PDS.", err)
990 return
991 }
992
993 tx, err := rp.db.BeginTx(r.Context(), nil)
994 if err != nil {
995 fail("Failed to remove collaborator.", err)
996 return
997 }
998 committed := false
999 defer func() {
1000 if !committed {
1001 tx.Rollback()
1002 if err := rp.enforcer.E.LoadPolicy(); err != nil {
1003 l.Error("failed to reload policy after rollback", "err", err)
1004 }
1005 }
1006 }()
1007
1008 if err := rp.enforcer.RemoveCollaborator(collaboratorIdent.DID.String(), f.Knot, f.RepoIdentifier()); err != nil {
1009 fail("Failed to remove collaborator permissions.", err)
1010 return
1011 }
1012
1013 if err := db.DeleteCollaborator(tx,
1014 orm.FilterEq("repo_did", f.RepoDid),
1015 orm.FilterEq("subject_did", collaboratorIdent.DID.String()),
1016 ); err != nil {
1017 fail("Failed to remove collaborator.", err)
1018 return
1019 }
1020
1021 if row.Rkey.Valid && row.Rkey.String != "" {
1022 if _, err := comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{
1023 Collection: tangled.RepoCollaboratorNSID,
1024 Repo: row.Did.String(),
1025 Rkey: row.Rkey.String,
1026 }); err != nil {
1027 fail("Failed to delete collaborator record from PDS.", err)
1028 return
1029 }
1030 }
1031
1032 if err := tx.Commit(); err != nil {
1033 fail("Failed to remove collaborator.", err)
1034 return
1035 }
1036 committed = true
1037
1038 if err := rp.enforcer.E.SavePolicy(); err != nil {
1039 fail("Failed to update collaborator permissions.", err)
1040 return
1041 }
1042
1043 rp.pages.HxRefresh(w)
1044}
1045
1046func (rp *Repo) RenameRepo(w http.ResponseWriter, r *http.Request) {
1047 l := rp.logger.With("handler", "RenameRepo")
1048 noticeId := "rename-repo-error"
1049
1050 user := rp.oauth.GetMultiAccountUser(r)
1051 f, err := rp.repoResolver.Resolve(r)
1052 if err != nil {
1053 l.Error("failed to get repo and knot", "err", err)
1054 rp.pages.Notice(w, noticeId, "Failed to load repository.")
1055 return
1056 }
1057 l = l.With("did", user.Did, "rkey", f.Rkey, "oldName", f.Name)
1058
1059 if f.RepoDid == "" {
1060 rp.pages.Notice(w, noticeId, "This repository's knot has not completed the DID migration; rename is unavailable.")
1061 return
1062 }
1063
1064 if !knotcompat.KnotSupports114(r.Context(), f.Knot, rp.config.Core.Dev) {
1065 rp.pages.Notice(w, noticeId, "This repository's knot is below v1.14 and does not yet support renames. Ask the knot operator to upgrade.")
1066 return
1067 }
1068
1069 newName, err := validateRenameInput(f.Name, f.Rkey, r.FormValue("name"))
1070 if err != nil {
1071 rp.pages.Notice(w, noticeId, err.Error())
1072 return
1073 }
1074 newRkey := strings.ToLower(newName)
1075 l = l.With("newName", newName, "newRkey", newRkey)
1076
1077 atpClient, err := rp.oauth.AuthorizedClient(r)
1078 if err != nil {
1079 l.Error("failed to get authorized client", "err", err)
1080 rp.pages.Notice(w, noticeId, "Failed to authorize. Try again later.")
1081 return
1082 }
1083
1084 newRepo := *f
1085 newRepo.Name = newName
1086 newRepo.Rkey = newRkey
1087 newRepo.Created = time.Now()
1088 record := newRepo.AsRecord()
1089
1090 if newRkey == f.Rkey {
1091 ex, err := comatproto.RepoGetRecord(r.Context(), atpClient, "", tangled.RepoNSID, f.Did, f.Rkey)
1092 if err != nil {
1093 l.Error("failed to fetch existing record", "err", err)
1094 rp.pages.Notice(w, noticeId, "Failed to read repository record from PDS.")
1095 return
1096 }
1097
1098 _, err = comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
1099 Collection: tangled.RepoNSID,
1100 Repo: f.Did,
1101 Rkey: f.Rkey,
1102 SwapRecord: ex.Cid,
1103 Record: &lexutil.LexiconTypeDecoder{
1104 Val: &record,
1105 },
1106 })
1107 if err != nil {
1108 l.Error("failed to update display name on PDS", "err", err)
1109 rp.pages.Notice(w, noticeId, "Failed to save display name to PDS.")
1110 return
1111 }
1112 l.Info("updated display name on PDS")
1113
1114 if err := db.UpdateRepoDisplayName(rp.db, f.Did, f.Rkey, newName); err != nil {
1115 l.Error("optimistic display name update failed", "err", err)
1116 }
1117 } else {
1118 ex, getErr := comatproto.RepoGetRecord(r.Context(), atpClient, "", tangled.RepoNSID, f.Did, newRkey)
1119 switch {
1120 case getErr != nil:
1121 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{
1122 Collection: tangled.RepoNSID,
1123 Repo: f.Did,
1124 Rkey: &newRkey,
1125 Record: &lexutil.LexiconTypeDecoder{Val: &record},
1126 })
1127 if err != nil {
1128 l.Error("failed to write rename to PDS", "err", err)
1129 rp.pages.Notice(w, noticeId, "Failed to save renamed repository to PDS.")
1130 return
1131 }
1132 l.Info("wrote rename-create to PDS; old record retained as alias")
1133
1134 default:
1135 existing, ok := ex.Value.Val.(*tangled.Repo)
1136 if !ok || existing.RepoDid == nil || *existing.RepoDid != f.RepoDid {
1137 rp.pages.Notice(w, noticeId, fmt.Sprintf("You already have a repository named %q.", newRkey))
1138 return
1139 }
1140 _, err = comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
1141 Collection: tangled.RepoNSID,
1142 Repo: f.Did,
1143 Rkey: newRkey,
1144 SwapRecord: ex.Cid,
1145 Record: &lexutil.LexiconTypeDecoder{Val: &record},
1146 })
1147 if err != nil {
1148 l.Error("failed to rewrite rename-back record on PDS", "err", err)
1149 rp.pages.Notice(w, noticeId, "Failed to save renamed repository to PDS.")
1150 return
1151 }
1152 l.Info("rewrote rename-back record on PDS over prior alias")
1153 }
1154
1155 tx, err := rp.db.Begin()
1156 if err != nil {
1157 l.Error("failed to begin rename tx", "err", err)
1158 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid))
1159 return
1160 }
1161 defer tx.Rollback()
1162
1163 if err := db.RenameRepo(tx, f.Did, f.Rkey, newRkey, newName); err != nil {
1164 l.Error("optimistic rename failed", "err", err)
1165 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid))
1166 return
1167 }
1168 if err := db.RecordRepoRename(tx, f.Did, f.Rkey, f.RepoDid); err != nil {
1169 l.Error("failed to record rename history", "err", err)
1170 }
1171 if err := db.DeleteRepoRename(tx, f.Did, newRkey); err != nil {
1172 l.Error("failed to clear stale rename hint", "err", err)
1173 }
1174 if err := tx.Commit(); err != nil {
1175 l.Error("failed to commit rename tx", "err", err)
1176 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid))
1177 return
1178 }
1179 }
1180
1181 oldRepo := *f
1182 rp.notifier.RenameRepo(r.Context(), syntax.DID(user.Did), &oldRepo, &newRepo)
1183
1184 if newRkey != f.Rkey {
1185 rp.migrateSiteOnRename(r.Context(), f, newName, newRkey)
1186 }
1187
1188 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid))
1189}
1190
1191func validateRenameInput(currentName, currentRkey, raw string) (string, error) {
1192 newName := strings.TrimSpace(raw)
1193 if newName == "" {
1194 return "", errors.New("Repository name cannot be empty.")
1195 }
1196 if err := models.ValidateRepoName(newName); err != nil {
1197 return "", err
1198 }
1199 newName = models.StripGitExt(newName)
1200 if newName == currentName {
1201 if _, tidErr := syntax.ParseTID(currentRkey); tidErr == nil {
1202 return newName, nil
1203 }
1204 return "", errors.New("New name matches the current name.")
1205 }
1206 return newName, nil
1207}
1208
1209func (rp *Repo) migrateSiteOnRename(ctx context.Context, oldRepo *models.Repo, newName, newRkey string) {
1210 l := rp.logger.With("handler", "migrateSiteOnRename", "repo_did", oldRepo.RepoDid)
1211
1212 siteConfig, err := db.GetRepoSiteConfig(rp.db, oldRepo.RepoDid)
1213 if err != nil || siteConfig == nil {
1214 return
1215 }
1216
1217 if !rp.cfClient.Enabled() {
1218 return
1219 }
1220
1221 ownerClaim, _ := db.GetActiveDomainClaimForDid(rp.db, oldRepo.Did)
1222
1223 go func() {
1224 bgCtx := context.Background()
1225 oldRkey := oldRepo.Rkey
1226 oldName := oldRepo.Name
1227
1228 if err := sites.Delete(bgCtx, rp.cfClient, oldRepo.Did, oldRkey); err != nil {
1229 l.Error("sites: failed to delete old R2 prefix", "oldRkey", oldRkey, "err", err)
1230 }
1231
1232 newRepo := *oldRepo
1233 newRepo.Name = newName
1234 newRepo.Rkey = newRkey
1235 if deployErr := sites.Deploy(bgCtx, rp.cfClient, rp.config, &newRepo, siteConfig.Branch, siteConfig.Dir); deployErr != nil {
1236 l.Error("sites: redeploy after rename failed", "err", deployErr)
1237 }
1238
1239 if ownerClaim != nil {
1240 // drop the old name's entry when the name actually changed.
1241 if oldName != newName {
1242 if err := sites.DeleteDomainMapping(bgCtx, rp.cfClient, ownerClaim.Domain, oldName); err != nil {
1243 l.Error("sites: failed to remove old KV mapping", "oldName", oldName, "err", err)
1244 }
1245 }
1246 if err := sites.PutDomainMapping(bgCtx, rp.cfClient, ownerClaim.Domain, oldRepo.Did, newName, newRkey, siteConfig.IsIndex); err != nil {
1247 l.Error("sites: failed to write new KV mapping", "newName", newName, "newRkey", newRkey, "err", err)
1248 }
1249 }
1250
1251 l.Info("sites: migrated on rename", "oldName", oldName, "oldRkey", oldRkey, "newName", newName, "newRkey", newRkey)
1252 }()
1253}
1254
1255func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) {
1256 user := rp.oauth.GetMultiAccountUser(r)
1257 l := rp.logger.With("handler", "DeleteRepo")
1258
1259 noticeId := "operation-error"
1260 f, err := rp.repoResolver.Resolve(r)
1261 if err != nil {
1262 l.Error("failed to get repo and knot", "err", err)
1263 return
1264 }
1265
1266 // remove record from pds
1267 atpClient, err := rp.oauth.AuthorizedClient(r)
1268 if err != nil {
1269 l.Error("failed to get authorized client", "err", err)
1270 return
1271 }
1272 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{
1273 Collection: tangled.RepoNSID,
1274 Repo: user.Did,
1275 Rkey: f.Rkey,
1276 })
1277 if err != nil {
1278 l.Error("failed to delete record", "err", err)
1279 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.")
1280 return
1281 }
1282 l.Info("removed repo record", "aturi", f.RepoAt().String())
1283
1284 client, err := rp.oauth.ServiceClient(
1285 r,
1286 oauth.WithService(f.Knot),
1287 oauth.WithLxm(tangled.RepoDeleteNSID),
1288 oauth.WithDev(rp.config.Core.Dev),
1289 )
1290 if err != nil {
1291 l.Error("failed to connect to knot server", "err", err)
1292 return
1293 }
1294
1295 err = tangled.RepoDelete(
1296 r.Context(),
1297 client,
1298 &tangled.RepoDelete_Input{
1299 Did: f.Did,
1300 Name: f.Name,
1301 Rkey: f.Rkey,
1302 },
1303 )
1304 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
1305 l.Error("failed to call XRPC repo.delete", "xrpcerr", xrpcerr, "err", err)
1306 rp.pages.Notice(w, noticeId, xrpcerr.Error())
1307 return
1308 }
1309 l.Info("deleted repo from knot")
1310
1311 tx, err := rp.db.BeginTx(r.Context(), nil)
1312 if err != nil {
1313 l.Error("failed to start tx")
1314 w.Write(fmt.Append(nil, "failed to add collaborator: ", err))
1315 return
1316 }
1317 defer func() {
1318 tx.Rollback()
1319 err = rp.enforcer.E.LoadPolicy()
1320 if err != nil {
1321 l.Error("failed to rollback policies")
1322 }
1323 }()
1324
1325 // remove collaborator RBAC
1326 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.RepoIdentifier(), f.Knot)
1327 if err != nil {
1328 rp.pages.Notice(w, noticeId, "Failed to remove collaborators")
1329 return
1330 }
1331 for _, c := range repoCollaborators {
1332 did := c[0]
1333 rp.enforcer.RemoveCollaborator(did, f.Knot, f.RepoIdentifier())
1334 }
1335 l.Info("removed collaborators")
1336
1337 // remove repo RBAC
1338 err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.RepoIdentifier())
1339 if err != nil {
1340 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules")
1341 return
1342 }
1343
1344 // remove repo from db
1345 err = db.RemoveRepo(tx, f.Did, f.Rkey)
1346 if err != nil {
1347 rp.pages.Notice(w, noticeId, "Failed to update appview")
1348 return
1349 }
1350 l.Info("removed repo from db")
1351
1352 err = tx.Commit()
1353 if err != nil {
1354 l.Error("failed to commit changes", "err", err)
1355 http.Error(w, err.Error(), http.StatusInternalServerError)
1356 return
1357 }
1358
1359 err = rp.enforcer.E.SavePolicy()
1360 if err != nil {
1361 l.Error("failed to update ACLs", "err", err)
1362 http.Error(w, err.Error(), http.StatusInternalServerError)
1363 return
1364 }
1365
1366 rp.notifier.DeleteRepo(r.Context(), f)
1367 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.Did))
1368}
1369
1370func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) {
1371 l := rp.logger.With("handler", "SyncRepoFork")
1372
1373 ref := chi.URLParam(r, "ref")
1374 ref, _ = url.PathUnescape(ref)
1375
1376 user := rp.oauth.GetMultiAccountUser(r)
1377 f, err := rp.repoResolver.Resolve(r)
1378 if err != nil {
1379 l.Error("failed to resolve source repo", "err", err)
1380 return
1381 }
1382
1383 switch r.Method {
1384 case http.MethodPost:
1385 client, err := rp.oauth.ServiceClient(
1386 r,
1387 oauth.WithService(f.Knot),
1388 oauth.WithLxm(tangled.RepoForkSyncNSID),
1389 oauth.WithDev(rp.config.Core.Dev),
1390 )
1391 if err != nil {
1392 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
1393 return
1394 }
1395
1396 if f.Source == "" {
1397 rp.pages.Notice(w, "repo", "This repository is not a fork.")
1398 return
1399 }
1400
1401 err = tangled.RepoForkSync(
1402 r.Context(),
1403 client,
1404 &tangled.RepoForkSync_Input{
1405 Did: user.Did,
1406 Name: f.Name,
1407 Source: f.Source,
1408 Branch: ref,
1409 },
1410 )
1411 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
1412 l.Error("failed to call XRPC repo.forkSync", "xrpcerr", xrpcerr, "err", err)
1413 rp.pages.Notice(w, "repo", err.Error())
1414 return
1415 }
1416
1417 rp.pages.HxRefresh(w)
1418 return
1419 }
1420}
1421
1422func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) {
1423 l := rp.logger.With("handler", "ForkRepo")
1424
1425 user := rp.oauth.GetMultiAccountUser(r)
1426 f, err := rp.repoResolver.Resolve(r)
1427 if err != nil {
1428 l.Error("failed to resolve source repo", "err", err)
1429 return
1430 }
1431
1432 switch r.Method {
1433 case http.MethodGet:
1434 user := rp.oauth.GetMultiAccountUser(r)
1435 knots := rp.acl.KnotsForUser(r.Context(), user.Did)
1436
1437 rp.pages.ForkRepo(w, pages.ForkRepoParams{
1438 BaseParams: pages.BaseParamsFromContext(r.Context()),
1439 Knots: knots,
1440 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
1441 })
1442
1443 case http.MethodPost:
1444 l := rp.logger.With("handler", "ForkRepo")
1445
1446 targetKnot := r.FormValue("knot")
1447 if targetKnot == "" {
1448 rp.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
1449 return
1450 }
1451 l = l.With("targetKnot", targetKnot)
1452
1453 if !rp.acl.IsRepoCreateAllowed(r.Context(), targetKnot, user.Did) {
1454 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
1455 return
1456 }
1457
1458 // choose a name for a fork
1459 forkName := strings.ToLower(r.FormValue("repo_name"))
1460 if forkName == "" {
1461 rp.pages.Notice(w, "repo", "Repository name cannot be empty.")
1462 return
1463 }
1464
1465 // this check is *only* to see if the forked repo name already exists
1466 // in the user's account.
1467 existingRepo, err := db.GetRepo(
1468 rp.db,
1469 orm.FilterEq("did", user.Did),
1470 orm.FilterEq("name", forkName),
1471 )
1472 if err != nil {
1473 if !errors.Is(err, sql.ErrNoRows) {
1474 l.Error("error fetching existing repo from db", "err", err)
1475 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.")
1476 return
1477 }
1478 } else if existingRepo != nil {
1479 // repo with this name already exists
1480 rp.pages.Notice(w, "repo", "A repository with this name already exists.")
1481 return
1482 }
1483 l = l.With("forkName", forkName)
1484
1485 uri := "https"
1486 if rp.config.Core.Dev {
1487 uri = "http"
1488 }
1489
1490 forkSourceUrl := fmt.Sprintf("%s://%s/%s", uri, f.Knot, f.RepoIdentifier())
1491 l = l.With("cloneUrl", forkSourceUrl)
1492
1493 rkey := strings.ToLower(forkName)
1494
1495 // TODO: this could coordinate better with the knot to receive a clone status
1496 client, err := rp.oauth.ServiceClient(
1497 r,
1498 oauth.WithService(targetKnot),
1499 oauth.WithLxm(tangled.RepoCreateNSID),
1500 oauth.WithDev(rp.config.Core.Dev),
1501 oauth.WithTimeout(time.Second*20),
1502 )
1503 if err != nil {
1504 l.Error("could not create service client", "err", err)
1505 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
1506 return
1507 }
1508
1509 forkInput := &tangled.RepoCreate_Input{
1510 Rkey: rkey,
1511 Name: rkey,
1512 Source: &forkSourceUrl,
1513 }
1514 createResp, err := tangled.RepoCreate(
1515 r.Context(),
1516 client,
1517 forkInput,
1518 )
1519 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
1520 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err)
1521 rp.pages.Notice(w, "repo", xrpcerr.Error())
1522 return
1523 }
1524
1525 var repoDid string
1526 if createResp != nil && createResp.RepoDid != nil {
1527 repoDid = *createResp.RepoDid
1528 }
1529 if repoDid == "" {
1530 l.Error("knot returned empty repo DID for fork")
1531 rp.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.")
1532 return
1533 }
1534
1535 forkSource := f.RepoAt().String()
1536 if f.RepoDid != "" {
1537 forkSource = f.RepoDid
1538 }
1539
1540 forkDescription := r.Form.Get("description")
1541
1542 repo := &models.Repo{
1543 Did: user.Did,
1544 Name: rkey,
1545 Knot: targetKnot,
1546 Rkey: rkey,
1547 Source: forkSource,
1548 Description: forkDescription,
1549 Created: time.Now(),
1550 Labels: rp.config.Label.DefaultLabelDefs,
1551 RepoDid: repoDid,
1552 }
1553 record := repo.AsRecord()
1554
1555 cleanupKnot := func() {
1556 go func() {
1557 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second}
1558 for attempt, delay := range delays {
1559 time.Sleep(delay)
1560 deleteClient, dErr := rp.oauth.ServiceClient(
1561 r,
1562 oauth.WithService(targetKnot),
1563 oauth.WithLxm(tangled.RepoDeleteNSID),
1564 oauth.WithDev(rp.config.Core.Dev),
1565 )
1566 if dErr != nil {
1567 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr)
1568 continue
1569 }
1570 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
1571 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{
1572 Did: user.Did,
1573 Name: forkName,
1574 Rkey: rkey,
1575 }); dErr != nil {
1576 cancel()
1577 l.Error("failed to clean up fork on knot after rollback", "attempt", attempt+1, "err", dErr)
1578 continue
1579 }
1580 cancel()
1581 l.Info("successfully cleaned up fork on knot after rollback", "attempt", attempt+1)
1582 return
1583 }
1584 l.Error("exhausted retries for knot cleanup, fork may be orphaned",
1585 "did", user.Did, "fork", forkName, "knot", targetKnot)
1586 }()
1587 }
1588
1589 atpClient, err := rp.oauth.AuthorizedClient(r)
1590 if err != nil {
1591 l.Error("failed to create xrpcclient", "err", err)
1592 cleanupKnot()
1593 rp.pages.Notice(w, "repo", "Failed to fork repository.")
1594 return
1595 }
1596
1597 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
1598 Collection: tangled.RepoNSID,
1599 Repo: user.Did,
1600 Rkey: rkey,
1601 Record: &lexutil.LexiconTypeDecoder{
1602 Val: &record,
1603 },
1604 })
1605 if err != nil {
1606 l.Error("failed to write to PDS", "err", err)
1607 cleanupKnot()
1608 rp.pages.Notice(w, "repo", "Failed to announce repository creation.")
1609 return
1610 }
1611
1612 aturi := atresp.Uri
1613 l = l.With("aturi", aturi)
1614 l.Info("wrote to PDS")
1615
1616 tx, err := rp.db.BeginTx(r.Context(), nil)
1617 if err != nil {
1618 l.Info("txn failed", "err", err)
1619 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1620 return
1621 }
1622
1623 rollback := func() {
1624 err1 := tx.Rollback()
1625 err2 := rp.enforcer.E.LoadPolicy()
1626 err3 := rollbackRecord(context.Background(), aturi, atpClient)
1627
1628 if errors.Is(err1, sql.ErrTxDone) {
1629 err1 = nil
1630 }
1631
1632 if errs := errors.Join(err1, err2, err3); errs != nil {
1633 l.Error("failed to rollback changes", "errs", errs)
1634 }
1635
1636 if aturi != "" {
1637 cleanupKnot()
1638 }
1639 }
1640 defer rollback()
1641
1642 err = db.AddRepo(tx, repo)
1643 if err != nil {
1644 l.Error("failed to AddRepo", "err", err)
1645 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1646 return
1647 }
1648
1649 rbacPath := repo.RepoIdentifier()
1650 err = rp.enforcer.AddRepo(user.Did, targetKnot, rbacPath)
1651 if err != nil {
1652 l.Error("failed to add ACLs", "err", err)
1653 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.")
1654 return
1655 }
1656
1657 err = tx.Commit()
1658 if err != nil {
1659 l.Error("failed to commit changes", "err", err)
1660 http.Error(w, err.Error(), http.StatusInternalServerError)
1661 return
1662 }
1663
1664 err = rp.enforcer.E.SavePolicy()
1665 if err != nil {
1666 l.Error("failed to update ACLs", "err", err)
1667 http.Error(w, err.Error(), http.StatusInternalServerError)
1668 return
1669 }
1670
1671 aturi = ""
1672
1673 rp.notifier.NewRepo(r.Context(), repo)
1674 if repoDid != "" {
1675 rp.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid))
1676 } else {
1677 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Did, forkName))
1678 }
1679 }
1680}
1681
1682func (rp *Repo) Stars(w http.ResponseWriter, r *http.Request) {
1683 l := rp.logger.With("handler", "Stars")
1684
1685 user := rp.oauth.GetMultiAccountUser(r)
1686 f, err := rp.repoResolver.Resolve(r)
1687 if err != nil {
1688 l.Error("failed to resolve source repo", "err", err)
1689 return
1690 }
1691
1692 page := pagination.FromContext(r.Context())
1693 if page.Limit > 30 || page.Limit <= 0 {
1694 page.Limit = 30
1695 }
1696
1697 starrers, err := db.GetStars(rp.db, string(f.RepoDid), page)
1698 if err != nil {
1699 l.Error("failed to fetch starrers", "err", err, "repoDid", f.RepoDid)
1700 return
1701 }
1702
1703 totalCount, err := db.GetStarCount(rp.db, models.StarSubjectRepo, string(f.RepoDid))
1704 if err != nil {
1705 l.Error("failed to fetch star count", "err", err, "repoDid", f.RepoDid)
1706 return
1707 }
1708
1709 rp.pages.RepoStars(w, pages.RepoStarsParams{
1710 BaseParams: pages.BaseParamsFromContext(r.Context()),
1711 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
1712 Starrers: starrers,
1713 Page: page,
1714 TotalCount: totalCount,
1715 })
1716}
1717
1718func (rp *Repo) Forks(w http.ResponseWriter, r *http.Request) {
1719 l := rp.logger.With("handler", "Forks")
1720
1721 user := rp.oauth.GetMultiAccountUser(r)
1722 f, err := rp.repoResolver.Resolve(r)
1723 if err != nil {
1724 l.Error("failed to resolve source repo", "err", err)
1725 return
1726 }
1727
1728 var forks []models.Repo
1729 totalCount := 0
1730 page := pagination.FromContext(r.Context())
1731 if f.RepoDid != "" {
1732 forks, err = db.GetReposPaginated(rp.db, page, orm.FilterEq("source", f.RepoDid))
1733 if err != nil {
1734 l.Error("failed to fetch forks", "err", err, "repoAt", f.RepoAt())
1735 return
1736 }
1737
1738 totalCount, err = db.GetForkCount(rp.db, f.RepoDid)
1739 if err != nil {
1740 l.Error("failed to fetch fork count", "err", err, "repoAt", f.RepoAt())
1741 return
1742 }
1743 }
1744
1745 err = rp.pages.RepoForks(w, pages.RepoForksParams{
1746 BaseParams: pages.BaseParamsFromContext(r.Context()),
1747 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
1748 Forks: forks,
1749 Page: page,
1750 TotalCount: totalCount,
1751 })
1752 if err != nil {
1753 l.Error("failed to render page", "err", err)
1754 }
1755}
1756
1757// this is used to rollback changes made to the PDS
1758//
1759// it is a no-op if the provided ATURI is empty
1760func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error {
1761 if aturi == "" {
1762 return nil
1763 }
1764
1765 parsed := syntax.ATURI(aturi)
1766
1767 collection := parsed.Collection().String()
1768 repo := parsed.Authority().String()
1769 rkey := parsed.RecordKey().String()
1770
1771 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
1772 Collection: collection,
1773 Repo: repo,
1774 Rkey: rkey,
1775 })
1776 return err
1777}
1778
1779func repoCollaboratorRecord(f *models.Repo, subject string, createdAt time.Time) *tangled.RepoCollaborator {
1780 return &tangled.RepoCollaborator{
1781 Subject: subject,
1782 CreatedAt: createdAt.Format(time.RFC3339),
1783 Repo: f.RepoDid,
1784 }
1785}