Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/cache"
17 "tangled.org/core/appview/cloudflare"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/email"
21 "tangled.org/core/appview/indexer"
22 "tangled.org/core/appview/knotacl"
23 "tangled.org/core/appview/knotcompat"
24 "tangled.org/core/appview/mentions"
25 "tangled.org/core/appview/models"
26 "tangled.org/core/appview/notify"
27 dbnotify "tangled.org/core/appview/notify/db"
28 lognotify "tangled.org/core/appview/notify/logging"
29 phnotify "tangled.org/core/appview/notify/posthog"
30 whnotify "tangled.org/core/appview/notify/webhook"
31 "tangled.org/core/appview/oauth"
32 "tangled.org/core/appview/pages"
33 "tangled.org/core/appview/pipelines"
34 pipelinessh "tangled.org/core/appview/pipelines/ssh"
35 "tangled.org/core/appview/reporesolver"
36 "tangled.org/core/appview/repoverify"
37 "tangled.org/core/appview/validator"
38 xrpcclient "tangled.org/core/appview/xrpcclient"
39 "tangled.org/core/blobstore"
40 "tangled.org/core/consts"
41 "tangled.org/core/eventconsumer"
42 "tangled.org/core/idresolver"
43 "tangled.org/core/jetstream"
44 "tangled.org/core/log"
45 tlog "tangled.org/core/log"
46 "tangled.org/core/orm"
47 "tangled.org/core/rbac"
48
49 comatproto "github.com/bluesky-social/indigo/api/atproto"
50 "github.com/bluesky-social/indigo/atproto/atclient"
51 "github.com/bluesky-social/indigo/atproto/syntax"
52 lexutil "github.com/bluesky-social/indigo/lex/util"
53 "github.com/bluesky-social/indigo/xrpc"
54
55 "github.com/go-chi/chi/v5"
56 "github.com/posthog/posthog-go"
57)
58
59type State struct {
60 db *db.DB
61 notifier notify.Notifier
62 indexer *indexer.Indexer
63 oauth *oauth.OAuth
64 enforcer *rbac.Enforcer
65 pages *pages.Pages
66 idResolver *idresolver.Resolver
67 blobStore blobstore.BlobStore
68 rdb *cache.Cache
69 mentionsResolver *mentions.Resolver
70 posthog posthog.Client
71 jc *jetstream.JetstreamClient
72 config *config.Config
73 repoResolver *reporesolver.RepoResolver
74 aclService *knotacl.Service
75 knotstream *eventconsumer.Consumer
76 spindlestream *eventconsumer.Consumer
77 pipelineNotifier *pipelines.StatusNotifier
78 logger *slog.Logger
79 validator *validator.Validator
80 cfClient *cloudflare.Client
81}
82
83func Make(ctx context.Context, config *config.Config) (*State, error) {
84 logger := tlog.FromContext(ctx)
85
86 d, err := db.Make(ctx, config.Core.DbPath)
87 if err != nil {
88 return nil, fmt.Errorf("failed to create db: %w", err)
89 }
90
91 indexer := indexer.New(log.SubLogger(logger, "indexer"), d)
92 err = indexer.Init(ctx)
93 if err != nil {
94 return nil, fmt.Errorf("failed to create indexer: %w", err)
95 }
96
97 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
98 if err != nil {
99 return nil, fmt.Errorf("failed to create enforcer: %w", err)
100 }
101
102 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
103 if err != nil {
104 logger.Error("failed to create redis resolver", "err", err)
105 res = idresolver.DefaultResolver(config.Plc.PLCURL)
106 }
107
108 var rdb *cache.Cache
109 if config.Redis.Addr != "" {
110 rdb = cache.New(config.Redis.Addr)
111 }
112
113 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
114 if err != nil {
115 return nil, fmt.Errorf("failed to create posthog client: %w", err)
116 }
117
118 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages"))
119 knotcompat.UseNativeLatch(knotacl.NewLatch(d, log.SubLogger(logger, "knotacl-latch")))
120 aclService := knotacl.NewService(enforcer, d, config.Core.Dev, log.SubLogger(logger, "knotacl"))
121 oauth, err := oauth.New(config, posthog, d, enforcer, aclService, res, log.SubLogger(logger, "oauth"))
122 if err != nil {
123 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
124 }
125
126 validator := validator.New(d, res, aclService)
127
128 repoResolver := reporesolver.New(config, aclService, d, rdb)
129
130 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
131
132 var blobStore blobstore.BlobStore
133 if config.Porxie.Url != "" {
134 blobStore = blobstore.NewPorxieBlobStore(config.Porxie.Url)
135 } else {
136 blobStore = blobstore.NewPdsBlobStore(res.Directory())
137 }
138
139 jc, err := jetstream.NewJetstreamClient(
140 config.Jetstream.Endpoint,
141 "appview",
142 []string{
143 tangled.ActorProfileNSID,
144 tangled.FeedStarNSID,
145 tangled.FeedReactionNSID,
146 tangled.FeedCommentNSID,
147 tangled.GraphFollowNSID,
148 tangled.GraphVouchNSID,
149 tangled.KnotMemberNSID,
150 tangled.KnotNSID,
151 tangled.LabelDefinitionNSID,
152 tangled.LabelOpNSID,
153 tangled.PublicKeyNSID,
154 tangled.RepoArtifactNSID,
155 tangled.RepoIssueCommentNSID,
156 tangled.RepoIssueNSID,
157 tangled.RepoNSID,
158 tangled.RepoPullNSID,
159 tangled.RepoPullCommentNSID,
160 tangled.SpindleMemberNSID,
161 tangled.SpindleNSID,
162 tangled.StringNSID,
163 },
164 nil,
165 tlog.SubLogger(logger, "jetstream"),
166 d,
167 false,
168
169 // in-memory filter is inapplicable to appview so
170 // we'll never log dids anyway.
171 false,
172 )
173 if err != nil {
174 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
175 }
176
177 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
178 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
179 }
180
181 var notifiers []notify.Notifier
182
183 // Always add the database notifier
184 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
185
186 // Add other notifiers in production only
187 if !config.Core.Dev {
188 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
189 }
190 notifiers = append(notifiers, indexer)
191
192 notifiers = append(notifiers, whnotify.NewNotifier(d))
193
194 notifier := notify.NewMergedNotifier(notifiers)
195 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
196
197 ingester := appview.Ingester{
198 Ctx: ctx,
199 Db: d,
200 Enforcer: enforcer,
201 Acl: aclService,
202 IdResolver: res,
203 BlobStore: blobStore,
204 Cache: rdb,
205 Config: config,
206 Logger: log.SubLogger(logger, "ingester"),
207 Validator: validator,
208 MentionsResolver: mentionsResolver,
209 Notifier: notifier,
210 Verifier: repoverify.New(res, config.Core.Dev),
211 }
212 err = jc.StartJetstream(ctx, ingester.Ingest())
213 if err != nil {
214 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
215 }
216
217 go ingester.SweepPendingVerifications()
218
219 var cfClient *cloudflare.Client
220 if config.Cloudflare.ApiToken != "" {
221 cfClient, err = cloudflare.New(config)
222 if err != nil {
223 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err)
224 cfClient = nil
225 }
226 }
227
228 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient)
229 if err != nil {
230 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
231 }
232 knotstream.Start(ctx)
233
234 pipelineNotifier := pipelines.NewStatusNotifier()
235
236 spindlestream, err := Spindlestream(ctx, config, d, enforcer, pipelineNotifier)
237 if err != nil {
238 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
239 }
240 spindlestream.Start(ctx)
241
242 state := &State{
243 db: d,
244 notifier: notifier,
245 indexer: indexer,
246 oauth: oauth,
247 enforcer: enforcer,
248 pages: pages,
249 idResolver: res,
250 blobStore: blobStore,
251 rdb: rdb,
252 mentionsResolver: mentionsResolver,
253 posthog: posthog,
254 jc: jc,
255 config: config,
256 repoResolver: repoResolver,
257 aclService: aclService,
258 knotstream: knotstream,
259 spindlestream: spindlestream,
260 pipelineNotifier: pipelineNotifier,
261 logger: logger,
262 validator: validator,
263 cfClient: cfClient,
264 }
265
266 // fetch initial bluesky posts if configured
267 go fetchBskyPosts(ctx, res, config, d, logger)
268
269 return state, nil
270}
271
272func (s *State) Close() error {
273 // other close up logic goes here
274 return s.db.Close()
275}
276
277func (s *State) NewSSHServer() *pipelinessh.Server {
278 return pipelinessh.New(s.db, s.config, s.pipelineNotifier, log.SubLogger(s.logger, "pipelinessh"))
279}
280
281func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) {
282 w.Header().Set("Content-Type", "text/plain")
283 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
284
285 securityTxt := `Contact: mailto:security@tangled.org
286Preferred-Languages: en
287Canonical: https://tangled.org/.well-known/security.txt
288Expires: 2030-01-01T21:59:00.000Z
289`
290 w.Write([]byte(securityTxt))
291}
292
293func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
294 w.Header().Set("Content-Type", "text/plain")
295 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
296
297 robotsTxt := `# Hello, Tanglers!
298User-agent: *
299Allow: /
300Disallow: /*/*/settings
301Disallow: /settings
302Disallow: /*/*/compare
303Disallow: /*/*/fork
304
305Crawl-delay: 1
306`
307 w.Write([]byte(robotsTxt))
308}
309
310func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
311 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
312 BaseParams: pages.BaseParamsFromContext(r.Context()),
313 })
314}
315
316func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
317 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
318 BaseParams: pages.BaseParamsFromContext(r.Context()),
319 })
320}
321
322func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
323 s.pages.Brand(w, pages.BrandParams{
324 BaseParams: pages.BaseParamsFromContext(r.Context()),
325 })
326}
327
328func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
329 user := s.oauth.GetMultiAccountUser(r)
330 if user == nil {
331 return
332 }
333
334 l := s.logger.With("handler", "UpgradeBanner")
335 l = l.With("did", user.Did)
336
337 regs, err := db.GetRegistrations(
338 s.db,
339 orm.FilterEq("did", user.Did),
340 orm.FilterEq("needs_upgrade", 1),
341 )
342 if err != nil {
343 l.Error("non-fatal: failed to get registrations", "err", err)
344 }
345
346 spindles, err := db.GetSpindles(
347 r.Context(),
348 s.db,
349 orm.FilterEq("owner", user.Did),
350 orm.FilterEq("needs_upgrade", 1),
351 )
352 if err != nil {
353 l.Error("non-fatal: failed to get spindles", "err", err)
354 }
355
356 if regs == nil && spindles == nil {
357 return
358 }
359
360 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
361 Registrations: regs,
362 Spindles: spindles,
363 })
364}
365
366func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) {
367 // target is echoed back from the form via hx-vals so the response span's
368 // id matches the form's hx-target. Fallback keeps the handler useful if
369 // a caller forgets to send it.
370 target := strings.TrimSpace(r.FormValue("target"))
371 if target == "" {
372 target = "home"
373 }
374
375 w.Header().Set("Content-Type", "text/html")
376
377 emailAddr := strings.TrimSpace(r.FormValue("email"))
378 if !email.IsValidEmail(emailAddr) {
379 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{
380 Id: target,
381 Error: "Invalid email address.",
382 })
383 return
384 }
385
386 // For logged-in users, persist the signup locally so the widget stays
387 // hidden across devices. The DB row is the render-time source of truth;
388 // Resend still owns the mailing list itself.
389 if user := s.oauth.GetMultiAccountUser(r); user != nil {
390 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil {
391 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err)
392 }
393 }
394
395 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" {
396 go func() {
397 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil {
398 s.logger.Error("failed to add newsletter contact", "error", err)
399 }
400 }()
401 } else {
402 s.logger.Error(
403 "failed to add newsletter contact, missing resend config",
404 "isKeyPresent", s.config.Resend.ApiKey != "",
405 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "",
406 "emailAddr", emailAddr,
407 )
408 }
409
410 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target})
411}
412
413// NewsletterDismiss records that a logged-in user has dismissed the newsletter
414// widget so it stays hidden across their devices. Anonymous callers get a 204
415// with no DB write — localStorage handles the per-browser fallback.
416func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) {
417 user := s.oauth.GetMultiAccountUser(r)
418 if user == nil {
419 w.WriteHeader(http.StatusNoContent)
420 return
421 }
422
423 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil {
424 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err)
425 }
426 w.WriteHeader(http.StatusNoContent)
427}
428
429func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
430 user := chi.URLParam(r, "user")
431 user = strings.TrimPrefix(user, "@")
432
433 if user == "" {
434 w.WriteHeader(http.StatusBadRequest)
435 return
436 }
437
438 id, err := s.idResolver.ResolveIdent(r.Context(), user)
439 if err != nil {
440 w.WriteHeader(http.StatusInternalServerError)
441 return
442 }
443
444 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
445 if err != nil {
446 s.logger.Error("failed to get public keys", "err", err)
447 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
448 return
449 }
450
451 if len(pubKeys) == 0 {
452 w.WriteHeader(http.StatusNoContent)
453 return
454 }
455
456 for _, k := range pubKeys {
457 key := strings.TrimRight(k.Key, "\n")
458 fmt.Fprintln(w, key)
459 }
460}
461
462func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
463 switch r.Method {
464 case http.MethodGet:
465 user := s.oauth.GetMultiAccountUser(r)
466 knots := s.aclService.KnotsForUser(r.Context(), user.Did)
467
468 s.pages.NewRepo(w, pages.NewRepoParams{
469 BaseParams: pages.BaseParamsFromContext(r.Context()),
470 Knots: knots,
471 })
472
473 case http.MethodPost:
474 l := s.logger.With("handler", "NewRepo")
475
476 user := s.oauth.GetMultiAccountUser(r)
477 l = l.With("did", user.Did)
478
479 // form validation
480 domain := r.FormValue("domain")
481 if domain == "" {
482 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
483 return
484 }
485 l = l.With("knot", domain)
486
487 repoName := r.FormValue("name")
488 if repoName == "" {
489 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
490 return
491 }
492
493 if err := models.ValidateRepoName(repoName); err != nil {
494 s.pages.Notice(w, "repo", err.Error())
495 return
496 }
497 repoName = models.StripGitExt(repoName)
498 rkey := strings.ToLower(repoName)
499 l = l.With("repoName", repoName, "rkey", rkey)
500
501 defaultBranch := r.FormValue("branch")
502 if defaultBranch == "" {
503 defaultBranch = "main"
504 }
505 l = l.With("defaultBranch", defaultBranch)
506
507 description := r.FormValue("description")
508 if len([]rune(description)) > 140 {
509 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
510 return
511 }
512
513 // ACL validation
514 if !s.aclService.IsRepoCreateAllowed(r.Context(), domain, user.Did) {
515 l.Info("unauthorized")
516 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
517 return
518 }
519
520 // Check for existing repos
521 existingRepo, err := db.GetRepo(
522 s.db,
523 orm.FilterEq("did", user.Did),
524 orm.FilterEq("rkey", rkey),
525 )
526 if err == nil && existingRepo != nil {
527 l.Info("repo exists")
528 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
529 return
530 }
531
532 atpClient, err := s.oauth.AuthorizedClient(r)
533 if err != nil {
534 l.Error("failed to get authorized client", "err", err)
535 s.pages.Notice(w, "repo", "Failed to authorize. Try again later.")
536 return
537 }
538
539 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) {
540 l.Info("rkey occupied by prior rename alias")
541 s.pages.Notice(w, "repo", fmt.Sprintf("The name %q still has a record on your PDS from a prior rename. Pick a different name, or delete at://%s/%s/%s first.", rkey, user.Did, tangled.RepoNSID, rkey))
542 return
543 }
544
545 client, err := s.oauth.ServiceClient(
546 r,
547 oauth.WithService(domain),
548 oauth.WithLxm(tangled.RepoCreateNSID),
549 oauth.WithDev(s.config.Core.Dev),
550 )
551 if err != nil {
552 l.Error("service auth failed", "err", err)
553 s.pages.Notice(w, "repo", "Failed to authenticate. Please log out and log back in again.")
554 return
555 }
556
557 input := &tangled.RepoCreate_Input{
558 Rkey: rkey,
559 Name: rkey,
560 DefaultBranch: &defaultBranch,
561 }
562 createResp, err := tangled.RepoCreate(
563 r.Context(),
564 client,
565 input,
566 )
567 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
568 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err)
569 s.pages.Notice(w, "repo", err.Error())
570 return
571 }
572
573 var repoDid string
574 if createResp != nil && createResp.RepoDid != nil {
575 repoDid = *createResp.RepoDid
576 }
577 if repoDid == "" {
578 l.Error("knot returned empty repo DID")
579 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.")
580 return
581 }
582
583 repo := &models.Repo{
584 Did: user.Did,
585 Name: repoName,
586 Knot: domain,
587 Rkey: rkey,
588 Description: description,
589 Created: time.Now(),
590 Labels: s.config.Label.DefaultLabelDefs,
591 RepoDid: repoDid,
592 }
593 record := repo.AsRecord()
594
595 cleanupKnot := func() {
596 go func() {
597 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second}
598 for attempt, delay := range delays {
599 time.Sleep(delay)
600 deleteClient, dErr := s.oauth.ServiceClient(
601 r,
602 oauth.WithService(domain),
603 oauth.WithLxm(tangled.RepoDeleteNSID),
604 oauth.WithDev(s.config.Core.Dev),
605 )
606 if dErr != nil {
607 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr)
608 continue
609 }
610 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
611 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{
612 Did: user.Did,
613 Name: rkey,
614 Rkey: rkey,
615 }); dErr != nil {
616 cancel()
617 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr)
618 continue
619 }
620 cancel()
621 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1)
622 return
623 }
624 l.Error("exhausted retries for knot cleanup, repo may be orphaned",
625 "did", user.Did, "repo", repoName, "knot", domain)
626 }()
627 }
628
629 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{
630 Collection: tangled.RepoNSID,
631 Repo: user.Did,
632 Rkey: &rkey,
633 Record: &lexutil.LexiconTypeDecoder{
634 Val: &record,
635 },
636 })
637 if err != nil {
638 l.Info("PDS write failed", "err", err)
639 cleanupKnot()
640 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) {
641 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository named %q.", rkey))
642 } else {
643 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
644 }
645 return
646 }
647
648 aturi := fmt.Sprintf("at://%s/%s/%s", user.Did, tangled.RepoNSID, rkey)
649 l = l.With("aturi", aturi)
650 l.Info("wrote to PDS")
651
652 tx, err := s.db.BeginTx(r.Context(), nil)
653 if err != nil {
654 l.Info("txn failed", "err", err)
655 s.pages.Notice(w, "repo", "Failed to save repository information.")
656 return
657 }
658
659 rollback := func() {
660 err1 := tx.Rollback()
661 err2 := s.enforcer.E.LoadPolicy()
662 err3 := rollbackRecord(context.Background(), aturi, atpClient)
663
664 if errors.Is(err1, sql.ErrTxDone) {
665 err1 = nil
666 }
667
668 if errs := errors.Join(err1, err2, err3); errs != nil {
669 l.Error("failed to rollback changes", "errs", errs)
670 }
671
672 if aturi != "" {
673 cleanupKnot()
674 }
675 }
676 defer rollback()
677
678 err = db.AddRepo(tx, repo)
679 if err != nil {
680 l.Error("db write failed", "err", err)
681 s.pages.Notice(w, "repo", "Failed to save repository information.")
682 return
683 }
684
685 rbacPath := repo.RepoIdentifier()
686 err = s.enforcer.AddRepo(user.Did, domain, rbacPath)
687 if err != nil {
688 l.Error("acl setup failed", "err", err)
689 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
690 return
691 }
692
693 err = tx.Commit()
694 if err != nil {
695 l.Error("txn commit failed", "err", err)
696 http.Error(w, err.Error(), http.StatusInternalServerError)
697 return
698 }
699
700 err = s.enforcer.E.SavePolicy()
701 if err != nil {
702 l.Error("acl save failed", "err", err)
703 http.Error(w, err.Error(), http.StatusInternalServerError)
704 return
705 }
706
707 aturi = ""
708
709 s.notifier.NewRepo(r.Context(), repo)
710 switch {
711 case repoDid != "":
712 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid))
713 default:
714 handle := s.pages.DisplayHandle(r.Context(), user.Did)
715 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, rkey))
716 }
717 }
718}
719
720func rkeyOccupied(ctx context.Context, client *atclient.APIClient, did, rkey string) bool {
721 probeCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
722 defer cancel()
723 resp, err := comatproto.RepoGetRecord(probeCtx, client, "", tangled.RepoNSID, did, rkey)
724 return err == nil && resp != nil
725}
726
727// this is used to rollback changes made to the PDS
728//
729// it is a no-op if the provided ATURI is empty
730func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error {
731 if aturi == "" {
732 return nil
733 }
734
735 parsed := syntax.ATURI(aturi)
736
737 collection := parsed.Collection().String()
738 repo := parsed.Authority().String()
739 rkey := parsed.RecordKey().String()
740
741 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
742 Collection: collection,
743 Repo: repo,
744 Rkey: rkey,
745 })
746 return err
747}
748
749func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
750 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
751 if err != nil {
752 return err
753 }
754 // already present
755 if len(defaultLabels) == len(defaults) {
756 return nil
757 }
758
759 labelDefs, err := models.FetchLabelDefs(r, defaults)
760 if err != nil {
761 return err
762 }
763
764 // Insert each label definition to the database
765 for _, labelDef := range labelDefs {
766 _, err = db.AddLabelDefinition(e, &labelDef)
767 if err != nil {
768 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
769 }
770 }
771
772 return nil
773}
774
775func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
776 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
777 if err != nil {
778 logger.Error("failed to resolve tangled.org DID", "err", err)
779 return
780 }
781
782 pdsEndpoint := resolved.PDSEndpoint()
783 if pdsEndpoint == "" {
784 logger.Error("no PDS endpoint found for tangled.sh DID")
785 return
786 }
787
788 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger)
789 if err != nil {
790 logger.Error("failed to create appassword session... skipping fetch", "err", err)
791 return
792 }
793
794 l := log.SubLogger(logger, "bluesky")
795
796 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
797 defer ticker.Stop()
798
799 for {
800 // refresh session if necessary
801 if !session.IsValid() {
802 l.Debug("access token expired, refreshing session")
803 if err := session.RefreshSession(); err != nil {
804 l.Error("failed to refresh session, stopping bluesky updater", "err", err)
805 return
806 }
807 l.Debug("session refreshed")
808 }
809
810 // make client
811 client := xrpc.Client{
812 Auth: &xrpc.AuthInfo{
813 AccessJwt: session.AccessJwt,
814 Did: session.Did,
815 },
816 Host: session.PdsEndpoint,
817 }
818
819 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
820 if err != nil {
821 l.Error("failed to fetch bluesky posts", "err", err)
822 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
823 l.Error("failed to insert bluesky posts", "err", err)
824 } else {
825 l.Info("inserted bluesky posts", "count", len(posts))
826 }
827
828 select {
829 case <-ticker.C:
830 case <-ctx.Done():
831 l.Info("stopping bluesky updater")
832 return
833 }
834 }
835}