Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/cache" 17 "tangled.org/core/appview/cloudflare" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/email" 21 "tangled.org/core/appview/indexer" 22 "tangled.org/core/appview/knotacl" 23 "tangled.org/core/appview/knotcompat" 24 "tangled.org/core/appview/mentions" 25 "tangled.org/core/appview/models" 26 "tangled.org/core/appview/notify" 27 dbnotify "tangled.org/core/appview/notify/db" 28 lognotify "tangled.org/core/appview/notify/logging" 29 phnotify "tangled.org/core/appview/notify/posthog" 30 whnotify "tangled.org/core/appview/notify/webhook" 31 "tangled.org/core/appview/oauth" 32 "tangled.org/core/appview/pages" 33 "tangled.org/core/appview/pipelines" 34 pipelinessh "tangled.org/core/appview/pipelines/ssh" 35 "tangled.org/core/appview/reporesolver" 36 "tangled.org/core/appview/repoverify" 37 "tangled.org/core/appview/validator" 38 xrpcclient "tangled.org/core/appview/xrpcclient" 39 "tangled.org/core/consts" 40 "tangled.org/core/eventconsumer" 41 "tangled.org/core/idresolver" 42 "tangled.org/core/jetstream" 43 "tangled.org/core/log" 44 tlog "tangled.org/core/log" 45 "tangled.org/core/orm" 46 "tangled.org/core/rbac" 47 48 comatproto "github.com/bluesky-social/indigo/api/atproto" 49 "github.com/bluesky-social/indigo/atproto/atclient" 50 "github.com/bluesky-social/indigo/atproto/syntax" 51 lexutil "github.com/bluesky-social/indigo/lex/util" 52 "github.com/bluesky-social/indigo/xrpc" 53 54 "github.com/go-chi/chi/v5" 55 "github.com/posthog/posthog-go" 56) 57 58type State struct { 59 db *db.DB 60 notifier notify.Notifier 61 indexer *indexer.Indexer 62 oauth *oauth.OAuth 63 enforcer *rbac.Enforcer 64 pages *pages.Pages 65 idResolver *idresolver.Resolver 66 rdb *cache.Cache 67 mentionsResolver *mentions.Resolver 68 posthog posthog.Client 69 jc *jetstream.JetstreamClient 70 config *config.Config 71 repoResolver *reporesolver.RepoResolver 72 aclService *knotacl.Service 73 knotstream *eventconsumer.Consumer 74 spindlestream *eventconsumer.Consumer 75 pipelineNotifier *pipelines.StatusNotifier 76 logger *slog.Logger 77 validator *validator.Validator 78 cfClient *cloudflare.Client 79} 80 81func Make(ctx context.Context, config *config.Config) (*State, error) { 82 logger := tlog.FromContext(ctx) 83 84 d, err := db.Make(ctx, config.Core.DbPath) 85 if err != nil { 86 return nil, fmt.Errorf("failed to create db: %w", err) 87 } 88 89 indexer := indexer.New(log.SubLogger(logger, "indexer"), d) 90 err = indexer.Init(ctx) 91 if err != nil { 92 return nil, fmt.Errorf("failed to create indexer: %w", err) 93 } 94 95 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 96 if err != nil { 97 return nil, fmt.Errorf("failed to create enforcer: %w", err) 98 } 99 100 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 101 if err != nil { 102 logger.Error("failed to create redis resolver", "err", err) 103 res = idresolver.DefaultResolver(config.Plc.PLCURL) 104 } 105 106 var rdb *cache.Cache 107 if config.Redis.Addr != "" { 108 rdb = cache.New(config.Redis.Addr) 109 } 110 111 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 112 if err != nil { 113 return nil, fmt.Errorf("failed to create posthog client: %w", err) 114 } 115 116 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages")) 117 knotcompat.UseNativeLatch(knotacl.NewLatch(d, log.SubLogger(logger, "knotacl-latch"))) 118 aclService := knotacl.NewService(enforcer, d, config.Core.Dev, log.SubLogger(logger, "knotacl")) 119 oauth, err := oauth.New(config, posthog, d, enforcer, aclService, res, log.SubLogger(logger, "oauth")) 120 if err != nil { 121 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 122 } 123 124 validator := validator.New(d, res, aclService) 125 126 repoResolver := reporesolver.New(config, aclService, d, rdb) 127 128 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 129 130 jc, err := jetstream.NewJetstreamClient( 131 config.Jetstream.Endpoint, 132 "appview", 133 []string{ 134 tangled.ActorProfileNSID, 135 tangled.FeedStarNSID, 136 tangled.FeedReactionNSID, 137 tangled.FeedCommentNSID, 138 tangled.GraphFollowNSID, 139 tangled.GraphVouchNSID, 140 tangled.KnotMemberNSID, 141 tangled.KnotNSID, 142 tangled.LabelDefinitionNSID, 143 tangled.LabelOpNSID, 144 tangled.PublicKeyNSID, 145 tangled.RepoArtifactNSID, 146 tangled.RepoIssueCommentNSID, 147 tangled.RepoIssueNSID, 148 tangled.RepoNSID, 149 tangled.RepoPullNSID, 150 tangled.RepoPullCommentNSID, 151 tangled.SpindleMemberNSID, 152 tangled.SpindleNSID, 153 tangled.StringNSID, 154 }, 155 nil, 156 tlog.SubLogger(logger, "jetstream"), 157 d, 158 false, 159 160 // in-memory filter is inapplicable to appview so 161 // we'll never log dids anyway. 162 false, 163 ) 164 if err != nil { 165 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 166 } 167 168 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 169 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 170 } 171 172 var notifiers []notify.Notifier 173 174 // Always add the database notifier 175 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 176 177 // Add other notifiers in production only 178 if !config.Core.Dev { 179 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 180 } 181 notifiers = append(notifiers, indexer) 182 183 notifiers = append(notifiers, whnotify.NewNotifier(d)) 184 185 notifier := notify.NewMergedNotifier(notifiers) 186 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 187 188 ingester := appview.Ingester{ 189 Ctx: ctx, 190 Db: d, 191 Enforcer: enforcer, 192 Acl: aclService, 193 IdResolver: res, 194 Cache: rdb, 195 Config: config, 196 Logger: log.SubLogger(logger, "ingester"), 197 Validator: validator, 198 MentionsResolver: mentionsResolver, 199 Notifier: notifier, 200 Verifier: repoverify.New(res, config.Core.Dev), 201 } 202 err = jc.StartJetstream(ctx, ingester.Ingest()) 203 if err != nil { 204 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 205 } 206 207 go ingester.SweepPendingVerifications() 208 209 var cfClient *cloudflare.Client 210 if config.Cloudflare.ApiToken != "" { 211 cfClient, err = cloudflare.New(config) 212 if err != nil { 213 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err) 214 cfClient = nil 215 } 216 } 217 218 knotstream, err := Knotstream(ctx, config, d, aclService, enforcer, posthog, notifier, cfClient) 219 if err != nil { 220 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 221 } 222 knotstream.Start(ctx) 223 224 pipelineNotifier := pipelines.NewStatusNotifier() 225 226 spindlestream, err := Spindlestream(ctx, config, d, enforcer, pipelineNotifier) 227 if err != nil { 228 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 229 } 230 spindlestream.Start(ctx) 231 232 state := &State{ 233 db: d, 234 notifier: notifier, 235 indexer: indexer, 236 oauth: oauth, 237 enforcer: enforcer, 238 pages: pages, 239 idResolver: res, 240 rdb: rdb, 241 mentionsResolver: mentionsResolver, 242 posthog: posthog, 243 jc: jc, 244 config: config, 245 repoResolver: repoResolver, 246 aclService: aclService, 247 knotstream: knotstream, 248 spindlestream: spindlestream, 249 pipelineNotifier: pipelineNotifier, 250 logger: logger, 251 validator: validator, 252 cfClient: cfClient, 253 } 254 255 // fetch initial bluesky posts if configured 256 go fetchBskyPosts(ctx, res, config, d, logger) 257 258 return state, nil 259} 260 261func (s *State) Close() error { 262 // other close up logic goes here 263 return s.db.Close() 264} 265 266func (s *State) NewSSHServer() *pipelinessh.Server { 267 return pipelinessh.New(s.db, s.config, s.pipelineNotifier, log.SubLogger(s.logger, "pipelinessh")) 268} 269 270func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) { 271 w.Header().Set("Content-Type", "text/plain") 272 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 273 274 securityTxt := `Contact: mailto:security@tangled.org 275Preferred-Languages: en 276Canonical: https://tangled.org/.well-known/security.txt 277Expires: 2030-01-01T21:59:00.000Z 278` 279 w.Write([]byte(securityTxt)) 280} 281 282func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 283 w.Header().Set("Content-Type", "text/plain") 284 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 285 286 robotsTxt := `# Hello, Tanglers! 287User-agent: * 288Allow: / 289Disallow: /*/*/settings 290Disallow: /settings 291Disallow: /*/*/compare 292Disallow: /*/*/fork 293 294Crawl-delay: 1 295` 296 w.Write([]byte(robotsTxt)) 297} 298 299func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 300 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 301 BaseParams: pages.BaseParamsFromContext(r.Context()), 302 }) 303} 304 305func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 306 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 307 BaseParams: pages.BaseParamsFromContext(r.Context()), 308 }) 309} 310 311func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 312 s.pages.Brand(w, pages.BrandParams{ 313 BaseParams: pages.BaseParamsFromContext(r.Context()), 314 }) 315} 316 317func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 318 user := s.oauth.GetMultiAccountUser(r) 319 if user == nil { 320 return 321 } 322 323 l := s.logger.With("handler", "UpgradeBanner") 324 l = l.With("did", user.Did) 325 326 regs, err := db.GetRegistrations( 327 s.db, 328 orm.FilterEq("did", user.Did), 329 orm.FilterEq("needs_upgrade", 1), 330 ) 331 if err != nil { 332 l.Error("non-fatal: failed to get registrations", "err", err) 333 } 334 335 spindles, err := db.GetSpindles( 336 r.Context(), 337 s.db, 338 orm.FilterEq("owner", user.Did), 339 orm.FilterEq("needs_upgrade", 1), 340 ) 341 if err != nil { 342 l.Error("non-fatal: failed to get spindles", "err", err) 343 } 344 345 if regs == nil && spindles == nil { 346 return 347 } 348 349 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 350 Registrations: regs, 351 Spindles: spindles, 352 }) 353} 354 355func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) { 356 // target is echoed back from the form via hx-vals so the response span's 357 // id matches the form's hx-target. Fallback keeps the handler useful if 358 // a caller forgets to send it. 359 target := strings.TrimSpace(r.FormValue("target")) 360 if target == "" { 361 target = "home" 362 } 363 364 w.Header().Set("Content-Type", "text/html") 365 366 emailAddr := strings.TrimSpace(r.FormValue("email")) 367 if !email.IsValidEmail(emailAddr) { 368 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{ 369 Id: target, 370 Error: "Invalid email address.", 371 }) 372 return 373 } 374 375 // For logged-in users, persist the signup locally so the widget stays 376 // hidden across devices. The DB row is the render-time source of truth; 377 // Resend still owns the mailing list itself. 378 if user := s.oauth.GetMultiAccountUser(r); user != nil { 379 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil { 380 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err) 381 } 382 } 383 384 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" { 385 go func() { 386 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil { 387 s.logger.Error("failed to add newsletter contact", "error", err) 388 } 389 }() 390 } else { 391 s.logger.Error( 392 "failed to add newsletter contact, missing resend config", 393 "isKeyPresent", s.config.Resend.ApiKey != "", 394 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "", 395 "emailAddr", emailAddr, 396 ) 397 } 398 399 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target}) 400} 401 402// NewsletterDismiss records that a logged-in user has dismissed the newsletter 403// widget so it stays hidden across their devices. Anonymous callers get a 204 404// with no DB write — localStorage handles the per-browser fallback. 405func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) { 406 user := s.oauth.GetMultiAccountUser(r) 407 if user == nil { 408 w.WriteHeader(http.StatusNoContent) 409 return 410 } 411 412 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil { 413 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err) 414 } 415 w.WriteHeader(http.StatusNoContent) 416} 417 418func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 419 user := chi.URLParam(r, "user") 420 user = strings.TrimPrefix(user, "@") 421 422 if user == "" { 423 w.WriteHeader(http.StatusBadRequest) 424 return 425 } 426 427 id, err := s.idResolver.ResolveIdent(r.Context(), user) 428 if err != nil { 429 w.WriteHeader(http.StatusInternalServerError) 430 return 431 } 432 433 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 434 if err != nil { 435 s.logger.Error("failed to get public keys", "err", err) 436 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 437 return 438 } 439 440 if len(pubKeys) == 0 { 441 w.WriteHeader(http.StatusNoContent) 442 return 443 } 444 445 for _, k := range pubKeys { 446 key := strings.TrimRight(k.Key, "\n") 447 fmt.Fprintln(w, key) 448 } 449} 450 451func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 452 switch r.Method { 453 case http.MethodGet: 454 user := s.oauth.GetMultiAccountUser(r) 455 knots := s.aclService.KnotsForUser(r.Context(), user.Did) 456 457 s.pages.NewRepo(w, pages.NewRepoParams{ 458 BaseParams: pages.BaseParamsFromContext(r.Context()), 459 Knots: knots, 460 }) 461 462 case http.MethodPost: 463 l := s.logger.With("handler", "NewRepo") 464 465 user := s.oauth.GetMultiAccountUser(r) 466 l = l.With("did", user.Did) 467 468 // form validation 469 domain := r.FormValue("domain") 470 if domain == "" { 471 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 472 return 473 } 474 l = l.With("knot", domain) 475 476 repoName := r.FormValue("name") 477 if repoName == "" { 478 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 479 return 480 } 481 482 if err := models.ValidateRepoName(repoName); err != nil { 483 s.pages.Notice(w, "repo", err.Error()) 484 return 485 } 486 repoName = models.StripGitExt(repoName) 487 rkey := strings.ToLower(repoName) 488 l = l.With("repoName", repoName, "rkey", rkey) 489 490 defaultBranch := r.FormValue("branch") 491 if defaultBranch == "" { 492 defaultBranch = "main" 493 } 494 l = l.With("defaultBranch", defaultBranch) 495 496 description := r.FormValue("description") 497 if len([]rune(description)) > 140 { 498 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 499 return 500 } 501 502 // ACL validation 503 if !s.aclService.IsRepoCreateAllowed(r.Context(), domain, user.Did) { 504 l.Info("unauthorized") 505 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 506 return 507 } 508 509 // Check for existing repos 510 existingRepo, err := db.GetRepo( 511 s.db, 512 orm.FilterEq("did", user.Did), 513 orm.FilterEq("rkey", rkey), 514 ) 515 if err == nil && existingRepo != nil { 516 l.Info("repo exists") 517 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 518 return 519 } 520 521 atpClient, err := s.oauth.AuthorizedClient(r) 522 if err != nil { 523 l.Error("failed to get authorized client", "err", err) 524 s.pages.Notice(w, "repo", "Failed to authorize. Try again later.") 525 return 526 } 527 528 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) { 529 l.Info("rkey occupied by prior rename alias") 530 s.pages.Notice(w, "repo", fmt.Sprintf("The name %q still has a record on your PDS from a prior rename. Pick a different name, or delete at://%s/%s/%s first.", rkey, user.Did, tangled.RepoNSID, rkey)) 531 return 532 } 533 534 client, err := s.oauth.ServiceClient( 535 r, 536 oauth.WithService(domain), 537 oauth.WithLxm(tangled.RepoCreateNSID), 538 oauth.WithDev(s.config.Core.Dev), 539 ) 540 if err != nil { 541 l.Error("service auth failed", "err", err) 542 s.pages.Notice(w, "repo", "Failed to authenticate. Please log out and log back in again.") 543 return 544 } 545 546 input := &tangled.RepoCreate_Input{ 547 Rkey: rkey, 548 Name: rkey, 549 DefaultBranch: &defaultBranch, 550 } 551 createResp, err := tangled.RepoCreate( 552 r.Context(), 553 client, 554 input, 555 ) 556 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 557 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err) 558 s.pages.Notice(w, "repo", err.Error()) 559 return 560 } 561 562 var repoDid string 563 if createResp != nil && createResp.RepoDid != nil { 564 repoDid = *createResp.RepoDid 565 } 566 if repoDid == "" { 567 l.Error("knot returned empty repo DID") 568 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 569 return 570 } 571 572 repo := &models.Repo{ 573 Did: user.Did, 574 Name: repoName, 575 Knot: domain, 576 Rkey: rkey, 577 Description: description, 578 Created: time.Now(), 579 Labels: s.config.Label.DefaultLabelDefs, 580 RepoDid: repoDid, 581 } 582 record := repo.AsRecord() 583 584 cleanupKnot := func() { 585 go func() { 586 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 587 for attempt, delay := range delays { 588 time.Sleep(delay) 589 deleteClient, dErr := s.oauth.ServiceClient( 590 r, 591 oauth.WithService(domain), 592 oauth.WithLxm(tangled.RepoDeleteNSID), 593 oauth.WithDev(s.config.Core.Dev), 594 ) 595 if dErr != nil { 596 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 597 continue 598 } 599 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 600 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 601 Did: user.Did, 602 Name: rkey, 603 Rkey: rkey, 604 }); dErr != nil { 605 cancel() 606 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr) 607 continue 608 } 609 cancel() 610 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1) 611 return 612 } 613 l.Error("exhausted retries for knot cleanup, repo may be orphaned", 614 "did", user.Did, "repo", repoName, "knot", domain) 615 }() 616 } 617 618 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{ 619 Collection: tangled.RepoNSID, 620 Repo: user.Did, 621 Rkey: &rkey, 622 Record: &lexutil.LexiconTypeDecoder{ 623 Val: &record, 624 }, 625 }) 626 if err != nil { 627 l.Info("PDS write failed", "err", err) 628 cleanupKnot() 629 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) { 630 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository named %q.", rkey)) 631 } else { 632 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 633 } 634 return 635 } 636 637 aturi := fmt.Sprintf("at://%s/%s/%s", user.Did, tangled.RepoNSID, rkey) 638 l = l.With("aturi", aturi) 639 l.Info("wrote to PDS") 640 641 tx, err := s.db.BeginTx(r.Context(), nil) 642 if err != nil { 643 l.Info("txn failed", "err", err) 644 s.pages.Notice(w, "repo", "Failed to save repository information.") 645 return 646 } 647 648 rollback := func() { 649 err1 := tx.Rollback() 650 err2 := s.enforcer.E.LoadPolicy() 651 err3 := rollbackRecord(context.Background(), aturi, atpClient) 652 653 if errors.Is(err1, sql.ErrTxDone) { 654 err1 = nil 655 } 656 657 if errs := errors.Join(err1, err2, err3); errs != nil { 658 l.Error("failed to rollback changes", "errs", errs) 659 } 660 661 if aturi != "" { 662 cleanupKnot() 663 } 664 } 665 defer rollback() 666 667 err = db.AddRepo(tx, repo) 668 if err != nil { 669 l.Error("db write failed", "err", err) 670 s.pages.Notice(w, "repo", "Failed to save repository information.") 671 return 672 } 673 674 rbacPath := repo.RepoIdentifier() 675 err = s.enforcer.AddRepo(user.Did, domain, rbacPath) 676 if err != nil { 677 l.Error("acl setup failed", "err", err) 678 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 679 return 680 } 681 682 err = tx.Commit() 683 if err != nil { 684 l.Error("txn commit failed", "err", err) 685 http.Error(w, err.Error(), http.StatusInternalServerError) 686 return 687 } 688 689 err = s.enforcer.E.SavePolicy() 690 if err != nil { 691 l.Error("acl save failed", "err", err) 692 http.Error(w, err.Error(), http.StatusInternalServerError) 693 return 694 } 695 696 aturi = "" 697 698 s.notifier.NewRepo(r.Context(), repo) 699 switch { 700 case repoDid != "": 701 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 702 default: 703 handle := s.pages.DisplayHandle(r.Context(), user.Did) 704 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, rkey)) 705 } 706 } 707} 708 709func rkeyOccupied(ctx context.Context, client *atclient.APIClient, did, rkey string) bool { 710 probeCtx, cancel := context.WithTimeout(ctx, 10*time.Second) 711 defer cancel() 712 resp, err := comatproto.RepoGetRecord(probeCtx, client, "", tangled.RepoNSID, did, rkey) 713 return err == nil && resp != nil 714} 715 716// this is used to rollback changes made to the PDS 717// 718// it is a no-op if the provided ATURI is empty 719func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 720 if aturi == "" { 721 return nil 722 } 723 724 parsed := syntax.ATURI(aturi) 725 726 collection := parsed.Collection().String() 727 repo := parsed.Authority().String() 728 rkey := parsed.RecordKey().String() 729 730 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 731 Collection: collection, 732 Repo: repo, 733 Rkey: rkey, 734 }) 735 return err 736} 737 738func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 739 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 740 if err != nil { 741 return err 742 } 743 // already present 744 if len(defaultLabels) == len(defaults) { 745 return nil 746 } 747 748 labelDefs, err := models.FetchLabelDefs(r, defaults) 749 if err != nil { 750 return err 751 } 752 753 // Insert each label definition to the database 754 for _, labelDef := range labelDefs { 755 _, err = db.AddLabelDefinition(e, &labelDef) 756 if err != nil { 757 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 758 } 759 } 760 761 return nil 762} 763 764func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 765 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 766 if err != nil { 767 logger.Error("failed to resolve tangled.org DID", "err", err) 768 return 769 } 770 771 pdsEndpoint := resolved.PDSEndpoint() 772 if pdsEndpoint == "" { 773 logger.Error("no PDS endpoint found for tangled.sh DID") 774 return 775 } 776 777 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger) 778 if err != nil { 779 logger.Error("failed to create appassword session... skipping fetch", "err", err) 780 return 781 } 782 783 l := log.SubLogger(logger, "bluesky") 784 785 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 786 defer ticker.Stop() 787 788 for { 789 // refresh session if necessary 790 if !session.IsValid() { 791 l.Debug("access token expired, refreshing session") 792 if err := session.RefreshSession(); err != nil { 793 l.Error("failed to refresh session, stopping bluesky updater", "err", err) 794 return 795 } 796 l.Debug("session refreshed") 797 } 798 799 // make client 800 client := xrpc.Client{ 801 Auth: &xrpc.AuthInfo{ 802 AccessJwt: session.AccessJwt, 803 Did: session.Did, 804 }, 805 Host: session.PdsEndpoint, 806 } 807 808 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 809 if err != nil { 810 l.Error("failed to fetch bluesky posts", "err", err) 811 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 812 l.Error("failed to insert bluesky posts", "err", err) 813 } else { 814 l.Info("inserted bluesky posts", "count", len(posts)) 815 } 816 817 select { 818 case <-ticker.C: 819 case <-ctx.Done(): 820 l.Info("stopping bluesky updater") 821 return 822 } 823 } 824}