Monorepo for Tangled tangled.org
9

Configure Feed

Select the types of activity you want to include in your feed.

1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/cache" 17 "tangled.org/core/appview/cloudflare" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/email" 21 "tangled.org/core/appview/indexer" 22 "tangled.org/core/appview/knotacl" 23 "tangled.org/core/appview/knotcompat" 24 "tangled.org/core/appview/mentions" 25 "tangled.org/core/appview/models" 26 "tangled.org/core/appview/notify" 27 dbnotify "tangled.org/core/appview/notify/db" 28 lognotify "tangled.org/core/appview/notify/logging" 29 phnotify "tangled.org/core/appview/notify/posthog" 30 whnotify "tangled.org/core/appview/notify/webhook" 31 "tangled.org/core/appview/oauth" 32 "tangled.org/core/appview/pages" 33 pipelinessh "tangled.org/core/appview/pipelines/ssh" 34 "tangled.org/core/appview/reporesolver" 35 "tangled.org/core/appview/repoverify" 36 "tangled.org/core/appview/validator" 37 xrpcclient "tangled.org/core/appview/xrpcclient" 38 "tangled.org/core/consts" 39 "tangled.org/core/eventconsumer" 40 "tangled.org/core/idresolver" 41 "tangled.org/core/jetstream" 42 "tangled.org/core/log" 43 tlog "tangled.org/core/log" 44 "tangled.org/core/orm" 45 "tangled.org/core/rbac" 46 47 comatproto "github.com/bluesky-social/indigo/api/atproto" 48 "github.com/bluesky-social/indigo/atproto/atclient" 49 "github.com/bluesky-social/indigo/atproto/syntax" 50 lexutil "github.com/bluesky-social/indigo/lex/util" 51 "github.com/bluesky-social/indigo/xrpc" 52 53 "github.com/go-chi/chi/v5" 54 "github.com/posthog/posthog-go" 55) 56 57type State struct { 58 db *db.DB 59 notifier notify.Notifier 60 indexer *indexer.Indexer 61 oauth *oauth.OAuth 62 enforcer *rbac.Enforcer 63 pages *pages.Pages 64 idResolver *idresolver.Resolver 65 rdb *cache.Cache 66 mentionsResolver *mentions.Resolver 67 posthog posthog.Client 68 jc *jetstream.JetstreamClient 69 config *config.Config 70 repoResolver *reporesolver.RepoResolver 71 aclService *knotacl.Service 72 knotstream *eventconsumer.Consumer 73 logger *slog.Logger 74 validator *validator.Validator 75 cfClient *cloudflare.Client 76} 77 78func Make(ctx context.Context, config *config.Config) (*State, error) { 79 logger := tlog.FromContext(ctx) 80 81 d, err := db.Make(ctx, config.Core.DbPath) 82 if err != nil { 83 return nil, fmt.Errorf("failed to create db: %w", err) 84 } 85 86 indexer := indexer.New(log.SubLogger(logger, "indexer"), d) 87 err = indexer.Init(ctx) 88 if err != nil { 89 return nil, fmt.Errorf("failed to create indexer: %w", err) 90 } 91 92 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 93 if err != nil { 94 return nil, fmt.Errorf("failed to create enforcer: %w", err) 95 } 96 97 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 98 if err != nil { 99 logger.Error("failed to create redis resolver", "err", err) 100 res = idresolver.DefaultResolver(config.Plc.PLCURL) 101 } 102 103 var rdb *cache.Cache 104 if config.Redis.Addr != "" { 105 rdb = cache.New(config.Redis.Addr) 106 } 107 108 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 109 if err != nil { 110 return nil, fmt.Errorf("failed to create posthog client: %w", err) 111 } 112 113 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages")) 114 knotcompat.UseNativeLatch(knotacl.NewLatch(d, log.SubLogger(logger, "knotacl-latch"))) 115 aclService := knotacl.NewService(enforcer, d, config.Core.Dev, log.SubLogger(logger, "knotacl")) 116 oauth, err := oauth.New(config, posthog, d, enforcer, aclService, res, log.SubLogger(logger, "oauth")) 117 if err != nil { 118 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 119 } 120 121 validator := validator.New(d, res, aclService) 122 123 repoResolver := reporesolver.New(config, aclService, d, rdb) 124 125 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 126 127 jc, err := jetstream.NewJetstreamClient( 128 config.Jetstream.Endpoint, 129 "appview", 130 []string{ 131 tangled.ActorProfileNSID, 132 tangled.FeedStarNSID, 133 tangled.FeedReactionNSID, 134 tangled.FeedCommentNSID, 135 tangled.GraphFollowNSID, 136 tangled.GraphVouchNSID, 137 tangled.KnotMemberNSID, 138 tangled.KnotNSID, 139 tangled.LabelDefinitionNSID, 140 tangled.LabelOpNSID, 141 tangled.PublicKeyNSID, 142 tangled.RepoArtifactNSID, 143 tangled.RepoIssueCommentNSID, 144 tangled.RepoIssueNSID, 145 tangled.RepoNSID, 146 tangled.RepoPullNSID, 147 tangled.RepoPullCommentNSID, 148 tangled.SpindleMemberNSID, 149 tangled.SpindleNSID, 150 tangled.StringNSID, 151 }, 152 nil, 153 tlog.SubLogger(logger, "jetstream"), 154 d, 155 false, 156 157 // in-memory filter is inapplicable to appview so 158 // we'll never log dids anyway. 159 false, 160 ) 161 if err != nil { 162 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 163 } 164 165 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 166 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 167 } 168 169 var notifiers []notify.Notifier 170 171 // Always add the database notifier 172 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 173 174 // Add other notifiers in production only 175 if !config.Core.Dev { 176 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 177 } 178 notifiers = append(notifiers, indexer) 179 180 notifiers = append(notifiers, whnotify.NewNotifier(d)) 181 182 notifier := notify.NewMergedNotifier(notifiers) 183 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 184 185 ingester := appview.Ingester{ 186 Ctx: ctx, 187 Db: d, 188 Enforcer: enforcer, 189 Acl: aclService, 190 IdResolver: res, 191 Cache: rdb, 192 Config: config, 193 Logger: log.SubLogger(logger, "ingester"), 194 Validator: validator, 195 MentionsResolver: mentionsResolver, 196 Notifier: notifier, 197 Verifier: repoverify.New(res, config.Core.Dev), 198 } 199 err = jc.StartJetstream(ctx, ingester.Ingest()) 200 if err != nil { 201 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 202 } 203 204 go ingester.SweepPendingVerifications() 205 206 var cfClient *cloudflare.Client 207 if config.Cloudflare.ApiToken != "" { 208 cfClient, err = cloudflare.New(config) 209 if err != nil { 210 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err) 211 cfClient = nil 212 } 213 } 214 215 knotstream, err := Knotstream(ctx, config, d, aclService, enforcer, posthog, notifier, cfClient) 216 if err != nil { 217 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 218 } 219 knotstream.Start(ctx) 220 221 state := &State{ 222 db: d, 223 notifier: notifier, 224 indexer: indexer, 225 oauth: oauth, 226 enforcer: enforcer, 227 pages: pages, 228 idResolver: res, 229 rdb: rdb, 230 mentionsResolver: mentionsResolver, 231 posthog: posthog, 232 jc: jc, 233 config: config, 234 repoResolver: repoResolver, 235 aclService: aclService, 236 knotstream: knotstream, 237 logger: logger, 238 validator: validator, 239 cfClient: cfClient, 240 } 241 242 // fetch initial bluesky posts if configured 243 go fetchBskyPosts(ctx, res, config, d, logger) 244 245 return state, nil 246} 247 248func (s *State) Close() error { 249 // other close up logic goes here 250 return s.db.Close() 251} 252 253func (s *State) NewSSHServer() *pipelinessh.Server { 254 return pipelinessh.New(s.db, s.config, log.SubLogger(s.logger, "pipelinessh")) 255} 256 257func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) { 258 w.Header().Set("Content-Type", "text/plain") 259 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 260 261 securityTxt := `Contact: mailto:security@tangled.org 262Preferred-Languages: en 263Canonical: https://tangled.org/.well-known/security.txt 264Expires: 2030-01-01T21:59:00.000Z 265` 266 w.Write([]byte(securityTxt)) 267} 268 269func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 270 w.Header().Set("Content-Type", "text/plain") 271 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 272 273 robotsTxt := `# Hello, Tanglers! 274User-agent: * 275Allow: / 276Disallow: /*/*/settings 277Disallow: /settings 278Disallow: /*/*/compare 279Disallow: /*/*/fork 280 281Crawl-delay: 1 282` 283 w.Write([]byte(robotsTxt)) 284} 285 286func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 287 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 288 BaseParams: pages.BaseParamsFromContext(r.Context()), 289 }) 290} 291 292func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 293 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 294 BaseParams: pages.BaseParamsFromContext(r.Context()), 295 }) 296} 297 298func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 299 s.pages.Brand(w, pages.BrandParams{ 300 BaseParams: pages.BaseParamsFromContext(r.Context()), 301 }) 302} 303 304func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 305 user := s.oauth.GetMultiAccountUser(r) 306 if user == nil { 307 return 308 } 309 310 l := s.logger.With("handler", "UpgradeBanner") 311 l = l.With("did", user.Did) 312 313 regs, err := db.GetRegistrations( 314 s.db, 315 orm.FilterEq("did", user.Did), 316 orm.FilterEq("needs_upgrade", 1), 317 ) 318 if err != nil { 319 l.Error("non-fatal: failed to get registrations", "err", err) 320 } 321 322 spindles, err := db.GetSpindles( 323 r.Context(), 324 s.db, 325 orm.FilterEq("owner", user.Did), 326 orm.FilterEq("needs_upgrade", 1), 327 ) 328 if err != nil { 329 l.Error("non-fatal: failed to get spindles", "err", err) 330 } 331 332 if regs == nil && spindles == nil { 333 return 334 } 335 336 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 337 Registrations: regs, 338 Spindles: spindles, 339 }) 340} 341 342func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) { 343 // target is echoed back from the form via hx-vals so the response span's 344 // id matches the form's hx-target. Fallback keeps the handler useful if 345 // a caller forgets to send it. 346 target := strings.TrimSpace(r.FormValue("target")) 347 if target == "" { 348 target = "home" 349 } 350 351 w.Header().Set("Content-Type", "text/html") 352 353 emailAddr := strings.TrimSpace(r.FormValue("email")) 354 if !email.IsValidEmail(emailAddr) { 355 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{ 356 Id: target, 357 Error: "Invalid email address.", 358 }) 359 return 360 } 361 362 // For logged-in users, persist the signup locally so the widget stays 363 // hidden across devices. The DB row is the render-time source of truth; 364 // Resend still owns the mailing list itself. 365 if user := s.oauth.GetMultiAccountUser(r); user != nil { 366 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil { 367 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err) 368 } 369 } 370 371 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" { 372 go func() { 373 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil { 374 s.logger.Error("failed to add newsletter contact", "error", err) 375 } 376 }() 377 } else { 378 s.logger.Error( 379 "failed to add newsletter contact, missing resend config", 380 "isKeyPresent", s.config.Resend.ApiKey != "", 381 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "", 382 "emailAddr", emailAddr, 383 ) 384 } 385 386 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target}) 387} 388 389// NewsletterDismiss records that a logged-in user has dismissed the newsletter 390// widget so it stays hidden across their devices. Anonymous callers get a 204 391// with no DB write — localStorage handles the per-browser fallback. 392func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) { 393 user := s.oauth.GetMultiAccountUser(r) 394 if user == nil { 395 w.WriteHeader(http.StatusNoContent) 396 return 397 } 398 399 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil { 400 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err) 401 } 402 w.WriteHeader(http.StatusNoContent) 403} 404 405func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 406 user := chi.URLParam(r, "user") 407 user = strings.TrimPrefix(user, "@") 408 user = strings.TrimSuffix(user, ".keys") 409 410 if user == "" { 411 w.WriteHeader(http.StatusBadRequest) 412 return 413 } 414 415 id, err := s.idResolver.ResolveIdent(r.Context(), user) 416 if err != nil { 417 w.WriteHeader(http.StatusInternalServerError) 418 return 419 } 420 421 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 422 if err != nil { 423 s.logger.Error("failed to get public keys", "err", err) 424 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 425 return 426 } 427 428 if len(pubKeys) == 0 { 429 w.WriteHeader(http.StatusNoContent) 430 return 431 } 432 433 for _, k := range pubKeys { 434 key := strings.TrimRight(k.Key, "\n") 435 fmt.Fprintln(w, key) 436 } 437} 438 439func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 440 switch r.Method { 441 case http.MethodGet: 442 user := s.oauth.GetMultiAccountUser(r) 443 knots := s.aclService.KnotsForUser(r.Context(), user.Did) 444 445 s.pages.NewRepo(w, pages.NewRepoParams{ 446 BaseParams: pages.BaseParamsFromContext(r.Context()), 447 Knots: knots, 448 }) 449 450 case http.MethodPost: 451 l := s.logger.With("handler", "NewRepo") 452 453 user := s.oauth.GetMultiAccountUser(r) 454 l = l.With("did", user.Did) 455 456 // form validation 457 domain := r.FormValue("domain") 458 if domain == "" { 459 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 460 return 461 } 462 l = l.With("knot", domain) 463 464 repoName := r.FormValue("name") 465 if repoName == "" { 466 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 467 return 468 } 469 470 if err := models.ValidateRepoName(repoName); err != nil { 471 s.pages.Notice(w, "repo", err.Error()) 472 return 473 } 474 repoName = models.StripGitExt(repoName) 475 rkey := strings.ToLower(repoName) 476 l = l.With("repoName", repoName, "rkey", rkey) 477 478 defaultBranch := r.FormValue("branch") 479 if defaultBranch == "" { 480 defaultBranch = "main" 481 } 482 l = l.With("defaultBranch", defaultBranch) 483 484 description := r.FormValue("description") 485 if len([]rune(description)) > 140 { 486 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 487 return 488 } 489 490 // ACL validation 491 if !s.aclService.IsRepoCreateAllowed(r.Context(), domain, user.Did) { 492 l.Info("unauthorized") 493 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 494 return 495 } 496 497 // Check for existing repos 498 existingRepo, err := db.GetRepo( 499 s.db, 500 orm.FilterEq("did", user.Did), 501 orm.FilterEq("rkey", rkey), 502 ) 503 if err == nil && existingRepo != nil { 504 l.Info("repo exists") 505 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 506 return 507 } 508 509 atpClient, err := s.oauth.AuthorizedClient(r) 510 if err != nil { 511 l.Error("failed to get authorized client", "err", err) 512 s.pages.Notice(w, "repo", "Failed to authorize. Try again later.") 513 return 514 } 515 516 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) { 517 l.Info("rkey occupied by prior rename alias") 518 s.pages.Notice(w, "repo", fmt.Sprintf("The name %q still has a record on your PDS from a prior rename. Pick a different name, or delete at://%s/%s/%s first.", rkey, user.Did, tangled.RepoNSID, rkey)) 519 return 520 } 521 522 client, err := s.oauth.ServiceClient( 523 r, 524 oauth.WithService(domain), 525 oauth.WithLxm(tangled.RepoCreateNSID), 526 oauth.WithDev(s.config.Core.Dev), 527 ) 528 if err != nil { 529 l.Error("service auth failed", "err", err) 530 s.pages.Notice(w, "repo", "Failed to authenticate. Please log out and log back in again.") 531 return 532 } 533 534 input := &tangled.RepoCreate_Input{ 535 Rkey: rkey, 536 Name: rkey, 537 DefaultBranch: &defaultBranch, 538 } 539 createResp, err := tangled.RepoCreate( 540 r.Context(), 541 client, 542 input, 543 ) 544 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 545 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err) 546 s.pages.Notice(w, "repo", err.Error()) 547 return 548 } 549 550 var repoDid string 551 if createResp != nil && createResp.RepoDid != nil { 552 repoDid = *createResp.RepoDid 553 } 554 if repoDid == "" { 555 l.Error("knot returned empty repo DID") 556 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 557 return 558 } 559 560 repo := &models.Repo{ 561 Did: user.Did, 562 Name: repoName, 563 Knot: domain, 564 Rkey: rkey, 565 Description: description, 566 Created: time.Now(), 567 Labels: s.config.Label.DefaultLabelDefs, 568 RepoDid: repoDid, 569 } 570 record := repo.AsRecord() 571 572 cleanupKnot := func() { 573 go func() { 574 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 575 for attempt, delay := range delays { 576 time.Sleep(delay) 577 deleteClient, dErr := s.oauth.ServiceClient( 578 r, 579 oauth.WithService(domain), 580 oauth.WithLxm(tangled.RepoDeleteNSID), 581 oauth.WithDev(s.config.Core.Dev), 582 ) 583 if dErr != nil { 584 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 585 continue 586 } 587 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 588 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 589 Did: user.Did, 590 Name: rkey, 591 Rkey: rkey, 592 }); dErr != nil { 593 cancel() 594 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr) 595 continue 596 } 597 cancel() 598 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1) 599 return 600 } 601 l.Error("exhausted retries for knot cleanup, repo may be orphaned", 602 "did", user.Did, "repo", repoName, "knot", domain) 603 }() 604 } 605 606 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{ 607 Collection: tangled.RepoNSID, 608 Repo: user.Did, 609 Rkey: &rkey, 610 Record: &lexutil.LexiconTypeDecoder{ 611 Val: &record, 612 }, 613 }) 614 if err != nil { 615 l.Info("PDS write failed", "err", err) 616 cleanupKnot() 617 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) { 618 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository named %q.", rkey)) 619 } else { 620 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 621 } 622 return 623 } 624 625 aturi := fmt.Sprintf("at://%s/%s/%s", user.Did, tangled.RepoNSID, rkey) 626 l = l.With("aturi", aturi) 627 l.Info("wrote to PDS") 628 629 tx, err := s.db.BeginTx(r.Context(), nil) 630 if err != nil { 631 l.Info("txn failed", "err", err) 632 s.pages.Notice(w, "repo", "Failed to save repository information.") 633 return 634 } 635 636 rollback := func() { 637 err1 := tx.Rollback() 638 err2 := s.enforcer.E.LoadPolicy() 639 err3 := rollbackRecord(context.Background(), aturi, atpClient) 640 641 if errors.Is(err1, sql.ErrTxDone) { 642 err1 = nil 643 } 644 645 if errs := errors.Join(err1, err2, err3); errs != nil { 646 l.Error("failed to rollback changes", "errs", errs) 647 } 648 649 if aturi != "" { 650 cleanupKnot() 651 } 652 } 653 defer rollback() 654 655 err = db.AddRepo(tx, repo) 656 if err != nil { 657 l.Error("db write failed", "err", err) 658 s.pages.Notice(w, "repo", "Failed to save repository information.") 659 return 660 } 661 662 rbacPath := repo.RepoIdentifier() 663 err = s.enforcer.AddRepo(user.Did, domain, rbacPath) 664 if err != nil { 665 l.Error("acl setup failed", "err", err) 666 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 667 return 668 } 669 670 err = tx.Commit() 671 if err != nil { 672 l.Error("txn commit failed", "err", err) 673 http.Error(w, err.Error(), http.StatusInternalServerError) 674 return 675 } 676 677 err = s.enforcer.E.SavePolicy() 678 if err != nil { 679 l.Error("acl save failed", "err", err) 680 http.Error(w, err.Error(), http.StatusInternalServerError) 681 return 682 } 683 684 aturi = "" 685 686 s.notifier.NewRepo(r.Context(), repo) 687 switch { 688 case repoDid != "": 689 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 690 default: 691 handle := s.pages.DisplayHandle(r.Context(), user.Did) 692 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, rkey)) 693 } 694 } 695} 696 697func rkeyOccupied(ctx context.Context, client *atclient.APIClient, did, rkey string) bool { 698 probeCtx, cancel := context.WithTimeout(ctx, 10*time.Second) 699 defer cancel() 700 resp, err := comatproto.RepoGetRecord(probeCtx, client, "", tangled.RepoNSID, did, rkey) 701 return err == nil && resp != nil 702} 703 704// this is used to rollback changes made to the PDS 705// 706// it is a no-op if the provided ATURI is empty 707func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 708 if aturi == "" { 709 return nil 710 } 711 712 parsed := syntax.ATURI(aturi) 713 714 collection := parsed.Collection().String() 715 repo := parsed.Authority().String() 716 rkey := parsed.RecordKey().String() 717 718 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 719 Collection: collection, 720 Repo: repo, 721 Rkey: rkey, 722 }) 723 return err 724} 725 726func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 727 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 728 if err != nil { 729 return err 730 } 731 // already present 732 if len(defaultLabels) == len(defaults) { 733 return nil 734 } 735 736 labelDefs, err := models.FetchLabelDefs(r, defaults) 737 if err != nil { 738 return err 739 } 740 741 // Insert each label definition to the database 742 for _, labelDef := range labelDefs { 743 _, err = db.AddLabelDefinition(e, &labelDef) 744 if err != nil { 745 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 746 } 747 } 748 749 return nil 750} 751 752func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 753 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 754 if err != nil { 755 logger.Error("failed to resolve tangled.org DID", "err", err) 756 return 757 } 758 759 pdsEndpoint := resolved.PDSEndpoint() 760 if pdsEndpoint == "" { 761 logger.Error("no PDS endpoint found for tangled.sh DID") 762 return 763 } 764 765 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger) 766 if err != nil { 767 logger.Error("failed to create appassword session... skipping fetch", "err", err) 768 return 769 } 770 771 l := log.SubLogger(logger, "bluesky") 772 773 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 774 defer ticker.Stop() 775 776 for { 777 // refresh session if necessary 778 if !session.IsValid() { 779 l.Debug("access token expired, refreshing session") 780 if err := session.RefreshSession(); err != nil { 781 l.Error("failed to refresh session, stopping bluesky updater", "err", err) 782 return 783 } 784 l.Debug("session refreshed") 785 } 786 787 // make client 788 client := xrpc.Client{ 789 Auth: &xrpc.AuthInfo{ 790 AccessJwt: session.AccessJwt, 791 Did: session.Did, 792 }, 793 Host: session.PdsEndpoint, 794 } 795 796 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 797 if err != nil { 798 l.Error("failed to fetch bluesky posts", "err", err) 799 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 800 l.Error("failed to insert bluesky posts", "err", err) 801 } else { 802 l.Info("inserted bluesky posts", "count", len(posts)) 803 } 804 805 select { 806 case <-ticker.C: 807 case <-ctx.Done(): 808 l.Info("stopping bluesky updater") 809 return 810 } 811 } 812}