Monorepo for Tangled tangled.org
8

Configure Feed

Select the types of activity you want to include in your feed.

1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/cache" 17 "tangled.org/core/appview/cloudflare" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/email" 21 "tangled.org/core/appview/indexer" 22 "tangled.org/core/appview/knotacl" 23 "tangled.org/core/appview/knotcompat" 24 "tangled.org/core/appview/mentions" 25 "tangled.org/core/appview/models" 26 "tangled.org/core/appview/notify" 27 dbnotify "tangled.org/core/appview/notify/db" 28 lognotify "tangled.org/core/appview/notify/logging" 29 phnotify "tangled.org/core/appview/notify/posthog" 30 whnotify "tangled.org/core/appview/notify/webhook" 31 "tangled.org/core/appview/oauth" 32 "tangled.org/core/appview/pages" 33 "tangled.org/core/appview/pipelines" 34 pipelinessh "tangled.org/core/appview/pipelines/ssh" 35 "tangled.org/core/appview/reporesolver" 36 "tangled.org/core/appview/repoverify" 37 "tangled.org/core/appview/validator" 38 xrpcclient "tangled.org/core/appview/xrpcclient" 39 "tangled.org/core/consts" 40 "tangled.org/core/eventconsumer" 41 "tangled.org/core/idresolver" 42 "tangled.org/core/jetstream" 43 "tangled.org/core/log" 44 tlog "tangled.org/core/log" 45 "tangled.org/core/orm" 46 "tangled.org/core/rbac" 47 48 comatproto "github.com/bluesky-social/indigo/api/atproto" 49 "github.com/bluesky-social/indigo/atproto/atclient" 50 "github.com/bluesky-social/indigo/atproto/syntax" 51 lexutil "github.com/bluesky-social/indigo/lex/util" 52 "github.com/bluesky-social/indigo/xrpc" 53 54 "github.com/go-chi/chi/v5" 55 "github.com/posthog/posthog-go" 56) 57 58type State struct { 59 db *db.DB 60 notifier notify.Notifier 61 indexer *indexer.Indexer 62 oauth *oauth.OAuth 63 enforcer *rbac.Enforcer 64 pages *pages.Pages 65 idResolver *idresolver.Resolver 66 rdb *cache.Cache 67 mentionsResolver *mentions.Resolver 68 posthog posthog.Client 69 jc *jetstream.JetstreamClient 70 config *config.Config 71 repoResolver *reporesolver.RepoResolver 72 aclService *knotacl.Service 73 knotstream *eventconsumer.Consumer 74 spindlestream *eventconsumer.Consumer 75 pipelineNotifier *pipelines.StatusNotifier 76 logger *slog.Logger 77 validator *validator.Validator 78 cfClient *cloudflare.Client 79} 80 81func Make(ctx context.Context, config *config.Config) (*State, error) { 82 logger := tlog.FromContext(ctx) 83 84 d, err := db.Make(ctx, config.Core.DbPath) 85 if err != nil { 86 return nil, fmt.Errorf("failed to create db: %w", err) 87 } 88 89 indexer := indexer.New(log.SubLogger(logger, "indexer"), d) 90 err = indexer.Init(ctx) 91 if err != nil { 92 return nil, fmt.Errorf("failed to create indexer: %w", err) 93 } 94 95 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 96 if err != nil { 97 return nil, fmt.Errorf("failed to create enforcer: %w", err) 98 } 99 100 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 101 if err != nil { 102 logger.Error("failed to create redis resolver", "err", err) 103 res = idresolver.DefaultResolver(config.Plc.PLCURL) 104 } 105 106 var rdb *cache.Cache 107 if config.Redis.Addr != "" { 108 rdb = cache.New(config.Redis.Addr) 109 } 110 111 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 112 if err != nil { 113 return nil, fmt.Errorf("failed to create posthog client: %w", err) 114 } 115 116 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages")) 117 knotcompat.UseNativeLatch(knotacl.NewLatch(d, log.SubLogger(logger, "knotacl-latch"))) 118 aclService := knotacl.NewService(enforcer, d, config.Core.Dev, log.SubLogger(logger, "knotacl")) 119 oauth, err := oauth.New(config, posthog, d, enforcer, aclService, res, log.SubLogger(logger, "oauth")) 120 if err != nil { 121 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 122 } 123 124 validator := validator.New(d, res, aclService) 125 126 repoResolver := reporesolver.New(config, aclService, d, rdb) 127 128 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 129 130 jc, err := jetstream.NewJetstreamClient( 131 config.Jetstream.Endpoint, 132 "appview", 133 []string{ 134 tangled.ActorProfileNSID, 135 tangled.FeedStarNSID, 136 tangled.FeedReactionNSID, 137 tangled.FeedCommentNSID, 138 tangled.GraphFollowNSID, 139 tangled.GraphVouchNSID, 140 tangled.KnotMemberNSID, 141 tangled.KnotNSID, 142 tangled.LabelDefinitionNSID, 143 tangled.LabelOpNSID, 144 tangled.PublicKeyNSID, 145 tangled.RepoArtifactNSID, 146 tangled.RepoIssueCommentNSID, 147 tangled.RepoIssueNSID, 148 tangled.RepoNSID, 149 tangled.RepoPullNSID, 150 tangled.RepoPullCommentNSID, 151 tangled.SpindleMemberNSID, 152 tangled.SpindleNSID, 153 tangled.StringNSID, 154 }, 155 nil, 156 tlog.SubLogger(logger, "jetstream"), 157 d, 158 false, 159 160 // in-memory filter is inapplicable to appview so 161 // we'll never log dids anyway. 162 false, 163 ) 164 if err != nil { 165 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 166 } 167 168 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 169 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 170 } 171 172 var notifiers []notify.Notifier 173 174 // Always add the database notifier 175 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 176 177 // Add other notifiers in production only 178 if !config.Core.Dev { 179 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 180 } 181 notifiers = append(notifiers, indexer) 182 183 notifiers = append(notifiers, whnotify.NewNotifier(d)) 184 185 notifier := notify.NewMergedNotifier(notifiers) 186 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 187 188 ingester := appview.Ingester{ 189 Ctx: ctx, 190 Db: d, 191 Enforcer: enforcer, 192 Acl: aclService, 193 IdResolver: res, 194 Cache: rdb, 195 Config: config, 196 Logger: log.SubLogger(logger, "ingester"), 197 Validator: validator, 198 MentionsResolver: mentionsResolver, 199 Notifier: notifier, 200 Verifier: repoverify.New(res, config.Core.Dev), 201 } 202 err = jc.StartJetstream(ctx, ingester.Ingest()) 203 if err != nil { 204 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 205 } 206 207 go ingester.SweepPendingVerifications() 208 209 var cfClient *cloudflare.Client 210 if config.Cloudflare.ApiToken != "" { 211 cfClient, err = cloudflare.New(config) 212 if err != nil { 213 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err) 214 cfClient = nil 215 } 216 } 217 218 knotstream, err := Knotstream(ctx, config, d, aclService, enforcer, posthog, notifier, cfClient) 219 if err != nil { 220 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 221 } 222 knotstream.Start(ctx) 223 224 pipelineNotifier := pipelines.NewStatusNotifier() 225 226 spindlestream, err := Spindlestream(ctx, config, d, enforcer, pipelineNotifier) 227 if err != nil { 228 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 229 } 230 spindlestream.Start(ctx) 231 232 state := &State{ 233 db: d, 234 notifier: notifier, 235 indexer: indexer, 236 oauth: oauth, 237 enforcer: enforcer, 238 pages: pages, 239 idResolver: res, 240 rdb: rdb, 241 mentionsResolver: mentionsResolver, 242 posthog: posthog, 243 jc: jc, 244 config: config, 245 repoResolver: repoResolver, 246 aclService: aclService, 247 knotstream: knotstream, 248 spindlestream: spindlestream, 249 pipelineNotifier: pipelineNotifier, 250 logger: logger, 251 validator: validator, 252 cfClient: cfClient, 253 } 254 255 // fetch initial bluesky posts if configured 256 go fetchBskyPosts(ctx, res, config, d, logger) 257 258 return state, nil 259} 260 261func (s *State) Close() error { 262 // other close up logic goes here 263 return s.db.Close() 264} 265 266func (s *State) NewSSHServer() *pipelinessh.Server { 267 return pipelinessh.New(s.db, s.config, s.pipelineNotifier, log.SubLogger(s.logger, "pipelinessh")) 268} 269 270func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) { 271 w.Header().Set("Content-Type", "text/plain") 272 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 273 274 securityTxt := `Contact: mailto:security@tangled.org 275Preferred-Languages: en 276Canonical: https://tangled.org/.well-known/security.txt 277Expires: 2030-01-01T21:59:00.000Z 278` 279 w.Write([]byte(securityTxt)) 280} 281 282func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 283 w.Header().Set("Content-Type", "text/plain") 284 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 285 286 robotsTxt := `# Hello, Tanglers! 287User-agent: * 288Allow: / 289Disallow: /*/*/settings 290Disallow: /settings 291Disallow: /*/*/compare 292Disallow: /*/*/fork 293 294Crawl-delay: 1 295` 296 w.Write([]byte(robotsTxt)) 297} 298 299func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 300 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 301 BaseParams: pages.BaseParamsFromContext(r.Context()), 302 }) 303} 304 305func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 306 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 307 BaseParams: pages.BaseParamsFromContext(r.Context()), 308 }) 309} 310 311func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 312 s.pages.Brand(w, pages.BrandParams{ 313 BaseParams: pages.BaseParamsFromContext(r.Context()), 314 }) 315} 316 317func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 318 user := s.oauth.GetMultiAccountUser(r) 319 if user == nil { 320 return 321 } 322 323 l := s.logger.With("handler", "UpgradeBanner") 324 l = l.With("did", user.Did) 325 326 regs, err := db.GetRegistrations( 327 s.db, 328 orm.FilterEq("did", user.Did), 329 orm.FilterEq("needs_upgrade", 1), 330 ) 331 if err != nil { 332 l.Error("non-fatal: failed to get registrations", "err", err) 333 } 334 335 spindles, err := db.GetSpindles( 336 r.Context(), 337 s.db, 338 orm.FilterEq("owner", user.Did), 339 orm.FilterEq("needs_upgrade", 1), 340 ) 341 if err != nil { 342 l.Error("non-fatal: failed to get spindles", "err", err) 343 } 344 345 if regs == nil && spindles == nil { 346 return 347 } 348 349 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 350 Registrations: regs, 351 Spindles: spindles, 352 }) 353} 354 355func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) { 356 // target is echoed back from the form via hx-vals so the response span's 357 // id matches the form's hx-target. Fallback keeps the handler useful if 358 // a caller forgets to send it. 359 target := strings.TrimSpace(r.FormValue("target")) 360 if target == "" { 361 target = "home" 362 } 363 364 w.Header().Set("Content-Type", "text/html") 365 366 emailAddr := strings.TrimSpace(r.FormValue("email")) 367 if !email.IsValidEmail(emailAddr) { 368 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{ 369 Id: target, 370 Error: "Invalid email address.", 371 }) 372 return 373 } 374 375 // For logged-in users, persist the signup locally so the widget stays 376 // hidden across devices. The DB row is the render-time source of truth; 377 // Resend still owns the mailing list itself. 378 if user := s.oauth.GetMultiAccountUser(r); user != nil { 379 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil { 380 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err) 381 } 382 } 383 384 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" { 385 go func() { 386 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil { 387 s.logger.Error("failed to add newsletter contact", "error", err) 388 } 389 }() 390 } else { 391 s.logger.Error( 392 "failed to add newsletter contact, missing resend config", 393 "isKeyPresent", s.config.Resend.ApiKey != "", 394 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "", 395 "emailAddr", emailAddr, 396 ) 397 } 398 399 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target}) 400} 401 402// NewsletterDismiss records that a logged-in user has dismissed the newsletter 403// widget so it stays hidden across their devices. Anonymous callers get a 204 404// with no DB write — localStorage handles the per-browser fallback. 405func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) { 406 user := s.oauth.GetMultiAccountUser(r) 407 if user == nil { 408 w.WriteHeader(http.StatusNoContent) 409 return 410 } 411 412 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil { 413 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err) 414 } 415 w.WriteHeader(http.StatusNoContent) 416} 417 418func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 419 user := chi.URLParam(r, "user") 420 user = strings.TrimPrefix(user, "@") 421 user = strings.TrimSuffix(user, ".keys") 422 423 if user == "" { 424 w.WriteHeader(http.StatusBadRequest) 425 return 426 } 427 428 id, err := s.idResolver.ResolveIdent(r.Context(), user) 429 if err != nil { 430 w.WriteHeader(http.StatusInternalServerError) 431 return 432 } 433 434 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 435 if err != nil { 436 s.logger.Error("failed to get public keys", "err", err) 437 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 438 return 439 } 440 441 if len(pubKeys) == 0 { 442 w.WriteHeader(http.StatusNoContent) 443 return 444 } 445 446 for _, k := range pubKeys { 447 key := strings.TrimRight(k.Key, "\n") 448 fmt.Fprintln(w, key) 449 } 450} 451 452func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 453 switch r.Method { 454 case http.MethodGet: 455 user := s.oauth.GetMultiAccountUser(r) 456 knots := s.aclService.KnotsForUser(r.Context(), user.Did) 457 458 s.pages.NewRepo(w, pages.NewRepoParams{ 459 BaseParams: pages.BaseParamsFromContext(r.Context()), 460 Knots: knots, 461 }) 462 463 case http.MethodPost: 464 l := s.logger.With("handler", "NewRepo") 465 466 user := s.oauth.GetMultiAccountUser(r) 467 l = l.With("did", user.Did) 468 469 // form validation 470 domain := r.FormValue("domain") 471 if domain == "" { 472 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 473 return 474 } 475 l = l.With("knot", domain) 476 477 repoName := r.FormValue("name") 478 if repoName == "" { 479 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 480 return 481 } 482 483 if err := models.ValidateRepoName(repoName); err != nil { 484 s.pages.Notice(w, "repo", err.Error()) 485 return 486 } 487 repoName = models.StripGitExt(repoName) 488 rkey := strings.ToLower(repoName) 489 l = l.With("repoName", repoName, "rkey", rkey) 490 491 defaultBranch := r.FormValue("branch") 492 if defaultBranch == "" { 493 defaultBranch = "main" 494 } 495 l = l.With("defaultBranch", defaultBranch) 496 497 description := r.FormValue("description") 498 if len([]rune(description)) > 140 { 499 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 500 return 501 } 502 503 // ACL validation 504 if !s.aclService.IsRepoCreateAllowed(r.Context(), domain, user.Did) { 505 l.Info("unauthorized") 506 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 507 return 508 } 509 510 // Check for existing repos 511 existingRepo, err := db.GetRepo( 512 s.db, 513 orm.FilterEq("did", user.Did), 514 orm.FilterEq("rkey", rkey), 515 ) 516 if err == nil && existingRepo != nil { 517 l.Info("repo exists") 518 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 519 return 520 } 521 522 atpClient, err := s.oauth.AuthorizedClient(r) 523 if err != nil { 524 l.Error("failed to get authorized client", "err", err) 525 s.pages.Notice(w, "repo", "Failed to authorize. Try again later.") 526 return 527 } 528 529 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) { 530 l.Info("rkey occupied by prior rename alias") 531 s.pages.Notice(w, "repo", fmt.Sprintf("The name %q still has a record on your PDS from a prior rename. Pick a different name, or delete at://%s/%s/%s first.", rkey, user.Did, tangled.RepoNSID, rkey)) 532 return 533 } 534 535 client, err := s.oauth.ServiceClient( 536 r, 537 oauth.WithService(domain), 538 oauth.WithLxm(tangled.RepoCreateNSID), 539 oauth.WithDev(s.config.Core.Dev), 540 ) 541 if err != nil { 542 l.Error("service auth failed", "err", err) 543 s.pages.Notice(w, "repo", "Failed to authenticate. Please log out and log back in again.") 544 return 545 } 546 547 input := &tangled.RepoCreate_Input{ 548 Rkey: rkey, 549 Name: rkey, 550 DefaultBranch: &defaultBranch, 551 } 552 createResp, err := tangled.RepoCreate( 553 r.Context(), 554 client, 555 input, 556 ) 557 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 558 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err) 559 s.pages.Notice(w, "repo", err.Error()) 560 return 561 } 562 563 var repoDid string 564 if createResp != nil && createResp.RepoDid != nil { 565 repoDid = *createResp.RepoDid 566 } 567 if repoDid == "" { 568 l.Error("knot returned empty repo DID") 569 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 570 return 571 } 572 573 repo := &models.Repo{ 574 Did: user.Did, 575 Name: repoName, 576 Knot: domain, 577 Rkey: rkey, 578 Description: description, 579 Created: time.Now(), 580 Labels: s.config.Label.DefaultLabelDefs, 581 RepoDid: repoDid, 582 } 583 record := repo.AsRecord() 584 585 cleanupKnot := func() { 586 go func() { 587 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 588 for attempt, delay := range delays { 589 time.Sleep(delay) 590 deleteClient, dErr := s.oauth.ServiceClient( 591 r, 592 oauth.WithService(domain), 593 oauth.WithLxm(tangled.RepoDeleteNSID), 594 oauth.WithDev(s.config.Core.Dev), 595 ) 596 if dErr != nil { 597 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 598 continue 599 } 600 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 601 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 602 Did: user.Did, 603 Name: rkey, 604 Rkey: rkey, 605 }); dErr != nil { 606 cancel() 607 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr) 608 continue 609 } 610 cancel() 611 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1) 612 return 613 } 614 l.Error("exhausted retries for knot cleanup, repo may be orphaned", 615 "did", user.Did, "repo", repoName, "knot", domain) 616 }() 617 } 618 619 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{ 620 Collection: tangled.RepoNSID, 621 Repo: user.Did, 622 Rkey: &rkey, 623 Record: &lexutil.LexiconTypeDecoder{ 624 Val: &record, 625 }, 626 }) 627 if err != nil { 628 l.Info("PDS write failed", "err", err) 629 cleanupKnot() 630 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) { 631 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository named %q.", rkey)) 632 } else { 633 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 634 } 635 return 636 } 637 638 aturi := fmt.Sprintf("at://%s/%s/%s", user.Did, tangled.RepoNSID, rkey) 639 l = l.With("aturi", aturi) 640 l.Info("wrote to PDS") 641 642 tx, err := s.db.BeginTx(r.Context(), nil) 643 if err != nil { 644 l.Info("txn failed", "err", err) 645 s.pages.Notice(w, "repo", "Failed to save repository information.") 646 return 647 } 648 649 rollback := func() { 650 err1 := tx.Rollback() 651 err2 := s.enforcer.E.LoadPolicy() 652 err3 := rollbackRecord(context.Background(), aturi, atpClient) 653 654 if errors.Is(err1, sql.ErrTxDone) { 655 err1 = nil 656 } 657 658 if errs := errors.Join(err1, err2, err3); errs != nil { 659 l.Error("failed to rollback changes", "errs", errs) 660 } 661 662 if aturi != "" { 663 cleanupKnot() 664 } 665 } 666 defer rollback() 667 668 err = db.AddRepo(tx, repo) 669 if err != nil { 670 l.Error("db write failed", "err", err) 671 s.pages.Notice(w, "repo", "Failed to save repository information.") 672 return 673 } 674 675 rbacPath := repo.RepoIdentifier() 676 err = s.enforcer.AddRepo(user.Did, domain, rbacPath) 677 if err != nil { 678 l.Error("acl setup failed", "err", err) 679 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 680 return 681 } 682 683 err = tx.Commit() 684 if err != nil { 685 l.Error("txn commit failed", "err", err) 686 http.Error(w, err.Error(), http.StatusInternalServerError) 687 return 688 } 689 690 err = s.enforcer.E.SavePolicy() 691 if err != nil { 692 l.Error("acl save failed", "err", err) 693 http.Error(w, err.Error(), http.StatusInternalServerError) 694 return 695 } 696 697 aturi = "" 698 699 s.notifier.NewRepo(r.Context(), repo) 700 switch { 701 case repoDid != "": 702 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 703 default: 704 handle := s.pages.DisplayHandle(r.Context(), user.Did) 705 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, rkey)) 706 } 707 } 708} 709 710func rkeyOccupied(ctx context.Context, client *atclient.APIClient, did, rkey string) bool { 711 probeCtx, cancel := context.WithTimeout(ctx, 10*time.Second) 712 defer cancel() 713 resp, err := comatproto.RepoGetRecord(probeCtx, client, "", tangled.RepoNSID, did, rkey) 714 return err == nil && resp != nil 715} 716 717// this is used to rollback changes made to the PDS 718// 719// it is a no-op if the provided ATURI is empty 720func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 721 if aturi == "" { 722 return nil 723 } 724 725 parsed := syntax.ATURI(aturi) 726 727 collection := parsed.Collection().String() 728 repo := parsed.Authority().String() 729 rkey := parsed.RecordKey().String() 730 731 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 732 Collection: collection, 733 Repo: repo, 734 Rkey: rkey, 735 }) 736 return err 737} 738 739func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 740 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 741 if err != nil { 742 return err 743 } 744 // already present 745 if len(defaultLabels) == len(defaults) { 746 return nil 747 } 748 749 labelDefs, err := models.FetchLabelDefs(r, defaults) 750 if err != nil { 751 return err 752 } 753 754 // Insert each label definition to the database 755 for _, labelDef := range labelDefs { 756 _, err = db.AddLabelDefinition(e, &labelDef) 757 if err != nil { 758 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 759 } 760 } 761 762 return nil 763} 764 765func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 766 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 767 if err != nil { 768 logger.Error("failed to resolve tangled.org DID", "err", err) 769 return 770 } 771 772 pdsEndpoint := resolved.PDSEndpoint() 773 if pdsEndpoint == "" { 774 logger.Error("no PDS endpoint found for tangled.sh DID") 775 return 776 } 777 778 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger) 779 if err != nil { 780 logger.Error("failed to create appassword session... skipping fetch", "err", err) 781 return 782 } 783 784 l := log.SubLogger(logger, "bluesky") 785 786 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 787 defer ticker.Stop() 788 789 for { 790 // refresh session if necessary 791 if !session.IsValid() { 792 l.Debug("access token expired, refreshing session") 793 if err := session.RefreshSession(); err != nil { 794 l.Error("failed to refresh session, stopping bluesky updater", "err", err) 795 return 796 } 797 l.Debug("session refreshed") 798 } 799 800 // make client 801 client := xrpc.Client{ 802 Auth: &xrpc.AuthInfo{ 803 AccessJwt: session.AccessJwt, 804 Did: session.Did, 805 }, 806 Host: session.PdsEndpoint, 807 } 808 809 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 810 if err != nil { 811 l.Error("failed to fetch bluesky posts", "err", err) 812 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 813 l.Error("failed to insert bluesky posts", "err", err) 814 } else { 815 l.Info("inserted bluesky posts", "count", len(posts)) 816 } 817 818 select { 819 case <-ticker.C: 820 case <-ctx.Done(): 821 l.Info("stopping bluesky updater") 822 return 823 } 824 } 825}