Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/cache"
17 "tangled.org/core/appview/cloudflare"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/email"
21 "tangled.org/core/appview/indexer"
22 "tangled.org/core/appview/mentions"
23 "tangled.org/core/appview/models"
24 "tangled.org/core/appview/notify"
25 dbnotify "tangled.org/core/appview/notify/db"
26 lognotify "tangled.org/core/appview/notify/logging"
27 phnotify "tangled.org/core/appview/notify/posthog"
28 whnotify "tangled.org/core/appview/notify/webhook"
29 "tangled.org/core/appview/oauth"
30 "tangled.org/core/appview/pages"
31 "tangled.org/core/appview/pipelines"
32 pipelinessh "tangled.org/core/appview/pipelines/ssh"
33 "tangled.org/core/appview/reporesolver"
34 "tangled.org/core/appview/repoverify"
35 "tangled.org/core/appview/validator"
36 xrpcclient "tangled.org/core/appview/xrpcclient"
37 "tangled.org/core/consts"
38 "tangled.org/core/eventconsumer"
39 "tangled.org/core/idresolver"
40 "tangled.org/core/jetstream"
41 "tangled.org/core/log"
42 tlog "tangled.org/core/log"
43 "tangled.org/core/orm"
44 "tangled.org/core/rbac"
45
46 comatproto "github.com/bluesky-social/indigo/api/atproto"
47 "github.com/bluesky-social/indigo/atproto/atclient"
48 "github.com/bluesky-social/indigo/atproto/syntax"
49 lexutil "github.com/bluesky-social/indigo/lex/util"
50 "github.com/bluesky-social/indigo/xrpc"
51
52 "github.com/go-chi/chi/v5"
53 "github.com/posthog/posthog-go"
54)
55
56type State struct {
57 db *db.DB
58 notifier notify.Notifier
59 indexer *indexer.Indexer
60 oauth *oauth.OAuth
61 enforcer *rbac.Enforcer
62 pages *pages.Pages
63 idResolver *idresolver.Resolver
64 rdb *cache.Cache
65 mentionsResolver *mentions.Resolver
66 posthog posthog.Client
67 jc *jetstream.JetstreamClient
68 config *config.Config
69 repoResolver *reporesolver.RepoResolver
70 knotstream *eventconsumer.Consumer
71 spindlestream *eventconsumer.Consumer
72 pipelineNotifier *pipelines.StatusNotifier
73 logger *slog.Logger
74 validator *validator.Validator
75 cfClient *cloudflare.Client
76}
77
78func Make(ctx context.Context, config *config.Config) (*State, error) {
79 logger := tlog.FromContext(ctx)
80
81 d, err := db.Make(ctx, config.Core.DbPath)
82 if err != nil {
83 return nil, fmt.Errorf("failed to create db: %w", err)
84 }
85
86 indexer := indexer.New(log.SubLogger(logger, "indexer"), d)
87 err = indexer.Init(ctx)
88 if err != nil {
89 return nil, fmt.Errorf("failed to create indexer: %w", err)
90 }
91
92 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
93 if err != nil {
94 return nil, fmt.Errorf("failed to create enforcer: %w", err)
95 }
96
97 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
98 if err != nil {
99 logger.Error("failed to create redis resolver", "err", err)
100 res = idresolver.DefaultResolver(config.Plc.PLCURL)
101 }
102
103 var rdb *cache.Cache
104 if config.Redis.Addr != "" {
105 rdb = cache.New(config.Redis.Addr)
106 }
107
108 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
109 if err != nil {
110 return nil, fmt.Errorf("failed to create posthog client: %w", err)
111 }
112
113 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages"))
114 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
115 if err != nil {
116 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
117 }
118 validator := validator.New(d, res, enforcer)
119
120 repoResolver := reporesolver.New(config, enforcer, d, rdb)
121
122 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
123
124 jc, err := jetstream.NewJetstreamClient(
125 config.Jetstream.Endpoint,
126 "appview",
127 []string{
128 tangled.ActorProfileNSID,
129 tangled.FeedStarNSID,
130 tangled.FeedReactionNSID,
131 tangled.FeedCommentNSID,
132 tangled.GraphFollowNSID,
133 tangled.GraphVouchNSID,
134 tangled.KnotMemberNSID,
135 tangled.KnotNSID,
136 tangled.LabelDefinitionNSID,
137 tangled.LabelOpNSID,
138 tangled.PublicKeyNSID,
139 tangled.RepoArtifactNSID,
140 tangled.RepoIssueCommentNSID,
141 tangled.RepoIssueNSID,
142 tangled.RepoNSID,
143 tangled.RepoPullNSID,
144 tangled.RepoPullCommentNSID,
145 tangled.SpindleMemberNSID,
146 tangled.SpindleNSID,
147 tangled.StringNSID,
148 },
149 nil,
150 tlog.SubLogger(logger, "jetstream"),
151 d,
152 false,
153
154 // in-memory filter is inapplicable to appview so
155 // we'll never log dids anyway.
156 false,
157 )
158 if err != nil {
159 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
160 }
161
162 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
163 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
164 }
165
166 var notifiers []notify.Notifier
167
168 // Always add the database notifier
169 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
170
171 // Add other notifiers in production only
172 if !config.Core.Dev {
173 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
174 }
175 notifiers = append(notifiers, indexer)
176
177 notifiers = append(notifiers, whnotify.NewNotifier(d))
178
179 notifier := notify.NewMergedNotifier(notifiers)
180 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
181
182 ingester := appview.Ingester{
183 Ctx: ctx,
184 Db: d,
185 Enforcer: enforcer,
186 IdResolver: res,
187 Cache: rdb,
188 Config: config,
189 Logger: log.SubLogger(logger, "ingester"),
190 Validator: validator,
191 MentionsResolver: mentionsResolver,
192 Notifier: notifier,
193 Verifier: repoverify.New(res, config.Core.Dev),
194 }
195 err = jc.StartJetstream(ctx, ingester.Ingest())
196 if err != nil {
197 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
198 }
199
200 go ingester.SweepPendingVerifications()
201
202 var cfClient *cloudflare.Client
203 if config.Cloudflare.ApiToken != "" {
204 cfClient, err = cloudflare.New(config)
205 if err != nil {
206 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err)
207 cfClient = nil
208 }
209 }
210
211 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient)
212 if err != nil {
213 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
214 }
215 knotstream.Start(ctx)
216
217 pipelineNotifier := pipelines.NewStatusNotifier()
218
219 spindlestream, err := Spindlestream(ctx, config, d, enforcer, pipelineNotifier)
220 if err != nil {
221 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
222 }
223 spindlestream.Start(ctx)
224
225 state := &State{
226 db: d,
227 notifier: notifier,
228 indexer: indexer,
229 oauth: oauth,
230 enforcer: enforcer,
231 pages: pages,
232 idResolver: res,
233 rdb: rdb,
234 mentionsResolver: mentionsResolver,
235 posthog: posthog,
236 jc: jc,
237 config: config,
238 repoResolver: repoResolver,
239 knotstream: knotstream,
240 spindlestream: spindlestream,
241 pipelineNotifier: pipelineNotifier,
242 logger: logger,
243 validator: validator,
244 cfClient: cfClient,
245 }
246
247 // fetch initial bluesky posts if configured
248 go fetchBskyPosts(ctx, res, config, d, logger)
249
250 return state, nil
251}
252
253func (s *State) Close() error {
254 // other close up logic goes here
255 return s.db.Close()
256}
257
258func (s *State) NewSSHServer() *pipelinessh.Server {
259 return pipelinessh.New(s.db, s.config, s.pipelineNotifier, log.SubLogger(s.logger, "pipelinessh"))
260}
261
262func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) {
263 w.Header().Set("Content-Type", "text/plain")
264 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
265
266 securityTxt := `Contact: mailto:security@tangled.org
267Preferred-Languages: en
268Canonical: https://tangled.org/.well-known/security.txt
269Expires: 2030-01-01T21:59:00.000Z
270`
271 w.Write([]byte(securityTxt))
272}
273
274func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
275 w.Header().Set("Content-Type", "text/plain")
276 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
277
278 robotsTxt := `# Hello, Tanglers!
279User-agent: *
280Allow: /
281Disallow: /*/*/settings
282Disallow: /settings
283Disallow: /*/*/compare
284Disallow: /*/*/fork
285
286Crawl-delay: 1
287`
288 w.Write([]byte(robotsTxt))
289}
290
291func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
292 user := s.oauth.GetMultiAccountUser(r)
293 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
294 LoggedInUser: user,
295 })
296}
297
298func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
299 user := s.oauth.GetMultiAccountUser(r)
300 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
301 LoggedInUser: user,
302 })
303}
304
305func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
306 user := s.oauth.GetMultiAccountUser(r)
307 s.pages.Brand(w, pages.BrandParams{
308 LoggedInUser: user,
309 })
310}
311
312func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
313 user := s.oauth.GetMultiAccountUser(r)
314 if user == nil {
315 return
316 }
317
318 l := s.logger.With("handler", "UpgradeBanner")
319 l = l.With("did", user.Did)
320
321 regs, err := db.GetRegistrations(
322 s.db,
323 orm.FilterEq("did", user.Did),
324 orm.FilterEq("needs_upgrade", 1),
325 )
326 if err != nil {
327 l.Error("non-fatal: failed to get registrations", "err", err)
328 }
329
330 spindles, err := db.GetSpindles(
331 r.Context(),
332 s.db,
333 orm.FilterEq("owner", user.Did),
334 orm.FilterEq("needs_upgrade", 1),
335 )
336 if err != nil {
337 l.Error("non-fatal: failed to get spindles", "err", err)
338 }
339
340 if regs == nil && spindles == nil {
341 return
342 }
343
344 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
345 Registrations: regs,
346 Spindles: spindles,
347 })
348}
349
350func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) {
351 // target is echoed back from the form via hx-vals so the response span's
352 // id matches the form's hx-target. Fallback keeps the handler useful if
353 // a caller forgets to send it.
354 target := strings.TrimSpace(r.FormValue("target"))
355 if target == "" {
356 target = "home"
357 }
358
359 w.Header().Set("Content-Type", "text/html")
360
361 emailAddr := strings.TrimSpace(r.FormValue("email"))
362 if !email.IsValidEmail(emailAddr) {
363 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{
364 Id: target,
365 Error: "Invalid email address.",
366 })
367 return
368 }
369
370 // For logged-in users, persist the signup locally so the widget stays
371 // hidden across devices. The DB row is the render-time source of truth;
372 // Resend still owns the mailing list itself.
373 if user := s.oauth.GetMultiAccountUser(r); user != nil {
374 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil {
375 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err)
376 }
377 }
378
379 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" {
380 go func() {
381 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil {
382 s.logger.Error("failed to add newsletter contact", "error", err)
383 }
384 }()
385 } else {
386 s.logger.Error(
387 "failed to add newsletter contact, missing resend config",
388 "isKeyPresent", s.config.Resend.ApiKey != "",
389 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "",
390 "emailAddr", emailAddr,
391 )
392 }
393
394 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target})
395}
396
397// NewsletterDismiss records that a logged-in user has dismissed the newsletter
398// widget so it stays hidden across their devices. Anonymous callers get a 204
399// with no DB write — localStorage handles the per-browser fallback.
400func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) {
401 user := s.oauth.GetMultiAccountUser(r)
402 if user == nil {
403 w.WriteHeader(http.StatusNoContent)
404 return
405 }
406
407 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil {
408 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err)
409 }
410 w.WriteHeader(http.StatusNoContent)
411}
412
413func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
414 user := chi.URLParam(r, "user")
415 user = strings.TrimPrefix(user, "@")
416
417 if user == "" {
418 w.WriteHeader(http.StatusBadRequest)
419 return
420 }
421
422 id, err := s.idResolver.ResolveIdent(r.Context(), user)
423 if err != nil {
424 w.WriteHeader(http.StatusInternalServerError)
425 return
426 }
427
428 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
429 if err != nil {
430 s.logger.Error("failed to get public keys", "err", err)
431 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
432 return
433 }
434
435 if len(pubKeys) == 0 {
436 w.WriteHeader(http.StatusNoContent)
437 return
438 }
439
440 for _, k := range pubKeys {
441 key := strings.TrimRight(k.Key, "\n")
442 fmt.Fprintln(w, key)
443 }
444}
445
446func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
447 switch r.Method {
448 case http.MethodGet:
449 user := s.oauth.GetMultiAccountUser(r)
450 knots, err := s.enforcer.GetKnotsForUser(user.Did)
451 if err != nil {
452 s.pages.Notice(w, "repo", "Invalid user account.")
453 return
454 }
455
456 s.pages.NewRepo(w, pages.NewRepoParams{
457 LoggedInUser: user,
458 Knots: knots,
459 })
460
461 case http.MethodPost:
462 l := s.logger.With("handler", "NewRepo")
463
464 user := s.oauth.GetMultiAccountUser(r)
465 l = l.With("did", user.Did)
466
467 // form validation
468 domain := r.FormValue("domain")
469 if domain == "" {
470 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
471 return
472 }
473 l = l.With("knot", domain)
474
475 repoName := r.FormValue("name")
476 if repoName == "" {
477 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
478 return
479 }
480
481 if err := models.ValidateRepoName(repoName); err != nil {
482 s.pages.Notice(w, "repo", err.Error())
483 return
484 }
485 repoName = models.StripGitExt(repoName)
486 rkey := strings.ToLower(repoName)
487 l = l.With("repoName", repoName, "rkey", rkey)
488
489 defaultBranch := r.FormValue("branch")
490 if defaultBranch == "" {
491 defaultBranch = "main"
492 }
493 l = l.With("defaultBranch", defaultBranch)
494
495 description := r.FormValue("description")
496 if len([]rune(description)) > 140 {
497 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
498 return
499 }
500
501 // ACL validation
502 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
503 if err != nil || !ok {
504 l.Info("unauthorized")
505 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
506 return
507 }
508
509 // Check for existing repos
510 existingRepo, err := db.GetRepo(
511 s.db,
512 orm.FilterEq("did", user.Did),
513 orm.FilterEq("rkey", rkey),
514 )
515 if err == nil && existingRepo != nil {
516 l.Info("repo exists")
517 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
518 return
519 }
520
521 atpClient, err := s.oauth.AuthorizedClient(r)
522 if err != nil {
523 l.Error("failed to get authorized client", "err", err)
524 s.pages.Notice(w, "repo", "Failed to authorize. Try again later.")
525 return
526 }
527
528 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) {
529 l.Info("rkey occupied by prior rename alias")
530 s.pages.Notice(w, "repo", fmt.Sprintf("The name %q still has a record on your PDS from a prior rename. Pick a different name, or delete at://%s/%s/%s first.", rkey, user.Did, tangled.RepoNSID, rkey))
531 return
532 }
533
534 client, err := s.oauth.ServiceClient(
535 r,
536 oauth.WithService(domain),
537 oauth.WithLxm(tangled.RepoCreateNSID),
538 oauth.WithDev(s.config.Core.Dev),
539 )
540 if err != nil {
541 l.Error("service auth failed", "err", err)
542 s.pages.Notice(w, "repo", "Failed to authenticate. Please log out and log back in again.")
543 return
544 }
545
546 input := &tangled.RepoCreate_Input{
547 Rkey: rkey,
548 Name: rkey,
549 DefaultBranch: &defaultBranch,
550 }
551 createResp, err := tangled.RepoCreate(
552 r.Context(),
553 client,
554 input,
555 )
556 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
557 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err)
558 s.pages.Notice(w, "repo", err.Error())
559 return
560 }
561
562 var repoDid string
563 if createResp != nil && createResp.RepoDid != nil {
564 repoDid = *createResp.RepoDid
565 }
566 if repoDid == "" {
567 l.Error("knot returned empty repo DID")
568 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.")
569 return
570 }
571
572 repo := &models.Repo{
573 Did: user.Did,
574 Name: repoName,
575 Knot: domain,
576 Rkey: rkey,
577 Description: description,
578 Created: time.Now(),
579 Labels: s.config.Label.DefaultLabelDefs,
580 RepoDid: repoDid,
581 }
582 record := repo.AsRecord()
583
584 cleanupKnot := func() {
585 go func() {
586 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second}
587 for attempt, delay := range delays {
588 time.Sleep(delay)
589 deleteClient, dErr := s.oauth.ServiceClient(
590 r,
591 oauth.WithService(domain),
592 oauth.WithLxm(tangled.RepoDeleteNSID),
593 oauth.WithDev(s.config.Core.Dev),
594 )
595 if dErr != nil {
596 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr)
597 continue
598 }
599 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
600 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{
601 Did: user.Did,
602 Name: rkey,
603 Rkey: rkey,
604 }); dErr != nil {
605 cancel()
606 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr)
607 continue
608 }
609 cancel()
610 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1)
611 return
612 }
613 l.Error("exhausted retries for knot cleanup, repo may be orphaned",
614 "did", user.Did, "repo", repoName, "knot", domain)
615 }()
616 }
617
618 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{
619 Collection: tangled.RepoNSID,
620 Repo: user.Did,
621 Rkey: &rkey,
622 Record: &lexutil.LexiconTypeDecoder{
623 Val: &record,
624 },
625 })
626 if err != nil {
627 l.Info("PDS write failed", "err", err)
628 cleanupKnot()
629 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) {
630 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository named %q.", rkey))
631 } else {
632 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
633 }
634 return
635 }
636
637 aturi := fmt.Sprintf("at://%s/%s/%s", user.Did, tangled.RepoNSID, rkey)
638 l = l.With("aturi", aturi)
639 l.Info("wrote to PDS")
640
641 tx, err := s.db.BeginTx(r.Context(), nil)
642 if err != nil {
643 l.Info("txn failed", "err", err)
644 s.pages.Notice(w, "repo", "Failed to save repository information.")
645 return
646 }
647
648 rollback := func() {
649 err1 := tx.Rollback()
650 err2 := s.enforcer.E.LoadPolicy()
651 err3 := rollbackRecord(context.Background(), aturi, atpClient)
652
653 if errors.Is(err1, sql.ErrTxDone) {
654 err1 = nil
655 }
656
657 if errs := errors.Join(err1, err2, err3); errs != nil {
658 l.Error("failed to rollback changes", "errs", errs)
659 }
660
661 if aturi != "" {
662 cleanupKnot()
663 }
664 }
665 defer rollback()
666
667 err = db.AddRepo(tx, repo)
668 if err != nil {
669 l.Error("db write failed", "err", err)
670 s.pages.Notice(w, "repo", "Failed to save repository information.")
671 return
672 }
673
674 rbacPath := repo.RepoIdentifier()
675 err = s.enforcer.AddRepo(user.Did, domain, rbacPath)
676 if err != nil {
677 l.Error("acl setup failed", "err", err)
678 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
679 return
680 }
681
682 err = tx.Commit()
683 if err != nil {
684 l.Error("txn commit failed", "err", err)
685 http.Error(w, err.Error(), http.StatusInternalServerError)
686 return
687 }
688
689 err = s.enforcer.E.SavePolicy()
690 if err != nil {
691 l.Error("acl save failed", "err", err)
692 http.Error(w, err.Error(), http.StatusInternalServerError)
693 return
694 }
695
696 aturi = ""
697
698 s.notifier.NewRepo(r.Context(), repo)
699 switch {
700 case repoDid != "":
701 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid))
702 default:
703 handle := s.pages.DisplayHandle(r.Context(), user.Did)
704 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, rkey))
705 }
706 }
707}
708
709func rkeyOccupied(ctx context.Context, client *atclient.APIClient, did, rkey string) bool {
710 probeCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
711 defer cancel()
712 resp, err := comatproto.RepoGetRecord(probeCtx, client, "", tangled.RepoNSID, did, rkey)
713 return err == nil && resp != nil
714}
715
716// this is used to rollback changes made to the PDS
717//
718// it is a no-op if the provided ATURI is empty
719func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error {
720 if aturi == "" {
721 return nil
722 }
723
724 parsed := syntax.ATURI(aturi)
725
726 collection := parsed.Collection().String()
727 repo := parsed.Authority().String()
728 rkey := parsed.RecordKey().String()
729
730 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
731 Collection: collection,
732 Repo: repo,
733 Rkey: rkey,
734 })
735 return err
736}
737
738func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
739 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
740 if err != nil {
741 return err
742 }
743 // already present
744 if len(defaultLabels) == len(defaults) {
745 return nil
746 }
747
748 labelDefs, err := models.FetchLabelDefs(r, defaults)
749 if err != nil {
750 return err
751 }
752
753 // Insert each label definition to the database
754 for _, labelDef := range labelDefs {
755 _, err = db.AddLabelDefinition(e, &labelDef)
756 if err != nil {
757 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
758 }
759 }
760
761 return nil
762}
763
764func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
765 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
766 if err != nil {
767 logger.Error("failed to resolve tangled.org DID", "err", err)
768 return
769 }
770
771 pdsEndpoint := resolved.PDSEndpoint()
772 if pdsEndpoint == "" {
773 logger.Error("no PDS endpoint found for tangled.sh DID")
774 return
775 }
776
777 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger)
778 if err != nil {
779 logger.Error("failed to create appassword session... skipping fetch", "err", err)
780 return
781 }
782
783 l := log.SubLogger(logger, "bluesky")
784
785 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
786 defer ticker.Stop()
787
788 for {
789 // refresh session if necessary
790 if !session.IsValid() {
791 l.Debug("access token expired, refreshing session")
792 if err := session.RefreshSession(); err != nil {
793 l.Error("failed to refresh session, stopping bluesky updater", "err", err)
794 return
795 }
796 l.Debug("session refreshed")
797 }
798
799 // make client
800 client := xrpc.Client{
801 Auth: &xrpc.AuthInfo{
802 AccessJwt: session.AccessJwt,
803 Did: session.Did,
804 },
805 Host: session.PdsEndpoint,
806 }
807
808 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
809 if err != nil {
810 l.Error("failed to fetch bluesky posts", "err", err)
811 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
812 l.Error("failed to insert bluesky posts", "err", err)
813 } else {
814 l.Info("inserted bluesky posts", "count", len(posts))
815 }
816
817 select {
818 case <-ticker.C:
819 case <-ctx.Done():
820 l.Info("stopping bluesky updater")
821 return
822 }
823 }
824}