Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/cache" 17 "tangled.org/core/appview/cloudflare" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/email" 21 "tangled.org/core/appview/indexer" 22 "tangled.org/core/appview/mentions" 23 "tangled.org/core/appview/models" 24 "tangled.org/core/appview/notify" 25 dbnotify "tangled.org/core/appview/notify/db" 26 lognotify "tangled.org/core/appview/notify/logging" 27 phnotify "tangled.org/core/appview/notify/posthog" 28 whnotify "tangled.org/core/appview/notify/webhook" 29 "tangled.org/core/appview/oauth" 30 "tangled.org/core/appview/pages" 31 "tangled.org/core/appview/reporesolver" 32 "tangled.org/core/appview/repoverify" 33 "tangled.org/core/appview/validator" 34 xrpcclient "tangled.org/core/appview/xrpcclient" 35 "tangled.org/core/consts" 36 "tangled.org/core/eventconsumer" 37 "tangled.org/core/idresolver" 38 "tangled.org/core/jetstream" 39 "tangled.org/core/log" 40 tlog "tangled.org/core/log" 41 "tangled.org/core/orm" 42 "tangled.org/core/rbac" 43 44 comatproto "github.com/bluesky-social/indigo/api/atproto" 45 "github.com/bluesky-social/indigo/atproto/atclient" 46 "github.com/bluesky-social/indigo/atproto/syntax" 47 lexutil "github.com/bluesky-social/indigo/lex/util" 48 "github.com/bluesky-social/indigo/xrpc" 49 50 "github.com/go-chi/chi/v5" 51 "github.com/posthog/posthog-go" 52) 53 54type State struct { 55 db *db.DB 56 notifier notify.Notifier 57 indexer *indexer.Indexer 58 oauth *oauth.OAuth 59 enforcer *rbac.Enforcer 60 pages *pages.Pages 61 idResolver *idresolver.Resolver 62 rdb *cache.Cache 63 mentionsResolver *mentions.Resolver 64 posthog posthog.Client 65 jc *jetstream.JetstreamClient 66 config *config.Config 67 repoResolver *reporesolver.RepoResolver 68 knotstream *eventconsumer.Consumer 69 spindlestream *eventconsumer.Consumer 70 logger *slog.Logger 71 validator *validator.Validator 72 cfClient *cloudflare.Client 73} 74 75func Make(ctx context.Context, config *config.Config) (*State, error) { 76 logger := tlog.FromContext(ctx) 77 78 d, err := db.Make(ctx, config.Core.DbPath) 79 if err != nil { 80 return nil, fmt.Errorf("failed to create db: %w", err) 81 } 82 83 indexer := indexer.New(log.SubLogger(logger, "indexer"), d) 84 err = indexer.Init(ctx) 85 if err != nil { 86 return nil, fmt.Errorf("failed to create indexer: %w", err) 87 } 88 89 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 90 if err != nil { 91 return nil, fmt.Errorf("failed to create enforcer: %w", err) 92 } 93 94 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 95 if err != nil { 96 logger.Error("failed to create redis resolver", "err", err) 97 res = idresolver.DefaultResolver(config.Plc.PLCURL) 98 } 99 100 var rdb *cache.Cache 101 if config.Redis.Addr != "" { 102 rdb = cache.New(config.Redis.Addr) 103 } 104 105 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 106 if err != nil { 107 return nil, fmt.Errorf("failed to create posthog client: %w", err) 108 } 109 110 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages")) 111 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 112 if err != nil { 113 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 114 } 115 validator := validator.New(d, res, enforcer) 116 117 repoResolver := reporesolver.New(config, enforcer, d, rdb) 118 119 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 120 121 jc, err := jetstream.NewJetstreamClient( 122 config.Jetstream.Endpoint, 123 "appview", 124 []string{ 125 tangled.ActorProfileNSID, 126 tangled.FeedStarNSID, 127 tangled.GraphFollowNSID, 128 tangled.GraphVouchNSID, 129 tangled.KnotMemberNSID, 130 tangled.KnotNSID, 131 tangled.LabelDefinitionNSID, 132 tangled.LabelOpNSID, 133 tangled.PublicKeyNSID, 134 tangled.RepoArtifactNSID, 135 tangled.RepoIssueCommentNSID, 136 tangled.RepoIssueNSID, 137 tangled.RepoNSID, 138 tangled.RepoPullNSID, 139 tangled.SpindleMemberNSID, 140 tangled.SpindleNSID, 141 tangled.StringNSID, 142 }, 143 nil, 144 tlog.SubLogger(logger, "jetstream"), 145 d, 146 false, 147 148 // in-memory filter is inapplicable to appview so 149 // we'll never log dids anyway. 150 false, 151 ) 152 if err != nil { 153 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 154 } 155 156 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 157 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 158 } 159 160 var notifiers []notify.Notifier 161 162 // Always add the database notifier 163 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 164 165 // Add other notifiers in production only 166 if !config.Core.Dev { 167 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 168 } 169 notifiers = append(notifiers, indexer) 170 171 notifiers = append(notifiers, whnotify.NewNotifier(d)) 172 173 notifier := notify.NewMergedNotifier(notifiers) 174 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 175 176 ingester := appview.Ingester{ 177 Db: d, 178 Enforcer: enforcer, 179 IdResolver: res, 180 Cache: rdb, 181 Config: config, 182 Logger: log.SubLogger(logger, "ingester"), 183 Validator: validator, 184 Notifier: notifier, 185 Verifier: repoverify.New(res, config.Core.Dev), 186 } 187 err = jc.StartJetstream(ctx, ingester.Ingest()) 188 if err != nil { 189 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 190 } 191 192 var cfClient *cloudflare.Client 193 if config.Cloudflare.ApiToken != "" { 194 cfClient, err = cloudflare.New(config) 195 if err != nil { 196 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err) 197 cfClient = nil 198 } 199 } 200 201 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient) 202 if err != nil { 203 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 204 } 205 knotstream.Start(ctx) 206 207 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 208 if err != nil { 209 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 210 } 211 spindlestream.Start(ctx) 212 213 state := &State{ 214 db: d, 215 notifier: notifier, 216 indexer: indexer, 217 oauth: oauth, 218 enforcer: enforcer, 219 pages: pages, 220 idResolver: res, 221 rdb: rdb, 222 mentionsResolver: mentionsResolver, 223 posthog: posthog, 224 jc: jc, 225 config: config, 226 repoResolver: repoResolver, 227 knotstream: knotstream, 228 spindlestream: spindlestream, 229 logger: logger, 230 validator: validator, 231 cfClient: cfClient, 232 } 233 234 // fetch initial bluesky posts if configured 235 go fetchBskyPosts(ctx, res, config, d, logger) 236 237 return state, nil 238} 239 240func (s *State) Close() error { 241 // other close up logic goes here 242 return s.db.Close() 243} 244 245func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) { 246 w.Header().Set("Content-Type", "text/plain") 247 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 248 249 securityTxt := `Contact: mailto:security@tangled.org 250Preferred-Languages: en 251Canonical: https://tangled.org/.well-known/security.txt 252Expires: 2030-01-01T21:59:00.000Z 253` 254 w.Write([]byte(securityTxt)) 255} 256 257func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 258 w.Header().Set("Content-Type", "text/plain") 259 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 260 261 robotsTxt := `# Hello, Tanglers! 262User-agent: * 263Allow: / 264Disallow: /*/*/settings 265Disallow: /settings 266Disallow: /*/*/compare 267Disallow: /*/*/fork 268 269Crawl-delay: 1 270` 271 w.Write([]byte(robotsTxt)) 272} 273 274func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 275 user := s.oauth.GetMultiAccountUser(r) 276 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 277 LoggedInUser: user, 278 }) 279} 280 281func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 282 user := s.oauth.GetMultiAccountUser(r) 283 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 284 LoggedInUser: user, 285 }) 286} 287 288func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 289 user := s.oauth.GetMultiAccountUser(r) 290 s.pages.Brand(w, pages.BrandParams{ 291 LoggedInUser: user, 292 }) 293} 294 295func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 296 user := s.oauth.GetMultiAccountUser(r) 297 if user == nil { 298 return 299 } 300 301 l := s.logger.With("handler", "UpgradeBanner") 302 l = l.With("did", user.Did) 303 304 regs, err := db.GetRegistrations( 305 s.db, 306 orm.FilterEq("did", user.Did), 307 orm.FilterEq("needs_upgrade", 1), 308 ) 309 if err != nil { 310 l.Error("non-fatal: failed to get registrations", "err", err) 311 } 312 313 spindles, err := db.GetSpindles( 314 r.Context(), 315 s.db, 316 orm.FilterEq("owner", user.Did), 317 orm.FilterEq("needs_upgrade", 1), 318 ) 319 if err != nil { 320 l.Error("non-fatal: failed to get spindles", "err", err) 321 } 322 323 if regs == nil && spindles == nil { 324 return 325 } 326 327 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 328 Registrations: regs, 329 Spindles: spindles, 330 }) 331} 332 333func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) { 334 // target is echoed back from the form via hx-vals so the response span's 335 // id matches the form's hx-target. Fallback keeps the handler useful if 336 // a caller forgets to send it. 337 target := strings.TrimSpace(r.FormValue("target")) 338 if target == "" { 339 target = "home" 340 } 341 342 w.Header().Set("Content-Type", "text/html") 343 344 emailAddr := strings.TrimSpace(r.FormValue("email")) 345 if !email.IsValidEmail(emailAddr) { 346 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{ 347 Id: target, 348 Error: "Invalid email address.", 349 }) 350 return 351 } 352 353 // For logged-in users, persist the signup locally so the widget stays 354 // hidden across devices. The DB row is the render-time source of truth; 355 // Resend still owns the mailing list itself. 356 if user := s.oauth.GetMultiAccountUser(r); user != nil { 357 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil { 358 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err) 359 } 360 } 361 362 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" { 363 go func() { 364 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil { 365 s.logger.Error("failed to add newsletter contact", "error", err) 366 } 367 }() 368 } else { 369 s.logger.Error( 370 "failed to add newsletter contact, missing resend config", 371 "isKeyPresent", s.config.Resend.ApiKey != "", 372 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "", 373 "emailAddr", emailAddr, 374 ) 375 } 376 377 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target}) 378} 379 380// NewsletterDismiss records that a logged-in user has dismissed the newsletter 381// widget so it stays hidden across their devices. Anonymous callers get a 204 382// with no DB write — localStorage handles the per-browser fallback. 383func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) { 384 user := s.oauth.GetMultiAccountUser(r) 385 if user == nil { 386 w.WriteHeader(http.StatusNoContent) 387 return 388 } 389 390 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil { 391 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err) 392 } 393 w.WriteHeader(http.StatusNoContent) 394} 395 396func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 397 user := chi.URLParam(r, "user") 398 user = strings.TrimPrefix(user, "@") 399 400 if user == "" { 401 w.WriteHeader(http.StatusBadRequest) 402 return 403 } 404 405 id, err := s.idResolver.ResolveIdent(r.Context(), user) 406 if err != nil { 407 w.WriteHeader(http.StatusInternalServerError) 408 return 409 } 410 411 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 412 if err != nil { 413 s.logger.Error("failed to get public keys", "err", err) 414 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 415 return 416 } 417 418 if len(pubKeys) == 0 { 419 w.WriteHeader(http.StatusNoContent) 420 return 421 } 422 423 for _, k := range pubKeys { 424 key := strings.TrimRight(k.Key, "\n") 425 fmt.Fprintln(w, key) 426 } 427} 428 429func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 430 switch r.Method { 431 case http.MethodGet: 432 user := s.oauth.GetMultiAccountUser(r) 433 knots, err := s.enforcer.GetKnotsForUser(user.Did) 434 if err != nil { 435 s.pages.Notice(w, "repo", "Invalid user account.") 436 return 437 } 438 439 s.pages.NewRepo(w, pages.NewRepoParams{ 440 LoggedInUser: user, 441 Knots: knots, 442 }) 443 444 case http.MethodPost: 445 l := s.logger.With("handler", "NewRepo") 446 447 user := s.oauth.GetMultiAccountUser(r) 448 l = l.With("did", user.Did) 449 450 // form validation 451 domain := r.FormValue("domain") 452 if domain == "" { 453 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 454 return 455 } 456 l = l.With("knot", domain) 457 458 repoName := r.FormValue("name") 459 if repoName == "" { 460 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 461 return 462 } 463 464 if err := models.ValidateRepoName(repoName); err != nil { 465 s.pages.Notice(w, "repo", err.Error()) 466 return 467 } 468 repoName = models.StripGitExt(repoName) 469 rkey := strings.ToLower(repoName) 470 l = l.With("repoName", repoName, "rkey", rkey) 471 472 defaultBranch := r.FormValue("branch") 473 if defaultBranch == "" { 474 defaultBranch = "main" 475 } 476 l = l.With("defaultBranch", defaultBranch) 477 478 description := r.FormValue("description") 479 if len([]rune(description)) > 140 { 480 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 481 return 482 } 483 484 // ACL validation 485 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 486 if err != nil || !ok { 487 l.Info("unauthorized") 488 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 489 return 490 } 491 492 // Check for existing repos 493 existingRepo, err := db.GetRepo( 494 s.db, 495 orm.FilterEq("did", user.Did), 496 orm.FilterEq("rkey", rkey), 497 ) 498 if err == nil && existingRepo != nil { 499 l.Info("repo exists") 500 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 501 return 502 } 503 504 atpClient, err := s.oauth.AuthorizedClient(r) 505 if err != nil { 506 l.Error("failed to get authorized client", "err", err) 507 s.pages.Notice(w, "repo", "Failed to authorize. Try again later.") 508 return 509 } 510 511 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) { 512 l.Info("rkey occupied by prior rename alias") 513 s.pages.Notice(w, "repo", fmt.Sprintf("The name %q still has a record on your PDS from a prior rename. Pick a different name, or delete at://%s/%s/%s first.", rkey, user.Did, tangled.RepoNSID, rkey)) 514 return 515 } 516 517 client, err := s.oauth.ServiceClient( 518 r, 519 oauth.WithService(domain), 520 oauth.WithLxm(tangled.RepoCreateNSID), 521 oauth.WithDev(s.config.Core.Dev), 522 ) 523 if err != nil { 524 l.Error("service auth failed", "err", err) 525 s.pages.Notice(w, "repo", "Failed to authenticate. Please log out and log back in again.") 526 return 527 } 528 529 input := &tangled.RepoCreate_Input{ 530 Rkey: rkey, 531 Name: rkey, 532 DefaultBranch: &defaultBranch, 533 } 534 createResp, err := tangled.RepoCreate( 535 r.Context(), 536 client, 537 input, 538 ) 539 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 540 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err) 541 s.pages.Notice(w, "repo", err.Error()) 542 return 543 } 544 545 var repoDid string 546 if createResp != nil && createResp.RepoDid != nil { 547 repoDid = *createResp.RepoDid 548 } 549 if repoDid == "" { 550 l.Error("knot returned empty repo DID") 551 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 552 return 553 } 554 555 repo := &models.Repo{ 556 Did: user.Did, 557 Name: repoName, 558 Knot: domain, 559 Rkey: rkey, 560 Description: description, 561 Created: time.Now(), 562 Labels: s.config.Label.DefaultLabelDefs, 563 RepoDid: repoDid, 564 } 565 record := repo.AsRecord() 566 567 cleanupKnot := func() { 568 go func() { 569 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 570 for attempt, delay := range delays { 571 time.Sleep(delay) 572 deleteClient, dErr := s.oauth.ServiceClient( 573 r, 574 oauth.WithService(domain), 575 oauth.WithLxm(tangled.RepoDeleteNSID), 576 oauth.WithDev(s.config.Core.Dev), 577 ) 578 if dErr != nil { 579 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 580 continue 581 } 582 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 583 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 584 Did: user.Did, 585 Name: rkey, 586 Rkey: rkey, 587 }); dErr != nil { 588 cancel() 589 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr) 590 continue 591 } 592 cancel() 593 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1) 594 return 595 } 596 l.Error("exhausted retries for knot cleanup, repo may be orphaned", 597 "did", user.Did, "repo", repoName, "knot", domain) 598 }() 599 } 600 601 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{ 602 Collection: tangled.RepoNSID, 603 Repo: user.Did, 604 Rkey: &rkey, 605 Record: &lexutil.LexiconTypeDecoder{ 606 Val: &record, 607 }, 608 }) 609 if err != nil { 610 l.Info("PDS write failed", "err", err) 611 cleanupKnot() 612 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) { 613 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository named %q.", rkey)) 614 } else { 615 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 616 } 617 return 618 } 619 620 aturi := fmt.Sprintf("at://%s/%s/%s", user.Did, tangled.RepoNSID, rkey) 621 l = l.With("aturi", aturi) 622 l.Info("wrote to PDS") 623 624 tx, err := s.db.BeginTx(r.Context(), nil) 625 if err != nil { 626 l.Info("txn failed", "err", err) 627 s.pages.Notice(w, "repo", "Failed to save repository information.") 628 return 629 } 630 631 rollback := func() { 632 err1 := tx.Rollback() 633 err2 := s.enforcer.E.LoadPolicy() 634 err3 := rollbackRecord(context.Background(), aturi, atpClient) 635 636 if errors.Is(err1, sql.ErrTxDone) { 637 err1 = nil 638 } 639 640 if errs := errors.Join(err1, err2, err3); errs != nil { 641 l.Error("failed to rollback changes", "errs", errs) 642 } 643 644 if aturi != "" { 645 cleanupKnot() 646 } 647 } 648 defer rollback() 649 650 err = db.AddRepo(tx, repo) 651 if err != nil { 652 l.Error("db write failed", "err", err) 653 s.pages.Notice(w, "repo", "Failed to save repository information.") 654 return 655 } 656 657 rbacPath := repo.RepoIdentifier() 658 err = s.enforcer.AddRepo(user.Did, domain, rbacPath) 659 if err != nil { 660 l.Error("acl setup failed", "err", err) 661 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 662 return 663 } 664 665 err = tx.Commit() 666 if err != nil { 667 l.Error("txn commit failed", "err", err) 668 http.Error(w, err.Error(), http.StatusInternalServerError) 669 return 670 } 671 672 err = s.enforcer.E.SavePolicy() 673 if err != nil { 674 l.Error("acl save failed", "err", err) 675 http.Error(w, err.Error(), http.StatusInternalServerError) 676 return 677 } 678 679 aturi = "" 680 681 s.notifier.NewRepo(r.Context(), repo) 682 switch { 683 case repoDid != "": 684 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 685 default: 686 handle := s.pages.DisplayHandle(r.Context(), user.Did) 687 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, rkey)) 688 } 689 } 690} 691 692func rkeyOccupied(ctx context.Context, client *atclient.APIClient, did, rkey string) bool { 693 probeCtx, cancel := context.WithTimeout(ctx, 10*time.Second) 694 defer cancel() 695 resp, err := comatproto.RepoGetRecord(probeCtx, client, "", tangled.RepoNSID, did, rkey) 696 return err == nil && resp != nil 697} 698 699// this is used to rollback changes made to the PDS 700// 701// it is a no-op if the provided ATURI is empty 702func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 703 if aturi == "" { 704 return nil 705 } 706 707 parsed := syntax.ATURI(aturi) 708 709 collection := parsed.Collection().String() 710 repo := parsed.Authority().String() 711 rkey := parsed.RecordKey().String() 712 713 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 714 Collection: collection, 715 Repo: repo, 716 Rkey: rkey, 717 }) 718 return err 719} 720 721func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 722 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 723 if err != nil { 724 return err 725 } 726 // already present 727 if len(defaultLabels) == len(defaults) { 728 return nil 729 } 730 731 labelDefs, err := models.FetchLabelDefs(r, defaults) 732 if err != nil { 733 return err 734 } 735 736 // Insert each label definition to the database 737 for _, labelDef := range labelDefs { 738 _, err = db.AddLabelDefinition(e, &labelDef) 739 if err != nil { 740 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 741 } 742 } 743 744 return nil 745} 746 747func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 748 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 749 if err != nil { 750 logger.Error("failed to resolve tangled.org DID", "err", err) 751 return 752 } 753 754 pdsEndpoint := resolved.PDSEndpoint() 755 if pdsEndpoint == "" { 756 logger.Error("no PDS endpoint found for tangled.sh DID") 757 return 758 } 759 760 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger) 761 if err != nil { 762 logger.Error("failed to create appassword session... skipping fetch", "err", err) 763 return 764 } 765 766 l := log.SubLogger(logger, "bluesky") 767 768 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 769 defer ticker.Stop() 770 771 for { 772 // refresh session if necessary 773 if !session.IsValid() { 774 l.Debug("access token expired, refreshing session") 775 if err := session.RefreshSession(); err != nil { 776 l.Error("failed to refresh session, stopping bluesky updater", "err", err) 777 return 778 } 779 l.Debug("session refreshed") 780 } 781 782 // make client 783 client := xrpc.Client{ 784 Auth: &xrpc.AuthInfo{ 785 AccessJwt: session.AccessJwt, 786 Did: session.Did, 787 }, 788 Host: session.PdsEndpoint, 789 } 790 791 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 792 if err != nil { 793 l.Error("failed to fetch bluesky posts", "err", err) 794 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 795 l.Error("failed to insert bluesky posts", "err", err) 796 } else { 797 l.Info("inserted bluesky posts", "count", len(posts)) 798 } 799 800 select { 801 case <-ticker.C: 802 case <-ctx.Done(): 803 l.Info("stopping bluesky updater") 804 return 805 } 806 } 807}