Monorepo for Tangled
tangled.org
1package repo
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "net/url"
11 "slices"
12 "strings"
13 "time"
14
15 "tangled.org/core/appview/cloudflare"
16
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/knotacl"
21 "tangled.org/core/appview/knotcompat"
22 "tangled.org/core/appview/models"
23 "tangled.org/core/appview/notify"
24 "tangled.org/core/appview/oauth"
25 "tangled.org/core/appview/pages"
26 "tangled.org/core/appview/pagination"
27 "tangled.org/core/appview/reporesolver"
28 "tangled.org/core/appview/sites"
29 "tangled.org/core/appview/validator"
30 xrpcclient "tangled.org/core/appview/xrpcclient"
31 "tangled.org/core/consts"
32 "tangled.org/core/eventconsumer"
33 "tangled.org/core/idresolver"
34 "tangled.org/core/ogre"
35 "tangled.org/core/orm"
36 "tangled.org/core/rbac"
37 "tangled.org/core/tid"
38 "tangled.org/core/xrpc/serviceauth"
39
40 comatproto "github.com/bluesky-social/indigo/api/atproto"
41 "github.com/bluesky-social/indigo/atproto/atclient"
42 "github.com/bluesky-social/indigo/atproto/syntax"
43 lexutil "github.com/bluesky-social/indigo/lex/util"
44
45 "github.com/go-chi/chi/v5"
46)
47
48type Repo struct {
49 repoResolver *reporesolver.RepoResolver
50 idResolver *idresolver.Resolver
51 config *config.Config
52 oauth *oauth.OAuth
53 pages *pages.Pages
54 spindlestream *eventconsumer.Consumer
55 db *db.DB
56 enforcer *rbac.Enforcer
57 acl *knotacl.Service
58 notifier notify.Notifier
59 logger *slog.Logger
60 serviceAuth *serviceauth.ServiceAuth
61 validator *validator.Validator
62 cfClient *cloudflare.Client
63 ogreClient *ogre.Client
64}
65
66func New(
67 oauth *oauth.OAuth,
68 repoResolver *reporesolver.RepoResolver,
69 pages *pages.Pages,
70 spindlestream *eventconsumer.Consumer,
71 idResolver *idresolver.Resolver,
72 db *db.DB,
73 config *config.Config,
74 notifier notify.Notifier,
75 enforcer *rbac.Enforcer,
76 acl *knotacl.Service,
77 logger *slog.Logger,
78 validator *validator.Validator,
79 cfClient *cloudflare.Client,
80) *Repo {
81 return &Repo{
82 oauth: oauth,
83 repoResolver: repoResolver,
84 pages: pages,
85 idResolver: idResolver,
86 config: config,
87 spindlestream: spindlestream,
88 db: db,
89 notifier: notifier,
90 enforcer: enforcer,
91 acl: acl,
92 logger: logger,
93 validator: validator,
94 cfClient: cfClient,
95 ogreClient: ogre.NewClient(config.Ogre.Host),
96 }
97}
98
99// modify the spindle configured for this repo
100func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) {
101 user := rp.oauth.GetMultiAccountUser(r)
102 l := rp.logger.With("handler", "EditSpindle")
103 l = l.With("did", user.Did)
104
105 errorId := "operation-error"
106 fail := func(msg string, err error) {
107 l.Error(msg, "err", err)
108 rp.pages.Notice(w, errorId, msg)
109 }
110
111 f, err := rp.repoResolver.Resolve(r)
112 if err != nil {
113 fail("Failed to resolve repo. Try again later", err)
114 return
115 }
116
117 newSpindle := r.FormValue("spindle")
118 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value
119 client, err := rp.oauth.AuthorizedClient(r)
120 if err != nil {
121 fail("Failed to authorize. Try again later.", err)
122 return
123 }
124
125 if !removingSpindle {
126 // ensure that this is a valid spindle for this user
127 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Did)
128 if err != nil {
129 fail("Failed to find spindles. Try again later.", err)
130 return
131 }
132
133 if !slices.Contains(validSpindles, newSpindle) {
134 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles))
135 return
136 }
137 }
138
139 newRepo := *f
140 newRepo.Spindle = newSpindle
141 record := newRepo.AsRecord()
142
143 spindlePtr := &newSpindle
144 if removingSpindle {
145 spindlePtr = nil
146 newRepo.Spindle = ""
147 }
148
149 // optimistic update
150 err = db.UpdateSpindle(rp.db, newRepo.RepoDid, spindlePtr)
151 if err != nil {
152 fail("Failed to update spindle. Try again later.", err)
153 return
154 }
155
156 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
157 if err != nil {
158 fail("Failed to update spindle, no record found on PDS.", err)
159 return
160 }
161 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
162 Collection: tangled.RepoNSID,
163 Repo: newRepo.Did,
164 Rkey: newRepo.Rkey,
165 SwapRecord: ex.Cid,
166 Record: &lexutil.LexiconTypeDecoder{
167 Val: &record,
168 },
169 })
170
171 if err != nil {
172 fail("Failed to update spindle, unable to save to PDS.", err)
173 return
174 }
175
176 oldSpindle := f.Spindle
177 if oldSpindle != "" && oldSpindle != newSpindle {
178 remaining, qErr := db.GetRepos(rp.db, orm.FilterEq("spindle", oldSpindle))
179 if qErr != nil {
180 l.Warn("failed to count repos using old spindle", "err", qErr)
181 } else if len(remaining) == 0 {
182 rp.spindlestream.RemoveSource(eventconsumer.NewSpindleSource(oldSpindle))
183 }
184 }
185
186 if !removingSpindle {
187 rp.spindlestream.AddSource(
188 context.Background(),
189 eventconsumer.NewSpindleSource(newSpindle),
190 )
191 }
192
193 rp.pages.HxRefresh(w)
194}
195
196func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) {
197 user := rp.oauth.GetMultiAccountUser(r)
198 l := rp.logger.With("handler", "AddLabel")
199 l = l.With("did", user.Did)
200
201 f, err := rp.repoResolver.Resolve(r)
202 if err != nil {
203 l.Error("failed to get repo and knot", "err", err)
204 return
205 }
206
207 errorId := "add-label-error"
208 fail := func(msg string, err error) {
209 l.Error(msg, "err", err)
210 rp.pages.Notice(w, errorId, msg)
211 }
212
213 // get form values for label definition
214 name := r.FormValue("name")
215 concreteType := r.FormValue("valueType")
216 valueFormat := r.FormValue("valueFormat")
217 enumValues := r.FormValue("enumValues")
218 scope := r.Form["scope"]
219 color := r.FormValue("color")
220 multiple := r.FormValue("multiple") == "true"
221
222 var variants []string
223 for part := range strings.SplitSeq(enumValues, ",") {
224 if part = strings.TrimSpace(part); part != "" {
225 variants = append(variants, part)
226 }
227 }
228
229 if concreteType == "" {
230 concreteType = "null"
231 }
232
233 format := models.ValueTypeFormatAny
234 if valueFormat == "did" {
235 format = models.ValueTypeFormatDid
236 }
237
238 valueType := models.ValueType{
239 Type: models.ConcreteType(concreteType),
240 Format: format,
241 Enum: variants,
242 }
243
244 label := models.LabelDefinition{
245 Did: user.Did,
246 Rkey: tid.TID(),
247 Name: name,
248 ValueType: valueType,
249 Scope: scope,
250 Color: &color,
251 Multiple: multiple,
252 Created: time.Now(),
253 }
254 if err := rp.validator.ValidateLabelDefinition(&label); err != nil {
255 fail(err.Error(), err)
256 return
257 }
258
259 // announce this relation into the firehose, store into owners' pds
260 client, err := rp.oauth.AuthorizedClient(r)
261 if err != nil {
262 fail(err.Error(), err)
263 return
264 }
265
266 // emit a labelRecord
267 labelRecord := label.AsRecord()
268 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
269 Collection: tangled.LabelDefinitionNSID,
270 Repo: label.Did,
271 Rkey: label.Rkey,
272 Record: &lexutil.LexiconTypeDecoder{
273 Val: &labelRecord,
274 },
275 })
276 // invalid record
277 if err != nil {
278 fail("Failed to write record to PDS.", err)
279 return
280 }
281
282 aturi := resp.Uri
283 l = l.With("at-uri", aturi)
284 l.Info("wrote label record to PDS")
285
286 // update the repo to subscribe to this label
287 newRepo := *f
288 newRepo.Labels = append(newRepo.Labels, aturi)
289 repoRecord := newRepo.AsRecord()
290
291 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
292 if err != nil {
293 fail("Failed to update labels, no record found on PDS.", err)
294 return
295 }
296 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
297 Collection: tangled.RepoNSID,
298 Repo: newRepo.Did,
299 Rkey: newRepo.Rkey,
300 SwapRecord: ex.Cid,
301 Record: &lexutil.LexiconTypeDecoder{
302 Val: &repoRecord,
303 },
304 })
305 if err != nil {
306 fail("Failed to update labels for repo.", err)
307 return
308 }
309
310 tx, err := rp.db.BeginTx(r.Context(), nil)
311 if err != nil {
312 fail("Failed to add label.", err)
313 return
314 }
315
316 rollback := func() {
317 err1 := tx.Rollback()
318 err2 := rollbackRecord(context.Background(), aturi, client)
319
320 // ignore txn complete errors, this is okay
321 if errors.Is(err1, sql.ErrTxDone) {
322 err1 = nil
323 }
324
325 if errs := errors.Join(err1, err2); errs != nil {
326 l.Error("failed to rollback changes", "errs", errs)
327 return
328 }
329 }
330 defer rollback()
331
332 _, err = db.AddLabelDefinition(tx, &label)
333 if err != nil {
334 fail("Failed to add label.", err)
335 return
336 }
337
338 if err = db.SubscribeLabel(tx, &models.RepoLabel{
339 RepoDid: syntax.DID(f.RepoDid),
340 LabelAt: label.AtUri(),
341 }); err != nil {
342 fail("Failed to subscribe to label.", err)
343 return
344 }
345
346 err = tx.Commit()
347 if err != nil {
348 fail("Failed to add label.", err)
349 return
350 }
351
352 // clear aturi when everything is successful
353 aturi = ""
354
355 rp.pages.HxRefresh(w)
356}
357
358func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) {
359 user := rp.oauth.GetMultiAccountUser(r)
360 l := rp.logger.With("handler", "DeleteLabel")
361 l = l.With("did", user.Did)
362
363 f, err := rp.repoResolver.Resolve(r)
364 if err != nil {
365 l.Error("failed to get repo and knot", "err", err)
366 return
367 }
368
369 errorId := "label-operation"
370 fail := func(msg string, err error) {
371 l.Error(msg, "err", err)
372 rp.pages.Notice(w, errorId, msg)
373 }
374
375 // get form values
376 labelId := r.FormValue("label-id")
377
378 label, err := db.GetLabelDefinition(rp.db, orm.FilterEq("id", labelId))
379 if err != nil {
380 fail("Failed to find label definition.", err)
381 return
382 }
383
384 client, err := rp.oauth.AuthorizedClient(r)
385 if err != nil {
386 fail(err.Error(), err)
387 return
388 }
389
390 // delete label record from PDS
391 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{
392 Collection: tangled.LabelDefinitionNSID,
393 Repo: label.Did,
394 Rkey: label.Rkey,
395 })
396 if err != nil {
397 fail("Failed to delete label record from PDS.", err)
398 return
399 }
400
401 // update repo record to remove the label reference
402 newRepo := *f
403 var updated []string
404 removedAt := label.AtUri().String()
405 for _, l := range newRepo.Labels {
406 if l != removedAt {
407 updated = append(updated, l)
408 }
409 }
410 newRepo.Labels = updated
411 repoRecord := newRepo.AsRecord()
412
413 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
414 if err != nil {
415 fail("Failed to update labels, no record found on PDS.", err)
416 return
417 }
418 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
419 Collection: tangled.RepoNSID,
420 Repo: newRepo.Did,
421 Rkey: newRepo.Rkey,
422 SwapRecord: ex.Cid,
423 Record: &lexutil.LexiconTypeDecoder{
424 Val: &repoRecord,
425 },
426 })
427 if err != nil {
428 fail("Failed to update repo record.", err)
429 return
430 }
431
432 // transaction for DB changes
433 tx, err := rp.db.BeginTx(r.Context(), nil)
434 if err != nil {
435 fail("Failed to delete label.", err)
436 return
437 }
438 defer tx.Rollback()
439
440 err = db.UnsubscribeLabel(
441 tx,
442 orm.FilterEq("repo_did", f.RepoDid),
443 orm.FilterEq("label_at", removedAt),
444 )
445 if err != nil {
446 fail("Failed to unsubscribe label.", err)
447 return
448 }
449
450 err = db.DeleteLabelDefinition(tx, orm.FilterEq("id", label.Id))
451 if err != nil {
452 fail("Failed to delete label definition.", err)
453 return
454 }
455
456 err = tx.Commit()
457 if err != nil {
458 fail("Failed to delete label.", err)
459 return
460 }
461
462 // everything succeeded
463 rp.pages.HxRefresh(w)
464}
465
466func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) {
467 user := rp.oauth.GetMultiAccountUser(r)
468 l := rp.logger.With("handler", "SubscribeLabel")
469 l = l.With("did", user.Did)
470
471 f, err := rp.repoResolver.Resolve(r)
472 if err != nil {
473 l.Error("failed to get repo and knot", "err", err)
474 return
475 }
476
477 if err := r.ParseForm(); err != nil {
478 l.Error("invalid form", "err", err)
479 return
480 }
481
482 errorId := "default-label-operation"
483 fail := func(msg string, err error) {
484 l.Error(msg, "err", err)
485 rp.pages.Notice(w, errorId, msg)
486 }
487
488 labelAts := r.Form["label"]
489 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts))
490 if err != nil {
491 fail("Failed to subscribe to label.", err)
492 return
493 }
494
495 newRepo := *f
496 newRepo.Labels = append(newRepo.Labels, labelAts...)
497
498 // dedup
499 slices.Sort(newRepo.Labels)
500 newRepo.Labels = slices.Compact(newRepo.Labels)
501
502 repoRecord := newRepo.AsRecord()
503
504 client, err := rp.oauth.AuthorizedClient(r)
505 if err != nil {
506 fail(err.Error(), err)
507 return
508 }
509
510 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey)
511 if err != nil {
512 fail("Failed to update labels, no record found on PDS.", err)
513 return
514 }
515 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
516 Collection: tangled.RepoNSID,
517 Repo: newRepo.Did,
518 Rkey: newRepo.Rkey,
519 SwapRecord: ex.Cid,
520 Record: &lexutil.LexiconTypeDecoder{
521 Val: &repoRecord,
522 },
523 })
524
525 tx, err := rp.db.Begin()
526 if err != nil {
527 fail("Failed to subscribe to label.", err)
528 return
529 }
530 defer tx.Rollback()
531
532 for _, l := range labelAts {
533 err = db.SubscribeLabel(tx, &models.RepoLabel{
534 RepoDid: syntax.DID(f.RepoDid),
535 LabelAt: syntax.ATURI(l),
536 })
537 if err != nil {
538 fail("Failed to subscribe to label.", err)
539 return
540 }
541 }
542
543 if err := tx.Commit(); err != nil {
544 fail("Failed to subscribe to label.", err)
545 return
546 }
547
548 // everything succeeded
549 rp.pages.HxRefresh(w)
550}
551
552func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) {
553 user := rp.oauth.GetMultiAccountUser(r)
554 l := rp.logger.With("handler", "UnsubscribeLabel")
555 l = l.With("did", user.Did)
556
557 f, err := rp.repoResolver.Resolve(r)
558 if err != nil {
559 l.Error("failed to get repo and knot", "err", err)
560 return
561 }
562
563 if err := r.ParseForm(); err != nil {
564 l.Error("invalid form", "err", err)
565 return
566 }
567
568 errorId := "default-label-operation"
569 fail := func(msg string, err error) {
570 l.Error(msg, "err", err)
571 rp.pages.Notice(w, errorId, msg)
572 }
573
574 labelAts := r.Form["label"]
575 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts))
576 if err != nil {
577 fail("Failed to unsubscribe to label.", err)
578 return
579 }
580
581 // update repo record to remove the label reference
582 newRepo := *f
583 var updated []string
584 for _, l := range newRepo.Labels {
585 if !slices.Contains(labelAts, l) {
586 updated = append(updated, l)
587 }
588 }
589 newRepo.Labels = updated
590 repoRecord := newRepo.AsRecord()
591
592 client, err := rp.oauth.AuthorizedClient(r)
593 if err != nil {
594 fail(err.Error(), err)
595 return
596 }
597
598 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey)
599 if err != nil {
600 fail("Failed to update labels, no record found on PDS.", err)
601 return
602 }
603 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
604 Collection: tangled.RepoNSID,
605 Repo: newRepo.Did,
606 Rkey: newRepo.Rkey,
607 SwapRecord: ex.Cid,
608 Record: &lexutil.LexiconTypeDecoder{
609 Val: &repoRecord,
610 },
611 })
612
613 err = db.UnsubscribeLabel(
614 rp.db,
615 orm.FilterEq("repo_did", f.RepoDid),
616 orm.FilterIn("label_at", labelAts),
617 )
618 if err != nil {
619 fail("Failed to unsubscribe label.", err)
620 return
621 }
622
623 // everything succeeded
624 rp.pages.HxRefresh(w)
625}
626
627func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) {
628 l := rp.logger.With("handler", "LabelPanel")
629
630 f, err := rp.repoResolver.Resolve(r)
631 if err != nil {
632 l.Error("failed to get repo and knot", "err", err)
633 return
634 }
635
636 subjectStr := r.FormValue("subject")
637 subject, err := syntax.ParseATURI(subjectStr)
638 if err != nil {
639 l.Error("failed to get repo and knot", "err", err)
640 return
641 }
642
643 labelDefs, err := db.GetLabelDefinitions(
644 rp.db,
645 orm.FilterIn("at_uri", f.Labels),
646 orm.FilterContains("scope", subject.Collection().String()),
647 )
648 if err != nil {
649 l.Error("failed to fetch label defs", "err", err)
650 return
651 }
652
653 defs := make(map[string]*models.LabelDefinition)
654 for _, l := range labelDefs {
655 defs[l.AtUri().String()] = &l
656 }
657
658 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject))
659 if err != nil {
660 l.Error("failed to build label state", "err", err)
661 return
662 }
663 state := states[subject]
664
665 user := rp.oauth.GetMultiAccountUser(r)
666 rp.pages.LabelPanel(w, pages.LabelPanelParams{
667 LoggedInUser: user,
668 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
669 Defs: defs,
670 Subject: subject.String(),
671 State: state,
672 })
673}
674
675func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) {
676 l := rp.logger.With("handler", "EditLabelPanel")
677
678 f, err := rp.repoResolver.Resolve(r)
679 if err != nil {
680 l.Error("failed to get repo and knot", "err", err)
681 return
682 }
683
684 subjectStr := r.FormValue("subject")
685 subject, err := syntax.ParseATURI(subjectStr)
686 if err != nil {
687 l.Error("failed to get repo and knot", "err", err)
688 return
689 }
690
691 labelDefs, err := db.GetLabelDefinitions(
692 rp.db,
693 orm.FilterIn("at_uri", f.Labels),
694 orm.FilterContains("scope", subject.Collection().String()),
695 )
696 if err != nil {
697 l.Error("failed to fetch labels", "err", err)
698 return
699 }
700
701 defs := make(map[string]*models.LabelDefinition)
702 for _, l := range labelDefs {
703 defs[l.AtUri().String()] = &l
704 }
705
706 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject))
707 if err != nil {
708 l.Error("failed to build label state", "err", err)
709 return
710 }
711 state := states[subject]
712
713 user := rp.oauth.GetMultiAccountUser(r)
714 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{
715 LoggedInUser: user,
716 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
717 Defs: defs,
718 Subject: subject.String(),
719 State: state,
720 })
721}
722
723func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) {
724 user := rp.oauth.GetMultiAccountUser(r)
725 l := rp.logger.With("handler", "AddCollaborator")
726 l = l.With("did", user.Did)
727
728 f, err := rp.repoResolver.Resolve(r)
729 if err != nil {
730 l.Error("failed to get repo and knot", "err", err)
731 return
732 }
733
734 errorId := "add-collaborator-error"
735 fail := func(msg string, err error) {
736 l.Error(msg, "err", err)
737 rp.pages.Notice(w, errorId, msg)
738 }
739
740 collaborator := r.FormValue("collaborator")
741 if collaborator == "" {
742 fail("Invalid form.", nil)
743 return
744 }
745
746 // remove a single leading `@`, to make @handle work with ResolveIdent
747 collaborator = strings.TrimPrefix(collaborator, "@")
748
749 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator)
750 if err != nil {
751 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err)
752 return
753 }
754
755 if collaboratorIdent.DID.String() == user.Did {
756 fail("You seem to be adding yourself as a collaborator.", nil)
757 return
758 }
759 l = l.With("collaborator", collaboratorIdent.Handle)
760 l = l.With("knot", f.Knot)
761
762 if knotcompat.KnotHasCapability(r.Context(), f.Knot, rp.config.Core.Dev, consts.CapKnotACL) {
763 if f.RepoDid == "" {
764 fail("This repository is missing its DID and cannot manage collaborators.", nil)
765 return
766 }
767
768 client, err := rp.oauth.ServiceClient(
769 r,
770 oauth.WithService(f.Knot),
771 oauth.WithLxm(tangled.RepoAddCollaboratorNSID),
772 oauth.WithDev(rp.config.Core.Dev),
773 )
774 if err != nil {
775 fail("Failed to connect to knot server.", err)
776 return
777 }
778
779 err = tangled.RepoAddCollaborator(r.Context(), client, &tangled.RepoAddCollaborator_Input{
780 Repo: f.RepoDid,
781 Subject: collaboratorIdent.DID.String(),
782 })
783 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
784 l.Error("failed to call XRPC repo.addCollaborator", "xrpcerr", xrpcerr, "err", err)
785 rp.pages.Notice(w, errorId, xrpcerr.Error())
786 return
787 }
788
789 rp.acl.InvalidateCollaborators(f.Knot, f.RepoDid)
790
791 rp.pages.HxRefresh(w)
792 return
793 }
794
795 existing, err := db.GetCollaborators(rp.db,
796 orm.FilterEq("repo_did", f.RepoDid),
797 orm.FilterEq("subject_did", collaboratorIdent.DID.String()),
798 )
799 if err != nil {
800 fail("Failed to check existing collaborators.", err)
801 return
802 }
803 if len(existing) > 0 {
804 fail(fmt.Sprintf("%s is already a collaborator.", collaboratorIdent.Handle), nil)
805 return
806 }
807
808 // announce this relation into the firehose, store into owners' pds
809 client, err := rp.oauth.AuthorizedClient(r)
810 if err != nil {
811 fail("Failed to write to PDS.", err)
812 return
813 }
814
815 // emit a record
816 currentUser := rp.oauth.GetMultiAccountUser(r)
817 rkey := tid.TID()
818 createdAt := time.Now()
819 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
820 Collection: tangled.RepoCollaboratorNSID,
821 Repo: currentUser.Did,
822 Rkey: rkey,
823 Record: knotcompat.Collaborator(repoCollaboratorRecord(f, collaboratorIdent.DID.String(), createdAt)),
824 })
825 // invalid record
826 if err != nil {
827 fail("Failed to write record to PDS.", err)
828 return
829 }
830
831 aturi := resp.Uri
832 l = l.With("at-uri", aturi)
833 l.Info("wrote record to PDS")
834
835 tx, err := rp.db.BeginTx(r.Context(), nil)
836 if err != nil {
837 fail("Failed to add collaborator.", err)
838 return
839 }
840
841 rollback := func() {
842 err1 := tx.Rollback()
843 err2 := rp.enforcer.E.LoadPolicy()
844 err3 := rollbackRecord(context.Background(), aturi, client)
845
846 // ignore txn complete errors, this is okay
847 if errors.Is(err1, sql.ErrTxDone) {
848 err1 = nil
849 }
850
851 if errs := errors.Join(err1, err2, err3); errs != nil {
852 l.Error("failed to rollback changes", "errs", errs)
853 return
854 }
855 }
856 defer rollback()
857
858 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.RepoIdentifier())
859 if err != nil {
860 fail("Failed to add collaborator permissions.", err)
861 return
862 }
863
864 err = db.AddCollaborator(tx, models.Collaborator{
865 Did: syntax.DID(currentUser.Did),
866 Rkey: sql.NullString{String: rkey, Valid: true},
867 SubjectDid: collaboratorIdent.DID,
868 RepoDid: syntax.DID(f.RepoDid),
869 Created: createdAt,
870 })
871 if err != nil {
872 fail("Failed to add collaborator.", err)
873 return
874 }
875
876 err = tx.Commit()
877 if err != nil {
878 fail("Failed to add collaborator.", err)
879 return
880 }
881
882 err = rp.enforcer.E.SavePolicy()
883 if err != nil {
884 fail("Failed to update collaborator permissions.", err)
885 return
886 }
887
888 // clear aturi to when everything is successful
889 aturi = ""
890
891 rp.pages.HxRefresh(w)
892}
893
894func (rp *Repo) RemoveCollaborator(w http.ResponseWriter, r *http.Request) {
895 user := rp.oauth.GetMultiAccountUser(r)
896 l := rp.logger.With("handler", "RemoveCollaborator")
897 l = l.With("did", user.Did)
898
899 f, err := rp.repoResolver.Resolve(r)
900 if err != nil {
901 l.Error("failed to get repo and knot", "err", err)
902 return
903 }
904
905 errorId := "collaborator-error"
906 fail := func(msg string, err error) {
907 l.Error(msg, "err", err)
908 rp.pages.Notice(w, errorId, msg)
909 }
910
911 collaborator := r.FormValue("collaborator")
912 if collaborator == "" {
913 fail("Invalid form.", nil)
914 return
915 }
916 collaborator = strings.TrimPrefix(collaborator, "@")
917
918 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator)
919 if err != nil {
920 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err)
921 return
922 }
923 l = l.With("collaborator", collaboratorIdent.Handle, "knot", f.Knot)
924
925 if collaboratorIdent.DID.String() == f.Did {
926 fail("Cannot remove the repository owner.", nil)
927 return
928 }
929
930 if knotcompat.KnotHasCapability(r.Context(), f.Knot, rp.config.Core.Dev, consts.CapKnotACL) {
931 if f.RepoDid == "" {
932 fail("This repository is missing its DID and cannot manage collaborators.", nil)
933 return
934 }
935
936 client, err := rp.oauth.ServiceClient(
937 r,
938 oauth.WithService(f.Knot),
939 oauth.WithLxm(tangled.RepoRemoveCollaboratorNSID),
940 oauth.WithDev(rp.config.Core.Dev),
941 )
942 if err != nil {
943 fail("Failed to connect to knot server.", err)
944 return
945 }
946
947 err = tangled.RepoRemoveCollaborator(r.Context(), client, &tangled.RepoRemoveCollaborator_Input{
948 Repo: f.RepoDid,
949 Subject: collaboratorIdent.DID.String(),
950 })
951 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
952 l.Error("failed to call XRPC repo.removeCollaborator", "xrpcerr", xrpcerr, "err", err)
953 rp.pages.Notice(w, errorId, xrpcerr.Error())
954 return
955 }
956
957 rp.acl.InvalidateCollaborators(f.Knot, f.RepoDid)
958
959 rp.pages.HxRefresh(w)
960 return
961 }
962
963 existing, err := db.GetCollaborators(rp.db,
964 orm.FilterEq("repo_did", f.RepoDid),
965 orm.FilterEq("subject_did", collaboratorIdent.DID.String()),
966 )
967 if err != nil {
968 fail("Failed to look up collaborator.", err)
969 return
970 }
971 if len(existing) == 0 {
972 fail(fmt.Sprintf("%s is not a collaborator.", collaboratorIdent.Handle), nil)
973 return
974 }
975 row := existing[0]
976
977 client, err := rp.oauth.AuthorizedClient(r)
978 if err != nil {
979 fail("Failed to write to PDS.", err)
980 return
981 }
982
983 tx, err := rp.db.BeginTx(r.Context(), nil)
984 if err != nil {
985 fail("Failed to remove collaborator.", err)
986 return
987 }
988 committed := false
989 defer func() {
990 if !committed {
991 tx.Rollback()
992 if err := rp.enforcer.E.LoadPolicy(); err != nil {
993 l.Error("failed to reload policy after rollback", "err", err)
994 }
995 }
996 }()
997
998 if err := rp.enforcer.RemoveCollaborator(collaboratorIdent.DID.String(), f.Knot, f.RepoIdentifier()); err != nil {
999 fail("Failed to remove collaborator permissions.", err)
1000 return
1001 }
1002
1003 if err := db.DeleteCollaborator(tx,
1004 orm.FilterEq("repo_did", f.RepoDid),
1005 orm.FilterEq("subject_did", collaboratorIdent.DID.String()),
1006 ); err != nil {
1007 fail("Failed to remove collaborator.", err)
1008 return
1009 }
1010
1011 if row.Rkey.Valid && row.Rkey.String != "" {
1012 if _, err := comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{
1013 Collection: tangled.RepoCollaboratorNSID,
1014 Repo: row.Did.String(),
1015 Rkey: row.Rkey.String,
1016 }); err != nil {
1017 fail("Failed to delete collaborator record from PDS.", err)
1018 return
1019 }
1020 }
1021
1022 if err := tx.Commit(); err != nil {
1023 fail("Failed to remove collaborator.", err)
1024 return
1025 }
1026 committed = true
1027
1028 if err := rp.enforcer.E.SavePolicy(); err != nil {
1029 fail("Failed to update collaborator permissions.", err)
1030 return
1031 }
1032
1033 rp.pages.HxRefresh(w)
1034}
1035
1036func (rp *Repo) RenameRepo(w http.ResponseWriter, r *http.Request) {
1037 l := rp.logger.With("handler", "RenameRepo")
1038 noticeId := "rename-repo-error"
1039
1040 user := rp.oauth.GetMultiAccountUser(r)
1041 f, err := rp.repoResolver.Resolve(r)
1042 if err != nil {
1043 l.Error("failed to get repo and knot", "err", err)
1044 rp.pages.Notice(w, noticeId, "Failed to load repository.")
1045 return
1046 }
1047 l = l.With("did", user.Did, "rkey", f.Rkey, "oldName", f.Name)
1048
1049 if f.RepoDid == "" {
1050 rp.pages.Notice(w, noticeId, "This repository's knot has not completed the DID migration; rename is unavailable.")
1051 return
1052 }
1053
1054 if !knotcompat.KnotSupports114(r.Context(), f.Knot, rp.config.Core.Dev) {
1055 rp.pages.Notice(w, noticeId, "This repository's knot is below v1.14 and does not yet support renames. Ask the knot operator to upgrade.")
1056 return
1057 }
1058
1059 newName, err := validateRenameInput(f.Name, f.Rkey, r.FormValue("name"))
1060 if err != nil {
1061 rp.pages.Notice(w, noticeId, err.Error())
1062 return
1063 }
1064 newRkey := strings.ToLower(newName)
1065 l = l.With("newName", newName, "newRkey", newRkey)
1066
1067 atpClient, err := rp.oauth.AuthorizedClient(r)
1068 if err != nil {
1069 l.Error("failed to get authorized client", "err", err)
1070 rp.pages.Notice(w, noticeId, "Failed to authorize. Try again later.")
1071 return
1072 }
1073
1074 newRepo := *f
1075 newRepo.Name = newName
1076 newRepo.Rkey = newRkey
1077 newRepo.Created = time.Now()
1078 record := newRepo.AsRecord()
1079
1080 if newRkey == f.Rkey {
1081 ex, err := comatproto.RepoGetRecord(r.Context(), atpClient, "", tangled.RepoNSID, f.Did, f.Rkey)
1082 if err != nil {
1083 l.Error("failed to fetch existing record", "err", err)
1084 rp.pages.Notice(w, noticeId, "Failed to read repository record from PDS.")
1085 return
1086 }
1087
1088 _, err = comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
1089 Collection: tangled.RepoNSID,
1090 Repo: f.Did,
1091 Rkey: f.Rkey,
1092 SwapRecord: ex.Cid,
1093 Record: &lexutil.LexiconTypeDecoder{
1094 Val: &record,
1095 },
1096 })
1097 if err != nil {
1098 l.Error("failed to update display name on PDS", "err", err)
1099 rp.pages.Notice(w, noticeId, "Failed to save display name to PDS.")
1100 return
1101 }
1102 l.Info("updated display name on PDS")
1103
1104 if err := db.UpdateRepoDisplayName(rp.db, f.Did, f.Rkey, newName); err != nil {
1105 l.Error("optimistic display name update failed", "err", err)
1106 }
1107 } else {
1108 ex, getErr := comatproto.RepoGetRecord(r.Context(), atpClient, "", tangled.RepoNSID, f.Did, newRkey)
1109 switch {
1110 case getErr != nil:
1111 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{
1112 Collection: tangled.RepoNSID,
1113 Repo: f.Did,
1114 Rkey: &newRkey,
1115 Record: &lexutil.LexiconTypeDecoder{Val: &record},
1116 })
1117 if err != nil {
1118 l.Error("failed to write rename to PDS", "err", err)
1119 rp.pages.Notice(w, noticeId, "Failed to save renamed repository to PDS.")
1120 return
1121 }
1122 l.Info("wrote rename-create to PDS; old record retained as alias")
1123
1124 default:
1125 existing, ok := ex.Value.Val.(*tangled.Repo)
1126 if !ok || existing.RepoDid == nil || *existing.RepoDid != f.RepoDid {
1127 rp.pages.Notice(w, noticeId, fmt.Sprintf("You already have a repository named %q.", newRkey))
1128 return
1129 }
1130 _, err = comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
1131 Collection: tangled.RepoNSID,
1132 Repo: f.Did,
1133 Rkey: newRkey,
1134 SwapRecord: ex.Cid,
1135 Record: &lexutil.LexiconTypeDecoder{Val: &record},
1136 })
1137 if err != nil {
1138 l.Error("failed to rewrite rename-back record on PDS", "err", err)
1139 rp.pages.Notice(w, noticeId, "Failed to save renamed repository to PDS.")
1140 return
1141 }
1142 l.Info("rewrote rename-back record on PDS over prior alias")
1143 }
1144
1145 tx, err := rp.db.Begin()
1146 if err != nil {
1147 l.Error("failed to begin rename tx", "err", err)
1148 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid))
1149 return
1150 }
1151 defer tx.Rollback()
1152
1153 if err := db.RenameRepo(tx, f.Did, f.Rkey, newRkey, newName); err != nil {
1154 l.Error("optimistic rename failed", "err", err)
1155 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid))
1156 return
1157 }
1158 if err := db.RecordRepoRename(tx, f.Did, f.Rkey, f.RepoDid); err != nil {
1159 l.Error("failed to record rename history", "err", err)
1160 }
1161 if err := db.DeleteRepoRename(tx, f.Did, newRkey); err != nil {
1162 l.Error("failed to clear stale rename hint", "err", err)
1163 }
1164 if err := tx.Commit(); err != nil {
1165 l.Error("failed to commit rename tx", "err", err)
1166 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid))
1167 return
1168 }
1169 }
1170
1171 oldRepo := *f
1172 rp.notifier.RenameRepo(r.Context(), syntax.DID(user.Did), &oldRepo, &newRepo)
1173
1174 if newRkey != f.Rkey {
1175 rp.migrateSiteOnRename(r.Context(), f, newName, newRkey)
1176 }
1177
1178 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid))
1179}
1180
1181func validateRenameInput(currentName, currentRkey, raw string) (string, error) {
1182 newName := strings.TrimSpace(raw)
1183 if newName == "" {
1184 return "", errors.New("Repository name cannot be empty.")
1185 }
1186 if err := models.ValidateRepoName(newName); err != nil {
1187 return "", err
1188 }
1189 newName = models.StripGitExt(newName)
1190 if newName == currentName {
1191 if _, tidErr := syntax.ParseTID(currentRkey); tidErr == nil {
1192 return newName, nil
1193 }
1194 return "", errors.New("New name matches the current name.")
1195 }
1196 return newName, nil
1197}
1198
1199func (rp *Repo) migrateSiteOnRename(ctx context.Context, oldRepo *models.Repo, newName, newRkey string) {
1200 l := rp.logger.With("handler", "migrateSiteOnRename", "repo_did", oldRepo.RepoDid)
1201
1202 siteConfig, err := db.GetRepoSiteConfig(rp.db, oldRepo.RepoDid)
1203 if err != nil || siteConfig == nil {
1204 return
1205 }
1206
1207 if !rp.cfClient.Enabled() {
1208 return
1209 }
1210
1211 ownerClaim, _ := db.GetActiveDomainClaimForDid(rp.db, oldRepo.Did)
1212
1213 go func() {
1214 bgCtx := context.Background()
1215 oldRkey := oldRepo.Rkey
1216 oldName := oldRepo.Name
1217
1218 if err := sites.Delete(bgCtx, rp.cfClient, oldRepo.Did, oldRkey); err != nil {
1219 l.Error("sites: failed to delete old R2 prefix", "oldRkey", oldRkey, "err", err)
1220 }
1221
1222 newRepo := *oldRepo
1223 newRepo.Name = newName
1224 newRepo.Rkey = newRkey
1225 if deployErr := sites.Deploy(bgCtx, rp.cfClient, rp.config, &newRepo, siteConfig.Branch, siteConfig.Dir); deployErr != nil {
1226 l.Error("sites: redeploy after rename failed", "err", deployErr)
1227 }
1228
1229 if ownerClaim != nil {
1230 // drop the old name's entry when the name actually changed.
1231 if oldName != newName {
1232 if err := sites.DeleteDomainMapping(bgCtx, rp.cfClient, ownerClaim.Domain, oldName); err != nil {
1233 l.Error("sites: failed to remove old KV mapping", "oldName", oldName, "err", err)
1234 }
1235 }
1236 if err := sites.PutDomainMapping(bgCtx, rp.cfClient, ownerClaim.Domain, oldRepo.Did, newName, newRkey, siteConfig.IsIndex); err != nil {
1237 l.Error("sites: failed to write new KV mapping", "newName", newName, "newRkey", newRkey, "err", err)
1238 }
1239 }
1240
1241 l.Info("sites: migrated on rename", "oldName", oldName, "oldRkey", oldRkey, "newName", newName, "newRkey", newRkey)
1242 }()
1243}
1244
1245func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) {
1246 user := rp.oauth.GetMultiAccountUser(r)
1247 l := rp.logger.With("handler", "DeleteRepo")
1248
1249 noticeId := "operation-error"
1250 f, err := rp.repoResolver.Resolve(r)
1251 if err != nil {
1252 l.Error("failed to get repo and knot", "err", err)
1253 return
1254 }
1255
1256 // remove record from pds
1257 atpClient, err := rp.oauth.AuthorizedClient(r)
1258 if err != nil {
1259 l.Error("failed to get authorized client", "err", err)
1260 return
1261 }
1262 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{
1263 Collection: tangled.RepoNSID,
1264 Repo: user.Did,
1265 Rkey: f.Rkey,
1266 })
1267 if err != nil {
1268 l.Error("failed to delete record", "err", err)
1269 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.")
1270 return
1271 }
1272 l.Info("removed repo record", "aturi", f.RepoAt().String())
1273
1274 client, err := rp.oauth.ServiceClient(
1275 r,
1276 oauth.WithService(f.Knot),
1277 oauth.WithLxm(tangled.RepoDeleteNSID),
1278 oauth.WithDev(rp.config.Core.Dev),
1279 )
1280 if err != nil {
1281 l.Error("failed to connect to knot server", "err", err)
1282 return
1283 }
1284
1285 err = tangled.RepoDelete(
1286 r.Context(),
1287 client,
1288 &tangled.RepoDelete_Input{
1289 Did: f.Did,
1290 Name: f.Name,
1291 Rkey: f.Rkey,
1292 },
1293 )
1294 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
1295 l.Error("failed to call XRPC repo.delete", "xrpcerr", xrpcerr, "err", err)
1296 rp.pages.Notice(w, noticeId, xrpcerr.Error())
1297 return
1298 }
1299 l.Info("deleted repo from knot")
1300
1301 tx, err := rp.db.BeginTx(r.Context(), nil)
1302 if err != nil {
1303 l.Error("failed to start tx")
1304 w.Write(fmt.Append(nil, "failed to add collaborator: ", err))
1305 return
1306 }
1307 defer func() {
1308 tx.Rollback()
1309 err = rp.enforcer.E.LoadPolicy()
1310 if err != nil {
1311 l.Error("failed to rollback policies")
1312 }
1313 }()
1314
1315 // remove collaborator RBAC
1316 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.RepoIdentifier(), f.Knot)
1317 if err != nil {
1318 rp.pages.Notice(w, noticeId, "Failed to remove collaborators")
1319 return
1320 }
1321 for _, c := range repoCollaborators {
1322 did := c[0]
1323 rp.enforcer.RemoveCollaborator(did, f.Knot, f.RepoIdentifier())
1324 }
1325 l.Info("removed collaborators")
1326
1327 // remove repo RBAC
1328 err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.RepoIdentifier())
1329 if err != nil {
1330 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules")
1331 return
1332 }
1333
1334 // remove repo from db
1335 err = db.RemoveRepo(tx, f.Did, f.Rkey)
1336 if err != nil {
1337 rp.pages.Notice(w, noticeId, "Failed to update appview")
1338 return
1339 }
1340 l.Info("removed repo from db")
1341
1342 err = tx.Commit()
1343 if err != nil {
1344 l.Error("failed to commit changes", "err", err)
1345 http.Error(w, err.Error(), http.StatusInternalServerError)
1346 return
1347 }
1348
1349 err = rp.enforcer.E.SavePolicy()
1350 if err != nil {
1351 l.Error("failed to update ACLs", "err", err)
1352 http.Error(w, err.Error(), http.StatusInternalServerError)
1353 return
1354 }
1355
1356 rp.notifier.DeleteRepo(r.Context(), f)
1357 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.Did))
1358}
1359
1360func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) {
1361 l := rp.logger.With("handler", "SyncRepoFork")
1362
1363 ref := chi.URLParam(r, "ref")
1364 ref, _ = url.PathUnescape(ref)
1365
1366 user := rp.oauth.GetMultiAccountUser(r)
1367 f, err := rp.repoResolver.Resolve(r)
1368 if err != nil {
1369 l.Error("failed to resolve source repo", "err", err)
1370 return
1371 }
1372
1373 switch r.Method {
1374 case http.MethodPost:
1375 client, err := rp.oauth.ServiceClient(
1376 r,
1377 oauth.WithService(f.Knot),
1378 oauth.WithLxm(tangled.RepoForkSyncNSID),
1379 oauth.WithDev(rp.config.Core.Dev),
1380 )
1381 if err != nil {
1382 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
1383 return
1384 }
1385
1386 if f.Source == "" {
1387 rp.pages.Notice(w, "repo", "This repository is not a fork.")
1388 return
1389 }
1390
1391 err = tangled.RepoForkSync(
1392 r.Context(),
1393 client,
1394 &tangled.RepoForkSync_Input{
1395 Did: user.Did,
1396 Name: f.Name,
1397 Source: f.Source,
1398 Branch: ref,
1399 },
1400 )
1401 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
1402 l.Error("failed to call XRPC repo.forkSync", "xrpcerr", xrpcerr, "err", err)
1403 rp.pages.Notice(w, "repo", err.Error())
1404 return
1405 }
1406
1407 rp.pages.HxRefresh(w)
1408 return
1409 }
1410}
1411
1412func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) {
1413 l := rp.logger.With("handler", "ForkRepo")
1414
1415 user := rp.oauth.GetMultiAccountUser(r)
1416 f, err := rp.repoResolver.Resolve(r)
1417 if err != nil {
1418 l.Error("failed to resolve source repo", "err", err)
1419 return
1420 }
1421
1422 switch r.Method {
1423 case http.MethodGet:
1424 user := rp.oauth.GetMultiAccountUser(r)
1425 knots := rp.acl.KnotsForUser(r.Context(), user.Did)
1426
1427 rp.pages.ForkRepo(w, pages.ForkRepoParams{
1428 LoggedInUser: user,
1429 Knots: knots,
1430 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
1431 })
1432
1433 case http.MethodPost:
1434 l := rp.logger.With("handler", "ForkRepo")
1435
1436 targetKnot := r.FormValue("knot")
1437 if targetKnot == "" {
1438 rp.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
1439 return
1440 }
1441 l = l.With("targetKnot", targetKnot)
1442
1443 if !rp.acl.IsRepoCreateAllowed(r.Context(), targetKnot, user.Did) {
1444 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
1445 return
1446 }
1447
1448 // choose a name for a fork
1449 forkName := strings.ToLower(r.FormValue("repo_name"))
1450 if forkName == "" {
1451 rp.pages.Notice(w, "repo", "Repository name cannot be empty.")
1452 return
1453 }
1454
1455 // this check is *only* to see if the forked repo name already exists
1456 // in the user's account.
1457 existingRepo, err := db.GetRepo(
1458 rp.db,
1459 orm.FilterEq("did", user.Did),
1460 orm.FilterEq("name", forkName),
1461 )
1462 if err != nil {
1463 if !errors.Is(err, sql.ErrNoRows) {
1464 l.Error("error fetching existing repo from db", "err", err)
1465 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.")
1466 return
1467 }
1468 } else if existingRepo != nil {
1469 // repo with this name already exists
1470 rp.pages.Notice(w, "repo", "A repository with this name already exists.")
1471 return
1472 }
1473 l = l.With("forkName", forkName)
1474
1475 uri := "https"
1476 if rp.config.Core.Dev {
1477 uri = "http"
1478 }
1479
1480 forkSourceUrl := fmt.Sprintf("%s://%s/%s", uri, f.Knot, f.RepoIdentifier())
1481 l = l.With("cloneUrl", forkSourceUrl)
1482
1483 rkey := strings.ToLower(forkName)
1484
1485 // TODO: this could coordinate better with the knot to receive a clone status
1486 client, err := rp.oauth.ServiceClient(
1487 r,
1488 oauth.WithService(targetKnot),
1489 oauth.WithLxm(tangled.RepoCreateNSID),
1490 oauth.WithDev(rp.config.Core.Dev),
1491 oauth.WithTimeout(time.Second*20),
1492 )
1493 if err != nil {
1494 l.Error("could not create service client", "err", err)
1495 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
1496 return
1497 }
1498
1499 forkInput := &tangled.RepoCreate_Input{
1500 Rkey: rkey,
1501 Name: rkey,
1502 Source: &forkSourceUrl,
1503 }
1504 createResp, err := tangled.RepoCreate(
1505 r.Context(),
1506 client,
1507 forkInput,
1508 )
1509 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
1510 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err)
1511 rp.pages.Notice(w, "repo", xrpcerr.Error())
1512 return
1513 }
1514
1515 var repoDid string
1516 if createResp != nil && createResp.RepoDid != nil {
1517 repoDid = *createResp.RepoDid
1518 }
1519 if repoDid == "" {
1520 l.Error("knot returned empty repo DID for fork")
1521 rp.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.")
1522 return
1523 }
1524
1525 forkSource := f.RepoAt().String()
1526 if f.RepoDid != "" {
1527 forkSource = f.RepoDid
1528 }
1529
1530 forkDescription := r.Form.Get("description")
1531
1532 repo := &models.Repo{
1533 Did: user.Did,
1534 Name: rkey,
1535 Knot: targetKnot,
1536 Rkey: rkey,
1537 Source: forkSource,
1538 Description: forkDescription,
1539 Created: time.Now(),
1540 Labels: rp.config.Label.DefaultLabelDefs,
1541 RepoDid: repoDid,
1542 }
1543 record := repo.AsRecord()
1544
1545 cleanupKnot := func() {
1546 go func() {
1547 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second}
1548 for attempt, delay := range delays {
1549 time.Sleep(delay)
1550 deleteClient, dErr := rp.oauth.ServiceClient(
1551 r,
1552 oauth.WithService(targetKnot),
1553 oauth.WithLxm(tangled.RepoDeleteNSID),
1554 oauth.WithDev(rp.config.Core.Dev),
1555 )
1556 if dErr != nil {
1557 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr)
1558 continue
1559 }
1560 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
1561 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{
1562 Did: user.Did,
1563 Name: forkName,
1564 Rkey: rkey,
1565 }); dErr != nil {
1566 cancel()
1567 l.Error("failed to clean up fork on knot after rollback", "attempt", attempt+1, "err", dErr)
1568 continue
1569 }
1570 cancel()
1571 l.Info("successfully cleaned up fork on knot after rollback", "attempt", attempt+1)
1572 return
1573 }
1574 l.Error("exhausted retries for knot cleanup, fork may be orphaned",
1575 "did", user.Did, "fork", forkName, "knot", targetKnot)
1576 }()
1577 }
1578
1579 atpClient, err := rp.oauth.AuthorizedClient(r)
1580 if err != nil {
1581 l.Error("failed to create xrpcclient", "err", err)
1582 cleanupKnot()
1583 rp.pages.Notice(w, "repo", "Failed to fork repository.")
1584 return
1585 }
1586
1587 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
1588 Collection: tangled.RepoNSID,
1589 Repo: user.Did,
1590 Rkey: rkey,
1591 Record: &lexutil.LexiconTypeDecoder{
1592 Val: &record,
1593 },
1594 })
1595 if err != nil {
1596 l.Error("failed to write to PDS", "err", err)
1597 cleanupKnot()
1598 rp.pages.Notice(w, "repo", "Failed to announce repository creation.")
1599 return
1600 }
1601
1602 aturi := atresp.Uri
1603 l = l.With("aturi", aturi)
1604 l.Info("wrote to PDS")
1605
1606 tx, err := rp.db.BeginTx(r.Context(), nil)
1607 if err != nil {
1608 l.Info("txn failed", "err", err)
1609 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1610 return
1611 }
1612
1613 rollback := func() {
1614 err1 := tx.Rollback()
1615 err2 := rp.enforcer.E.LoadPolicy()
1616 err3 := rollbackRecord(context.Background(), aturi, atpClient)
1617
1618 if errors.Is(err1, sql.ErrTxDone) {
1619 err1 = nil
1620 }
1621
1622 if errs := errors.Join(err1, err2, err3); errs != nil {
1623 l.Error("failed to rollback changes", "errs", errs)
1624 }
1625
1626 if aturi != "" {
1627 cleanupKnot()
1628 }
1629 }
1630 defer rollback()
1631
1632 err = db.AddRepo(tx, repo)
1633 if err != nil {
1634 l.Error("failed to AddRepo", "err", err)
1635 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1636 return
1637 }
1638
1639 rbacPath := repo.RepoIdentifier()
1640 err = rp.enforcer.AddRepo(user.Did, targetKnot, rbacPath)
1641 if err != nil {
1642 l.Error("failed to add ACLs", "err", err)
1643 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.")
1644 return
1645 }
1646
1647 err = tx.Commit()
1648 if err != nil {
1649 l.Error("failed to commit changes", "err", err)
1650 http.Error(w, err.Error(), http.StatusInternalServerError)
1651 return
1652 }
1653
1654 err = rp.enforcer.E.SavePolicy()
1655 if err != nil {
1656 l.Error("failed to update ACLs", "err", err)
1657 http.Error(w, err.Error(), http.StatusInternalServerError)
1658 return
1659 }
1660
1661 aturi = ""
1662
1663 rp.notifier.NewRepo(r.Context(), repo)
1664 if repoDid != "" {
1665 rp.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid))
1666 } else {
1667 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Did, forkName))
1668 }
1669 }
1670}
1671
1672func (rp *Repo) Stars(w http.ResponseWriter, r *http.Request) {
1673 l := rp.logger.With("handler", "Stars")
1674
1675 user := rp.oauth.GetMultiAccountUser(r)
1676 f, err := rp.repoResolver.Resolve(r)
1677 if err != nil {
1678 l.Error("failed to resolve source repo", "err", err)
1679 return
1680 }
1681
1682 page := pagination.FromContext(r.Context())
1683 if page.Limit > 30 || page.Limit <= 0 {
1684 page.Limit = 30
1685 }
1686
1687 starrers, err := db.GetStars(rp.db, string(f.RepoDid), page)
1688 if err != nil {
1689 l.Error("failed to fetch starrers", "err", err, "repoDid", f.RepoDid)
1690 return
1691 }
1692
1693 totalCount, err := db.GetStarCount(rp.db, models.StarSubjectRepo, string(f.RepoDid))
1694 if err != nil {
1695 l.Error("failed to fetch star count", "err", err, "repoDid", f.RepoDid)
1696 return
1697 }
1698
1699 rp.pages.RepoStars(w, pages.RepoStarsParams{
1700 LoggedInUser: user,
1701 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
1702 Starrers: starrers,
1703 Page: page,
1704 TotalCount: totalCount,
1705 })
1706}
1707
1708func (rp *Repo) Forks(w http.ResponseWriter, r *http.Request) {
1709 l := rp.logger.With("handler", "Forks")
1710
1711 user := rp.oauth.GetMultiAccountUser(r)
1712 f, err := rp.repoResolver.Resolve(r)
1713 if err != nil {
1714 l.Error("failed to resolve source repo", "err", err)
1715 return
1716 }
1717
1718 var forks []models.Repo
1719 totalCount := 0
1720 page := pagination.FromContext(r.Context())
1721 if f.RepoDid != "" {
1722 forks, err = db.GetReposPaginated(rp.db, page, orm.FilterEq("source", f.RepoDid))
1723 if err != nil {
1724 l.Error("failed to fetch forks", "err", err, "repoAt", f.RepoAt())
1725 return
1726 }
1727
1728 totalCount, err = db.GetForkCount(rp.db, f.RepoDid)
1729 if err != nil {
1730 l.Error("failed to fetch fork count", "err", err, "repoAt", f.RepoAt())
1731 return
1732 }
1733 }
1734
1735 err = rp.pages.RepoForks(w, pages.RepoForksParams{
1736 LoggedInUser: user,
1737 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
1738 Forks: forks,
1739 Page: page,
1740 TotalCount: totalCount,
1741 })
1742 if err != nil {
1743 l.Error("failed to render page", "err", err)
1744 }
1745}
1746
1747// this is used to rollback changes made to the PDS
1748//
1749// it is a no-op if the provided ATURI is empty
1750func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error {
1751 if aturi == "" {
1752 return nil
1753 }
1754
1755 parsed := syntax.ATURI(aturi)
1756
1757 collection := parsed.Collection().String()
1758 repo := parsed.Authority().String()
1759 rkey := parsed.RecordKey().String()
1760
1761 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
1762 Collection: collection,
1763 Repo: repo,
1764 Rkey: rkey,
1765 })
1766 return err
1767}
1768
1769func repoCollaboratorRecord(f *models.Repo, subject string, createdAt time.Time) *tangled.RepoCollaborator {
1770 return &tangled.RepoCollaborator{
1771 Subject: subject,
1772 CreatedAt: createdAt.Format(time.RFC3339),
1773 Repo: f.RepoDid,
1774 }
1775}