Monorepo for Tangled tangled.org
8

Configure Feed

Select the types of activity you want to include in your feed.

1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/accountmigration" 16 "tangled.org/core/appview/bsky" 17 "tangled.org/core/appview/cache" 18 "tangled.org/core/appview/cloudflare" 19 "tangled.org/core/appview/config" 20 "tangled.org/core/appview/db" 21 "tangled.org/core/appview/email" 22 "tangled.org/core/appview/indexer" 23 "tangled.org/core/appview/knotacl" 24 "tangled.org/core/appview/knotcompat" 25 "tangled.org/core/appview/mentions" 26 "tangled.org/core/appview/models" 27 "tangled.org/core/appview/notify" 28 dbnotify "tangled.org/core/appview/notify/db" 29 lognotify "tangled.org/core/appview/notify/logging" 30 phnotify "tangled.org/core/appview/notify/posthog" 31 whnotify "tangled.org/core/appview/notify/webhook" 32 "tangled.org/core/appview/oauth" 33 "tangled.org/core/appview/pages" 34 "tangled.org/core/appview/pipelines" 35 pipelinessh "tangled.org/core/appview/pipelines/ssh" 36 "tangled.org/core/appview/reporesolver" 37 "tangled.org/core/appview/repoverify" 38 "tangled.org/core/appview/validator" 39 xrpcclient "tangled.org/core/appview/xrpcclient" 40 "tangled.org/core/consts" 41 "tangled.org/core/eventconsumer" 42 "tangled.org/core/idresolver" 43 "tangled.org/core/jetstream" 44 "tangled.org/core/log" 45 tlog "tangled.org/core/log" 46 "tangled.org/core/orm" 47 "tangled.org/core/rbac" 48 49 comatproto "github.com/bluesky-social/indigo/api/atproto" 50 "github.com/bluesky-social/indigo/atproto/atclient" 51 "github.com/bluesky-social/indigo/atproto/syntax" 52 lexutil "github.com/bluesky-social/indigo/lex/util" 53 "github.com/bluesky-social/indigo/xrpc" 54 55 "github.com/go-chi/chi/v5" 56 "github.com/posthog/posthog-go" 57) 58 59type State struct { 60 db *db.DB 61 notifier notify.Notifier 62 indexer *indexer.Indexer 63 oauth *oauth.OAuth 64 enforcer *rbac.Enforcer 65 pages *pages.Pages 66 idResolver *idresolver.Resolver 67 rdb *cache.Cache 68 mentionsResolver *mentions.Resolver 69 posthog posthog.Client 70 jc *jetstream.JetstreamClient 71 config *config.Config 72 repoResolver *reporesolver.RepoResolver 73 aclService *knotacl.Service 74 knotstream *eventconsumer.Consumer 75 spindlestream *eventconsumer.Consumer 76 pipelineNotifier *pipelines.StatusNotifier 77 logger *slog.Logger 78 validator *validator.Validator 79 cfClient *cloudflare.Client 80 81 accountMigrationWorker *accountmigration.Worker 82} 83 84func Make(ctx context.Context, config *config.Config) (*State, error) { 85 logger := tlog.FromContext(ctx) 86 87 d, err := db.Make(ctx, config.Core.DbPath) 88 if err != nil { 89 return nil, fmt.Errorf("failed to create db: %w", err) 90 } 91 92 indexer := indexer.New(log.SubLogger(logger, "indexer"), d) 93 err = indexer.Init(ctx) 94 if err != nil { 95 return nil, fmt.Errorf("failed to create indexer: %w", err) 96 } 97 98 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 99 if err != nil { 100 return nil, fmt.Errorf("failed to create enforcer: %w", err) 101 } 102 103 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 104 if err != nil { 105 logger.Error("failed to create redis resolver", "err", err) 106 res = idresolver.DefaultResolver(config.Plc.PLCURL) 107 } 108 109 var rdb *cache.Cache 110 if config.Redis.Addr != "" { 111 rdb = cache.New(config.Redis.Addr) 112 } 113 114 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 115 if err != nil { 116 return nil, fmt.Errorf("failed to create posthog client: %w", err) 117 } 118 119 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages")) 120 knotcompat.UseNativeLatch(knotacl.NewLatch(d, log.SubLogger(logger, "knotacl-latch"))) 121 aclService := knotacl.NewService(enforcer, d, config.Core.Dev, log.SubLogger(logger, "knotacl")) 122 oauth, err := oauth.New(config, posthog, d, enforcer, aclService, res, log.SubLogger(logger, "oauth")) 123 if err != nil { 124 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 125 } 126 127 validator := validator.New(d, res, aclService) 128 129 repoResolver := reporesolver.New(config, aclService, d, rdb) 130 131 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 132 133 jc, err := jetstream.NewJetstreamClient( 134 config.Jetstream.Endpoint, 135 "appview", 136 []string{ 137 tangled.ActorProfileNSID, 138 tangled.FeedStarNSID, 139 tangled.FeedReactionNSID, 140 tangled.FeedCommentNSID, 141 tangled.GraphFollowNSID, 142 tangled.GraphVouchNSID, 143 tangled.KnotMemberNSID, 144 tangled.KnotNSID, 145 tangled.LabelDefinitionNSID, 146 tangled.LabelOpNSID, 147 tangled.PublicKeyNSID, 148 tangled.RepoArtifactNSID, 149 tangled.RepoIssueCommentNSID, 150 tangled.RepoIssueNSID, 151 tangled.RepoNSID, 152 tangled.RepoPullNSID, 153 tangled.RepoPullCommentNSID, 154 tangled.SpindleMemberNSID, 155 tangled.SpindleNSID, 156 tangled.StringNSID, 157 }, 158 nil, 159 tlog.SubLogger(logger, "jetstream"), 160 d, 161 false, 162 163 // in-memory filter is inapplicable to appview so 164 // we'll never log dids anyway. 165 false, 166 ) 167 if err != nil { 168 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 169 } 170 171 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 172 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 173 } 174 175 var notifiers []notify.Notifier 176 177 // Always add the database notifier 178 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 179 180 // Add other notifiers in production only 181 if !config.Core.Dev { 182 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 183 } 184 notifiers = append(notifiers, indexer) 185 186 notifiers = append(notifiers, whnotify.NewNotifier(d)) 187 188 notifier := notify.NewMergedNotifier(notifiers) 189 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 190 191 ingester := appview.Ingester{ 192 Ctx: ctx, 193 Db: d, 194 Enforcer: enforcer, 195 Acl: aclService, 196 IdResolver: res, 197 Cache: rdb, 198 Config: config, 199 Logger: log.SubLogger(logger, "ingester"), 200 Validator: validator, 201 MentionsResolver: mentionsResolver, 202 Notifier: notifier, 203 Verifier: repoverify.New(res, config.Core.Dev), 204 } 205 err = jc.StartJetstream(ctx, ingester.Ingest()) 206 if err != nil { 207 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 208 } 209 210 go ingester.SweepPendingVerifications() 211 212 var cfClient *cloudflare.Client 213 if config.Cloudflare.ApiToken != "" { 214 cfClient, err = cloudflare.New(config) 215 if err != nil { 216 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err) 217 cfClient = nil 218 } 219 } 220 221 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient) 222 if err != nil { 223 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 224 } 225 knotstream.Start(ctx) 226 227 pipelineNotifier := pipelines.NewStatusNotifier() 228 229 spindlestream, err := Spindlestream(ctx, config, d, enforcer, pipelineNotifier) 230 if err != nil { 231 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 232 } 233 spindlestream.Start(ctx) 234 235 if err := db.ReapStaleRunningGitRepoMigrations(ctx, d); err != nil { 236 logger.Warn("failed to reap stale gitrepo migrations", "err", err) 237 } 238 amWorker := accountmigration.NewWorker(d, oauth, config.Core.Dev, log.SubLogger(logger, "accountmigration")) 239 amWorker.Start(ctx) 240 241 state := &State{ 242 db: d, 243 notifier: notifier, 244 indexer: indexer, 245 oauth: oauth, 246 enforcer: enforcer, 247 pages: pages, 248 idResolver: res, 249 rdb: rdb, 250 mentionsResolver: mentionsResolver, 251 posthog: posthog, 252 jc: jc, 253 config: config, 254 repoResolver: repoResolver, 255 aclService: aclService, 256 knotstream: knotstream, 257 spindlestream: spindlestream, 258 pipelineNotifier: pipelineNotifier, 259 logger: logger, 260 validator: validator, 261 cfClient: cfClient, 262 263 accountMigrationWorker: amWorker, 264 } 265 266 // fetch initial bluesky posts if configured 267 go fetchBskyPosts(ctx, res, config, d, logger) 268 269 return state, nil 270} 271 272func (s *State) Close() error { 273 // other close up logic goes here 274 return s.db.Close() 275} 276 277func (s *State) NewSSHServer() *pipelinessh.Server { 278 return pipelinessh.New(s.db, s.config, s.pipelineNotifier, log.SubLogger(s.logger, "pipelinessh")) 279} 280 281func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) { 282 w.Header().Set("Content-Type", "text/plain") 283 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 284 285 securityTxt := `Contact: mailto:security@tangled.org 286Preferred-Languages: en 287Canonical: https://tangled.org/.well-known/security.txt 288Expires: 2030-01-01T21:59:00.000Z 289` 290 w.Write([]byte(securityTxt)) 291} 292 293func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 294 w.Header().Set("Content-Type", "text/plain") 295 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 296 297 robotsTxt := `# Hello, Tanglers! 298User-agent: * 299Allow: / 300Disallow: /*/*/settings 301Disallow: /settings 302Disallow: /*/*/compare 303Disallow: /*/*/fork 304 305Crawl-delay: 1 306` 307 w.Write([]byte(robotsTxt)) 308} 309 310func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 311 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 312 BaseParams: pages.BaseParamsFromContext(r.Context()), 313 }) 314} 315 316func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 317 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 318 BaseParams: pages.BaseParamsFromContext(r.Context()), 319 }) 320} 321 322func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 323 s.pages.Brand(w, pages.BrandParams{ 324 BaseParams: pages.BaseParamsFromContext(r.Context()), 325 }) 326} 327 328func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 329 user := s.oauth.GetMultiAccountUser(r) 330 if user == nil { 331 return 332 } 333 334 l := s.logger.With("handler", "UpgradeBanner") 335 l = l.With("did", user.Did) 336 337 regs, err := db.GetRegistrations( 338 s.db, 339 orm.FilterEq("did", user.Did), 340 orm.FilterEq("needs_upgrade", 1), 341 ) 342 if err != nil { 343 l.Error("non-fatal: failed to get registrations", "err", err) 344 } 345 346 spindles, err := db.GetSpindles( 347 r.Context(), 348 s.db, 349 orm.FilterEq("owner", user.Did), 350 orm.FilterEq("needs_upgrade", 1), 351 ) 352 if err != nil { 353 l.Error("non-fatal: failed to get spindles", "err", err) 354 } 355 356 if regs == nil && spindles == nil { 357 return 358 } 359 360 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 361 Registrations: regs, 362 Spindles: spindles, 363 }) 364} 365 366func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) { 367 // target is echoed back from the form via hx-vals so the response span's 368 // id matches the form's hx-target. Fallback keeps the handler useful if 369 // a caller forgets to send it. 370 target := strings.TrimSpace(r.FormValue("target")) 371 if target == "" { 372 target = "home" 373 } 374 375 w.Header().Set("Content-Type", "text/html") 376 377 emailAddr := strings.TrimSpace(r.FormValue("email")) 378 if !email.IsValidEmail(emailAddr) { 379 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{ 380 Id: target, 381 Error: "Invalid email address.", 382 }) 383 return 384 } 385 386 // For logged-in users, persist the signup locally so the widget stays 387 // hidden across devices. The DB row is the render-time source of truth; 388 // Resend still owns the mailing list itself. 389 if user := s.oauth.GetMultiAccountUser(r); user != nil { 390 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil { 391 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err) 392 } 393 } 394 395 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" { 396 go func() { 397 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil { 398 s.logger.Error("failed to add newsletter contact", "error", err) 399 } 400 }() 401 } else { 402 s.logger.Error( 403 "failed to add newsletter contact, missing resend config", 404 "isKeyPresent", s.config.Resend.ApiKey != "", 405 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "", 406 "emailAddr", emailAddr, 407 ) 408 } 409 410 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target}) 411} 412 413// NewsletterDismiss records that a logged-in user has dismissed the newsletter 414// widget so it stays hidden across their devices. Anonymous callers get a 204 415// with no DB write — localStorage handles the per-browser fallback. 416func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) { 417 user := s.oauth.GetMultiAccountUser(r) 418 if user == nil { 419 w.WriteHeader(http.StatusNoContent) 420 return 421 } 422 423 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil { 424 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err) 425 } 426 w.WriteHeader(http.StatusNoContent) 427} 428 429func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 430 user := chi.URLParam(r, "user") 431 user = strings.TrimPrefix(user, "@") 432 433 if user == "" { 434 w.WriteHeader(http.StatusBadRequest) 435 return 436 } 437 438 id, err := s.idResolver.ResolveIdent(r.Context(), user) 439 if err != nil { 440 w.WriteHeader(http.StatusInternalServerError) 441 return 442 } 443 444 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 445 if err != nil { 446 s.logger.Error("failed to get public keys", "err", err) 447 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 448 return 449 } 450 451 if len(pubKeys) == 0 { 452 w.WriteHeader(http.StatusNoContent) 453 return 454 } 455 456 for _, k := range pubKeys { 457 key := strings.TrimRight(k.Key, "\n") 458 fmt.Fprintln(w, key) 459 } 460} 461 462func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 463 switch r.Method { 464 case http.MethodGet: 465 user := s.oauth.GetMultiAccountUser(r) 466 knots := s.aclService.KnotsForUser(r.Context(), user.Did) 467 468 s.pages.NewRepo(w, pages.NewRepoParams{ 469 BaseParams: pages.BaseParamsFromContext(r.Context()), 470 Knots: knots, 471 }) 472 473 case http.MethodPost: 474 l := s.logger.With("handler", "NewRepo") 475 476 user := s.oauth.GetMultiAccountUser(r) 477 l = l.With("did", user.Did) 478 479 // form validation 480 domain := r.FormValue("domain") 481 if domain == "" { 482 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 483 return 484 } 485 l = l.With("knot", domain) 486 487 repoName := r.FormValue("name") 488 if repoName == "" { 489 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 490 return 491 } 492 493 if err := models.ValidateRepoName(repoName); err != nil { 494 s.pages.Notice(w, "repo", err.Error()) 495 return 496 } 497 repoName = models.StripGitExt(repoName) 498 rkey := strings.ToLower(repoName) 499 l = l.With("repoName", repoName, "rkey", rkey) 500 501 defaultBranch := r.FormValue("branch") 502 if defaultBranch == "" { 503 defaultBranch = "main" 504 } 505 l = l.With("defaultBranch", defaultBranch) 506 507 description := r.FormValue("description") 508 if len([]rune(description)) > 140 { 509 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 510 return 511 } 512 513 // ACL validation 514 if !s.aclService.IsRepoCreateAllowed(r.Context(), domain, user.Did) { 515 l.Info("unauthorized") 516 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 517 return 518 } 519 520 // Check for existing repos 521 existingRepo, err := db.GetRepo( 522 s.db, 523 orm.FilterEq("did", user.Did), 524 orm.FilterEq("rkey", rkey), 525 ) 526 if err == nil && existingRepo != nil { 527 l.Info("repo exists") 528 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 529 return 530 } 531 532 atpClient, err := s.oauth.AuthorizedClient(r) 533 if err != nil { 534 l.Error("failed to get authorized client", "err", err) 535 s.pages.Notice(w, "repo", "Failed to authorize. Try again later.") 536 return 537 } 538 539 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) { 540 l.Info("rkey occupied by prior rename alias") 541 s.pages.Notice(w, "repo", fmt.Sprintf("The name %q still has a record on your PDS from a prior rename. Pick a different name, or delete at://%s/%s/%s first.", rkey, user.Did, tangled.RepoNSID, rkey)) 542 return 543 } 544 545 client, err := s.oauth.ServiceClient( 546 r, 547 oauth.WithService(domain), 548 oauth.WithLxm(tangled.RepoCreateNSID), 549 oauth.WithDev(s.config.Core.Dev), 550 ) 551 if err != nil { 552 l.Error("service auth failed", "err", err) 553 s.pages.Notice(w, "repo", "Failed to authenticate. Please log out and log back in again.") 554 return 555 } 556 557 input := &tangled.RepoCreate_Input{ 558 Rkey: rkey, 559 Name: rkey, 560 DefaultBranch: &defaultBranch, 561 } 562 createResp, err := tangled.RepoCreate( 563 r.Context(), 564 client, 565 input, 566 ) 567 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 568 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err) 569 s.pages.Notice(w, "repo", err.Error()) 570 return 571 } 572 573 var repoDid string 574 if createResp != nil && createResp.RepoDid != nil { 575 repoDid = *createResp.RepoDid 576 } 577 if repoDid == "" { 578 l.Error("knot returned empty repo DID") 579 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 580 return 581 } 582 583 repo := &models.Repo{ 584 Did: user.Did, 585 Name: repoName, 586 Knot: domain, 587 Rkey: rkey, 588 Description: description, 589 Created: time.Now(), 590 Labels: s.config.Label.DefaultLabelDefs, 591 RepoDid: repoDid, 592 } 593 record := repo.AsRecord() 594 595 cleanupKnot := func() { 596 go func() { 597 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 598 for attempt, delay := range delays { 599 time.Sleep(delay) 600 deleteClient, dErr := s.oauth.ServiceClient( 601 r, 602 oauth.WithService(domain), 603 oauth.WithLxm(tangled.RepoDeleteNSID), 604 oauth.WithDev(s.config.Core.Dev), 605 ) 606 if dErr != nil { 607 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 608 continue 609 } 610 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 611 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 612 Did: user.Did, 613 Name: rkey, 614 Rkey: rkey, 615 }); dErr != nil { 616 cancel() 617 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr) 618 continue 619 } 620 cancel() 621 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1) 622 return 623 } 624 l.Error("exhausted retries for knot cleanup, repo may be orphaned", 625 "did", user.Did, "repo", repoName, "knot", domain) 626 }() 627 } 628 629 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{ 630 Collection: tangled.RepoNSID, 631 Repo: user.Did, 632 Rkey: &rkey, 633 Record: &lexutil.LexiconTypeDecoder{ 634 Val: &record, 635 }, 636 }) 637 if err != nil { 638 l.Info("PDS write failed", "err", err) 639 cleanupKnot() 640 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) { 641 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository named %q.", rkey)) 642 } else { 643 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 644 } 645 return 646 } 647 648 aturi := fmt.Sprintf("at://%s/%s/%s", user.Did, tangled.RepoNSID, rkey) 649 l = l.With("aturi", aturi) 650 l.Info("wrote to PDS") 651 652 tx, err := s.db.BeginTx(r.Context(), nil) 653 if err != nil { 654 l.Info("txn failed", "err", err) 655 s.pages.Notice(w, "repo", "Failed to save repository information.") 656 return 657 } 658 659 rollback := func() { 660 err1 := tx.Rollback() 661 err2 := s.enforcer.E.LoadPolicy() 662 err3 := rollbackRecord(context.Background(), aturi, atpClient) 663 664 if errors.Is(err1, sql.ErrTxDone) { 665 err1 = nil 666 } 667 668 if errs := errors.Join(err1, err2, err3); errs != nil { 669 l.Error("failed to rollback changes", "errs", errs) 670 } 671 672 if aturi != "" { 673 cleanupKnot() 674 } 675 } 676 defer rollback() 677 678 err = db.AddRepo(tx, repo) 679 if err != nil { 680 l.Error("db write failed", "err", err) 681 s.pages.Notice(w, "repo", "Failed to save repository information.") 682 return 683 } 684 685 rbacPath := repo.RepoIdentifier() 686 err = s.enforcer.AddRepo(user.Did, domain, rbacPath) 687 if err != nil { 688 l.Error("acl setup failed", "err", err) 689 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 690 return 691 } 692 693 err = tx.Commit() 694 if err != nil { 695 l.Error("txn commit failed", "err", err) 696 http.Error(w, err.Error(), http.StatusInternalServerError) 697 return 698 } 699 700 err = s.enforcer.E.SavePolicy() 701 if err != nil { 702 l.Error("acl save failed", "err", err) 703 http.Error(w, err.Error(), http.StatusInternalServerError) 704 return 705 } 706 707 aturi = "" 708 709 s.notifier.NewRepo(r.Context(), repo) 710 switch { 711 case repoDid != "": 712 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 713 default: 714 handle := s.pages.DisplayHandle(r.Context(), user.Did) 715 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, rkey)) 716 } 717 } 718} 719 720func rkeyOccupied(ctx context.Context, client *atclient.APIClient, did, rkey string) bool { 721 probeCtx, cancel := context.WithTimeout(ctx, 10*time.Second) 722 defer cancel() 723 resp, err := comatproto.RepoGetRecord(probeCtx, client, "", tangled.RepoNSID, did, rkey) 724 return err == nil && resp != nil 725} 726 727// this is used to rollback changes made to the PDS 728// 729// it is a no-op if the provided ATURI is empty 730func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 731 if aturi == "" { 732 return nil 733 } 734 735 parsed := syntax.ATURI(aturi) 736 737 collection := parsed.Collection().String() 738 repo := parsed.Authority().String() 739 rkey := parsed.RecordKey().String() 740 741 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 742 Collection: collection, 743 Repo: repo, 744 Rkey: rkey, 745 }) 746 return err 747} 748 749func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 750 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 751 if err != nil { 752 return err 753 } 754 // already present 755 if len(defaultLabels) == len(defaults) { 756 return nil 757 } 758 759 labelDefs, err := models.FetchLabelDefs(r, defaults) 760 if err != nil { 761 return err 762 } 763 764 // Insert each label definition to the database 765 for _, labelDef := range labelDefs { 766 _, err = db.AddLabelDefinition(e, &labelDef) 767 if err != nil { 768 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 769 } 770 } 771 772 return nil 773} 774 775func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 776 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 777 if err != nil { 778 logger.Error("failed to resolve tangled.org DID", "err", err) 779 return 780 } 781 782 pdsEndpoint := resolved.PDSEndpoint() 783 if pdsEndpoint == "" { 784 logger.Error("no PDS endpoint found for tangled.sh DID") 785 return 786 } 787 788 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger) 789 if err != nil { 790 logger.Error("failed to create appassword session... skipping fetch", "err", err) 791 return 792 } 793 794 l := log.SubLogger(logger, "bluesky") 795 796 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 797 defer ticker.Stop() 798 799 for { 800 // refresh session if necessary 801 if !session.IsValid() { 802 l.Debug("access token expired, refreshing session") 803 if err := session.RefreshSession(); err != nil { 804 l.Error("failed to refresh session, stopping bluesky updater", "err", err) 805 return 806 } 807 l.Debug("session refreshed") 808 } 809 810 // make client 811 client := xrpc.Client{ 812 Auth: &xrpc.AuthInfo{ 813 AccessJwt: session.AccessJwt, 814 Did: session.Did, 815 }, 816 Host: session.PdsEndpoint, 817 } 818 819 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 820 if err != nil { 821 l.Error("failed to fetch bluesky posts", "err", err) 822 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 823 l.Error("failed to insert bluesky posts", "err", err) 824 } else { 825 l.Info("inserted bluesky posts", "count", len(posts)) 826 } 827 828 select { 829 case <-ticker.C: 830 case <-ctx.Done(): 831 l.Info("stopping bluesky updater") 832 return 833 } 834 } 835}