Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/cache"
17 "tangled.org/core/appview/cloudflare"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/email"
21 "tangled.org/core/appview/indexer"
22 "tangled.org/core/appview/knotacl"
23 "tangled.org/core/appview/knotcompat"
24 "tangled.org/core/appview/mentions"
25 "tangled.org/core/appview/models"
26 "tangled.org/core/appview/notify"
27 dbnotify "tangled.org/core/appview/notify/db"
28 lognotify "tangled.org/core/appview/notify/logging"
29 phnotify "tangled.org/core/appview/notify/posthog"
30 whnotify "tangled.org/core/appview/notify/webhook"
31 "tangled.org/core/appview/oauth"
32 "tangled.org/core/appview/pages"
33 "tangled.org/core/appview/pipelines"
34 pipelinessh "tangled.org/core/appview/pipelines/ssh"
35 "tangled.org/core/appview/reporesolver"
36 "tangled.org/core/appview/repoverify"
37 xrpcclient "tangled.org/core/appview/xrpcclient"
38 "tangled.org/core/consts"
39 "tangled.org/core/eventconsumer"
40 "tangled.org/core/idresolver"
41 "tangled.org/core/jetstream"
42 "tangled.org/core/log"
43 tlog "tangled.org/core/log"
44 "tangled.org/core/orm"
45 "tangled.org/core/rbac"
46
47 comatproto "github.com/bluesky-social/indigo/api/atproto"
48 "github.com/bluesky-social/indigo/atproto/atclient"
49 "github.com/bluesky-social/indigo/atproto/syntax"
50 lexutil "github.com/bluesky-social/indigo/lex/util"
51 "github.com/bluesky-social/indigo/xrpc"
52
53 "github.com/go-chi/chi/v5"
54 "github.com/posthog/posthog-go"
55)
56
57type State struct {
58 db *db.DB
59 notifier notify.Notifier
60 indexer *indexer.Indexer
61 oauth *oauth.OAuth
62 enforcer *rbac.Enforcer
63 pages *pages.Pages
64 idResolver *idresolver.Resolver
65 rdb *cache.Cache
66 mentionsResolver *mentions.Resolver
67 posthog posthog.Client
68 jc *jetstream.JetstreamClient
69 config *config.Config
70 repoResolver *reporesolver.RepoResolver
71 aclService *knotacl.Service
72 knotstream *eventconsumer.Consumer
73 spindlestream *eventconsumer.Consumer
74 pipelineNotifier *pipelines.StatusNotifier
75 logger *slog.Logger
76 cfClient *cloudflare.Client
77}
78
79func Make(ctx context.Context, config *config.Config) (*State, error) {
80 logger := tlog.FromContext(ctx)
81
82 d, err := db.Make(ctx, config.Core.DbPath)
83 if err != nil {
84 return nil, fmt.Errorf("failed to create db: %w", err)
85 }
86
87 indexer := indexer.New(log.SubLogger(logger, "indexer"), d)
88 err = indexer.Init(ctx)
89 if err != nil {
90 return nil, fmt.Errorf("failed to create indexer: %w", err)
91 }
92
93 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
94 if err != nil {
95 return nil, fmt.Errorf("failed to create enforcer: %w", err)
96 }
97
98 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
99 if err != nil {
100 logger.Error("failed to create redis resolver", "err", err)
101 res = idresolver.DefaultResolver(config.Plc.PLCURL)
102 }
103
104 var rdb *cache.Cache
105 if config.Redis.Addr != "" {
106 rdb = cache.New(config.Redis.Addr)
107 }
108
109 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
110 if err != nil {
111 return nil, fmt.Errorf("failed to create posthog client: %w", err)
112 }
113
114 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages"))
115 knotcompat.UseNativeLatch(knotacl.NewLatch(d, log.SubLogger(logger, "knotacl-latch")))
116 aclService := knotacl.NewService(enforcer, d, config.Core.Dev, log.SubLogger(logger, "knotacl"))
117 oauth, err := oauth.New(config, posthog, d, enforcer, aclService, res, log.SubLogger(logger, "oauth"))
118 if err != nil {
119 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
120 }
121 repoResolver := reporesolver.New(config, aclService, d, rdb)
122
123 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
124
125 jc, err := jetstream.NewJetstreamClient(
126 config.Jetstream.Endpoint,
127 "appview",
128 []string{
129 tangled.ActorProfileNSID,
130 tangled.FeedStarNSID,
131 tangled.FeedReactionNSID,
132 tangled.FeedCommentNSID,
133 tangled.GraphFollowNSID,
134 tangled.GraphVouchNSID,
135 tangled.KnotMemberNSID,
136 tangled.KnotNSID,
137 tangled.LabelDefinitionNSID,
138 tangled.LabelOpNSID,
139 tangled.PublicKeyNSID,
140 tangled.RepoArtifactNSID,
141 tangled.RepoIssueCommentNSID,
142 tangled.RepoIssueNSID,
143 tangled.RepoNSID,
144 tangled.RepoPullNSID,
145 tangled.RepoPullCommentNSID,
146 tangled.SpindleMemberNSID,
147 tangled.SpindleNSID,
148 tangled.StringNSID,
149 },
150 nil,
151 tlog.SubLogger(logger, "jetstream"),
152 d,
153 false,
154
155 // in-memory filter is inapplicable to appview so
156 // we'll never log dids anyway.
157 false,
158 )
159 if err != nil {
160 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
161 }
162
163 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
164 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
165 }
166
167 var notifiers []notify.Notifier
168
169 // Always add the database notifier
170 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
171
172 // Add other notifiers in production only
173 if !config.Core.Dev {
174 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
175 }
176 notifiers = append(notifiers, indexer)
177
178 notifiers = append(notifiers, whnotify.NewNotifier(d))
179
180 notifier := notify.NewMergedNotifier(notifiers)
181 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
182
183 ingester := appview.Ingester{
184 Ctx: ctx,
185 Db: d,
186 Enforcer: enforcer,
187 Acl: aclService,
188 IdResolver: res,
189 Cache: rdb,
190 Config: config,
191 Logger: log.SubLogger(logger, "ingester"),
192 MentionsResolver: mentionsResolver,
193 Notifier: notifier,
194 Verifier: repoverify.New(res, config.Core.Dev),
195 }
196 err = jc.StartJetstream(ctx, ingester.Ingest())
197 if err != nil {
198 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
199 }
200
201 go ingester.SweepPendingVerifications()
202
203 var cfClient *cloudflare.Client
204 if config.Cloudflare.ApiToken != "" {
205 cfClient, err = cloudflare.New(config)
206 if err != nil {
207 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err)
208 cfClient = nil
209 }
210 }
211
212 knotstream, err := Knotstream(ctx, config, d, aclService, enforcer, posthog, notifier, cfClient)
213 if err != nil {
214 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
215 }
216 knotstream.Start(ctx)
217
218 pipelineNotifier := pipelines.NewStatusNotifier()
219
220 spindlestream, err := Spindlestream(ctx, config, d, enforcer, pipelineNotifier)
221 if err != nil {
222 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
223 }
224 spindlestream.Start(ctx)
225
226 state := &State{
227 db: d,
228 notifier: notifier,
229 indexer: indexer,
230 oauth: oauth,
231 enforcer: enforcer,
232 pages: pages,
233 idResolver: res,
234 rdb: rdb,
235 mentionsResolver: mentionsResolver,
236 posthog: posthog,
237 jc: jc,
238 config: config,
239 repoResolver: repoResolver,
240 aclService: aclService,
241 knotstream: knotstream,
242 spindlestream: spindlestream,
243 pipelineNotifier: pipelineNotifier,
244 logger: logger,
245 cfClient: cfClient,
246 }
247
248 // fetch initial bluesky posts if configured
249 go fetchBskyPosts(ctx, res, config, d, logger)
250
251 return state, nil
252}
253
254func (s *State) Close() error {
255 // other close up logic goes here
256 return s.db.Close()
257}
258
259func (s *State) NewSSHServer() *pipelinessh.Server {
260 return pipelinessh.New(s.db, s.config, s.pipelineNotifier, log.SubLogger(s.logger, "pipelinessh"))
261}
262
263func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) {
264 w.Header().Set("Content-Type", "text/plain")
265 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
266
267 securityTxt := `Contact: mailto:security@tangled.org
268Preferred-Languages: en
269Canonical: https://tangled.org/.well-known/security.txt
270Expires: 2030-01-01T21:59:00.000Z
271`
272 w.Write([]byte(securityTxt))
273}
274
275func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
276 w.Header().Set("Content-Type", "text/plain")
277 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
278
279 robotsTxt := `# Hello, Tanglers!
280User-agent: *
281Allow: /
282Disallow: /*/*/settings
283Disallow: /settings
284Disallow: /*/*/compare
285Disallow: /*/*/fork
286
287Crawl-delay: 1
288`
289 w.Write([]byte(robotsTxt))
290}
291
292func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
293 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
294 BaseParams: pages.BaseParamsFromContext(r.Context()),
295 })
296}
297
298func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
299 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
300 BaseParams: pages.BaseParamsFromContext(r.Context()),
301 })
302}
303
304func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
305 s.pages.Brand(w, pages.BrandParams{
306 BaseParams: pages.BaseParamsFromContext(r.Context()),
307 })
308}
309
310func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
311 user := s.oauth.GetMultiAccountUser(r)
312 if user == nil {
313 return
314 }
315
316 l := s.logger.With("handler", "UpgradeBanner")
317 l = l.With("did", user.Did)
318
319 regs, err := db.GetRegistrations(
320 s.db,
321 orm.FilterEq("did", user.Did),
322 orm.FilterEq("needs_upgrade", 1),
323 )
324 if err != nil {
325 l.Error("non-fatal: failed to get registrations", "err", err)
326 }
327
328 spindles, err := db.GetSpindles(
329 r.Context(),
330 s.db,
331 orm.FilterEq("owner", user.Did),
332 orm.FilterEq("needs_upgrade", 1),
333 )
334 if err != nil {
335 l.Error("non-fatal: failed to get spindles", "err", err)
336 }
337
338 if regs == nil && spindles == nil {
339 return
340 }
341
342 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
343 Registrations: regs,
344 Spindles: spindles,
345 })
346}
347
348func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) {
349 // target is echoed back from the form via hx-vals so the response span's
350 // id matches the form's hx-target. Fallback keeps the handler useful if
351 // a caller forgets to send it.
352 target := strings.TrimSpace(r.FormValue("target"))
353 if target == "" {
354 target = "home"
355 }
356
357 w.Header().Set("Content-Type", "text/html")
358
359 emailAddr := strings.TrimSpace(r.FormValue("email"))
360 if !email.IsValidEmail(emailAddr) {
361 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{
362 Id: target,
363 Error: "Invalid email address.",
364 })
365 return
366 }
367
368 // For logged-in users, persist the signup locally so the widget stays
369 // hidden across devices. The DB row is the render-time source of truth;
370 // Resend still owns the mailing list itself.
371 if user := s.oauth.GetMultiAccountUser(r); user != nil {
372 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil {
373 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err)
374 }
375 }
376
377 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" {
378 go func() {
379 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil {
380 s.logger.Error("failed to add newsletter contact", "error", err)
381 }
382 }()
383 } else {
384 s.logger.Error(
385 "failed to add newsletter contact, missing resend config",
386 "isKeyPresent", s.config.Resend.ApiKey != "",
387 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "",
388 "emailAddr", emailAddr,
389 )
390 }
391
392 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target})
393}
394
395// NewsletterDismiss records that a logged-in user has dismissed the newsletter
396// widget so it stays hidden across their devices. Anonymous callers get a 204
397// with no DB write — localStorage handles the per-browser fallback.
398func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) {
399 user := s.oauth.GetMultiAccountUser(r)
400 if user == nil {
401 w.WriteHeader(http.StatusNoContent)
402 return
403 }
404
405 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil {
406 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err)
407 }
408 w.WriteHeader(http.StatusNoContent)
409}
410
411func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
412 user := chi.URLParam(r, "user")
413 user = strings.TrimPrefix(user, "@")
414
415 if user == "" {
416 w.WriteHeader(http.StatusBadRequest)
417 return
418 }
419
420 id, err := s.idResolver.ResolveIdent(r.Context(), user)
421 if err != nil {
422 w.WriteHeader(http.StatusInternalServerError)
423 return
424 }
425
426 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
427 if err != nil {
428 s.logger.Error("failed to get public keys", "err", err)
429 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
430 return
431 }
432
433 if len(pubKeys) == 0 {
434 w.WriteHeader(http.StatusNoContent)
435 return
436 }
437
438 for _, k := range pubKeys {
439 key := strings.TrimRight(k.Key, "\n")
440 fmt.Fprintln(w, key)
441 }
442}
443
444func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
445 switch r.Method {
446 case http.MethodGet:
447 user := s.oauth.GetMultiAccountUser(r)
448 knots := s.aclService.KnotsForUser(r.Context(), user.Did)
449
450 s.pages.NewRepo(w, pages.NewRepoParams{
451 BaseParams: pages.BaseParamsFromContext(r.Context()),
452 Knots: knots,
453 })
454
455 case http.MethodPost:
456 l := s.logger.With("handler", "NewRepo")
457
458 user := s.oauth.GetMultiAccountUser(r)
459 l = l.With("did", user.Did)
460
461 // form validation
462 domain := r.FormValue("domain")
463 if domain == "" {
464 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
465 return
466 }
467 l = l.With("knot", domain)
468
469 repoName := r.FormValue("name")
470 if repoName == "" {
471 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
472 return
473 }
474
475 if err := models.ValidateRepoName(repoName); err != nil {
476 s.pages.Notice(w, "repo", err.Error())
477 return
478 }
479 repoName = models.StripGitExt(repoName)
480 rkey := strings.ToLower(repoName)
481 l = l.With("repoName", repoName, "rkey", rkey)
482
483 defaultBranch := r.FormValue("branch")
484 if defaultBranch == "" {
485 defaultBranch = "main"
486 }
487 l = l.With("defaultBranch", defaultBranch)
488
489 description := r.FormValue("description")
490 if len([]rune(description)) > 140 {
491 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
492 return
493 }
494
495 // ACL validation
496 if !s.aclService.IsRepoCreateAllowed(r.Context(), domain, user.Did) {
497 l.Info("unauthorized")
498 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
499 return
500 }
501
502 // Check for existing repos
503 existingRepo, err := db.GetRepo(
504 s.db,
505 orm.FilterEq("did", user.Did),
506 orm.FilterEq("rkey", rkey),
507 )
508 if err == nil && existingRepo != nil {
509 l.Info("repo exists")
510 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
511 return
512 }
513
514 atpClient, err := s.oauth.AuthorizedClient(r)
515 if err != nil {
516 l.Error("failed to get authorized client", "err", err)
517 s.pages.Notice(w, "repo", "Failed to authorize. Try again later.")
518 return
519 }
520
521 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) {
522 l.Info("rkey occupied by prior rename alias")
523 s.pages.Notice(w, "repo", fmt.Sprintf("The name %q still has a record on your PDS from a prior rename. Pick a different name, or delete at://%s/%s/%s first.", rkey, user.Did, tangled.RepoNSID, rkey))
524 return
525 }
526
527 client, err := s.oauth.ServiceClient(
528 r,
529 oauth.WithService(domain),
530 oauth.WithLxm(tangled.RepoCreateNSID),
531 oauth.WithDev(s.config.Core.Dev),
532 )
533 if err != nil {
534 l.Error("service auth failed", "err", err)
535 s.pages.Notice(w, "repo", "Failed to authenticate. Please log out and log back in again.")
536 return
537 }
538
539 input := &tangled.RepoCreate_Input{
540 Rkey: rkey,
541 Name: rkey,
542 DefaultBranch: &defaultBranch,
543 }
544 createResp, err := tangled.RepoCreate(
545 r.Context(),
546 client,
547 input,
548 )
549 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
550 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err)
551 s.pages.Notice(w, "repo", err.Error())
552 return
553 }
554
555 var repoDid string
556 if createResp != nil && createResp.RepoDid != nil {
557 repoDid = *createResp.RepoDid
558 }
559 if repoDid == "" {
560 l.Error("knot returned empty repo DID")
561 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.")
562 return
563 }
564
565 repo := &models.Repo{
566 Did: user.Did,
567 Name: repoName,
568 Knot: domain,
569 Rkey: rkey,
570 Description: description,
571 Created: time.Now(),
572 Labels: s.config.Label.DefaultLabelDefs,
573 RepoDid: repoDid,
574 }
575 record := repo.AsRecord()
576
577 cleanupKnot := func() {
578 go func() {
579 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second}
580 for attempt, delay := range delays {
581 time.Sleep(delay)
582 deleteClient, dErr := s.oauth.ServiceClient(
583 r,
584 oauth.WithService(domain),
585 oauth.WithLxm(tangled.RepoDeleteNSID),
586 oauth.WithDev(s.config.Core.Dev),
587 )
588 if dErr != nil {
589 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr)
590 continue
591 }
592 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
593 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{
594 Did: user.Did,
595 Name: rkey,
596 Rkey: rkey,
597 }); dErr != nil {
598 cancel()
599 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr)
600 continue
601 }
602 cancel()
603 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1)
604 return
605 }
606 l.Error("exhausted retries for knot cleanup, repo may be orphaned",
607 "did", user.Did, "repo", repoName, "knot", domain)
608 }()
609 }
610
611 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{
612 Collection: tangled.RepoNSID,
613 Repo: user.Did,
614 Rkey: &rkey,
615 Record: &lexutil.LexiconTypeDecoder{
616 Val: &record,
617 },
618 })
619 if err != nil {
620 l.Info("PDS write failed", "err", err)
621 cleanupKnot()
622 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) {
623 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository named %q.", rkey))
624 } else {
625 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
626 }
627 return
628 }
629
630 aturi := fmt.Sprintf("at://%s/%s/%s", user.Did, tangled.RepoNSID, rkey)
631 l = l.With("aturi", aturi)
632 l.Info("wrote to PDS")
633
634 tx, err := s.db.BeginTx(r.Context(), nil)
635 if err != nil {
636 l.Info("txn failed", "err", err)
637 s.pages.Notice(w, "repo", "Failed to save repository information.")
638 return
639 }
640
641 rollback := func() {
642 err1 := tx.Rollback()
643 err2 := s.enforcer.E.LoadPolicy()
644 err3 := rollbackRecord(context.Background(), aturi, atpClient)
645
646 if errors.Is(err1, sql.ErrTxDone) {
647 err1 = nil
648 }
649
650 if errs := errors.Join(err1, err2, err3); errs != nil {
651 l.Error("failed to rollback changes", "errs", errs)
652 }
653
654 if aturi != "" {
655 cleanupKnot()
656 }
657 }
658 defer rollback()
659
660 err = db.AddRepo(tx, repo)
661 if err != nil {
662 l.Error("db write failed", "err", err)
663 s.pages.Notice(w, "repo", "Failed to save repository information.")
664 return
665 }
666
667 rbacPath := repo.RepoIdentifier()
668 err = s.enforcer.AddRepo(user.Did, domain, rbacPath)
669 if err != nil {
670 l.Error("acl setup failed", "err", err)
671 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
672 return
673 }
674
675 err = tx.Commit()
676 if err != nil {
677 l.Error("txn commit failed", "err", err)
678 http.Error(w, err.Error(), http.StatusInternalServerError)
679 return
680 }
681
682 err = s.enforcer.E.SavePolicy()
683 if err != nil {
684 l.Error("acl save failed", "err", err)
685 http.Error(w, err.Error(), http.StatusInternalServerError)
686 return
687 }
688
689 aturi = ""
690
691 s.notifier.NewRepo(r.Context(), repo)
692 switch {
693 case repoDid != "":
694 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid))
695 default:
696 handle := s.pages.DisplayHandle(r.Context(), user.Did)
697 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, rkey))
698 }
699 }
700}
701
702func rkeyOccupied(ctx context.Context, client *atclient.APIClient, did, rkey string) bool {
703 probeCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
704 defer cancel()
705 resp, err := comatproto.RepoGetRecord(probeCtx, client, "", tangled.RepoNSID, did, rkey)
706 return err == nil && resp != nil
707}
708
709// this is used to rollback changes made to the PDS
710//
711// it is a no-op if the provided ATURI is empty
712func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error {
713 if aturi == "" {
714 return nil
715 }
716
717 parsed := syntax.ATURI(aturi)
718
719 collection := parsed.Collection().String()
720 repo := parsed.Authority().String()
721 rkey := parsed.RecordKey().String()
722
723 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
724 Collection: collection,
725 Repo: repo,
726 Rkey: rkey,
727 })
728 return err
729}
730
731func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
732 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
733 if err != nil {
734 return err
735 }
736 // already present
737 if len(defaultLabels) == len(defaults) {
738 return nil
739 }
740
741 labelDefs, err := models.FetchLabelDefs(r, defaults)
742 if err != nil {
743 return err
744 }
745
746 // Insert each label definition to the database
747 for _, labelDef := range labelDefs {
748 _, err = db.AddLabelDefinition(e, &labelDef)
749 if err != nil {
750 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
751 }
752 }
753
754 return nil
755}
756
757func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
758 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
759 if err != nil {
760 logger.Error("failed to resolve tangled.org DID", "err", err)
761 return
762 }
763
764 pdsEndpoint := resolved.PDSEndpoint()
765 if pdsEndpoint == "" {
766 logger.Error("no PDS endpoint found for tangled.sh DID")
767 return
768 }
769
770 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger)
771 if err != nil {
772 logger.Error("failed to create appassword session... skipping fetch", "err", err)
773 return
774 }
775
776 l := log.SubLogger(logger, "bluesky")
777
778 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
779 defer ticker.Stop()
780
781 for {
782 // refresh session if necessary
783 if !session.IsValid() {
784 l.Debug("access token expired, refreshing session")
785 if err := session.RefreshSession(); err != nil {
786 l.Error("failed to refresh session, stopping bluesky updater", "err", err)
787 return
788 }
789 l.Debug("session refreshed")
790 }
791
792 // make client
793 client := xrpc.Client{
794 Auth: &xrpc.AuthInfo{
795 AccessJwt: session.AccessJwt,
796 Did: session.Did,
797 },
798 Host: session.PdsEndpoint,
799 }
800
801 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
802 if err != nil {
803 l.Error("failed to fetch bluesky posts", "err", err)
804 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
805 l.Error("failed to insert bluesky posts", "err", err)
806 } else {
807 l.Info("inserted bluesky posts", "count", len(posts))
808 }
809
810 select {
811 case <-ticker.C:
812 case <-ctx.Done():
813 l.Info("stopping bluesky updater")
814 return
815 }
816 }
817}