Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/cache" 17 "tangled.org/core/appview/cloudflare" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/email" 21 "tangled.org/core/appview/indexer" 22 "tangled.org/core/appview/mentions" 23 "tangled.org/core/appview/models" 24 "tangled.org/core/appview/notify" 25 dbnotify "tangled.org/core/appview/notify/db" 26 lognotify "tangled.org/core/appview/notify/logging" 27 phnotify "tangled.org/core/appview/notify/posthog" 28 whnotify "tangled.org/core/appview/notify/webhook" 29 "tangled.org/core/appview/oauth" 30 "tangled.org/core/appview/pages" 31 "tangled.org/core/appview/pipelines" 32 pipelinessh "tangled.org/core/appview/pipelines/ssh" 33 "tangled.org/core/appview/reporesolver" 34 "tangled.org/core/appview/repoverify" 35 "tangled.org/core/appview/validator" 36 xrpcclient "tangled.org/core/appview/xrpcclient" 37 "tangled.org/core/consts" 38 "tangled.org/core/eventconsumer" 39 "tangled.org/core/idresolver" 40 "tangled.org/core/jetstream" 41 "tangled.org/core/log" 42 tlog "tangled.org/core/log" 43 "tangled.org/core/orm" 44 "tangled.org/core/rbac" 45 46 comatproto "github.com/bluesky-social/indigo/api/atproto" 47 "github.com/bluesky-social/indigo/atproto/atclient" 48 "github.com/bluesky-social/indigo/atproto/syntax" 49 lexutil "github.com/bluesky-social/indigo/lex/util" 50 "github.com/bluesky-social/indigo/xrpc" 51 52 "github.com/go-chi/chi/v5" 53 "github.com/posthog/posthog-go" 54) 55 56type State struct { 57 db *db.DB 58 notifier notify.Notifier 59 indexer *indexer.Indexer 60 oauth *oauth.OAuth 61 enforcer *rbac.Enforcer 62 pages *pages.Pages 63 idResolver *idresolver.Resolver 64 rdb *cache.Cache 65 mentionsResolver *mentions.Resolver 66 posthog posthog.Client 67 jc *jetstream.JetstreamClient 68 config *config.Config 69 repoResolver *reporesolver.RepoResolver 70 knotstream *eventconsumer.Consumer 71 spindlestream *eventconsumer.Consumer 72 pipelineNotifier *pipelines.StatusNotifier 73 logger *slog.Logger 74 validator *validator.Validator 75 cfClient *cloudflare.Client 76} 77 78func Make(ctx context.Context, config *config.Config) (*State, error) { 79 logger := tlog.FromContext(ctx) 80 81 d, err := db.Make(ctx, config.Core.DbPath) 82 if err != nil { 83 return nil, fmt.Errorf("failed to create db: %w", err) 84 } 85 86 indexer := indexer.New(log.SubLogger(logger, "indexer"), d) 87 err = indexer.Init(ctx) 88 if err != nil { 89 return nil, fmt.Errorf("failed to create indexer: %w", err) 90 } 91 92 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 93 if err != nil { 94 return nil, fmt.Errorf("failed to create enforcer: %w", err) 95 } 96 97 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 98 if err != nil { 99 logger.Error("failed to create redis resolver", "err", err) 100 res = idresolver.DefaultResolver(config.Plc.PLCURL) 101 } 102 103 var rdb *cache.Cache 104 if config.Redis.Addr != "" { 105 rdb = cache.New(config.Redis.Addr) 106 } 107 108 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 109 if err != nil { 110 return nil, fmt.Errorf("failed to create posthog client: %w", err) 111 } 112 113 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages")) 114 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 115 if err != nil { 116 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 117 } 118 validator := validator.New(d, res, enforcer) 119 120 repoResolver := reporesolver.New(config, enforcer, d, rdb) 121 122 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 123 124 jc, err := jetstream.NewJetstreamClient( 125 config.Jetstream.Endpoint, 126 "appview", 127 []string{ 128 tangled.ActorProfileNSID, 129 tangled.FeedStarNSID, 130 tangled.FeedReactionNSID, 131 tangled.FeedCommentNSID, 132 tangled.GraphFollowNSID, 133 tangled.GraphVouchNSID, 134 tangled.KnotMemberNSID, 135 tangled.KnotNSID, 136 tangled.LabelDefinitionNSID, 137 tangled.LabelOpNSID, 138 tangled.PublicKeyNSID, 139 tangled.RepoArtifactNSID, 140 tangled.RepoIssueCommentNSID, 141 tangled.RepoIssueNSID, 142 tangled.RepoNSID, 143 tangled.RepoPullNSID, 144 tangled.RepoPullCommentNSID, 145 tangled.SpindleMemberNSID, 146 tangled.SpindleNSID, 147 tangled.StringNSID, 148 }, 149 nil, 150 tlog.SubLogger(logger, "jetstream"), 151 d, 152 false, 153 154 // in-memory filter is inapplicable to appview so 155 // we'll never log dids anyway. 156 false, 157 ) 158 if err != nil { 159 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 160 } 161 162 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 163 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 164 } 165 166 var notifiers []notify.Notifier 167 168 // Always add the database notifier 169 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 170 171 // Add other notifiers in production only 172 if !config.Core.Dev { 173 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 174 } 175 notifiers = append(notifiers, indexer) 176 177 notifiers = append(notifiers, whnotify.NewNotifier(d)) 178 179 notifier := notify.NewMergedNotifier(notifiers) 180 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 181 182 ingester := appview.Ingester{ 183 Ctx: ctx, 184 Db: d, 185 Enforcer: enforcer, 186 IdResolver: res, 187 Cache: rdb, 188 Config: config, 189 Logger: log.SubLogger(logger, "ingester"), 190 Validator: validator, 191 MentionsResolver: mentionsResolver, 192 Notifier: notifier, 193 Verifier: repoverify.New(res, config.Core.Dev), 194 } 195 err = jc.StartJetstream(ctx, ingester.Ingest()) 196 if err != nil { 197 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 198 } 199 200 go ingester.SweepPendingVerifications() 201 202 var cfClient *cloudflare.Client 203 if config.Cloudflare.ApiToken != "" { 204 cfClient, err = cloudflare.New(config) 205 if err != nil { 206 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err) 207 cfClient = nil 208 } 209 } 210 211 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient) 212 if err != nil { 213 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 214 } 215 knotstream.Start(ctx) 216 217 pipelineNotifier := pipelines.NewStatusNotifier() 218 219 spindlestream, err := Spindlestream(ctx, config, d, enforcer, pipelineNotifier) 220 if err != nil { 221 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 222 } 223 spindlestream.Start(ctx) 224 225 state := &State{ 226 db: d, 227 notifier: notifier, 228 indexer: indexer, 229 oauth: oauth, 230 enforcer: enforcer, 231 pages: pages, 232 idResolver: res, 233 rdb: rdb, 234 mentionsResolver: mentionsResolver, 235 posthog: posthog, 236 jc: jc, 237 config: config, 238 repoResolver: repoResolver, 239 knotstream: knotstream, 240 spindlestream: spindlestream, 241 pipelineNotifier: pipelineNotifier, 242 logger: logger, 243 validator: validator, 244 cfClient: cfClient, 245 } 246 247 // fetch initial bluesky posts if configured 248 go fetchBskyPosts(ctx, res, config, d, logger) 249 250 return state, nil 251} 252 253func (s *State) Close() error { 254 // other close up logic goes here 255 return s.db.Close() 256} 257 258func (s *State) NewSSHServer() *pipelinessh.Server { 259 return pipelinessh.New(s.db, s.config, s.pipelineNotifier, log.SubLogger(s.logger, "pipelinessh")) 260} 261 262func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) { 263 w.Header().Set("Content-Type", "text/plain") 264 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 265 266 securityTxt := `Contact: mailto:security@tangled.org 267Preferred-Languages: en 268Canonical: https://tangled.org/.well-known/security.txt 269Expires: 2030-01-01T21:59:00.000Z 270` 271 w.Write([]byte(securityTxt)) 272} 273 274func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 275 w.Header().Set("Content-Type", "text/plain") 276 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 277 278 robotsTxt := `# Hello, Tanglers! 279User-agent: * 280Allow: / 281Disallow: /*/*/settings 282Disallow: /settings 283Disallow: /*/*/compare 284Disallow: /*/*/fork 285 286Crawl-delay: 1 287` 288 w.Write([]byte(robotsTxt)) 289} 290 291func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 292 user := s.oauth.GetMultiAccountUser(r) 293 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 294 LoggedInUser: user, 295 }) 296} 297 298func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 299 user := s.oauth.GetMultiAccountUser(r) 300 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 301 LoggedInUser: user, 302 }) 303} 304 305func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 306 user := s.oauth.GetMultiAccountUser(r) 307 s.pages.Brand(w, pages.BrandParams{ 308 LoggedInUser: user, 309 }) 310} 311 312func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 313 user := s.oauth.GetMultiAccountUser(r) 314 if user == nil { 315 return 316 } 317 318 l := s.logger.With("handler", "UpgradeBanner") 319 l = l.With("did", user.Did) 320 321 regs, err := db.GetRegistrations( 322 s.db, 323 orm.FilterEq("did", user.Did), 324 orm.FilterEq("needs_upgrade", 1), 325 ) 326 if err != nil { 327 l.Error("non-fatal: failed to get registrations", "err", err) 328 } 329 330 spindles, err := db.GetSpindles( 331 r.Context(), 332 s.db, 333 orm.FilterEq("owner", user.Did), 334 orm.FilterEq("needs_upgrade", 1), 335 ) 336 if err != nil { 337 l.Error("non-fatal: failed to get spindles", "err", err) 338 } 339 340 if regs == nil && spindles == nil { 341 return 342 } 343 344 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 345 Registrations: regs, 346 Spindles: spindles, 347 }) 348} 349 350func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) { 351 // target is echoed back from the form via hx-vals so the response span's 352 // id matches the form's hx-target. Fallback keeps the handler useful if 353 // a caller forgets to send it. 354 target := strings.TrimSpace(r.FormValue("target")) 355 if target == "" { 356 target = "home" 357 } 358 359 w.Header().Set("Content-Type", "text/html") 360 361 emailAddr := strings.TrimSpace(r.FormValue("email")) 362 if !email.IsValidEmail(emailAddr) { 363 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{ 364 Id: target, 365 Error: "Invalid email address.", 366 }) 367 return 368 } 369 370 // For logged-in users, persist the signup locally so the widget stays 371 // hidden across devices. The DB row is the render-time source of truth; 372 // Resend still owns the mailing list itself. 373 if user := s.oauth.GetMultiAccountUser(r); user != nil { 374 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil { 375 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err) 376 } 377 } 378 379 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" { 380 go func() { 381 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil { 382 s.logger.Error("failed to add newsletter contact", "error", err) 383 } 384 }() 385 } else { 386 s.logger.Error( 387 "failed to add newsletter contact, missing resend config", 388 "isKeyPresent", s.config.Resend.ApiKey != "", 389 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "", 390 "emailAddr", emailAddr, 391 ) 392 } 393 394 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target}) 395} 396 397// NewsletterDismiss records that a logged-in user has dismissed the newsletter 398// widget so it stays hidden across their devices. Anonymous callers get a 204 399// with no DB write — localStorage handles the per-browser fallback. 400func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) { 401 user := s.oauth.GetMultiAccountUser(r) 402 if user == nil { 403 w.WriteHeader(http.StatusNoContent) 404 return 405 } 406 407 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil { 408 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err) 409 } 410 w.WriteHeader(http.StatusNoContent) 411} 412 413func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 414 user := chi.URLParam(r, "user") 415 user = strings.TrimPrefix(user, "@") 416 417 if user == "" { 418 w.WriteHeader(http.StatusBadRequest) 419 return 420 } 421 422 id, err := s.idResolver.ResolveIdent(r.Context(), user) 423 if err != nil { 424 w.WriteHeader(http.StatusInternalServerError) 425 return 426 } 427 428 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 429 if err != nil { 430 s.logger.Error("failed to get public keys", "err", err) 431 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 432 return 433 } 434 435 if len(pubKeys) == 0 { 436 w.WriteHeader(http.StatusNoContent) 437 return 438 } 439 440 for _, k := range pubKeys { 441 key := strings.TrimRight(k.Key, "\n") 442 fmt.Fprintln(w, key) 443 } 444} 445 446func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 447 switch r.Method { 448 case http.MethodGet: 449 user := s.oauth.GetMultiAccountUser(r) 450 knots, err := s.enforcer.GetKnotsForUser(user.Did) 451 if err != nil { 452 s.pages.Notice(w, "repo", "Invalid user account.") 453 return 454 } 455 456 s.pages.NewRepo(w, pages.NewRepoParams{ 457 LoggedInUser: user, 458 Knots: knots, 459 }) 460 461 case http.MethodPost: 462 l := s.logger.With("handler", "NewRepo") 463 464 user := s.oauth.GetMultiAccountUser(r) 465 l = l.With("did", user.Did) 466 467 // form validation 468 domain := r.FormValue("domain") 469 if domain == "" { 470 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 471 return 472 } 473 l = l.With("knot", domain) 474 475 repoName := r.FormValue("name") 476 if repoName == "" { 477 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 478 return 479 } 480 481 if err := models.ValidateRepoName(repoName); err != nil { 482 s.pages.Notice(w, "repo", err.Error()) 483 return 484 } 485 repoName = models.StripGitExt(repoName) 486 rkey := strings.ToLower(repoName) 487 l = l.With("repoName", repoName, "rkey", rkey) 488 489 defaultBranch := r.FormValue("branch") 490 if defaultBranch == "" { 491 defaultBranch = "main" 492 } 493 l = l.With("defaultBranch", defaultBranch) 494 495 description := r.FormValue("description") 496 if len([]rune(description)) > 140 { 497 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 498 return 499 } 500 501 // ACL validation 502 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 503 if err != nil || !ok { 504 l.Info("unauthorized") 505 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 506 return 507 } 508 509 // Check for existing repos 510 existingRepo, err := db.GetRepo( 511 s.db, 512 orm.FilterEq("did", user.Did), 513 orm.FilterEq("rkey", rkey), 514 ) 515 if err == nil && existingRepo != nil { 516 l.Info("repo exists") 517 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 518 return 519 } 520 521 atpClient, err := s.oauth.AuthorizedClient(r) 522 if err != nil { 523 l.Error("failed to get authorized client", "err", err) 524 s.pages.Notice(w, "repo", "Failed to authorize. Try again later.") 525 return 526 } 527 528 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) { 529 l.Info("rkey occupied by prior rename alias") 530 s.pages.Notice(w, "repo", fmt.Sprintf("The name %q still has a record on your PDS from a prior rename. Pick a different name, or delete at://%s/%s/%s first.", rkey, user.Did, tangled.RepoNSID, rkey)) 531 return 532 } 533 534 client, err := s.oauth.ServiceClient( 535 r, 536 oauth.WithService(domain), 537 oauth.WithLxm(tangled.RepoCreateNSID), 538 oauth.WithDev(s.config.Core.Dev), 539 ) 540 if err != nil { 541 l.Error("service auth failed", "err", err) 542 s.pages.Notice(w, "repo", "Failed to authenticate. Please log out and log back in again.") 543 return 544 } 545 546 input := &tangled.RepoCreate_Input{ 547 Rkey: rkey, 548 Name: rkey, 549 DefaultBranch: &defaultBranch, 550 } 551 createResp, err := tangled.RepoCreate( 552 r.Context(), 553 client, 554 input, 555 ) 556 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 557 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err) 558 s.pages.Notice(w, "repo", err.Error()) 559 return 560 } 561 562 var repoDid string 563 if createResp != nil && createResp.RepoDid != nil { 564 repoDid = *createResp.RepoDid 565 } 566 if repoDid == "" { 567 l.Error("knot returned empty repo DID") 568 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 569 return 570 } 571 572 repo := &models.Repo{ 573 Did: user.Did, 574 Name: repoName, 575 Knot: domain, 576 Rkey: rkey, 577 Description: description, 578 Created: time.Now(), 579 Labels: s.config.Label.DefaultLabelDefs, 580 RepoDid: repoDid, 581 } 582 record := repo.AsRecord() 583 584 cleanupKnot := func() { 585 go func() { 586 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 587 for attempt, delay := range delays { 588 time.Sleep(delay) 589 deleteClient, dErr := s.oauth.ServiceClient( 590 r, 591 oauth.WithService(domain), 592 oauth.WithLxm(tangled.RepoDeleteNSID), 593 oauth.WithDev(s.config.Core.Dev), 594 ) 595 if dErr != nil { 596 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 597 continue 598 } 599 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 600 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 601 Did: user.Did, 602 Name: rkey, 603 Rkey: rkey, 604 }); dErr != nil { 605 cancel() 606 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr) 607 continue 608 } 609 cancel() 610 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1) 611 return 612 } 613 l.Error("exhausted retries for knot cleanup, repo may be orphaned", 614 "did", user.Did, "repo", repoName, "knot", domain) 615 }() 616 } 617 618 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{ 619 Collection: tangled.RepoNSID, 620 Repo: user.Did, 621 Rkey: &rkey, 622 Record: &lexutil.LexiconTypeDecoder{ 623 Val: &record, 624 }, 625 }) 626 if err != nil { 627 l.Info("PDS write failed", "err", err) 628 cleanupKnot() 629 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) { 630 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository named %q.", rkey)) 631 } else { 632 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 633 } 634 return 635 } 636 637 aturi := fmt.Sprintf("at://%s/%s/%s", user.Did, tangled.RepoNSID, rkey) 638 l = l.With("aturi", aturi) 639 l.Info("wrote to PDS") 640 641 tx, err := s.db.BeginTx(r.Context(), nil) 642 if err != nil { 643 l.Info("txn failed", "err", err) 644 s.pages.Notice(w, "repo", "Failed to save repository information.") 645 return 646 } 647 648 rollback := func() { 649 err1 := tx.Rollback() 650 err2 := s.enforcer.E.LoadPolicy() 651 err3 := rollbackRecord(context.Background(), aturi, atpClient) 652 653 if errors.Is(err1, sql.ErrTxDone) { 654 err1 = nil 655 } 656 657 if errs := errors.Join(err1, err2, err3); errs != nil { 658 l.Error("failed to rollback changes", "errs", errs) 659 } 660 661 if aturi != "" { 662 cleanupKnot() 663 } 664 } 665 defer rollback() 666 667 err = db.AddRepo(tx, repo) 668 if err != nil { 669 l.Error("db write failed", "err", err) 670 s.pages.Notice(w, "repo", "Failed to save repository information.") 671 return 672 } 673 674 rbacPath := repo.RepoIdentifier() 675 err = s.enforcer.AddRepo(user.Did, domain, rbacPath) 676 if err != nil { 677 l.Error("acl setup failed", "err", err) 678 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 679 return 680 } 681 682 err = tx.Commit() 683 if err != nil { 684 l.Error("txn commit failed", "err", err) 685 http.Error(w, err.Error(), http.StatusInternalServerError) 686 return 687 } 688 689 err = s.enforcer.E.SavePolicy() 690 if err != nil { 691 l.Error("acl save failed", "err", err) 692 http.Error(w, err.Error(), http.StatusInternalServerError) 693 return 694 } 695 696 aturi = "" 697 698 s.notifier.NewRepo(r.Context(), repo) 699 switch { 700 case repoDid != "": 701 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 702 default: 703 handle := s.pages.DisplayHandle(r.Context(), user.Did) 704 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, rkey)) 705 } 706 } 707} 708 709func rkeyOccupied(ctx context.Context, client *atclient.APIClient, did, rkey string) bool { 710 probeCtx, cancel := context.WithTimeout(ctx, 10*time.Second) 711 defer cancel() 712 resp, err := comatproto.RepoGetRecord(probeCtx, client, "", tangled.RepoNSID, did, rkey) 713 return err == nil && resp != nil 714} 715 716// this is used to rollback changes made to the PDS 717// 718// it is a no-op if the provided ATURI is empty 719func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 720 if aturi == "" { 721 return nil 722 } 723 724 parsed := syntax.ATURI(aturi) 725 726 collection := parsed.Collection().String() 727 repo := parsed.Authority().String() 728 rkey := parsed.RecordKey().String() 729 730 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 731 Collection: collection, 732 Repo: repo, 733 Rkey: rkey, 734 }) 735 return err 736} 737 738func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 739 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 740 if err != nil { 741 return err 742 } 743 // already present 744 if len(defaultLabels) == len(defaults) { 745 return nil 746 } 747 748 labelDefs, err := models.FetchLabelDefs(r, defaults) 749 if err != nil { 750 return err 751 } 752 753 // Insert each label definition to the database 754 for _, labelDef := range labelDefs { 755 _, err = db.AddLabelDefinition(e, &labelDef) 756 if err != nil { 757 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 758 } 759 } 760 761 return nil 762} 763 764func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 765 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 766 if err != nil { 767 logger.Error("failed to resolve tangled.org DID", "err", err) 768 return 769 } 770 771 pdsEndpoint := resolved.PDSEndpoint() 772 if pdsEndpoint == "" { 773 logger.Error("no PDS endpoint found for tangled.sh DID") 774 return 775 } 776 777 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger) 778 if err != nil { 779 logger.Error("failed to create appassword session... skipping fetch", "err", err) 780 return 781 } 782 783 l := log.SubLogger(logger, "bluesky") 784 785 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 786 defer ticker.Stop() 787 788 for { 789 // refresh session if necessary 790 if !session.IsValid() { 791 l.Debug("access token expired, refreshing session") 792 if err := session.RefreshSession(); err != nil { 793 l.Error("failed to refresh session, stopping bluesky updater", "err", err) 794 return 795 } 796 l.Debug("session refreshed") 797 } 798 799 // make client 800 client := xrpc.Client{ 801 Auth: &xrpc.AuthInfo{ 802 AccessJwt: session.AccessJwt, 803 Did: session.Did, 804 }, 805 Host: session.PdsEndpoint, 806 } 807 808 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 809 if err != nil { 810 l.Error("failed to fetch bluesky posts", "err", err) 811 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 812 l.Error("failed to insert bluesky posts", "err", err) 813 } else { 814 l.Info("inserted bluesky posts", "count", len(posts)) 815 } 816 817 select { 818 case <-ticker.C: 819 case <-ctx.Done(): 820 l.Info("stopping bluesky updater") 821 return 822 } 823 } 824}