Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

at icy/yovxsu 24 kB View raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/cache" 17 "tangled.org/core/appview/cloudflare" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/email" 21 "tangled.org/core/appview/indexer" 22 "tangled.org/core/appview/knotacl" 23 "tangled.org/core/appview/knotcompat" 24 "tangled.org/core/appview/mentions" 25 "tangled.org/core/appview/models" 26 "tangled.org/core/appview/notify" 27 dbnotify "tangled.org/core/appview/notify/db" 28 lognotify "tangled.org/core/appview/notify/logging" 29 phnotify "tangled.org/core/appview/notify/posthog" 30 whnotify "tangled.org/core/appview/notify/webhook" 31 "tangled.org/core/appview/oauth" 32 "tangled.org/core/appview/pages" 33 "tangled.org/core/appview/pipelines" 34 pipelinessh "tangled.org/core/appview/pipelines/ssh" 35 "tangled.org/core/appview/reporesolver" 36 "tangled.org/core/appview/repoverify" 37 "tangled.org/core/appview/validator" 38 xrpcclient "tangled.org/core/appview/xrpcclient" 39 "tangled.org/core/consts" 40 "tangled.org/core/eventconsumer" 41 "tangled.org/core/idresolver" 42 "tangled.org/core/jetstream" 43 "tangled.org/core/log" 44 tlog "tangled.org/core/log" 45 "tangled.org/core/orm" 46 "tangled.org/core/rbac" 47 48 comatproto "github.com/bluesky-social/indigo/api/atproto" 49 "github.com/bluesky-social/indigo/atproto/atclient" 50 "github.com/bluesky-social/indigo/atproto/syntax" 51 lexutil "github.com/bluesky-social/indigo/lex/util" 52 "github.com/bluesky-social/indigo/xrpc" 53 54 "github.com/go-chi/chi/v5" 55 "github.com/posthog/posthog-go" 56) 57 58type State struct { 59 db *db.DB 60 notifier notify.Notifier 61 indexer *indexer.Indexer 62 oauth *oauth.OAuth 63 enforcer *rbac.Enforcer 64 pages *pages.Pages 65 idResolver *idresolver.Resolver 66 rdb *cache.Cache 67 mentionsResolver *mentions.Resolver 68 posthog posthog.Client 69 jc *jetstream.JetstreamClient 70 config *config.Config 71 repoResolver *reporesolver.RepoResolver 72 aclService *knotacl.Service 73 knotstream *eventconsumer.Consumer 74 spindlestream *eventconsumer.Consumer 75 pipelineNotifier *pipelines.StatusNotifier 76 logger *slog.Logger 77 validator *validator.Validator 78 cfClient *cloudflare.Client 79} 80 81func Make(ctx context.Context, config *config.Config) (*State, error) { 82 logger := tlog.FromContext(ctx) 83 84 d, err := db.Make(ctx, config.Core.DbPath) 85 if err != nil { 86 return nil, fmt.Errorf("failed to create db: %w", err) 87 } 88 89 indexer := indexer.New(log.SubLogger(logger, "indexer"), d) 90 err = indexer.Init(ctx) 91 if err != nil { 92 return nil, fmt.Errorf("failed to create indexer: %w", err) 93 } 94 95 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 96 if err != nil { 97 return nil, fmt.Errorf("failed to create enforcer: %w", err) 98 } 99 100 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 101 if err != nil { 102 logger.Error("failed to create redis resolver", "err", err) 103 res = idresolver.DefaultResolver(config.Plc.PLCURL) 104 } 105 106 var rdb *cache.Cache 107 if config.Redis.Addr != "" { 108 rdb = cache.New(config.Redis.Addr) 109 } 110 111 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 112 if err != nil { 113 return nil, fmt.Errorf("failed to create posthog client: %w", err) 114 } 115 116 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages")) 117 knotcompat.UseNativeLatch(knotacl.NewLatch(d, log.SubLogger(logger, "knotacl-latch"))) 118 aclService := knotacl.NewService(enforcer, d, config.Core.Dev, log.SubLogger(logger, "knotacl")) 119 oauth, err := oauth.New(config, posthog, d, enforcer, aclService, res, log.SubLogger(logger, "oauth")) 120 if err != nil { 121 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 122 } 123 124 validator := validator.New(d, res, aclService) 125 126 repoResolver := reporesolver.New(config, aclService, d, rdb) 127 128 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 129 130 jc, err := jetstream.NewJetstreamClient( 131 config.Jetstream.Endpoint, 132 "appview", 133 []string{ 134 tangled.ActorProfileNSID, 135 tangled.FeedStarNSID, 136 tangled.FeedReactionNSID, 137 tangled.FeedCommentNSID, 138 tangled.GraphFollowNSID, 139 tangled.GraphVouchNSID, 140 tangled.KnotMemberNSID, 141 tangled.KnotNSID, 142 tangled.LabelDefinitionNSID, 143 tangled.LabelOpNSID, 144 tangled.PublicKeyNSID, 145 tangled.RepoArtifactNSID, 146 tangled.RepoIssueCommentNSID, 147 tangled.RepoIssueNSID, 148 tangled.RepoNSID, 149 tangled.RepoPullNSID, 150 tangled.RepoPullCommentNSID, 151 tangled.SpindleMemberNSID, 152 tangled.SpindleNSID, 153 tangled.StringNSID, 154 }, 155 nil, 156 tlog.SubLogger(logger, "jetstream"), 157 d, 158 false, 159 160 // in-memory filter is inapplicable to appview so 161 // we'll never log dids anyway. 162 false, 163 ) 164 if err != nil { 165 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 166 } 167 168 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 169 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 170 } 171 172 var notifiers []notify.Notifier 173 174 // Always add the database notifier 175 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 176 177 // Add other notifiers in production only 178 if !config.Core.Dev { 179 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 180 } 181 notifiers = append(notifiers, indexer) 182 183 notifiers = append(notifiers, whnotify.NewNotifier(d)) 184 185 notifier := notify.NewMergedNotifier(notifiers) 186 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 187 188 ingester := appview.Ingester{ 189 Ctx: ctx, 190 Db: d, 191 Enforcer: enforcer, 192 Acl: aclService, 193 IdResolver: res, 194 Cache: rdb, 195 Config: config, 196 Logger: log.SubLogger(logger, "ingester"), 197 Validator: validator, 198 MentionsResolver: mentionsResolver, 199 Notifier: notifier, 200 Verifier: repoverify.New(res, config.Core.Dev), 201 } 202 err = jc.StartJetstream(ctx, ingester.Ingest()) 203 if err != nil { 204 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 205 } 206 207 go ingester.SweepPendingVerifications() 208 209 var cfClient *cloudflare.Client 210 if config.Cloudflare.ApiToken != "" { 211 cfClient, err = cloudflare.New(config) 212 if err != nil { 213 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err) 214 cfClient = nil 215 } 216 } 217 218 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient) 219 if err != nil { 220 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 221 } 222 knotstream.Start(ctx) 223 224 pipelineNotifier := pipelines.NewStatusNotifier() 225 226 spindlestream, err := Spindlestream(ctx, config, d, enforcer, pipelineNotifier) 227 if err != nil { 228 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 229 } 230 spindlestream.Start(ctx) 231 232 state := &State{ 233 db: d, 234 notifier: notifier, 235 indexer: indexer, 236 oauth: oauth, 237 enforcer: enforcer, 238 pages: pages, 239 idResolver: res, 240 rdb: rdb, 241 mentionsResolver: mentionsResolver, 242 posthog: posthog, 243 jc: jc, 244 config: config, 245 repoResolver: repoResolver, 246 aclService: aclService, 247 knotstream: knotstream, 248 spindlestream: spindlestream, 249 pipelineNotifier: pipelineNotifier, 250 logger: logger, 251 validator: validator, 252 cfClient: cfClient, 253 } 254 255 // fetch initial bluesky posts if configured 256 go fetchBskyPosts(ctx, res, config, d, logger) 257 258 return state, nil 259} 260 261func (s *State) Close() error { 262 // other close up logic goes here 263 return s.db.Close() 264} 265 266func (s *State) NewSSHServer() *pipelinessh.Server { 267 return pipelinessh.New(s.db, s.config, s.pipelineNotifier, log.SubLogger(s.logger, "pipelinessh")) 268} 269 270func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) { 271 w.Header().Set("Content-Type", "text/plain") 272 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 273 274 securityTxt := `Contact: mailto:security@tangled.org 275Preferred-Languages: en 276Canonical: https://tangled.org/.well-known/security.txt 277Expires: 2030-01-01T21:59:00.000Z 278` 279 w.Write([]byte(securityTxt)) 280} 281 282func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 283 w.Header().Set("Content-Type", "text/plain") 284 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 285 286 robotsTxt := `# Hello, Tanglers! 287User-agent: * 288Allow: / 289Disallow: /*/*/settings 290Disallow: /settings 291Disallow: /*/*/compare 292Disallow: /*/*/fork 293 294Crawl-delay: 1 295` 296 w.Write([]byte(robotsTxt)) 297} 298 299func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 300 user := s.oauth.GetMultiAccountUser(r) 301 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 302 LoggedInUser: user, 303 }) 304} 305 306func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 307 user := s.oauth.GetMultiAccountUser(r) 308 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 309 LoggedInUser: user, 310 }) 311} 312 313func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 314 user := s.oauth.GetMultiAccountUser(r) 315 s.pages.Brand(w, pages.BrandParams{ 316 LoggedInUser: user, 317 }) 318} 319 320func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 321 user := s.oauth.GetMultiAccountUser(r) 322 if user == nil { 323 return 324 } 325 326 l := s.logger.With("handler", "UpgradeBanner") 327 l = l.With("did", user.Did) 328 329 regs, err := db.GetRegistrations( 330 s.db, 331 orm.FilterEq("did", user.Did), 332 orm.FilterEq("needs_upgrade", 1), 333 ) 334 if err != nil { 335 l.Error("non-fatal: failed to get registrations", "err", err) 336 } 337 338 spindles, err := db.GetSpindles( 339 r.Context(), 340 s.db, 341 orm.FilterEq("owner", user.Did), 342 orm.FilterEq("needs_upgrade", 1), 343 ) 344 if err != nil { 345 l.Error("non-fatal: failed to get spindles", "err", err) 346 } 347 348 if regs == nil && spindles == nil { 349 return 350 } 351 352 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 353 Registrations: regs, 354 Spindles: spindles, 355 }) 356} 357 358func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) { 359 // target is echoed back from the form via hx-vals so the response span's 360 // id matches the form's hx-target. Fallback keeps the handler useful if 361 // a caller forgets to send it. 362 target := strings.TrimSpace(r.FormValue("target")) 363 if target == "" { 364 target = "home" 365 } 366 367 w.Header().Set("Content-Type", "text/html") 368 369 emailAddr := strings.TrimSpace(r.FormValue("email")) 370 if !email.IsValidEmail(emailAddr) { 371 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{ 372 Id: target, 373 Error: "Invalid email address.", 374 }) 375 return 376 } 377 378 // For logged-in users, persist the signup locally so the widget stays 379 // hidden across devices. The DB row is the render-time source of truth; 380 // Resend still owns the mailing list itself. 381 if user := s.oauth.GetMultiAccountUser(r); user != nil { 382 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil { 383 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err) 384 } 385 } 386 387 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" { 388 go func() { 389 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil { 390 s.logger.Error("failed to add newsletter contact", "error", err) 391 } 392 }() 393 } else { 394 s.logger.Error( 395 "failed to add newsletter contact, missing resend config", 396 "isKeyPresent", s.config.Resend.ApiKey != "", 397 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "", 398 "emailAddr", emailAddr, 399 ) 400 } 401 402 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target}) 403} 404 405// NewsletterDismiss records that a logged-in user has dismissed the newsletter 406// widget so it stays hidden across their devices. Anonymous callers get a 204 407// with no DB write — localStorage handles the per-browser fallback. 408func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) { 409 user := s.oauth.GetMultiAccountUser(r) 410 if user == nil { 411 w.WriteHeader(http.StatusNoContent) 412 return 413 } 414 415 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil { 416 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err) 417 } 418 w.WriteHeader(http.StatusNoContent) 419} 420 421func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 422 user := chi.URLParam(r, "user") 423 user = strings.TrimPrefix(user, "@") 424 425 if user == "" { 426 w.WriteHeader(http.StatusBadRequest) 427 return 428 } 429 430 id, err := s.idResolver.ResolveIdent(r.Context(), user) 431 if err != nil { 432 w.WriteHeader(http.StatusInternalServerError) 433 return 434 } 435 436 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 437 if err != nil { 438 s.logger.Error("failed to get public keys", "err", err) 439 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 440 return 441 } 442 443 if len(pubKeys) == 0 { 444 w.WriteHeader(http.StatusNoContent) 445 return 446 } 447 448 for _, k := range pubKeys { 449 key := strings.TrimRight(k.Key, "\n") 450 fmt.Fprintln(w, key) 451 } 452} 453 454func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 455 switch r.Method { 456 case http.MethodGet: 457 user := s.oauth.GetMultiAccountUser(r) 458 knots := s.aclService.KnotsForUser(r.Context(), user.Did) 459 460 s.pages.NewRepo(w, pages.NewRepoParams{ 461 LoggedInUser: user, 462 Knots: knots, 463 }) 464 465 case http.MethodPost: 466 l := s.logger.With("handler", "NewRepo") 467 468 user := s.oauth.GetMultiAccountUser(r) 469 l = l.With("did", user.Did) 470 471 // form validation 472 domain := r.FormValue("domain") 473 if domain == "" { 474 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 475 return 476 } 477 l = l.With("knot", domain) 478 479 repoName := r.FormValue("name") 480 if repoName == "" { 481 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 482 return 483 } 484 485 if err := models.ValidateRepoName(repoName); err != nil { 486 s.pages.Notice(w, "repo", err.Error()) 487 return 488 } 489 repoName = models.StripGitExt(repoName) 490 rkey := strings.ToLower(repoName) 491 l = l.With("repoName", repoName, "rkey", rkey) 492 493 defaultBranch := r.FormValue("branch") 494 if defaultBranch == "" { 495 defaultBranch = "main" 496 } 497 l = l.With("defaultBranch", defaultBranch) 498 499 description := r.FormValue("description") 500 if len([]rune(description)) > 140 { 501 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 502 return 503 } 504 505 // ACL validation 506 if !s.aclService.IsRepoCreateAllowed(r.Context(), domain, user.Did) { 507 l.Info("unauthorized") 508 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 509 return 510 } 511 512 // Check for existing repos 513 existingRepo, err := db.GetRepo( 514 s.db, 515 orm.FilterEq("did", user.Did), 516 orm.FilterEq("rkey", rkey), 517 ) 518 if err == nil && existingRepo != nil { 519 l.Info("repo exists") 520 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 521 return 522 } 523 524 atpClient, err := s.oauth.AuthorizedClient(r) 525 if err != nil { 526 l.Error("failed to get authorized client", "err", err) 527 s.pages.Notice(w, "repo", "Failed to authorize. Try again later.") 528 return 529 } 530 531 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) { 532 l.Info("rkey occupied by prior rename alias") 533 s.pages.Notice(w, "repo", fmt.Sprintf("The name %q still has a record on your PDS from a prior rename. Pick a different name, or delete at://%s/%s/%s first.", rkey, user.Did, tangled.RepoNSID, rkey)) 534 return 535 } 536 537 client, err := s.oauth.ServiceClient( 538 r, 539 oauth.WithService(domain), 540 oauth.WithLxm(tangled.RepoCreateNSID), 541 oauth.WithDev(s.config.Core.Dev), 542 ) 543 if err != nil { 544 l.Error("service auth failed", "err", err) 545 s.pages.Notice(w, "repo", "Failed to authenticate. Please log out and log back in again.") 546 return 547 } 548 549 input := &tangled.RepoCreate_Input{ 550 Rkey: rkey, 551 Name: rkey, 552 DefaultBranch: &defaultBranch, 553 } 554 createResp, err := tangled.RepoCreate( 555 r.Context(), 556 client, 557 input, 558 ) 559 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 560 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err) 561 s.pages.Notice(w, "repo", err.Error()) 562 return 563 } 564 565 var repoDid string 566 if createResp != nil && createResp.RepoDid != nil { 567 repoDid = *createResp.RepoDid 568 } 569 if repoDid == "" { 570 l.Error("knot returned empty repo DID") 571 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 572 return 573 } 574 575 repo := &models.Repo{ 576 Did: user.Did, 577 Name: repoName, 578 Knot: domain, 579 Rkey: rkey, 580 Description: description, 581 Created: time.Now(), 582 Labels: s.config.Label.DefaultLabelDefs, 583 RepoDid: repoDid, 584 } 585 record := repo.AsRecord() 586 587 cleanupKnot := func() { 588 go func() { 589 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 590 for attempt, delay := range delays { 591 time.Sleep(delay) 592 deleteClient, dErr := s.oauth.ServiceClient( 593 r, 594 oauth.WithService(domain), 595 oauth.WithLxm(tangled.RepoDeleteNSID), 596 oauth.WithDev(s.config.Core.Dev), 597 ) 598 if dErr != nil { 599 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 600 continue 601 } 602 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 603 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 604 Did: user.Did, 605 Name: rkey, 606 Rkey: rkey, 607 }); dErr != nil { 608 cancel() 609 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr) 610 continue 611 } 612 cancel() 613 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1) 614 return 615 } 616 l.Error("exhausted retries for knot cleanup, repo may be orphaned", 617 "did", user.Did, "repo", repoName, "knot", domain) 618 }() 619 } 620 621 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{ 622 Collection: tangled.RepoNSID, 623 Repo: user.Did, 624 Rkey: &rkey, 625 Record: &lexutil.LexiconTypeDecoder{ 626 Val: &record, 627 }, 628 }) 629 if err != nil { 630 l.Info("PDS write failed", "err", err) 631 cleanupKnot() 632 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) { 633 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository named %q.", rkey)) 634 } else { 635 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 636 } 637 return 638 } 639 640 aturi := fmt.Sprintf("at://%s/%s/%s", user.Did, tangled.RepoNSID, rkey) 641 l = l.With("aturi", aturi) 642 l.Info("wrote to PDS") 643 644 tx, err := s.db.BeginTx(r.Context(), nil) 645 if err != nil { 646 l.Info("txn failed", "err", err) 647 s.pages.Notice(w, "repo", "Failed to save repository information.") 648 return 649 } 650 651 rollback := func() { 652 err1 := tx.Rollback() 653 err2 := s.enforcer.E.LoadPolicy() 654 err3 := rollbackRecord(context.Background(), aturi, atpClient) 655 656 if errors.Is(err1, sql.ErrTxDone) { 657 err1 = nil 658 } 659 660 if errs := errors.Join(err1, err2, err3); errs != nil { 661 l.Error("failed to rollback changes", "errs", errs) 662 } 663 664 if aturi != "" { 665 cleanupKnot() 666 } 667 } 668 defer rollback() 669 670 err = db.AddRepo(tx, repo) 671 if err != nil { 672 l.Error("db write failed", "err", err) 673 s.pages.Notice(w, "repo", "Failed to save repository information.") 674 return 675 } 676 677 rbacPath := repo.RepoIdentifier() 678 err = s.enforcer.AddRepo(user.Did, domain, rbacPath) 679 if err != nil { 680 l.Error("acl setup failed", "err", err) 681 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 682 return 683 } 684 685 err = tx.Commit() 686 if err != nil { 687 l.Error("txn commit failed", "err", err) 688 http.Error(w, err.Error(), http.StatusInternalServerError) 689 return 690 } 691 692 err = s.enforcer.E.SavePolicy() 693 if err != nil { 694 l.Error("acl save failed", "err", err) 695 http.Error(w, err.Error(), http.StatusInternalServerError) 696 return 697 } 698 699 aturi = "" 700 701 s.notifier.NewRepo(r.Context(), repo) 702 switch { 703 case repoDid != "": 704 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 705 default: 706 handle := s.pages.DisplayHandle(r.Context(), user.Did) 707 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, rkey)) 708 } 709 } 710} 711 712func rkeyOccupied(ctx context.Context, client *atclient.APIClient, did, rkey string) bool { 713 probeCtx, cancel := context.WithTimeout(ctx, 10*time.Second) 714 defer cancel() 715 resp, err := comatproto.RepoGetRecord(probeCtx, client, "", tangled.RepoNSID, did, rkey) 716 return err == nil && resp != nil 717} 718 719// this is used to rollback changes made to the PDS 720// 721// it is a no-op if the provided ATURI is empty 722func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 723 if aturi == "" { 724 return nil 725 } 726 727 parsed := syntax.ATURI(aturi) 728 729 collection := parsed.Collection().String() 730 repo := parsed.Authority().String() 731 rkey := parsed.RecordKey().String() 732 733 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 734 Collection: collection, 735 Repo: repo, 736 Rkey: rkey, 737 }) 738 return err 739} 740 741func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 742 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 743 if err != nil { 744 return err 745 } 746 // already present 747 if len(defaultLabels) == len(defaults) { 748 return nil 749 } 750 751 labelDefs, err := models.FetchLabelDefs(r, defaults) 752 if err != nil { 753 return err 754 } 755 756 // Insert each label definition to the database 757 for _, labelDef := range labelDefs { 758 _, err = db.AddLabelDefinition(e, &labelDef) 759 if err != nil { 760 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 761 } 762 } 763 764 return nil 765} 766 767func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 768 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 769 if err != nil { 770 logger.Error("failed to resolve tangled.org DID", "err", err) 771 return 772 } 773 774 pdsEndpoint := resolved.PDSEndpoint() 775 if pdsEndpoint == "" { 776 logger.Error("no PDS endpoint found for tangled.sh DID") 777 return 778 } 779 780 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger) 781 if err != nil { 782 logger.Error("failed to create appassword session... skipping fetch", "err", err) 783 return 784 } 785 786 l := log.SubLogger(logger, "bluesky") 787 788 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 789 defer ticker.Stop() 790 791 for { 792 // refresh session if necessary 793 if !session.IsValid() { 794 l.Debug("access token expired, refreshing session") 795 if err := session.RefreshSession(); err != nil { 796 l.Error("failed to refresh session, stopping bluesky updater", "err", err) 797 return 798 } 799 l.Debug("session refreshed") 800 } 801 802 // make client 803 client := xrpc.Client{ 804 Auth: &xrpc.AuthInfo{ 805 AccessJwt: session.AccessJwt, 806 Did: session.Did, 807 }, 808 Host: session.PdsEndpoint, 809 } 810 811 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 812 if err != nil { 813 l.Error("failed to fetch bluesky posts", "err", err) 814 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 815 l.Error("failed to insert bluesky posts", "err", err) 816 } else { 817 l.Info("inserted bluesky posts", "count", len(posts)) 818 } 819 820 select { 821 case <-ticker.C: 822 case <-ctx.Done(): 823 l.Info("stopping bluesky updater") 824 return 825 } 826 } 827}