Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/cache"
17 "tangled.org/core/appview/cloudflare"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/email"
21 "tangled.org/core/appview/indexer"
22 "tangled.org/core/appview/mentions"
23 "tangled.org/core/appview/models"
24 "tangled.org/core/appview/notify"
25 dbnotify "tangled.org/core/appview/notify/db"
26 lognotify "tangled.org/core/appview/notify/logging"
27 phnotify "tangled.org/core/appview/notify/posthog"
28 whnotify "tangled.org/core/appview/notify/webhook"
29 "tangled.org/core/appview/oauth"
30 "tangled.org/core/appview/pages"
31 "tangled.org/core/appview/reporesolver"
32 "tangled.org/core/appview/repoverify"
33 "tangled.org/core/appview/validator"
34 xrpcclient "tangled.org/core/appview/xrpcclient"
35 "tangled.org/core/consts"
36 "tangled.org/core/eventconsumer"
37 "tangled.org/core/idresolver"
38 "tangled.org/core/jetstream"
39 "tangled.org/core/log"
40 tlog "tangled.org/core/log"
41 "tangled.org/core/orm"
42 "tangled.org/core/rbac"
43
44 comatproto "github.com/bluesky-social/indigo/api/atproto"
45 "github.com/bluesky-social/indigo/atproto/atclient"
46 "github.com/bluesky-social/indigo/atproto/syntax"
47 lexutil "github.com/bluesky-social/indigo/lex/util"
48 "github.com/bluesky-social/indigo/xrpc"
49
50 "github.com/go-chi/chi/v5"
51 "github.com/posthog/posthog-go"
52)
53
54type State struct {
55 db *db.DB
56 notifier notify.Notifier
57 indexer *indexer.Indexer
58 oauth *oauth.OAuth
59 enforcer *rbac.Enforcer
60 pages *pages.Pages
61 idResolver *idresolver.Resolver
62 rdb *cache.Cache
63 mentionsResolver *mentions.Resolver
64 posthog posthog.Client
65 jc *jetstream.JetstreamClient
66 config *config.Config
67 repoResolver *reporesolver.RepoResolver
68 knotstream *eventconsumer.Consumer
69 spindlestream *eventconsumer.Consumer
70 logger *slog.Logger
71 validator *validator.Validator
72 cfClient *cloudflare.Client
73}
74
75func Make(ctx context.Context, config *config.Config) (*State, error) {
76 logger := tlog.FromContext(ctx)
77
78 d, err := db.Make(ctx, config.Core.DbPath)
79 if err != nil {
80 return nil, fmt.Errorf("failed to create db: %w", err)
81 }
82
83 indexer := indexer.New(log.SubLogger(logger, "indexer"), d)
84 err = indexer.Init(ctx)
85 if err != nil {
86 return nil, fmt.Errorf("failed to create indexer: %w", err)
87 }
88
89 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
90 if err != nil {
91 return nil, fmt.Errorf("failed to create enforcer: %w", err)
92 }
93
94 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
95 if err != nil {
96 logger.Error("failed to create redis resolver", "err", err)
97 res = idresolver.DefaultResolver(config.Plc.PLCURL)
98 }
99
100 var rdb *cache.Cache
101 if config.Redis.Addr != "" {
102 rdb = cache.New(config.Redis.Addr)
103 }
104
105 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
106 if err != nil {
107 return nil, fmt.Errorf("failed to create posthog client: %w", err)
108 }
109
110 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages"))
111 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
112 if err != nil {
113 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
114 }
115 validator := validator.New(d, res, enforcer)
116
117 repoResolver := reporesolver.New(config, enforcer, d, rdb)
118
119 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
120
121 jc, err := jetstream.NewJetstreamClient(
122 config.Jetstream.Endpoint,
123 "appview",
124 []string{
125 tangled.ActorProfileNSID,
126 tangled.FeedStarNSID,
127 tangled.GraphFollowNSID,
128 tangled.GraphVouchNSID,
129 tangled.KnotMemberNSID,
130 tangled.KnotNSID,
131 tangled.LabelDefinitionNSID,
132 tangled.LabelOpNSID,
133 tangled.PublicKeyNSID,
134 tangled.RepoArtifactNSID,
135 tangled.RepoIssueCommentNSID,
136 tangled.RepoIssueNSID,
137 tangled.RepoNSID,
138 tangled.RepoPullNSID,
139 tangled.SpindleMemberNSID,
140 tangled.SpindleNSID,
141 tangled.StringNSID,
142 },
143 nil,
144 tlog.SubLogger(logger, "jetstream"),
145 d,
146 false,
147
148 // in-memory filter is inapplicable to appview so
149 // we'll never log dids anyway.
150 false,
151 )
152 if err != nil {
153 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
154 }
155
156 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
157 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
158 }
159
160 var notifiers []notify.Notifier
161
162 // Always add the database notifier
163 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
164
165 // Add other notifiers in production only
166 if !config.Core.Dev {
167 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
168 }
169 notifiers = append(notifiers, indexer)
170
171 notifiers = append(notifiers, whnotify.NewNotifier(d))
172
173 notifier := notify.NewMergedNotifier(notifiers)
174 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
175
176 ingester := appview.Ingester{
177 Db: d,
178 Enforcer: enforcer,
179 IdResolver: res,
180 Cache: rdb,
181 Config: config,
182 Logger: log.SubLogger(logger, "ingester"),
183 Validator: validator,
184 Notifier: notifier,
185 Verifier: repoverify.New(res, config.Core.Dev),
186 }
187 err = jc.StartJetstream(ctx, ingester.Ingest())
188 if err != nil {
189 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
190 }
191
192 var cfClient *cloudflare.Client
193 if config.Cloudflare.ApiToken != "" {
194 cfClient, err = cloudflare.New(config)
195 if err != nil {
196 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err)
197 cfClient = nil
198 }
199 }
200
201 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient)
202 if err != nil {
203 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
204 }
205 knotstream.Start(ctx)
206
207 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
208 if err != nil {
209 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
210 }
211 spindlestream.Start(ctx)
212
213 state := &State{
214 db: d,
215 notifier: notifier,
216 indexer: indexer,
217 oauth: oauth,
218 enforcer: enforcer,
219 pages: pages,
220 idResolver: res,
221 rdb: rdb,
222 mentionsResolver: mentionsResolver,
223 posthog: posthog,
224 jc: jc,
225 config: config,
226 repoResolver: repoResolver,
227 knotstream: knotstream,
228 spindlestream: spindlestream,
229 logger: logger,
230 validator: validator,
231 cfClient: cfClient,
232 }
233
234 // fetch initial bluesky posts if configured
235 go fetchBskyPosts(ctx, res, config, d, logger)
236
237 return state, nil
238}
239
240func (s *State) Close() error {
241 // other close up logic goes here
242 return s.db.Close()
243}
244
245func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) {
246 w.Header().Set("Content-Type", "text/plain")
247 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
248
249 securityTxt := `Contact: mailto:security@tangled.org
250Preferred-Languages: en
251Canonical: https://tangled.org/.well-known/security.txt
252Expires: 2030-01-01T21:59:00.000Z
253`
254 w.Write([]byte(securityTxt))
255}
256
257func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
258 w.Header().Set("Content-Type", "text/plain")
259 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
260
261 robotsTxt := `# Hello, Tanglers!
262User-agent: *
263Allow: /
264Disallow: /*/*/settings
265Disallow: /settings
266Disallow: /*/*/compare
267Disallow: /*/*/fork
268
269Crawl-delay: 1
270`
271 w.Write([]byte(robotsTxt))
272}
273
274func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
275 user := s.oauth.GetMultiAccountUser(r)
276 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
277 LoggedInUser: user,
278 })
279}
280
281func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
282 user := s.oauth.GetMultiAccountUser(r)
283 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
284 LoggedInUser: user,
285 })
286}
287
288func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
289 user := s.oauth.GetMultiAccountUser(r)
290 s.pages.Brand(w, pages.BrandParams{
291 LoggedInUser: user,
292 })
293}
294
295func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
296 user := s.oauth.GetMultiAccountUser(r)
297 if user == nil {
298 return
299 }
300
301 l := s.logger.With("handler", "UpgradeBanner")
302 l = l.With("did", user.Did)
303
304 regs, err := db.GetRegistrations(
305 s.db,
306 orm.FilterEq("did", user.Did),
307 orm.FilterEq("needs_upgrade", 1),
308 )
309 if err != nil {
310 l.Error("non-fatal: failed to get registrations", "err", err)
311 }
312
313 spindles, err := db.GetSpindles(
314 r.Context(),
315 s.db,
316 orm.FilterEq("owner", user.Did),
317 orm.FilterEq("needs_upgrade", 1),
318 )
319 if err != nil {
320 l.Error("non-fatal: failed to get spindles", "err", err)
321 }
322
323 if regs == nil && spindles == nil {
324 return
325 }
326
327 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
328 Registrations: regs,
329 Spindles: spindles,
330 })
331}
332
333func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) {
334 // target is echoed back from the form via hx-vals so the response span's
335 // id matches the form's hx-target. Fallback keeps the handler useful if
336 // a caller forgets to send it.
337 target := strings.TrimSpace(r.FormValue("target"))
338 if target == "" {
339 target = "home"
340 }
341
342 w.Header().Set("Content-Type", "text/html")
343
344 emailAddr := strings.TrimSpace(r.FormValue("email"))
345 if !email.IsValidEmail(emailAddr) {
346 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{
347 Id: target,
348 Error: "Invalid email address.",
349 })
350 return
351 }
352
353 // For logged-in users, persist the signup locally so the widget stays
354 // hidden across devices. The DB row is the render-time source of truth;
355 // Resend still owns the mailing list itself.
356 if user := s.oauth.GetMultiAccountUser(r); user != nil {
357 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil {
358 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err)
359 }
360 }
361
362 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" {
363 go func() {
364 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil {
365 s.logger.Error("failed to add newsletter contact", "error", err)
366 }
367 }()
368 } else {
369 s.logger.Error(
370 "failed to add newsletter contact, missing resend config",
371 "isKeyPresent", s.config.Resend.ApiKey != "",
372 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "",
373 "emailAddr", emailAddr,
374 )
375 }
376
377 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target})
378}
379
380// NewsletterDismiss records that a logged-in user has dismissed the newsletter
381// widget so it stays hidden across their devices. Anonymous callers get a 204
382// with no DB write — localStorage handles the per-browser fallback.
383func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) {
384 user := s.oauth.GetMultiAccountUser(r)
385 if user == nil {
386 w.WriteHeader(http.StatusNoContent)
387 return
388 }
389
390 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil {
391 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err)
392 }
393 w.WriteHeader(http.StatusNoContent)
394}
395
396func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
397 user := chi.URLParam(r, "user")
398 user = strings.TrimPrefix(user, "@")
399
400 if user == "" {
401 w.WriteHeader(http.StatusBadRequest)
402 return
403 }
404
405 id, err := s.idResolver.ResolveIdent(r.Context(), user)
406 if err != nil {
407 w.WriteHeader(http.StatusInternalServerError)
408 return
409 }
410
411 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
412 if err != nil {
413 s.logger.Error("failed to get public keys", "err", err)
414 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
415 return
416 }
417
418 if len(pubKeys) == 0 {
419 w.WriteHeader(http.StatusNoContent)
420 return
421 }
422
423 for _, k := range pubKeys {
424 key := strings.TrimRight(k.Key, "\n")
425 fmt.Fprintln(w, key)
426 }
427}
428
429func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
430 switch r.Method {
431 case http.MethodGet:
432 user := s.oauth.GetMultiAccountUser(r)
433 knots, err := s.enforcer.GetKnotsForUser(user.Did)
434 if err != nil {
435 s.pages.Notice(w, "repo", "Invalid user account.")
436 return
437 }
438
439 s.pages.NewRepo(w, pages.NewRepoParams{
440 LoggedInUser: user,
441 Knots: knots,
442 })
443
444 case http.MethodPost:
445 l := s.logger.With("handler", "NewRepo")
446
447 user := s.oauth.GetMultiAccountUser(r)
448 l = l.With("did", user.Did)
449
450 // form validation
451 domain := r.FormValue("domain")
452 if domain == "" {
453 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
454 return
455 }
456 l = l.With("knot", domain)
457
458 repoName := r.FormValue("name")
459 if repoName == "" {
460 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
461 return
462 }
463
464 if err := models.ValidateRepoName(repoName); err != nil {
465 s.pages.Notice(w, "repo", err.Error())
466 return
467 }
468 repoName = models.StripGitExt(repoName)
469 rkey := strings.ToLower(repoName)
470 l = l.With("repoName", repoName, "rkey", rkey)
471
472 defaultBranch := r.FormValue("branch")
473 if defaultBranch == "" {
474 defaultBranch = "main"
475 }
476 l = l.With("defaultBranch", defaultBranch)
477
478 description := r.FormValue("description")
479 if len([]rune(description)) > 140 {
480 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
481 return
482 }
483
484 // ACL validation
485 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
486 if err != nil || !ok {
487 l.Info("unauthorized")
488 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
489 return
490 }
491
492 // Check for existing repos
493 existingRepo, err := db.GetRepo(
494 s.db,
495 orm.FilterEq("did", user.Did),
496 orm.FilterEq("rkey", rkey),
497 )
498 if err == nil && existingRepo != nil {
499 l.Info("repo exists")
500 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
501 return
502 }
503
504 atpClient, err := s.oauth.AuthorizedClient(r)
505 if err != nil {
506 l.Error("failed to get authorized client", "err", err)
507 s.pages.Notice(w, "repo", "Failed to authorize. Try again later.")
508 return
509 }
510
511 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) {
512 l.Info("rkey occupied by prior rename alias")
513 s.pages.Notice(w, "repo", fmt.Sprintf("The name %q still has a record on your PDS from a prior rename. Pick a different name, or delete at://%s/%s/%s first.", rkey, user.Did, tangled.RepoNSID, rkey))
514 return
515 }
516
517 client, err := s.oauth.ServiceClient(
518 r,
519 oauth.WithService(domain),
520 oauth.WithLxm(tangled.RepoCreateNSID),
521 oauth.WithDev(s.config.Core.Dev),
522 )
523 if err != nil {
524 l.Error("service auth failed", "err", err)
525 s.pages.Notice(w, "repo", "Failed to authenticate. Please log out and log back in again.")
526 return
527 }
528
529 input := &tangled.RepoCreate_Input{
530 Rkey: rkey,
531 Name: rkey,
532 DefaultBranch: &defaultBranch,
533 }
534 createResp, err := tangled.RepoCreate(
535 r.Context(),
536 client,
537 input,
538 )
539 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
540 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err)
541 s.pages.Notice(w, "repo", err.Error())
542 return
543 }
544
545 var repoDid string
546 if createResp != nil && createResp.RepoDid != nil {
547 repoDid = *createResp.RepoDid
548 }
549 if repoDid == "" {
550 l.Error("knot returned empty repo DID")
551 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.")
552 return
553 }
554
555 repo := &models.Repo{
556 Did: user.Did,
557 Name: repoName,
558 Knot: domain,
559 Rkey: rkey,
560 Description: description,
561 Created: time.Now(),
562 Labels: s.config.Label.DefaultLabelDefs,
563 RepoDid: repoDid,
564 }
565 record := repo.AsRecord()
566
567 cleanupKnot := func() {
568 go func() {
569 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second}
570 for attempt, delay := range delays {
571 time.Sleep(delay)
572 deleteClient, dErr := s.oauth.ServiceClient(
573 r,
574 oauth.WithService(domain),
575 oauth.WithLxm(tangled.RepoDeleteNSID),
576 oauth.WithDev(s.config.Core.Dev),
577 )
578 if dErr != nil {
579 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr)
580 continue
581 }
582 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
583 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{
584 Did: user.Did,
585 Name: rkey,
586 Rkey: rkey,
587 }); dErr != nil {
588 cancel()
589 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr)
590 continue
591 }
592 cancel()
593 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1)
594 return
595 }
596 l.Error("exhausted retries for knot cleanup, repo may be orphaned",
597 "did", user.Did, "repo", repoName, "knot", domain)
598 }()
599 }
600
601 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{
602 Collection: tangled.RepoNSID,
603 Repo: user.Did,
604 Rkey: &rkey,
605 Record: &lexutil.LexiconTypeDecoder{
606 Val: &record,
607 },
608 })
609 if err != nil {
610 l.Info("PDS write failed", "err", err)
611 cleanupKnot()
612 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) {
613 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository named %q.", rkey))
614 } else {
615 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
616 }
617 return
618 }
619
620 aturi := fmt.Sprintf("at://%s/%s/%s", user.Did, tangled.RepoNSID, rkey)
621 l = l.With("aturi", aturi)
622 l.Info("wrote to PDS")
623
624 tx, err := s.db.BeginTx(r.Context(), nil)
625 if err != nil {
626 l.Info("txn failed", "err", err)
627 s.pages.Notice(w, "repo", "Failed to save repository information.")
628 return
629 }
630
631 rollback := func() {
632 err1 := tx.Rollback()
633 err2 := s.enforcer.E.LoadPolicy()
634 err3 := rollbackRecord(context.Background(), aturi, atpClient)
635
636 if errors.Is(err1, sql.ErrTxDone) {
637 err1 = nil
638 }
639
640 if errs := errors.Join(err1, err2, err3); errs != nil {
641 l.Error("failed to rollback changes", "errs", errs)
642 }
643
644 if aturi != "" {
645 cleanupKnot()
646 }
647 }
648 defer rollback()
649
650 err = db.AddRepo(tx, repo)
651 if err != nil {
652 l.Error("db write failed", "err", err)
653 s.pages.Notice(w, "repo", "Failed to save repository information.")
654 return
655 }
656
657 rbacPath := repo.RepoIdentifier()
658 err = s.enforcer.AddRepo(user.Did, domain, rbacPath)
659 if err != nil {
660 l.Error("acl setup failed", "err", err)
661 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
662 return
663 }
664
665 err = tx.Commit()
666 if err != nil {
667 l.Error("txn commit failed", "err", err)
668 http.Error(w, err.Error(), http.StatusInternalServerError)
669 return
670 }
671
672 err = s.enforcer.E.SavePolicy()
673 if err != nil {
674 l.Error("acl save failed", "err", err)
675 http.Error(w, err.Error(), http.StatusInternalServerError)
676 return
677 }
678
679 aturi = ""
680
681 s.notifier.NewRepo(r.Context(), repo)
682 switch {
683 case repoDid != "":
684 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid))
685 default:
686 handle := s.pages.DisplayHandle(r.Context(), user.Did)
687 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, rkey))
688 }
689 }
690}
691
692func rkeyOccupied(ctx context.Context, client *atclient.APIClient, did, rkey string) bool {
693 probeCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
694 defer cancel()
695 resp, err := comatproto.RepoGetRecord(probeCtx, client, "", tangled.RepoNSID, did, rkey)
696 return err == nil && resp != nil
697}
698
699// this is used to rollback changes made to the PDS
700//
701// it is a no-op if the provided ATURI is empty
702func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error {
703 if aturi == "" {
704 return nil
705 }
706
707 parsed := syntax.ATURI(aturi)
708
709 collection := parsed.Collection().String()
710 repo := parsed.Authority().String()
711 rkey := parsed.RecordKey().String()
712
713 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
714 Collection: collection,
715 Repo: repo,
716 Rkey: rkey,
717 })
718 return err
719}
720
721func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
722 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
723 if err != nil {
724 return err
725 }
726 // already present
727 if len(defaultLabels) == len(defaults) {
728 return nil
729 }
730
731 labelDefs, err := models.FetchLabelDefs(r, defaults)
732 if err != nil {
733 return err
734 }
735
736 // Insert each label definition to the database
737 for _, labelDef := range labelDefs {
738 _, err = db.AddLabelDefinition(e, &labelDef)
739 if err != nil {
740 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
741 }
742 }
743
744 return nil
745}
746
747func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
748 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
749 if err != nil {
750 logger.Error("failed to resolve tangled.org DID", "err", err)
751 return
752 }
753
754 pdsEndpoint := resolved.PDSEndpoint()
755 if pdsEndpoint == "" {
756 logger.Error("no PDS endpoint found for tangled.sh DID")
757 return
758 }
759
760 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger)
761 if err != nil {
762 logger.Error("failed to create appassword session... skipping fetch", "err", err)
763 return
764 }
765
766 l := log.SubLogger(logger, "bluesky")
767
768 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
769 defer ticker.Stop()
770
771 for {
772 // refresh session if necessary
773 if !session.IsValid() {
774 l.Debug("access token expired, refreshing session")
775 if err := session.RefreshSession(); err != nil {
776 l.Error("failed to refresh session, stopping bluesky updater", "err", err)
777 return
778 }
779 l.Debug("session refreshed")
780 }
781
782 // make client
783 client := xrpc.Client{
784 Auth: &xrpc.AuthInfo{
785 AccessJwt: session.AccessJwt,
786 Did: session.Did,
787 },
788 Host: session.PdsEndpoint,
789 }
790
791 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
792 if err != nil {
793 l.Error("failed to fetch bluesky posts", "err", err)
794 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
795 l.Error("failed to insert bluesky posts", "err", err)
796 } else {
797 l.Info("inserted bluesky posts", "count", len(posts))
798 }
799
800 select {
801 case <-ticker.C:
802 case <-ctx.Done():
803 l.Info("stopping bluesky updater")
804 return
805 }
806 }
807}