Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/cache"
17 "tangled.org/core/appview/cloudflare"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/email"
21 "tangled.org/core/appview/indexer"
22 "tangled.org/core/appview/knotacl"
23 "tangled.org/core/appview/knotcompat"
24 "tangled.org/core/appview/mentions"
25 "tangled.org/core/appview/models"
26 "tangled.org/core/appview/notify"
27 dbnotify "tangled.org/core/appview/notify/db"
28 lognotify "tangled.org/core/appview/notify/logging"
29 phnotify "tangled.org/core/appview/notify/posthog"
30 whnotify "tangled.org/core/appview/notify/webhook"
31 "tangled.org/core/appview/oauth"
32 "tangled.org/core/appview/pages"
33 "tangled.org/core/appview/pipelines"
34 pipelinessh "tangled.org/core/appview/pipelines/ssh"
35 "tangled.org/core/appview/reporesolver"
36 "tangled.org/core/appview/repoverify"
37 "tangled.org/core/appview/validator"
38 xrpcclient "tangled.org/core/appview/xrpcclient"
39 "tangled.org/core/consts"
40 "tangled.org/core/eventconsumer"
41 "tangled.org/core/idresolver"
42 "tangled.org/core/jetstream"
43 "tangled.org/core/log"
44 tlog "tangled.org/core/log"
45 "tangled.org/core/orm"
46 "tangled.org/core/rbac"
47
48 comatproto "github.com/bluesky-social/indigo/api/atproto"
49 "github.com/bluesky-social/indigo/atproto/atclient"
50 "github.com/bluesky-social/indigo/atproto/syntax"
51 lexutil "github.com/bluesky-social/indigo/lex/util"
52 "github.com/bluesky-social/indigo/xrpc"
53
54 "github.com/go-chi/chi/v5"
55 "github.com/posthog/posthog-go"
56)
57
// State bundles the long-lived dependencies shared by the appview's HTTP
// handlers. All fields are populated by Make.
type State struct {
	// db is the handle opened by db.Make; Close closes it.
	db *db.DB
	// notifier is the merged notifier built in Make, wrapped in a logging
	// notifier.
	notifier   notify.Notifier
	indexer    *indexer.Indexer
	oauth      *oauth.OAuth
	enforcer   *rbac.Enforcer
	pages      *pages.Pages
	idResolver *idresolver.Resolver
	// rdb is nil when no Redis address is configured.
	rdb              *cache.Cache
	mentionsResolver *mentions.Resolver
	posthog          posthog.Client
	jc               *jetstream.JetstreamClient
	config           *config.Config
	repoResolver     *reporesolver.RepoResolver
	aclService       *knotacl.Service
	// knotstream and spindlestream are event consumers that Make has already
	// started.
	knotstream       *eventconsumer.Consumer
	spindlestream    *eventconsumer.Consumer
	pipelineNotifier *pipelines.StatusNotifier
	logger           *slog.Logger
	validator        *validator.Validator
	// cfClient is nil when no Cloudflare API token is configured or when the
	// client could not be created.
	cfClient *cloudflare.Client
}
80
// Make wires up every dependency of the appview and returns the resulting
// State.
//
// Besides constructing services it has side effects: it starts the jetstream
// ingester, the knotstream and spindlestream consumers, and two background
// goroutines (pending-verification sweeper and Bluesky post fetcher).
//
// Optional dependencies degrade instead of failing: a Redis resolver error
// falls back to the default resolver, the Redis cache stays nil when no
// address is configured, and a Cloudflare client error only logs a warning.
// Any other construction error is wrapped and returned with a nil State.
func Make(ctx context.Context, config *config.Config) (*State, error) {
	logger := tlog.FromContext(ctx)

	d, err := db.Make(ctx, config.Core.DbPath)
	if err != nil {
		return nil, fmt.Errorf("failed to create db: %w", err)
	}

	indexer := indexer.New(log.SubLogger(logger, "indexer"), d)
	err = indexer.Init(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to create indexer: %w", err)
	}

	enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
	if err != nil {
		return nil, fmt.Errorf("failed to create enforcer: %w", err)
	}

	// Non-fatal: without Redis we fall back to the default resolver.
	res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
	if err != nil {
		logger.Error("failed to create redis resolver", "err", err)
		res = idresolver.DefaultResolver(config.Plc.PLCURL)
	}

	// rdb stays nil when no Redis address is configured; it is passed on
	// as-is to the components below.
	var rdb *cache.Cache
	if config.Redis.Addr != "" {
		rdb = cache.New(config.Redis.Addr)
	}

	posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
	if err != nil {
		return nil, fmt.Errorf("failed to create posthog client: %w", err)
	}

	pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages"))
	knotcompat.UseNativeLatch(knotacl.NewLatch(d, log.SubLogger(logger, "knotacl-latch")))
	aclService := knotacl.NewService(enforcer, d, config.Core.Dev, log.SubLogger(logger, "knotacl"))
	oauth, err := oauth.New(config, posthog, d, enforcer, aclService, res, log.SubLogger(logger, "oauth"))
	if err != nil {
		return nil, fmt.Errorf("failed to start oauth handler: %w", err)
	}

	validator := validator.New(d, res, aclService)

	repoResolver := reporesolver.New(config, aclService, d, rdb)

	mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))

	// The jetstream client is subscribed to the collections listed here.
	jc, err := jetstream.NewJetstreamClient(
		config.Jetstream.Endpoint,
		"appview",
		[]string{
			tangled.ActorProfileNSID,
			tangled.FeedStarNSID,
			tangled.FeedReactionNSID,
			tangled.FeedCommentNSID,
			tangled.GraphFollowNSID,
			tangled.GraphVouchNSID,
			tangled.KnotMemberNSID,
			tangled.KnotNSID,
			tangled.LabelDefinitionNSID,
			tangled.LabelOpNSID,
			tangled.PublicKeyNSID,
			tangled.RepoArtifactNSID,
			tangled.RepoIssueCommentNSID,
			tangled.RepoIssueNSID,
			tangled.RepoNSID,
			tangled.RepoPullNSID,
			tangled.RepoPullCommentNSID,
			tangled.SpindleMemberNSID,
			tangled.SpindleNSID,
			tangled.StringNSID,
		},
		nil,
		tlog.SubLogger(logger, "jetstream"),
		d,
		false,

		// in-memory filter is inapplicable to appview so
		// we'll never log dids anyway.
		false,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create jetstream client: %w", err)
	}

	if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
		return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
	}

	var notifiers []notify.Notifier

	// Always add the database notifier
	notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))

	// The PostHog notifier is the only one skipped in dev mode; the indexer
	// and webhook notifiers below are always added.
	if !config.Core.Dev {
		notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
	}
	notifiers = append(notifiers, indexer)

	notifiers = append(notifiers, whnotify.NewNotifier(d))

	// Merge all notifiers into one and wrap it with a logging notifier.
	notifier := notify.NewMergedNotifier(notifiers)
	notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))

	ingester := appview.Ingester{
		Ctx:              ctx,
		Db:               d,
		Enforcer:         enforcer,
		Acl:              aclService,
		IdResolver:       res,
		Cache:            rdb,
		Config:           config,
		Logger:           log.SubLogger(logger, "ingester"),
		Validator:        validator,
		MentionsResolver: mentionsResolver,
		Notifier:         notifier,
		Verifier:         repoverify.New(res, config.Core.Dev),
	}
	err = jc.StartJetstream(ctx, ingester.Ingest())
	if err != nil {
		return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
	}

	// Runs in the background for the rest of the process lifetime.
	go ingester.SweepPendingVerifications()

	// Cloudflare is optional: a missing token or a construction failure
	// leaves cfClient nil.
	var cfClient *cloudflare.Client
	if config.Cloudflare.ApiToken != "" {
		cfClient, err = cloudflare.New(config)
		if err != nil {
			logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err)
			cfClient = nil
		}
	}

	knotstream, err := Knotstream(ctx, config, d, aclService, enforcer, posthog, notifier, cfClient)
	if err != nil {
		return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
	}
	knotstream.Start(ctx)

	pipelineNotifier := pipelines.NewStatusNotifier()

	spindlestream, err := Spindlestream(ctx, config, d, enforcer, pipelineNotifier)
	if err != nil {
		return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
	}
	spindlestream.Start(ctx)

	state := &State{
		db:               d,
		notifier:         notifier,
		indexer:          indexer,
		oauth:            oauth,
		enforcer:         enforcer,
		pages:            pages,
		idResolver:       res,
		rdb:              rdb,
		mentionsResolver: mentionsResolver,
		posthog:          posthog,
		jc:               jc,
		config:           config,
		repoResolver:     repoResolver,
		aclService:       aclService,
		knotstream:       knotstream,
		spindlestream:    spindlestream,
		pipelineNotifier: pipelineNotifier,
		logger:           logger,
		validator:        validator,
		cfClient:         cfClient,
	}

	// Start the background Bluesky post fetcher. It is launched
	// unconditionally and logs and exits on its own if it cannot run.
	go fetchBskyPosts(ctx, res, config, d, logger)

	return state, nil
}
260
261func (s *State) Close() error {
262 // other close up logic goes here
263 return s.db.Close()
264}
265
266func (s *State) NewSSHServer() *pipelinessh.Server {
267 return pipelinessh.New(s.db, s.config, s.pipelineNotifier, log.SubLogger(s.logger, "pipelinessh"))
268}
269
270func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) {
271 w.Header().Set("Content-Type", "text/plain")
272 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
273
274 securityTxt := `Contact: mailto:security@tangled.org
275Preferred-Languages: en
276Canonical: https://tangled.org/.well-known/security.txt
277Expires: 2030-01-01T21:59:00.000Z
278`
279 w.Write([]byte(securityTxt))
280}
281
282func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
283 w.Header().Set("Content-Type", "text/plain")
284 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
285
286 robotsTxt := `# Hello, Tanglers!
287User-agent: *
288Allow: /
289Disallow: /*/*/settings
290Disallow: /settings
291Disallow: /*/*/compare
292Disallow: /*/*/fork
293
294Crawl-delay: 1
295`
296 w.Write([]byte(robotsTxt))
297}
298
299func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
300 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
301 BaseParams: pages.BaseParamsFromContext(r.Context()),
302 })
303}
304
305func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
306 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
307 BaseParams: pages.BaseParamsFromContext(r.Context()),
308 })
309}
310
311func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
312 s.pages.Brand(w, pages.BrandParams{
313 BaseParams: pages.BaseParamsFromContext(r.Context()),
314 })
315}
316
317func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
318 user := s.oauth.GetMultiAccountUser(r)
319 if user == nil {
320 return
321 }
322
323 l := s.logger.With("handler", "UpgradeBanner")
324 l = l.With("did", user.Did)
325
326 regs, err := db.GetRegistrations(
327 s.db,
328 orm.FilterEq("did", user.Did),
329 orm.FilterEq("needs_upgrade", 1),
330 )
331 if err != nil {
332 l.Error("non-fatal: failed to get registrations", "err", err)
333 }
334
335 spindles, err := db.GetSpindles(
336 r.Context(),
337 s.db,
338 orm.FilterEq("owner", user.Did),
339 orm.FilterEq("needs_upgrade", 1),
340 )
341 if err != nil {
342 l.Error("non-fatal: failed to get spindles", "err", err)
343 }
344
345 if regs == nil && spindles == nil {
346 return
347 }
348
349 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
350 Registrations: regs,
351 Spindles: spindles,
352 })
353}
354
355func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) {
356 // target is echoed back from the form via hx-vals so the response span's
357 // id matches the form's hx-target. Fallback keeps the handler useful if
358 // a caller forgets to send it.
359 target := strings.TrimSpace(r.FormValue("target"))
360 if target == "" {
361 target = "home"
362 }
363
364 w.Header().Set("Content-Type", "text/html")
365
366 emailAddr := strings.TrimSpace(r.FormValue("email"))
367 if !email.IsValidEmail(emailAddr) {
368 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{
369 Id: target,
370 Error: "Invalid email address.",
371 })
372 return
373 }
374
375 // For logged-in users, persist the signup locally so the widget stays
376 // hidden across devices. The DB row is the render-time source of truth;
377 // Resend still owns the mailing list itself.
378 if user := s.oauth.GetMultiAccountUser(r); user != nil {
379 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil {
380 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err)
381 }
382 }
383
384 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" {
385 go func() {
386 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil {
387 s.logger.Error("failed to add newsletter contact", "error", err)
388 }
389 }()
390 } else {
391 s.logger.Error(
392 "failed to add newsletter contact, missing resend config",
393 "isKeyPresent", s.config.Resend.ApiKey != "",
394 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "",
395 "emailAddr", emailAddr,
396 )
397 }
398
399 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target})
400}
401
402// NewsletterDismiss records that a logged-in user has dismissed the newsletter
403// widget so it stays hidden across their devices. Anonymous callers get a 204
404// with no DB write — localStorage handles the per-browser fallback.
405func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) {
406 user := s.oauth.GetMultiAccountUser(r)
407 if user == nil {
408 w.WriteHeader(http.StatusNoContent)
409 return
410 }
411
412 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil {
413 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err)
414 }
415 w.WriteHeader(http.StatusNoContent)
416}
417
418func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
419 user := chi.URLParam(r, "user")
420 user = strings.TrimPrefix(user, "@")
421
422 if user == "" {
423 w.WriteHeader(http.StatusBadRequest)
424 return
425 }
426
427 id, err := s.idResolver.ResolveIdent(r.Context(), user)
428 if err != nil {
429 w.WriteHeader(http.StatusInternalServerError)
430 return
431 }
432
433 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
434 if err != nil {
435 s.logger.Error("failed to get public keys", "err", err)
436 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
437 return
438 }
439
440 if len(pubKeys) == 0 {
441 w.WriteHeader(http.StatusNoContent)
442 return
443 }
444
445 for _, k := range pubKeys {
446 key := strings.TrimRight(k.Key, "\n")
447 fmt.Fprintln(w, key)
448 }
449}
450
451func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
452 switch r.Method {
453 case http.MethodGet:
454 user := s.oauth.GetMultiAccountUser(r)
455 knots := s.aclService.KnotsForUser(r.Context(), user.Did)
456
457 s.pages.NewRepo(w, pages.NewRepoParams{
458 BaseParams: pages.BaseParamsFromContext(r.Context()),
459 Knots: knots,
460 })
461
462 case http.MethodPost:
463 l := s.logger.With("handler", "NewRepo")
464
465 user := s.oauth.GetMultiAccountUser(r)
466 l = l.With("did", user.Did)
467
468 // form validation
469 domain := r.FormValue("domain")
470 if domain == "" {
471 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
472 return
473 }
474 l = l.With("knot", domain)
475
476 repoName := r.FormValue("name")
477 if repoName == "" {
478 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
479 return
480 }
481
482 if err := models.ValidateRepoName(repoName); err != nil {
483 s.pages.Notice(w, "repo", err.Error())
484 return
485 }
486 repoName = models.StripGitExt(repoName)
487 rkey := strings.ToLower(repoName)
488 l = l.With("repoName", repoName, "rkey", rkey)
489
490 defaultBranch := r.FormValue("branch")
491 if defaultBranch == "" {
492 defaultBranch = "main"
493 }
494 l = l.With("defaultBranch", defaultBranch)
495
496 description := r.FormValue("description")
497 if len([]rune(description)) > 140 {
498 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
499 return
500 }
501
502 // ACL validation
503 if !s.aclService.IsRepoCreateAllowed(r.Context(), domain, user.Did) {
504 l.Info("unauthorized")
505 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
506 return
507 }
508
509 // Check for existing repos
510 existingRepo, err := db.GetRepo(
511 s.db,
512 orm.FilterEq("did", user.Did),
513 orm.FilterEq("rkey", rkey),
514 )
515 if err == nil && existingRepo != nil {
516 l.Info("repo exists")
517 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
518 return
519 }
520
521 atpClient, err := s.oauth.AuthorizedClient(r)
522 if err != nil {
523 l.Error("failed to get authorized client", "err", err)
524 s.pages.Notice(w, "repo", "Failed to authorize. Try again later.")
525 return
526 }
527
528 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) {
529 l.Info("rkey occupied by prior rename alias")
530 s.pages.Notice(w, "repo", fmt.Sprintf("The name %q still has a record on your PDS from a prior rename. Pick a different name, or delete at://%s/%s/%s first.", rkey, user.Did, tangled.RepoNSID, rkey))
531 return
532 }
533
534 client, err := s.oauth.ServiceClient(
535 r,
536 oauth.WithService(domain),
537 oauth.WithLxm(tangled.RepoCreateNSID),
538 oauth.WithDev(s.config.Core.Dev),
539 )
540 if err != nil {
541 l.Error("service auth failed", "err", err)
542 s.pages.Notice(w, "repo", "Failed to authenticate. Please log out and log back in again.")
543 return
544 }
545
546 input := &tangled.RepoCreate_Input{
547 Rkey: rkey,
548 Name: rkey,
549 DefaultBranch: &defaultBranch,
550 }
551 createResp, err := tangled.RepoCreate(
552 r.Context(),
553 client,
554 input,
555 )
556 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
557 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err)
558 s.pages.Notice(w, "repo", err.Error())
559 return
560 }
561
562 var repoDid string
563 if createResp != nil && createResp.RepoDid != nil {
564 repoDid = *createResp.RepoDid
565 }
566 if repoDid == "" {
567 l.Error("knot returned empty repo DID")
568 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.")
569 return
570 }
571
572 repo := &models.Repo{
573 Did: user.Did,
574 Name: repoName,
575 Knot: domain,
576 Rkey: rkey,
577 Description: description,
578 Created: time.Now(),
579 Labels: s.config.Label.DefaultLabelDefs,
580 RepoDid: repoDid,
581 }
582 record := repo.AsRecord()
583
584 cleanupKnot := func() {
585 go func() {
586 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second}
587 for attempt, delay := range delays {
588 time.Sleep(delay)
589 deleteClient, dErr := s.oauth.ServiceClient(
590 r,
591 oauth.WithService(domain),
592 oauth.WithLxm(tangled.RepoDeleteNSID),
593 oauth.WithDev(s.config.Core.Dev),
594 )
595 if dErr != nil {
596 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr)
597 continue
598 }
599 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
600 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{
601 Did: user.Did,
602 Name: rkey,
603 Rkey: rkey,
604 }); dErr != nil {
605 cancel()
606 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr)
607 continue
608 }
609 cancel()
610 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1)
611 return
612 }
613 l.Error("exhausted retries for knot cleanup, repo may be orphaned",
614 "did", user.Did, "repo", repoName, "knot", domain)
615 }()
616 }
617
618 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{
619 Collection: tangled.RepoNSID,
620 Repo: user.Did,
621 Rkey: &rkey,
622 Record: &lexutil.LexiconTypeDecoder{
623 Val: &record,
624 },
625 })
626 if err != nil {
627 l.Info("PDS write failed", "err", err)
628 cleanupKnot()
629 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) {
630 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository named %q.", rkey))
631 } else {
632 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
633 }
634 return
635 }
636
637 aturi := fmt.Sprintf("at://%s/%s/%s", user.Did, tangled.RepoNSID, rkey)
638 l = l.With("aturi", aturi)
639 l.Info("wrote to PDS")
640
641 tx, err := s.db.BeginTx(r.Context(), nil)
642 if err != nil {
643 l.Info("txn failed", "err", err)
644 s.pages.Notice(w, "repo", "Failed to save repository information.")
645 return
646 }
647
648 rollback := func() {
649 err1 := tx.Rollback()
650 err2 := s.enforcer.E.LoadPolicy()
651 err3 := rollbackRecord(context.Background(), aturi, atpClient)
652
653 if errors.Is(err1, sql.ErrTxDone) {
654 err1 = nil
655 }
656
657 if errs := errors.Join(err1, err2, err3); errs != nil {
658 l.Error("failed to rollback changes", "errs", errs)
659 }
660
661 if aturi != "" {
662 cleanupKnot()
663 }
664 }
665 defer rollback()
666
667 err = db.AddRepo(tx, repo)
668 if err != nil {
669 l.Error("db write failed", "err", err)
670 s.pages.Notice(w, "repo", "Failed to save repository information.")
671 return
672 }
673
674 rbacPath := repo.RepoIdentifier()
675 err = s.enforcer.AddRepo(user.Did, domain, rbacPath)
676 if err != nil {
677 l.Error("acl setup failed", "err", err)
678 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
679 return
680 }
681
682 err = tx.Commit()
683 if err != nil {
684 l.Error("txn commit failed", "err", err)
685 http.Error(w, err.Error(), http.StatusInternalServerError)
686 return
687 }
688
689 err = s.enforcer.E.SavePolicy()
690 if err != nil {
691 l.Error("acl save failed", "err", err)
692 http.Error(w, err.Error(), http.StatusInternalServerError)
693 return
694 }
695
696 aturi = ""
697
698 s.notifier.NewRepo(r.Context(), repo)
699 switch {
700 case repoDid != "":
701 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid))
702 default:
703 handle := s.pages.DisplayHandle(r.Context(), user.Did)
704 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, rkey))
705 }
706 }
707}
708
709func rkeyOccupied(ctx context.Context, client *atclient.APIClient, did, rkey string) bool {
710 probeCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
711 defer cancel()
712 resp, err := comatproto.RepoGetRecord(probeCtx, client, "", tangled.RepoNSID, did, rkey)
713 return err == nil && resp != nil
714}
715
716// this is used to rollback changes made to the PDS
717//
718// it is a no-op if the provided ATURI is empty
719func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error {
720 if aturi == "" {
721 return nil
722 }
723
724 parsed := syntax.ATURI(aturi)
725
726 collection := parsed.Collection().String()
727 repo := parsed.Authority().String()
728 rkey := parsed.RecordKey().String()
729
730 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
731 Collection: collection,
732 Repo: repo,
733 Rkey: rkey,
734 })
735 return err
736}
737
738func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
739 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
740 if err != nil {
741 return err
742 }
743 // already present
744 if len(defaultLabels) == len(defaults) {
745 return nil
746 }
747
748 labelDefs, err := models.FetchLabelDefs(r, defaults)
749 if err != nil {
750 return err
751 }
752
753 // Insert each label definition to the database
754 for _, labelDef := range labelDefs {
755 _, err = db.AddLabelDefinition(e, &labelDef)
756 if err != nil {
757 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
758 }
759 }
760
761 return nil
762}
763
764func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
765 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
766 if err != nil {
767 logger.Error("failed to resolve tangled.org DID", "err", err)
768 return
769 }
770
771 pdsEndpoint := resolved.PDSEndpoint()
772 if pdsEndpoint == "" {
773 logger.Error("no PDS endpoint found for tangled.sh DID")
774 return
775 }
776
777 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger)
778 if err != nil {
779 logger.Error("failed to create appassword session... skipping fetch", "err", err)
780 return
781 }
782
783 l := log.SubLogger(logger, "bluesky")
784
785 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
786 defer ticker.Stop()
787
788 for {
789 // refresh session if necessary
790 if !session.IsValid() {
791 l.Debug("access token expired, refreshing session")
792 if err := session.RefreshSession(); err != nil {
793 l.Error("failed to refresh session, stopping bluesky updater", "err", err)
794 return
795 }
796 l.Debug("session refreshed")
797 }
798
799 // make client
800 client := xrpc.Client{
801 Auth: &xrpc.AuthInfo{
802 AccessJwt: session.AccessJwt,
803 Did: session.Did,
804 },
805 Host: session.PdsEndpoint,
806 }
807
808 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
809 if err != nil {
810 l.Error("failed to fetch bluesky posts", "err", err)
811 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
812 l.Error("failed to insert bluesky posts", "err", err)
813 } else {
814 l.Info("inserted bluesky posts", "count", len(posts))
815 }
816
817 select {
818 case <-ticker.C:
819 case <-ctx.Done():
820 l.Info("stopping bluesky updater")
821 return
822 }
823 }
824}