Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/cache"
17 "tangled.org/core/appview/cloudflare"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/email"
21 "tangled.org/core/appview/indexer"
22 "tangled.org/core/appview/knotacl"
23 "tangled.org/core/appview/knotcompat"
24 "tangled.org/core/appview/mentions"
25 "tangled.org/core/appview/models"
26 "tangled.org/core/appview/notify"
27 dbnotify "tangled.org/core/appview/notify/db"
28 lognotify "tangled.org/core/appview/notify/logging"
29 phnotify "tangled.org/core/appview/notify/posthog"
30 whnotify "tangled.org/core/appview/notify/webhook"
31 "tangled.org/core/appview/oauth"
32 "tangled.org/core/appview/pages"
33 pipelinessh "tangled.org/core/appview/pipelines/ssh"
34 "tangled.org/core/appview/reporesolver"
35 "tangled.org/core/appview/repoverify"
36 "tangled.org/core/appview/validator"
37 xrpcclient "tangled.org/core/appview/xrpcclient"
38 "tangled.org/core/consts"
39 "tangled.org/core/eventconsumer"
40 "tangled.org/core/idresolver"
41 "tangled.org/core/jetstream"
42 "tangled.org/core/log"
43 tlog "tangled.org/core/log"
44 "tangled.org/core/orm"
45 "tangled.org/core/rbac"
46
47 comatproto "github.com/bluesky-social/indigo/api/atproto"
48 "github.com/bluesky-social/indigo/atproto/atclient"
49 "github.com/bluesky-social/indigo/atproto/syntax"
50 lexutil "github.com/bluesky-social/indigo/lex/util"
51 "github.com/bluesky-social/indigo/xrpc"
52
53 "github.com/go-chi/chi/v5"
54 "github.com/posthog/posthog-go"
55)
56
57type State struct {
58 db *db.DB
59 notifier notify.Notifier
60 indexer *indexer.Indexer
61 oauth *oauth.OAuth
62 enforcer *rbac.Enforcer
63 pages *pages.Pages
64 idResolver *idresolver.Resolver
65 rdb *cache.Cache
66 mentionsResolver *mentions.Resolver
67 posthog posthog.Client
68 jc *jetstream.JetstreamClient
69 config *config.Config
70 repoResolver *reporesolver.RepoResolver
71 aclService *knotacl.Service
72 knotstream *eventconsumer.Consumer
73 logger *slog.Logger
74 validator *validator.Validator
75 cfClient *cloudflare.Client
76}
77
78func Make(ctx context.Context, config *config.Config) (*State, error) {
79 logger := tlog.FromContext(ctx)
80
81 d, err := db.Make(ctx, config.Core.DbPath)
82 if err != nil {
83 return nil, fmt.Errorf("failed to create db: %w", err)
84 }
85
86 indexer := indexer.New(log.SubLogger(logger, "indexer"), d)
87 err = indexer.Init(ctx)
88 if err != nil {
89 return nil, fmt.Errorf("failed to create indexer: %w", err)
90 }
91
92 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
93 if err != nil {
94 return nil, fmt.Errorf("failed to create enforcer: %w", err)
95 }
96
97 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
98 if err != nil {
99 logger.Error("failed to create redis resolver", "err", err)
100 res = idresolver.DefaultResolver(config.Plc.PLCURL)
101 }
102
103 var rdb *cache.Cache
104 if config.Redis.Addr != "" {
105 rdb = cache.New(config.Redis.Addr)
106 }
107
108 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
109 if err != nil {
110 return nil, fmt.Errorf("failed to create posthog client: %w", err)
111 }
112
113 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages"))
114 knotcompat.UseNativeLatch(knotacl.NewLatch(d, log.SubLogger(logger, "knotacl-latch")))
115 aclService := knotacl.NewService(enforcer, d, config.Core.Dev, log.SubLogger(logger, "knotacl"))
116 oauth, err := oauth.New(config, posthog, d, enforcer, aclService, res, log.SubLogger(logger, "oauth"))
117 if err != nil {
118 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
119 }
120
121 validator := validator.New(d, res, aclService)
122
123 repoResolver := reporesolver.New(config, aclService, d, rdb)
124
125 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
126
127 jc, err := jetstream.NewJetstreamClient(
128 config.Jetstream.Endpoint,
129 "appview",
130 []string{
131 tangled.ActorProfileNSID,
132 tangled.FeedStarNSID,
133 tangled.FeedReactionNSID,
134 tangled.FeedCommentNSID,
135 tangled.GraphFollowNSID,
136 tangled.GraphVouchNSID,
137 tangled.KnotMemberNSID,
138 tangled.KnotNSID,
139 tangled.LabelDefinitionNSID,
140 tangled.LabelOpNSID,
141 tangled.PublicKeyNSID,
142 tangled.RepoArtifactNSID,
143 tangled.RepoIssueCommentNSID,
144 tangled.RepoIssueNSID,
145 tangled.RepoNSID,
146 tangled.RepoPullNSID,
147 tangled.RepoPullCommentNSID,
148 tangled.SpindleMemberNSID,
149 tangled.SpindleNSID,
150 tangled.StringNSID,
151 },
152 nil,
153 tlog.SubLogger(logger, "jetstream"),
154 d,
155 false,
156
157 // in-memory filter is inapplicable to appview so
158 // we'll never log dids anyway.
159 false,
160 )
161 if err != nil {
162 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
163 }
164
165 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
166 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
167 }
168
169 var notifiers []notify.Notifier
170
171 // Always add the database notifier
172 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
173
174 // Add other notifiers in production only
175 if !config.Core.Dev {
176 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
177 }
178 notifiers = append(notifiers, indexer)
179
180 notifiers = append(notifiers, whnotify.NewNotifier(d))
181
182 notifier := notify.NewMergedNotifier(notifiers)
183 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
184
185 ingester := appview.Ingester{
186 Ctx: ctx,
187 Db: d,
188 Enforcer: enforcer,
189 Acl: aclService,
190 IdResolver: res,
191 Cache: rdb,
192 Config: config,
193 Logger: log.SubLogger(logger, "ingester"),
194 Validator: validator,
195 MentionsResolver: mentionsResolver,
196 Notifier: notifier,
197 Verifier: repoverify.New(res, config.Core.Dev),
198 }
199 err = jc.StartJetstream(ctx, ingester.Ingest())
200 if err != nil {
201 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
202 }
203
204 go ingester.SweepPendingVerifications()
205
206 var cfClient *cloudflare.Client
207 if config.Cloudflare.ApiToken != "" {
208 cfClient, err = cloudflare.New(config)
209 if err != nil {
210 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err)
211 cfClient = nil
212 }
213 }
214
215 knotstream, err := Knotstream(ctx, config, d, aclService, enforcer, posthog, notifier, cfClient)
216 if err != nil {
217 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
218 }
219 knotstream.Start(ctx)
220
221 state := &State{
222 db: d,
223 notifier: notifier,
224 indexer: indexer,
225 oauth: oauth,
226 enforcer: enforcer,
227 pages: pages,
228 idResolver: res,
229 rdb: rdb,
230 mentionsResolver: mentionsResolver,
231 posthog: posthog,
232 jc: jc,
233 config: config,
234 repoResolver: repoResolver,
235 aclService: aclService,
236 knotstream: knotstream,
237 logger: logger,
238 validator: validator,
239 cfClient: cfClient,
240 }
241
242 // fetch initial bluesky posts if configured
243 go fetchBskyPosts(ctx, res, config, d, logger)
244
245 return state, nil
246}
247
248func (s *State) Close() error {
249 // other close up logic goes here
250 return s.db.Close()
251}
252
253func (s *State) NewSSHServer() *pipelinessh.Server {
254 return pipelinessh.New(s.db, s.config, log.SubLogger(s.logger, "pipelinessh"))
255}
256
257func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) {
258 w.Header().Set("Content-Type", "text/plain")
259 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
260
261 securityTxt := `Contact: mailto:security@tangled.org
262Preferred-Languages: en
263Canonical: https://tangled.org/.well-known/security.txt
264Expires: 2030-01-01T21:59:00.000Z
265`
266 w.Write([]byte(securityTxt))
267}
268
269func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
270 w.Header().Set("Content-Type", "text/plain")
271 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
272
273 robotsTxt := `# Hello, Tanglers!
274User-agent: *
275Allow: /
276Disallow: /*/*/settings
277Disallow: /settings
278Disallow: /*/*/compare
279Disallow: /*/*/fork
280
281Crawl-delay: 1
282`
283 w.Write([]byte(robotsTxt))
284}
285
286func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
287 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
288 BaseParams: pages.BaseParamsFromContext(r.Context()),
289 })
290}
291
292func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
293 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
294 BaseParams: pages.BaseParamsFromContext(r.Context()),
295 })
296}
297
298func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
299 s.pages.Brand(w, pages.BrandParams{
300 BaseParams: pages.BaseParamsFromContext(r.Context()),
301 })
302}
303
304func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
305 user := s.oauth.GetMultiAccountUser(r)
306 if user == nil {
307 return
308 }
309
310 l := s.logger.With("handler", "UpgradeBanner")
311 l = l.With("did", user.Did)
312
313 regs, err := db.GetRegistrations(
314 s.db,
315 orm.FilterEq("did", user.Did),
316 orm.FilterEq("needs_upgrade", 1),
317 )
318 if err != nil {
319 l.Error("non-fatal: failed to get registrations", "err", err)
320 }
321
322 spindles, err := db.GetSpindles(
323 r.Context(),
324 s.db,
325 orm.FilterEq("owner", user.Did),
326 orm.FilterEq("needs_upgrade", 1),
327 )
328 if err != nil {
329 l.Error("non-fatal: failed to get spindles", "err", err)
330 }
331
332 if regs == nil && spindles == nil {
333 return
334 }
335
336 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
337 Registrations: regs,
338 Spindles: spindles,
339 })
340}
341
342func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) {
343 // target is echoed back from the form via hx-vals so the response span's
344 // id matches the form's hx-target. Fallback keeps the handler useful if
345 // a caller forgets to send it.
346 target := strings.TrimSpace(r.FormValue("target"))
347 if target == "" {
348 target = "home"
349 }
350
351 w.Header().Set("Content-Type", "text/html")
352
353 emailAddr := strings.TrimSpace(r.FormValue("email"))
354 if !email.IsValidEmail(emailAddr) {
355 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{
356 Id: target,
357 Error: "Invalid email address.",
358 })
359 return
360 }
361
362 // For logged-in users, persist the signup locally so the widget stays
363 // hidden across devices. The DB row is the render-time source of truth;
364 // Resend still owns the mailing list itself.
365 if user := s.oauth.GetMultiAccountUser(r); user != nil {
366 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil {
367 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err)
368 }
369 }
370
371 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" {
372 go func() {
373 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil {
374 s.logger.Error("failed to add newsletter contact", "error", err)
375 }
376 }()
377 } else {
378 s.logger.Error(
379 "failed to add newsletter contact, missing resend config",
380 "isKeyPresent", s.config.Resend.ApiKey != "",
381 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "",
382 "emailAddr", emailAddr,
383 )
384 }
385
386 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target})
387}
388
389// NewsletterDismiss records that a logged-in user has dismissed the newsletter
390// widget so it stays hidden across their devices. Anonymous callers get a 204
391// with no DB write — localStorage handles the per-browser fallback.
392func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) {
393 user := s.oauth.GetMultiAccountUser(r)
394 if user == nil {
395 w.WriteHeader(http.StatusNoContent)
396 return
397 }
398
399 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil {
400 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err)
401 }
402 w.WriteHeader(http.StatusNoContent)
403}
404
405func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
406 user := chi.URLParam(r, "user")
407 user = strings.TrimPrefix(user, "@")
408 user = strings.TrimSuffix(user, ".keys")
409
410 if user == "" {
411 w.WriteHeader(http.StatusBadRequest)
412 return
413 }
414
415 id, err := s.idResolver.ResolveIdent(r.Context(), user)
416 if err != nil {
417 w.WriteHeader(http.StatusInternalServerError)
418 return
419 }
420
421 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
422 if err != nil {
423 s.logger.Error("failed to get public keys", "err", err)
424 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
425 return
426 }
427
428 if len(pubKeys) == 0 {
429 w.WriteHeader(http.StatusNoContent)
430 return
431 }
432
433 for _, k := range pubKeys {
434 key := strings.TrimRight(k.Key, "\n")
435 fmt.Fprintln(w, key)
436 }
437}
438
439func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
440 switch r.Method {
441 case http.MethodGet:
442 user := s.oauth.GetMultiAccountUser(r)
443 knots := s.aclService.KnotsForUser(r.Context(), user.Did)
444
445 s.pages.NewRepo(w, pages.NewRepoParams{
446 BaseParams: pages.BaseParamsFromContext(r.Context()),
447 Knots: knots,
448 })
449
450 case http.MethodPost:
451 l := s.logger.With("handler", "NewRepo")
452
453 user := s.oauth.GetMultiAccountUser(r)
454 l = l.With("did", user.Did)
455
456 // form validation
457 domain := r.FormValue("domain")
458 if domain == "" {
459 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
460 return
461 }
462 l = l.With("knot", domain)
463
464 repoName := r.FormValue("name")
465 if repoName == "" {
466 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
467 return
468 }
469
470 if err := models.ValidateRepoName(repoName); err != nil {
471 s.pages.Notice(w, "repo", err.Error())
472 return
473 }
474 repoName = models.StripGitExt(repoName)
475 rkey := strings.ToLower(repoName)
476 l = l.With("repoName", repoName, "rkey", rkey)
477
478 defaultBranch := r.FormValue("branch")
479 if defaultBranch == "" {
480 defaultBranch = "main"
481 }
482 l = l.With("defaultBranch", defaultBranch)
483
484 description := r.FormValue("description")
485 if len([]rune(description)) > 140 {
486 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
487 return
488 }
489
490 // ACL validation
491 if !s.aclService.IsRepoCreateAllowed(r.Context(), domain, user.Did) {
492 l.Info("unauthorized")
493 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
494 return
495 }
496
497 // Check for existing repos
498 existingRepo, err := db.GetRepo(
499 s.db,
500 orm.FilterEq("did", user.Did),
501 orm.FilterEq("rkey", rkey),
502 )
503 if err == nil && existingRepo != nil {
504 l.Info("repo exists")
505 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
506 return
507 }
508
509 atpClient, err := s.oauth.AuthorizedClient(r)
510 if err != nil {
511 l.Error("failed to get authorized client", "err", err)
512 s.pages.Notice(w, "repo", "Failed to authorize. Try again later.")
513 return
514 }
515
516 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) {
517 l.Info("rkey occupied by prior rename alias")
518 s.pages.Notice(w, "repo", fmt.Sprintf("The name %q still has a record on your PDS from a prior rename. Pick a different name, or delete at://%s/%s/%s first.", rkey, user.Did, tangled.RepoNSID, rkey))
519 return
520 }
521
522 client, err := s.oauth.ServiceClient(
523 r,
524 oauth.WithService(domain),
525 oauth.WithLxm(tangled.RepoCreateNSID),
526 oauth.WithDev(s.config.Core.Dev),
527 )
528 if err != nil {
529 l.Error("service auth failed", "err", err)
530 s.pages.Notice(w, "repo", "Failed to authenticate. Please log out and log back in again.")
531 return
532 }
533
534 input := &tangled.RepoCreate_Input{
535 Rkey: rkey,
536 Name: rkey,
537 DefaultBranch: &defaultBranch,
538 }
539 createResp, err := tangled.RepoCreate(
540 r.Context(),
541 client,
542 input,
543 )
544 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
545 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err)
546 s.pages.Notice(w, "repo", err.Error())
547 return
548 }
549
550 var repoDid string
551 if createResp != nil && createResp.RepoDid != nil {
552 repoDid = *createResp.RepoDid
553 }
554 if repoDid == "" {
555 l.Error("knot returned empty repo DID")
556 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.")
557 return
558 }
559
560 repo := &models.Repo{
561 Did: user.Did,
562 Name: repoName,
563 Knot: domain,
564 Rkey: rkey,
565 Description: description,
566 Created: time.Now(),
567 Labels: s.config.Label.DefaultLabelDefs,
568 RepoDid: repoDid,
569 }
570 record := repo.AsRecord()
571
572 cleanupKnot := func() {
573 go func() {
574 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second}
575 for attempt, delay := range delays {
576 time.Sleep(delay)
577 deleteClient, dErr := s.oauth.ServiceClient(
578 r,
579 oauth.WithService(domain),
580 oauth.WithLxm(tangled.RepoDeleteNSID),
581 oauth.WithDev(s.config.Core.Dev),
582 )
583 if dErr != nil {
584 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr)
585 continue
586 }
587 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
588 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{
589 Did: user.Did,
590 Name: rkey,
591 Rkey: rkey,
592 }); dErr != nil {
593 cancel()
594 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr)
595 continue
596 }
597 cancel()
598 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1)
599 return
600 }
601 l.Error("exhausted retries for knot cleanup, repo may be orphaned",
602 "did", user.Did, "repo", repoName, "knot", domain)
603 }()
604 }
605
606 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{
607 Collection: tangled.RepoNSID,
608 Repo: user.Did,
609 Rkey: &rkey,
610 Record: &lexutil.LexiconTypeDecoder{
611 Val: &record,
612 },
613 })
614 if err != nil {
615 l.Info("PDS write failed", "err", err)
616 cleanupKnot()
617 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) {
618 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository named %q.", rkey))
619 } else {
620 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
621 }
622 return
623 }
624
625 aturi := fmt.Sprintf("at://%s/%s/%s", user.Did, tangled.RepoNSID, rkey)
626 l = l.With("aturi", aturi)
627 l.Info("wrote to PDS")
628
629 tx, err := s.db.BeginTx(r.Context(), nil)
630 if err != nil {
631 l.Info("txn failed", "err", err)
632 s.pages.Notice(w, "repo", "Failed to save repository information.")
633 return
634 }
635
636 rollback := func() {
637 err1 := tx.Rollback()
638 err2 := s.enforcer.E.LoadPolicy()
639 err3 := rollbackRecord(context.Background(), aturi, atpClient)
640
641 if errors.Is(err1, sql.ErrTxDone) {
642 err1 = nil
643 }
644
645 if errs := errors.Join(err1, err2, err3); errs != nil {
646 l.Error("failed to rollback changes", "errs", errs)
647 }
648
649 if aturi != "" {
650 cleanupKnot()
651 }
652 }
653 defer rollback()
654
655 err = db.AddRepo(tx, repo)
656 if err != nil {
657 l.Error("db write failed", "err", err)
658 s.pages.Notice(w, "repo", "Failed to save repository information.")
659 return
660 }
661
662 rbacPath := repo.RepoIdentifier()
663 err = s.enforcer.AddRepo(user.Did, domain, rbacPath)
664 if err != nil {
665 l.Error("acl setup failed", "err", err)
666 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
667 return
668 }
669
670 err = tx.Commit()
671 if err != nil {
672 l.Error("txn commit failed", "err", err)
673 http.Error(w, err.Error(), http.StatusInternalServerError)
674 return
675 }
676
677 err = s.enforcer.E.SavePolicy()
678 if err != nil {
679 l.Error("acl save failed", "err", err)
680 http.Error(w, err.Error(), http.StatusInternalServerError)
681 return
682 }
683
684 aturi = ""
685
686 s.notifier.NewRepo(r.Context(), repo)
687 switch {
688 case repoDid != "":
689 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid))
690 default:
691 handle := s.pages.DisplayHandle(r.Context(), user.Did)
692 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, rkey))
693 }
694 }
695}
696
697func rkeyOccupied(ctx context.Context, client *atclient.APIClient, did, rkey string) bool {
698 probeCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
699 defer cancel()
700 resp, err := comatproto.RepoGetRecord(probeCtx, client, "", tangled.RepoNSID, did, rkey)
701 return err == nil && resp != nil
702}
703
704// this is used to rollback changes made to the PDS
705//
706// it is a no-op if the provided ATURI is empty
707func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error {
708 if aturi == "" {
709 return nil
710 }
711
712 parsed := syntax.ATURI(aturi)
713
714 collection := parsed.Collection().String()
715 repo := parsed.Authority().String()
716 rkey := parsed.RecordKey().String()
717
718 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
719 Collection: collection,
720 Repo: repo,
721 Rkey: rkey,
722 })
723 return err
724}
725
726func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
727 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
728 if err != nil {
729 return err
730 }
731 // already present
732 if len(defaultLabels) == len(defaults) {
733 return nil
734 }
735
736 labelDefs, err := models.FetchLabelDefs(r, defaults)
737 if err != nil {
738 return err
739 }
740
741 // Insert each label definition to the database
742 for _, labelDef := range labelDefs {
743 _, err = db.AddLabelDefinition(e, &labelDef)
744 if err != nil {
745 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
746 }
747 }
748
749 return nil
750}
751
752func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
753 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
754 if err != nil {
755 logger.Error("failed to resolve tangled.org DID", "err", err)
756 return
757 }
758
759 pdsEndpoint := resolved.PDSEndpoint()
760 if pdsEndpoint == "" {
761 logger.Error("no PDS endpoint found for tangled.sh DID")
762 return
763 }
764
765 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger)
766 if err != nil {
767 logger.Error("failed to create appassword session... skipping fetch", "err", err)
768 return
769 }
770
771 l := log.SubLogger(logger, "bluesky")
772
773 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
774 defer ticker.Stop()
775
776 for {
777 // refresh session if necessary
778 if !session.IsValid() {
779 l.Debug("access token expired, refreshing session")
780 if err := session.RefreshSession(); err != nil {
781 l.Error("failed to refresh session, stopping bluesky updater", "err", err)
782 return
783 }
784 l.Debug("session refreshed")
785 }
786
787 // make client
788 client := xrpc.Client{
789 Auth: &xrpc.AuthInfo{
790 AccessJwt: session.AccessJwt,
791 Did: session.Did,
792 },
793 Host: session.PdsEndpoint,
794 }
795
796 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
797 if err != nil {
798 l.Error("failed to fetch bluesky posts", "err", err)
799 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
800 l.Error("failed to insert bluesky posts", "err", err)
801 } else {
802 l.Info("inserted bluesky posts", "count", len(posts))
803 }
804
805 select {
806 case <-ticker.C:
807 case <-ctx.Done():
808 l.Info("stopping bluesky updater")
809 return
810 }
811 }
812}