Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

at icy/qmlqxq 24 kB View raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/cache" 17 "tangled.org/core/appview/cloudflare" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/email" 21 "tangled.org/core/appview/indexer" 22 "tangled.org/core/appview/knotacl" 23 "tangled.org/core/appview/knotcompat" 24 "tangled.org/core/appview/mentions" 25 "tangled.org/core/appview/models" 26 "tangled.org/core/appview/notify" 27 dbnotify "tangled.org/core/appview/notify/db" 28 lognotify "tangled.org/core/appview/notify/logging" 29 phnotify "tangled.org/core/appview/notify/posthog" 30 whnotify "tangled.org/core/appview/notify/webhook" 31 "tangled.org/core/appview/oauth" 32 "tangled.org/core/appview/pages" 33 "tangled.org/core/appview/pipelines" 34 pipelinessh "tangled.org/core/appview/pipelines/ssh" 35 "tangled.org/core/appview/reporesolver" 36 "tangled.org/core/appview/repoverify" 37 "tangled.org/core/appview/validator" 38 xrpcclient "tangled.org/core/appview/xrpcclient" 39 "tangled.org/core/consts" 40 "tangled.org/core/eventconsumer" 41 "tangled.org/core/idresolver" 42 "tangled.org/core/jetstream" 43 "tangled.org/core/log" 44 tlog "tangled.org/core/log" 45 "tangled.org/core/orm" 46 "tangled.org/core/rbac" 47 48 comatproto "github.com/bluesky-social/indigo/api/atproto" 49 "github.com/bluesky-social/indigo/atproto/atclient" 50 "github.com/bluesky-social/indigo/atproto/syntax" 51 lexutil "github.com/bluesky-social/indigo/lex/util" 52 "github.com/bluesky-social/indigo/xrpc" 53 54 "github.com/go-chi/chi/v5" 55 "github.com/posthog/posthog-go" 56) 57 58type State struct { 59 db *db.DB 60 notifier notify.Notifier 61 indexer *indexer.Indexer 62 oauth *oauth.OAuth 63 enforcer *rbac.Enforcer 64 pages *pages.Pages 65 idResolver *idresolver.Resolver 66 rdb *cache.Cache 67 mentionsResolver *mentions.Resolver 68 posthog posthog.Client 69 jc *jetstream.JetstreamClient 70 config *config.Config 71 repoResolver *reporesolver.RepoResolver 72 aclService *knotacl.Service 73 knotstream *eventconsumer.Consumer 74 spindlestream *eventconsumer.Consumer 75 pipelineNotifier *pipelines.StatusNotifier 76 logger *slog.Logger 77 validator *validator.Validator 78 cfClient *cloudflare.Client 79} 80 81func Make(ctx context.Context, config *config.Config) (*State, error) { 82 logger := tlog.FromContext(ctx) 83 84 d, err := db.Make(ctx, config.Core.DbPath) 85 if err != nil { 86 return nil, fmt.Errorf("failed to create db: %w", err) 87 } 88 89 indexer := indexer.New(log.SubLogger(logger, "indexer"), d) 90 err = indexer.Init(ctx) 91 if err != nil { 92 return nil, fmt.Errorf("failed to create indexer: %w", err) 93 } 94 95 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 96 if err != nil { 97 return nil, fmt.Errorf("failed to create enforcer: %w", err) 98 } 99 100 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 101 if err != nil { 102 logger.Error("failed to create redis resolver", "err", err) 103 res = idresolver.DefaultResolver(config.Plc.PLCURL) 104 } 105 106 var rdb *cache.Cache 107 if config.Redis.Addr != "" { 108 rdb = cache.New(config.Redis.Addr) 109 } 110 111 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 112 if err != nil { 113 return nil, fmt.Errorf("failed to create posthog client: %w", err) 114 } 115 116 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages")) 117 knotcompat.UseNativeLatch(knotacl.NewLatch(d, log.SubLogger(logger, "knotacl-latch"))) 118 aclService := knotacl.NewService(enforcer, d, config.Core.Dev, log.SubLogger(logger, "knotacl")) 119 oauth, err := oauth.New(config, posthog, d, enforcer, aclService, res, log.SubLogger(logger, "oauth")) 120 if err != nil { 121 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 122 } 123 124 validator := validator.New(d, res, aclService) 125 126 repoResolver := reporesolver.New(config, aclService, d, rdb) 127 128 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 129 130 jc, err := jetstream.NewJetstreamClient( 131 config.Jetstream.Endpoint, 132 "appview", 133 []string{ 134 tangled.ActorProfileNSID, 135 tangled.FeedStarNSID, 136 tangled.FeedReactionNSID, 137 tangled.FeedCommentNSID, 138 tangled.GraphFollowNSID, 139 tangled.GraphVouchNSID, 140 tangled.KnotMemberNSID, 141 tangled.KnotNSID, 142 tangled.LabelDefinitionNSID, 143 tangled.LabelOpNSID, 144 tangled.PublicKeyNSID, 145 tangled.RepoArtifactNSID, 146 tangled.RepoIssueCommentNSID, 147 tangled.RepoIssueNSID, 148 tangled.RepoNSID, 149 tangled.RepoPullNSID, 150 tangled.RepoPullCommentNSID, 151 tangled.SpindleMemberNSID, 152 tangled.SpindleNSID, 153 tangled.StringNSID, 154 }, 155 nil, 156 tlog.SubLogger(logger, "jetstream"), 157 d, 158 false, 159 160 // in-memory filter is inapplicable to appview so 161 // we'll never log dids anyway. 162 false, 163 ) 164 if err != nil { 165 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 166 } 167 168 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 169 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 170 } 171 172 var notifiers []notify.Notifier 173 174 // Always add the database notifier 175 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 176 177 // Add other notifiers in production only 178 if !config.Core.Dev { 179 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 180 } 181 notifiers = append(notifiers, indexer) 182 183 notifiers = append(notifiers, whnotify.NewNotifier(d)) 184 185 notifier := notify.NewMergedNotifier(notifiers) 186 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 187 188 ingester := appview.Ingester{ 189 Ctx: ctx, 190 Db: d, 191 Enforcer: enforcer, 192 Acl: aclService, 193 IdResolver: res, 194 Cache: rdb, 195 Config: config, 196 Logger: log.SubLogger(logger, "ingester"), 197 Validator: validator, 198 MentionsResolver: mentionsResolver, 199 Notifier: notifier, 200 Verifier: repoverify.New(res, config.Core.Dev), 201 } 202 err = jc.StartJetstream(ctx, ingester.Ingest()) 203 if err != nil { 204 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 205 } 206 207 go ingester.SweepPendingVerifications() 208 209 var cfClient *cloudflare.Client 210 if config.Cloudflare.ApiToken != "" { 211 cfClient, err = cloudflare.New(config) 212 if err != nil { 213 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err) 214 cfClient = nil 215 } 216 } 217 218 knotstream, err := Knotstream(ctx, config, d, aclService, enforcer, posthog, notifier, cfClient) 219 if err != nil { 220 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 221 } 222 knotstream.Start(ctx) 223 224 pipelineNotifier := pipelines.NewStatusNotifier() 225 226 spindlestream, err := Spindlestream(ctx, config, d, enforcer, pipelineNotifier) 227 if err != nil { 228 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 229 } 230 spindlestream.Start(ctx) 231 232 state := &State{ 233 db: d, 234 notifier: notifier, 235 indexer: indexer, 236 oauth: oauth, 237 enforcer: enforcer, 238 pages: pages, 239 idResolver: res, 240 rdb: rdb, 241 mentionsResolver: mentionsResolver, 242 posthog: posthog, 243 jc: jc, 244 config: config, 245 repoResolver: repoResolver, 246 aclService: aclService, 247 knotstream: knotstream, 248 spindlestream: spindlestream, 249 pipelineNotifier: pipelineNotifier, 250 logger: logger, 251 validator: validator, 252 cfClient: cfClient, 253 } 254 255 if config.Project.Enabled { 256 if config.Project.User == "" { 257 logger.Warn("project mode enabled but PROJECT_USER is not set") 258 } else { 259 logger.Info("running in project mode", "project_user", config.Project.User) 260 } 261 } 262 263 // fetch initial bluesky posts if configured 264 go fetchBskyPosts(ctx, res, config, d, logger) 265 266 return state, nil 267} 268 269func (s *State) Close() error { 270 // other close up logic goes here 271 return s.db.Close() 272} 273 274func (s *State) NewSSHServer() *pipelinessh.Server { 275 return pipelinessh.New(s.db, s.config, s.pipelineNotifier, log.SubLogger(s.logger, "pipelinessh")) 276} 277 278func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) { 279 w.Header().Set("Content-Type", "text/plain") 280 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 281 282 securityTxt := `Contact: mailto:security@tangled.org 283Preferred-Languages: en 284Canonical: https://tangled.org/.well-known/security.txt 285Expires: 2030-01-01T21:59:00.000Z 286` 287 w.Write([]byte(securityTxt)) 288} 289 290func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 291 w.Header().Set("Content-Type", "text/plain") 292 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 293 294 robotsTxt := `# Hello, Tanglers! 295User-agent: * 296Allow: / 297Disallow: /*/*/settings 298Disallow: /settings 299Disallow: /*/*/compare 300Disallow: /*/*/fork 301 302Crawl-delay: 1 303` 304 w.Write([]byte(robotsTxt)) 305} 306 307func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 308 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 309 BaseParams: pages.BaseParamsFromContext(r.Context()), 310 }) 311} 312 313func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 314 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 315 BaseParams: pages.BaseParamsFromContext(r.Context()), 316 }) 317} 318 319func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 320 s.pages.Brand(w, pages.BrandParams{ 321 BaseParams: pages.BaseParamsFromContext(r.Context()), 322 }) 323} 324 325func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 326 user := s.oauth.GetMultiAccountUser(r) 327 if user == nil { 328 return 329 } 330 331 l := s.logger.With("handler", "UpgradeBanner") 332 l = l.With("did", user.Did) 333 334 regs, err := db.GetRegistrations( 335 s.db, 336 orm.FilterEq("did", user.Did), 337 orm.FilterEq("needs_upgrade", 1), 338 ) 339 if err != nil { 340 l.Error("non-fatal: failed to get registrations", "err", err) 341 } 342 343 spindles, err := db.GetSpindles( 344 r.Context(), 345 s.db, 346 orm.FilterEq("owner", user.Did), 347 orm.FilterEq("needs_upgrade", 1), 348 ) 349 if err != nil { 350 l.Error("non-fatal: failed to get spindles", "err", err) 351 } 352 353 if regs == nil && spindles == nil { 354 return 355 } 356 357 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 358 Registrations: regs, 359 Spindles: spindles, 360 }) 361} 362 363func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) { 364 // target is echoed back from the form via hx-vals so the response span's 365 // id matches the form's hx-target. Fallback keeps the handler useful if 366 // a caller forgets to send it. 367 target := strings.TrimSpace(r.FormValue("target")) 368 if target == "" { 369 target = "home" 370 } 371 372 w.Header().Set("Content-Type", "text/html") 373 374 emailAddr := strings.TrimSpace(r.FormValue("email")) 375 if !email.IsValidEmail(emailAddr) { 376 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{ 377 Id: target, 378 Error: "Invalid email address.", 379 }) 380 return 381 } 382 383 // For logged-in users, persist the signup locally so the widget stays 384 // hidden across devices. The DB row is the render-time source of truth; 385 // Resend still owns the mailing list itself. 386 if user := s.oauth.GetMultiAccountUser(r); user != nil { 387 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil { 388 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err) 389 } 390 } 391 392 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" { 393 go func() { 394 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil { 395 s.logger.Error("failed to add newsletter contact", "error", err) 396 } 397 }() 398 } else { 399 s.logger.Error( 400 "failed to add newsletter contact, missing resend config", 401 "isKeyPresent", s.config.Resend.ApiKey != "", 402 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "", 403 "emailAddr", emailAddr, 404 ) 405 } 406 407 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target}) 408} 409 410// NewsletterDismiss records that a logged-in user has dismissed the newsletter 411// widget so it stays hidden across their devices. Anonymous callers get a 204 412// with no DB write — localStorage handles the per-browser fallback. 413func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) { 414 user := s.oauth.GetMultiAccountUser(r) 415 if user == nil { 416 w.WriteHeader(http.StatusNoContent) 417 return 418 } 419 420 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil { 421 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err) 422 } 423 w.WriteHeader(http.StatusNoContent) 424} 425 426func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 427 user := chi.URLParam(r, "user") 428 user = strings.TrimPrefix(user, "@") 429 430 if user == "" { 431 w.WriteHeader(http.StatusBadRequest) 432 return 433 } 434 435 id, err := s.idResolver.ResolveIdent(r.Context(), user) 436 if err != nil { 437 w.WriteHeader(http.StatusInternalServerError) 438 return 439 } 440 441 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 442 if err != nil { 443 s.logger.Error("failed to get public keys", "err", err) 444 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 445 return 446 } 447 448 if len(pubKeys) == 0 { 449 w.WriteHeader(http.StatusNoContent) 450 return 451 } 452 453 for _, k := range pubKeys { 454 key := strings.TrimRight(k.Key, "\n") 455 fmt.Fprintln(w, key) 456 } 457} 458 459func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 460 switch r.Method { 461 case http.MethodGet: 462 user := s.oauth.GetMultiAccountUser(r) 463 knots := s.aclService.KnotsForUser(r.Context(), user.Did) 464 465 s.pages.NewRepo(w, pages.NewRepoParams{ 466 BaseParams: pages.BaseParamsFromContext(r.Context()), 467 Knots: knots, 468 }) 469 470 case http.MethodPost: 471 l := s.logger.With("handler", "NewRepo") 472 473 user := s.oauth.GetMultiAccountUser(r) 474 l = l.With("did", user.Did) 475 476 // form validation 477 domain := r.FormValue("domain") 478 if domain == "" { 479 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 480 return 481 } 482 l = l.With("knot", domain) 483 484 repoName := r.FormValue("name") 485 if repoName == "" { 486 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 487 return 488 } 489 490 if err := models.ValidateRepoName(repoName); err != nil { 491 s.pages.Notice(w, "repo", err.Error()) 492 return 493 } 494 repoName = models.StripGitExt(repoName) 495 rkey := strings.ToLower(repoName) 496 l = l.With("repoName", repoName, "rkey", rkey) 497 498 defaultBranch := r.FormValue("branch") 499 if defaultBranch == "" { 500 defaultBranch = "main" 501 } 502 l = l.With("defaultBranch", defaultBranch) 503 504 description := r.FormValue("description") 505 if len([]rune(description)) > 140 { 506 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 507 return 508 } 509 510 // ACL validation 511 if !s.aclService.IsRepoCreateAllowed(r.Context(), domain, user.Did) { 512 l.Info("unauthorized") 513 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 514 return 515 } 516 517 // Check for existing repos 518 existingRepo, err := db.GetRepo( 519 s.db, 520 orm.FilterEq("did", user.Did), 521 orm.FilterEq("rkey", rkey), 522 ) 523 if err == nil && existingRepo != nil { 524 l.Info("repo exists") 525 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 526 return 527 } 528 529 atpClient, err := s.oauth.AuthorizedClient(r) 530 if err != nil { 531 l.Error("failed to get authorized client", "err", err) 532 s.pages.Notice(w, "repo", "Failed to authorize. Try again later.") 533 return 534 } 535 536 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) { 537 l.Info("rkey occupied by prior rename alias") 538 s.pages.Notice(w, "repo", fmt.Sprintf("The name %q still has a record on your PDS from a prior rename. Pick a different name, or delete at://%s/%s/%s first.", rkey, user.Did, tangled.RepoNSID, rkey)) 539 return 540 } 541 542 client, err := s.oauth.ServiceClient( 543 r, 544 oauth.WithService(domain), 545 oauth.WithLxm(tangled.RepoCreateNSID), 546 oauth.WithDev(s.config.Core.Dev), 547 ) 548 if err != nil { 549 l.Error("service auth failed", "err", err) 550 s.pages.Notice(w, "repo", "Failed to authenticate. Please log out and log back in again.") 551 return 552 } 553 554 input := &tangled.RepoCreate_Input{ 555 Rkey: rkey, 556 Name: rkey, 557 DefaultBranch: &defaultBranch, 558 } 559 createResp, err := tangled.RepoCreate( 560 r.Context(), 561 client, 562 input, 563 ) 564 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 565 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err) 566 s.pages.Notice(w, "repo", err.Error()) 567 return 568 } 569 570 var repoDid string 571 if createResp != nil && createResp.RepoDid != nil { 572 repoDid = *createResp.RepoDid 573 } 574 if repoDid == "" { 575 l.Error("knot returned empty repo DID") 576 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 577 return 578 } 579 580 repo := &models.Repo{ 581 Did: user.Did, 582 Name: repoName, 583 Knot: domain, 584 Rkey: rkey, 585 Description: description, 586 Created: time.Now(), 587 Labels: s.config.Label.DefaultLabelDefs, 588 RepoDid: repoDid, 589 } 590 record := repo.AsRecord() 591 592 cleanupKnot := func() { 593 go func() { 594 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 595 for attempt, delay := range delays { 596 time.Sleep(delay) 597 deleteClient, dErr := s.oauth.ServiceClient( 598 r, 599 oauth.WithService(domain), 600 oauth.WithLxm(tangled.RepoDeleteNSID), 601 oauth.WithDev(s.config.Core.Dev), 602 ) 603 if dErr != nil { 604 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 605 continue 606 } 607 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 608 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 609 Did: user.Did, 610 Name: rkey, 611 Rkey: rkey, 612 }); dErr != nil { 613 cancel() 614 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr) 615 continue 616 } 617 cancel() 618 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1) 619 return 620 } 621 l.Error("exhausted retries for knot cleanup, repo may be orphaned", 622 "did", user.Did, "repo", repoName, "knot", domain) 623 }() 624 } 625 626 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{ 627 Collection: tangled.RepoNSID, 628 Repo: user.Did, 629 Rkey: &rkey, 630 Record: &lexutil.LexiconTypeDecoder{ 631 Val: &record, 632 }, 633 }) 634 if err != nil { 635 l.Info("PDS write failed", "err", err) 636 cleanupKnot() 637 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) { 638 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository named %q.", rkey)) 639 } else { 640 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 641 } 642 return 643 } 644 645 aturi := fmt.Sprintf("at://%s/%s/%s", user.Did, tangled.RepoNSID, rkey) 646 l = l.With("aturi", aturi) 647 l.Info("wrote to PDS") 648 649 tx, err := s.db.BeginTx(r.Context(), nil) 650 if err != nil { 651 l.Info("txn failed", "err", err) 652 s.pages.Notice(w, "repo", "Failed to save repository information.") 653 return 654 } 655 656 rollback := func() { 657 err1 := tx.Rollback() 658 err2 := s.enforcer.E.LoadPolicy() 659 err3 := rollbackRecord(context.Background(), aturi, atpClient) 660 661 if errors.Is(err1, sql.ErrTxDone) { 662 err1 = nil 663 } 664 665 if errs := errors.Join(err1, err2, err3); errs != nil { 666 l.Error("failed to rollback changes", "errs", errs) 667 } 668 669 if aturi != "" { 670 cleanupKnot() 671 } 672 } 673 defer rollback() 674 675 err = db.AddRepo(tx, repo) 676 if err != nil { 677 l.Error("db write failed", "err", err) 678 s.pages.Notice(w, "repo", "Failed to save repository information.") 679 return 680 } 681 682 rbacPath := repo.RepoIdentifier() 683 err = s.enforcer.AddRepo(user.Did, domain, rbacPath) 684 if err != nil { 685 l.Error("acl setup failed", "err", err) 686 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 687 return 688 } 689 690 err = tx.Commit() 691 if err != nil { 692 l.Error("txn commit failed", "err", err) 693 http.Error(w, err.Error(), http.StatusInternalServerError) 694 return 695 } 696 697 err = s.enforcer.E.SavePolicy() 698 if err != nil { 699 l.Error("acl save failed", "err", err) 700 http.Error(w, err.Error(), http.StatusInternalServerError) 701 return 702 } 703 704 aturi = "" 705 706 s.notifier.NewRepo(r.Context(), repo) 707 switch { 708 case repoDid != "": 709 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 710 default: 711 handle := s.pages.DisplayHandle(r.Context(), user.Did) 712 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, rkey)) 713 } 714 } 715} 716 717func rkeyOccupied(ctx context.Context, client *atclient.APIClient, did, rkey string) bool { 718 probeCtx, cancel := context.WithTimeout(ctx, 10*time.Second) 719 defer cancel() 720 resp, err := comatproto.RepoGetRecord(probeCtx, client, "", tangled.RepoNSID, did, rkey) 721 return err == nil && resp != nil 722} 723 724// this is used to rollback changes made to the PDS 725// 726// it is a no-op if the provided ATURI is empty 727func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 728 if aturi == "" { 729 return nil 730 } 731 732 parsed := syntax.ATURI(aturi) 733 734 collection := parsed.Collection().String() 735 repo := parsed.Authority().String() 736 rkey := parsed.RecordKey().String() 737 738 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 739 Collection: collection, 740 Repo: repo, 741 Rkey: rkey, 742 }) 743 return err 744} 745 746func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 747 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 748 if err != nil { 749 return err 750 } 751 // already present 752 if len(defaultLabels) == len(defaults) { 753 return nil 754 } 755 756 labelDefs, err := models.FetchLabelDefs(r, defaults) 757 if err != nil { 758 return err 759 } 760 761 // Insert each label definition to the database 762 for _, labelDef := range labelDefs { 763 _, err = db.AddLabelDefinition(e, &labelDef) 764 if err != nil { 765 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 766 } 767 } 768 769 return nil 770} 771 772func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 773 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 774 if err != nil { 775 logger.Error("failed to resolve tangled.org DID", "err", err) 776 return 777 } 778 779 pdsEndpoint := resolved.PDSEndpoint() 780 if pdsEndpoint == "" { 781 logger.Error("no PDS endpoint found for tangled.sh DID") 782 return 783 } 784 785 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger) 786 if err != nil { 787 logger.Error("failed to create appassword session... skipping fetch", "err", err) 788 return 789 } 790 791 l := log.SubLogger(logger, "bluesky") 792 793 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 794 defer ticker.Stop() 795 796 for { 797 // refresh session if necessary 798 if !session.IsValid() { 799 l.Debug("access token expired, refreshing session") 800 if err := session.RefreshSession(); err != nil { 801 l.Error("failed to refresh session, stopping bluesky updater", "err", err) 802 return 803 } 804 l.Debug("session refreshed") 805 } 806 807 // make client 808 client := xrpc.Client{ 809 Auth: &xrpc.AuthInfo{ 810 AccessJwt: session.AccessJwt, 811 Did: session.Did, 812 }, 813 Host: session.PdsEndpoint, 814 } 815 816 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 817 if err != nil { 818 l.Error("failed to fetch bluesky posts", "err", err) 819 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 820 l.Error("failed to insert bluesky posts", "err", err) 821 } else { 822 l.Info("inserted bluesky posts", "count", len(posts)) 823 } 824 825 select { 826 case <-ticker.C: 827 case <-ctx.Done(): 828 l.Info("stopping bluesky updater") 829 return 830 } 831 } 832}