Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1package repo 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "net/url" 11 "slices" 12 "strings" 13 "time" 14 15 "tangled.org/core/appview/cloudflare" 16 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/knotacl" 21 "tangled.org/core/appview/knotcompat" 22 "tangled.org/core/appview/models" 23 "tangled.org/core/appview/notify" 24 "tangled.org/core/appview/oauth" 25 "tangled.org/core/appview/pages" 26 "tangled.org/core/appview/pagination" 27 "tangled.org/core/appview/reporesolver" 28 "tangled.org/core/appview/sites" 29 "tangled.org/core/appview/validator" 30 xrpcclient "tangled.org/core/appview/xrpcclient" 31 "tangled.org/core/consts" 32 "tangled.org/core/eventconsumer" 33 "tangled.org/core/idresolver" 34 "tangled.org/core/ogre" 35 "tangled.org/core/orm" 36 "tangled.org/core/rbac" 37 "tangled.org/core/tid" 38 "tangled.org/core/xrpc/serviceauth" 39 40 comatproto "github.com/bluesky-social/indigo/api/atproto" 41 "github.com/bluesky-social/indigo/atproto/atclient" 42 "github.com/bluesky-social/indigo/atproto/syntax" 43 lexutil "github.com/bluesky-social/indigo/lex/util" 44 45 "github.com/go-chi/chi/v5" 46) 47 48type Repo struct { 49 repoResolver *reporesolver.RepoResolver 50 idResolver *idresolver.Resolver 51 config *config.Config 52 oauth *oauth.OAuth 53 pages *pages.Pages 54 spindlestream *eventconsumer.Consumer 55 db *db.DB 56 enforcer *rbac.Enforcer 57 acl *knotacl.Service 58 notifier notify.Notifier 59 logger *slog.Logger 60 serviceAuth *serviceauth.ServiceAuth 61 validator *validator.Validator 62 cfClient *cloudflare.Client 63 ogreClient *ogre.Client 64} 65 66func New( 67 oauth *oauth.OAuth, 68 repoResolver *reporesolver.RepoResolver, 69 pages *pages.Pages, 70 spindlestream *eventconsumer.Consumer, 71 idResolver *idresolver.Resolver, 72 db *db.DB, 73 config *config.Config, 74 notifier notify.Notifier, 75 enforcer *rbac.Enforcer, 76 acl *knotacl.Service, 77 logger *slog.Logger, 78 validator *validator.Validator, 79 cfClient *cloudflare.Client, 80) *Repo { 81 return &Repo{ 82 oauth: oauth, 83 repoResolver: repoResolver, 84 pages: pages, 85 idResolver: idResolver, 86 config: config, 87 spindlestream: spindlestream, 88 db: db, 89 notifier: notifier, 90 enforcer: enforcer, 91 acl: acl, 92 logger: logger, 93 validator: validator, 94 cfClient: cfClient, 95 ogreClient: ogre.NewClient(config.Ogre.Host), 96 } 97} 98 99// modify the spindle configured for this repo 100func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) { 101 user := rp.oauth.GetMultiAccountUser(r) 102 l := rp.logger.With("handler", "EditSpindle") 103 l = l.With("did", user.Did) 104 105 errorId := "operation-error" 106 fail := func(msg string, err error) { 107 l.Error(msg, "err", err) 108 rp.pages.Notice(w, errorId, msg) 109 } 110 111 f, err := rp.repoResolver.Resolve(r) 112 if err != nil { 113 fail("Failed to resolve repo. Try again later", err) 114 return 115 } 116 117 newSpindle := r.FormValue("spindle") 118 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value 119 client, err := rp.oauth.AuthorizedClient(r) 120 if err != nil { 121 fail("Failed to authorize. Try again later.", err) 122 return 123 } 124 125 if !removingSpindle { 126 // ensure that this is a valid spindle for this user 127 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Did) 128 if err != nil { 129 fail("Failed to find spindles. Try again later.", err) 130 return 131 } 132 133 if !slices.Contains(validSpindles, newSpindle) { 134 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles)) 135 return 136 } 137 } 138 139 newRepo := *f 140 newRepo.Spindle = newSpindle 141 record := newRepo.AsRecord() 142 143 spindlePtr := &newSpindle 144 if removingSpindle { 145 spindlePtr = nil 146 newRepo.Spindle = "" 147 } 148 149 // optimistic update 150 err = db.UpdateSpindle(rp.db, newRepo.RepoDid, spindlePtr) 151 if err != nil { 152 fail("Failed to update spindle. Try again later.", err) 153 return 154 } 155 156 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 157 if err != nil { 158 fail("Failed to update spindle, no record found on PDS.", err) 159 return 160 } 161 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 162 Collection: tangled.RepoNSID, 163 Repo: newRepo.Did, 164 Rkey: newRepo.Rkey, 165 SwapRecord: ex.Cid, 166 Record: &lexutil.LexiconTypeDecoder{ 167 Val: &record, 168 }, 169 }) 170 171 if err != nil { 172 fail("Failed to update spindle, unable to save to PDS.", err) 173 return 174 } 175 176 oldSpindle := f.Spindle 177 if oldSpindle != "" && oldSpindle != newSpindle { 178 remaining, qErr := db.GetRepos(rp.db, orm.FilterEq("spindle", oldSpindle)) 179 if qErr != nil { 180 l.Warn("failed to count repos using old spindle", "err", qErr) 181 } else if len(remaining) == 0 { 182 rp.spindlestream.RemoveSource(eventconsumer.NewSpindleSource(oldSpindle)) 183 } 184 } 185 186 if !removingSpindle { 187 rp.spindlestream.AddSource( 188 context.Background(), 189 eventconsumer.NewSpindleSource(newSpindle), 190 ) 191 } 192 193 rp.pages.HxRefresh(w) 194} 195 196func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) { 197 user := rp.oauth.GetMultiAccountUser(r) 198 l := rp.logger.With("handler", "AddLabel") 199 l = l.With("did", user.Did) 200 201 f, err := rp.repoResolver.Resolve(r) 202 if err != nil { 203 l.Error("failed to get repo and knot", "err", err) 204 return 205 } 206 207 errorId := "add-label-error" 208 fail := func(msg string, err error) { 209 l.Error(msg, "err", err) 210 rp.pages.Notice(w, errorId, msg) 211 } 212 213 // get form values for label definition 214 name := r.FormValue("name") 215 concreteType := r.FormValue("valueType") 216 valueFormat := r.FormValue("valueFormat") 217 enumValues := r.FormValue("enumValues") 218 scope := r.Form["scope"] 219 color := r.FormValue("color") 220 multiple := r.FormValue("multiple") == "true" 221 222 var variants []string 223 for part := range strings.SplitSeq(enumValues, ",") { 224 if part = strings.TrimSpace(part); part != "" { 225 variants = append(variants, part) 226 } 227 } 228 229 if concreteType == "" { 230 concreteType = "null" 231 } 232 233 format := models.ValueTypeFormatAny 234 if valueFormat == "did" { 235 format = models.ValueTypeFormatDid 236 } 237 238 valueType := models.ValueType{ 239 Type: models.ConcreteType(concreteType), 240 Format: format, 241 Enum: variants, 242 } 243 244 label := models.LabelDefinition{ 245 Did: user.Did, 246 Rkey: tid.TID(), 247 Name: name, 248 ValueType: valueType, 249 Scope: scope, 250 Color: &color, 251 Multiple: multiple, 252 Created: time.Now(), 253 } 254 if err := rp.validator.ValidateLabelDefinition(&label); err != nil { 255 fail(err.Error(), err) 256 return 257 } 258 259 // announce this relation into the firehose, store into owners' pds 260 client, err := rp.oauth.AuthorizedClient(r) 261 if err != nil { 262 fail(err.Error(), err) 263 return 264 } 265 266 // emit a labelRecord 267 labelRecord := label.AsRecord() 268 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 269 Collection: tangled.LabelDefinitionNSID, 270 Repo: label.Did, 271 Rkey: label.Rkey, 272 Record: &lexutil.LexiconTypeDecoder{ 273 Val: &labelRecord, 274 }, 275 }) 276 // invalid record 277 if err != nil { 278 fail("Failed to write record to PDS.", err) 279 return 280 } 281 282 aturi := resp.Uri 283 l = l.With("at-uri", aturi) 284 l.Info("wrote label record to PDS") 285 286 // update the repo to subscribe to this label 287 newRepo := *f 288 newRepo.Labels = append(newRepo.Labels, aturi) 289 repoRecord := newRepo.AsRecord() 290 291 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 292 if err != nil { 293 fail("Failed to update labels, no record found on PDS.", err) 294 return 295 } 296 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 297 Collection: tangled.RepoNSID, 298 Repo: newRepo.Did, 299 Rkey: newRepo.Rkey, 300 SwapRecord: ex.Cid, 301 Record: &lexutil.LexiconTypeDecoder{ 302 Val: &repoRecord, 303 }, 304 }) 305 if err != nil { 306 fail("Failed to update labels for repo.", err) 307 return 308 } 309 310 tx, err := rp.db.BeginTx(r.Context(), nil) 311 if err != nil { 312 fail("Failed to add label.", err) 313 return 314 } 315 316 rollback := func() { 317 err1 := tx.Rollback() 318 err2 := rollbackRecord(context.Background(), aturi, client) 319 320 // ignore txn complete errors, this is okay 321 if errors.Is(err1, sql.ErrTxDone) { 322 err1 = nil 323 } 324 325 if errs := errors.Join(err1, err2); errs != nil { 326 l.Error("failed to rollback changes", "errs", errs) 327 return 328 } 329 } 330 defer rollback() 331 332 _, err = db.AddLabelDefinition(tx, &label) 333 if err != nil { 334 fail("Failed to add label.", err) 335 return 336 } 337 338 if err = db.SubscribeLabel(tx, &models.RepoLabel{ 339 RepoDid: syntax.DID(f.RepoDid), 340 LabelAt: label.AtUri(), 341 }); err != nil { 342 fail("Failed to subscribe to label.", err) 343 return 344 } 345 346 err = tx.Commit() 347 if err != nil { 348 fail("Failed to add label.", err) 349 return 350 } 351 352 // clear aturi when everything is successful 353 aturi = "" 354 355 rp.pages.HxRefresh(w) 356} 357 358func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) { 359 user := rp.oauth.GetMultiAccountUser(r) 360 l := rp.logger.With("handler", "DeleteLabel") 361 l = l.With("did", user.Did) 362 363 f, err := rp.repoResolver.Resolve(r) 364 if err != nil { 365 l.Error("failed to get repo and knot", "err", err) 366 return 367 } 368 369 errorId := "label-operation" 370 fail := func(msg string, err error) { 371 l.Error(msg, "err", err) 372 rp.pages.Notice(w, errorId, msg) 373 } 374 375 // get form values 376 labelId := r.FormValue("label-id") 377 378 label, err := db.GetLabelDefinition(rp.db, orm.FilterEq("id", labelId)) 379 if err != nil { 380 fail("Failed to find label definition.", err) 381 return 382 } 383 384 client, err := rp.oauth.AuthorizedClient(r) 385 if err != nil { 386 fail(err.Error(), err) 387 return 388 } 389 390 // delete label record from PDS 391 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 392 Collection: tangled.LabelDefinitionNSID, 393 Repo: label.Did, 394 Rkey: label.Rkey, 395 }) 396 if err != nil { 397 fail("Failed to delete label record from PDS.", err) 398 return 399 } 400 401 // update repo record to remove the label reference 402 newRepo := *f 403 var updated []string 404 removedAt := label.AtUri().String() 405 for _, l := range newRepo.Labels { 406 if l != removedAt { 407 updated = append(updated, l) 408 } 409 } 410 newRepo.Labels = updated 411 repoRecord := newRepo.AsRecord() 412 413 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 414 if err != nil { 415 fail("Failed to update labels, no record found on PDS.", err) 416 return 417 } 418 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 419 Collection: tangled.RepoNSID, 420 Repo: newRepo.Did, 421 Rkey: newRepo.Rkey, 422 SwapRecord: ex.Cid, 423 Record: &lexutil.LexiconTypeDecoder{ 424 Val: &repoRecord, 425 }, 426 }) 427 if err != nil { 428 fail("Failed to update repo record.", err) 429 return 430 } 431 432 // transaction for DB changes 433 tx, err := rp.db.BeginTx(r.Context(), nil) 434 if err != nil { 435 fail("Failed to delete label.", err) 436 return 437 } 438 defer tx.Rollback() 439 440 err = db.UnsubscribeLabel( 441 tx, 442 orm.FilterEq("repo_did", f.RepoDid), 443 orm.FilterEq("label_at", removedAt), 444 ) 445 if err != nil { 446 fail("Failed to unsubscribe label.", err) 447 return 448 } 449 450 err = db.DeleteLabelDefinition(tx, orm.FilterEq("id", label.Id)) 451 if err != nil { 452 fail("Failed to delete label definition.", err) 453 return 454 } 455 456 err = tx.Commit() 457 if err != nil { 458 fail("Failed to delete label.", err) 459 return 460 } 461 462 // everything succeeded 463 rp.pages.HxRefresh(w) 464} 465 466func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) { 467 user := rp.oauth.GetMultiAccountUser(r) 468 l := rp.logger.With("handler", "SubscribeLabel") 469 l = l.With("did", user.Did) 470 471 f, err := rp.repoResolver.Resolve(r) 472 if err != nil { 473 l.Error("failed to get repo and knot", "err", err) 474 return 475 } 476 477 if err := r.ParseForm(); err != nil { 478 l.Error("invalid form", "err", err) 479 return 480 } 481 482 errorId := "default-label-operation" 483 fail := func(msg string, err error) { 484 l.Error(msg, "err", err) 485 rp.pages.Notice(w, errorId, msg) 486 } 487 488 labelAts := r.Form["label"] 489 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts)) 490 if err != nil { 491 fail("Failed to subscribe to label.", err) 492 return 493 } 494 495 newRepo := *f 496 newRepo.Labels = append(newRepo.Labels, labelAts...) 497 498 // dedup 499 slices.Sort(newRepo.Labels) 500 newRepo.Labels = slices.Compact(newRepo.Labels) 501 502 repoRecord := newRepo.AsRecord() 503 504 client, err := rp.oauth.AuthorizedClient(r) 505 if err != nil { 506 fail(err.Error(), err) 507 return 508 } 509 510 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey) 511 if err != nil { 512 fail("Failed to update labels, no record found on PDS.", err) 513 return 514 } 515 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 516 Collection: tangled.RepoNSID, 517 Repo: newRepo.Did, 518 Rkey: newRepo.Rkey, 519 SwapRecord: ex.Cid, 520 Record: &lexutil.LexiconTypeDecoder{ 521 Val: &repoRecord, 522 }, 523 }) 524 525 tx, err := rp.db.Begin() 526 if err != nil { 527 fail("Failed to subscribe to label.", err) 528 return 529 } 530 defer tx.Rollback() 531 532 for _, l := range labelAts { 533 err = db.SubscribeLabel(tx, &models.RepoLabel{ 534 RepoDid: syntax.DID(f.RepoDid), 535 LabelAt: syntax.ATURI(l), 536 }) 537 if err != nil { 538 fail("Failed to subscribe to label.", err) 539 return 540 } 541 } 542 543 if err := tx.Commit(); err != nil { 544 fail("Failed to subscribe to label.", err) 545 return 546 } 547 548 // everything succeeded 549 rp.pages.HxRefresh(w) 550} 551 552func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) { 553 user := rp.oauth.GetMultiAccountUser(r) 554 l := rp.logger.With("handler", "UnsubscribeLabel") 555 l = l.With("did", user.Did) 556 557 f, err := rp.repoResolver.Resolve(r) 558 if err != nil { 559 l.Error("failed to get repo and knot", "err", err) 560 return 561 } 562 563 if err := r.ParseForm(); err != nil { 564 l.Error("invalid form", "err", err) 565 return 566 } 567 568 errorId := "default-label-operation" 569 fail := func(msg string, err error) { 570 l.Error(msg, "err", err) 571 rp.pages.Notice(w, errorId, msg) 572 } 573 574 labelAts := r.Form["label"] 575 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts)) 576 if err != nil { 577 fail("Failed to unsubscribe to label.", err) 578 return 579 } 580 581 // update repo record to remove the label reference 582 newRepo := *f 583 var updated []string 584 for _, l := range newRepo.Labels { 585 if !slices.Contains(labelAts, l) { 586 updated = append(updated, l) 587 } 588 } 589 newRepo.Labels = updated 590 repoRecord := newRepo.AsRecord() 591 592 client, err := rp.oauth.AuthorizedClient(r) 593 if err != nil { 594 fail(err.Error(), err) 595 return 596 } 597 598 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey) 599 if err != nil { 600 fail("Failed to update labels, no record found on PDS.", err) 601 return 602 } 603 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 604 Collection: tangled.RepoNSID, 605 Repo: newRepo.Did, 606 Rkey: newRepo.Rkey, 607 SwapRecord: ex.Cid, 608 Record: &lexutil.LexiconTypeDecoder{ 609 Val: &repoRecord, 610 }, 611 }) 612 613 err = db.UnsubscribeLabel( 614 rp.db, 615 orm.FilterEq("repo_did", f.RepoDid), 616 orm.FilterIn("label_at", labelAts), 617 ) 618 if err != nil { 619 fail("Failed to unsubscribe label.", err) 620 return 621 } 622 623 // everything succeeded 624 rp.pages.HxRefresh(w) 625} 626 627func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) { 628 l := rp.logger.With("handler", "LabelPanel") 629 630 f, err := rp.repoResolver.Resolve(r) 631 if err != nil { 632 l.Error("failed to get repo and knot", "err", err) 633 return 634 } 635 636 subjectStr := r.FormValue("subject") 637 subject, err := syntax.ParseATURI(subjectStr) 638 if err != nil { 639 l.Error("failed to get repo and knot", "err", err) 640 return 641 } 642 643 labelDefs, err := db.GetLabelDefinitions( 644 rp.db, 645 orm.FilterIn("at_uri", f.Labels), 646 orm.FilterContains("scope", subject.Collection().String()), 647 ) 648 if err != nil { 649 l.Error("failed to fetch label defs", "err", err) 650 return 651 } 652 653 defs := make(map[string]*models.LabelDefinition) 654 for _, l := range labelDefs { 655 defs[l.AtUri().String()] = &l 656 } 657 658 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject)) 659 if err != nil { 660 l.Error("failed to build label state", "err", err) 661 return 662 } 663 state := states[subject] 664 665 user := rp.oauth.GetMultiAccountUser(r) 666 rp.pages.LabelPanel(w, pages.LabelPanelParams{ 667 BaseParams: pages.BaseParamsFromContext(r.Context()), 668 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 669 Defs: defs, 670 Subject: subject.String(), 671 State: state, 672 }) 673} 674 675func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) { 676 l := rp.logger.With("handler", "EditLabelPanel") 677 678 f, err := rp.repoResolver.Resolve(r) 679 if err != nil { 680 l.Error("failed to get repo and knot", "err", err) 681 return 682 } 683 684 subjectStr := r.FormValue("subject") 685 subject, err := syntax.ParseATURI(subjectStr) 686 if err != nil { 687 l.Error("failed to get repo and knot", "err", err) 688 return 689 } 690 691 labelDefs, err := db.GetLabelDefinitions( 692 rp.db, 693 orm.FilterIn("at_uri", f.Labels), 694 orm.FilterContains("scope", subject.Collection().String()), 695 ) 696 if err != nil { 697 l.Error("failed to fetch labels", "err", err) 698 return 699 } 700 701 defs := make(map[string]*models.LabelDefinition) 702 for _, l := range labelDefs { 703 defs[l.AtUri().String()] = &l 704 } 705 706 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject)) 707 if err != nil { 708 l.Error("failed to build label state", "err", err) 709 return 710 } 711 state := states[subject] 712 713 user := rp.oauth.GetMultiAccountUser(r) 714 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{ 715 BaseParams: pages.BaseParamsFromContext(r.Context()), 716 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 717 Defs: defs, 718 Subject: subject.String(), 719 State: state, 720 }) 721} 722 723func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) { 724 user := rp.oauth.GetMultiAccountUser(r) 725 l := rp.logger.With("handler", "AddCollaborator") 726 l = l.With("did", user.Did) 727 728 f, err := rp.repoResolver.Resolve(r) 729 if err != nil { 730 l.Error("failed to get repo and knot", "err", err) 731 return 732 } 733 734 errorId := "add-collaborator-error" 735 fail := func(msg string, err error) { 736 l.Error(msg, "err", err) 737 rp.pages.Notice(w, errorId, msg) 738 } 739 740 collaborator := r.FormValue("collaborator") 741 if collaborator == "" { 742 fail("Invalid form.", nil) 743 return 744 } 745 746 // remove a single leading `@`, to make @handle work with ResolveIdent 747 collaborator = strings.TrimPrefix(collaborator, "@") 748 749 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator) 750 if err != nil { 751 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err) 752 return 753 } 754 755 if collaboratorIdent.DID.String() == user.Did { 756 fail("You seem to be adding yourself as a collaborator.", nil) 757 return 758 } 759 l = l.With("collaborator", collaboratorIdent.Handle) 760 l = l.With("knot", f.Knot) 761 762 if knotcompat.KnotHasCapability(r.Context(), f.Knot, rp.config.Core.Dev, consts.CapKnotACL) { 763 if f.RepoDid == "" { 764 fail("This repository is missing its DID and cannot manage collaborators.", nil) 765 return 766 } 767 768 client, err := rp.oauth.ServiceClient( 769 r, 770 oauth.WithService(f.Knot), 771 oauth.WithLxm(tangled.RepoAddCollaboratorNSID), 772 oauth.WithDev(rp.config.Core.Dev), 773 ) 774 if err != nil { 775 fail("Failed to connect to knot server.", err) 776 return 777 } 778 779 err = tangled.RepoAddCollaborator(r.Context(), client, &tangled.RepoAddCollaborator_Input{ 780 Repo: f.RepoDid, 781 Subject: collaboratorIdent.DID.String(), 782 }) 783 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 784 l.Error("failed to call XRPC repo.addCollaborator", "xrpcerr", xrpcerr, "err", err) 785 rp.pages.Notice(w, errorId, xrpcerr.Error()) 786 return 787 } 788 789 rp.acl.InvalidateCollaborators(f.Knot, f.RepoDid) 790 791 rp.pages.HxRefresh(w) 792 return 793 } 794 795 existing, err := db.GetCollaborators(rp.db, 796 orm.FilterEq("repo_did", f.RepoDid), 797 orm.FilterEq("subject_did", collaboratorIdent.DID.String()), 798 ) 799 if err != nil { 800 fail("Failed to check existing collaborators.", err) 801 return 802 } 803 if len(existing) > 0 { 804 fail(fmt.Sprintf("%s is already a collaborator.", collaboratorIdent.Handle), nil) 805 return 806 } 807 808 // announce this relation into the firehose, store into owners' pds 809 client, err := rp.oauth.AuthorizedClient(r) 810 if err != nil { 811 fail("Failed to write to PDS.", err) 812 return 813 } 814 815 // emit a record 816 currentUser := rp.oauth.GetMultiAccountUser(r) 817 rkey := tid.TID() 818 createdAt := time.Now() 819 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 820 Collection: tangled.RepoCollaboratorNSID, 821 Repo: currentUser.Did, 822 Rkey: rkey, 823 Record: knotcompat.Collaborator(repoCollaboratorRecord(f, collaboratorIdent.DID.String(), createdAt)), 824 }) 825 // invalid record 826 if err != nil { 827 fail("Failed to write record to PDS.", err) 828 return 829 } 830 831 aturi := resp.Uri 832 l = l.With("at-uri", aturi) 833 l.Info("wrote record to PDS") 834 835 tx, err := rp.db.BeginTx(r.Context(), nil) 836 if err != nil { 837 fail("Failed to add collaborator.", err) 838 return 839 } 840 841 rollback := func() { 842 err1 := tx.Rollback() 843 err2 := rp.enforcer.E.LoadPolicy() 844 err3 := rollbackRecord(context.Background(), aturi, client) 845 846 // ignore txn complete errors, this is okay 847 if errors.Is(err1, sql.ErrTxDone) { 848 err1 = nil 849 } 850 851 if errs := errors.Join(err1, err2, err3); errs != nil { 852 l.Error("failed to rollback changes", "errs", errs) 853 return 854 } 855 } 856 defer rollback() 857 858 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.RepoIdentifier()) 859 if err != nil { 860 fail("Failed to add collaborator permissions.", err) 861 return 862 } 863 864 err = db.AddCollaborator(tx, models.Collaborator{ 865 Did: syntax.DID(currentUser.Did), 866 Rkey: sql.NullString{String: rkey, Valid: true}, 867 SubjectDid: collaboratorIdent.DID, 868 RepoDid: syntax.DID(f.RepoDid), 869 Created: createdAt, 870 }) 871 if err != nil { 872 fail("Failed to add collaborator.", err) 873 return 874 } 875 876 err = tx.Commit() 877 if err != nil { 878 fail("Failed to add collaborator.", err) 879 return 880 } 881 882 err = rp.enforcer.E.SavePolicy() 883 if err != nil { 884 fail("Failed to update collaborator permissions.", err) 885 return 886 } 887 888 // clear aturi to when everything is successful 889 aturi = "" 890 891 rp.pages.HxRefresh(w) 892} 893 894func (rp *Repo) RemoveCollaborator(w http.ResponseWriter, r *http.Request) { 895 user := rp.oauth.GetMultiAccountUser(r) 896 l := rp.logger.With("handler", "RemoveCollaborator") 897 l = l.With("did", user.Did) 898 899 f, err := rp.repoResolver.Resolve(r) 900 if err != nil { 901 l.Error("failed to get repo and knot", "err", err) 902 return 903 } 904 905 errorId := "collaborator-error" 906 fail := func(msg string, err error) { 907 l.Error(msg, "err", err) 908 rp.pages.Notice(w, errorId, msg) 909 } 910 911 collaborator := r.FormValue("collaborator") 912 if collaborator == "" { 913 fail("Invalid form.", nil) 914 return 915 } 916 collaborator = strings.TrimPrefix(collaborator, "@") 917 918 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator) 919 if err != nil { 920 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err) 921 return 922 } 923 l = l.With("collaborator", collaboratorIdent.Handle, "knot", f.Knot) 924 925 if collaboratorIdent.DID.String() == f.Did { 926 fail("Cannot remove the repository owner.", nil) 927 return 928 } 929 930 if knotcompat.KnotHasCapability(r.Context(), f.Knot, rp.config.Core.Dev, consts.CapKnotACL) { 931 if f.RepoDid == "" { 932 fail("This repository is missing its DID and cannot manage collaborators.", nil) 933 return 934 } 935 936 client, err := rp.oauth.ServiceClient( 937 r, 938 oauth.WithService(f.Knot), 939 oauth.WithLxm(tangled.RepoRemoveCollaboratorNSID), 940 oauth.WithDev(rp.config.Core.Dev), 941 ) 942 if err != nil { 943 fail("Failed to connect to knot server.", err) 944 return 945 } 946 947 err = tangled.RepoRemoveCollaborator(r.Context(), client, &tangled.RepoRemoveCollaborator_Input{ 948 Repo: f.RepoDid, 949 Subject: collaboratorIdent.DID.String(), 950 }) 951 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 952 l.Error("failed to call XRPC repo.removeCollaborator", "xrpcerr", xrpcerr, "err", err) 953 rp.pages.Notice(w, errorId, xrpcerr.Error()) 954 return 955 } 956 957 rp.acl.InvalidateCollaborators(f.Knot, f.RepoDid) 958 959 rp.pages.HxRefresh(w) 960 return 961 } 962 963 existing, err := db.GetCollaborators(rp.db, 964 orm.FilterEq("repo_did", f.RepoDid), 965 orm.FilterEq("subject_did", collaboratorIdent.DID.String()), 966 ) 967 if err != nil { 968 fail("Failed to look up collaborator.", err) 969 return 970 } 971 if len(existing) == 0 { 972 fail(fmt.Sprintf("%s is not a collaborator.", collaboratorIdent.Handle), nil) 973 return 974 } 975 row := existing[0] 976 977 client, err := rp.oauth.AuthorizedClient(r) 978 if err != nil { 979 fail("Failed to write to PDS.", err) 980 return 981 } 982 983 tx, err := rp.db.BeginTx(r.Context(), nil) 984 if err != nil { 985 fail("Failed to remove collaborator.", err) 986 return 987 } 988 committed := false 989 defer func() { 990 if !committed { 991 tx.Rollback() 992 if err := rp.enforcer.E.LoadPolicy(); err != nil { 993 l.Error("failed to reload policy after rollback", "err", err) 994 } 995 } 996 }() 997 998 if err := rp.enforcer.RemoveCollaborator(collaboratorIdent.DID.String(), f.Knot, f.RepoIdentifier()); err != nil { 999 fail("Failed to remove collaborator permissions.", err) 1000 return 1001 } 1002 1003 if err := db.DeleteCollaborator(tx, 1004 orm.FilterEq("repo_did", f.RepoDid), 1005 orm.FilterEq("subject_did", collaboratorIdent.DID.String()), 1006 ); err != nil { 1007 fail("Failed to remove collaborator.", err) 1008 return 1009 } 1010 1011 if row.Rkey.Valid && row.Rkey.String != "" { 1012 if _, err := comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 1013 Collection: tangled.RepoCollaboratorNSID, 1014 Repo: row.Did.String(), 1015 Rkey: row.Rkey.String, 1016 }); err != nil { 1017 fail("Failed to delete collaborator record from PDS.", err) 1018 return 1019 } 1020 } 1021 1022 if err := tx.Commit(); err != nil { 1023 fail("Failed to remove collaborator.", err) 1024 return 1025 } 1026 committed = true 1027 1028 if err := rp.enforcer.E.SavePolicy(); err != nil { 1029 fail("Failed to update collaborator permissions.", err) 1030 return 1031 } 1032 1033 rp.pages.HxRefresh(w) 1034} 1035 1036func (rp *Repo) RenameRepo(w http.ResponseWriter, r *http.Request) { 1037 l := rp.logger.With("handler", "RenameRepo") 1038 noticeId := "rename-repo-error" 1039 1040 user := rp.oauth.GetMultiAccountUser(r) 1041 f, err := rp.repoResolver.Resolve(r) 1042 if err != nil { 1043 l.Error("failed to get repo and knot", "err", err) 1044 rp.pages.Notice(w, noticeId, "Failed to load repository.") 1045 return 1046 } 1047 l = l.With("did", user.Did, "rkey", f.Rkey, "oldName", f.Name) 1048 1049 if f.RepoDid == "" { 1050 rp.pages.Notice(w, noticeId, "This repository's knot has not completed the DID migration; rename is unavailable.") 1051 return 1052 } 1053 1054 if !knotcompat.KnotSupports114(r.Context(), f.Knot, rp.config.Core.Dev) { 1055 rp.pages.Notice(w, noticeId, "This repository's knot is below v1.14 and does not yet support renames. Ask the knot operator to upgrade.") 1056 return 1057 } 1058 1059 newName, err := validateRenameInput(f.Name, f.Rkey, r.FormValue("name")) 1060 if err != nil { 1061 rp.pages.Notice(w, noticeId, err.Error()) 1062 return 1063 } 1064 newRkey := strings.ToLower(newName) 1065 l = l.With("newName", newName, "newRkey", newRkey) 1066 1067 atpClient, err := rp.oauth.AuthorizedClient(r) 1068 if err != nil { 1069 l.Error("failed to get authorized client", "err", err) 1070 rp.pages.Notice(w, noticeId, "Failed to authorize. Try again later.") 1071 return 1072 } 1073 1074 newRepo := *f 1075 newRepo.Name = newName 1076 newRepo.Rkey = newRkey 1077 newRepo.Created = time.Now() 1078 record := newRepo.AsRecord() 1079 1080 if newRkey == f.Rkey { 1081 ex, err := comatproto.RepoGetRecord(r.Context(), atpClient, "", tangled.RepoNSID, f.Did, f.Rkey) 1082 if err != nil { 1083 l.Error("failed to fetch existing record", "err", err) 1084 rp.pages.Notice(w, noticeId, "Failed to read repository record from PDS.") 1085 return 1086 } 1087 1088 _, err = comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 1089 Collection: tangled.RepoNSID, 1090 Repo: f.Did, 1091 Rkey: f.Rkey, 1092 SwapRecord: ex.Cid, 1093 Record: &lexutil.LexiconTypeDecoder{ 1094 Val: &record, 1095 }, 1096 }) 1097 if err != nil { 1098 l.Error("failed to update display name on PDS", "err", err) 1099 rp.pages.Notice(w, noticeId, "Failed to save display name to PDS.") 1100 return 1101 } 1102 l.Info("updated display name on PDS") 1103 1104 if err := db.UpdateRepoDisplayName(rp.db, f.Did, f.Rkey, newName); err != nil { 1105 l.Error("optimistic display name update failed", "err", err) 1106 } 1107 } else { 1108 ex, getErr := comatproto.RepoGetRecord(r.Context(), atpClient, "", tangled.RepoNSID, f.Did, newRkey) 1109 switch { 1110 case getErr != nil: 1111 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{ 1112 Collection: tangled.RepoNSID, 1113 Repo: f.Did, 1114 Rkey: &newRkey, 1115 Record: &lexutil.LexiconTypeDecoder{Val: &record}, 1116 }) 1117 if err != nil { 1118 l.Error("failed to write rename to PDS", "err", err) 1119 rp.pages.Notice(w, noticeId, "Failed to save renamed repository to PDS.") 1120 return 1121 } 1122 l.Info("wrote rename-create to PDS; old record retained as alias") 1123 1124 default: 1125 existing, ok := ex.Value.Val.(*tangled.Repo) 1126 if !ok || existing.RepoDid == nil || *existing.RepoDid != f.RepoDid { 1127 rp.pages.Notice(w, noticeId, fmt.Sprintf("You already have a repository named %q.", newRkey)) 1128 return 1129 } 1130 _, err = comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 1131 Collection: tangled.RepoNSID, 1132 Repo: f.Did, 1133 Rkey: newRkey, 1134 SwapRecord: ex.Cid, 1135 Record: &lexutil.LexiconTypeDecoder{Val: &record}, 1136 }) 1137 if err != nil { 1138 l.Error("failed to rewrite rename-back record on PDS", "err", err) 1139 rp.pages.Notice(w, noticeId, "Failed to save renamed repository to PDS.") 1140 return 1141 } 1142 l.Info("rewrote rename-back record on PDS over prior alias") 1143 } 1144 1145 tx, err := rp.db.Begin() 1146 if err != nil { 1147 l.Error("failed to begin rename tx", "err", err) 1148 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid)) 1149 return 1150 } 1151 defer tx.Rollback() 1152 1153 if err := db.RenameRepo(tx, f.Did, f.Rkey, newRkey, newName); err != nil { 1154 l.Error("optimistic rename failed", "err", err) 1155 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid)) 1156 return 1157 } 1158 if err := db.RecordRepoRename(tx, f.Did, f.Rkey, f.RepoDid); err != nil { 1159 l.Error("failed to record rename history", "err", err) 1160 } 1161 if err := db.DeleteRepoRename(tx, f.Did, newRkey); err != nil { 1162 l.Error("failed to clear stale rename hint", "err", err) 1163 } 1164 if err := tx.Commit(); err != nil { 1165 l.Error("failed to commit rename tx", "err", err) 1166 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid)) 1167 return 1168 } 1169 } 1170 1171 oldRepo := *f 1172 rp.notifier.RenameRepo(r.Context(), syntax.DID(user.Did), &oldRepo, &newRepo) 1173 1174 if newRkey != f.Rkey { 1175 rp.migrateSiteOnRename(r.Context(), f, newName, newRkey) 1176 } 1177 1178 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid)) 1179} 1180 1181func validateRenameInput(currentName, currentRkey, raw string) (string, error) { 1182 newName := strings.TrimSpace(raw) 1183 if newName == "" { 1184 return "", errors.New("Repository name cannot be empty.") 1185 } 1186 if err := models.ValidateRepoName(newName); err != nil { 1187 return "", err 1188 } 1189 newName = models.StripGitExt(newName) 1190 if newName == currentName { 1191 if _, tidErr := syntax.ParseTID(currentRkey); tidErr == nil { 1192 return newName, nil 1193 } 1194 return "", errors.New("New name matches the current name.") 1195 } 1196 return newName, nil 1197} 1198 1199func (rp *Repo) migrateSiteOnRename(ctx context.Context, oldRepo *models.Repo, newName, newRkey string) { 1200 l := rp.logger.With("handler", "migrateSiteOnRename", "repo_did", oldRepo.RepoDid) 1201 1202 siteConfig, err := db.GetRepoSiteConfig(rp.db, oldRepo.RepoDid) 1203 if err != nil || siteConfig == nil { 1204 return 1205 } 1206 1207 if !rp.cfClient.Enabled() { 1208 return 1209 } 1210 1211 ownerClaim, _ := db.GetActiveDomainClaimForDid(rp.db, oldRepo.Did) 1212 1213 go func() { 1214 bgCtx := context.Background() 1215 oldRkey := oldRepo.Rkey 1216 oldName := oldRepo.Name 1217 1218 if err := sites.Delete(bgCtx, rp.cfClient, oldRepo.Did, oldRkey); err != nil { 1219 l.Error("sites: failed to delete old R2 prefix", "oldRkey", oldRkey, "err", err) 1220 } 1221 1222 newRepo := *oldRepo 1223 newRepo.Name = newName 1224 newRepo.Rkey = newRkey 1225 if deployErr := sites.Deploy(bgCtx, rp.cfClient, rp.config, &newRepo, siteConfig.Branch, siteConfig.Dir); deployErr != nil { 1226 l.Error("sites: redeploy after rename failed", "err", deployErr) 1227 } 1228 1229 if ownerClaim != nil { 1230 // drop the old name's entry when the name actually changed. 1231 if oldName != newName { 1232 if err := sites.DeleteDomainMapping(bgCtx, rp.cfClient, ownerClaim.Domain, oldName); err != nil { 1233 l.Error("sites: failed to remove old KV mapping", "oldName", oldName, "err", err) 1234 } 1235 } 1236 if err := sites.PutDomainMapping(bgCtx, rp.cfClient, ownerClaim.Domain, oldRepo.Did, newName, newRkey, siteConfig.IsIndex); err != nil { 1237 l.Error("sites: failed to write new KV mapping", "newName", newName, "newRkey", newRkey, "err", err) 1238 } 1239 } 1240 1241 l.Info("sites: migrated on rename", "oldName", oldName, "oldRkey", oldRkey, "newName", newName, "newRkey", newRkey) 1242 }() 1243} 1244 1245func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) { 1246 user := rp.oauth.GetMultiAccountUser(r) 1247 l := rp.logger.With("handler", "DeleteRepo") 1248 1249 noticeId := "operation-error" 1250 f, err := rp.repoResolver.Resolve(r) 1251 if err != nil { 1252 l.Error("failed to get repo and knot", "err", err) 1253 return 1254 } 1255 1256 // remove record from pds 1257 atpClient, err := rp.oauth.AuthorizedClient(r) 1258 if err != nil { 1259 l.Error("failed to get authorized client", "err", err) 1260 return 1261 } 1262 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{ 1263 Collection: tangled.RepoNSID, 1264 Repo: user.Did, 1265 Rkey: f.Rkey, 1266 }) 1267 if err != nil { 1268 l.Error("failed to delete record", "err", err) 1269 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.") 1270 return 1271 } 1272 l.Info("removed repo record", "aturi", f.RepoAt().String()) 1273 1274 client, err := rp.oauth.ServiceClient( 1275 r, 1276 oauth.WithService(f.Knot), 1277 oauth.WithLxm(tangled.RepoDeleteNSID), 1278 oauth.WithDev(rp.config.Core.Dev), 1279 ) 1280 if err != nil { 1281 l.Error("failed to connect to knot server", "err", err) 1282 return 1283 } 1284 1285 err = tangled.RepoDelete( 1286 r.Context(), 1287 client, 1288 &tangled.RepoDelete_Input{ 1289 Did: f.Did, 1290 Name: f.Name, 1291 Rkey: f.Rkey, 1292 }, 1293 ) 1294 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 1295 l.Error("failed to call XRPC repo.delete", "xrpcerr", xrpcerr, "err", err) 1296 rp.pages.Notice(w, noticeId, xrpcerr.Error()) 1297 return 1298 } 1299 l.Info("deleted repo from knot") 1300 1301 tx, err := rp.db.BeginTx(r.Context(), nil) 1302 if err != nil { 1303 l.Error("failed to start tx") 1304 w.Write(fmt.Append(nil, "failed to add collaborator: ", err)) 1305 return 1306 } 1307 defer func() { 1308 tx.Rollback() 1309 err = rp.enforcer.E.LoadPolicy() 1310 if err != nil { 1311 l.Error("failed to rollback policies") 1312 } 1313 }() 1314 1315 // remove collaborator RBAC 1316 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.RepoIdentifier(), f.Knot) 1317 if err != nil { 1318 rp.pages.Notice(w, noticeId, "Failed to remove collaborators") 1319 return 1320 } 1321 for _, c := range repoCollaborators { 1322 did := c[0] 1323 rp.enforcer.RemoveCollaborator(did, f.Knot, f.RepoIdentifier()) 1324 } 1325 l.Info("removed collaborators") 1326 1327 // remove repo RBAC 1328 err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.RepoIdentifier()) 1329 if err != nil { 1330 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules") 1331 return 1332 } 1333 1334 // remove repo from db 1335 err = db.RemoveRepo(tx, f.Did, f.Rkey) 1336 if err != nil { 1337 rp.pages.Notice(w, noticeId, "Failed to update appview") 1338 return 1339 } 1340 l.Info("removed repo from db") 1341 1342 err = tx.Commit() 1343 if err != nil { 1344 l.Error("failed to commit changes", "err", err) 1345 http.Error(w, err.Error(), http.StatusInternalServerError) 1346 return 1347 } 1348 1349 err = rp.enforcer.E.SavePolicy() 1350 if err != nil { 1351 l.Error("failed to update ACLs", "err", err) 1352 http.Error(w, err.Error(), http.StatusInternalServerError) 1353 return 1354 } 1355 1356 rp.notifier.DeleteRepo(r.Context(), f) 1357 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.Did)) 1358} 1359 1360func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) { 1361 l := rp.logger.With("handler", "SyncRepoFork") 1362 1363 ref := chi.URLParam(r, "ref") 1364 ref, _ = url.PathUnescape(ref) 1365 1366 user := rp.oauth.GetMultiAccountUser(r) 1367 f, err := rp.repoResolver.Resolve(r) 1368 if err != nil { 1369 l.Error("failed to resolve source repo", "err", err) 1370 return 1371 } 1372 1373 switch r.Method { 1374 case http.MethodPost: 1375 client, err := rp.oauth.ServiceClient( 1376 r, 1377 oauth.WithService(f.Knot), 1378 oauth.WithLxm(tangled.RepoForkSyncNSID), 1379 oauth.WithDev(rp.config.Core.Dev), 1380 ) 1381 if err != nil { 1382 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 1383 return 1384 } 1385 1386 if f.Source == "" { 1387 rp.pages.Notice(w, "repo", "This repository is not a fork.") 1388 return 1389 } 1390 1391 err = tangled.RepoForkSync( 1392 r.Context(), 1393 client, 1394 &tangled.RepoForkSync_Input{ 1395 Did: user.Did, 1396 Name: f.Name, 1397 Source: f.Source, 1398 Branch: ref, 1399 }, 1400 ) 1401 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 1402 l.Error("failed to call XRPC repo.forkSync", "xrpcerr", xrpcerr, "err", err) 1403 rp.pages.Notice(w, "repo", err.Error()) 1404 return 1405 } 1406 1407 rp.pages.HxRefresh(w) 1408 return 1409 } 1410} 1411 1412func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) { 1413 l := rp.logger.With("handler", "ForkRepo") 1414 1415 user := rp.oauth.GetMultiAccountUser(r) 1416 f, err := rp.repoResolver.Resolve(r) 1417 if err != nil { 1418 l.Error("failed to resolve source repo", "err", err) 1419 return 1420 } 1421 1422 switch r.Method { 1423 case http.MethodGet: 1424 user := rp.oauth.GetMultiAccountUser(r) 1425 knots := rp.acl.KnotsForUser(r.Context(), user.Did) 1426 1427 rp.pages.ForkRepo(w, pages.ForkRepoParams{ 1428 BaseParams: pages.BaseParamsFromContext(r.Context()), 1429 Knots: knots, 1430 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 1431 }) 1432 1433 case http.MethodPost: 1434 l := rp.logger.With("handler", "ForkRepo") 1435 1436 targetKnot := r.FormValue("knot") 1437 if targetKnot == "" { 1438 rp.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 1439 return 1440 } 1441 l = l.With("targetKnot", targetKnot) 1442 1443 if !rp.acl.IsRepoCreateAllowed(r.Context(), targetKnot, user.Did) { 1444 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 1445 return 1446 } 1447 1448 // choose a name for a fork 1449 forkName := strings.ToLower(r.FormValue("repo_name")) 1450 if forkName == "" { 1451 rp.pages.Notice(w, "repo", "Repository name cannot be empty.") 1452 return 1453 } 1454 1455 // this check is *only* to see if the forked repo name already exists 1456 // in the user's account. 1457 existingRepo, err := db.GetRepo( 1458 rp.db, 1459 orm.FilterEq("did", user.Did), 1460 orm.FilterEq("name", forkName), 1461 ) 1462 if err != nil { 1463 if !errors.Is(err, sql.ErrNoRows) { 1464 l.Error("error fetching existing repo from db", "err", err) 1465 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.") 1466 return 1467 } 1468 } else if existingRepo != nil { 1469 // repo with this name already exists 1470 rp.pages.Notice(w, "repo", "A repository with this name already exists.") 1471 return 1472 } 1473 l = l.With("forkName", forkName) 1474 1475 uri := "https" 1476 if rp.config.Core.Dev { 1477 uri = "http" 1478 } 1479 1480 forkSourceUrl := fmt.Sprintf("%s://%s/%s", uri, f.Knot, f.RepoIdentifier()) 1481 l = l.With("cloneUrl", forkSourceUrl) 1482 1483 rkey := strings.ToLower(forkName) 1484 1485 // TODO: this could coordinate better with the knot to receive a clone status 1486 client, err := rp.oauth.ServiceClient( 1487 r, 1488 oauth.WithService(targetKnot), 1489 oauth.WithLxm(tangled.RepoCreateNSID), 1490 oauth.WithDev(rp.config.Core.Dev), 1491 oauth.WithTimeout(time.Second*20), 1492 ) 1493 if err != nil { 1494 l.Error("could not create service client", "err", err) 1495 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 1496 return 1497 } 1498 1499 forkInput := &tangled.RepoCreate_Input{ 1500 Rkey: rkey, 1501 Name: rkey, 1502 Source: &forkSourceUrl, 1503 } 1504 createResp, err := tangled.RepoCreate( 1505 r.Context(), 1506 client, 1507 forkInput, 1508 ) 1509 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 1510 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err) 1511 rp.pages.Notice(w, "repo", xrpcerr.Error()) 1512 return 1513 } 1514 1515 var repoDid string 1516 if createResp != nil && createResp.RepoDid != nil { 1517 repoDid = *createResp.RepoDid 1518 } 1519 if repoDid == "" { 1520 l.Error("knot returned empty repo DID for fork") 1521 rp.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 1522 return 1523 } 1524 1525 forkSource := f.RepoAt().String() 1526 if f.RepoDid != "" { 1527 forkSource = f.RepoDid 1528 } 1529 1530 forkDescription := r.Form.Get("description") 1531 1532 repo := &models.Repo{ 1533 Did: user.Did, 1534 Name: rkey, 1535 Knot: targetKnot, 1536 Rkey: rkey, 1537 Source: forkSource, 1538 Description: forkDescription, 1539 Created: time.Now(), 1540 Labels: rp.config.Label.DefaultLabelDefs, 1541 RepoDid: repoDid, 1542 } 1543 record := repo.AsRecord() 1544 1545 cleanupKnot := func() { 1546 go func() { 1547 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 1548 for attempt, delay := range delays { 1549 time.Sleep(delay) 1550 deleteClient, dErr := rp.oauth.ServiceClient( 1551 r, 1552 oauth.WithService(targetKnot), 1553 oauth.WithLxm(tangled.RepoDeleteNSID), 1554 oauth.WithDev(rp.config.Core.Dev), 1555 ) 1556 if dErr != nil { 1557 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 1558 continue 1559 } 1560 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 1561 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 1562 Did: user.Did, 1563 Name: forkName, 1564 Rkey: rkey, 1565 }); dErr != nil { 1566 cancel() 1567 l.Error("failed to clean up fork on knot after rollback", "attempt", attempt+1, "err", dErr) 1568 continue 1569 } 1570 cancel() 1571 l.Info("successfully cleaned up fork on knot after rollback", "attempt", attempt+1) 1572 return 1573 } 1574 l.Error("exhausted retries for knot cleanup, fork may be orphaned", 1575 "did", user.Did, "fork", forkName, "knot", targetKnot) 1576 }() 1577 } 1578 1579 atpClient, err := rp.oauth.AuthorizedClient(r) 1580 if err != nil { 1581 l.Error("failed to create xrpcclient", "err", err) 1582 cleanupKnot() 1583 rp.pages.Notice(w, "repo", "Failed to fork repository.") 1584 return 1585 } 1586 1587 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 1588 Collection: tangled.RepoNSID, 1589 Repo: user.Did, 1590 Rkey: rkey, 1591 Record: &lexutil.LexiconTypeDecoder{ 1592 Val: &record, 1593 }, 1594 }) 1595 if err != nil { 1596 l.Error("failed to write to PDS", "err", err) 1597 cleanupKnot() 1598 rp.pages.Notice(w, "repo", "Failed to announce repository creation.") 1599 return 1600 } 1601 1602 aturi := atresp.Uri 1603 l = l.With("aturi", aturi) 1604 l.Info("wrote to PDS") 1605 1606 tx, err := rp.db.BeginTx(r.Context(), nil) 1607 if err != nil { 1608 l.Info("txn failed", "err", err) 1609 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1610 return 1611 } 1612 1613 rollback := func() { 1614 err1 := tx.Rollback() 1615 err2 := rp.enforcer.E.LoadPolicy() 1616 err3 := rollbackRecord(context.Background(), aturi, atpClient) 1617 1618 if errors.Is(err1, sql.ErrTxDone) { 1619 err1 = nil 1620 } 1621 1622 if errs := errors.Join(err1, err2, err3); errs != nil { 1623 l.Error("failed to rollback changes", "errs", errs) 1624 } 1625 1626 if aturi != "" { 1627 cleanupKnot() 1628 } 1629 } 1630 defer rollback() 1631 1632 err = db.AddRepo(tx, repo) 1633 if err != nil { 1634 l.Error("failed to AddRepo", "err", err) 1635 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1636 return 1637 } 1638 1639 rbacPath := repo.RepoIdentifier() 1640 err = rp.enforcer.AddRepo(user.Did, targetKnot, rbacPath) 1641 if err != nil { 1642 l.Error("failed to add ACLs", "err", err) 1643 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.") 1644 return 1645 } 1646 1647 err = tx.Commit() 1648 if err != nil { 1649 l.Error("failed to commit changes", "err", err) 1650 http.Error(w, err.Error(), http.StatusInternalServerError) 1651 return 1652 } 1653 1654 err = rp.enforcer.E.SavePolicy() 1655 if err != nil { 1656 l.Error("failed to update ACLs", "err", err) 1657 http.Error(w, err.Error(), http.StatusInternalServerError) 1658 return 1659 } 1660 1661 aturi = "" 1662 1663 rp.notifier.NewRepo(r.Context(), repo) 1664 if repoDid != "" { 1665 rp.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 1666 } else { 1667 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Did, forkName)) 1668 } 1669 } 1670} 1671 1672func (rp *Repo) Stars(w http.ResponseWriter, r *http.Request) { 1673 l := rp.logger.With("handler", "Stars") 1674 1675 user := rp.oauth.GetMultiAccountUser(r) 1676 f, err := rp.repoResolver.Resolve(r) 1677 if err != nil { 1678 l.Error("failed to resolve source repo", "err", err) 1679 return 1680 } 1681 1682 page := pagination.FromContext(r.Context()) 1683 if page.Limit > 30 || page.Limit <= 0 { 1684 page.Limit = 30 1685 } 1686 1687 starrers, err := db.GetStars(rp.db, string(f.RepoDid), page) 1688 if err != nil { 1689 l.Error("failed to fetch starrers", "err", err, "repoDid", f.RepoDid) 1690 return 1691 } 1692 1693 totalCount, err := db.GetStarCount(rp.db, models.StarSubjectRepo, string(f.RepoDid)) 1694 if err != nil { 1695 l.Error("failed to fetch star count", "err", err, "repoDid", f.RepoDid) 1696 return 1697 } 1698 1699 rp.pages.RepoStars(w, pages.RepoStarsParams{ 1700 BaseParams: pages.BaseParamsFromContext(r.Context()), 1701 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 1702 Starrers: starrers, 1703 Page: page, 1704 TotalCount: totalCount, 1705 }) 1706} 1707 1708func (rp *Repo) Forks(w http.ResponseWriter, r *http.Request) { 1709 l := rp.logger.With("handler", "Forks") 1710 1711 user := rp.oauth.GetMultiAccountUser(r) 1712 f, err := rp.repoResolver.Resolve(r) 1713 if err != nil { 1714 l.Error("failed to resolve source repo", "err", err) 1715 return 1716 } 1717 1718 var forks []models.Repo 1719 totalCount := 0 1720 page := pagination.FromContext(r.Context()) 1721 if f.RepoDid != "" { 1722 forks, err = db.GetReposPaginated(rp.db, page, orm.FilterEq("source", f.RepoDid)) 1723 if err != nil { 1724 l.Error("failed to fetch forks", "err", err, "repoAt", f.RepoAt()) 1725 return 1726 } 1727 1728 totalCount, err = db.GetForkCount(rp.db, f.RepoDid) 1729 if err != nil { 1730 l.Error("failed to fetch fork count", "err", err, "repoAt", f.RepoAt()) 1731 return 1732 } 1733 } 1734 1735 err = rp.pages.RepoForks(w, pages.RepoForksParams{ 1736 BaseParams: pages.BaseParamsFromContext(r.Context()), 1737 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 1738 Forks: forks, 1739 Page: page, 1740 TotalCount: totalCount, 1741 }) 1742 if err != nil { 1743 l.Error("failed to render page", "err", err) 1744 } 1745} 1746 1747// this is used to rollback changes made to the PDS 1748// 1749// it is a no-op if the provided ATURI is empty 1750func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 1751 if aturi == "" { 1752 return nil 1753 } 1754 1755 parsed := syntax.ATURI(aturi) 1756 1757 collection := parsed.Collection().String() 1758 repo := parsed.Authority().String() 1759 rkey := parsed.RecordKey().String() 1760 1761 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 1762 Collection: collection, 1763 Repo: repo, 1764 Rkey: rkey, 1765 }) 1766 return err 1767} 1768 1769func repoCollaboratorRecord(f *models.Repo, subject string, createdAt time.Time) *tangled.RepoCollaborator { 1770 return &tangled.RepoCollaborator{ 1771 Subject: subject, 1772 CreatedAt: createdAt.Format(time.RFC3339), 1773 Repo: f.RepoDid, 1774 } 1775}