Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1package repo 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "net/url" 11 "slices" 12 "strings" 13 "time" 14 15 "tangled.org/core/appview/cloudflare" 16 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/appview/compat113" 19 "tangled.org/core/appview/config" 20 "tangled.org/core/appview/db" 21 "tangled.org/core/appview/models" 22 "tangled.org/core/appview/notify" 23 "tangled.org/core/appview/oauth" 24 "tangled.org/core/appview/pages" 25 "tangled.org/core/appview/pagination" 26 "tangled.org/core/appview/reporesolver" 27 "tangled.org/core/appview/sites" 28 "tangled.org/core/appview/validator" 29 xrpcclient "tangled.org/core/appview/xrpcclient" 30 "tangled.org/core/eventconsumer" 31 "tangled.org/core/idresolver" 32 "tangled.org/core/ogre" 33 "tangled.org/core/orm" 34 "tangled.org/core/rbac" 35 "tangled.org/core/tid" 36 "tangled.org/core/xrpc/serviceauth" 37 38 comatproto "github.com/bluesky-social/indigo/api/atproto" 39 "github.com/bluesky-social/indigo/atproto/atclient" 40 "github.com/bluesky-social/indigo/atproto/syntax" 41 lexutil "github.com/bluesky-social/indigo/lex/util" 42 43 "github.com/go-chi/chi/v5" 44) 45 46type Repo struct { 47 repoResolver *reporesolver.RepoResolver 48 idResolver *idresolver.Resolver 49 config *config.Config 50 oauth *oauth.OAuth 51 pages *pages.Pages 52 spindlestream *eventconsumer.Consumer 53 db *db.DB 54 enforcer *rbac.Enforcer 55 notifier notify.Notifier 56 logger *slog.Logger 57 serviceAuth *serviceauth.ServiceAuth 58 validator *validator.Validator 59 cfClient *cloudflare.Client 60 ogreClient *ogre.Client 61} 62 63func New( 64 oauth *oauth.OAuth, 65 repoResolver *reporesolver.RepoResolver, 66 pages *pages.Pages, 67 spindlestream *eventconsumer.Consumer, 68 idResolver *idresolver.Resolver, 69 db *db.DB, 70 config *config.Config, 71 notifier notify.Notifier, 72 enforcer *rbac.Enforcer, 73 logger *slog.Logger, 74 validator *validator.Validator, 75 cfClient *cloudflare.Client, 76) *Repo { 77 return &Repo{ 78 oauth: oauth, 79 repoResolver: repoResolver, 80 pages: pages, 81 idResolver: idResolver, 82 config: config, 83 spindlestream: spindlestream, 84 db: db, 85 notifier: notifier, 86 enforcer: enforcer, 87 logger: logger, 88 validator: validator, 89 cfClient: cfClient, 90 ogreClient: ogre.NewClient(config.Ogre.Host), 91 } 92} 93 94// modify the spindle configured for this repo 95func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) { 96 user := rp.oauth.GetMultiAccountUser(r) 97 l := rp.logger.With("handler", "EditSpindle") 98 l = l.With("did", user.Did) 99 100 errorId := "operation-error" 101 fail := func(msg string, err error) { 102 l.Error(msg, "err", err) 103 rp.pages.Notice(w, errorId, msg) 104 } 105 106 f, err := rp.repoResolver.Resolve(r) 107 if err != nil { 108 fail("Failed to resolve repo. Try again later", err) 109 return 110 } 111 112 newSpindle := r.FormValue("spindle") 113 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value 114 client, err := rp.oauth.AuthorizedClient(r) 115 if err != nil { 116 fail("Failed to authorize. Try again later.", err) 117 return 118 } 119 120 if !removingSpindle { 121 // ensure that this is a valid spindle for this user 122 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Did) 123 if err != nil { 124 fail("Failed to find spindles. Try again later.", err) 125 return 126 } 127 128 if !slices.Contains(validSpindles, newSpindle) { 129 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles)) 130 return 131 } 132 } 133 134 newRepo := *f 135 newRepo.Spindle = newSpindle 136 record := newRepo.AsRecord() 137 138 spindlePtr := &newSpindle 139 if removingSpindle { 140 spindlePtr = nil 141 newRepo.Spindle = "" 142 } 143 144 // optimistic update 145 err = db.UpdateSpindle(rp.db, newRepo.RepoDid, spindlePtr) 146 if err != nil { 147 fail("Failed to update spindle. Try again later.", err) 148 return 149 } 150 151 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 152 if err != nil { 153 fail("Failed to update spindle, no record found on PDS.", err) 154 return 155 } 156 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 157 Collection: tangled.RepoNSID, 158 Repo: newRepo.Did, 159 Rkey: newRepo.Rkey, 160 SwapRecord: ex.Cid, 161 Record: &lexutil.LexiconTypeDecoder{ 162 Val: &record, 163 }, 164 }) 165 166 if err != nil { 167 fail("Failed to update spindle, unable to save to PDS.", err) 168 return 169 } 170 171 oldSpindle := f.Spindle 172 if oldSpindle != "" && oldSpindle != newSpindle { 173 remaining, qErr := db.GetRepos(rp.db, orm.FilterEq("spindle", oldSpindle)) 174 if qErr != nil { 175 l.Warn("failed to count repos using old spindle", "err", qErr) 176 } else if len(remaining) == 0 { 177 rp.spindlestream.RemoveSource(eventconsumer.NewSpindleSource(oldSpindle)) 178 } 179 } 180 181 if !removingSpindle { 182 rp.spindlestream.AddSource( 183 context.Background(), 184 eventconsumer.NewSpindleSource(newSpindle), 185 ) 186 } 187 188 rp.pages.HxRefresh(w) 189} 190 191func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) { 192 user := rp.oauth.GetMultiAccountUser(r) 193 l := rp.logger.With("handler", "AddLabel") 194 l = l.With("did", user.Did) 195 196 f, err := rp.repoResolver.Resolve(r) 197 if err != nil { 198 l.Error("failed to get repo and knot", "err", err) 199 return 200 } 201 202 errorId := "add-label-error" 203 fail := func(msg string, err error) { 204 l.Error(msg, "err", err) 205 rp.pages.Notice(w, errorId, msg) 206 } 207 208 // get form values for label definition 209 name := r.FormValue("name") 210 concreteType := r.FormValue("valueType") 211 valueFormat := r.FormValue("valueFormat") 212 enumValues := r.FormValue("enumValues") 213 scope := r.Form["scope"] 214 color := r.FormValue("color") 215 multiple := r.FormValue("multiple") == "true" 216 217 var variants []string 218 for part := range strings.SplitSeq(enumValues, ",") { 219 if part = strings.TrimSpace(part); part != "" { 220 variants = append(variants, part) 221 } 222 } 223 224 if concreteType == "" { 225 concreteType = "null" 226 } 227 228 format := models.ValueTypeFormatAny 229 if valueFormat == "did" { 230 format = models.ValueTypeFormatDid 231 } 232 233 valueType := models.ValueType{ 234 Type: models.ConcreteType(concreteType), 235 Format: format, 236 Enum: variants, 237 } 238 239 label := models.LabelDefinition{ 240 Did: user.Did, 241 Rkey: tid.TID(), 242 Name: name, 243 ValueType: valueType, 244 Scope: scope, 245 Color: &color, 246 Multiple: multiple, 247 Created: time.Now(), 248 } 249 if err := rp.validator.ValidateLabelDefinition(&label); err != nil { 250 fail(err.Error(), err) 251 return 252 } 253 254 // announce this relation into the firehose, store into owners' pds 255 client, err := rp.oauth.AuthorizedClient(r) 256 if err != nil { 257 fail(err.Error(), err) 258 return 259 } 260 261 // emit a labelRecord 262 labelRecord := label.AsRecord() 263 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 264 Collection: tangled.LabelDefinitionNSID, 265 Repo: label.Did, 266 Rkey: label.Rkey, 267 Record: &lexutil.LexiconTypeDecoder{ 268 Val: &labelRecord, 269 }, 270 }) 271 // invalid record 272 if err != nil { 273 fail("Failed to write record to PDS.", err) 274 return 275 } 276 277 aturi := resp.Uri 278 l = l.With("at-uri", aturi) 279 l.Info("wrote label record to PDS") 280 281 // update the repo to subscribe to this label 282 newRepo := *f 283 newRepo.Labels = append(newRepo.Labels, aturi) 284 repoRecord := newRepo.AsRecord() 285 286 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 287 if err != nil { 288 fail("Failed to update labels, no record found on PDS.", err) 289 return 290 } 291 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 292 Collection: tangled.RepoNSID, 293 Repo: newRepo.Did, 294 Rkey: newRepo.Rkey, 295 SwapRecord: ex.Cid, 296 Record: &lexutil.LexiconTypeDecoder{ 297 Val: &repoRecord, 298 }, 299 }) 300 if err != nil { 301 fail("Failed to update labels for repo.", err) 302 return 303 } 304 305 tx, err := rp.db.BeginTx(r.Context(), nil) 306 if err != nil { 307 fail("Failed to add label.", err) 308 return 309 } 310 311 rollback := func() { 312 err1 := tx.Rollback() 313 err2 := rollbackRecord(context.Background(), aturi, client) 314 315 // ignore txn complete errors, this is okay 316 if errors.Is(err1, sql.ErrTxDone) { 317 err1 = nil 318 } 319 320 if errs := errors.Join(err1, err2); errs != nil { 321 l.Error("failed to rollback changes", "errs", errs) 322 return 323 } 324 } 325 defer rollback() 326 327 _, err = db.AddLabelDefinition(tx, &label) 328 if err != nil { 329 fail("Failed to add label.", err) 330 return 331 } 332 333 if err = db.SubscribeLabel(tx, &models.RepoLabel{ 334 RepoDid: syntax.DID(f.RepoDid), 335 LabelAt: label.AtUri(), 336 }); err != nil { 337 fail("Failed to subscribe to label.", err) 338 return 339 } 340 341 err = tx.Commit() 342 if err != nil { 343 fail("Failed to add label.", err) 344 return 345 } 346 347 // clear aturi when everything is successful 348 aturi = "" 349 350 rp.pages.HxRefresh(w) 351} 352 353func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) { 354 user := rp.oauth.GetMultiAccountUser(r) 355 l := rp.logger.With("handler", "DeleteLabel") 356 l = l.With("did", user.Did) 357 358 f, err := rp.repoResolver.Resolve(r) 359 if err != nil { 360 l.Error("failed to get repo and knot", "err", err) 361 return 362 } 363 364 errorId := "label-operation" 365 fail := func(msg string, err error) { 366 l.Error(msg, "err", err) 367 rp.pages.Notice(w, errorId, msg) 368 } 369 370 // get form values 371 labelId := r.FormValue("label-id") 372 373 label, err := db.GetLabelDefinition(rp.db, orm.FilterEq("id", labelId)) 374 if err != nil { 375 fail("Failed to find label definition.", err) 376 return 377 } 378 379 client, err := rp.oauth.AuthorizedClient(r) 380 if err != nil { 381 fail(err.Error(), err) 382 return 383 } 384 385 // delete label record from PDS 386 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 387 Collection: tangled.LabelDefinitionNSID, 388 Repo: label.Did, 389 Rkey: label.Rkey, 390 }) 391 if err != nil { 392 fail("Failed to delete label record from PDS.", err) 393 return 394 } 395 396 // update repo record to remove the label reference 397 newRepo := *f 398 var updated []string 399 removedAt := label.AtUri().String() 400 for _, l := range newRepo.Labels { 401 if l != removedAt { 402 updated = append(updated, l) 403 } 404 } 405 newRepo.Labels = updated 406 repoRecord := newRepo.AsRecord() 407 408 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 409 if err != nil { 410 fail("Failed to update labels, no record found on PDS.", err) 411 return 412 } 413 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 414 Collection: tangled.RepoNSID, 415 Repo: newRepo.Did, 416 Rkey: newRepo.Rkey, 417 SwapRecord: ex.Cid, 418 Record: &lexutil.LexiconTypeDecoder{ 419 Val: &repoRecord, 420 }, 421 }) 422 if err != nil { 423 fail("Failed to update repo record.", err) 424 return 425 } 426 427 // transaction for DB changes 428 tx, err := rp.db.BeginTx(r.Context(), nil) 429 if err != nil { 430 fail("Failed to delete label.", err) 431 return 432 } 433 defer tx.Rollback() 434 435 err = db.UnsubscribeLabel( 436 tx, 437 orm.FilterEq("repo_did", f.RepoDid), 438 orm.FilterEq("label_at", removedAt), 439 ) 440 if err != nil { 441 fail("Failed to unsubscribe label.", err) 442 return 443 } 444 445 err = db.DeleteLabelDefinition(tx, orm.FilterEq("id", label.Id)) 446 if err != nil { 447 fail("Failed to delete label definition.", err) 448 return 449 } 450 451 err = tx.Commit() 452 if err != nil { 453 fail("Failed to delete label.", err) 454 return 455 } 456 457 // everything succeeded 458 rp.pages.HxRefresh(w) 459} 460 461func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) { 462 user := rp.oauth.GetMultiAccountUser(r) 463 l := rp.logger.With("handler", "SubscribeLabel") 464 l = l.With("did", user.Did) 465 466 f, err := rp.repoResolver.Resolve(r) 467 if err != nil { 468 l.Error("failed to get repo and knot", "err", err) 469 return 470 } 471 472 if err := r.ParseForm(); err != nil { 473 l.Error("invalid form", "err", err) 474 return 475 } 476 477 errorId := "default-label-operation" 478 fail := func(msg string, err error) { 479 l.Error(msg, "err", err) 480 rp.pages.Notice(w, errorId, msg) 481 } 482 483 labelAts := r.Form["label"] 484 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts)) 485 if err != nil { 486 fail("Failed to subscribe to label.", err) 487 return 488 } 489 490 newRepo := *f 491 newRepo.Labels = append(newRepo.Labels, labelAts...) 492 493 // dedup 494 slices.Sort(newRepo.Labels) 495 newRepo.Labels = slices.Compact(newRepo.Labels) 496 497 repoRecord := newRepo.AsRecord() 498 499 client, err := rp.oauth.AuthorizedClient(r) 500 if err != nil { 501 fail(err.Error(), err) 502 return 503 } 504 505 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey) 506 if err != nil { 507 fail("Failed to update labels, no record found on PDS.", err) 508 return 509 } 510 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 511 Collection: tangled.RepoNSID, 512 Repo: newRepo.Did, 513 Rkey: newRepo.Rkey, 514 SwapRecord: ex.Cid, 515 Record: &lexutil.LexiconTypeDecoder{ 516 Val: &repoRecord, 517 }, 518 }) 519 520 tx, err := rp.db.Begin() 521 if err != nil { 522 fail("Failed to subscribe to label.", err) 523 return 524 } 525 defer tx.Rollback() 526 527 for _, l := range labelAts { 528 err = db.SubscribeLabel(tx, &models.RepoLabel{ 529 RepoDid: syntax.DID(f.RepoDid), 530 LabelAt: syntax.ATURI(l), 531 }) 532 if err != nil { 533 fail("Failed to subscribe to label.", err) 534 return 535 } 536 } 537 538 if err := tx.Commit(); err != nil { 539 fail("Failed to subscribe to label.", err) 540 return 541 } 542 543 // everything succeeded 544 rp.pages.HxRefresh(w) 545} 546 547func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) { 548 user := rp.oauth.GetMultiAccountUser(r) 549 l := rp.logger.With("handler", "UnsubscribeLabel") 550 l = l.With("did", user.Did) 551 552 f, err := rp.repoResolver.Resolve(r) 553 if err != nil { 554 l.Error("failed to get repo and knot", "err", err) 555 return 556 } 557 558 if err := r.ParseForm(); err != nil { 559 l.Error("invalid form", "err", err) 560 return 561 } 562 563 errorId := "default-label-operation" 564 fail := func(msg string, err error) { 565 l.Error(msg, "err", err) 566 rp.pages.Notice(w, errorId, msg) 567 } 568 569 labelAts := r.Form["label"] 570 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts)) 571 if err != nil { 572 fail("Failed to unsubscribe to label.", err) 573 return 574 } 575 576 // update repo record to remove the label reference 577 newRepo := *f 578 var updated []string 579 for _, l := range newRepo.Labels { 580 if !slices.Contains(labelAts, l) { 581 updated = append(updated, l) 582 } 583 } 584 newRepo.Labels = updated 585 repoRecord := newRepo.AsRecord() 586 587 client, err := rp.oauth.AuthorizedClient(r) 588 if err != nil { 589 fail(err.Error(), err) 590 return 591 } 592 593 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey) 594 if err != nil { 595 fail("Failed to update labels, no record found on PDS.", err) 596 return 597 } 598 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 599 Collection: tangled.RepoNSID, 600 Repo: newRepo.Did, 601 Rkey: newRepo.Rkey, 602 SwapRecord: ex.Cid, 603 Record: &lexutil.LexiconTypeDecoder{ 604 Val: &repoRecord, 605 }, 606 }) 607 608 err = db.UnsubscribeLabel( 609 rp.db, 610 orm.FilterEq("repo_did", f.RepoDid), 611 orm.FilterIn("label_at", labelAts), 612 ) 613 if err != nil { 614 fail("Failed to unsubscribe label.", err) 615 return 616 } 617 618 // everything succeeded 619 rp.pages.HxRefresh(w) 620} 621 622func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) { 623 l := rp.logger.With("handler", "LabelPanel") 624 625 f, err := rp.repoResolver.Resolve(r) 626 if err != nil { 627 l.Error("failed to get repo and knot", "err", err) 628 return 629 } 630 631 subjectStr := r.FormValue("subject") 632 subject, err := syntax.ParseATURI(subjectStr) 633 if err != nil { 634 l.Error("failed to get repo and knot", "err", err) 635 return 636 } 637 638 labelDefs, err := db.GetLabelDefinitions( 639 rp.db, 640 orm.FilterIn("at_uri", f.Labels), 641 orm.FilterContains("scope", subject.Collection().String()), 642 ) 643 if err != nil { 644 l.Error("failed to fetch label defs", "err", err) 645 return 646 } 647 648 defs := make(map[string]*models.LabelDefinition) 649 for _, l := range labelDefs { 650 defs[l.AtUri().String()] = &l 651 } 652 653 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject)) 654 if err != nil { 655 l.Error("failed to build label state", "err", err) 656 return 657 } 658 state := states[subject] 659 660 user := rp.oauth.GetMultiAccountUser(r) 661 rp.pages.LabelPanel(w, pages.LabelPanelParams{ 662 LoggedInUser: user, 663 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 664 Defs: defs, 665 Subject: subject.String(), 666 State: state, 667 }) 668} 669 670func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) { 671 l := rp.logger.With("handler", "EditLabelPanel") 672 673 f, err := rp.repoResolver.Resolve(r) 674 if err != nil { 675 l.Error("failed to get repo and knot", "err", err) 676 return 677 } 678 679 subjectStr := r.FormValue("subject") 680 subject, err := syntax.ParseATURI(subjectStr) 681 if err != nil { 682 l.Error("failed to get repo and knot", "err", err) 683 return 684 } 685 686 labelDefs, err := db.GetLabelDefinitions( 687 rp.db, 688 orm.FilterIn("at_uri", f.Labels), 689 orm.FilterContains("scope", subject.Collection().String()), 690 ) 691 if err != nil { 692 l.Error("failed to fetch labels", "err", err) 693 return 694 } 695 696 defs := make(map[string]*models.LabelDefinition) 697 for _, l := range labelDefs { 698 defs[l.AtUri().String()] = &l 699 } 700 701 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject)) 702 if err != nil { 703 l.Error("failed to build label state", "err", err) 704 return 705 } 706 state := states[subject] 707 708 user := rp.oauth.GetMultiAccountUser(r) 709 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{ 710 LoggedInUser: user, 711 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 712 Defs: defs, 713 Subject: subject.String(), 714 State: state, 715 }) 716} 717 718func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) { 719 user := rp.oauth.GetMultiAccountUser(r) 720 l := rp.logger.With("handler", "AddCollaborator") 721 l = l.With("did", user.Did) 722 723 f, err := rp.repoResolver.Resolve(r) 724 if err != nil { 725 l.Error("failed to get repo and knot", "err", err) 726 return 727 } 728 729 errorId := "add-collaborator-error" 730 fail := func(msg string, err error) { 731 l.Error(msg, "err", err) 732 rp.pages.Notice(w, errorId, msg) 733 } 734 735 collaborator := r.FormValue("collaborator") 736 if collaborator == "" { 737 fail("Invalid form.", nil) 738 return 739 } 740 741 // remove a single leading `@`, to make @handle work with ResolveIdent 742 collaborator = strings.TrimPrefix(collaborator, "@") 743 744 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator) 745 if err != nil { 746 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err) 747 return 748 } 749 750 if collaboratorIdent.DID.String() == user.Did { 751 fail("You seem to be adding yourself as a collaborator.", nil) 752 return 753 } 754 l = l.With("collaborator", collaboratorIdent.Handle) 755 l = l.With("knot", f.Knot) 756 757 existing, err := db.GetCollaborators(rp.db, 758 orm.FilterEq("repo_did", f.RepoDid), 759 orm.FilterEq("subject_did", collaboratorIdent.DID.String()), 760 ) 761 if err != nil { 762 fail("Failed to check existing collaborators.", err) 763 return 764 } 765 if len(existing) > 0 { 766 fail(fmt.Sprintf("%s is already a collaborator.", collaboratorIdent.Handle), nil) 767 return 768 } 769 770 // announce this relation into the firehose, store into owners' pds 771 client, err := rp.oauth.AuthorizedClient(r) 772 if err != nil { 773 fail("Failed to write to PDS.", err) 774 return 775 } 776 777 // emit a record 778 currentUser := rp.oauth.GetMultiAccountUser(r) 779 rkey := tid.TID() 780 createdAt := time.Now() 781 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 782 Collection: tangled.RepoCollaboratorNSID, 783 Repo: currentUser.Did, 784 Rkey: rkey, 785 Record: compat113.Collaborator(repoCollaboratorRecord(f, collaboratorIdent.DID.String(), createdAt)), 786 }) 787 // invalid record 788 if err != nil { 789 fail("Failed to write record to PDS.", err) 790 return 791 } 792 793 aturi := resp.Uri 794 l = l.With("at-uri", aturi) 795 l.Info("wrote record to PDS") 796 797 tx, err := rp.db.BeginTx(r.Context(), nil) 798 if err != nil { 799 fail("Failed to add collaborator.", err) 800 return 801 } 802 803 rollback := func() { 804 err1 := tx.Rollback() 805 err2 := rp.enforcer.E.LoadPolicy() 806 err3 := rollbackRecord(context.Background(), aturi, client) 807 808 // ignore txn complete errors, this is okay 809 if errors.Is(err1, sql.ErrTxDone) { 810 err1 = nil 811 } 812 813 if errs := errors.Join(err1, err2, err3); errs != nil { 814 l.Error("failed to rollback changes", "errs", errs) 815 return 816 } 817 } 818 defer rollback() 819 820 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.RepoIdentifier()) 821 if err != nil { 822 fail("Failed to add collaborator permissions.", err) 823 return 824 } 825 826 err = db.AddCollaborator(tx, models.Collaborator{ 827 Did: syntax.DID(currentUser.Did), 828 Rkey: sql.NullString{String: rkey, Valid: true}, 829 SubjectDid: collaboratorIdent.DID, 830 RepoDid: syntax.DID(f.RepoDid), 831 Created: createdAt, 832 }) 833 if err != nil { 834 fail("Failed to add collaborator.", err) 835 return 836 } 837 838 err = tx.Commit() 839 if err != nil { 840 fail("Failed to add collaborator.", err) 841 return 842 } 843 844 err = rp.enforcer.E.SavePolicy() 845 if err != nil { 846 fail("Failed to update collaborator permissions.", err) 847 return 848 } 849 850 // clear aturi to when everything is successful 851 aturi = "" 852 853 rp.pages.HxRefresh(w) 854} 855 856func (rp *Repo) RenameRepo(w http.ResponseWriter, r *http.Request) { 857 l := rp.logger.With("handler", "RenameRepo") 858 noticeId := "rename-repo-error" 859 860 user := rp.oauth.GetMultiAccountUser(r) 861 f, err := rp.repoResolver.Resolve(r) 862 if err != nil { 863 l.Error("failed to get repo and knot", "err", err) 864 rp.pages.Notice(w, noticeId, "Failed to load repository.") 865 return 866 } 867 l = l.With("did", user.Did, "rkey", f.Rkey, "oldName", f.Name) 868 869 if f.RepoDid == "" { 870 rp.pages.Notice(w, noticeId, "This repository's knot has not completed the DID migration; rename is unavailable.") 871 return 872 } 873 874 if !compat113.KnotSupports114(r.Context(), f.Knot, rp.config.Core.Dev) { 875 rp.pages.Notice(w, noticeId, "This repository's knot is below v1.14 and does not yet support renames. Ask the knot operator to upgrade.") 876 return 877 } 878 879 newName, err := validateRenameInput(f.Name, f.Rkey, r.FormValue("name")) 880 if err != nil { 881 rp.pages.Notice(w, noticeId, err.Error()) 882 return 883 } 884 newRkey := strings.ToLower(newName) 885 l = l.With("newName", newName, "newRkey", newRkey) 886 887 atpClient, err := rp.oauth.AuthorizedClient(r) 888 if err != nil { 889 l.Error("failed to get authorized client", "err", err) 890 rp.pages.Notice(w, noticeId, "Failed to authorize. Try again later.") 891 return 892 } 893 894 newRepo := *f 895 newRepo.Name = newName 896 newRepo.Rkey = newRkey 897 newRepo.Created = time.Now() 898 record := newRepo.AsRecord() 899 900 if newRkey == f.Rkey { 901 ex, err := comatproto.RepoGetRecord(r.Context(), atpClient, "", tangled.RepoNSID, f.Did, f.Rkey) 902 if err != nil { 903 l.Error("failed to fetch existing record", "err", err) 904 rp.pages.Notice(w, noticeId, "Failed to read repository record from PDS.") 905 return 906 } 907 908 _, err = comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 909 Collection: tangled.RepoNSID, 910 Repo: f.Did, 911 Rkey: f.Rkey, 912 SwapRecord: ex.Cid, 913 Record: &lexutil.LexiconTypeDecoder{ 914 Val: &record, 915 }, 916 }) 917 if err != nil { 918 l.Error("failed to update display name on PDS", "err", err) 919 rp.pages.Notice(w, noticeId, "Failed to save display name to PDS.") 920 return 921 } 922 l.Info("updated display name on PDS") 923 924 if err := db.UpdateRepoDisplayName(rp.db, f.Did, f.Rkey, newName); err != nil { 925 l.Error("optimistic display name update failed", "err", err) 926 } 927 } else { 928 ex, getErr := comatproto.RepoGetRecord(r.Context(), atpClient, "", tangled.RepoNSID, f.Did, newRkey) 929 switch { 930 case getErr != nil: 931 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{ 932 Collection: tangled.RepoNSID, 933 Repo: f.Did, 934 Rkey: &newRkey, 935 Record: &lexutil.LexiconTypeDecoder{Val: &record}, 936 }) 937 if err != nil { 938 l.Error("failed to write rename to PDS", "err", err) 939 rp.pages.Notice(w, noticeId, "Failed to save renamed repository to PDS.") 940 return 941 } 942 l.Info("wrote rename-create to PDS; old record retained as alias") 943 944 default: 945 existing, ok := ex.Value.Val.(*tangled.Repo) 946 if !ok || existing.RepoDid == nil || *existing.RepoDid != f.RepoDid { 947 rp.pages.Notice(w, noticeId, fmt.Sprintf("You already have a repository named %q.", newRkey)) 948 return 949 } 950 _, err = comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 951 Collection: tangled.RepoNSID, 952 Repo: f.Did, 953 Rkey: newRkey, 954 SwapRecord: ex.Cid, 955 Record: &lexutil.LexiconTypeDecoder{Val: &record}, 956 }) 957 if err != nil { 958 l.Error("failed to rewrite rename-back record on PDS", "err", err) 959 rp.pages.Notice(w, noticeId, "Failed to save renamed repository to PDS.") 960 return 961 } 962 l.Info("rewrote rename-back record on PDS over prior alias") 963 } 964 965 tx, err := rp.db.Begin() 966 if err != nil { 967 l.Error("failed to begin rename tx", "err", err) 968 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid)) 969 return 970 } 971 defer tx.Rollback() 972 973 if err := db.RenameRepo(tx, f.Did, f.Rkey, newRkey, newName); err != nil { 974 l.Error("optimistic rename failed", "err", err) 975 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid)) 976 return 977 } 978 if err := db.RecordRepoRename(tx, f.Did, f.Rkey, f.RepoDid); err != nil { 979 l.Error("failed to record rename history", "err", err) 980 } 981 if err := db.DeleteRepoRename(tx, f.Did, newRkey); err != nil { 982 l.Error("failed to clear stale rename hint", "err", err) 983 } 984 if err := tx.Commit(); err != nil { 985 l.Error("failed to commit rename tx", "err", err) 986 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid)) 987 return 988 } 989 } 990 991 oldRepo := *f 992 rp.notifier.RenameRepo(r.Context(), syntax.DID(user.Did), &oldRepo, &newRepo) 993 994 if newRkey != f.Rkey { 995 rp.migrateSiteOnRename(r.Context(), f, newName, newRkey) 996 } 997 998 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid)) 999} 1000 1001func validateRenameInput(currentName, currentRkey, raw string) (string, error) { 1002 newName := strings.TrimSpace(raw) 1003 if newName == "" { 1004 return "", errors.New("Repository name cannot be empty.") 1005 } 1006 if err := models.ValidateRepoName(newName); err != nil { 1007 return "", err 1008 } 1009 newName = models.StripGitExt(newName) 1010 if newName == currentName { 1011 if _, tidErr := syntax.ParseTID(currentRkey); tidErr == nil { 1012 return newName, nil 1013 } 1014 return "", errors.New("New name matches the current name.") 1015 } 1016 return newName, nil 1017} 1018 1019func (rp *Repo) migrateSiteOnRename(ctx context.Context, oldRepo *models.Repo, newName, newRkey string) { 1020 l := rp.logger.With("handler", "migrateSiteOnRename", "repo_did", oldRepo.RepoDid) 1021 1022 siteConfig, err := db.GetRepoSiteConfig(rp.db, oldRepo.RepoDid) 1023 if err != nil || siteConfig == nil { 1024 return 1025 } 1026 1027 if !rp.cfClient.Enabled() { 1028 return 1029 } 1030 1031 ownerClaim, _ := db.GetActiveDomainClaimForDid(rp.db, oldRepo.Did) 1032 1033 go func() { 1034 bgCtx := context.Background() 1035 oldRkey := oldRepo.Rkey 1036 oldName := oldRepo.Name 1037 1038 if err := sites.Delete(bgCtx, rp.cfClient, oldRepo.Did, oldRkey); err != nil { 1039 l.Error("sites: failed to delete old R2 prefix", "oldRkey", oldRkey, "err", err) 1040 } 1041 1042 newRepo := *oldRepo 1043 newRepo.Name = newName 1044 newRepo.Rkey = newRkey 1045 if deployErr := sites.Deploy(bgCtx, rp.cfClient, rp.config, &newRepo, siteConfig.Branch, siteConfig.Dir); deployErr != nil { 1046 l.Error("sites: redeploy after rename failed", "err", deployErr) 1047 } 1048 1049 if ownerClaim != nil { 1050 // drop the old name's entry when the name actually changed. 1051 if oldName != newName { 1052 if err := sites.DeleteDomainMapping(bgCtx, rp.cfClient, ownerClaim.Domain, oldName); err != nil { 1053 l.Error("sites: failed to remove old KV mapping", "oldName", oldName, "err", err) 1054 } 1055 } 1056 if err := sites.PutDomainMapping(bgCtx, rp.cfClient, ownerClaim.Domain, oldRepo.Did, newName, newRkey, siteConfig.IsIndex); err != nil { 1057 l.Error("sites: failed to write new KV mapping", "newName", newName, "newRkey", newRkey, "err", err) 1058 } 1059 } 1060 1061 l.Info("sites: migrated on rename", "oldName", oldName, "oldRkey", oldRkey, "newName", newName, "newRkey", newRkey) 1062 }() 1063} 1064 1065func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) { 1066 user := rp.oauth.GetMultiAccountUser(r) 1067 l := rp.logger.With("handler", "DeleteRepo") 1068 1069 noticeId := "operation-error" 1070 f, err := rp.repoResolver.Resolve(r) 1071 if err != nil { 1072 l.Error("failed to get repo and knot", "err", err) 1073 return 1074 } 1075 1076 // remove record from pds 1077 atpClient, err := rp.oauth.AuthorizedClient(r) 1078 if err != nil { 1079 l.Error("failed to get authorized client", "err", err) 1080 return 1081 } 1082 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{ 1083 Collection: tangled.RepoNSID, 1084 Repo: user.Did, 1085 Rkey: f.Rkey, 1086 }) 1087 if err != nil { 1088 l.Error("failed to delete record", "err", err) 1089 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.") 1090 return 1091 } 1092 l.Info("removed repo record", "aturi", f.RepoAt().String()) 1093 1094 client, err := rp.oauth.ServiceClient( 1095 r, 1096 oauth.WithService(f.Knot), 1097 oauth.WithLxm(tangled.RepoDeleteNSID), 1098 oauth.WithDev(rp.config.Core.Dev), 1099 ) 1100 if err != nil { 1101 l.Error("failed to connect to knot server", "err", err) 1102 return 1103 } 1104 1105 err = tangled.RepoDelete( 1106 r.Context(), 1107 client, 1108 &tangled.RepoDelete_Input{ 1109 Did: f.Did, 1110 Name: f.Name, 1111 Rkey: f.Rkey, 1112 }, 1113 ) 1114 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 1115 l.Error("failed to call XRPC repo.delete", "xrpcerr", xrpcerr, "err", err) 1116 rp.pages.Notice(w, noticeId, xrpcerr.Error()) 1117 return 1118 } 1119 l.Info("deleted repo from knot") 1120 1121 tx, err := rp.db.BeginTx(r.Context(), nil) 1122 if err != nil { 1123 l.Error("failed to start tx") 1124 w.Write(fmt.Append(nil, "failed to add collaborator: ", err)) 1125 return 1126 } 1127 defer func() { 1128 tx.Rollback() 1129 err = rp.enforcer.E.LoadPolicy() 1130 if err != nil { 1131 l.Error("failed to rollback policies") 1132 } 1133 }() 1134 1135 // remove collaborator RBAC 1136 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.RepoIdentifier(), f.Knot) 1137 if err != nil { 1138 rp.pages.Notice(w, noticeId, "Failed to remove collaborators") 1139 return 1140 } 1141 for _, c := range repoCollaborators { 1142 did := c[0] 1143 rp.enforcer.RemoveCollaborator(did, f.Knot, f.RepoIdentifier()) 1144 } 1145 l.Info("removed collaborators") 1146 1147 // remove repo RBAC 1148 err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.RepoIdentifier()) 1149 if err != nil { 1150 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules") 1151 return 1152 } 1153 1154 // remove repo from db 1155 err = db.RemoveRepo(tx, f.Did, f.Rkey) 1156 if err != nil { 1157 rp.pages.Notice(w, noticeId, "Failed to update appview") 1158 return 1159 } 1160 l.Info("removed repo from db") 1161 1162 err = tx.Commit() 1163 if err != nil { 1164 l.Error("failed to commit changes", "err", err) 1165 http.Error(w, err.Error(), http.StatusInternalServerError) 1166 return 1167 } 1168 1169 err = rp.enforcer.E.SavePolicy() 1170 if err != nil { 1171 l.Error("failed to update ACLs", "err", err) 1172 http.Error(w, err.Error(), http.StatusInternalServerError) 1173 return 1174 } 1175 1176 rp.notifier.DeleteRepo(r.Context(), f) 1177 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.Did)) 1178} 1179 1180func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) { 1181 l := rp.logger.With("handler", "SyncRepoFork") 1182 1183 ref := chi.URLParam(r, "ref") 1184 ref, _ = url.PathUnescape(ref) 1185 1186 user := rp.oauth.GetMultiAccountUser(r) 1187 f, err := rp.repoResolver.Resolve(r) 1188 if err != nil { 1189 l.Error("failed to resolve source repo", "err", err) 1190 return 1191 } 1192 1193 switch r.Method { 1194 case http.MethodPost: 1195 client, err := rp.oauth.ServiceClient( 1196 r, 1197 oauth.WithService(f.Knot), 1198 oauth.WithLxm(tangled.RepoForkSyncNSID), 1199 oauth.WithDev(rp.config.Core.Dev), 1200 ) 1201 if err != nil { 1202 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 1203 return 1204 } 1205 1206 if f.Source == "" { 1207 rp.pages.Notice(w, "repo", "This repository is not a fork.") 1208 return 1209 } 1210 1211 err = tangled.RepoForkSync( 1212 r.Context(), 1213 client, 1214 &tangled.RepoForkSync_Input{ 1215 Did: user.Did, 1216 Name: f.Name, 1217 Source: f.Source, 1218 Branch: ref, 1219 }, 1220 ) 1221 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 1222 l.Error("failed to call XRPC repo.forkSync", "xrpcerr", xrpcerr, "err", err) 1223 rp.pages.Notice(w, "repo", err.Error()) 1224 return 1225 } 1226 1227 rp.pages.HxRefresh(w) 1228 return 1229 } 1230} 1231 1232func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) { 1233 l := rp.logger.With("handler", "ForkRepo") 1234 1235 user := rp.oauth.GetMultiAccountUser(r) 1236 f, err := rp.repoResolver.Resolve(r) 1237 if err != nil { 1238 l.Error("failed to resolve source repo", "err", err) 1239 return 1240 } 1241 1242 switch r.Method { 1243 case http.MethodGet: 1244 user := rp.oauth.GetMultiAccountUser(r) 1245 knots, err := rp.enforcer.GetKnotsForUser(user.Did) 1246 if err != nil { 1247 rp.pages.Notice(w, "repo", "Invalid user account.") 1248 return 1249 } 1250 1251 rp.pages.ForkRepo(w, pages.ForkRepoParams{ 1252 LoggedInUser: user, 1253 Knots: knots, 1254 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 1255 }) 1256 1257 case http.MethodPost: 1258 l := rp.logger.With("handler", "ForkRepo") 1259 1260 targetKnot := r.FormValue("knot") 1261 if targetKnot == "" { 1262 rp.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 1263 return 1264 } 1265 l = l.With("targetKnot", targetKnot) 1266 1267 ok, err := rp.enforcer.E.Enforce(user.Did, targetKnot, targetKnot, "repo:create") 1268 if err != nil || !ok { 1269 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 1270 return 1271 } 1272 1273 // choose a name for a fork 1274 forkName := strings.ToLower(r.FormValue("repo_name")) 1275 if forkName == "" { 1276 rp.pages.Notice(w, "repo", "Repository name cannot be empty.") 1277 return 1278 } 1279 1280 // this check is *only* to see if the forked repo name already exists 1281 // in the user's account. 1282 existingRepo, err := db.GetRepo( 1283 rp.db, 1284 orm.FilterEq("did", user.Did), 1285 orm.FilterEq("name", forkName), 1286 ) 1287 if err != nil { 1288 if !errors.Is(err, sql.ErrNoRows) { 1289 l.Error("error fetching existing repo from db", "err", err) 1290 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.") 1291 return 1292 } 1293 } else if existingRepo != nil { 1294 // repo with this name already exists 1295 rp.pages.Notice(w, "repo", "A repository with this name already exists.") 1296 return 1297 } 1298 l = l.With("forkName", forkName) 1299 1300 uri := "https" 1301 if rp.config.Core.Dev { 1302 uri = "http" 1303 } 1304 1305 forkSourceUrl := fmt.Sprintf("%s://%s/%s", uri, f.Knot, f.RepoIdentifier()) 1306 l = l.With("cloneUrl", forkSourceUrl) 1307 1308 rkey := strings.ToLower(forkName) 1309 1310 // TODO: this could coordinate better with the knot to receive a clone status 1311 client, err := rp.oauth.ServiceClient( 1312 r, 1313 oauth.WithService(targetKnot), 1314 oauth.WithLxm(tangled.RepoCreateNSID), 1315 oauth.WithDev(rp.config.Core.Dev), 1316 oauth.WithTimeout(time.Second*20), 1317 ) 1318 if err != nil { 1319 l.Error("could not create service client", "err", err) 1320 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 1321 return 1322 } 1323 1324 forkInput := &tangled.RepoCreate_Input{ 1325 Rkey: rkey, 1326 Name: rkey, 1327 Source: &forkSourceUrl, 1328 } 1329 createResp, err := tangled.RepoCreate( 1330 r.Context(), 1331 client, 1332 forkInput, 1333 ) 1334 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 1335 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err) 1336 rp.pages.Notice(w, "repo", xrpcerr.Error()) 1337 return 1338 } 1339 1340 var repoDid string 1341 if createResp != nil && createResp.RepoDid != nil { 1342 repoDid = *createResp.RepoDid 1343 } 1344 if repoDid == "" { 1345 l.Error("knot returned empty repo DID for fork") 1346 rp.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 1347 return 1348 } 1349 1350 forkSource := f.RepoAt().String() 1351 if f.RepoDid != "" { 1352 forkSource = f.RepoDid 1353 } 1354 1355 forkDescription := r.Form.Get("description") 1356 1357 repo := &models.Repo{ 1358 Did: user.Did, 1359 Name: rkey, 1360 Knot: targetKnot, 1361 Rkey: rkey, 1362 Source: forkSource, 1363 Description: forkDescription, 1364 Created: time.Now(), 1365 Labels: rp.config.Label.DefaultLabelDefs, 1366 RepoDid: repoDid, 1367 } 1368 record := repo.AsRecord() 1369 1370 cleanupKnot := func() { 1371 go func() { 1372 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 1373 for attempt, delay := range delays { 1374 time.Sleep(delay) 1375 deleteClient, dErr := rp.oauth.ServiceClient( 1376 r, 1377 oauth.WithService(targetKnot), 1378 oauth.WithLxm(tangled.RepoDeleteNSID), 1379 oauth.WithDev(rp.config.Core.Dev), 1380 ) 1381 if dErr != nil { 1382 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 1383 continue 1384 } 1385 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 1386 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 1387 Did: user.Did, 1388 Name: forkName, 1389 Rkey: rkey, 1390 }); dErr != nil { 1391 cancel() 1392 l.Error("failed to clean up fork on knot after rollback", "attempt", attempt+1, "err", dErr) 1393 continue 1394 } 1395 cancel() 1396 l.Info("successfully cleaned up fork on knot after rollback", "attempt", attempt+1) 1397 return 1398 } 1399 l.Error("exhausted retries for knot cleanup, fork may be orphaned", 1400 "did", user.Did, "fork", forkName, "knot", targetKnot) 1401 }() 1402 } 1403 1404 atpClient, err := rp.oauth.AuthorizedClient(r) 1405 if err != nil { 1406 l.Error("failed to create xrpcclient", "err", err) 1407 cleanupKnot() 1408 rp.pages.Notice(w, "repo", "Failed to fork repository.") 1409 return 1410 } 1411 1412 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 1413 Collection: tangled.RepoNSID, 1414 Repo: user.Did, 1415 Rkey: rkey, 1416 Record: &lexutil.LexiconTypeDecoder{ 1417 Val: &record, 1418 }, 1419 }) 1420 if err != nil { 1421 l.Error("failed to write to PDS", "err", err) 1422 cleanupKnot() 1423 rp.pages.Notice(w, "repo", "Failed to announce repository creation.") 1424 return 1425 } 1426 1427 aturi := atresp.Uri 1428 l = l.With("aturi", aturi) 1429 l.Info("wrote to PDS") 1430 1431 tx, err := rp.db.BeginTx(r.Context(), nil) 1432 if err != nil { 1433 l.Info("txn failed", "err", err) 1434 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1435 return 1436 } 1437 1438 rollback := func() { 1439 err1 := tx.Rollback() 1440 err2 := rp.enforcer.E.LoadPolicy() 1441 err3 := rollbackRecord(context.Background(), aturi, atpClient) 1442 1443 if errors.Is(err1, sql.ErrTxDone) { 1444 err1 = nil 1445 } 1446 1447 if errs := errors.Join(err1, err2, err3); errs != nil { 1448 l.Error("failed to rollback changes", "errs", errs) 1449 } 1450 1451 if aturi != "" { 1452 cleanupKnot() 1453 } 1454 } 1455 defer rollback() 1456 1457 err = db.AddRepo(tx, repo) 1458 if err != nil { 1459 l.Error("failed to AddRepo", "err", err) 1460 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1461 return 1462 } 1463 1464 rbacPath := repo.RepoIdentifier() 1465 err = rp.enforcer.AddRepo(user.Did, targetKnot, rbacPath) 1466 if err != nil { 1467 l.Error("failed to add ACLs", "err", err) 1468 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.") 1469 return 1470 } 1471 1472 err = tx.Commit() 1473 if err != nil { 1474 l.Error("failed to commit changes", "err", err) 1475 http.Error(w, err.Error(), http.StatusInternalServerError) 1476 return 1477 } 1478 1479 err = rp.enforcer.E.SavePolicy() 1480 if err != nil { 1481 l.Error("failed to update ACLs", "err", err) 1482 http.Error(w, err.Error(), http.StatusInternalServerError) 1483 return 1484 } 1485 1486 aturi = "" 1487 1488 rp.notifier.NewRepo(r.Context(), repo) 1489 if repoDid != "" { 1490 rp.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 1491 } else { 1492 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Did, forkName)) 1493 } 1494 } 1495} 1496 1497func (rp *Repo) Stars(w http.ResponseWriter, r *http.Request) { 1498 l := rp.logger.With("handler", "Stars") 1499 1500 user := rp.oauth.GetMultiAccountUser(r) 1501 f, err := rp.repoResolver.Resolve(r) 1502 if err != nil { 1503 l.Error("failed to resolve source repo", "err", err) 1504 return 1505 } 1506 1507 page := pagination.FromContext(r.Context()) 1508 if page.Limit > 30 || page.Limit <= 0 { 1509 page.Limit = 30 1510 } 1511 1512 starrers, err := db.GetStars(rp.db, string(f.RepoDid), page) 1513 if err != nil { 1514 l.Error("failed to fetch starrers", "err", err, "repoDid", f.RepoDid) 1515 return 1516 } 1517 1518 totalCount, err := db.GetStarCount(rp.db, models.StarSubjectRepo, string(f.RepoDid)) 1519 if err != nil { 1520 l.Error("failed to fetch star count", "err", err, "repoDid", f.RepoDid) 1521 return 1522 } 1523 1524 rp.pages.RepoStars(w, pages.RepoStarsParams{ 1525 LoggedInUser: user, 1526 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 1527 Starrers: starrers, 1528 Page: page, 1529 TotalCount: totalCount, 1530 }) 1531} 1532 1533func (rp *Repo) Forks(w http.ResponseWriter, r *http.Request) { 1534 l := rp.logger.With("handler", "Forks") 1535 1536 user := rp.oauth.GetMultiAccountUser(r) 1537 f, err := rp.repoResolver.Resolve(r) 1538 if err != nil { 1539 l.Error("failed to resolve source repo", "err", err) 1540 return 1541 } 1542 1543 var forks []models.Repo 1544 totalCount := 0 1545 page := pagination.FromContext(r.Context()) 1546 if f.RepoDid != "" { 1547 forks, err = db.GetReposPaginated(rp.db, page, orm.FilterEq("source", f.RepoDid)) 1548 if err != nil { 1549 l.Error("failed to fetch forks", "err", err, "repoAt", f.RepoAt()) 1550 return 1551 } 1552 1553 totalCount, err = db.GetForkCount(rp.db, f.RepoDid) 1554 if err != nil { 1555 l.Error("failed to fetch fork count", "err", err, "repoAt", f.RepoAt()) 1556 return 1557 } 1558 } 1559 1560 err = rp.pages.RepoForks(w, pages.RepoForksParams{ 1561 LoggedInUser: user, 1562 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 1563 Forks: forks, 1564 Page: page, 1565 TotalCount: totalCount, 1566 }) 1567 if err != nil { 1568 l.Error("failed to render page", "err", err) 1569 } 1570} 1571 1572// this is used to rollback changes made to the PDS 1573// 1574// it is a no-op if the provided ATURI is empty 1575func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 1576 if aturi == "" { 1577 return nil 1578 } 1579 1580 parsed := syntax.ATURI(aturi) 1581 1582 collection := parsed.Collection().String() 1583 repo := parsed.Authority().String() 1584 rkey := parsed.RecordKey().String() 1585 1586 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 1587 Collection: collection, 1588 Repo: repo, 1589 Rkey: rkey, 1590 }) 1591 return err 1592} 1593 1594func repoCollaboratorRecord(f *models.Repo, subject string, createdAt time.Time) *tangled.RepoCollaborator { 1595 return &tangled.RepoCollaborator{ 1596 Subject: subject, 1597 CreatedAt: createdAt.Format(time.RFC3339), 1598 Repo: f.RepoDid, 1599 } 1600}