Monorepo for Tangled
tangled.org
1package repo
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "net/url"
11 "slices"
12 "strings"
13 "time"
14
15 "tangled.org/core/appview/cloudflare"
16
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/compat113"
19 "tangled.org/core/appview/config"
20 "tangled.org/core/appview/db"
21 "tangled.org/core/appview/models"
22 "tangled.org/core/appview/notify"
23 "tangled.org/core/appview/oauth"
24 "tangled.org/core/appview/pages"
25 "tangled.org/core/appview/pagination"
26 "tangled.org/core/appview/reporesolver"
27 "tangled.org/core/appview/sites"
28 "tangled.org/core/appview/validator"
29 xrpcclient "tangled.org/core/appview/xrpcclient"
30 "tangled.org/core/eventconsumer"
31 "tangled.org/core/idresolver"
32 "tangled.org/core/ogre"
33 "tangled.org/core/orm"
34 "tangled.org/core/rbac"
35 "tangled.org/core/tid"
36 "tangled.org/core/xrpc/serviceauth"
37
38 comatproto "github.com/bluesky-social/indigo/api/atproto"
39 "github.com/bluesky-social/indigo/atproto/atclient"
40 "github.com/bluesky-social/indigo/atproto/syntax"
41 lexutil "github.com/bluesky-social/indigo/lex/util"
42
43 "github.com/go-chi/chi/v5"
44)
45
46type Repo struct {
47 repoResolver *reporesolver.RepoResolver
48 idResolver *idresolver.Resolver
49 config *config.Config
50 oauth *oauth.OAuth
51 pages *pages.Pages
52 spindlestream *eventconsumer.Consumer
53 db *db.DB
54 enforcer *rbac.Enforcer
55 notifier notify.Notifier
56 logger *slog.Logger
57 serviceAuth *serviceauth.ServiceAuth
58 validator *validator.Validator
59 cfClient *cloudflare.Client
60 ogreClient *ogre.Client
61}
62
63func New(
64 oauth *oauth.OAuth,
65 repoResolver *reporesolver.RepoResolver,
66 pages *pages.Pages,
67 spindlestream *eventconsumer.Consumer,
68 idResolver *idresolver.Resolver,
69 db *db.DB,
70 config *config.Config,
71 notifier notify.Notifier,
72 enforcer *rbac.Enforcer,
73 logger *slog.Logger,
74 validator *validator.Validator,
75 cfClient *cloudflare.Client,
76) *Repo {
77 return &Repo{
78 oauth: oauth,
79 repoResolver: repoResolver,
80 pages: pages,
81 idResolver: idResolver,
82 config: config,
83 spindlestream: spindlestream,
84 db: db,
85 notifier: notifier,
86 enforcer: enforcer,
87 logger: logger,
88 validator: validator,
89 cfClient: cfClient,
90 ogreClient: ogre.NewClient(config.Ogre.Host),
91 }
92}
93
94// modify the spindle configured for this repo
95func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) {
96 user := rp.oauth.GetMultiAccountUser(r)
97 l := rp.logger.With("handler", "EditSpindle")
98 l = l.With("did", user.Did)
99
100 errorId := "operation-error"
101 fail := func(msg string, err error) {
102 l.Error(msg, "err", err)
103 rp.pages.Notice(w, errorId, msg)
104 }
105
106 f, err := rp.repoResolver.Resolve(r)
107 if err != nil {
108 fail("Failed to resolve repo. Try again later", err)
109 return
110 }
111
112 newSpindle := r.FormValue("spindle")
113 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value
114 client, err := rp.oauth.AuthorizedClient(r)
115 if err != nil {
116 fail("Failed to authorize. Try again later.", err)
117 return
118 }
119
120 if !removingSpindle {
121 // ensure that this is a valid spindle for this user
122 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Did)
123 if err != nil {
124 fail("Failed to find spindles. Try again later.", err)
125 return
126 }
127
128 if !slices.Contains(validSpindles, newSpindle) {
129 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles))
130 return
131 }
132 }
133
134 newRepo := *f
135 newRepo.Spindle = newSpindle
136 record := newRepo.AsRecord()
137
138 spindlePtr := &newSpindle
139 if removingSpindle {
140 spindlePtr = nil
141 newRepo.Spindle = ""
142 }
143
144 // optimistic update
145 err = db.UpdateSpindle(rp.db, newRepo.RepoDid, spindlePtr)
146 if err != nil {
147 fail("Failed to update spindle. Try again later.", err)
148 return
149 }
150
151 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
152 if err != nil {
153 fail("Failed to update spindle, no record found on PDS.", err)
154 return
155 }
156 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
157 Collection: tangled.RepoNSID,
158 Repo: newRepo.Did,
159 Rkey: newRepo.Rkey,
160 SwapRecord: ex.Cid,
161 Record: &lexutil.LexiconTypeDecoder{
162 Val: &record,
163 },
164 })
165
166 if err != nil {
167 fail("Failed to update spindle, unable to save to PDS.", err)
168 return
169 }
170
171 oldSpindle := f.Spindle
172 if oldSpindle != "" && oldSpindle != newSpindle {
173 remaining, qErr := db.GetRepos(rp.db, orm.FilterEq("spindle", oldSpindle))
174 if qErr != nil {
175 l.Warn("failed to count repos using old spindle", "err", qErr)
176 } else if len(remaining) == 0 {
177 rp.spindlestream.RemoveSource(eventconsumer.NewSpindleSource(oldSpindle))
178 }
179 }
180
181 if !removingSpindle {
182 rp.spindlestream.AddSource(
183 context.Background(),
184 eventconsumer.NewSpindleSource(newSpindle),
185 )
186 }
187
188 rp.pages.HxRefresh(w)
189}
190
191func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) {
192 user := rp.oauth.GetMultiAccountUser(r)
193 l := rp.logger.With("handler", "AddLabel")
194 l = l.With("did", user.Did)
195
196 f, err := rp.repoResolver.Resolve(r)
197 if err != nil {
198 l.Error("failed to get repo and knot", "err", err)
199 return
200 }
201
202 errorId := "add-label-error"
203 fail := func(msg string, err error) {
204 l.Error(msg, "err", err)
205 rp.pages.Notice(w, errorId, msg)
206 }
207
208 // get form values for label definition
209 name := r.FormValue("name")
210 concreteType := r.FormValue("valueType")
211 valueFormat := r.FormValue("valueFormat")
212 enumValues := r.FormValue("enumValues")
213 scope := r.Form["scope"]
214 color := r.FormValue("color")
215 multiple := r.FormValue("multiple") == "true"
216
217 var variants []string
218 for part := range strings.SplitSeq(enumValues, ",") {
219 if part = strings.TrimSpace(part); part != "" {
220 variants = append(variants, part)
221 }
222 }
223
224 if concreteType == "" {
225 concreteType = "null"
226 }
227
228 format := models.ValueTypeFormatAny
229 if valueFormat == "did" {
230 format = models.ValueTypeFormatDid
231 }
232
233 valueType := models.ValueType{
234 Type: models.ConcreteType(concreteType),
235 Format: format,
236 Enum: variants,
237 }
238
239 label := models.LabelDefinition{
240 Did: user.Did,
241 Rkey: tid.TID(),
242 Name: name,
243 ValueType: valueType,
244 Scope: scope,
245 Color: &color,
246 Multiple: multiple,
247 Created: time.Now(),
248 }
249 if err := rp.validator.ValidateLabelDefinition(&label); err != nil {
250 fail(err.Error(), err)
251 return
252 }
253
254 // announce this relation into the firehose, store into owners' pds
255 client, err := rp.oauth.AuthorizedClient(r)
256 if err != nil {
257 fail(err.Error(), err)
258 return
259 }
260
261 // emit a labelRecord
262 labelRecord := label.AsRecord()
263 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
264 Collection: tangled.LabelDefinitionNSID,
265 Repo: label.Did,
266 Rkey: label.Rkey,
267 Record: &lexutil.LexiconTypeDecoder{
268 Val: &labelRecord,
269 },
270 })
271 // invalid record
272 if err != nil {
273 fail("Failed to write record to PDS.", err)
274 return
275 }
276
277 aturi := resp.Uri
278 l = l.With("at-uri", aturi)
279 l.Info("wrote label record to PDS")
280
281 // update the repo to subscribe to this label
282 newRepo := *f
283 newRepo.Labels = append(newRepo.Labels, aturi)
284 repoRecord := newRepo.AsRecord()
285
286 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
287 if err != nil {
288 fail("Failed to update labels, no record found on PDS.", err)
289 return
290 }
291 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
292 Collection: tangled.RepoNSID,
293 Repo: newRepo.Did,
294 Rkey: newRepo.Rkey,
295 SwapRecord: ex.Cid,
296 Record: &lexutil.LexiconTypeDecoder{
297 Val: &repoRecord,
298 },
299 })
300 if err != nil {
301 fail("Failed to update labels for repo.", err)
302 return
303 }
304
305 tx, err := rp.db.BeginTx(r.Context(), nil)
306 if err != nil {
307 fail("Failed to add label.", err)
308 return
309 }
310
311 rollback := func() {
312 err1 := tx.Rollback()
313 err2 := rollbackRecord(context.Background(), aturi, client)
314
315 // ignore txn complete errors, this is okay
316 if errors.Is(err1, sql.ErrTxDone) {
317 err1 = nil
318 }
319
320 if errs := errors.Join(err1, err2); errs != nil {
321 l.Error("failed to rollback changes", "errs", errs)
322 return
323 }
324 }
325 defer rollback()
326
327 _, err = db.AddLabelDefinition(tx, &label)
328 if err != nil {
329 fail("Failed to add label.", err)
330 return
331 }
332
333 if err = db.SubscribeLabel(tx, &models.RepoLabel{
334 RepoDid: syntax.DID(f.RepoDid),
335 LabelAt: label.AtUri(),
336 }); err != nil {
337 fail("Failed to subscribe to label.", err)
338 return
339 }
340
341 err = tx.Commit()
342 if err != nil {
343 fail("Failed to add label.", err)
344 return
345 }
346
347 // clear aturi when everything is successful
348 aturi = ""
349
350 rp.pages.HxRefresh(w)
351}
352
353func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) {
354 user := rp.oauth.GetMultiAccountUser(r)
355 l := rp.logger.With("handler", "DeleteLabel")
356 l = l.With("did", user.Did)
357
358 f, err := rp.repoResolver.Resolve(r)
359 if err != nil {
360 l.Error("failed to get repo and knot", "err", err)
361 return
362 }
363
364 errorId := "label-operation"
365 fail := func(msg string, err error) {
366 l.Error(msg, "err", err)
367 rp.pages.Notice(w, errorId, msg)
368 }
369
370 // get form values
371 labelId := r.FormValue("label-id")
372
373 label, err := db.GetLabelDefinition(rp.db, orm.FilterEq("id", labelId))
374 if err != nil {
375 fail("Failed to find label definition.", err)
376 return
377 }
378
379 client, err := rp.oauth.AuthorizedClient(r)
380 if err != nil {
381 fail(err.Error(), err)
382 return
383 }
384
385 // delete label record from PDS
386 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{
387 Collection: tangled.LabelDefinitionNSID,
388 Repo: label.Did,
389 Rkey: label.Rkey,
390 })
391 if err != nil {
392 fail("Failed to delete label record from PDS.", err)
393 return
394 }
395
396 // update repo record to remove the label reference
397 newRepo := *f
398 var updated []string
399 removedAt := label.AtUri().String()
400 for _, l := range newRepo.Labels {
401 if l != removedAt {
402 updated = append(updated, l)
403 }
404 }
405 newRepo.Labels = updated
406 repoRecord := newRepo.AsRecord()
407
408 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
409 if err != nil {
410 fail("Failed to update labels, no record found on PDS.", err)
411 return
412 }
413 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
414 Collection: tangled.RepoNSID,
415 Repo: newRepo.Did,
416 Rkey: newRepo.Rkey,
417 SwapRecord: ex.Cid,
418 Record: &lexutil.LexiconTypeDecoder{
419 Val: &repoRecord,
420 },
421 })
422 if err != nil {
423 fail("Failed to update repo record.", err)
424 return
425 }
426
427 // transaction for DB changes
428 tx, err := rp.db.BeginTx(r.Context(), nil)
429 if err != nil {
430 fail("Failed to delete label.", err)
431 return
432 }
433 defer tx.Rollback()
434
435 err = db.UnsubscribeLabel(
436 tx,
437 orm.FilterEq("repo_did", f.RepoDid),
438 orm.FilterEq("label_at", removedAt),
439 )
440 if err != nil {
441 fail("Failed to unsubscribe label.", err)
442 return
443 }
444
445 err = db.DeleteLabelDefinition(tx, orm.FilterEq("id", label.Id))
446 if err != nil {
447 fail("Failed to delete label definition.", err)
448 return
449 }
450
451 err = tx.Commit()
452 if err != nil {
453 fail("Failed to delete label.", err)
454 return
455 }
456
457 // everything succeeded
458 rp.pages.HxRefresh(w)
459}
460
461func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) {
462 user := rp.oauth.GetMultiAccountUser(r)
463 l := rp.logger.With("handler", "SubscribeLabel")
464 l = l.With("did", user.Did)
465
466 f, err := rp.repoResolver.Resolve(r)
467 if err != nil {
468 l.Error("failed to get repo and knot", "err", err)
469 return
470 }
471
472 if err := r.ParseForm(); err != nil {
473 l.Error("invalid form", "err", err)
474 return
475 }
476
477 errorId := "default-label-operation"
478 fail := func(msg string, err error) {
479 l.Error(msg, "err", err)
480 rp.pages.Notice(w, errorId, msg)
481 }
482
483 labelAts := r.Form["label"]
484 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts))
485 if err != nil {
486 fail("Failed to subscribe to label.", err)
487 return
488 }
489
490 newRepo := *f
491 newRepo.Labels = append(newRepo.Labels, labelAts...)
492
493 // dedup
494 slices.Sort(newRepo.Labels)
495 newRepo.Labels = slices.Compact(newRepo.Labels)
496
497 repoRecord := newRepo.AsRecord()
498
499 client, err := rp.oauth.AuthorizedClient(r)
500 if err != nil {
501 fail(err.Error(), err)
502 return
503 }
504
505 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey)
506 if err != nil {
507 fail("Failed to update labels, no record found on PDS.", err)
508 return
509 }
510 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
511 Collection: tangled.RepoNSID,
512 Repo: newRepo.Did,
513 Rkey: newRepo.Rkey,
514 SwapRecord: ex.Cid,
515 Record: &lexutil.LexiconTypeDecoder{
516 Val: &repoRecord,
517 },
518 })
519
520 tx, err := rp.db.Begin()
521 if err != nil {
522 fail("Failed to subscribe to label.", err)
523 return
524 }
525 defer tx.Rollback()
526
527 for _, l := range labelAts {
528 err = db.SubscribeLabel(tx, &models.RepoLabel{
529 RepoDid: syntax.DID(f.RepoDid),
530 LabelAt: syntax.ATURI(l),
531 })
532 if err != nil {
533 fail("Failed to subscribe to label.", err)
534 return
535 }
536 }
537
538 if err := tx.Commit(); err != nil {
539 fail("Failed to subscribe to label.", err)
540 return
541 }
542
543 // everything succeeded
544 rp.pages.HxRefresh(w)
545}
546
547func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) {
548 user := rp.oauth.GetMultiAccountUser(r)
549 l := rp.logger.With("handler", "UnsubscribeLabel")
550 l = l.With("did", user.Did)
551
552 f, err := rp.repoResolver.Resolve(r)
553 if err != nil {
554 l.Error("failed to get repo and knot", "err", err)
555 return
556 }
557
558 if err := r.ParseForm(); err != nil {
559 l.Error("invalid form", "err", err)
560 return
561 }
562
563 errorId := "default-label-operation"
564 fail := func(msg string, err error) {
565 l.Error(msg, "err", err)
566 rp.pages.Notice(w, errorId, msg)
567 }
568
569 labelAts := r.Form["label"]
570 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts))
571 if err != nil {
572 fail("Failed to unsubscribe to label.", err)
573 return
574 }
575
576 // update repo record to remove the label reference
577 newRepo := *f
578 var updated []string
579 for _, l := range newRepo.Labels {
580 if !slices.Contains(labelAts, l) {
581 updated = append(updated, l)
582 }
583 }
584 newRepo.Labels = updated
585 repoRecord := newRepo.AsRecord()
586
587 client, err := rp.oauth.AuthorizedClient(r)
588 if err != nil {
589 fail(err.Error(), err)
590 return
591 }
592
593 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey)
594 if err != nil {
595 fail("Failed to update labels, no record found on PDS.", err)
596 return
597 }
598 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
599 Collection: tangled.RepoNSID,
600 Repo: newRepo.Did,
601 Rkey: newRepo.Rkey,
602 SwapRecord: ex.Cid,
603 Record: &lexutil.LexiconTypeDecoder{
604 Val: &repoRecord,
605 },
606 })
607
608 err = db.UnsubscribeLabel(
609 rp.db,
610 orm.FilterEq("repo_did", f.RepoDid),
611 orm.FilterIn("label_at", labelAts),
612 )
613 if err != nil {
614 fail("Failed to unsubscribe label.", err)
615 return
616 }
617
618 // everything succeeded
619 rp.pages.HxRefresh(w)
620}
621
622func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) {
623 l := rp.logger.With("handler", "LabelPanel")
624
625 f, err := rp.repoResolver.Resolve(r)
626 if err != nil {
627 l.Error("failed to get repo and knot", "err", err)
628 return
629 }
630
631 subjectStr := r.FormValue("subject")
632 subject, err := syntax.ParseATURI(subjectStr)
633 if err != nil {
634 l.Error("failed to get repo and knot", "err", err)
635 return
636 }
637
638 labelDefs, err := db.GetLabelDefinitions(
639 rp.db,
640 orm.FilterIn("at_uri", f.Labels),
641 orm.FilterContains("scope", subject.Collection().String()),
642 )
643 if err != nil {
644 l.Error("failed to fetch label defs", "err", err)
645 return
646 }
647
648 defs := make(map[string]*models.LabelDefinition)
649 for _, l := range labelDefs {
650 defs[l.AtUri().String()] = &l
651 }
652
653 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject))
654 if err != nil {
655 l.Error("failed to build label state", "err", err)
656 return
657 }
658 state := states[subject]
659
660 user := rp.oauth.GetMultiAccountUser(r)
661 rp.pages.LabelPanel(w, pages.LabelPanelParams{
662 LoggedInUser: user,
663 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
664 Defs: defs,
665 Subject: subject.String(),
666 State: state,
667 })
668}
669
670func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) {
671 l := rp.logger.With("handler", "EditLabelPanel")
672
673 f, err := rp.repoResolver.Resolve(r)
674 if err != nil {
675 l.Error("failed to get repo and knot", "err", err)
676 return
677 }
678
679 subjectStr := r.FormValue("subject")
680 subject, err := syntax.ParseATURI(subjectStr)
681 if err != nil {
682 l.Error("failed to get repo and knot", "err", err)
683 return
684 }
685
686 labelDefs, err := db.GetLabelDefinitions(
687 rp.db,
688 orm.FilterIn("at_uri", f.Labels),
689 orm.FilterContains("scope", subject.Collection().String()),
690 )
691 if err != nil {
692 l.Error("failed to fetch labels", "err", err)
693 return
694 }
695
696 defs := make(map[string]*models.LabelDefinition)
697 for _, l := range labelDefs {
698 defs[l.AtUri().String()] = &l
699 }
700
701 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject))
702 if err != nil {
703 l.Error("failed to build label state", "err", err)
704 return
705 }
706 state := states[subject]
707
708 user := rp.oauth.GetMultiAccountUser(r)
709 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{
710 LoggedInUser: user,
711 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
712 Defs: defs,
713 Subject: subject.String(),
714 State: state,
715 })
716}
717
718func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) {
719 user := rp.oauth.GetMultiAccountUser(r)
720 l := rp.logger.With("handler", "AddCollaborator")
721 l = l.With("did", user.Did)
722
723 f, err := rp.repoResolver.Resolve(r)
724 if err != nil {
725 l.Error("failed to get repo and knot", "err", err)
726 return
727 }
728
729 errorId := "add-collaborator-error"
730 fail := func(msg string, err error) {
731 l.Error(msg, "err", err)
732 rp.pages.Notice(w, errorId, msg)
733 }
734
735 collaborator := r.FormValue("collaborator")
736 if collaborator == "" {
737 fail("Invalid form.", nil)
738 return
739 }
740
741 // remove a single leading `@`, to make @handle work with ResolveIdent
742 collaborator = strings.TrimPrefix(collaborator, "@")
743
744 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator)
745 if err != nil {
746 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err)
747 return
748 }
749
750 if collaboratorIdent.DID.String() == user.Did {
751 fail("You seem to be adding yourself as a collaborator.", nil)
752 return
753 }
754 l = l.With("collaborator", collaboratorIdent.Handle)
755 l = l.With("knot", f.Knot)
756
757 existing, err := db.GetCollaborators(rp.db,
758 orm.FilterEq("repo_did", f.RepoDid),
759 orm.FilterEq("subject_did", collaboratorIdent.DID.String()),
760 )
761 if err != nil {
762 fail("Failed to check existing collaborators.", err)
763 return
764 }
765 if len(existing) > 0 {
766 fail(fmt.Sprintf("%s is already a collaborator.", collaboratorIdent.Handle), nil)
767 return
768 }
769
770 // announce this relation into the firehose, store into owners' pds
771 client, err := rp.oauth.AuthorizedClient(r)
772 if err != nil {
773 fail("Failed to write to PDS.", err)
774 return
775 }
776
777 // emit a record
778 currentUser := rp.oauth.GetMultiAccountUser(r)
779 rkey := tid.TID()
780 createdAt := time.Now()
781 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
782 Collection: tangled.RepoCollaboratorNSID,
783 Repo: currentUser.Did,
784 Rkey: rkey,
785 Record: compat113.Collaborator(repoCollaboratorRecord(f, collaboratorIdent.DID.String(), createdAt)),
786 })
787 // invalid record
788 if err != nil {
789 fail("Failed to write record to PDS.", err)
790 return
791 }
792
793 aturi := resp.Uri
794 l = l.With("at-uri", aturi)
795 l.Info("wrote record to PDS")
796
797 tx, err := rp.db.BeginTx(r.Context(), nil)
798 if err != nil {
799 fail("Failed to add collaborator.", err)
800 return
801 }
802
803 rollback := func() {
804 err1 := tx.Rollback()
805 err2 := rp.enforcer.E.LoadPolicy()
806 err3 := rollbackRecord(context.Background(), aturi, client)
807
808 // ignore txn complete errors, this is okay
809 if errors.Is(err1, sql.ErrTxDone) {
810 err1 = nil
811 }
812
813 if errs := errors.Join(err1, err2, err3); errs != nil {
814 l.Error("failed to rollback changes", "errs", errs)
815 return
816 }
817 }
818 defer rollback()
819
820 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.RepoIdentifier())
821 if err != nil {
822 fail("Failed to add collaborator permissions.", err)
823 return
824 }
825
826 err = db.AddCollaborator(tx, models.Collaborator{
827 Did: syntax.DID(currentUser.Did),
828 Rkey: sql.NullString{String: rkey, Valid: true},
829 SubjectDid: collaboratorIdent.DID,
830 RepoDid: syntax.DID(f.RepoDid),
831 Created: createdAt,
832 })
833 if err != nil {
834 fail("Failed to add collaborator.", err)
835 return
836 }
837
838 err = tx.Commit()
839 if err != nil {
840 fail("Failed to add collaborator.", err)
841 return
842 }
843
844 err = rp.enforcer.E.SavePolicy()
845 if err != nil {
846 fail("Failed to update collaborator permissions.", err)
847 return
848 }
849
850 // clear aturi to when everything is successful
851 aturi = ""
852
853 rp.pages.HxRefresh(w)
854}
855
856func (rp *Repo) RenameRepo(w http.ResponseWriter, r *http.Request) {
857 l := rp.logger.With("handler", "RenameRepo")
858 noticeId := "rename-repo-error"
859
860 user := rp.oauth.GetMultiAccountUser(r)
861 f, err := rp.repoResolver.Resolve(r)
862 if err != nil {
863 l.Error("failed to get repo and knot", "err", err)
864 rp.pages.Notice(w, noticeId, "Failed to load repository.")
865 return
866 }
867 l = l.With("did", user.Did, "rkey", f.Rkey, "oldName", f.Name)
868
869 if f.RepoDid == "" {
870 rp.pages.Notice(w, noticeId, "This repository's knot has not completed the DID migration; rename is unavailable.")
871 return
872 }
873
874 if !compat113.KnotSupports114(r.Context(), f.Knot, rp.config.Core.Dev) {
875 rp.pages.Notice(w, noticeId, "This repository's knot is below v1.14 and does not yet support renames. Ask the knot operator to upgrade.")
876 return
877 }
878
879 newName, err := validateRenameInput(f.Name, f.Rkey, r.FormValue("name"))
880 if err != nil {
881 rp.pages.Notice(w, noticeId, err.Error())
882 return
883 }
884 newRkey := strings.ToLower(newName)
885 l = l.With("newName", newName, "newRkey", newRkey)
886
887 atpClient, err := rp.oauth.AuthorizedClient(r)
888 if err != nil {
889 l.Error("failed to get authorized client", "err", err)
890 rp.pages.Notice(w, noticeId, "Failed to authorize. Try again later.")
891 return
892 }
893
894 newRepo := *f
895 newRepo.Name = newName
896 newRepo.Rkey = newRkey
897 newRepo.Created = time.Now()
898 record := newRepo.AsRecord()
899
900 if newRkey == f.Rkey {
901 ex, err := comatproto.RepoGetRecord(r.Context(), atpClient, "", tangled.RepoNSID, f.Did, f.Rkey)
902 if err != nil {
903 l.Error("failed to fetch existing record", "err", err)
904 rp.pages.Notice(w, noticeId, "Failed to read repository record from PDS.")
905 return
906 }
907
908 _, err = comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
909 Collection: tangled.RepoNSID,
910 Repo: f.Did,
911 Rkey: f.Rkey,
912 SwapRecord: ex.Cid,
913 Record: &lexutil.LexiconTypeDecoder{
914 Val: &record,
915 },
916 })
917 if err != nil {
918 l.Error("failed to update display name on PDS", "err", err)
919 rp.pages.Notice(w, noticeId, "Failed to save display name to PDS.")
920 return
921 }
922 l.Info("updated display name on PDS")
923
924 if err := db.UpdateRepoDisplayName(rp.db, f.Did, f.Rkey, newName); err != nil {
925 l.Error("optimistic display name update failed", "err", err)
926 }
927 } else {
928 ex, getErr := comatproto.RepoGetRecord(r.Context(), atpClient, "", tangled.RepoNSID, f.Did, newRkey)
929 switch {
930 case getErr != nil:
931 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{
932 Collection: tangled.RepoNSID,
933 Repo: f.Did,
934 Rkey: &newRkey,
935 Record: &lexutil.LexiconTypeDecoder{Val: &record},
936 })
937 if err != nil {
938 l.Error("failed to write rename to PDS", "err", err)
939 rp.pages.Notice(w, noticeId, "Failed to save renamed repository to PDS.")
940 return
941 }
942 l.Info("wrote rename-create to PDS; old record retained as alias")
943
944 default:
945 existing, ok := ex.Value.Val.(*tangled.Repo)
946 if !ok || existing.RepoDid == nil || *existing.RepoDid != f.RepoDid {
947 rp.pages.Notice(w, noticeId, fmt.Sprintf("You already have a repository named %q.", newRkey))
948 return
949 }
950 _, err = comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
951 Collection: tangled.RepoNSID,
952 Repo: f.Did,
953 Rkey: newRkey,
954 SwapRecord: ex.Cid,
955 Record: &lexutil.LexiconTypeDecoder{Val: &record},
956 })
957 if err != nil {
958 l.Error("failed to rewrite rename-back record on PDS", "err", err)
959 rp.pages.Notice(w, noticeId, "Failed to save renamed repository to PDS.")
960 return
961 }
962 l.Info("rewrote rename-back record on PDS over prior alias")
963 }
964
965 tx, err := rp.db.Begin()
966 if err != nil {
967 l.Error("failed to begin rename tx", "err", err)
968 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid))
969 return
970 }
971 defer tx.Rollback()
972
973 if err := db.RenameRepo(tx, f.Did, f.Rkey, newRkey, newName); err != nil {
974 l.Error("optimistic rename failed", "err", err)
975 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid))
976 return
977 }
978 if err := db.RecordRepoRename(tx, f.Did, f.Rkey, f.RepoDid); err != nil {
979 l.Error("failed to record rename history", "err", err)
980 }
981 if err := db.DeleteRepoRename(tx, f.Did, newRkey); err != nil {
982 l.Error("failed to clear stale rename hint", "err", err)
983 }
984 if err := tx.Commit(); err != nil {
985 l.Error("failed to commit rename tx", "err", err)
986 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid))
987 return
988 }
989 }
990
991 oldRepo := *f
992 rp.notifier.RenameRepo(r.Context(), syntax.DID(user.Did), &oldRepo, &newRepo)
993
994 if newRkey != f.Rkey {
995 rp.migrateSiteOnRename(r.Context(), f, newName, newRkey)
996 }
997
998 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid))
999}
1000
1001func validateRenameInput(currentName, currentRkey, raw string) (string, error) {
1002 newName := strings.TrimSpace(raw)
1003 if newName == "" {
1004 return "", errors.New("Repository name cannot be empty.")
1005 }
1006 if err := models.ValidateRepoName(newName); err != nil {
1007 return "", err
1008 }
1009 newName = models.StripGitExt(newName)
1010 if newName == currentName {
1011 if _, tidErr := syntax.ParseTID(currentRkey); tidErr == nil {
1012 return newName, nil
1013 }
1014 return "", errors.New("New name matches the current name.")
1015 }
1016 return newName, nil
1017}
1018
1019func (rp *Repo) migrateSiteOnRename(ctx context.Context, oldRepo *models.Repo, newName, newRkey string) {
1020 l := rp.logger.With("handler", "migrateSiteOnRename", "repo_did", oldRepo.RepoDid)
1021
1022 siteConfig, err := db.GetRepoSiteConfig(rp.db, oldRepo.RepoDid)
1023 if err != nil || siteConfig == nil {
1024 return
1025 }
1026
1027 if !rp.cfClient.Enabled() {
1028 return
1029 }
1030
1031 ownerClaim, _ := db.GetActiveDomainClaimForDid(rp.db, oldRepo.Did)
1032
1033 go func() {
1034 bgCtx := context.Background()
1035 oldRkey := oldRepo.Rkey
1036 oldName := oldRepo.Name
1037
1038 if err := sites.Delete(bgCtx, rp.cfClient, oldRepo.Did, oldRkey); err != nil {
1039 l.Error("sites: failed to delete old R2 prefix", "oldRkey", oldRkey, "err", err)
1040 }
1041
1042 newRepo := *oldRepo
1043 newRepo.Name = newName
1044 newRepo.Rkey = newRkey
1045 if deployErr := sites.Deploy(bgCtx, rp.cfClient, rp.config, &newRepo, siteConfig.Branch, siteConfig.Dir); deployErr != nil {
1046 l.Error("sites: redeploy after rename failed", "err", deployErr)
1047 }
1048
1049 if ownerClaim != nil {
1050 // drop the old name's entry when the name actually changed.
1051 if oldName != newName {
1052 if err := sites.DeleteDomainMapping(bgCtx, rp.cfClient, ownerClaim.Domain, oldName); err != nil {
1053 l.Error("sites: failed to remove old KV mapping", "oldName", oldName, "err", err)
1054 }
1055 }
1056 if err := sites.PutDomainMapping(bgCtx, rp.cfClient, ownerClaim.Domain, oldRepo.Did, newName, newRkey, siteConfig.IsIndex); err != nil {
1057 l.Error("sites: failed to write new KV mapping", "newName", newName, "newRkey", newRkey, "err", err)
1058 }
1059 }
1060
1061 l.Info("sites: migrated on rename", "oldName", oldName, "oldRkey", oldRkey, "newName", newName, "newRkey", newRkey)
1062 }()
1063}
1064
1065func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) {
1066 user := rp.oauth.GetMultiAccountUser(r)
1067 l := rp.logger.With("handler", "DeleteRepo")
1068
1069 noticeId := "operation-error"
1070 f, err := rp.repoResolver.Resolve(r)
1071 if err != nil {
1072 l.Error("failed to get repo and knot", "err", err)
1073 return
1074 }
1075
1076 // remove record from pds
1077 atpClient, err := rp.oauth.AuthorizedClient(r)
1078 if err != nil {
1079 l.Error("failed to get authorized client", "err", err)
1080 return
1081 }
1082 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{
1083 Collection: tangled.RepoNSID,
1084 Repo: user.Did,
1085 Rkey: f.Rkey,
1086 })
1087 if err != nil {
1088 l.Error("failed to delete record", "err", err)
1089 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.")
1090 return
1091 }
1092 l.Info("removed repo record", "aturi", f.RepoAt().String())
1093
1094 client, err := rp.oauth.ServiceClient(
1095 r,
1096 oauth.WithService(f.Knot),
1097 oauth.WithLxm(tangled.RepoDeleteNSID),
1098 oauth.WithDev(rp.config.Core.Dev),
1099 )
1100 if err != nil {
1101 l.Error("failed to connect to knot server", "err", err)
1102 return
1103 }
1104
1105 err = tangled.RepoDelete(
1106 r.Context(),
1107 client,
1108 &tangled.RepoDelete_Input{
1109 Did: f.Did,
1110 Name: f.Name,
1111 Rkey: f.Rkey,
1112 },
1113 )
1114 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
1115 l.Error("failed to call XRPC repo.delete", "xrpcerr", xrpcerr, "err", err)
1116 rp.pages.Notice(w, noticeId, xrpcerr.Error())
1117 return
1118 }
1119 l.Info("deleted repo from knot")
1120
1121 tx, err := rp.db.BeginTx(r.Context(), nil)
1122 if err != nil {
1123 l.Error("failed to start tx")
1124 w.Write(fmt.Append(nil, "failed to add collaborator: ", err))
1125 return
1126 }
1127 defer func() {
1128 tx.Rollback()
1129 err = rp.enforcer.E.LoadPolicy()
1130 if err != nil {
1131 l.Error("failed to rollback policies")
1132 }
1133 }()
1134
1135 // remove collaborator RBAC
1136 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.RepoIdentifier(), f.Knot)
1137 if err != nil {
1138 rp.pages.Notice(w, noticeId, "Failed to remove collaborators")
1139 return
1140 }
1141 for _, c := range repoCollaborators {
1142 did := c[0]
1143 rp.enforcer.RemoveCollaborator(did, f.Knot, f.RepoIdentifier())
1144 }
1145 l.Info("removed collaborators")
1146
1147 // remove repo RBAC
1148 err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.RepoIdentifier())
1149 if err != nil {
1150 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules")
1151 return
1152 }
1153
1154 // remove repo from db
1155 err = db.RemoveRepo(tx, f.Did, f.Rkey)
1156 if err != nil {
1157 rp.pages.Notice(w, noticeId, "Failed to update appview")
1158 return
1159 }
1160 l.Info("removed repo from db")
1161
1162 err = tx.Commit()
1163 if err != nil {
1164 l.Error("failed to commit changes", "err", err)
1165 http.Error(w, err.Error(), http.StatusInternalServerError)
1166 return
1167 }
1168
1169 err = rp.enforcer.E.SavePolicy()
1170 if err != nil {
1171 l.Error("failed to update ACLs", "err", err)
1172 http.Error(w, err.Error(), http.StatusInternalServerError)
1173 return
1174 }
1175
1176 rp.notifier.DeleteRepo(r.Context(), f)
1177 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.Did))
1178}
1179
1180func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) {
1181 l := rp.logger.With("handler", "SyncRepoFork")
1182
1183 ref := chi.URLParam(r, "ref")
1184 ref, _ = url.PathUnescape(ref)
1185
1186 user := rp.oauth.GetMultiAccountUser(r)
1187 f, err := rp.repoResolver.Resolve(r)
1188 if err != nil {
1189 l.Error("failed to resolve source repo", "err", err)
1190 return
1191 }
1192
1193 switch r.Method {
1194 case http.MethodPost:
1195 client, err := rp.oauth.ServiceClient(
1196 r,
1197 oauth.WithService(f.Knot),
1198 oauth.WithLxm(tangled.RepoForkSyncNSID),
1199 oauth.WithDev(rp.config.Core.Dev),
1200 )
1201 if err != nil {
1202 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
1203 return
1204 }
1205
1206 if f.Source == "" {
1207 rp.pages.Notice(w, "repo", "This repository is not a fork.")
1208 return
1209 }
1210
1211 err = tangled.RepoForkSync(
1212 r.Context(),
1213 client,
1214 &tangled.RepoForkSync_Input{
1215 Did: user.Did,
1216 Name: f.Name,
1217 Source: f.Source,
1218 Branch: ref,
1219 },
1220 )
1221 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
1222 l.Error("failed to call XRPC repo.forkSync", "xrpcerr", xrpcerr, "err", err)
1223 rp.pages.Notice(w, "repo", err.Error())
1224 return
1225 }
1226
1227 rp.pages.HxRefresh(w)
1228 return
1229 }
1230}
1231
1232func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) {
1233 l := rp.logger.With("handler", "ForkRepo")
1234
1235 user := rp.oauth.GetMultiAccountUser(r)
1236 f, err := rp.repoResolver.Resolve(r)
1237 if err != nil {
1238 l.Error("failed to resolve source repo", "err", err)
1239 return
1240 }
1241
1242 switch r.Method {
1243 case http.MethodGet:
1244 user := rp.oauth.GetMultiAccountUser(r)
1245 knots, err := rp.enforcer.GetKnotsForUser(user.Did)
1246 if err != nil {
1247 rp.pages.Notice(w, "repo", "Invalid user account.")
1248 return
1249 }
1250
1251 rp.pages.ForkRepo(w, pages.ForkRepoParams{
1252 LoggedInUser: user,
1253 Knots: knots,
1254 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
1255 })
1256
1257 case http.MethodPost:
1258 l := rp.logger.With("handler", "ForkRepo")
1259
1260 targetKnot := r.FormValue("knot")
1261 if targetKnot == "" {
1262 rp.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
1263 return
1264 }
1265 l = l.With("targetKnot", targetKnot)
1266
1267 ok, err := rp.enforcer.E.Enforce(user.Did, targetKnot, targetKnot, "repo:create")
1268 if err != nil || !ok {
1269 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
1270 return
1271 }
1272
1273 // choose a name for a fork
1274 forkName := strings.ToLower(r.FormValue("repo_name"))
1275 if forkName == "" {
1276 rp.pages.Notice(w, "repo", "Repository name cannot be empty.")
1277 return
1278 }
1279
1280 // this check is *only* to see if the forked repo name already exists
1281 // in the user's account.
1282 existingRepo, err := db.GetRepo(
1283 rp.db,
1284 orm.FilterEq("did", user.Did),
1285 orm.FilterEq("name", forkName),
1286 )
1287 if err != nil {
1288 if !errors.Is(err, sql.ErrNoRows) {
1289 l.Error("error fetching existing repo from db", "err", err)
1290 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.")
1291 return
1292 }
1293 } else if existingRepo != nil {
1294 // repo with this name already exists
1295 rp.pages.Notice(w, "repo", "A repository with this name already exists.")
1296 return
1297 }
1298 l = l.With("forkName", forkName)
1299
1300 uri := "https"
1301 if rp.config.Core.Dev {
1302 uri = "http"
1303 }
1304
1305 forkSourceUrl := fmt.Sprintf("%s://%s/%s", uri, f.Knot, f.RepoIdentifier())
1306 l = l.With("cloneUrl", forkSourceUrl)
1307
1308 rkey := strings.ToLower(forkName)
1309
1310 // TODO: this could coordinate better with the knot to receive a clone status
1311 client, err := rp.oauth.ServiceClient(
1312 r,
1313 oauth.WithService(targetKnot),
1314 oauth.WithLxm(tangled.RepoCreateNSID),
1315 oauth.WithDev(rp.config.Core.Dev),
1316 oauth.WithTimeout(time.Second*20),
1317 )
1318 if err != nil {
1319 l.Error("could not create service client", "err", err)
1320 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
1321 return
1322 }
1323
1324 forkInput := &tangled.RepoCreate_Input{
1325 Rkey: rkey,
1326 Name: rkey,
1327 Source: &forkSourceUrl,
1328 }
1329 createResp, err := tangled.RepoCreate(
1330 r.Context(),
1331 client,
1332 forkInput,
1333 )
1334 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
1335 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err)
1336 rp.pages.Notice(w, "repo", xrpcerr.Error())
1337 return
1338 }
1339
1340 var repoDid string
1341 if createResp != nil && createResp.RepoDid != nil {
1342 repoDid = *createResp.RepoDid
1343 }
1344 if repoDid == "" {
1345 l.Error("knot returned empty repo DID for fork")
1346 rp.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.")
1347 return
1348 }
1349
1350 forkSource := f.RepoAt().String()
1351 if f.RepoDid != "" {
1352 forkSource = f.RepoDid
1353 }
1354
1355 forkDescription := r.Form.Get("description")
1356
1357 repo := &models.Repo{
1358 Did: user.Did,
1359 Name: rkey,
1360 Knot: targetKnot,
1361 Rkey: rkey,
1362 Source: forkSource,
1363 Description: forkDescription,
1364 Created: time.Now(),
1365 Labels: rp.config.Label.DefaultLabelDefs,
1366 RepoDid: repoDid,
1367 }
1368 record := repo.AsRecord()
1369
1370 cleanupKnot := func() {
1371 go func() {
1372 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second}
1373 for attempt, delay := range delays {
1374 time.Sleep(delay)
1375 deleteClient, dErr := rp.oauth.ServiceClient(
1376 r,
1377 oauth.WithService(targetKnot),
1378 oauth.WithLxm(tangled.RepoDeleteNSID),
1379 oauth.WithDev(rp.config.Core.Dev),
1380 )
1381 if dErr != nil {
1382 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr)
1383 continue
1384 }
1385 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
1386 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{
1387 Did: user.Did,
1388 Name: forkName,
1389 Rkey: rkey,
1390 }); dErr != nil {
1391 cancel()
1392 l.Error("failed to clean up fork on knot after rollback", "attempt", attempt+1, "err", dErr)
1393 continue
1394 }
1395 cancel()
1396 l.Info("successfully cleaned up fork on knot after rollback", "attempt", attempt+1)
1397 return
1398 }
1399 l.Error("exhausted retries for knot cleanup, fork may be orphaned",
1400 "did", user.Did, "fork", forkName, "knot", targetKnot)
1401 }()
1402 }
1403
1404 atpClient, err := rp.oauth.AuthorizedClient(r)
1405 if err != nil {
1406 l.Error("failed to create xrpcclient", "err", err)
1407 cleanupKnot()
1408 rp.pages.Notice(w, "repo", "Failed to fork repository.")
1409 return
1410 }
1411
1412 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
1413 Collection: tangled.RepoNSID,
1414 Repo: user.Did,
1415 Rkey: rkey,
1416 Record: &lexutil.LexiconTypeDecoder{
1417 Val: &record,
1418 },
1419 })
1420 if err != nil {
1421 l.Error("failed to write to PDS", "err", err)
1422 cleanupKnot()
1423 rp.pages.Notice(w, "repo", "Failed to announce repository creation.")
1424 return
1425 }
1426
1427 aturi := atresp.Uri
1428 l = l.With("aturi", aturi)
1429 l.Info("wrote to PDS")
1430
1431 tx, err := rp.db.BeginTx(r.Context(), nil)
1432 if err != nil {
1433 l.Info("txn failed", "err", err)
1434 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1435 return
1436 }
1437
1438 rollback := func() {
1439 err1 := tx.Rollback()
1440 err2 := rp.enforcer.E.LoadPolicy()
1441 err3 := rollbackRecord(context.Background(), aturi, atpClient)
1442
1443 if errors.Is(err1, sql.ErrTxDone) {
1444 err1 = nil
1445 }
1446
1447 if errs := errors.Join(err1, err2, err3); errs != nil {
1448 l.Error("failed to rollback changes", "errs", errs)
1449 }
1450
1451 if aturi != "" {
1452 cleanupKnot()
1453 }
1454 }
1455 defer rollback()
1456
1457 err = db.AddRepo(tx, repo)
1458 if err != nil {
1459 l.Error("failed to AddRepo", "err", err)
1460 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1461 return
1462 }
1463
1464 rbacPath := repo.RepoIdentifier()
1465 err = rp.enforcer.AddRepo(user.Did, targetKnot, rbacPath)
1466 if err != nil {
1467 l.Error("failed to add ACLs", "err", err)
1468 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.")
1469 return
1470 }
1471
1472 err = tx.Commit()
1473 if err != nil {
1474 l.Error("failed to commit changes", "err", err)
1475 http.Error(w, err.Error(), http.StatusInternalServerError)
1476 return
1477 }
1478
1479 err = rp.enforcer.E.SavePolicy()
1480 if err != nil {
1481 l.Error("failed to update ACLs", "err", err)
1482 http.Error(w, err.Error(), http.StatusInternalServerError)
1483 return
1484 }
1485
1486 aturi = ""
1487
1488 rp.notifier.NewRepo(r.Context(), repo)
1489 if repoDid != "" {
1490 rp.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid))
1491 } else {
1492 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Did, forkName))
1493 }
1494 }
1495}
1496
1497func (rp *Repo) Stars(w http.ResponseWriter, r *http.Request) {
1498 l := rp.logger.With("handler", "Stars")
1499
1500 user := rp.oauth.GetMultiAccountUser(r)
1501 f, err := rp.repoResolver.Resolve(r)
1502 if err != nil {
1503 l.Error("failed to resolve source repo", "err", err)
1504 return
1505 }
1506
1507 page := pagination.FromContext(r.Context())
1508 if page.Limit > 30 || page.Limit <= 0 {
1509 page.Limit = 30
1510 }
1511
1512 starrers, err := db.GetStars(rp.db, string(f.RepoDid), page)
1513 if err != nil {
1514 l.Error("failed to fetch starrers", "err", err, "repoDid", f.RepoDid)
1515 return
1516 }
1517
1518 totalCount, err := db.GetStarCount(rp.db, models.StarSubjectRepo, string(f.RepoDid))
1519 if err != nil {
1520 l.Error("failed to fetch star count", "err", err, "repoDid", f.RepoDid)
1521 return
1522 }
1523
1524 rp.pages.RepoStars(w, pages.RepoStarsParams{
1525 LoggedInUser: user,
1526 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
1527 Starrers: starrers,
1528 Page: page,
1529 TotalCount: totalCount,
1530 })
1531}
1532
1533func (rp *Repo) Forks(w http.ResponseWriter, r *http.Request) {
1534 l := rp.logger.With("handler", "Forks")
1535
1536 user := rp.oauth.GetMultiAccountUser(r)
1537 f, err := rp.repoResolver.Resolve(r)
1538 if err != nil {
1539 l.Error("failed to resolve source repo", "err", err)
1540 return
1541 }
1542
1543 var forks []models.Repo
1544 totalCount := 0
1545 page := pagination.FromContext(r.Context())
1546 if f.RepoDid != "" {
1547 forks, err = db.GetReposPaginated(rp.db, page, orm.FilterEq("source", f.RepoDid))
1548 if err != nil {
1549 l.Error("failed to fetch forks", "err", err, "repoAt", f.RepoAt())
1550 return
1551 }
1552
1553 totalCount, err = db.GetForkCount(rp.db, f.RepoDid)
1554 if err != nil {
1555 l.Error("failed to fetch fork count", "err", err, "repoAt", f.RepoAt())
1556 return
1557 }
1558 }
1559
1560 err = rp.pages.RepoForks(w, pages.RepoForksParams{
1561 LoggedInUser: user,
1562 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
1563 Forks: forks,
1564 Page: page,
1565 TotalCount: totalCount,
1566 })
1567 if err != nil {
1568 l.Error("failed to render page", "err", err)
1569 }
1570}
1571
1572// this is used to rollback changes made to the PDS
1573//
1574// it is a no-op if the provided ATURI is empty
1575func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error {
1576 if aturi == "" {
1577 return nil
1578 }
1579
1580 parsed := syntax.ATURI(aturi)
1581
1582 collection := parsed.Collection().String()
1583 repo := parsed.Authority().String()
1584 rkey := parsed.RecordKey().String()
1585
1586 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
1587 Collection: collection,
1588 Repo: repo,
1589 Rkey: rkey,
1590 })
1591 return err
1592}
1593
1594func repoCollaboratorRecord(f *models.Repo, subject string, createdAt time.Time) *tangled.RepoCollaborator {
1595 return &tangled.RepoCollaborator{
1596 Subject: subject,
1597 CreatedAt: createdAt.Format(time.RFC3339),
1598 Repo: f.RepoDid,
1599 }
1600}