Monorepo for Tangled tangled.org
12

Configure Feed

Select the types of activity you want to include in your feed.

1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/cache" 17 "tangled.org/core/appview/cloudflare" 18 "tangled.org/core/appview/codesearch" 19 "tangled.org/core/appview/config" 20 "tangled.org/core/appview/db" 21 "tangled.org/core/appview/email" 22 "tangled.org/core/appview/indexer" 23 "tangled.org/core/appview/knotacl" 24 "tangled.org/core/appview/knotcompat" 25 "tangled.org/core/appview/mentions" 26 "tangled.org/core/appview/models" 27 "tangled.org/core/appview/notify" 28 dbnotify "tangled.org/core/appview/notify/db" 29 lognotify "tangled.org/core/appview/notify/logging" 30 phnotify "tangled.org/core/appview/notify/posthog" 31 whnotify "tangled.org/core/appview/notify/webhook" 32 "tangled.org/core/appview/oauth" 33 "tangled.org/core/appview/pages" 34 "tangled.org/core/appview/pipelines" 35 pipelinessh "tangled.org/core/appview/pipelines/ssh" 36 "tangled.org/core/appview/reporesolver" 37 "tangled.org/core/appview/repoverify" 38 "tangled.org/core/appview/validator" 39 xrpcclient "tangled.org/core/appview/xrpcclient" 40 "tangled.org/core/consts" 41 "tangled.org/core/eventconsumer" 42 "tangled.org/core/idresolver" 43 "tangled.org/core/jetstream" 44 "tangled.org/core/log" 45 tlog "tangled.org/core/log" 46 "tangled.org/core/orm" 47 "tangled.org/core/rbac" 48 49 comatproto "github.com/bluesky-social/indigo/api/atproto" 50 "github.com/bluesky-social/indigo/atproto/atclient" 51 "github.com/bluesky-social/indigo/atproto/syntax" 52 lexutil "github.com/bluesky-social/indigo/lex/util" 53 "github.com/bluesky-social/indigo/xrpc" 54 55 "github.com/go-chi/chi/v5" 56 "github.com/posthog/posthog-go" 57) 58 59type State struct { 60 db *db.DB 61 notifier notify.Notifier 62 indexer *indexer.Indexer 63 oauth *oauth.OAuth 64 enforcer *rbac.Enforcer 65 pages *pages.Pages 66 idResolver *idresolver.Resolver 67 rdb *cache.Cache 68 mentionsResolver *mentions.Resolver 69 posthog posthog.Client 70 jc *jetstream.JetstreamClient 71 config *config.Config 72 repoResolver *reporesolver.RepoResolver 73 aclService *knotacl.Service 74 knotstream *eventconsumer.Consumer 75 spindlestream *eventconsumer.Consumer 76 pipelineNotifier *pipelines.StatusNotifier 77 logger *slog.Logger 78 validator *validator.Validator 79 cfClient *cloudflare.Client 80 codesearch *codesearch.CodeSearch 81} 82 83func Make(ctx context.Context, config *config.Config) (*State, error) { 84 logger := tlog.FromContext(ctx) 85 86 d, err := db.Make(ctx, config.Core.DbPath) 87 if err != nil { 88 return nil, fmt.Errorf("failed to create db: %w", err) 89 } 90 91 indexer := indexer.New(log.SubLogger(logger, "indexer"), d) 92 err = indexer.Init(ctx) 93 if err != nil { 94 return nil, fmt.Errorf("failed to create indexer: %w", err) 95 } 96 97 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 98 if err != nil { 99 return nil, fmt.Errorf("failed to create enforcer: %w", err) 100 } 101 102 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 103 if err != nil { 104 logger.Error("failed to create redis resolver", "err", err) 105 res = idresolver.DefaultResolver(config.Plc.PLCURL) 106 } 107 108 var rdb *cache.Cache 109 if config.Redis.Addr != "" { 110 rdb = cache.New(config.Redis.Addr) 111 } 112 113 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 114 if err != nil { 115 return nil, fmt.Errorf("failed to create posthog client: %w", err) 116 } 117 118 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages")) 119 knotcompat.UseNativeLatch(knotacl.NewLatch(d, log.SubLogger(logger, "knotacl-latch"))) 120 aclService := knotacl.NewService(enforcer, d, config.Core.Dev, log.SubLogger(logger, "knotacl")) 121 oauth, err := oauth.New(config, posthog, d, enforcer, aclService, res, log.SubLogger(logger, "oauth")) 122 if err != nil { 123 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 124 } 125 126 validator := validator.New(d, res, aclService) 127 128 repoResolver := reporesolver.New(config, aclService, d, rdb) 129 130 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 131 132 jc, err := jetstream.NewJetstreamClient( 133 config.Jetstream.Endpoint, 134 "appview", 135 []string{ 136 tangled.ActorProfileNSID, 137 tangled.FeedStarNSID, 138 tangled.FeedReactionNSID, 139 tangled.FeedCommentNSID, 140 tangled.GraphFollowNSID, 141 tangled.GraphVouchNSID, 142 tangled.KnotMemberNSID, 143 tangled.KnotNSID, 144 tangled.LabelDefinitionNSID, 145 tangled.LabelOpNSID, 146 tangled.PublicKeyNSID, 147 tangled.RepoArtifactNSID, 148 tangled.RepoIssueCommentNSID, 149 tangled.RepoIssueNSID, 150 tangled.RepoNSID, 151 tangled.RepoPullNSID, 152 tangled.RepoPullCommentNSID, 153 tangled.SpindleMemberNSID, 154 tangled.SpindleNSID, 155 tangled.StringNSID, 156 }, 157 nil, 158 tlog.SubLogger(logger, "jetstream"), 159 d, 160 false, 161 162 // in-memory filter is inapplicable to appview so 163 // we'll never log dids anyway. 164 false, 165 ) 166 if err != nil { 167 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 168 } 169 170 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 171 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 172 } 173 174 var notifiers []notify.Notifier 175 176 // Always add the database notifier 177 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 178 179 // Add other notifiers in production only 180 if !config.Core.Dev { 181 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 182 } 183 notifiers = append(notifiers, indexer) 184 185 notifiers = append(notifiers, whnotify.NewNotifier(d)) 186 187 notifier := notify.NewMergedNotifier(notifiers) 188 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 189 190 ingester := appview.Ingester{ 191 Ctx: ctx, 192 Db: d, 193 Enforcer: enforcer, 194 Acl: aclService, 195 IdResolver: res, 196 Cache: rdb, 197 Config: config, 198 Logger: log.SubLogger(logger, "ingester"), 199 Validator: validator, 200 MentionsResolver: mentionsResolver, 201 Notifier: notifier, 202 Verifier: repoverify.New(res, config.Core.Dev), 203 } 204 err = jc.StartJetstream(ctx, ingester.Ingest()) 205 if err != nil { 206 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 207 } 208 209 go ingester.SweepPendingVerifications() 210 211 var cfClient *cloudflare.Client 212 if config.Cloudflare.ApiToken != "" { 213 cfClient, err = cloudflare.New(config) 214 if err != nil { 215 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err) 216 cfClient = nil 217 } 218 } 219 220 knotstream, err := Knotstream(ctx, config, d, aclService, enforcer, posthog, notifier, cfClient) 221 if err != nil { 222 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 223 } 224 knotstream.Start(ctx) 225 226 pipelineNotifier := pipelines.NewStatusNotifier() 227 228 spindlestream, err := Spindlestream(ctx, config, d, enforcer, pipelineNotifier) 229 if err != nil { 230 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 231 } 232 spindlestream.Start(ctx) 233 234 state := &State{ 235 db: d, 236 notifier: notifier, 237 indexer: indexer, 238 oauth: oauth, 239 enforcer: enforcer, 240 pages: pages, 241 idResolver: res, 242 rdb: rdb, 243 mentionsResolver: mentionsResolver, 244 posthog: posthog, 245 jc: jc, 246 config: config, 247 repoResolver: repoResolver, 248 aclService: aclService, 249 knotstream: knotstream, 250 spindlestream: spindlestream, 251 pipelineNotifier: pipelineNotifier, 252 logger: logger, 253 validator: validator, 254 cfClient: cfClient, 255 codesearch: &codesearch.CodeSearch{Host: config.CodeSearch.ZoektUrl}, 256 } 257 258 // fetch initial bluesky posts if configured 259 go fetchBskyPosts(ctx, res, config, d, logger) 260 261 return state, nil 262} 263 264func (s *State) Close() error { 265 // other close up logic goes here 266 return s.db.Close() 267} 268 269func (s *State) NewSSHServer() *pipelinessh.Server { 270 return pipelinessh.New(s.db, s.config, s.pipelineNotifier, log.SubLogger(s.logger, "pipelinessh")) 271} 272 273func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) { 274 w.Header().Set("Content-Type", "text/plain") 275 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 276 277 securityTxt := `Contact: mailto:security@tangled.org 278Preferred-Languages: en 279Canonical: https://tangled.org/.well-known/security.txt 280Expires: 2030-01-01T21:59:00.000Z 281` 282 w.Write([]byte(securityTxt)) 283} 284 285func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 286 w.Header().Set("Content-Type", "text/plain") 287 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 288 289 robotsTxt := `# Hello, Tanglers! 290User-agent: * 291Allow: / 292Disallow: /*/*/settings 293Disallow: /settings 294Disallow: /*/*/compare 295Disallow: /*/*/fork 296 297Crawl-delay: 1 298` 299 w.Write([]byte(robotsTxt)) 300} 301 302func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 303 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 304 BaseParams: pages.BaseParamsFromContext(r.Context()), 305 }) 306} 307 308func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 309 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 310 BaseParams: pages.BaseParamsFromContext(r.Context()), 311 }) 312} 313 314func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 315 s.pages.Brand(w, pages.BrandParams{ 316 BaseParams: pages.BaseParamsFromContext(r.Context()), 317 }) 318} 319 320func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 321 user := s.oauth.GetMultiAccountUser(r) 322 if user == nil { 323 return 324 } 325 326 l := s.logger.With("handler", "UpgradeBanner") 327 l = l.With("did", user.Did) 328 329 regs, err := db.GetRegistrations( 330 s.db, 331 orm.FilterEq("did", user.Did), 332 orm.FilterEq("needs_upgrade", 1), 333 ) 334 if err != nil { 335 l.Error("non-fatal: failed to get registrations", "err", err) 336 } 337 338 spindles, err := db.GetSpindles( 339 r.Context(), 340 s.db, 341 orm.FilterEq("owner", user.Did), 342 orm.FilterEq("needs_upgrade", 1), 343 ) 344 if err != nil { 345 l.Error("non-fatal: failed to get spindles", "err", err) 346 } 347 348 if regs == nil && spindles == nil { 349 return 350 } 351 352 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 353 Registrations: regs, 354 Spindles: spindles, 355 }) 356} 357 358func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) { 359 // target is echoed back from the form via hx-vals so the response span's 360 // id matches the form's hx-target. Fallback keeps the handler useful if 361 // a caller forgets to send it. 362 target := strings.TrimSpace(r.FormValue("target")) 363 if target == "" { 364 target = "home" 365 } 366 367 w.Header().Set("Content-Type", "text/html") 368 369 emailAddr := strings.TrimSpace(r.FormValue("email")) 370 if !email.IsValidEmail(emailAddr) { 371 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{ 372 Id: target, 373 Error: "Invalid email address.", 374 }) 375 return 376 } 377 378 // For logged-in users, persist the signup locally so the widget stays 379 // hidden across devices. The DB row is the render-time source of truth; 380 // Resend still owns the mailing list itself. 381 if user := s.oauth.GetMultiAccountUser(r); user != nil { 382 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil { 383 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err) 384 } 385 } 386 387 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" { 388 go func() { 389 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil { 390 s.logger.Error("failed to add newsletter contact", "error", err) 391 } 392 }() 393 } else { 394 s.logger.Error( 395 "failed to add newsletter contact, missing resend config", 396 "isKeyPresent", s.config.Resend.ApiKey != "", 397 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "", 398 "emailAddr", emailAddr, 399 ) 400 } 401 402 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target}) 403} 404 405// NewsletterDismiss records that a logged-in user has dismissed the newsletter 406// widget so it stays hidden across their devices. Anonymous callers get a 204 407// with no DB write — localStorage handles the per-browser fallback. 408func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) { 409 user := s.oauth.GetMultiAccountUser(r) 410 if user == nil { 411 w.WriteHeader(http.StatusNoContent) 412 return 413 } 414 415 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil { 416 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err) 417 } 418 w.WriteHeader(http.StatusNoContent) 419} 420 421func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 422 user := chi.URLParam(r, "user") 423 user = strings.TrimPrefix(user, "@") 424 user = strings.TrimSuffix(user, ".keys") 425 426 if user == "" { 427 w.WriteHeader(http.StatusBadRequest) 428 return 429 } 430 431 id, err := s.idResolver.ResolveIdent(r.Context(), user) 432 if err != nil { 433 w.WriteHeader(http.StatusInternalServerError) 434 return 435 } 436 437 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 438 if err != nil { 439 s.logger.Error("failed to get public keys", "err", err) 440 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 441 return 442 } 443 444 if len(pubKeys) == 0 { 445 w.WriteHeader(http.StatusNoContent) 446 return 447 } 448 449 for _, k := range pubKeys { 450 key := strings.TrimRight(k.Key, "\n") 451 fmt.Fprintln(w, key) 452 } 453} 454 455func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 456 switch r.Method { 457 case http.MethodGet: 458 user := s.oauth.GetMultiAccountUser(r) 459 knots := s.aclService.KnotsForUser(r.Context(), user.Did) 460 461 s.pages.NewRepo(w, pages.NewRepoParams{ 462 BaseParams: pages.BaseParamsFromContext(r.Context()), 463 Knots: knots, 464 }) 465 466 case http.MethodPost: 467 l := s.logger.With("handler", "NewRepo") 468 469 user := s.oauth.GetMultiAccountUser(r) 470 l = l.With("did", user.Did) 471 472 // form validation 473 domain := r.FormValue("domain") 474 if domain == "" { 475 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 476 return 477 } 478 l = l.With("knot", domain) 479 480 repoName := r.FormValue("name") 481 if repoName == "" { 482 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 483 return 484 } 485 486 if err := models.ValidateRepoName(repoName); err != nil { 487 s.pages.Notice(w, "repo", err.Error()) 488 return 489 } 490 repoName = models.StripGitExt(repoName) 491 rkey := strings.ToLower(repoName) 492 l = l.With("repoName", repoName, "rkey", rkey) 493 494 defaultBranch := r.FormValue("branch") 495 if defaultBranch == "" { 496 defaultBranch = "main" 497 } 498 l = l.With("defaultBranch", defaultBranch) 499 500 description := r.FormValue("description") 501 if len([]rune(description)) > 140 { 502 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 503 return 504 } 505 506 // ACL validation 507 if !s.aclService.IsRepoCreateAllowed(r.Context(), domain, user.Did) { 508 l.Info("unauthorized") 509 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 510 return 511 } 512 513 // Check for existing repos 514 existingRepo, err := db.GetRepo( 515 s.db, 516 orm.FilterEq("did", user.Did), 517 orm.FilterEq("rkey", rkey), 518 ) 519 if err == nil && existingRepo != nil { 520 l.Info("repo exists") 521 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 522 return 523 } 524 525 atpClient, err := s.oauth.AuthorizedClient(r) 526 if err != nil { 527 l.Error("failed to get authorized client", "err", err) 528 s.pages.Notice(w, "repo", "Failed to authorize. Try again later.") 529 return 530 } 531 532 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) { 533 l.Info("rkey occupied by prior rename alias") 534 s.pages.Notice(w, "repo", fmt.Sprintf("The name %q still has a record on your PDS from a prior rename. Pick a different name, or delete at://%s/%s/%s first.", rkey, user.Did, tangled.RepoNSID, rkey)) 535 return 536 } 537 538 client, err := s.oauth.ServiceClient( 539 r, 540 oauth.WithService(domain), 541 oauth.WithLxm(tangled.RepoCreateNSID), 542 oauth.WithDev(s.config.Core.Dev), 543 ) 544 if err != nil { 545 l.Error("service auth failed", "err", err) 546 s.pages.Notice(w, "repo", "Failed to authenticate. Please log out and log back in again.") 547 return 548 } 549 550 input := &tangled.RepoCreate_Input{ 551 Rkey: rkey, 552 Name: rkey, 553 DefaultBranch: &defaultBranch, 554 } 555 createResp, err := tangled.RepoCreate( 556 r.Context(), 557 client, 558 input, 559 ) 560 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 561 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err) 562 s.pages.Notice(w, "repo", err.Error()) 563 return 564 } 565 566 var repoDid string 567 if createResp != nil && createResp.RepoDid != nil { 568 repoDid = *createResp.RepoDid 569 } 570 if repoDid == "" { 571 l.Error("knot returned empty repo DID") 572 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 573 return 574 } 575 576 repo := &models.Repo{ 577 Did: user.Did, 578 Name: repoName, 579 Knot: domain, 580 Rkey: rkey, 581 Description: description, 582 Created: time.Now(), 583 Labels: s.config.Label.DefaultLabelDefs, 584 RepoDid: repoDid, 585 } 586 record := repo.AsRecord() 587 588 cleanupKnot := func() { 589 go func() { 590 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 591 for attempt, delay := range delays { 592 time.Sleep(delay) 593 deleteClient, dErr := s.oauth.ServiceClient( 594 r, 595 oauth.WithService(domain), 596 oauth.WithLxm(tangled.RepoDeleteNSID), 597 oauth.WithDev(s.config.Core.Dev), 598 ) 599 if dErr != nil { 600 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 601 continue 602 } 603 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 604 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 605 Did: user.Did, 606 Name: rkey, 607 Rkey: rkey, 608 }); dErr != nil { 609 cancel() 610 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr) 611 continue 612 } 613 cancel() 614 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1) 615 return 616 } 617 l.Error("exhausted retries for knot cleanup, repo may be orphaned", 618 "did", user.Did, "repo", repoName, "knot", domain) 619 }() 620 } 621 622 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{ 623 Collection: tangled.RepoNSID, 624 Repo: user.Did, 625 Rkey: &rkey, 626 Record: &lexutil.LexiconTypeDecoder{ 627 Val: &record, 628 }, 629 }) 630 if err != nil { 631 l.Info("PDS write failed", "err", err) 632 cleanupKnot() 633 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) { 634 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository named %q.", rkey)) 635 } else { 636 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 637 } 638 return 639 } 640 641 aturi := fmt.Sprintf("at://%s/%s/%s", user.Did, tangled.RepoNSID, rkey) 642 l = l.With("aturi", aturi) 643 l.Info("wrote to PDS") 644 645 tx, err := s.db.BeginTx(r.Context(), nil) 646 if err != nil { 647 l.Info("txn failed", "err", err) 648 s.pages.Notice(w, "repo", "Failed to save repository information.") 649 return 650 } 651 652 rollback := func() { 653 err1 := tx.Rollback() 654 err2 := s.enforcer.E.LoadPolicy() 655 err3 := rollbackRecord(context.Background(), aturi, atpClient) 656 657 if errors.Is(err1, sql.ErrTxDone) { 658 err1 = nil 659 } 660 661 if errs := errors.Join(err1, err2, err3); errs != nil { 662 l.Error("failed to rollback changes", "errs", errs) 663 } 664 665 if aturi != "" { 666 cleanupKnot() 667 } 668 } 669 defer rollback() 670 671 err = db.AddRepo(tx, repo) 672 if err != nil { 673 l.Error("db write failed", "err", err) 674 s.pages.Notice(w, "repo", "Failed to save repository information.") 675 return 676 } 677 678 rbacPath := repo.RepoIdentifier() 679 err = s.enforcer.AddRepo(user.Did, domain, rbacPath) 680 if err != nil { 681 l.Error("acl setup failed", "err", err) 682 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 683 return 684 } 685 686 err = tx.Commit() 687 if err != nil { 688 l.Error("txn commit failed", "err", err) 689 http.Error(w, err.Error(), http.StatusInternalServerError) 690 return 691 } 692 693 err = s.enforcer.E.SavePolicy() 694 if err != nil { 695 l.Error("acl save failed", "err", err) 696 http.Error(w, err.Error(), http.StatusInternalServerError) 697 return 698 } 699 700 aturi = "" 701 702 s.notifier.NewRepo(r.Context(), repo) 703 switch { 704 case repoDid != "": 705 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 706 default: 707 handle := s.pages.DisplayHandle(r.Context(), user.Did) 708 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, rkey)) 709 } 710 } 711} 712 713func rkeyOccupied(ctx context.Context, client *atclient.APIClient, did, rkey string) bool { 714 probeCtx, cancel := context.WithTimeout(ctx, 10*time.Second) 715 defer cancel() 716 resp, err := comatproto.RepoGetRecord(probeCtx, client, "", tangled.RepoNSID, did, rkey) 717 return err == nil && resp != nil 718} 719 720// this is used to rollback changes made to the PDS 721// 722// it is a no-op if the provided ATURI is empty 723func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 724 if aturi == "" { 725 return nil 726 } 727 728 parsed := syntax.ATURI(aturi) 729 730 collection := parsed.Collection().String() 731 repo := parsed.Authority().String() 732 rkey := parsed.RecordKey().String() 733 734 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 735 Collection: collection, 736 Repo: repo, 737 Rkey: rkey, 738 }) 739 return err 740} 741 742func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 743 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 744 if err != nil { 745 return err 746 } 747 // already present 748 if len(defaultLabels) == len(defaults) { 749 return nil 750 } 751 752 labelDefs, err := models.FetchLabelDefs(r, defaults) 753 if err != nil { 754 return err 755 } 756 757 // Insert each label definition to the database 758 for _, labelDef := range labelDefs { 759 _, err = db.AddLabelDefinition(e, &labelDef) 760 if err != nil { 761 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 762 } 763 } 764 765 return nil 766} 767 768func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 769 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 770 if err != nil { 771 logger.Error("failed to resolve tangled.org DID", "err", err) 772 return 773 } 774 775 pdsEndpoint := resolved.PDSEndpoint() 776 if pdsEndpoint == "" { 777 logger.Error("no PDS endpoint found for tangled.sh DID") 778 return 779 } 780 781 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger) 782 if err != nil { 783 logger.Error("failed to create appassword session... skipping fetch", "err", err) 784 return 785 } 786 787 l := log.SubLogger(logger, "bluesky") 788 789 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 790 defer ticker.Stop() 791 792 for { 793 // refresh session if necessary 794 if !session.IsValid() { 795 l.Debug("access token expired, refreshing session") 796 if err := session.RefreshSession(); err != nil { 797 l.Error("failed to refresh session, stopping bluesky updater", "err", err) 798 return 799 } 800 l.Debug("session refreshed") 801 } 802 803 // make client 804 client := xrpc.Client{ 805 Auth: &xrpc.AuthInfo{ 806 AccessJwt: session.AccessJwt, 807 Did: session.Did, 808 }, 809 Host: session.PdsEndpoint, 810 } 811 812 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 813 if err != nil { 814 l.Error("failed to fetch bluesky posts", "err", err) 815 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 816 l.Error("failed to insert bluesky posts", "err", err) 817 } else { 818 l.Info("inserted bluesky posts", "count", len(posts)) 819 } 820 821 select { 822 case <-ticker.C: 823 case <-ctx.Done(): 824 l.Info("stopping bluesky updater") 825 return 826 } 827 } 828}