Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/cache"
17 "tangled.org/core/appview/cloudflare"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/email"
21 "tangled.org/core/appview/indexer"
22 "tangled.org/core/appview/knotacl"
23 "tangled.org/core/appview/knotcompat"
24 "tangled.org/core/appview/mentions"
25 "tangled.org/core/appview/models"
26 "tangled.org/core/appview/notify"
27 dbnotify "tangled.org/core/appview/notify/db"
28 lognotify "tangled.org/core/appview/notify/logging"
29 phnotify "tangled.org/core/appview/notify/posthog"
30 whnotify "tangled.org/core/appview/notify/webhook"
31 "tangled.org/core/appview/oauth"
32 "tangled.org/core/appview/pages"
33 "tangled.org/core/appview/pipelines"
34 pipelinessh "tangled.org/core/appview/pipelines/ssh"
35 "tangled.org/core/appview/reporesolver"
36 "tangled.org/core/appview/repoverify"
37 "tangled.org/core/appview/validator"
38 xrpcclient "tangled.org/core/appview/xrpcclient"
39 "tangled.org/core/consts"
40 "tangled.org/core/eventconsumer"
41 "tangled.org/core/idresolver"
42 "tangled.org/core/jetstream"
43 "tangled.org/core/log"
44 tlog "tangled.org/core/log"
45 "tangled.org/core/orm"
46 "tangled.org/core/rbac"
47
48 comatproto "github.com/bluesky-social/indigo/api/atproto"
49 "github.com/bluesky-social/indigo/atproto/atclient"
50 "github.com/bluesky-social/indigo/atproto/syntax"
51 lexutil "github.com/bluesky-social/indigo/lex/util"
52 "github.com/bluesky-social/indigo/xrpc"
53
54 "github.com/go-chi/chi/v5"
55 "github.com/posthog/posthog-go"
56)
57
// State bundles the long-lived dependencies shared by the appview's HTTP
// handlers in this package. It is assembled by Make and released by Close.
type State struct {
	db               *db.DB                      // database handle opened by Make; closed by Close
	notifier         notify.Notifier             // merged notifier wrapped in a logging notifier
	indexer          *indexer.Indexer            // initialised in Make; also registered as a notifier
	oauth            *oauth.OAuth                // used to look up the current user and build authorized clients
	enforcer         *rbac.Enforcer              // RBAC enforcer created from config.Core.DbPath
	pages            *pages.Pages                // page/fragment renderer used by the handlers
	idResolver       *idresolver.Resolver        // Redis-backed when available, default resolver otherwise
	rdb              *cache.Cache                // nil when config.Redis.Addr is empty
	mentionsResolver *mentions.Resolver          // built by mentions.New in Make
	posthog          posthog.Client              // PostHog client built from config.Posthog
	jc               *jetstream.JetstreamClient  // jetstream client started in Make
	config           *config.Config              // configuration passed to Make
	repoResolver     *reporesolver.RepoResolver  // built by reporesolver.New in Make
	aclService       *knotacl.Service            // knot ACL checks (e.g. repo creation permission)
	knotstream       *eventconsumer.Consumer     // knot event consumer, started in Make
	spindlestream    *eventconsumer.Consumer     // spindle event consumer, started in Make
	pipelineNotifier *pipelines.StatusNotifier   // shared with the spindle stream and the SSH server
	logger           *slog.Logger                // base logger taken from the context given to Make
	validator        *validator.Validator        // built by validator.New in Make
	cfClient         *cloudflare.Client          // nil when no Cloudflare API token is set or creation failed
}
80
// Make builds the appview State from config.
//
// It opens the database, creates the indexer, RBAC enforcer, identity
// resolver, optional Redis cache, PostHog client, page renderer, ACL service,
// OAuth handler, validator and resolvers, then starts the jetstream ingester,
// the knot and spindle event consumers, and two background goroutines
// (pending-verification sweeper and bluesky post fetcher).
//
// A failure to create the Redis resolver or the Cloudflare client is logged
// and tolerated; every other failure aborts construction and is returned
// wrapped.
//
// NOTE(review): on the error returns after db.Make succeeds, d (and anything
// else already started) is not closed here — confirm callers treat an error
// from Make as fatal for the process.
func Make(ctx context.Context, config *config.Config) (*State, error) {
	logger := tlog.FromContext(ctx)

	d, err := db.Make(ctx, config.Core.DbPath)
	if err != nil {
		return nil, fmt.Errorf("failed to create db: %w", err)
	}

	// from here on the local `indexer` shadows the package of the same name
	indexer := indexer.New(log.SubLogger(logger, "indexer"), d)
	err = indexer.Init(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to create indexer: %w", err)
	}

	enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
	if err != nil {
		return nil, fmt.Errorf("failed to create enforcer: %w", err)
	}

	// prefer the redis-backed resolver; fall back to the default resolver
	// instead of failing startup when redis is unavailable
	res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
	if err != nil {
		logger.Error("failed to create redis resolver", "err", err)
		res = idresolver.DefaultResolver(config.Plc.PLCURL)
	}

	// the cache is optional: rdb stays nil when no redis address is configured
	var rdb *cache.Cache
	if config.Redis.Addr != "" {
		rdb = cache.New(config.Redis.Addr)
	}

	posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
	if err != nil {
		return nil, fmt.Errorf("failed to create posthog client: %w", err)
	}

	pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages"))
	// the latch is installed before the ACL service is constructed
	knotcompat.UseNativeLatch(knotacl.NewLatch(d, log.SubLogger(logger, "knotacl-latch")))
	aclService := knotacl.NewService(enforcer, d, config.Core.Dev, log.SubLogger(logger, "knotacl"))
	oauth, err := oauth.New(config, posthog, d, enforcer, aclService, res, log.SubLogger(logger, "oauth"))
	if err != nil {
		return nil, fmt.Errorf("failed to start oauth handler: %w", err)
	}

	validator := validator.New(d, res, aclService)

	repoResolver := reporesolver.New(config, aclService, d, rdb)

	mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))

	// record collections handed to the jetstream client
	// (presumably the subscription filter — confirm against the jetstream package)
	jc, err := jetstream.NewJetstreamClient(
		config.Jetstream.Endpoint,
		"appview",
		[]string{
			tangled.ActorProfileNSID,
			tangled.FeedStarNSID,
			tangled.FeedReactionNSID,
			tangled.FeedCommentNSID,
			tangled.GraphFollowNSID,
			tangled.GraphVouchNSID,
			tangled.KnotMemberNSID,
			tangled.KnotNSID,
			tangled.LabelDefinitionNSID,
			tangled.LabelOpNSID,
			tangled.PublicKeyNSID,
			tangled.RepoArtifactNSID,
			tangled.RepoIssueCommentNSID,
			tangled.RepoIssueNSID,
			tangled.RepoNSID,
			tangled.RepoPullNSID,
			tangled.RepoPullCommentNSID,
			tangled.SpindleMemberNSID,
			tangled.SpindleNSID,
			tangled.StringNSID,
		},
		nil,
		tlog.SubLogger(logger, "jetstream"),
		d,
		false,

		// in-memory filter is inapplicable to appview so
		// we'll never log dids anyway.
		false,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create jetstream client: %w", err)
	}

	if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
		return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
	}

	var notifiers []notify.Notifier

	// the database notifier is always present
	notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))

	// the posthog notifier is skipped in dev mode; the indexer and webhook
	// notifiers below are added unconditionally
	if !config.Core.Dev {
		notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
	}
	notifiers = append(notifiers, indexer)

	notifiers = append(notifiers, whnotify.NewNotifier(d))

	// fan out to all notifiers, with a logging wrapper on the outside
	notifier := notify.NewMergedNotifier(notifiers)
	notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))

	ingester := appview.Ingester{
		Ctx:              ctx,
		Db:               d,
		Enforcer:         enforcer,
		Acl:              aclService,
		IdResolver:       res,
		Cache:            rdb,
		Config:           config,
		Logger:           log.SubLogger(logger, "ingester"),
		Validator:        validator,
		MentionsResolver: mentionsResolver,
		Notifier:         notifier,
		Verifier:         repoverify.New(res, config.Core.Dev),
	}
	err = jc.StartJetstream(ctx, ingester.Ingest())
	if err != nil {
		return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
	}

	// background sweeper; its lifetime is not managed from here
	go ingester.SweepPendingVerifications()

	// cloudflare is optional: without a token, or when client creation
	// fails, cfClient stays nil
	var cfClient *cloudflare.Client
	if config.Cloudflare.ApiToken != "" {
		cfClient, err = cloudflare.New(config)
		if err != nil {
			logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err)
			cfClient = nil
		}
	}

	knotstream, err := Knotstream(ctx, config, d, aclService, enforcer, posthog, notifier, cfClient)
	if err != nil {
		return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
	}
	knotstream.Start(ctx)

	// shared between the spindle stream and the SSH server (see NewSSHServer)
	pipelineNotifier := pipelines.NewStatusNotifier()

	spindlestream, err := Spindlestream(ctx, config, d, enforcer, pipelineNotifier)
	if err != nil {
		return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
	}
	spindlestream.Start(ctx)

	state := &State{
		db:               d,
		notifier:         notifier,
		indexer:          indexer,
		oauth:            oauth,
		enforcer:         enforcer,
		pages:            pages,
		idResolver:       res,
		rdb:              rdb,
		mentionsResolver: mentionsResolver,
		posthog:          posthog,
		jc:               jc,
		config:           config,
		repoResolver:     repoResolver,
		aclService:       aclService,
		knotstream:       knotstream,
		spindlestream:    spindlestream,
		pipelineNotifier: pipelineNotifier,
		logger:           logger,
		validator:        validator,
		cfClient:         cfClient,
	}

	// start the background bluesky post fetcher; it is launched
	// unconditionally and decides by itself whether it can run
	go fetchBskyPosts(ctx, res, config, d, logger)

	return state, nil
}
260
261func (s *State) Close() error {
262 // other close up logic goes here
263 return s.db.Close()
264}
265
266func (s *State) NewSSHServer() *pipelinessh.Server {
267 return pipelinessh.New(s.db, s.config, s.pipelineNotifier, log.SubLogger(s.logger, "pipelinessh"))
268}
269
270func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) {
271 w.Header().Set("Content-Type", "text/plain")
272 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
273
274 securityTxt := `Contact: mailto:security@tangled.org
275Preferred-Languages: en
276Canonical: https://tangled.org/.well-known/security.txt
277Expires: 2030-01-01T21:59:00.000Z
278`
279 w.Write([]byte(securityTxt))
280}
281
282func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
283 w.Header().Set("Content-Type", "text/plain")
284 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
285
286 robotsTxt := `# Hello, Tanglers!
287User-agent: *
288Allow: /
289Disallow: /*/*/settings
290Disallow: /settings
291Disallow: /*/*/compare
292Disallow: /*/*/fork
293
294Crawl-delay: 1
295`
296 w.Write([]byte(robotsTxt))
297}
298
299func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
300 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
301 BaseParams: pages.BaseParamsFromContext(r.Context()),
302 })
303}
304
305func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
306 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
307 BaseParams: pages.BaseParamsFromContext(r.Context()),
308 })
309}
310
311func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
312 s.pages.Brand(w, pages.BrandParams{
313 BaseParams: pages.BaseParamsFromContext(r.Context()),
314 })
315}
316
317func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
318 user := s.oauth.GetMultiAccountUser(r)
319 if user == nil {
320 return
321 }
322
323 l := s.logger.With("handler", "UpgradeBanner")
324 l = l.With("did", user.Did)
325
326 regs, err := db.GetRegistrations(
327 s.db,
328 orm.FilterEq("did", user.Did),
329 orm.FilterEq("needs_upgrade", 1),
330 )
331 if err != nil {
332 l.Error("non-fatal: failed to get registrations", "err", err)
333 }
334
335 spindles, err := db.GetSpindles(
336 r.Context(),
337 s.db,
338 orm.FilterEq("owner", user.Did),
339 orm.FilterEq("needs_upgrade", 1),
340 )
341 if err != nil {
342 l.Error("non-fatal: failed to get spindles", "err", err)
343 }
344
345 if regs == nil && spindles == nil {
346 return
347 }
348
349 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
350 Registrations: regs,
351 Spindles: spindles,
352 })
353}
354
355func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) {
356 // target is echoed back from the form via hx-vals so the response span's
357 // id matches the form's hx-target. Fallback keeps the handler useful if
358 // a caller forgets to send it.
359 target := strings.TrimSpace(r.FormValue("target"))
360 if target == "" {
361 target = "home"
362 }
363
364 w.Header().Set("Content-Type", "text/html")
365
366 emailAddr := strings.TrimSpace(r.FormValue("email"))
367 if !email.IsValidEmail(emailAddr) {
368 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{
369 Id: target,
370 Error: "Invalid email address.",
371 })
372 return
373 }
374
375 // For logged-in users, persist the signup locally so the widget stays
376 // hidden across devices. The DB row is the render-time source of truth;
377 // Resend still owns the mailing list itself.
378 if user := s.oauth.GetMultiAccountUser(r); user != nil {
379 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil {
380 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err)
381 }
382 }
383
384 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" {
385 go func() {
386 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil {
387 s.logger.Error("failed to add newsletter contact", "error", err)
388 }
389 }()
390 } else {
391 s.logger.Error(
392 "failed to add newsletter contact, missing resend config",
393 "isKeyPresent", s.config.Resend.ApiKey != "",
394 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "",
395 "emailAddr", emailAddr,
396 )
397 }
398
399 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target})
400}
401
402// NewsletterDismiss records that a logged-in user has dismissed the newsletter
403// widget so it stays hidden across their devices. Anonymous callers get a 204
404// with no DB write — localStorage handles the per-browser fallback.
405func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) {
406 user := s.oauth.GetMultiAccountUser(r)
407 if user == nil {
408 w.WriteHeader(http.StatusNoContent)
409 return
410 }
411
412 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil {
413 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err)
414 }
415 w.WriteHeader(http.StatusNoContent)
416}
417
418func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
419 user := chi.URLParam(r, "user")
420 user = strings.TrimPrefix(user, "@")
421 user = strings.TrimSuffix(user, ".keys")
422
423 if user == "" {
424 w.WriteHeader(http.StatusBadRequest)
425 return
426 }
427
428 id, err := s.idResolver.ResolveIdent(r.Context(), user)
429 if err != nil {
430 w.WriteHeader(http.StatusInternalServerError)
431 return
432 }
433
434 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
435 if err != nil {
436 s.logger.Error("failed to get public keys", "err", err)
437 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
438 return
439 }
440
441 if len(pubKeys) == 0 {
442 w.WriteHeader(http.StatusNoContent)
443 return
444 }
445
446 for _, k := range pubKeys {
447 key := strings.TrimRight(k.Key, "\n")
448 fmt.Fprintln(w, key)
449 }
450}
451
452func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
453 switch r.Method {
454 case http.MethodGet:
455 user := s.oauth.GetMultiAccountUser(r)
456 knots := s.aclService.KnotsForUser(r.Context(), user.Did)
457
458 s.pages.NewRepo(w, pages.NewRepoParams{
459 BaseParams: pages.BaseParamsFromContext(r.Context()),
460 Knots: knots,
461 })
462
463 case http.MethodPost:
464 l := s.logger.With("handler", "NewRepo")
465
466 user := s.oauth.GetMultiAccountUser(r)
467 l = l.With("did", user.Did)
468
469 // form validation
470 domain := r.FormValue("domain")
471 if domain == "" {
472 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
473 return
474 }
475 l = l.With("knot", domain)
476
477 repoName := r.FormValue("name")
478 if repoName == "" {
479 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
480 return
481 }
482
483 if err := models.ValidateRepoName(repoName); err != nil {
484 s.pages.Notice(w, "repo", err.Error())
485 return
486 }
487 repoName = models.StripGitExt(repoName)
488 rkey := strings.ToLower(repoName)
489 l = l.With("repoName", repoName, "rkey", rkey)
490
491 defaultBranch := r.FormValue("branch")
492 if defaultBranch == "" {
493 defaultBranch = "main"
494 }
495 l = l.With("defaultBranch", defaultBranch)
496
497 description := r.FormValue("description")
498 if len([]rune(description)) > 140 {
499 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
500 return
501 }
502
503 // ACL validation
504 if !s.aclService.IsRepoCreateAllowed(r.Context(), domain, user.Did) {
505 l.Info("unauthorized")
506 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
507 return
508 }
509
510 // Check for existing repos
511 existingRepo, err := db.GetRepo(
512 s.db,
513 orm.FilterEq("did", user.Did),
514 orm.FilterEq("rkey", rkey),
515 )
516 if err == nil && existingRepo != nil {
517 l.Info("repo exists")
518 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
519 return
520 }
521
522 atpClient, err := s.oauth.AuthorizedClient(r)
523 if err != nil {
524 l.Error("failed to get authorized client", "err", err)
525 s.pages.Notice(w, "repo", "Failed to authorize. Try again later.")
526 return
527 }
528
529 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) {
530 l.Info("rkey occupied by prior rename alias")
531 s.pages.Notice(w, "repo", fmt.Sprintf("The name %q still has a record on your PDS from a prior rename. Pick a different name, or delete at://%s/%s/%s first.", rkey, user.Did, tangled.RepoNSID, rkey))
532 return
533 }
534
535 client, err := s.oauth.ServiceClient(
536 r,
537 oauth.WithService(domain),
538 oauth.WithLxm(tangled.RepoCreateNSID),
539 oauth.WithDev(s.config.Core.Dev),
540 )
541 if err != nil {
542 l.Error("service auth failed", "err", err)
543 s.pages.Notice(w, "repo", "Failed to authenticate. Please log out and log back in again.")
544 return
545 }
546
547 input := &tangled.RepoCreate_Input{
548 Rkey: rkey,
549 Name: rkey,
550 DefaultBranch: &defaultBranch,
551 }
552 createResp, err := tangled.RepoCreate(
553 r.Context(),
554 client,
555 input,
556 )
557 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
558 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err)
559 s.pages.Notice(w, "repo", err.Error())
560 return
561 }
562
563 var repoDid string
564 if createResp != nil && createResp.RepoDid != nil {
565 repoDid = *createResp.RepoDid
566 }
567 if repoDid == "" {
568 l.Error("knot returned empty repo DID")
569 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.")
570 return
571 }
572
573 repo := &models.Repo{
574 Did: user.Did,
575 Name: repoName,
576 Knot: domain,
577 Rkey: rkey,
578 Description: description,
579 Created: time.Now(),
580 Labels: s.config.Label.DefaultLabelDefs,
581 RepoDid: repoDid,
582 }
583 record := repo.AsRecord()
584
585 cleanupKnot := func() {
586 go func() {
587 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second}
588 for attempt, delay := range delays {
589 time.Sleep(delay)
590 deleteClient, dErr := s.oauth.ServiceClient(
591 r,
592 oauth.WithService(domain),
593 oauth.WithLxm(tangled.RepoDeleteNSID),
594 oauth.WithDev(s.config.Core.Dev),
595 )
596 if dErr != nil {
597 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr)
598 continue
599 }
600 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
601 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{
602 Did: user.Did,
603 Name: rkey,
604 Rkey: rkey,
605 }); dErr != nil {
606 cancel()
607 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr)
608 continue
609 }
610 cancel()
611 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1)
612 return
613 }
614 l.Error("exhausted retries for knot cleanup, repo may be orphaned",
615 "did", user.Did, "repo", repoName, "knot", domain)
616 }()
617 }
618
619 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{
620 Collection: tangled.RepoNSID,
621 Repo: user.Did,
622 Rkey: &rkey,
623 Record: &lexutil.LexiconTypeDecoder{
624 Val: &record,
625 },
626 })
627 if err != nil {
628 l.Info("PDS write failed", "err", err)
629 cleanupKnot()
630 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) {
631 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository named %q.", rkey))
632 } else {
633 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
634 }
635 return
636 }
637
638 aturi := fmt.Sprintf("at://%s/%s/%s", user.Did, tangled.RepoNSID, rkey)
639 l = l.With("aturi", aturi)
640 l.Info("wrote to PDS")
641
642 tx, err := s.db.BeginTx(r.Context(), nil)
643 if err != nil {
644 l.Info("txn failed", "err", err)
645 s.pages.Notice(w, "repo", "Failed to save repository information.")
646 return
647 }
648
649 rollback := func() {
650 err1 := tx.Rollback()
651 err2 := s.enforcer.E.LoadPolicy()
652 err3 := rollbackRecord(context.Background(), aturi, atpClient)
653
654 if errors.Is(err1, sql.ErrTxDone) {
655 err1 = nil
656 }
657
658 if errs := errors.Join(err1, err2, err3); errs != nil {
659 l.Error("failed to rollback changes", "errs", errs)
660 }
661
662 if aturi != "" {
663 cleanupKnot()
664 }
665 }
666 defer rollback()
667
668 err = db.AddRepo(tx, repo)
669 if err != nil {
670 l.Error("db write failed", "err", err)
671 s.pages.Notice(w, "repo", "Failed to save repository information.")
672 return
673 }
674
675 rbacPath := repo.RepoIdentifier()
676 err = s.enforcer.AddRepo(user.Did, domain, rbacPath)
677 if err != nil {
678 l.Error("acl setup failed", "err", err)
679 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
680 return
681 }
682
683 err = tx.Commit()
684 if err != nil {
685 l.Error("txn commit failed", "err", err)
686 http.Error(w, err.Error(), http.StatusInternalServerError)
687 return
688 }
689
690 err = s.enforcer.E.SavePolicy()
691 if err != nil {
692 l.Error("acl save failed", "err", err)
693 http.Error(w, err.Error(), http.StatusInternalServerError)
694 return
695 }
696
697 aturi = ""
698
699 s.notifier.NewRepo(r.Context(), repo)
700 switch {
701 case repoDid != "":
702 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid))
703 default:
704 handle := s.pages.DisplayHandle(r.Context(), user.Did)
705 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, rkey))
706 }
707 }
708}
709
710func rkeyOccupied(ctx context.Context, client *atclient.APIClient, did, rkey string) bool {
711 probeCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
712 defer cancel()
713 resp, err := comatproto.RepoGetRecord(probeCtx, client, "", tangled.RepoNSID, did, rkey)
714 return err == nil && resp != nil
715}
716
717// this is used to rollback changes made to the PDS
718//
719// it is a no-op if the provided ATURI is empty
720func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error {
721 if aturi == "" {
722 return nil
723 }
724
725 parsed := syntax.ATURI(aturi)
726
727 collection := parsed.Collection().String()
728 repo := parsed.Authority().String()
729 rkey := parsed.RecordKey().String()
730
731 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
732 Collection: collection,
733 Repo: repo,
734 Rkey: rkey,
735 })
736 return err
737}
738
739func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
740 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
741 if err != nil {
742 return err
743 }
744 // already present
745 if len(defaultLabels) == len(defaults) {
746 return nil
747 }
748
749 labelDefs, err := models.FetchLabelDefs(r, defaults)
750 if err != nil {
751 return err
752 }
753
754 // Insert each label definition to the database
755 for _, labelDef := range labelDefs {
756 _, err = db.AddLabelDefinition(e, &labelDef)
757 if err != nil {
758 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
759 }
760 }
761
762 return nil
763}
764
765func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
766 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
767 if err != nil {
768 logger.Error("failed to resolve tangled.org DID", "err", err)
769 return
770 }
771
772 pdsEndpoint := resolved.PDSEndpoint()
773 if pdsEndpoint == "" {
774 logger.Error("no PDS endpoint found for tangled.sh DID")
775 return
776 }
777
778 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger)
779 if err != nil {
780 logger.Error("failed to create appassword session... skipping fetch", "err", err)
781 return
782 }
783
784 l := log.SubLogger(logger, "bluesky")
785
786 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
787 defer ticker.Stop()
788
789 for {
790 // refresh session if necessary
791 if !session.IsValid() {
792 l.Debug("access token expired, refreshing session")
793 if err := session.RefreshSession(); err != nil {
794 l.Error("failed to refresh session, stopping bluesky updater", "err", err)
795 return
796 }
797 l.Debug("session refreshed")
798 }
799
800 // make client
801 client := xrpc.Client{
802 Auth: &xrpc.AuthInfo{
803 AccessJwt: session.AccessJwt,
804 Did: session.Did,
805 },
806 Host: session.PdsEndpoint,
807 }
808
809 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
810 if err != nil {
811 l.Error("failed to fetch bluesky posts", "err", err)
812 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
813 l.Error("failed to insert bluesky posts", "err", err)
814 } else {
815 l.Info("inserted bluesky posts", "count", len(posts))
816 }
817
818 select {
819 case <-ticker.C:
820 case <-ctx.Done():
821 l.Info("stopping bluesky updater")
822 return
823 }
824 }
825}