Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/cache"
17 "tangled.org/core/appview/cloudflare"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/email"
21 "tangled.org/core/appview/indexer"
22 "tangled.org/core/appview/knotacl"
23 "tangled.org/core/appview/knotcompat"
24 "tangled.org/core/appview/mentions"
25 "tangled.org/core/appview/models"
26 "tangled.org/core/appview/notify"
27 dbnotify "tangled.org/core/appview/notify/db"
28 lognotify "tangled.org/core/appview/notify/logging"
29 phnotify "tangled.org/core/appview/notify/posthog"
30 whnotify "tangled.org/core/appview/notify/webhook"
31 "tangled.org/core/appview/oauth"
32 "tangled.org/core/appview/pages"
33 "tangled.org/core/appview/pipelines"
34 pipelinessh "tangled.org/core/appview/pipelines/ssh"
35 "tangled.org/core/appview/reporesolver"
36 "tangled.org/core/appview/repoverify"
37 "tangled.org/core/appview/validator"
38 xrpcclient "tangled.org/core/appview/xrpcclient"
39 "tangled.org/core/consts"
40 "tangled.org/core/eventconsumer"
41 "tangled.org/core/idresolver"
42 "tangled.org/core/jetstream"
43 "tangled.org/core/log"
44 tlog "tangled.org/core/log"
45 "tangled.org/core/orm"
46 "tangled.org/core/rbac"
47
48 comatproto "github.com/bluesky-social/indigo/api/atproto"
49 "github.com/bluesky-social/indigo/atproto/atclient"
50 "github.com/bluesky-social/indigo/atproto/syntax"
51 lexutil "github.com/bluesky-social/indigo/lex/util"
52 "github.com/bluesky-social/indigo/xrpc"
53
54 "github.com/go-chi/chi/v5"
55 "github.com/posthog/posthog-go"
56)
57
// State bundles the long-lived dependencies of the appview. It is built once
// by Make and serves as the receiver for the HTTP handlers in this package.
type State struct {
	// db is the database handle opened by Make; Close closes it.
	db *db.DB
	// notifier is the merged notifier assembled in Make, wrapped with logging.
	notifier notify.Notifier
	// indexer is initialised in Make and is also registered as a notifier.
	indexer *indexer.Indexer
	// oauth resolves the logged-in user and builds authorized clients.
	oauth *oauth.OAuth
	// enforcer is the RBAC enforcer created from the database path.
	enforcer *rbac.Enforcer
	// pages renders responses and notices.
	pages *pages.Pages
	// idResolver is the Redis-backed identity resolver, or the default
	// resolver when the Redis one could not be created.
	idResolver *idresolver.Resolver
	// rdb is nil when no Redis address is configured.
	rdb *cache.Cache
	mentionsResolver *mentions.Resolver
	posthog posthog.Client
	// jc is the jetstream client started in Make.
	jc *jetstream.JetstreamClient
	config *config.Config
	repoResolver *reporesolver.RepoResolver
	// aclService answers knot ACL questions such as repo creation rights.
	aclService *knotacl.Service
	// knotstream and spindlestream are the event consumers started in Make.
	knotstream *eventconsumer.Consumer
	spindlestream *eventconsumer.Consumer
	// pipelineNotifier is shared with the spindle stream and the SSH server.
	pipelineNotifier *pipelines.StatusNotifier
	logger *slog.Logger
	validator *validator.Validator
	// cfClient is nil when no Cloudflare API token is configured or when the
	// client could not be created.
	cfClient *cloudflare.Client
}
80
81func Make(ctx context.Context, config *config.Config) (*State, error) {
82 logger := tlog.FromContext(ctx)
83
84 d, err := db.Make(ctx, config.Core.DbPath)
85 if err != nil {
86 return nil, fmt.Errorf("failed to create db: %w", err)
87 }
88
89 indexer := indexer.New(log.SubLogger(logger, "indexer"), d)
90 err = indexer.Init(ctx)
91 if err != nil {
92 return nil, fmt.Errorf("failed to create indexer: %w", err)
93 }
94
95 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
96 if err != nil {
97 return nil, fmt.Errorf("failed to create enforcer: %w", err)
98 }
99
100 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
101 if err != nil {
102 logger.Error("failed to create redis resolver", "err", err)
103 res = idresolver.DefaultResolver(config.Plc.PLCURL)
104 }
105
106 var rdb *cache.Cache
107 if config.Redis.Addr != "" {
108 rdb = cache.New(config.Redis.Addr)
109 }
110
111 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
112 if err != nil {
113 return nil, fmt.Errorf("failed to create posthog client: %w", err)
114 }
115
116 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages"))
117 knotcompat.UseNativeLatch(knotacl.NewLatch(d, log.SubLogger(logger, "knotacl-latch")))
118 aclService := knotacl.NewService(enforcer, d, config.Core.Dev, log.SubLogger(logger, "knotacl"))
119 oauth, err := oauth.New(config, posthog, d, enforcer, aclService, res, log.SubLogger(logger, "oauth"))
120 if err != nil {
121 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
122 }
123
124 validator := validator.New(d, res, aclService)
125
126 repoResolver := reporesolver.New(config, aclService, d, rdb)
127
128 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
129
130 jc, err := jetstream.NewJetstreamClient(
131 config.Jetstream.Endpoint,
132 "appview",
133 []string{
134 tangled.ActorProfileNSID,
135 tangled.FeedStarNSID,
136 tangled.FeedReactionNSID,
137 tangled.FeedCommentNSID,
138 tangled.GraphFollowNSID,
139 tangled.GraphVouchNSID,
140 tangled.KnotMemberNSID,
141 tangled.KnotNSID,
142 tangled.LabelDefinitionNSID,
143 tangled.LabelOpNSID,
144 tangled.PublicKeyNSID,
145 tangled.RepoArtifactNSID,
146 tangled.RepoIssueCommentNSID,
147 tangled.RepoIssueNSID,
148 tangled.RepoNSID,
149 tangled.RepoPullNSID,
150 tangled.RepoPullCommentNSID,
151 tangled.SpindleMemberNSID,
152 tangled.SpindleNSID,
153 tangled.StringNSID,
154 },
155 nil,
156 tlog.SubLogger(logger, "jetstream"),
157 d,
158 false,
159
160 // in-memory filter is inapplicable to appview so
161 // we'll never log dids anyway.
162 false,
163 )
164 if err != nil {
165 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
166 }
167
168 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
169 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
170 }
171
172 var notifiers []notify.Notifier
173
174 // Always add the database notifier
175 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
176
177 // Add other notifiers in production only
178 if !config.Core.Dev {
179 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
180 }
181 notifiers = append(notifiers, indexer)
182
183 notifiers = append(notifiers, whnotify.NewNotifier(d))
184
185 notifier := notify.NewMergedNotifier(notifiers)
186 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
187
188 ingester := appview.Ingester{
189 Ctx: ctx,
190 Db: d,
191 Enforcer: enforcer,
192 Acl: aclService,
193 IdResolver: res,
194 Cache: rdb,
195 Config: config,
196 Logger: log.SubLogger(logger, "ingester"),
197 Validator: validator,
198 MentionsResolver: mentionsResolver,
199 Notifier: notifier,
200 Verifier: repoverify.New(res, config.Core.Dev),
201 }
202 err = jc.StartJetstream(ctx, ingester.Ingest())
203 if err != nil {
204 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
205 }
206
207 go ingester.SweepPendingVerifications()
208
209 var cfClient *cloudflare.Client
210 if config.Cloudflare.ApiToken != "" {
211 cfClient, err = cloudflare.New(config)
212 if err != nil {
213 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err)
214 cfClient = nil
215 }
216 }
217
218 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient)
219 if err != nil {
220 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
221 }
222 knotstream.Start(ctx)
223
224 pipelineNotifier := pipelines.NewStatusNotifier()
225
226 spindlestream, err := Spindlestream(ctx, config, d, enforcer, pipelineNotifier)
227 if err != nil {
228 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
229 }
230 spindlestream.Start(ctx)
231
232 state := &State{
233 db: d,
234 notifier: notifier,
235 indexer: indexer,
236 oauth: oauth,
237 enforcer: enforcer,
238 pages: pages,
239 idResolver: res,
240 rdb: rdb,
241 mentionsResolver: mentionsResolver,
242 posthog: posthog,
243 jc: jc,
244 config: config,
245 repoResolver: repoResolver,
246 aclService: aclService,
247 knotstream: knotstream,
248 spindlestream: spindlestream,
249 pipelineNotifier: pipelineNotifier,
250 logger: logger,
251 validator: validator,
252 cfClient: cfClient,
253 }
254
255 // fetch initial bluesky posts if configured
256 go fetchBskyPosts(ctx, res, config, d, logger)
257
258 return state, nil
259}
260
261func (s *State) Close() error {
262 // other close up logic goes here
263 return s.db.Close()
264}
265
266func (s *State) NewSSHServer() *pipelinessh.Server {
267 return pipelinessh.New(s.db, s.config, s.pipelineNotifier, log.SubLogger(s.logger, "pipelinessh"))
268}
269
270func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) {
271 w.Header().Set("Content-Type", "text/plain")
272 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
273
274 securityTxt := `Contact: mailto:security@tangled.org
275Preferred-Languages: en
276Canonical: https://tangled.org/.well-known/security.txt
277Expires: 2030-01-01T21:59:00.000Z
278`
279 w.Write([]byte(securityTxt))
280}
281
282func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
283 w.Header().Set("Content-Type", "text/plain")
284 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
285
286 robotsTxt := `# Hello, Tanglers!
287User-agent: *
288Allow: /
289Disallow: /*/*/settings
290Disallow: /settings
291Disallow: /*/*/compare
292Disallow: /*/*/fork
293
294Crawl-delay: 1
295`
296 w.Write([]byte(robotsTxt))
297}
298
299func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
300 user := s.oauth.GetMultiAccountUser(r)
301 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
302 LoggedInUser: user,
303 })
304}
305
306func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
307 user := s.oauth.GetMultiAccountUser(r)
308 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
309 LoggedInUser: user,
310 })
311}
312
313func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
314 user := s.oauth.GetMultiAccountUser(r)
315 s.pages.Brand(w, pages.BrandParams{
316 LoggedInUser: user,
317 })
318}
319
320func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
321 user := s.oauth.GetMultiAccountUser(r)
322 if user == nil {
323 return
324 }
325
326 l := s.logger.With("handler", "UpgradeBanner")
327 l = l.With("did", user.Did)
328
329 regs, err := db.GetRegistrations(
330 s.db,
331 orm.FilterEq("did", user.Did),
332 orm.FilterEq("needs_upgrade", 1),
333 )
334 if err != nil {
335 l.Error("non-fatal: failed to get registrations", "err", err)
336 }
337
338 spindles, err := db.GetSpindles(
339 r.Context(),
340 s.db,
341 orm.FilterEq("owner", user.Did),
342 orm.FilterEq("needs_upgrade", 1),
343 )
344 if err != nil {
345 l.Error("non-fatal: failed to get spindles", "err", err)
346 }
347
348 if regs == nil && spindles == nil {
349 return
350 }
351
352 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
353 Registrations: regs,
354 Spindles: spindles,
355 })
356}
357
358func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) {
359 // target is echoed back from the form via hx-vals so the response span's
360 // id matches the form's hx-target. Fallback keeps the handler useful if
361 // a caller forgets to send it.
362 target := strings.TrimSpace(r.FormValue("target"))
363 if target == "" {
364 target = "home"
365 }
366
367 w.Header().Set("Content-Type", "text/html")
368
369 emailAddr := strings.TrimSpace(r.FormValue("email"))
370 if !email.IsValidEmail(emailAddr) {
371 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{
372 Id: target,
373 Error: "Invalid email address.",
374 })
375 return
376 }
377
378 // For logged-in users, persist the signup locally so the widget stays
379 // hidden across devices. The DB row is the render-time source of truth;
380 // Resend still owns the mailing list itself.
381 if user := s.oauth.GetMultiAccountUser(r); user != nil {
382 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil {
383 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err)
384 }
385 }
386
387 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" {
388 go func() {
389 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil {
390 s.logger.Error("failed to add newsletter contact", "error", err)
391 }
392 }()
393 } else {
394 s.logger.Error(
395 "failed to add newsletter contact, missing resend config",
396 "isKeyPresent", s.config.Resend.ApiKey != "",
397 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "",
398 "emailAddr", emailAddr,
399 )
400 }
401
402 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target})
403}
404
405// NewsletterDismiss records that a logged-in user has dismissed the newsletter
406// widget so it stays hidden across their devices. Anonymous callers get a 204
407// with no DB write — localStorage handles the per-browser fallback.
408func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) {
409 user := s.oauth.GetMultiAccountUser(r)
410 if user == nil {
411 w.WriteHeader(http.StatusNoContent)
412 return
413 }
414
415 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil {
416 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err)
417 }
418 w.WriteHeader(http.StatusNoContent)
419}
420
421func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
422 user := chi.URLParam(r, "user")
423 user = strings.TrimPrefix(user, "@")
424
425 if user == "" {
426 w.WriteHeader(http.StatusBadRequest)
427 return
428 }
429
430 id, err := s.idResolver.ResolveIdent(r.Context(), user)
431 if err != nil {
432 w.WriteHeader(http.StatusInternalServerError)
433 return
434 }
435
436 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
437 if err != nil {
438 s.logger.Error("failed to get public keys", "err", err)
439 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
440 return
441 }
442
443 if len(pubKeys) == 0 {
444 w.WriteHeader(http.StatusNoContent)
445 return
446 }
447
448 for _, k := range pubKeys {
449 key := strings.TrimRight(k.Key, "\n")
450 fmt.Fprintln(w, key)
451 }
452}
453
454func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
455 switch r.Method {
456 case http.MethodGet:
457 user := s.oauth.GetMultiAccountUser(r)
458 knots := s.aclService.KnotsForUser(r.Context(), user.Did)
459
460 s.pages.NewRepo(w, pages.NewRepoParams{
461 LoggedInUser: user,
462 Knots: knots,
463 })
464
465 case http.MethodPost:
466 l := s.logger.With("handler", "NewRepo")
467
468 user := s.oauth.GetMultiAccountUser(r)
469 l = l.With("did", user.Did)
470
471 // form validation
472 domain := r.FormValue("domain")
473 if domain == "" {
474 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
475 return
476 }
477 l = l.With("knot", domain)
478
479 repoName := r.FormValue("name")
480 if repoName == "" {
481 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
482 return
483 }
484
485 if err := models.ValidateRepoName(repoName); err != nil {
486 s.pages.Notice(w, "repo", err.Error())
487 return
488 }
489 repoName = models.StripGitExt(repoName)
490 rkey := strings.ToLower(repoName)
491 l = l.With("repoName", repoName, "rkey", rkey)
492
493 defaultBranch := r.FormValue("branch")
494 if defaultBranch == "" {
495 defaultBranch = "main"
496 }
497 l = l.With("defaultBranch", defaultBranch)
498
499 description := r.FormValue("description")
500 if len([]rune(description)) > 140 {
501 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
502 return
503 }
504
505 // ACL validation
506 if !s.aclService.IsRepoCreateAllowed(r.Context(), domain, user.Did) {
507 l.Info("unauthorized")
508 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
509 return
510 }
511
512 // Check for existing repos
513 existingRepo, err := db.GetRepo(
514 s.db,
515 orm.FilterEq("did", user.Did),
516 orm.FilterEq("rkey", rkey),
517 )
518 if err == nil && existingRepo != nil {
519 l.Info("repo exists")
520 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
521 return
522 }
523
524 atpClient, err := s.oauth.AuthorizedClient(r)
525 if err != nil {
526 l.Error("failed to get authorized client", "err", err)
527 s.pages.Notice(w, "repo", "Failed to authorize. Try again later.")
528 return
529 }
530
531 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) {
532 l.Info("rkey occupied by prior rename alias")
533 s.pages.Notice(w, "repo", fmt.Sprintf("The name %q still has a record on your PDS from a prior rename. Pick a different name, or delete at://%s/%s/%s first.", rkey, user.Did, tangled.RepoNSID, rkey))
534 return
535 }
536
537 client, err := s.oauth.ServiceClient(
538 r,
539 oauth.WithService(domain),
540 oauth.WithLxm(tangled.RepoCreateNSID),
541 oauth.WithDev(s.config.Core.Dev),
542 )
543 if err != nil {
544 l.Error("service auth failed", "err", err)
545 s.pages.Notice(w, "repo", "Failed to authenticate. Please log out and log back in again.")
546 return
547 }
548
549 input := &tangled.RepoCreate_Input{
550 Rkey: rkey,
551 Name: rkey,
552 DefaultBranch: &defaultBranch,
553 }
554 createResp, err := tangled.RepoCreate(
555 r.Context(),
556 client,
557 input,
558 )
559 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
560 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err)
561 s.pages.Notice(w, "repo", err.Error())
562 return
563 }
564
565 var repoDid string
566 if createResp != nil && createResp.RepoDid != nil {
567 repoDid = *createResp.RepoDid
568 }
569 if repoDid == "" {
570 l.Error("knot returned empty repo DID")
571 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.")
572 return
573 }
574
575 repo := &models.Repo{
576 Did: user.Did,
577 Name: repoName,
578 Knot: domain,
579 Rkey: rkey,
580 Description: description,
581 Created: time.Now(),
582 Labels: s.config.Label.DefaultLabelDefs,
583 RepoDid: repoDid,
584 }
585 record := repo.AsRecord()
586
587 cleanupKnot := func() {
588 go func() {
589 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second}
590 for attempt, delay := range delays {
591 time.Sleep(delay)
592 deleteClient, dErr := s.oauth.ServiceClient(
593 r,
594 oauth.WithService(domain),
595 oauth.WithLxm(tangled.RepoDeleteNSID),
596 oauth.WithDev(s.config.Core.Dev),
597 )
598 if dErr != nil {
599 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr)
600 continue
601 }
602 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
603 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{
604 Did: user.Did,
605 Name: rkey,
606 Rkey: rkey,
607 }); dErr != nil {
608 cancel()
609 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr)
610 continue
611 }
612 cancel()
613 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1)
614 return
615 }
616 l.Error("exhausted retries for knot cleanup, repo may be orphaned",
617 "did", user.Did, "repo", repoName, "knot", domain)
618 }()
619 }
620
621 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{
622 Collection: tangled.RepoNSID,
623 Repo: user.Did,
624 Rkey: &rkey,
625 Record: &lexutil.LexiconTypeDecoder{
626 Val: &record,
627 },
628 })
629 if err != nil {
630 l.Info("PDS write failed", "err", err)
631 cleanupKnot()
632 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) {
633 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository named %q.", rkey))
634 } else {
635 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
636 }
637 return
638 }
639
640 aturi := fmt.Sprintf("at://%s/%s/%s", user.Did, tangled.RepoNSID, rkey)
641 l = l.With("aturi", aturi)
642 l.Info("wrote to PDS")
643
644 tx, err := s.db.BeginTx(r.Context(), nil)
645 if err != nil {
646 l.Info("txn failed", "err", err)
647 s.pages.Notice(w, "repo", "Failed to save repository information.")
648 return
649 }
650
651 rollback := func() {
652 err1 := tx.Rollback()
653 err2 := s.enforcer.E.LoadPolicy()
654 err3 := rollbackRecord(context.Background(), aturi, atpClient)
655
656 if errors.Is(err1, sql.ErrTxDone) {
657 err1 = nil
658 }
659
660 if errs := errors.Join(err1, err2, err3); errs != nil {
661 l.Error("failed to rollback changes", "errs", errs)
662 }
663
664 if aturi != "" {
665 cleanupKnot()
666 }
667 }
668 defer rollback()
669
670 err = db.AddRepo(tx, repo)
671 if err != nil {
672 l.Error("db write failed", "err", err)
673 s.pages.Notice(w, "repo", "Failed to save repository information.")
674 return
675 }
676
677 rbacPath := repo.RepoIdentifier()
678 err = s.enforcer.AddRepo(user.Did, domain, rbacPath)
679 if err != nil {
680 l.Error("acl setup failed", "err", err)
681 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
682 return
683 }
684
685 err = tx.Commit()
686 if err != nil {
687 l.Error("txn commit failed", "err", err)
688 http.Error(w, err.Error(), http.StatusInternalServerError)
689 return
690 }
691
692 err = s.enforcer.E.SavePolicy()
693 if err != nil {
694 l.Error("acl save failed", "err", err)
695 http.Error(w, err.Error(), http.StatusInternalServerError)
696 return
697 }
698
699 aturi = ""
700
701 s.notifier.NewRepo(r.Context(), repo)
702 switch {
703 case repoDid != "":
704 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid))
705 default:
706 handle := s.pages.DisplayHandle(r.Context(), user.Did)
707 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, rkey))
708 }
709 }
710}
711
712func rkeyOccupied(ctx context.Context, client *atclient.APIClient, did, rkey string) bool {
713 probeCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
714 defer cancel()
715 resp, err := comatproto.RepoGetRecord(probeCtx, client, "", tangled.RepoNSID, did, rkey)
716 return err == nil && resp != nil
717}
718
719// this is used to rollback changes made to the PDS
720//
721// it is a no-op if the provided ATURI is empty
722func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error {
723 if aturi == "" {
724 return nil
725 }
726
727 parsed := syntax.ATURI(aturi)
728
729 collection := parsed.Collection().String()
730 repo := parsed.Authority().String()
731 rkey := parsed.RecordKey().String()
732
733 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
734 Collection: collection,
735 Repo: repo,
736 Rkey: rkey,
737 })
738 return err
739}
740
741func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
742 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
743 if err != nil {
744 return err
745 }
746 // already present
747 if len(defaultLabels) == len(defaults) {
748 return nil
749 }
750
751 labelDefs, err := models.FetchLabelDefs(r, defaults)
752 if err != nil {
753 return err
754 }
755
756 // Insert each label definition to the database
757 for _, labelDef := range labelDefs {
758 _, err = db.AddLabelDefinition(e, &labelDef)
759 if err != nil {
760 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
761 }
762 }
763
764 return nil
765}
766
767func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
768 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
769 if err != nil {
770 logger.Error("failed to resolve tangled.org DID", "err", err)
771 return
772 }
773
774 pdsEndpoint := resolved.PDSEndpoint()
775 if pdsEndpoint == "" {
776 logger.Error("no PDS endpoint found for tangled.sh DID")
777 return
778 }
779
780 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger)
781 if err != nil {
782 logger.Error("failed to create appassword session... skipping fetch", "err", err)
783 return
784 }
785
786 l := log.SubLogger(logger, "bluesky")
787
788 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
789 defer ticker.Stop()
790
791 for {
792 // refresh session if necessary
793 if !session.IsValid() {
794 l.Debug("access token expired, refreshing session")
795 if err := session.RefreshSession(); err != nil {
796 l.Error("failed to refresh session, stopping bluesky updater", "err", err)
797 return
798 }
799 l.Debug("session refreshed")
800 }
801
802 // make client
803 client := xrpc.Client{
804 Auth: &xrpc.AuthInfo{
805 AccessJwt: session.AccessJwt,
806 Did: session.Did,
807 },
808 Host: session.PdsEndpoint,
809 }
810
811 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
812 if err != nil {
813 l.Error("failed to fetch bluesky posts", "err", err)
814 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
815 l.Error("failed to insert bluesky posts", "err", err)
816 } else {
817 l.Info("inserted bluesky posts", "count", len(posts))
818 }
819
820 select {
821 case <-ticker.C:
822 case <-ctx.Done():
823 l.Info("stopping bluesky updater")
824 return
825 }
826 }
827}