Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1package repo 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "net/url" 11 "slices" 12 "strings" 13 "time" 14 15 "tangled.org/core/appview/cloudflare" 16 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/knotacl" 21 "tangled.org/core/appview/knotcompat" 22 "tangled.org/core/appview/models" 23 "tangled.org/core/appview/notify" 24 "tangled.org/core/appview/oauth" 25 "tangled.org/core/appview/pages" 26 "tangled.org/core/appview/pagination" 27 "tangled.org/core/appview/reporesolver" 28 "tangled.org/core/appview/sites" 29 "tangled.org/core/appview/validator" 30 xrpcclient "tangled.org/core/appview/xrpcclient" 31 "tangled.org/core/consts" 32 "tangled.org/core/eventconsumer" 33 "tangled.org/core/idresolver" 34 "tangled.org/core/ogre" 35 "tangled.org/core/orm" 36 "tangled.org/core/rbac" 37 "tangled.org/core/tid" 38 "tangled.org/core/xrpc/serviceauth" 39 40 comatproto "github.com/bluesky-social/indigo/api/atproto" 41 "github.com/bluesky-social/indigo/atproto/atclient" 42 "github.com/bluesky-social/indigo/atproto/syntax" 43 lexutil "github.com/bluesky-social/indigo/lex/util" 44 45 "github.com/go-chi/chi/v5" 46) 47 48type Repo struct { 49 repoResolver *reporesolver.RepoResolver 50 idResolver *idresolver.Resolver 51 config *config.Config 52 oauth *oauth.OAuth 53 pages *pages.Pages 54 spindlestream *eventconsumer.Consumer 55 db *db.DB 56 enforcer *rbac.Enforcer 57 acl *knotacl.Service 58 notifier notify.Notifier 59 logger *slog.Logger 60 serviceAuth *serviceauth.ServiceAuth 61 validator *validator.Validator 62 cfClient *cloudflare.Client 63 ogreClient *ogre.Client 64} 65 66func New( 67 oauth *oauth.OAuth, 68 repoResolver *reporesolver.RepoResolver, 69 pages *pages.Pages, 70 spindlestream *eventconsumer.Consumer, 71 idResolver *idresolver.Resolver, 72 db *db.DB, 73 config *config.Config, 74 notifier notify.Notifier, 75 enforcer *rbac.Enforcer, 76 acl 
*knotacl.Service, 77 logger *slog.Logger, 78 validator *validator.Validator, 79 cfClient *cloudflare.Client, 80) *Repo { 81 return &Repo{ 82 oauth: oauth, 83 repoResolver: repoResolver, 84 pages: pages, 85 idResolver: idResolver, 86 config: config, 87 spindlestream: spindlestream, 88 db: db, 89 notifier: notifier, 90 enforcer: enforcer, 91 acl: acl, 92 logger: logger, 93 validator: validator, 94 cfClient: cfClient, 95 ogreClient: ogre.NewClient(config.Ogre.Host), 96 } 97} 98 99// modify the spindle configured for this repo 100func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) { 101 user := rp.oauth.GetMultiAccountUser(r) 102 l := rp.logger.With("handler", "EditSpindle") 103 l = l.With("did", user.Did) 104 105 errorId := "operation-error" 106 fail := func(msg string, err error) { 107 l.Error(msg, "err", err) 108 rp.pages.Notice(w, errorId, msg) 109 } 110 111 f, err := rp.repoResolver.Resolve(r) 112 if err != nil { 113 fail("Failed to resolve repo. Try again later", err) 114 return 115 } 116 117 newSpindle := r.FormValue("spindle") 118 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value 119 client, err := rp.oauth.AuthorizedClient(r) 120 if err != nil { 121 fail("Failed to authorize. Try again later.", err) 122 return 123 } 124 125 if !removingSpindle { 126 // ensure that this is a valid spindle for this user 127 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Did) 128 if err != nil { 129 fail("Failed to find spindles. 
Try again later.", err) 130 return 131 } 132 133 if !slices.Contains(validSpindles, newSpindle) { 134 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles)) 135 return 136 } 137 } 138 139 newRepo := *f 140 newRepo.Spindle = newSpindle 141 record := newRepo.AsRecord() 142 143 spindlePtr := &newSpindle 144 if removingSpindle { 145 spindlePtr = nil 146 newRepo.Spindle = "" 147 } 148 149 // optimistic update 150 err = db.UpdateSpindle(rp.db, newRepo.RepoDid, spindlePtr) 151 if err != nil { 152 fail("Failed to update spindle. Try again later.", err) 153 return 154 } 155 156 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 157 if err != nil { 158 fail("Failed to update spindle, no record found on PDS.", err) 159 return 160 } 161 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 162 Collection: tangled.RepoNSID, 163 Repo: newRepo.Did, 164 Rkey: newRepo.Rkey, 165 SwapRecord: ex.Cid, 166 Record: &lexutil.LexiconTypeDecoder{ 167 Val: &record, 168 }, 169 }) 170 171 if err != nil { 172 fail("Failed to update spindle, unable to save to PDS.", err) 173 return 174 } 175 176 oldSpindle := f.Spindle 177 if oldSpindle != "" && oldSpindle != newSpindle { 178 remaining, qErr := db.GetRepos(rp.db, orm.FilterEq("spindle", oldSpindle)) 179 if qErr != nil { 180 l.Warn("failed to count repos using old spindle", "err", qErr) 181 } else if len(remaining) == 0 { 182 rp.spindlestream.RemoveSource(eventconsumer.NewSpindleSource(oldSpindle)) 183 } 184 } 185 186 if !removingSpindle { 187 rp.spindlestream.AddSource( 188 context.Background(), 189 eventconsumer.NewSpindleSource(newSpindle), 190 ) 191 } 192 193 rp.pages.HxRefresh(w) 194} 195 196func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) { 197 user := rp.oauth.GetMultiAccountUser(r) 198 l := rp.logger.With("handler", "AddLabel") 199 l = l.With("did", user.Did) 200 201 f, err := 
rp.repoResolver.Resolve(r) 202 if err != nil { 203 l.Error("failed to get repo and knot", "err", err) 204 return 205 } 206 207 errorId := "add-label-error" 208 fail := func(msg string, err error) { 209 l.Error(msg, "err", err) 210 rp.pages.Notice(w, errorId, msg) 211 } 212 213 // get form values for label definition 214 name := r.FormValue("name") 215 concreteType := r.FormValue("valueType") 216 valueFormat := r.FormValue("valueFormat") 217 enumValues := r.FormValue("enumValues") 218 scope := r.Form["scope"] 219 color := r.FormValue("color") 220 multiple := r.FormValue("multiple") == "true" 221 222 var variants []string 223 for part := range strings.SplitSeq(enumValues, ",") { 224 if part = strings.TrimSpace(part); part != "" { 225 variants = append(variants, part) 226 } 227 } 228 229 if concreteType == "" { 230 concreteType = "null" 231 } 232 233 format := models.ValueTypeFormatAny 234 if valueFormat == "did" { 235 format = models.ValueTypeFormatDid 236 } 237 238 valueType := models.ValueType{ 239 Type: models.ConcreteType(concreteType), 240 Format: format, 241 Enum: variants, 242 } 243 244 label := models.LabelDefinition{ 245 Did: user.Did, 246 Rkey: tid.TID(), 247 Name: name, 248 ValueType: valueType, 249 Scope: scope, 250 Color: &color, 251 Multiple: multiple, 252 Created: time.Now(), 253 } 254 if err := rp.validator.ValidateLabelDefinition(&label); err != nil { 255 fail(err.Error(), err) 256 return 257 } 258 259 // announce this relation into the firehose, store into owners' pds 260 client, err := rp.oauth.AuthorizedClient(r) 261 if err != nil { 262 fail(err.Error(), err) 263 return 264 } 265 266 // emit a labelRecord 267 labelRecord := label.AsRecord() 268 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 269 Collection: tangled.LabelDefinitionNSID, 270 Repo: label.Did, 271 Rkey: label.Rkey, 272 Record: &lexutil.LexiconTypeDecoder{ 273 Val: &labelRecord, 274 }, 275 }) 276 // invalid record 277 if err != nil { 278 
fail("Failed to write record to PDS.", err) 279 return 280 } 281 282 aturi := resp.Uri 283 l = l.With("at-uri", aturi) 284 l.Info("wrote label record to PDS") 285 286 // update the repo to subscribe to this label 287 newRepo := *f 288 newRepo.Labels = append(newRepo.Labels, aturi) 289 repoRecord := newRepo.AsRecord() 290 291 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 292 if err != nil { 293 fail("Failed to update labels, no record found on PDS.", err) 294 return 295 } 296 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 297 Collection: tangled.RepoNSID, 298 Repo: newRepo.Did, 299 Rkey: newRepo.Rkey, 300 SwapRecord: ex.Cid, 301 Record: &lexutil.LexiconTypeDecoder{ 302 Val: &repoRecord, 303 }, 304 }) 305 if err != nil { 306 fail("Failed to update labels for repo.", err) 307 return 308 } 309 310 tx, err := rp.db.BeginTx(r.Context(), nil) 311 if err != nil { 312 fail("Failed to add label.", err) 313 return 314 } 315 316 rollback := func() { 317 err1 := tx.Rollback() 318 err2 := rollbackRecord(context.Background(), aturi, client) 319 320 // ignore txn complete errors, this is okay 321 if errors.Is(err1, sql.ErrTxDone) { 322 err1 = nil 323 } 324 325 if errs := errors.Join(err1, err2); errs != nil { 326 l.Error("failed to rollback changes", "errs", errs) 327 return 328 } 329 } 330 defer rollback() 331 332 _, err = db.AddLabelDefinition(tx, &label) 333 if err != nil { 334 fail("Failed to add label.", err) 335 return 336 } 337 338 if err = db.SubscribeLabel(tx, &models.RepoLabel{ 339 RepoDid: syntax.DID(f.RepoDid), 340 LabelAt: label.AtUri(), 341 }); err != nil { 342 fail("Failed to subscribe to label.", err) 343 return 344 } 345 346 err = tx.Commit() 347 if err != nil { 348 fail("Failed to add label.", err) 349 return 350 } 351 352 // clear aturi when everything is successful 353 aturi = "" 354 355 rp.pages.HxRefresh(w) 356} 357 358func (rp *Repo) DeleteLabelDef(w 
http.ResponseWriter, r *http.Request) { 359 user := rp.oauth.GetMultiAccountUser(r) 360 l := rp.logger.With("handler", "DeleteLabel") 361 l = l.With("did", user.Did) 362 363 f, err := rp.repoResolver.Resolve(r) 364 if err != nil { 365 l.Error("failed to get repo and knot", "err", err) 366 return 367 } 368 369 errorId := "label-operation" 370 fail := func(msg string, err error) { 371 l.Error(msg, "err", err) 372 rp.pages.Notice(w, errorId, msg) 373 } 374 375 // get form values 376 labelId := r.FormValue("label-id") 377 378 label, err := db.GetLabelDefinition(rp.db, orm.FilterEq("id", labelId)) 379 if err != nil { 380 fail("Failed to find label definition.", err) 381 return 382 } 383 384 client, err := rp.oauth.AuthorizedClient(r) 385 if err != nil { 386 fail(err.Error(), err) 387 return 388 } 389 390 // delete label record from PDS 391 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 392 Collection: tangled.LabelDefinitionNSID, 393 Repo: label.Did, 394 Rkey: label.Rkey, 395 }) 396 if err != nil { 397 fail("Failed to delete label record from PDS.", err) 398 return 399 } 400 401 // update repo record to remove the label reference 402 newRepo := *f 403 var updated []string 404 removedAt := label.AtUri().String() 405 for _, l := range newRepo.Labels { 406 if l != removedAt { 407 updated = append(updated, l) 408 } 409 } 410 newRepo.Labels = updated 411 repoRecord := newRepo.AsRecord() 412 413 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 414 if err != nil { 415 fail("Failed to update labels, no record found on PDS.", err) 416 return 417 } 418 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 419 Collection: tangled.RepoNSID, 420 Repo: newRepo.Did, 421 Rkey: newRepo.Rkey, 422 SwapRecord: ex.Cid, 423 Record: &lexutil.LexiconTypeDecoder{ 424 Val: &repoRecord, 425 }, 426 }) 427 if err != nil { 428 fail("Failed to update repo record.", 
err) 429 return 430 } 431 432 // transaction for DB changes 433 tx, err := rp.db.BeginTx(r.Context(), nil) 434 if err != nil { 435 fail("Failed to delete label.", err) 436 return 437 } 438 defer tx.Rollback() 439 440 err = db.UnsubscribeLabel( 441 tx, 442 orm.FilterEq("repo_did", f.RepoDid), 443 orm.FilterEq("label_at", removedAt), 444 ) 445 if err != nil { 446 fail("Failed to unsubscribe label.", err) 447 return 448 } 449 450 err = db.DeleteLabelDefinition(tx, orm.FilterEq("id", label.Id)) 451 if err != nil { 452 fail("Failed to delete label definition.", err) 453 return 454 } 455 456 err = tx.Commit() 457 if err != nil { 458 fail("Failed to delete label.", err) 459 return 460 } 461 462 // everything succeeded 463 rp.pages.HxRefresh(w) 464} 465 466func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) { 467 user := rp.oauth.GetMultiAccountUser(r) 468 l := rp.logger.With("handler", "SubscribeLabel") 469 l = l.With("did", user.Did) 470 471 f, err := rp.repoResolver.Resolve(r) 472 if err != nil { 473 l.Error("failed to get repo and knot", "err", err) 474 return 475 } 476 477 if err := r.ParseForm(); err != nil { 478 l.Error("invalid form", "err", err) 479 return 480 } 481 482 errorId := "default-label-operation" 483 fail := func(msg string, err error) { 484 l.Error(msg, "err", err) 485 rp.pages.Notice(w, errorId, msg) 486 } 487 488 labelAts := r.Form["label"] 489 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts)) 490 if err != nil { 491 fail("Failed to subscribe to label.", err) 492 return 493 } 494 495 newRepo := *f 496 newRepo.Labels = append(newRepo.Labels, labelAts...) 
497 498 // dedup 499 slices.Sort(newRepo.Labels) 500 newRepo.Labels = slices.Compact(newRepo.Labels) 501 502 repoRecord := newRepo.AsRecord() 503 504 client, err := rp.oauth.AuthorizedClient(r) 505 if err != nil { 506 fail(err.Error(), err) 507 return 508 } 509 510 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey) 511 if err != nil { 512 fail("Failed to update labels, no record found on PDS.", err) 513 return 514 } 515 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 516 Collection: tangled.RepoNSID, 517 Repo: newRepo.Did, 518 Rkey: newRepo.Rkey, 519 SwapRecord: ex.Cid, 520 Record: &lexutil.LexiconTypeDecoder{ 521 Val: &repoRecord, 522 }, 523 }) 524 525 tx, err := rp.db.Begin() 526 if err != nil { 527 fail("Failed to subscribe to label.", err) 528 return 529 } 530 defer tx.Rollback() 531 532 for _, l := range labelAts { 533 err = db.SubscribeLabel(tx, &models.RepoLabel{ 534 RepoDid: syntax.DID(f.RepoDid), 535 LabelAt: syntax.ATURI(l), 536 }) 537 if err != nil { 538 fail("Failed to subscribe to label.", err) 539 return 540 } 541 } 542 543 if err := tx.Commit(); err != nil { 544 fail("Failed to subscribe to label.", err) 545 return 546 } 547 548 // everything succeeded 549 rp.pages.HxRefresh(w) 550} 551 552func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) { 553 user := rp.oauth.GetMultiAccountUser(r) 554 l := rp.logger.With("handler", "UnsubscribeLabel") 555 l = l.With("did", user.Did) 556 557 f, err := rp.repoResolver.Resolve(r) 558 if err != nil { 559 l.Error("failed to get repo and knot", "err", err) 560 return 561 } 562 563 if err := r.ParseForm(); err != nil { 564 l.Error("invalid form", "err", err) 565 return 566 } 567 568 errorId := "default-label-operation" 569 fail := func(msg string, err error) { 570 l.Error(msg, "err", err) 571 rp.pages.Notice(w, errorId, msg) 572 } 573 574 labelAts := r.Form["label"] 575 _, err = db.GetLabelDefinitions(rp.db, 
orm.FilterIn("at_uri", labelAts)) 576 if err != nil { 577 fail("Failed to unsubscribe to label.", err) 578 return 579 } 580 581 // update repo record to remove the label reference 582 newRepo := *f 583 var updated []string 584 for _, l := range newRepo.Labels { 585 if !slices.Contains(labelAts, l) { 586 updated = append(updated, l) 587 } 588 } 589 newRepo.Labels = updated 590 repoRecord := newRepo.AsRecord() 591 592 client, err := rp.oauth.AuthorizedClient(r) 593 if err != nil { 594 fail(err.Error(), err) 595 return 596 } 597 598 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey) 599 if err != nil { 600 fail("Failed to update labels, no record found on PDS.", err) 601 return 602 } 603 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 604 Collection: tangled.RepoNSID, 605 Repo: newRepo.Did, 606 Rkey: newRepo.Rkey, 607 SwapRecord: ex.Cid, 608 Record: &lexutil.LexiconTypeDecoder{ 609 Val: &repoRecord, 610 }, 611 }) 612 613 err = db.UnsubscribeLabel( 614 rp.db, 615 orm.FilterEq("repo_did", f.RepoDid), 616 orm.FilterIn("label_at", labelAts), 617 ) 618 if err != nil { 619 fail("Failed to unsubscribe label.", err) 620 return 621 } 622 623 // everything succeeded 624 rp.pages.HxRefresh(w) 625} 626 627func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) { 628 l := rp.logger.With("handler", "LabelPanel") 629 630 f, err := rp.repoResolver.Resolve(r) 631 if err != nil { 632 l.Error("failed to get repo and knot", "err", err) 633 return 634 } 635 636 subjectStr := r.FormValue("subject") 637 subject, err := syntax.ParseATURI(subjectStr) 638 if err != nil { 639 l.Error("failed to get repo and knot", "err", err) 640 return 641 } 642 643 labelDefs, err := db.GetLabelDefinitions( 644 rp.db, 645 orm.FilterIn("at_uri", f.Labels), 646 orm.FilterContains("scope", subject.Collection().String()), 647 ) 648 if err != nil { 649 l.Error("failed to fetch label defs", "err", err) 650 return 651 } 652 
653 defs := make(map[string]*models.LabelDefinition) 654 for _, l := range labelDefs { 655 defs[l.AtUri().String()] = &l 656 } 657 658 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject)) 659 if err != nil { 660 l.Error("failed to build label state", "err", err) 661 return 662 } 663 state := states[subject] 664 665 user := rp.oauth.GetMultiAccountUser(r) 666 rp.pages.LabelPanel(w, pages.LabelPanelParams{ 667 BaseParams: pages.BaseParamsFromContext(r.Context()), 668 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 669 Defs: defs, 670 Subject: subject.String(), 671 State: state, 672 }) 673} 674 675func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) { 676 l := rp.logger.With("handler", "EditLabelPanel") 677 678 f, err := rp.repoResolver.Resolve(r) 679 if err != nil { 680 l.Error("failed to get repo and knot", "err", err) 681 return 682 } 683 684 subjectStr := r.FormValue("subject") 685 subject, err := syntax.ParseATURI(subjectStr) 686 if err != nil { 687 l.Error("failed to get repo and knot", "err", err) 688 return 689 } 690 691 labelDefs, err := db.GetLabelDefinitions( 692 rp.db, 693 orm.FilterIn("at_uri", f.Labels), 694 orm.FilterContains("scope", subject.Collection().String()), 695 ) 696 if err != nil { 697 l.Error("failed to fetch labels", "err", err) 698 return 699 } 700 701 defs := make(map[string]*models.LabelDefinition) 702 for _, l := range labelDefs { 703 defs[l.AtUri().String()] = &l 704 } 705 706 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject)) 707 if err != nil { 708 l.Error("failed to build label state", "err", err) 709 return 710 } 711 state := states[subject] 712 713 user := rp.oauth.GetMultiAccountUser(r) 714 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{ 715 BaseParams: pages.BaseParamsFromContext(r.Context()), 716 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 717 Defs: defs, 718 Subject: subject.String(), 719 State: state, 720 }) 721} 722 723func (rp *Repo) AddCollaborator(w 
http.ResponseWriter, r *http.Request) { 724 user := rp.oauth.GetMultiAccountUser(r) 725 l := rp.logger.With("handler", "AddCollaborator") 726 l = l.With("did", user.Did) 727 728 f, err := rp.repoResolver.Resolve(r) 729 if err != nil { 730 l.Error("failed to get repo and knot", "err", err) 731 return 732 } 733 734 errorId := "add-collaborator-error" 735 fail := func(msg string, err error) { 736 l.Error(msg, "err", err) 737 rp.pages.Notice(w, errorId, msg) 738 } 739 740 collaborator := r.FormValue("collaborator") 741 if collaborator == "" { 742 fail("Invalid form.", nil) 743 return 744 } 745 746 // remove a single leading `@`, to make @handle work with ResolveIdent 747 collaborator = strings.TrimPrefix(collaborator, "@") 748 749 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator) 750 if err != nil { 751 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err) 752 return 753 } 754 755 if collaboratorIdent.DID.String() == user.Did { 756 fail("You seem to be adding yourself as a collaborator.", nil) 757 return 758 } 759 l = l.With("collaborator", collaboratorIdent.Handle) 760 l = l.With("knot", f.Knot) 761 762 capStatus := knotcompat.KnotCapability(r.Context(), f.Knot, rp.config.Core.Dev, consts.CapKnotACL) 763 if capStatus == knotcompat.CapUnknown { 764 fail("Could not reach the knot to add the collaborator. 
Try again later.", nil) 765 return 766 } 767 if capStatus == knotcompat.CapPresent { 768 if f.RepoDid == "" { 769 fail("This repository is missing its DID and cannot manage collaborators.", nil) 770 return 771 } 772 773 client, err := rp.oauth.ServiceClient( 774 r, 775 oauth.WithService(f.Knot), 776 oauth.WithLxm(tangled.RepoAddCollaboratorNSID), 777 oauth.WithDev(rp.config.Core.Dev), 778 ) 779 if err != nil { 780 fail("Failed to connect to knot server.", err) 781 return 782 } 783 784 err = tangled.RepoAddCollaborator(r.Context(), client, &tangled.RepoAddCollaborator_Input{ 785 Repo: f.RepoDid, 786 Subject: collaboratorIdent.DID.String(), 787 }) 788 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 789 l.Error("failed to call XRPC repo.addCollaborator", "xrpcerr", xrpcerr, "err", err) 790 rp.pages.Notice(w, errorId, xrpcerr.Error()) 791 return 792 } 793 794 rp.acl.InvalidateCollaborators(f.Knot, f.RepoDid) 795 796 rp.pages.HxRefresh(w) 797 return 798 } 799 800 existing, err := db.GetCollaborators(rp.db, 801 orm.FilterEq("repo_did", f.RepoDid), 802 orm.FilterEq("subject_did", collaboratorIdent.DID.String()), 803 ) 804 if err != nil { 805 fail("Failed to check existing collaborators.", err) 806 return 807 } 808 if len(existing) > 0 { 809 fail(fmt.Sprintf("%s is already a collaborator.", collaboratorIdent.Handle), nil) 810 return 811 } 812 813 // announce this relation into the firehose, store into owners' pds 814 client, err := rp.oauth.AuthorizedClient(r) 815 if err != nil { 816 fail("Failed to write to PDS.", err) 817 return 818 } 819 820 // emit a record 821 currentUser := rp.oauth.GetMultiAccountUser(r) 822 rkey := tid.TID() 823 createdAt := time.Now() 824 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 825 Collection: tangled.RepoCollaboratorNSID, 826 Repo: currentUser.Did, 827 Rkey: rkey, 828 Record: knotcompat.Collaborator(repoCollaboratorRecord(f, collaboratorIdent.DID.String(), createdAt)), 829 }) 830 
// invalid record 831 if err != nil { 832 fail("Failed to write record to PDS.", err) 833 return 834 } 835 836 aturi := resp.Uri 837 l = l.With("at-uri", aturi) 838 l.Info("wrote record to PDS") 839 840 tx, err := rp.db.BeginTx(r.Context(), nil) 841 if err != nil { 842 fail("Failed to add collaborator.", err) 843 return 844 } 845 846 rollback := func() { 847 err1 := tx.Rollback() 848 err2 := rp.enforcer.E.LoadPolicy() 849 err3 := rollbackRecord(context.Background(), aturi, client) 850 851 // ignore txn complete errors, this is okay 852 if errors.Is(err1, sql.ErrTxDone) { 853 err1 = nil 854 } 855 856 if errs := errors.Join(err1, err2, err3); errs != nil { 857 l.Error("failed to rollback changes", "errs", errs) 858 return 859 } 860 } 861 defer rollback() 862 863 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.RepoIdentifier()) 864 if err != nil { 865 fail("Failed to add collaborator permissions.", err) 866 return 867 } 868 869 err = db.AddCollaborator(tx, models.Collaborator{ 870 Did: syntax.DID(currentUser.Did), 871 Rkey: sql.NullString{String: rkey, Valid: true}, 872 SubjectDid: collaboratorIdent.DID, 873 RepoDid: syntax.DID(f.RepoDid), 874 Created: createdAt, 875 }) 876 if err != nil { 877 fail("Failed to add collaborator.", err) 878 return 879 } 880 881 err = tx.Commit() 882 if err != nil { 883 fail("Failed to add collaborator.", err) 884 return 885 } 886 887 err = rp.enforcer.E.SavePolicy() 888 if err != nil { 889 fail("Failed to update collaborator permissions.", err) 890 return 891 } 892 893 // clear aturi to when everything is successful 894 aturi = "" 895 896 rp.pages.HxRefresh(w) 897} 898 899func (rp *Repo) RemoveCollaborator(w http.ResponseWriter, r *http.Request) { 900 user := rp.oauth.GetMultiAccountUser(r) 901 l := rp.logger.With("handler", "RemoveCollaborator") 902 l = l.With("did", user.Did) 903 904 f, err := rp.repoResolver.Resolve(r) 905 if err != nil { 906 l.Error("failed to get repo and knot", "err", err) 907 return 908 
} 909 910 errorId := "collaborator-error" 911 fail := func(msg string, err error) { 912 l.Error(msg, "err", err) 913 rp.pages.Notice(w, errorId, msg) 914 } 915 916 collaborator := r.FormValue("collaborator") 917 if collaborator == "" { 918 fail("Invalid form.", nil) 919 return 920 } 921 collaborator = strings.TrimPrefix(collaborator, "@") 922 923 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator) 924 if err != nil { 925 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err) 926 return 927 } 928 l = l.With("collaborator", collaboratorIdent.Handle, "knot", f.Knot) 929 930 if collaboratorIdent.DID.String() == f.Did { 931 fail("Cannot remove the repository owner.", nil) 932 return 933 } 934 935 capStatus := knotcompat.KnotCapability(r.Context(), f.Knot, rp.config.Core.Dev, consts.CapKnotACL) 936 if capStatus == knotcompat.CapUnknown { 937 fail("Could not reach the knot to remove the collaborator. Try again later.", nil) 938 return 939 } 940 if capStatus == knotcompat.CapPresent { 941 if f.RepoDid == "" { 942 fail("This repository is missing its DID and cannot manage collaborators.", nil) 943 return 944 } 945 946 client, err := rp.oauth.ServiceClient( 947 r, 948 oauth.WithService(f.Knot), 949 oauth.WithLxm(tangled.RepoRemoveCollaboratorNSID), 950 oauth.WithDev(rp.config.Core.Dev), 951 ) 952 if err != nil { 953 fail("Failed to connect to knot server.", err) 954 return 955 } 956 957 err = tangled.RepoRemoveCollaborator(r.Context(), client, &tangled.RepoRemoveCollaborator_Input{ 958 Repo: f.RepoDid, 959 Subject: collaboratorIdent.DID.String(), 960 }) 961 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 962 l.Error("failed to call XRPC repo.removeCollaborator", "xrpcerr", xrpcerr, "err", err) 963 rp.pages.Notice(w, errorId, xrpcerr.Error()) 964 return 965 } 966 967 rp.acl.InvalidateCollaborators(f.Knot, f.RepoDid) 968 969 rp.pages.HxRefresh(w) 970 return 971 } 972 973 existing, err := db.GetCollaborators(rp.db, 974 
orm.FilterEq("repo_did", f.RepoDid), 975 orm.FilterEq("subject_did", collaboratorIdent.DID.String()), 976 ) 977 if err != nil { 978 fail("Failed to look up collaborator.", err) 979 return 980 } 981 if len(existing) == 0 { 982 fail(fmt.Sprintf("%s is not a collaborator.", collaboratorIdent.Handle), nil) 983 return 984 } 985 row := existing[0] 986 987 client, err := rp.oauth.AuthorizedClient(r) 988 if err != nil { 989 fail("Failed to write to PDS.", err) 990 return 991 } 992 993 tx, err := rp.db.BeginTx(r.Context(), nil) 994 if err != nil { 995 fail("Failed to remove collaborator.", err) 996 return 997 } 998 committed := false 999 defer func() { 1000 if !committed { 1001 tx.Rollback() 1002 if err := rp.enforcer.E.LoadPolicy(); err != nil { 1003 l.Error("failed to reload policy after rollback", "err", err) 1004 } 1005 } 1006 }() 1007 1008 if err := rp.enforcer.RemoveCollaborator(collaboratorIdent.DID.String(), f.Knot, f.RepoIdentifier()); err != nil { 1009 fail("Failed to remove collaborator permissions.", err) 1010 return 1011 } 1012 1013 if err := db.DeleteCollaborator(tx, 1014 orm.FilterEq("repo_did", f.RepoDid), 1015 orm.FilterEq("subject_did", collaboratorIdent.DID.String()), 1016 ); err != nil { 1017 fail("Failed to remove collaborator.", err) 1018 return 1019 } 1020 1021 if row.Rkey.Valid && row.Rkey.String != "" { 1022 if _, err := comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 1023 Collection: tangled.RepoCollaboratorNSID, 1024 Repo: row.Did.String(), 1025 Rkey: row.Rkey.String, 1026 }); err != nil { 1027 fail("Failed to delete collaborator record from PDS.", err) 1028 return 1029 } 1030 } 1031 1032 if err := tx.Commit(); err != nil { 1033 fail("Failed to remove collaborator.", err) 1034 return 1035 } 1036 committed = true 1037 1038 if err := rp.enforcer.E.SavePolicy(); err != nil { 1039 fail("Failed to update collaborator permissions.", err) 1040 return 1041 } 1042 1043 rp.pages.HxRefresh(w) 1044} 1045 1046func (rp 
*Repo) RenameRepo(w http.ResponseWriter, r *http.Request) { 1047 l := rp.logger.With("handler", "RenameRepo") 1048 noticeId := "rename-repo-error" 1049 1050 user := rp.oauth.GetMultiAccountUser(r) 1051 f, err := rp.repoResolver.Resolve(r) 1052 if err != nil { 1053 l.Error("failed to get repo and knot", "err", err) 1054 rp.pages.Notice(w, noticeId, "Failed to load repository.") 1055 return 1056 } 1057 l = l.With("did", user.Did, "rkey", f.Rkey, "oldName", f.Name) 1058 1059 if f.RepoDid == "" { 1060 rp.pages.Notice(w, noticeId, "This repository's knot has not completed the DID migration; rename is unavailable.") 1061 return 1062 } 1063 1064 if !knotcompat.KnotSupports114(r.Context(), f.Knot, rp.config.Core.Dev) { 1065 rp.pages.Notice(w, noticeId, "This repository's knot is below v1.14 and does not yet support renames. Ask the knot operator to upgrade.") 1066 return 1067 } 1068 1069 newName, err := validateRenameInput(f.Name, f.Rkey, r.FormValue("name")) 1070 if err != nil { 1071 rp.pages.Notice(w, noticeId, err.Error()) 1072 return 1073 } 1074 newRkey := strings.ToLower(newName) 1075 l = l.With("newName", newName, "newRkey", newRkey) 1076 1077 atpClient, err := rp.oauth.AuthorizedClient(r) 1078 if err != nil { 1079 l.Error("failed to get authorized client", "err", err) 1080 rp.pages.Notice(w, noticeId, "Failed to authorize. 
Try again later.") 1081 return 1082 } 1083 1084 newRepo := *f 1085 newRepo.Name = newName 1086 newRepo.Rkey = newRkey 1087 newRepo.Created = time.Now() 1088 record := newRepo.AsRecord() 1089 1090 if newRkey == f.Rkey { 1091 ex, err := comatproto.RepoGetRecord(r.Context(), atpClient, "", tangled.RepoNSID, f.Did, f.Rkey) 1092 if err != nil { 1093 l.Error("failed to fetch existing record", "err", err) 1094 rp.pages.Notice(w, noticeId, "Failed to read repository record from PDS.") 1095 return 1096 } 1097 1098 _, err = comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 1099 Collection: tangled.RepoNSID, 1100 Repo: f.Did, 1101 Rkey: f.Rkey, 1102 SwapRecord: ex.Cid, 1103 Record: &lexutil.LexiconTypeDecoder{ 1104 Val: &record, 1105 }, 1106 }) 1107 if err != nil { 1108 l.Error("failed to update display name on PDS", "err", err) 1109 rp.pages.Notice(w, noticeId, "Failed to save display name to PDS.") 1110 return 1111 } 1112 l.Info("updated display name on PDS") 1113 1114 if err := db.UpdateRepoDisplayName(rp.db, f.Did, f.Rkey, newName); err != nil { 1115 l.Error("optimistic display name update failed", "err", err) 1116 } 1117 } else { 1118 ex, getErr := comatproto.RepoGetRecord(r.Context(), atpClient, "", tangled.RepoNSID, f.Did, newRkey) 1119 switch { 1120 case getErr != nil: 1121 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{ 1122 Collection: tangled.RepoNSID, 1123 Repo: f.Did, 1124 Rkey: &newRkey, 1125 Record: &lexutil.LexiconTypeDecoder{Val: &record}, 1126 }) 1127 if err != nil { 1128 l.Error("failed to write rename to PDS", "err", err) 1129 rp.pages.Notice(w, noticeId, "Failed to save renamed repository to PDS.") 1130 return 1131 } 1132 l.Info("wrote rename-create to PDS; old record retained as alias") 1133 1134 default: 1135 existing, ok := ex.Value.Val.(*tangled.Repo) 1136 if !ok || existing.RepoDid == nil || *existing.RepoDid != f.RepoDid { 1137 rp.pages.Notice(w, noticeId, 
fmt.Sprintf("You already have a repository named %q.", newRkey)) 1138 return 1139 } 1140 _, err = comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 1141 Collection: tangled.RepoNSID, 1142 Repo: f.Did, 1143 Rkey: newRkey, 1144 SwapRecord: ex.Cid, 1145 Record: &lexutil.LexiconTypeDecoder{Val: &record}, 1146 }) 1147 if err != nil { 1148 l.Error("failed to rewrite rename-back record on PDS", "err", err) 1149 rp.pages.Notice(w, noticeId, "Failed to save renamed repository to PDS.") 1150 return 1151 } 1152 l.Info("rewrote rename-back record on PDS over prior alias") 1153 } 1154 1155 tx, err := rp.db.Begin() 1156 if err != nil { 1157 l.Error("failed to begin rename tx", "err", err) 1158 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid)) 1159 return 1160 } 1161 defer tx.Rollback() 1162 1163 if err := db.RenameRepo(tx, f.Did, f.Rkey, newRkey, newName); err != nil { 1164 l.Error("optimistic rename failed", "err", err) 1165 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid)) 1166 return 1167 } 1168 if err := db.RecordRepoRename(tx, f.Did, f.Rkey, f.RepoDid); err != nil { 1169 l.Error("failed to record rename history", "err", err) 1170 } 1171 if err := db.DeleteRepoRename(tx, f.Did, newRkey); err != nil { 1172 l.Error("failed to clear stale rename hint", "err", err) 1173 } 1174 if err := tx.Commit(); err != nil { 1175 l.Error("failed to commit rename tx", "err", err) 1176 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid)) 1177 return 1178 } 1179 } 1180 1181 oldRepo := *f 1182 rp.notifier.RenameRepo(r.Context(), syntax.DID(user.Did), &oldRepo, &newRepo) 1183 1184 if newRkey != f.Rkey { 1185 rp.migrateSiteOnRename(r.Context(), f, newName, newRkey) 1186 } 1187 1188 rp.pages.HxLocation(w, fmt.Sprintf("/%s", f.RepoDid)) 1189} 1190

// validateRenameInput normalizes and validates a user-supplied repository
// name for a rename.
//
// raw is whitespace-trimmed, checked with models.ValidateRepoName and then
// passed through models.StripGitExt. A result equal to currentName is only
// accepted when currentRkey parses as a TID; otherwise an error is returned.
// The returned string is the normalized name.
func validateRenameInput(currentName, currentRkey, raw string) (string, error) {
	newName := strings.TrimSpace(raw)
	if newName == "" {
		return "", errors.New("Repository name cannot be empty.")
	}
	if err := models.ValidateRepoName(newName); err != nil {
		return "", err
	}

	newName = models.StripGitExt(newName)
	if newName != currentName {
		return newName, nil
	}

	// Same name as before: allowed only for TID record keys.
	// NOTE(review): presumably this lets a TID-keyed repo move to a
	// name-derived rkey without changing its name -- confirm against RenameRepo.
	if _, err := syntax.ParseTID(currentRkey); err != nil {
		return "", errors.New("New name matches the current name.")
	}
	return newName, nil
}

// migrateSiteOnRename moves a repository's deployed site from its old
// name/rkey to newName/newRkey after a rename.
//
// It returns immediately when the repository has no site config, when the
// config cannot be loaded, or when the Cloudflare client is disabled.
// Otherwise the migration runs in a background goroutine that deletes the old
// deployment, redeploys under the new rkey and, if the owner has an active
// domain claim, rewrites the domain mapping. Every step is best-effort:
// failures are logged and the remaining steps still run.
//
// ctx is not used; the goroutine runs on context.Background().
// NOTE(review): presumably so the work outlives the request -- confirm.
func (rp *Repo) migrateSiteOnRename(ctx context.Context, oldRepo *models.Repo, newName, newRkey string) {
	l := rp.logger.With("handler", "migrateSiteOnRename", "repo_did", oldRepo.RepoDid)

	siteConfig, err := db.GetRepoSiteConfig(rp.db, oldRepo.RepoDid)
	if err != nil {
		l.Error("sites: failed to load site config", "err", err)
		return
	}
	if siteConfig == nil {
		return
	}

	if !rp.cfClient.Enabled() {
		return
	}

	// The lookup error is deliberately dropped: a nil claim just skips the
	// domain-mapping step below.
	ownerClaim, _ := db.GetActiveDomainClaimForDid(rp.db, oldRepo.Did)

	// Snapshot the repo before starting the goroutine so the background work
	// never reads through the caller's pointer.
	prev := *oldRepo
	renamed := prev
	renamed.Name = newName
	renamed.Rkey = newRkey

	go func() {
		bgCtx := context.Background()
		oldRkey := prev.Rkey
		oldName := prev.Name

		if err := sites.Delete(bgCtx, rp.cfClient, prev.Did, oldRkey); err != nil {
			l.Error("sites: failed to delete old R2 prefix", "oldRkey", oldRkey, "err", err)
		}

		if deployErr := sites.Deploy(bgCtx, rp.cfClient, rp.config, &renamed, siteConfig.Branch, siteConfig.Dir); deployErr != nil {
			l.Error("sites: redeploy after rename failed", "err", deployErr)
		}

		if ownerClaim != nil {
			// drop the old name's entry when the name actually changed.
			if oldName != newName {
				if err := sites.DeleteDomainMapping(bgCtx, rp.cfClient, ownerClaim.Domain, oldName); err != nil {
					l.Error("sites: failed to remove old KV mapping", "oldName", oldName, "err", err)
				}
			}
			if err := sites.PutDomainMapping(bgCtx, rp.cfClient, ownerClaim.Domain, prev.Did, newName, newRkey, siteConfig.IsIndex); err != nil {
				l.Error("sites: failed to write new KV mapping", "newName", newName, "newRkey", newRkey, "err", err)
			}
		}

		l.Info("sites: migrated on rename", "oldName", oldName, "oldRkey", oldRkey, "newName", newName, "newRkey", newRkey)
	}()
}

// DeleteRepo handles a repository deletion request.
//
// The steps run in this order, and the handler stops at the first failure:
//  1. delete the repo record from the user's PDS,
//  2. ask the knot to delete the repository,
//  3. remove collaborator and repo RBAC rules and the appview DB row inside
//     one DB transaction,
//  4. persist the RBAC policy, notify, and redirect to the owner's page.
//
// Earlier steps are not undone when a later one fails.
func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) {
	user := rp.oauth.GetMultiAccountUser(r)
	l := rp.logger.With("handler", "DeleteRepo")

	noticeId := "operation-error"
	f, err := rp.repoResolver.Resolve(r)
	if err != nil {
		l.Error("failed to get repo and knot", "err", err)
		return
	}

	// remove record from pds
	atpClient, err := rp.oauth.AuthorizedClient(r)
	if err != nil {
		l.Error("failed to get authorized client", "err", err)
		rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.")
		return
	}
	_, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{
		Collection: tangled.RepoNSID,
		Repo:       user.Did,
		Rkey:       f.Rkey,
	})
	if err != nil {
		l.Error("failed to delete record", "err", err)
		rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.")
		return
	}
	l.Info("removed repo record", "aturi", f.RepoAt().String())

	client, err := rp.oauth.ServiceClient(
		r,
		oauth.WithService(f.Knot),
		oauth.WithLxm(tangled.RepoDeleteNSID),
		oauth.WithDev(rp.config.Core.Dev),
	)
	if err != nil {
		// The PDS record is already gone here, so the user must be told the
		// deletion did not complete.
		l.Error("failed to connect to knot server", "err", err)
		rp.pages.Notice(w, noticeId, "Failed to connect to knot server.")
		return
	}

	err = tangled.RepoDelete(
		r.Context(),
		client,
		&tangled.RepoDelete_Input{
			Did:  f.Did,
			Name: f.Name,
			Rkey: f.Rkey,
		},
	)
	if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
		l.Error("failed to call XRPC repo.delete", "xrpcerr", xrpcerr, "err", err)
		rp.pages.Notice(w, noticeId, xrpcerr.Error())
		return
	}
	l.Info("deleted repo from knot")

	tx, err := rp.db.BeginTx(r.Context(), nil)
	if err != nil {
		l.Error("failed to start tx", "err", err)
		rp.pages.Notice(w, noticeId, "Failed to update appview")
		return
	}
	defer func() {
		// Runs on every exit path, including success; after a successful
		// Commit the Rollback is expected to be a harmless no-op.
		tx.Rollback()
		// Reload the policy so in-memory RBAC edits made below are discarded
		// unless SavePolicy persisted them.
		if lpErr := rp.enforcer.E.LoadPolicy(); lpErr != nil {
			l.Error("failed to rollback policies", "err", lpErr)
		}
	}()

	// remove collaborator RBAC
	repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.RepoIdentifier(), f.Knot)
	if err != nil {
		l.Error("failed to list collaborators", "err", err)
		rp.pages.Notice(w, noticeId, "Failed to remove collaborators")
		return
	}
	for _, c := range repoCollaborators {
		did := c[0]
		rp.enforcer.RemoveCollaborator(did, f.Knot, f.RepoIdentifier())
	}
	l.Info("removed collaborators")

	// remove repo RBAC
	err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.RepoIdentifier())
	if err != nil {
		l.Error("failed to remove repo RBAC rules", "err", err)
		rp.pages.Notice(w, noticeId, "Failed to update RBAC rules")
		return
	}

	// remove repo from db
	err = db.RemoveRepo(tx, f.Did, f.Rkey)
	if err != nil {
		l.Error("failed to remove repo from db", "err", err)
		rp.pages.Notice(w, noticeId, "Failed to update appview")
		return
	}
	l.Info("removed repo from db")

	err = tx.Commit()
	if err != nil {
		l.Error("failed to commit changes", "err", err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	err = rp.enforcer.E.SavePolicy()
	if err != nil {
		l.Error("failed to update ACLs", "err", err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	rp.notifier.DeleteRepo(r.Context(), f)
	rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.Did))
}
1370func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) { 1371 l := rp.logger.With("handler", "SyncRepoFork") 1372 1373 ref := chi.URLParam(r, "ref") 1374 ref, _ = url.PathUnescape(ref) 1375 1376 user := rp.oauth.GetMultiAccountUser(r) 1377 f, err := rp.repoResolver.Resolve(r) 1378 if err != nil { 1379 l.Error("failed to resolve source repo", "err", err) 1380 return 1381 } 1382 1383 switch r.Method { 1384 case http.MethodPost: 1385 client, err := rp.oauth.ServiceClient( 1386 r, 1387 oauth.WithService(f.Knot), 1388 oauth.WithLxm(tangled.RepoForkSyncNSID), 1389 oauth.WithDev(rp.config.Core.Dev), 1390 ) 1391 if err != nil { 1392 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 1393 return 1394 } 1395 1396 if f.Source == "" { 1397 rp.pages.Notice(w, "repo", "This repository is not a fork.") 1398 return 1399 } 1400 1401 err = tangled.RepoForkSync( 1402 r.Context(), 1403 client, 1404 &tangled.RepoForkSync_Input{ 1405 Did: user.Did, 1406 Name: f.Name, 1407 Source: f.Source, 1408 Branch: ref, 1409 }, 1410 ) 1411 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 1412 l.Error("failed to call XRPC repo.forkSync", "xrpcerr", xrpcerr, "err", err) 1413 rp.pages.Notice(w, "repo", err.Error()) 1414 return 1415 } 1416 1417 rp.pages.HxRefresh(w) 1418 return 1419 } 1420} 1421 1422func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) { 1423 l := rp.logger.With("handler", "ForkRepo") 1424 1425 user := rp.oauth.GetMultiAccountUser(r) 1426 f, err := rp.repoResolver.Resolve(r) 1427 if err != nil { 1428 l.Error("failed to resolve source repo", "err", err) 1429 return 1430 } 1431 1432 switch r.Method { 1433 case http.MethodGet: 1434 user := rp.oauth.GetMultiAccountUser(r) 1435 knots := rp.acl.KnotsForUser(r.Context(), user.Did) 1436 1437 rp.pages.ForkRepo(w, pages.ForkRepoParams{ 1438 BaseParams: pages.BaseParamsFromContext(r.Context()), 1439 Knots: knots, 1440 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 1441 }) 1442 1443 case 
http.MethodPost: 1444 l := rp.logger.With("handler", "ForkRepo") 1445 1446 targetKnot := r.FormValue("knot") 1447 if targetKnot == "" { 1448 rp.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 1449 return 1450 } 1451 l = l.With("targetKnot", targetKnot) 1452 1453 if !rp.acl.IsRepoCreateAllowed(r.Context(), targetKnot, user.Did) { 1454 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 1455 return 1456 } 1457 1458 // choose a name for a fork 1459 forkName := strings.ToLower(r.FormValue("repo_name")) 1460 if forkName == "" { 1461 rp.pages.Notice(w, "repo", "Repository name cannot be empty.") 1462 return 1463 } 1464 1465 // this check is *only* to see if the forked repo name already exists 1466 // in the user's account. 1467 existingRepo, err := db.GetRepo( 1468 rp.db, 1469 orm.FilterEq("did", user.Did), 1470 orm.FilterEq("name", forkName), 1471 ) 1472 if err != nil { 1473 if !errors.Is(err, sql.ErrNoRows) { 1474 l.Error("error fetching existing repo from db", "err", err) 1475 rp.pages.Notice(w, "repo", "Failed to fork this repository. 
Try again later.") 1476 return 1477 } 1478 } else if existingRepo != nil { 1479 // repo with this name already exists 1480 rp.pages.Notice(w, "repo", "A repository with this name already exists.") 1481 return 1482 } 1483 l = l.With("forkName", forkName) 1484 1485 uri := "https" 1486 if rp.config.Core.Dev { 1487 uri = "http" 1488 } 1489 1490 forkSourceUrl := fmt.Sprintf("%s://%s/%s", uri, f.Knot, f.RepoIdentifier()) 1491 l = l.With("cloneUrl", forkSourceUrl) 1492 1493 rkey := strings.ToLower(forkName) 1494 1495 // TODO: this could coordinate better with the knot to receive a clone status 1496 client, err := rp.oauth.ServiceClient( 1497 r, 1498 oauth.WithService(targetKnot), 1499 oauth.WithLxm(tangled.RepoCreateNSID), 1500 oauth.WithDev(rp.config.Core.Dev), 1501 oauth.WithTimeout(time.Second*20), 1502 ) 1503 if err != nil { 1504 l.Error("could not create service client", "err", err) 1505 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 1506 return 1507 } 1508 1509 forkInput := &tangled.RepoCreate_Input{ 1510 Rkey: rkey, 1511 Name: rkey, 1512 Source: &forkSourceUrl, 1513 } 1514 createResp, err := tangled.RepoCreate( 1515 r.Context(), 1516 client, 1517 forkInput, 1518 ) 1519 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 1520 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err) 1521 rp.pages.Notice(w, "repo", xrpcerr.Error()) 1522 return 1523 } 1524 1525 var repoDid string 1526 if createResp != nil && createResp.RepoDid != nil { 1527 repoDid = *createResp.RepoDid 1528 } 1529 if repoDid == "" { 1530 l.Error("knot returned empty repo DID for fork") 1531 rp.pages.Notice(w, "repo", "Knot failed to mint a repo DID. 
The knot may need to be upgraded.") 1532 return 1533 } 1534 1535 forkSource := f.RepoAt().String() 1536 if f.RepoDid != "" { 1537 forkSource = f.RepoDid 1538 } 1539 1540 forkDescription := r.Form.Get("description") 1541 1542 repo := &models.Repo{ 1543 Did: user.Did, 1544 Name: rkey, 1545 Knot: targetKnot, 1546 Rkey: rkey, 1547 Source: forkSource, 1548 Description: forkDescription, 1549 Created: time.Now(), 1550 Labels: rp.config.Label.DefaultLabelDefs, 1551 RepoDid: repoDid, 1552 } 1553 record := repo.AsRecord() 1554 1555 cleanupKnot := func() { 1556 go func() { 1557 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 1558 for attempt, delay := range delays { 1559 time.Sleep(delay) 1560 deleteClient, dErr := rp.oauth.ServiceClient( 1561 r, 1562 oauth.WithService(targetKnot), 1563 oauth.WithLxm(tangled.RepoDeleteNSID), 1564 oauth.WithDev(rp.config.Core.Dev), 1565 ) 1566 if dErr != nil { 1567 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 1568 continue 1569 } 1570 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 1571 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 1572 Did: user.Did, 1573 Name: forkName, 1574 Rkey: rkey, 1575 }); dErr != nil { 1576 cancel() 1577 l.Error("failed to clean up fork on knot after rollback", "attempt", attempt+1, "err", dErr) 1578 continue 1579 } 1580 cancel() 1581 l.Info("successfully cleaned up fork on knot after rollback", "attempt", attempt+1) 1582 return 1583 } 1584 l.Error("exhausted retries for knot cleanup, fork may be orphaned", 1585 "did", user.Did, "fork", forkName, "knot", targetKnot) 1586 }() 1587 } 1588 1589 atpClient, err := rp.oauth.AuthorizedClient(r) 1590 if err != nil { 1591 l.Error("failed to create xrpcclient", "err", err) 1592 cleanupKnot() 1593 rp.pages.Notice(w, "repo", "Failed to fork repository.") 1594 return 1595 } 1596 1597 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, 
&comatproto.RepoPutRecord_Input{ 1598 Collection: tangled.RepoNSID, 1599 Repo: user.Did, 1600 Rkey: rkey, 1601 Record: &lexutil.LexiconTypeDecoder{ 1602 Val: &record, 1603 }, 1604 }) 1605 if err != nil { 1606 l.Error("failed to write to PDS", "err", err) 1607 cleanupKnot() 1608 rp.pages.Notice(w, "repo", "Failed to announce repository creation.") 1609 return 1610 } 1611 1612 aturi := atresp.Uri 1613 l = l.With("aturi", aturi) 1614 l.Info("wrote to PDS") 1615 1616 tx, err := rp.db.BeginTx(r.Context(), nil) 1617 if err != nil { 1618 l.Info("txn failed", "err", err) 1619 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1620 return 1621 } 1622 1623 rollback := func() { 1624 err1 := tx.Rollback() 1625 err2 := rp.enforcer.E.LoadPolicy() 1626 err3 := rollbackRecord(context.Background(), aturi, atpClient) 1627 1628 if errors.Is(err1, sql.ErrTxDone) { 1629 err1 = nil 1630 } 1631 1632 if errs := errors.Join(err1, err2, err3); errs != nil { 1633 l.Error("failed to rollback changes", "errs", errs) 1634 } 1635 1636 if aturi != "" { 1637 cleanupKnot() 1638 } 1639 } 1640 defer rollback() 1641 1642 err = db.AddRepo(tx, repo) 1643 if err != nil { 1644 l.Error("failed to AddRepo", "err", err) 1645 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1646 return 1647 } 1648 1649 rbacPath := repo.RepoIdentifier() 1650 err = rp.enforcer.AddRepo(user.Did, targetKnot, rbacPath) 1651 if err != nil { 1652 l.Error("failed to add ACLs", "err", err) 1653 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.") 1654 return 1655 } 1656 1657 err = tx.Commit() 1658 if err != nil { 1659 l.Error("failed to commit changes", "err", err) 1660 http.Error(w, err.Error(), http.StatusInternalServerError) 1661 return 1662 } 1663 1664 err = rp.enforcer.E.SavePolicy() 1665 if err != nil { 1666 l.Error("failed to update ACLs", "err", err) 1667 http.Error(w, err.Error(), http.StatusInternalServerError) 1668 return 1669 } 1670 1671 aturi = "" 1672 1673 
rp.notifier.NewRepo(r.Context(), repo) 1674 if repoDid != "" { 1675 rp.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 1676 } else { 1677 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Did, forkName)) 1678 } 1679 } 1680} 1681 1682func (rp *Repo) Stars(w http.ResponseWriter, r *http.Request) { 1683 l := rp.logger.With("handler", "Stars") 1684 1685 user := rp.oauth.GetMultiAccountUser(r) 1686 f, err := rp.repoResolver.Resolve(r) 1687 if err != nil { 1688 l.Error("failed to resolve source repo", "err", err) 1689 return 1690 } 1691 1692 page := pagination.FromContext(r.Context()) 1693 if page.Limit > 30 || page.Limit <= 0 { 1694 page.Limit = 30 1695 } 1696 1697 starrers, err := db.GetStars(rp.db, string(f.RepoDid), page) 1698 if err != nil { 1699 l.Error("failed to fetch starrers", "err", err, "repoDid", f.RepoDid) 1700 return 1701 } 1702 1703 totalCount, err := db.GetStarCount(rp.db, models.StarSubjectRepo, string(f.RepoDid)) 1704 if err != nil { 1705 l.Error("failed to fetch star count", "err", err, "repoDid", f.RepoDid) 1706 return 1707 } 1708 1709 rp.pages.RepoStars(w, pages.RepoStarsParams{ 1710 BaseParams: pages.BaseParamsFromContext(r.Context()), 1711 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 1712 Starrers: starrers, 1713 Page: page, 1714 TotalCount: totalCount, 1715 }) 1716} 1717 1718func (rp *Repo) Forks(w http.ResponseWriter, r *http.Request) { 1719 l := rp.logger.With("handler", "Forks") 1720 1721 user := rp.oauth.GetMultiAccountUser(r) 1722 f, err := rp.repoResolver.Resolve(r) 1723 if err != nil { 1724 l.Error("failed to resolve source repo", "err", err) 1725 return 1726 } 1727 1728 var forks []models.Repo 1729 totalCount := 0 1730 page := pagination.FromContext(r.Context()) 1731 if f.RepoDid != "" { 1732 forks, err = db.GetReposPaginated(rp.db, page, orm.FilterEq("source", f.RepoDid)) 1733 if err != nil { 1734 l.Error("failed to fetch forks", "err", err, "repoAt", f.RepoAt()) 1735 return 1736 } 1737 1738 totalCount, err = db.GetForkCount(rp.db, 
f.RepoDid) 1739 if err != nil { 1740 l.Error("failed to fetch fork count", "err", err, "repoAt", f.RepoAt()) 1741 return 1742 } 1743 } 1744 1745 err = rp.pages.RepoForks(w, pages.RepoForksParams{ 1746 BaseParams: pages.BaseParamsFromContext(r.Context()), 1747 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 1748 Forks: forks, 1749 Page: page, 1750 TotalCount: totalCount, 1751 }) 1752 if err != nil { 1753 l.Error("failed to render page", "err", err) 1754 } 1755} 1756 1757// this is used to rollback changes made to the PDS 1758// 1759// it is a no-op if the provided ATURI is empty 1760func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 1761 if aturi == "" { 1762 return nil 1763 } 1764 1765 parsed := syntax.ATURI(aturi) 1766 1767 collection := parsed.Collection().String() 1768 repo := parsed.Authority().String() 1769 rkey := parsed.RecordKey().String() 1770 1771 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 1772 Collection: collection, 1773 Repo: repo, 1774 Rkey: rkey, 1775 }) 1776 return err 1777} 1778 1779func repoCollaboratorRecord(f *models.Repo, subject string, createdAt time.Time) *tangled.RepoCollaborator { 1780 return &tangled.RepoCollaborator{ 1781 Subject: subject, 1782 CreatedAt: createdAt.Format(time.RFC3339), 1783 Repo: f.RepoDid, 1784 } 1785}