Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/cache" 17 "tangled.org/core/appview/cloudflare" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/email" 21 "tangled.org/core/appview/indexer" 22 "tangled.org/core/appview/knotacl" 23 "tangled.org/core/appview/knotcompat" 24 "tangled.org/core/appview/mentions" 25 "tangled.org/core/appview/models" 26 "tangled.org/core/appview/notify" 27 dbnotify "tangled.org/core/appview/notify/db" 28 lognotify "tangled.org/core/appview/notify/logging" 29 phnotify "tangled.org/core/appview/notify/posthog" 30 whnotify "tangled.org/core/appview/notify/webhook" 31 "tangled.org/core/appview/oauth" 32 "tangled.org/core/appview/pages" 33 "tangled.org/core/appview/pipelines" 34 pipelinessh "tangled.org/core/appview/pipelines/ssh" 35 "tangled.org/core/appview/reporesolver" 36 "tangled.org/core/appview/repoverify" 37 xrpcclient "tangled.org/core/appview/xrpcclient" 38 "tangled.org/core/consts" 39 "tangled.org/core/eventconsumer" 40 "tangled.org/core/idresolver" 41 "tangled.org/core/jetstream" 42 "tangled.org/core/log" 43 tlog "tangled.org/core/log" 44 "tangled.org/core/orm" 45 "tangled.org/core/rbac" 46 47 comatproto "github.com/bluesky-social/indigo/api/atproto" 48 "github.com/bluesky-social/indigo/atproto/atclient" 49 "github.com/bluesky-social/indigo/atproto/syntax" 50 lexutil "github.com/bluesky-social/indigo/lex/util" 51 "github.com/bluesky-social/indigo/xrpc" 52 53 "github.com/go-chi/chi/v5" 54 "github.com/posthog/posthog-go" 55) 56 57type State struct { 58 db *db.DB 59 notifier notify.Notifier 60 indexer *indexer.Indexer 61 oauth *oauth.OAuth 62 enforcer *rbac.Enforcer 63 pages *pages.Pages 64 idResolver *idresolver.Resolver 65 rdb *cache.Cache 66 mentionsResolver *mentions.Resolver 67 posthog posthog.Client 68 jc *jetstream.JetstreamClient 69 config *config.Config 70 repoResolver *reporesolver.RepoResolver 71 aclService *knotacl.Service 72 knotstream *eventconsumer.Consumer 73 spindlestream *eventconsumer.Consumer 74 pipelineNotifier *pipelines.StatusNotifier 75 logger *slog.Logger 76 cfClient *cloudflare.Client 77} 78 79func Make(ctx context.Context, config *config.Config) (*State, error) { 80 logger := tlog.FromContext(ctx) 81 82 d, err := db.Make(ctx, config.Core.DbPath) 83 if err != nil { 84 return nil, fmt.Errorf("failed to create db: %w", err) 85 } 86 87 indexer := indexer.New(log.SubLogger(logger, "indexer"), d) 88 err = indexer.Init(ctx) 89 if err != nil { 90 return nil, fmt.Errorf("failed to create indexer: %w", err) 91 } 92 93 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 94 if err != nil { 95 return nil, fmt.Errorf("failed to create enforcer: %w", err) 96 } 97 98 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 99 if err != nil { 100 logger.Error("failed to create redis resolver", "err", err) 101 res = idresolver.DefaultResolver(config.Plc.PLCURL) 102 } 103 104 var rdb *cache.Cache 105 if config.Redis.Addr != "" { 106 rdb = cache.New(config.Redis.Addr) 107 } 108 109 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 110 if err != nil { 111 return nil, fmt.Errorf("failed to create posthog client: %w", err) 112 } 113 114 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages")) 115 knotcompat.UseNativeLatch(knotacl.NewLatch(d, log.SubLogger(logger, "knotacl-latch"))) 116 aclService := knotacl.NewService(enforcer, d, config.Core.Dev, log.SubLogger(logger, "knotacl")) 117 oauth, err := oauth.New(config, posthog, d, enforcer, aclService, res, log.SubLogger(logger, "oauth")) 118 if err != nil { 119 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 120 } 121 repoResolver := reporesolver.New(config, aclService, d, rdb) 122 123 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 124 125 jc, err := jetstream.NewJetstreamClient( 126 config.Jetstream.Endpoint, 127 "appview", 128 []string{ 129 tangled.ActorProfileNSID, 130 tangled.FeedStarNSID, 131 tangled.FeedReactionNSID, 132 tangled.FeedCommentNSID, 133 tangled.GraphFollowNSID, 134 tangled.GraphVouchNSID, 135 tangled.KnotMemberNSID, 136 tangled.KnotNSID, 137 tangled.LabelDefinitionNSID, 138 tangled.LabelOpNSID, 139 tangled.PublicKeyNSID, 140 tangled.RepoArtifactNSID, 141 tangled.RepoIssueCommentNSID, 142 tangled.RepoIssueNSID, 143 tangled.RepoNSID, 144 tangled.RepoPullNSID, 145 tangled.RepoPullCommentNSID, 146 tangled.SpindleMemberNSID, 147 tangled.SpindleNSID, 148 tangled.StringNSID, 149 }, 150 nil, 151 tlog.SubLogger(logger, "jetstream"), 152 d, 153 false, 154 155 // in-memory filter is inapplicable to appview so 156 // we'll never log dids anyway. 157 false, 158 ) 159 if err != nil { 160 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 161 } 162 163 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 164 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 165 } 166 167 var notifiers []notify.Notifier 168 169 // Always add the database notifier 170 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 171 172 // Add other notifiers in production only 173 if !config.Core.Dev { 174 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 175 } 176 notifiers = append(notifiers, indexer) 177 178 notifiers = append(notifiers, whnotify.NewNotifier(d)) 179 180 notifier := notify.NewMergedNotifier(notifiers) 181 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 182 183 ingester := appview.Ingester{ 184 Ctx: ctx, 185 Db: d, 186 Enforcer: enforcer, 187 Acl: aclService, 188 IdResolver: res, 189 Cache: rdb, 190 Config: config, 191 Logger: log.SubLogger(logger, "ingester"), 192 MentionsResolver: mentionsResolver, 193 Notifier: notifier, 194 Verifier: repoverify.New(res, config.Core.Dev), 195 } 196 err = jc.StartJetstream(ctx, ingester.Ingest()) 197 if err != nil { 198 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 199 } 200 201 go ingester.SweepPendingVerifications() 202 203 var cfClient *cloudflare.Client 204 if config.Cloudflare.ApiToken != "" { 205 cfClient, err = cloudflare.New(config) 206 if err != nil { 207 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err) 208 cfClient = nil 209 } 210 } 211 212 knotstream, err := Knotstream(ctx, config, d, aclService, enforcer, posthog, notifier, cfClient) 213 if err != nil { 214 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 215 } 216 knotstream.Start(ctx) 217 218 pipelineNotifier := pipelines.NewStatusNotifier() 219 220 spindlestream, err := Spindlestream(ctx, config, d, enforcer, pipelineNotifier) 221 if err != nil { 222 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 223 } 224 spindlestream.Start(ctx) 225 226 state := &State{ 227 db: d, 228 notifier: notifier, 229 indexer: indexer, 230 oauth: oauth, 231 enforcer: enforcer, 232 pages: pages, 233 idResolver: res, 234 rdb: rdb, 235 mentionsResolver: mentionsResolver, 236 posthog: posthog, 237 jc: jc, 238 config: config, 239 repoResolver: repoResolver, 240 aclService: aclService, 241 knotstream: knotstream, 242 spindlestream: spindlestream, 243 pipelineNotifier: pipelineNotifier, 244 logger: logger, 245 cfClient: cfClient, 246 } 247 248 // fetch initial bluesky posts if configured 249 go fetchBskyPosts(ctx, res, config, d, logger) 250 251 return state, nil 252} 253 254func (s *State) Close() error { 255 // other close up logic goes here 256 return s.db.Close() 257} 258 259func (s *State) NewSSHServer() *pipelinessh.Server { 260 return pipelinessh.New(s.db, s.config, s.pipelineNotifier, log.SubLogger(s.logger, "pipelinessh")) 261} 262 263func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) { 264 w.Header().Set("Content-Type", "text/plain") 265 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 266 267 securityTxt := `Contact: mailto:security@tangled.org 268Preferred-Languages: en 269Canonical: https://tangled.org/.well-known/security.txt 270Expires: 2030-01-01T21:59:00.000Z 271` 272 w.Write([]byte(securityTxt)) 273} 274 275func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 276 w.Header().Set("Content-Type", "text/plain") 277 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 278 279 robotsTxt := `# Hello, Tanglers! 280User-agent: * 281Allow: / 282Disallow: /*/*/settings 283Disallow: /settings 284Disallow: /*/*/compare 285Disallow: /*/*/fork 286 287Crawl-delay: 1 288` 289 w.Write([]byte(robotsTxt)) 290} 291 292func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 293 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 294 BaseParams: pages.BaseParamsFromContext(r.Context()), 295 }) 296} 297 298func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 299 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 300 BaseParams: pages.BaseParamsFromContext(r.Context()), 301 }) 302} 303 304func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 305 s.pages.Brand(w, pages.BrandParams{ 306 BaseParams: pages.BaseParamsFromContext(r.Context()), 307 }) 308} 309 310func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 311 user := s.oauth.GetMultiAccountUser(r) 312 if user == nil { 313 return 314 } 315 316 l := s.logger.With("handler", "UpgradeBanner") 317 l = l.With("did", user.Did) 318 319 regs, err := db.GetRegistrations( 320 s.db, 321 orm.FilterEq("did", user.Did), 322 orm.FilterEq("needs_upgrade", 1), 323 ) 324 if err != nil { 325 l.Error("non-fatal: failed to get registrations", "err", err) 326 } 327 328 spindles, err := db.GetSpindles( 329 r.Context(), 330 s.db, 331 orm.FilterEq("owner", user.Did), 332 orm.FilterEq("needs_upgrade", 1), 333 ) 334 if err != nil { 335 l.Error("non-fatal: failed to get spindles", "err", err) 336 } 337 338 if regs == nil && spindles == nil { 339 return 340 } 341 342 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 343 Registrations: regs, 344 Spindles: spindles, 345 }) 346} 347 348func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) { 349 // target is echoed back from the form via hx-vals so the response span's 350 // id matches the form's hx-target. Fallback keeps the handler useful if 351 // a caller forgets to send it. 352 target := strings.TrimSpace(r.FormValue("target")) 353 if target == "" { 354 target = "home" 355 } 356 357 w.Header().Set("Content-Type", "text/html") 358 359 emailAddr := strings.TrimSpace(r.FormValue("email")) 360 if !email.IsValidEmail(emailAddr) { 361 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{ 362 Id: target, 363 Error: "Invalid email address.", 364 }) 365 return 366 } 367 368 // For logged-in users, persist the signup locally so the widget stays 369 // hidden across devices. The DB row is the render-time source of truth; 370 // Resend still owns the mailing list itself. 371 if user := s.oauth.GetMultiAccountUser(r); user != nil { 372 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil { 373 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err) 374 } 375 } 376 377 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" { 378 go func() { 379 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil { 380 s.logger.Error("failed to add newsletter contact", "error", err) 381 } 382 }() 383 } else { 384 s.logger.Error( 385 "failed to add newsletter contact, missing resend config", 386 "isKeyPresent", s.config.Resend.ApiKey != "", 387 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "", 388 "emailAddr", emailAddr, 389 ) 390 } 391 392 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target}) 393} 394 395// NewsletterDismiss records that a logged-in user has dismissed the newsletter 396// widget so it stays hidden across their devices. Anonymous callers get a 204 397// with no DB write — localStorage handles the per-browser fallback. 398func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) { 399 user := s.oauth.GetMultiAccountUser(r) 400 if user == nil { 401 w.WriteHeader(http.StatusNoContent) 402 return 403 } 404 405 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil { 406 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err) 407 } 408 w.WriteHeader(http.StatusNoContent) 409} 410 411func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 412 user := chi.URLParam(r, "user") 413 user = strings.TrimPrefix(user, "@") 414 415 if user == "" { 416 w.WriteHeader(http.StatusBadRequest) 417 return 418 } 419 420 id, err := s.idResolver.ResolveIdent(r.Context(), user) 421 if err != nil { 422 w.WriteHeader(http.StatusInternalServerError) 423 return 424 } 425 426 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 427 if err != nil { 428 s.logger.Error("failed to get public keys", "err", err) 429 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 430 return 431 } 432 433 if len(pubKeys) == 0 { 434 w.WriteHeader(http.StatusNoContent) 435 return 436 } 437 438 for _, k := range pubKeys { 439 key := strings.TrimRight(k.Key, "\n") 440 fmt.Fprintln(w, key) 441 } 442} 443 444func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 445 switch r.Method { 446 case http.MethodGet: 447 user := s.oauth.GetMultiAccountUser(r) 448 knots := s.aclService.KnotsForUser(r.Context(), user.Did) 449 450 s.pages.NewRepo(w, pages.NewRepoParams{ 451 BaseParams: pages.BaseParamsFromContext(r.Context()), 452 Knots: knots, 453 }) 454 455 case http.MethodPost: 456 l := s.logger.With("handler", "NewRepo") 457 458 user := s.oauth.GetMultiAccountUser(r) 459 l = l.With("did", user.Did) 460 461 // form validation 462 domain := r.FormValue("domain") 463 if domain == "" { 464 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 465 return 466 } 467 l = l.With("knot", domain) 468 469 repoName := r.FormValue("name") 470 if repoName == "" { 471 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 472 return 473 } 474 475 if err := models.ValidateRepoName(repoName); err != nil { 476 s.pages.Notice(w, "repo", err.Error()) 477 return 478 } 479 repoName = models.StripGitExt(repoName) 480 rkey := strings.ToLower(repoName) 481 l = l.With("repoName", repoName, "rkey", rkey) 482 483 defaultBranch := r.FormValue("branch") 484 if defaultBranch == "" { 485 defaultBranch = "main" 486 } 487 l = l.With("defaultBranch", defaultBranch) 488 489 description := r.FormValue("description") 490 if len([]rune(description)) > 140 { 491 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 492 return 493 } 494 495 // ACL validation 496 if !s.aclService.IsRepoCreateAllowed(r.Context(), domain, user.Did) { 497 l.Info("unauthorized") 498 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 499 return 500 } 501 502 // Check for existing repos 503 existingRepo, err := db.GetRepo( 504 s.db, 505 orm.FilterEq("did", user.Did), 506 orm.FilterEq("rkey", rkey), 507 ) 508 if err == nil && existingRepo != nil { 509 l.Info("repo exists") 510 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 511 return 512 } 513 514 atpClient, err := s.oauth.AuthorizedClient(r) 515 if err != nil { 516 l.Error("failed to get authorized client", "err", err) 517 s.pages.Notice(w, "repo", "Failed to authorize. Try again later.") 518 return 519 } 520 521 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) { 522 l.Info("rkey occupied by prior rename alias") 523 s.pages.Notice(w, "repo", fmt.Sprintf("The name %q still has a record on your PDS from a prior rename. Pick a different name, or delete at://%s/%s/%s first.", rkey, user.Did, tangled.RepoNSID, rkey)) 524 return 525 } 526 527 client, err := s.oauth.ServiceClient( 528 r, 529 oauth.WithService(domain), 530 oauth.WithLxm(tangled.RepoCreateNSID), 531 oauth.WithDev(s.config.Core.Dev), 532 ) 533 if err != nil { 534 l.Error("service auth failed", "err", err) 535 s.pages.Notice(w, "repo", "Failed to authenticate. Please log out and log back in again.") 536 return 537 } 538 539 input := &tangled.RepoCreate_Input{ 540 Rkey: rkey, 541 Name: rkey, 542 DefaultBranch: &defaultBranch, 543 } 544 createResp, err := tangled.RepoCreate( 545 r.Context(), 546 client, 547 input, 548 ) 549 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 550 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err) 551 s.pages.Notice(w, "repo", err.Error()) 552 return 553 } 554 555 var repoDid string 556 if createResp != nil && createResp.RepoDid != nil { 557 repoDid = *createResp.RepoDid 558 } 559 if repoDid == "" { 560 l.Error("knot returned empty repo DID") 561 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 562 return 563 } 564 565 repo := &models.Repo{ 566 Did: user.Did, 567 Name: repoName, 568 Knot: domain, 569 Rkey: rkey, 570 Description: description, 571 Created: time.Now(), 572 Labels: s.config.Label.DefaultLabelDefs, 573 RepoDid: repoDid, 574 } 575 record := repo.AsRecord() 576 577 cleanupKnot := func() { 578 go func() { 579 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 580 for attempt, delay := range delays { 581 time.Sleep(delay) 582 deleteClient, dErr := s.oauth.ServiceClient( 583 r, 584 oauth.WithService(domain), 585 oauth.WithLxm(tangled.RepoDeleteNSID), 586 oauth.WithDev(s.config.Core.Dev), 587 ) 588 if dErr != nil { 589 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 590 continue 591 } 592 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 593 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 594 Did: user.Did, 595 Name: rkey, 596 Rkey: rkey, 597 }); dErr != nil { 598 cancel() 599 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr) 600 continue 601 } 602 cancel() 603 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1) 604 return 605 } 606 l.Error("exhausted retries for knot cleanup, repo may be orphaned", 607 "did", user.Did, "repo", repoName, "knot", domain) 608 }() 609 } 610 611 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{ 612 Collection: tangled.RepoNSID, 613 Repo: user.Did, 614 Rkey: &rkey, 615 Record: &lexutil.LexiconTypeDecoder{ 616 Val: &record, 617 }, 618 }) 619 if err != nil { 620 l.Info("PDS write failed", "err", err) 621 cleanupKnot() 622 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) { 623 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository named %q.", rkey)) 624 } else { 625 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 626 } 627 return 628 } 629 630 aturi := fmt.Sprintf("at://%s/%s/%s", user.Did, tangled.RepoNSID, rkey) 631 l = l.With("aturi", aturi) 632 l.Info("wrote to PDS") 633 634 tx, err := s.db.BeginTx(r.Context(), nil) 635 if err != nil { 636 l.Info("txn failed", "err", err) 637 s.pages.Notice(w, "repo", "Failed to save repository information.") 638 return 639 } 640 641 rollback := func() { 642 err1 := tx.Rollback() 643 err2 := s.enforcer.E.LoadPolicy() 644 err3 := rollbackRecord(context.Background(), aturi, atpClient) 645 646 if errors.Is(err1, sql.ErrTxDone) { 647 err1 = nil 648 } 649 650 if errs := errors.Join(err1, err2, err3); errs != nil { 651 l.Error("failed to rollback changes", "errs", errs) 652 } 653 654 if aturi != "" { 655 cleanupKnot() 656 } 657 } 658 defer rollback() 659 660 err = db.AddRepo(tx, repo) 661 if err != nil { 662 l.Error("db write failed", "err", err) 663 s.pages.Notice(w, "repo", "Failed to save repository information.") 664 return 665 } 666 667 rbacPath := repo.RepoIdentifier() 668 err = s.enforcer.AddRepo(user.Did, domain, rbacPath) 669 if err != nil { 670 l.Error("acl setup failed", "err", err) 671 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 672 return 673 } 674 675 err = tx.Commit() 676 if err != nil { 677 l.Error("txn commit failed", "err", err) 678 http.Error(w, err.Error(), http.StatusInternalServerError) 679 return 680 } 681 682 err = s.enforcer.E.SavePolicy() 683 if err != nil { 684 l.Error("acl save failed", "err", err) 685 http.Error(w, err.Error(), http.StatusInternalServerError) 686 return 687 } 688 689 aturi = "" 690 691 s.notifier.NewRepo(r.Context(), repo) 692 switch { 693 case repoDid != "": 694 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 695 default: 696 handle := s.pages.DisplayHandle(r.Context(), user.Did) 697 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, rkey)) 698 } 699 } 700} 701 702func rkeyOccupied(ctx context.Context, client *atclient.APIClient, did, rkey string) bool { 703 probeCtx, cancel := context.WithTimeout(ctx, 10*time.Second) 704 defer cancel() 705 resp, err := comatproto.RepoGetRecord(probeCtx, client, "", tangled.RepoNSID, did, rkey) 706 return err == nil && resp != nil 707} 708 709// this is used to rollback changes made to the PDS 710// 711// it is a no-op if the provided ATURI is empty 712func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 713 if aturi == "" { 714 return nil 715 } 716 717 parsed := syntax.ATURI(aturi) 718 719 collection := parsed.Collection().String() 720 repo := parsed.Authority().String() 721 rkey := parsed.RecordKey().String() 722 723 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 724 Collection: collection, 725 Repo: repo, 726 Rkey: rkey, 727 }) 728 return err 729} 730 731func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 732 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 733 if err != nil { 734 return err 735 } 736 // already present 737 if len(defaultLabels) == len(defaults) { 738 return nil 739 } 740 741 labelDefs, err := models.FetchLabelDefs(r, defaults) 742 if err != nil { 743 return err 744 } 745 746 // Insert each label definition to the database 747 for _, labelDef := range labelDefs { 748 _, err = db.AddLabelDefinition(e, &labelDef) 749 if err != nil { 750 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 751 } 752 } 753 754 return nil 755} 756 757func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 758 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 759 if err != nil { 760 logger.Error("failed to resolve tangled.org DID", "err", err) 761 return 762 } 763 764 pdsEndpoint := resolved.PDSEndpoint() 765 if pdsEndpoint == "" { 766 logger.Error("no PDS endpoint found for tangled.sh DID") 767 return 768 } 769 770 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger) 771 if err != nil { 772 logger.Error("failed to create appassword session... skipping fetch", "err", err) 773 return 774 } 775 776 l := log.SubLogger(logger, "bluesky") 777 778 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 779 defer ticker.Stop() 780 781 for { 782 // refresh session if necessary 783 if !session.IsValid() { 784 l.Debug("access token expired, refreshing session") 785 if err := session.RefreshSession(); err != nil { 786 l.Error("failed to refresh session, stopping bluesky updater", "err", err) 787 return 788 } 789 l.Debug("session refreshed") 790 } 791 792 // make client 793 client := xrpc.Client{ 794 Auth: &xrpc.AuthInfo{ 795 AccessJwt: session.AccessJwt, 796 Did: session.Did, 797 }, 798 Host: session.PdsEndpoint, 799 } 800 801 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 802 if err != nil { 803 l.Error("failed to fetch bluesky posts", "err", err) 804 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 805 l.Error("failed to insert bluesky posts", "err", err) 806 } else { 807 l.Info("inserted bluesky posts", "count", len(posts)) 808 } 809 810 select { 811 case <-ticker.C: 812 case <-ctx.Done(): 813 l.Info("stopping bluesky updater") 814 return 815 } 816 } 817}