Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/cache" 17 "tangled.org/core/appview/cloudflare" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/email" 21 "tangled.org/core/appview/indexer" 22 "tangled.org/core/appview/knotacl" 23 "tangled.org/core/appview/knotcompat" 24 "tangled.org/core/appview/mentions" 25 "tangled.org/core/appview/models" 26 "tangled.org/core/appview/notify" 27 dbnotify "tangled.org/core/appview/notify/db" 28 lognotify "tangled.org/core/appview/notify/logging" 29 phnotify "tangled.org/core/appview/notify/posthog" 30 whnotify "tangled.org/core/appview/notify/webhook" 31 "tangled.org/core/appview/oauth" 32 "tangled.org/core/appview/pages" 33 "tangled.org/core/appview/pipelines" 34 pipelinessh "tangled.org/core/appview/pipelines/ssh" 35 "tangled.org/core/appview/reporesolver" 36 "tangled.org/core/appview/repoverify" 37 "tangled.org/core/appview/validator" 38 xrpcclient "tangled.org/core/appview/xrpcclient" 39 "tangled.org/core/blobstore" 40 "tangled.org/core/consts" 41 "tangled.org/core/eventconsumer" 42 "tangled.org/core/idresolver" 43 "tangled.org/core/jetstream" 44 "tangled.org/core/log" 45 tlog "tangled.org/core/log" 46 "tangled.org/core/orm" 47 "tangled.org/core/rbac" 48 49 comatproto "github.com/bluesky-social/indigo/api/atproto" 50 "github.com/bluesky-social/indigo/atproto/atclient" 51 "github.com/bluesky-social/indigo/atproto/syntax" 52 lexutil "github.com/bluesky-social/indigo/lex/util" 53 "github.com/bluesky-social/indigo/xrpc" 54 55 "github.com/go-chi/chi/v5" 56 "github.com/posthog/posthog-go" 57) 58 59type State struct { 60 db *db.DB 61 notifier notify.Notifier 62 indexer *indexer.Indexer 63 oauth *oauth.OAuth 64 enforcer *rbac.Enforcer 65 pages *pages.Pages 66 idResolver *idresolver.Resolver 67 blobStore blobstore.BlobStore 68 rdb *cache.Cache 69 mentionsResolver *mentions.Resolver 70 posthog posthog.Client 71 jc *jetstream.JetstreamClient 72 config *config.Config 73 repoResolver *reporesolver.RepoResolver 74 aclService *knotacl.Service 75 knotstream *eventconsumer.Consumer 76 spindlestream *eventconsumer.Consumer 77 pipelineNotifier *pipelines.StatusNotifier 78 logger *slog.Logger 79 validator *validator.Validator 80 cfClient *cloudflare.Client 81} 82 83func Make(ctx context.Context, config *config.Config) (*State, error) { 84 logger := tlog.FromContext(ctx) 85 86 d, err := db.Make(ctx, config.Core.DbPath) 87 if err != nil { 88 return nil, fmt.Errorf("failed to create db: %w", err) 89 } 90 91 indexer := indexer.New(log.SubLogger(logger, "indexer"), d) 92 err = indexer.Init(ctx) 93 if err != nil { 94 return nil, fmt.Errorf("failed to create indexer: %w", err) 95 } 96 97 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 98 if err != nil { 99 return nil, fmt.Errorf("failed to create enforcer: %w", err) 100 } 101 102 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 103 if err != nil { 104 logger.Error("failed to create redis resolver", "err", err) 105 res = idresolver.DefaultResolver(config.Plc.PLCURL) 106 } 107 108 var rdb *cache.Cache 109 if config.Redis.Addr != "" { 110 rdb = cache.New(config.Redis.Addr) 111 } 112 113 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 114 if err != nil { 115 return nil, fmt.Errorf("failed to create posthog client: %w", err) 116 } 117 118 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages")) 119 knotcompat.UseNativeLatch(knotacl.NewLatch(d, log.SubLogger(logger, "knotacl-latch"))) 120 aclService := knotacl.NewService(enforcer, d, config.Core.Dev, log.SubLogger(logger, "knotacl")) 121 oauth, err := oauth.New(config, posthog, d, enforcer, aclService, res, log.SubLogger(logger, "oauth")) 122 if err != nil { 123 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 124 } 125 126 validator := validator.New(d, res, aclService) 127 128 repoResolver := reporesolver.New(config, aclService, d, rdb) 129 130 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 131 132 var blobStore blobstore.BlobStore 133 if config.Porxie.Url != "" { 134 blobStore = blobstore.NewPorxieBlobStore(config.Porxie.Url) 135 } else { 136 blobStore = blobstore.NewPdsBlobStore(res.Directory()) 137 } 138 139 jc, err := jetstream.NewJetstreamClient( 140 config.Jetstream.Endpoint, 141 "appview", 142 []string{ 143 tangled.ActorProfileNSID, 144 tangled.FeedStarNSID, 145 tangled.FeedReactionNSID, 146 tangled.FeedCommentNSID, 147 tangled.GraphFollowNSID, 148 tangled.GraphVouchNSID, 149 tangled.KnotMemberNSID, 150 tangled.KnotNSID, 151 tangled.LabelDefinitionNSID, 152 tangled.LabelOpNSID, 153 tangled.PublicKeyNSID, 154 tangled.RepoArtifactNSID, 155 tangled.RepoIssueCommentNSID, 156 tangled.RepoIssueNSID, 157 tangled.RepoNSID, 158 tangled.RepoPullNSID, 159 tangled.RepoPullCommentNSID, 160 tangled.SpindleMemberNSID, 161 tangled.SpindleNSID, 162 tangled.StringNSID, 163 }, 164 nil, 165 tlog.SubLogger(logger, "jetstream"), 166 d, 167 false, 168 169 // in-memory filter is inapplicable to appview so 170 // we'll never log dids anyway. 171 false, 172 ) 173 if err != nil { 174 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 175 } 176 177 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 178 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 179 } 180 181 var notifiers []notify.Notifier 182 183 // Always add the database notifier 184 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 185 186 // Add other notifiers in production only 187 if !config.Core.Dev { 188 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 189 } 190 notifiers = append(notifiers, indexer) 191 192 notifiers = append(notifiers, whnotify.NewNotifier(d)) 193 194 notifier := notify.NewMergedNotifier(notifiers) 195 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 196 197 ingester := appview.Ingester{ 198 Ctx: ctx, 199 Db: d, 200 Enforcer: enforcer, 201 Acl: aclService, 202 IdResolver: res, 203 BlobStore: blobStore, 204 Cache: rdb, 205 Config: config, 206 Logger: log.SubLogger(logger, "ingester"), 207 Validator: validator, 208 MentionsResolver: mentionsResolver, 209 Notifier: notifier, 210 Verifier: repoverify.New(res, config.Core.Dev), 211 } 212 err = jc.StartJetstream(ctx, ingester.Ingest()) 213 if err != nil { 214 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 215 } 216 217 go ingester.SweepPendingVerifications() 218 219 var cfClient *cloudflare.Client 220 if config.Cloudflare.ApiToken != "" { 221 cfClient, err = cloudflare.New(config) 222 if err != nil { 223 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err) 224 cfClient = nil 225 } 226 } 227 228 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient) 229 if err != nil { 230 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 231 } 232 knotstream.Start(ctx) 233 234 pipelineNotifier := pipelines.NewStatusNotifier() 235 236 spindlestream, err := Spindlestream(ctx, config, d, enforcer, pipelineNotifier) 237 if err != nil { 238 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 239 } 240 spindlestream.Start(ctx) 241 242 state := &State{ 243 db: d, 244 notifier: notifier, 245 indexer: indexer, 246 oauth: oauth, 247 enforcer: enforcer, 248 pages: pages, 249 idResolver: res, 250 blobStore: blobStore, 251 rdb: rdb, 252 mentionsResolver: mentionsResolver, 253 posthog: posthog, 254 jc: jc, 255 config: config, 256 repoResolver: repoResolver, 257 aclService: aclService, 258 knotstream: knotstream, 259 spindlestream: spindlestream, 260 pipelineNotifier: pipelineNotifier, 261 logger: logger, 262 validator: validator, 263 cfClient: cfClient, 264 } 265 266 // fetch initial bluesky posts if configured 267 go fetchBskyPosts(ctx, res, config, d, logger) 268 269 return state, nil 270} 271 272func (s *State) Close() error { 273 // other close up logic goes here 274 return s.db.Close() 275} 276 277func (s *State) NewSSHServer() *pipelinessh.Server { 278 return pipelinessh.New(s.db, s.config, s.pipelineNotifier, log.SubLogger(s.logger, "pipelinessh")) 279} 280 281func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) { 282 w.Header().Set("Content-Type", "text/plain") 283 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 284 285 securityTxt := `Contact: mailto:security@tangled.org 286Preferred-Languages: en 287Canonical: https://tangled.org/.well-known/security.txt 288Expires: 2030-01-01T21:59:00.000Z 289` 290 w.Write([]byte(securityTxt)) 291} 292 293func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 294 w.Header().Set("Content-Type", "text/plain") 295 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 296 297 robotsTxt := `# Hello, Tanglers! 298User-agent: * 299Allow: / 300Disallow: /*/*/settings 301Disallow: /settings 302Disallow: /*/*/compare 303Disallow: /*/*/fork 304 305Crawl-delay: 1 306` 307 w.Write([]byte(robotsTxt)) 308} 309 310func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 311 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 312 BaseParams: pages.BaseParamsFromContext(r.Context()), 313 }) 314} 315 316func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 317 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 318 BaseParams: pages.BaseParamsFromContext(r.Context()), 319 }) 320} 321 322func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 323 s.pages.Brand(w, pages.BrandParams{ 324 BaseParams: pages.BaseParamsFromContext(r.Context()), 325 }) 326} 327 328func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 329 user := s.oauth.GetMultiAccountUser(r) 330 if user == nil { 331 return 332 } 333 334 l := s.logger.With("handler", "UpgradeBanner") 335 l = l.With("did", user.Did) 336 337 regs, err := db.GetRegistrations( 338 s.db, 339 orm.FilterEq("did", user.Did), 340 orm.FilterEq("needs_upgrade", 1), 341 ) 342 if err != nil { 343 l.Error("non-fatal: failed to get registrations", "err", err) 344 } 345 346 spindles, err := db.GetSpindles( 347 r.Context(), 348 s.db, 349 orm.FilterEq("owner", user.Did), 350 orm.FilterEq("needs_upgrade", 1), 351 ) 352 if err != nil { 353 l.Error("non-fatal: failed to get spindles", "err", err) 354 } 355 356 if regs == nil && spindles == nil { 357 return 358 } 359 360 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 361 Registrations: regs, 362 Spindles: spindles, 363 }) 364} 365 366func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) { 367 // target is echoed back from the form via hx-vals so the response span's 368 // id matches the form's hx-target. Fallback keeps the handler useful if 369 // a caller forgets to send it. 370 target := strings.TrimSpace(r.FormValue("target")) 371 if target == "" { 372 target = "home" 373 } 374 375 w.Header().Set("Content-Type", "text/html") 376 377 emailAddr := strings.TrimSpace(r.FormValue("email")) 378 if !email.IsValidEmail(emailAddr) { 379 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{ 380 Id: target, 381 Error: "Invalid email address.", 382 }) 383 return 384 } 385 386 // For logged-in users, persist the signup locally so the widget stays 387 // hidden across devices. The DB row is the render-time source of truth; 388 // Resend still owns the mailing list itself. 389 if user := s.oauth.GetMultiAccountUser(r); user != nil { 390 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil { 391 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err) 392 } 393 } 394 395 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" { 396 go func() { 397 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil { 398 s.logger.Error("failed to add newsletter contact", "error", err) 399 } 400 }() 401 } else { 402 s.logger.Error( 403 "failed to add newsletter contact, missing resend config", 404 "isKeyPresent", s.config.Resend.ApiKey != "", 405 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "", 406 "emailAddr", emailAddr, 407 ) 408 } 409 410 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target}) 411} 412 413// NewsletterDismiss records that a logged-in user has dismissed the newsletter 414// widget so it stays hidden across their devices. Anonymous callers get a 204 415// with no DB write — localStorage handles the per-browser fallback. 416func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) { 417 user := s.oauth.GetMultiAccountUser(r) 418 if user == nil { 419 w.WriteHeader(http.StatusNoContent) 420 return 421 } 422 423 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil { 424 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err) 425 } 426 w.WriteHeader(http.StatusNoContent) 427} 428 429func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 430 user := chi.URLParam(r, "user") 431 user = strings.TrimPrefix(user, "@") 432 433 if user == "" { 434 w.WriteHeader(http.StatusBadRequest) 435 return 436 } 437 438 id, err := s.idResolver.ResolveIdent(r.Context(), user) 439 if err != nil { 440 w.WriteHeader(http.StatusInternalServerError) 441 return 442 } 443 444 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 445 if err != nil { 446 s.logger.Error("failed to get public keys", "err", err) 447 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 448 return 449 } 450 451 if len(pubKeys) == 0 { 452 w.WriteHeader(http.StatusNoContent) 453 return 454 } 455 456 for _, k := range pubKeys { 457 key := strings.TrimRight(k.Key, "\n") 458 fmt.Fprintln(w, key) 459 } 460} 461 462func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 463 switch r.Method { 464 case http.MethodGet: 465 user := s.oauth.GetMultiAccountUser(r) 466 knots := s.aclService.KnotsForUser(r.Context(), user.Did) 467 468 s.pages.NewRepo(w, pages.NewRepoParams{ 469 BaseParams: pages.BaseParamsFromContext(r.Context()), 470 Knots: knots, 471 }) 472 473 case http.MethodPost: 474 l := s.logger.With("handler", "NewRepo") 475 476 user := s.oauth.GetMultiAccountUser(r) 477 l = l.With("did", user.Did) 478 479 // form validation 480 domain := r.FormValue("domain") 481 if domain == "" { 482 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 483 return 484 } 485 l = l.With("knot", domain) 486 487 repoName := r.FormValue("name") 488 if repoName == "" { 489 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 490 return 491 } 492 493 if err := models.ValidateRepoName(repoName); err != nil { 494 s.pages.Notice(w, "repo", err.Error()) 495 return 496 } 497 repoName = models.StripGitExt(repoName) 498 rkey := strings.ToLower(repoName) 499 l = l.With("repoName", repoName, "rkey", rkey) 500 501 defaultBranch := r.FormValue("branch") 502 if defaultBranch == "" { 503 defaultBranch = "main" 504 } 505 l = l.With("defaultBranch", defaultBranch) 506 507 description := r.FormValue("description") 508 if len([]rune(description)) > 140 { 509 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 510 return 511 } 512 513 // ACL validation 514 if !s.aclService.IsRepoCreateAllowed(r.Context(), domain, user.Did) { 515 l.Info("unauthorized") 516 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 517 return 518 } 519 520 // Check for existing repos 521 existingRepo, err := db.GetRepo( 522 s.db, 523 orm.FilterEq("did", user.Did), 524 orm.FilterEq("rkey", rkey), 525 ) 526 if err == nil && existingRepo != nil { 527 l.Info("repo exists") 528 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 529 return 530 } 531 532 atpClient, err := s.oauth.AuthorizedClient(r) 533 if err != nil { 534 l.Error("failed to get authorized client", "err", err) 535 s.pages.Notice(w, "repo", "Failed to authorize. Try again later.") 536 return 537 } 538 539 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) { 540 l.Info("rkey occupied by prior rename alias") 541 s.pages.Notice(w, "repo", fmt.Sprintf("The name %q still has a record on your PDS from a prior rename. Pick a different name, or delete at://%s/%s/%s first.", rkey, user.Did, tangled.RepoNSID, rkey)) 542 return 543 } 544 545 client, err := s.oauth.ServiceClient( 546 r, 547 oauth.WithService(domain), 548 oauth.WithLxm(tangled.RepoCreateNSID), 549 oauth.WithDev(s.config.Core.Dev), 550 ) 551 if err != nil { 552 l.Error("service auth failed", "err", err) 553 s.pages.Notice(w, "repo", "Failed to authenticate. Please log out and log back in again.") 554 return 555 } 556 557 input := &tangled.RepoCreate_Input{ 558 Rkey: rkey, 559 Name: rkey, 560 DefaultBranch: &defaultBranch, 561 } 562 createResp, err := tangled.RepoCreate( 563 r.Context(), 564 client, 565 input, 566 ) 567 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 568 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err) 569 s.pages.Notice(w, "repo", err.Error()) 570 return 571 } 572 573 var repoDid string 574 if createResp != nil && createResp.RepoDid != nil { 575 repoDid = *createResp.RepoDid 576 } 577 if repoDid == "" { 578 l.Error("knot returned empty repo DID") 579 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 580 return 581 } 582 583 repo := &models.Repo{ 584 Did: user.Did, 585 Name: repoName, 586 Knot: domain, 587 Rkey: rkey, 588 Description: description, 589 Created: time.Now(), 590 Labels: s.config.Label.DefaultLabelDefs, 591 RepoDid: repoDid, 592 } 593 record := repo.AsRecord() 594 595 cleanupKnot := func() { 596 go func() { 597 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 598 for attempt, delay := range delays { 599 time.Sleep(delay) 600 deleteClient, dErr := s.oauth.ServiceClient( 601 r, 602 oauth.WithService(domain), 603 oauth.WithLxm(tangled.RepoDeleteNSID), 604 oauth.WithDev(s.config.Core.Dev), 605 ) 606 if dErr != nil { 607 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 608 continue 609 } 610 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 611 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 612 Did: user.Did, 613 Name: rkey, 614 Rkey: rkey, 615 }); dErr != nil { 616 cancel() 617 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr) 618 continue 619 } 620 cancel() 621 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1) 622 return 623 } 624 l.Error("exhausted retries for knot cleanup, repo may be orphaned", 625 "did", user.Did, "repo", repoName, "knot", domain) 626 }() 627 } 628 629 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{ 630 Collection: tangled.RepoNSID, 631 Repo: user.Did, 632 Rkey: &rkey, 633 Record: &lexutil.LexiconTypeDecoder{ 634 Val: &record, 635 }, 636 }) 637 if err != nil { 638 l.Info("PDS write failed", "err", err) 639 cleanupKnot() 640 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) { 641 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository named %q.", rkey)) 642 } else { 643 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 644 } 645 return 646 } 647 648 aturi := fmt.Sprintf("at://%s/%s/%s", user.Did, tangled.RepoNSID, rkey) 649 l = l.With("aturi", aturi) 650 l.Info("wrote to PDS") 651 652 tx, err := s.db.BeginTx(r.Context(), nil) 653 if err != nil { 654 l.Info("txn failed", "err", err) 655 s.pages.Notice(w, "repo", "Failed to save repository information.") 656 return 657 } 658 659 rollback := func() { 660 err1 := tx.Rollback() 661 err2 := s.enforcer.E.LoadPolicy() 662 err3 := rollbackRecord(context.Background(), aturi, atpClient) 663 664 if errors.Is(err1, sql.ErrTxDone) { 665 err1 = nil 666 } 667 668 if errs := errors.Join(err1, err2, err3); errs != nil { 669 l.Error("failed to rollback changes", "errs", errs) 670 } 671 672 if aturi != "" { 673 cleanupKnot() 674 } 675 } 676 defer rollback() 677 678 err = db.AddRepo(tx, repo) 679 if err != nil { 680 l.Error("db write failed", "err", err) 681 s.pages.Notice(w, "repo", "Failed to save repository information.") 682 return 683 } 684 685 rbacPath := repo.RepoIdentifier() 686 err = s.enforcer.AddRepo(user.Did, domain, rbacPath) 687 if err != nil { 688 l.Error("acl setup failed", "err", err) 689 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 690 return 691 } 692 693 err = tx.Commit() 694 if err != nil { 695 l.Error("txn commit failed", "err", err) 696 http.Error(w, err.Error(), http.StatusInternalServerError) 697 return 698 } 699 700 err = s.enforcer.E.SavePolicy() 701 if err != nil { 702 l.Error("acl save failed", "err", err) 703 http.Error(w, err.Error(), http.StatusInternalServerError) 704 return 705 } 706 707 aturi = "" 708 709 s.notifier.NewRepo(r.Context(), repo) 710 switch { 711 case repoDid != "": 712 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 713 default: 714 handle := s.pages.DisplayHandle(r.Context(), user.Did) 715 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, rkey)) 716 } 717 } 718} 719 720func rkeyOccupied(ctx context.Context, client *atclient.APIClient, did, rkey string) bool { 721 probeCtx, cancel := context.WithTimeout(ctx, 10*time.Second) 722 defer cancel() 723 resp, err := comatproto.RepoGetRecord(probeCtx, client, "", tangled.RepoNSID, did, rkey) 724 return err == nil && resp != nil 725} 726 727// this is used to rollback changes made to the PDS 728// 729// it is a no-op if the provided ATURI is empty 730func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 731 if aturi == "" { 732 return nil 733 } 734 735 parsed := syntax.ATURI(aturi) 736 737 collection := parsed.Collection().String() 738 repo := parsed.Authority().String() 739 rkey := parsed.RecordKey().String() 740 741 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 742 Collection: collection, 743 Repo: repo, 744 Rkey: rkey, 745 }) 746 return err 747} 748 749func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 750 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 751 if err != nil { 752 return err 753 } 754 // already present 755 if len(defaultLabels) == len(defaults) { 756 return nil 757 } 758 759 labelDefs, err := models.FetchLabelDefs(r, defaults) 760 if err != nil { 761 return err 762 } 763 764 // Insert each label definition to the database 765 for _, labelDef := range labelDefs { 766 _, err = db.AddLabelDefinition(e, &labelDef) 767 if err != nil { 768 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 769 } 770 } 771 772 return nil 773} 774 775func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 776 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 777 if err != nil { 778 logger.Error("failed to resolve tangled.org DID", "err", err) 779 return 780 } 781 782 pdsEndpoint := resolved.PDSEndpoint() 783 if pdsEndpoint == "" { 784 logger.Error("no PDS endpoint found for tangled.sh DID") 785 return 786 } 787 788 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger) 789 if err != nil { 790 logger.Error("failed to create appassword session... skipping fetch", "err", err) 791 return 792 } 793 794 l := log.SubLogger(logger, "bluesky") 795 796 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 797 defer ticker.Stop() 798 799 for { 800 // refresh session if necessary 801 if !session.IsValid() { 802 l.Debug("access token expired, refreshing session") 803 if err := session.RefreshSession(); err != nil { 804 l.Error("failed to refresh session, stopping bluesky updater", "err", err) 805 return 806 } 807 l.Debug("session refreshed") 808 } 809 810 // make client 811 client := xrpc.Client{ 812 Auth: &xrpc.AuthInfo{ 813 AccessJwt: session.AccessJwt, 814 Did: session.Did, 815 }, 816 Host: session.PdsEndpoint, 817 } 818 819 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 820 if err != nil { 821 l.Error("failed to fetch bluesky posts", "err", err) 822 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 823 l.Error("failed to insert bluesky posts", "err", err) 824 } else { 825 l.Info("inserted bluesky posts", "count", len(posts)) 826 } 827 828 select { 829 case <-ticker.C: 830 case <-ctx.Done(): 831 l.Info("stopping bluesky updater") 832 return 833 } 834 } 835}