Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/cache"
17 "tangled.org/core/appview/cloudflare"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/email"
21 "tangled.org/core/appview/indexer"
22 "tangled.org/core/appview/knotacl"
23 "tangled.org/core/appview/knotcompat"
24 "tangled.org/core/appview/mentions"
25 "tangled.org/core/appview/models"
26 "tangled.org/core/appview/notify"
27 dbnotify "tangled.org/core/appview/notify/db"
28 lognotify "tangled.org/core/appview/notify/logging"
29 phnotify "tangled.org/core/appview/notify/posthog"
30 whnotify "tangled.org/core/appview/notify/webhook"
31 "tangled.org/core/appview/oauth"
32 "tangled.org/core/appview/pages"
33 "tangled.org/core/appview/pipelines"
34 pipelinessh "tangled.org/core/appview/pipelines/ssh"
35 "tangled.org/core/appview/reporesolver"
36 "tangled.org/core/appview/repoverify"
37 "tangled.org/core/appview/validator"
38 xrpcclient "tangled.org/core/appview/xrpcclient"
39 "tangled.org/core/consts"
40 "tangled.org/core/eventconsumer"
41 "tangled.org/core/idresolver"
42 "tangled.org/core/jetstream"
43 "tangled.org/core/log"
44 tlog "tangled.org/core/log"
45 "tangled.org/core/orm"
46 "tangled.org/core/rbac"
47
48 comatproto "github.com/bluesky-social/indigo/api/atproto"
49 "github.com/bluesky-social/indigo/atproto/atclient"
50 "github.com/bluesky-social/indigo/atproto/syntax"
51 lexutil "github.com/bluesky-social/indigo/lex/util"
52 "github.com/bluesky-social/indigo/xrpc"
53
54 "github.com/go-chi/chi/v5"
55 "github.com/posthog/posthog-go"
56)
57
58type State struct {
59 db *db.DB
60 notifier notify.Notifier
61 indexer *indexer.Indexer
62 oauth *oauth.OAuth
63 enforcer *rbac.Enforcer
64 pages *pages.Pages
65 idResolver *idresolver.Resolver
66 rdb *cache.Cache
67 mentionsResolver *mentions.Resolver
68 posthog posthog.Client
69 jc *jetstream.JetstreamClient
70 config *config.Config
71 repoResolver *reporesolver.RepoResolver
72 aclService *knotacl.Service
73 knotstream *eventconsumer.Consumer
74 spindlestream *eventconsumer.Consumer
75 pipelineNotifier *pipelines.StatusNotifier
76 logger *slog.Logger
77 validator *validator.Validator
78 cfClient *cloudflare.Client
79}
80
81func Make(ctx context.Context, config *config.Config) (*State, error) {
82 logger := tlog.FromContext(ctx)
83
84 d, err := db.Make(ctx, config.Core.DbPath)
85 if err != nil {
86 return nil, fmt.Errorf("failed to create db: %w", err)
87 }
88
89 indexer := indexer.New(log.SubLogger(logger, "indexer"), d)
90 err = indexer.Init(ctx)
91 if err != nil {
92 return nil, fmt.Errorf("failed to create indexer: %w", err)
93 }
94
95 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
96 if err != nil {
97 return nil, fmt.Errorf("failed to create enforcer: %w", err)
98 }
99
100 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
101 if err != nil {
102 logger.Error("failed to create redis resolver", "err", err)
103 res = idresolver.DefaultResolver(config.Plc.PLCURL)
104 }
105
106 var rdb *cache.Cache
107 if config.Redis.Addr != "" {
108 rdb = cache.New(config.Redis.Addr)
109 }
110
111 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
112 if err != nil {
113 return nil, fmt.Errorf("failed to create posthog client: %w", err)
114 }
115
116 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages"))
117 knotcompat.UseNativeLatch(knotacl.NewLatch(d, log.SubLogger(logger, "knotacl-latch")))
118 aclService := knotacl.NewService(enforcer, d, config.Core.Dev, log.SubLogger(logger, "knotacl"))
119 oauth, err := oauth.New(config, posthog, d, enforcer, aclService, res, log.SubLogger(logger, "oauth"))
120 if err != nil {
121 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
122 }
123
124 validator := validator.New(d, res, aclService)
125
126 repoResolver := reporesolver.New(config, aclService, d, rdb)
127
128 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
129
130 jc, err := jetstream.NewJetstreamClient(
131 config.Jetstream.Endpoint,
132 "appview",
133 []string{
134 tangled.ActorProfileNSID,
135 tangled.FeedStarNSID,
136 tangled.FeedReactionNSID,
137 tangled.FeedCommentNSID,
138 tangled.GraphFollowNSID,
139 tangled.GraphVouchNSID,
140 tangled.KnotMemberNSID,
141 tangled.KnotNSID,
142 tangled.LabelDefinitionNSID,
143 tangled.LabelOpNSID,
144 tangled.PublicKeyNSID,
145 tangled.RepoArtifactNSID,
146 tangled.RepoIssueCommentNSID,
147 tangled.RepoIssueNSID,
148 tangled.RepoNSID,
149 tangled.RepoPullNSID,
150 tangled.RepoPullCommentNSID,
151 tangled.SpindleMemberNSID,
152 tangled.SpindleNSID,
153 tangled.StringNSID,
154 },
155 nil,
156 tlog.SubLogger(logger, "jetstream"),
157 d,
158 false,
159
160 // in-memory filter is inapplicable to appview so
161 // we'll never log dids anyway.
162 false,
163 )
164 if err != nil {
165 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
166 }
167
168 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
169 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
170 }
171
172 var notifiers []notify.Notifier
173
174 // Always add the database notifier
175 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
176
177 // Add other notifiers in production only
178 if !config.Core.Dev {
179 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
180 }
181 notifiers = append(notifiers, indexer)
182
183 notifiers = append(notifiers, whnotify.NewNotifier(d))
184
185 notifier := notify.NewMergedNotifier(notifiers)
186 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
187
188 ingester := appview.Ingester{
189 Ctx: ctx,
190 Db: d,
191 Enforcer: enforcer,
192 Acl: aclService,
193 IdResolver: res,
194 Cache: rdb,
195 Config: config,
196 Logger: log.SubLogger(logger, "ingester"),
197 Validator: validator,
198 MentionsResolver: mentionsResolver,
199 Notifier: notifier,
200 Verifier: repoverify.New(res, config.Core.Dev),
201 }
202 err = jc.StartJetstream(ctx, ingester.Ingest())
203 if err != nil {
204 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
205 }
206
207 go ingester.SweepPendingVerifications()
208
209 var cfClient *cloudflare.Client
210 if config.Cloudflare.ApiToken != "" {
211 cfClient, err = cloudflare.New(config)
212 if err != nil {
213 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err)
214 cfClient = nil
215 }
216 }
217
218 knotstream, err := Knotstream(ctx, config, d, aclService, enforcer, posthog, notifier, cfClient)
219 if err != nil {
220 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
221 }
222 knotstream.Start(ctx)
223
224 pipelineNotifier := pipelines.NewStatusNotifier()
225
226 spindlestream, err := Spindlestream(ctx, config, d, enforcer, pipelineNotifier)
227 if err != nil {
228 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
229 }
230 spindlestream.Start(ctx)
231
232 state := &State{
233 db: d,
234 notifier: notifier,
235 indexer: indexer,
236 oauth: oauth,
237 enforcer: enforcer,
238 pages: pages,
239 idResolver: res,
240 rdb: rdb,
241 mentionsResolver: mentionsResolver,
242 posthog: posthog,
243 jc: jc,
244 config: config,
245 repoResolver: repoResolver,
246 aclService: aclService,
247 knotstream: knotstream,
248 spindlestream: spindlestream,
249 pipelineNotifier: pipelineNotifier,
250 logger: logger,
251 validator: validator,
252 cfClient: cfClient,
253 }
254
255 if config.Project.Enabled {
256 if config.Project.User == "" {
257 logger.Warn("project mode enabled but PROJECT_USER is not set")
258 } else {
259 logger.Info("running in project mode", "project_user", config.Project.User)
260 }
261 }
262
263 // fetch initial bluesky posts if configured
264 go fetchBskyPosts(ctx, res, config, d, logger)
265
266 return state, nil
267}
268
269func (s *State) Close() error {
270 // other close up logic goes here
271 return s.db.Close()
272}
273
274func (s *State) NewSSHServer() *pipelinessh.Server {
275 return pipelinessh.New(s.db, s.config, s.pipelineNotifier, log.SubLogger(s.logger, "pipelinessh"))
276}
277
278func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) {
279 w.Header().Set("Content-Type", "text/plain")
280 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
281
282 securityTxt := `Contact: mailto:security@tangled.org
283Preferred-Languages: en
284Canonical: https://tangled.org/.well-known/security.txt
285Expires: 2030-01-01T21:59:00.000Z
286`
287 w.Write([]byte(securityTxt))
288}
289
290func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
291 w.Header().Set("Content-Type", "text/plain")
292 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
293
294 robotsTxt := `# Hello, Tanglers!
295User-agent: *
296Allow: /
297Disallow: /*/*/settings
298Disallow: /settings
299Disallow: /*/*/compare
300Disallow: /*/*/fork
301
302Crawl-delay: 1
303`
304 w.Write([]byte(robotsTxt))
305}
306
307func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
308 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
309 BaseParams: pages.BaseParamsFromContext(r.Context()),
310 })
311}
312
313func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
314 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
315 BaseParams: pages.BaseParamsFromContext(r.Context()),
316 })
317}
318
319func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
320 s.pages.Brand(w, pages.BrandParams{
321 BaseParams: pages.BaseParamsFromContext(r.Context()),
322 })
323}
324
325func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
326 user := s.oauth.GetMultiAccountUser(r)
327 if user == nil {
328 return
329 }
330
331 l := s.logger.With("handler", "UpgradeBanner")
332 l = l.With("did", user.Did)
333
334 regs, err := db.GetRegistrations(
335 s.db,
336 orm.FilterEq("did", user.Did),
337 orm.FilterEq("needs_upgrade", 1),
338 )
339 if err != nil {
340 l.Error("non-fatal: failed to get registrations", "err", err)
341 }
342
343 spindles, err := db.GetSpindles(
344 r.Context(),
345 s.db,
346 orm.FilterEq("owner", user.Did),
347 orm.FilterEq("needs_upgrade", 1),
348 )
349 if err != nil {
350 l.Error("non-fatal: failed to get spindles", "err", err)
351 }
352
353 if regs == nil && spindles == nil {
354 return
355 }
356
357 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
358 Registrations: regs,
359 Spindles: spindles,
360 })
361}
362
363func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) {
364 // target is echoed back from the form via hx-vals so the response span's
365 // id matches the form's hx-target. Fallback keeps the handler useful if
366 // a caller forgets to send it.
367 target := strings.TrimSpace(r.FormValue("target"))
368 if target == "" {
369 target = "home"
370 }
371
372 w.Header().Set("Content-Type", "text/html")
373
374 emailAddr := strings.TrimSpace(r.FormValue("email"))
375 if !email.IsValidEmail(emailAddr) {
376 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{
377 Id: target,
378 Error: "Invalid email address.",
379 })
380 return
381 }
382
383 // For logged-in users, persist the signup locally so the widget stays
384 // hidden across devices. The DB row is the render-time source of truth;
385 // Resend still owns the mailing list itself.
386 if user := s.oauth.GetMultiAccountUser(r); user != nil {
387 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil {
388 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err)
389 }
390 }
391
392 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" {
393 go func() {
394 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil {
395 s.logger.Error("failed to add newsletter contact", "error", err)
396 }
397 }()
398 } else {
399 s.logger.Error(
400 "failed to add newsletter contact, missing resend config",
401 "isKeyPresent", s.config.Resend.ApiKey != "",
402 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "",
403 "emailAddr", emailAddr,
404 )
405 }
406
407 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target})
408}
409
410// NewsletterDismiss records that a logged-in user has dismissed the newsletter
411// widget so it stays hidden across their devices. Anonymous callers get a 204
412// with no DB write — localStorage handles the per-browser fallback.
413func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) {
414 user := s.oauth.GetMultiAccountUser(r)
415 if user == nil {
416 w.WriteHeader(http.StatusNoContent)
417 return
418 }
419
420 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil {
421 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err)
422 }
423 w.WriteHeader(http.StatusNoContent)
424}
425
426func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
427 user := chi.URLParam(r, "user")
428 user = strings.TrimPrefix(user, "@")
429
430 if user == "" {
431 w.WriteHeader(http.StatusBadRequest)
432 return
433 }
434
435 id, err := s.idResolver.ResolveIdent(r.Context(), user)
436 if err != nil {
437 w.WriteHeader(http.StatusInternalServerError)
438 return
439 }
440
441 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
442 if err != nil {
443 s.logger.Error("failed to get public keys", "err", err)
444 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
445 return
446 }
447
448 if len(pubKeys) == 0 {
449 w.WriteHeader(http.StatusNoContent)
450 return
451 }
452
453 for _, k := range pubKeys {
454 key := strings.TrimRight(k.Key, "\n")
455 fmt.Fprintln(w, key)
456 }
457}
458
459func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
460 switch r.Method {
461 case http.MethodGet:
462 user := s.oauth.GetMultiAccountUser(r)
463 knots := s.aclService.KnotsForUser(r.Context(), user.Did)
464
465 s.pages.NewRepo(w, pages.NewRepoParams{
466 BaseParams: pages.BaseParamsFromContext(r.Context()),
467 Knots: knots,
468 })
469
470 case http.MethodPost:
471 l := s.logger.With("handler", "NewRepo")
472
473 user := s.oauth.GetMultiAccountUser(r)
474 l = l.With("did", user.Did)
475
476 // form validation
477 domain := r.FormValue("domain")
478 if domain == "" {
479 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
480 return
481 }
482 l = l.With("knot", domain)
483
484 repoName := r.FormValue("name")
485 if repoName == "" {
486 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
487 return
488 }
489
490 if err := models.ValidateRepoName(repoName); err != nil {
491 s.pages.Notice(w, "repo", err.Error())
492 return
493 }
494 repoName = models.StripGitExt(repoName)
495 rkey := strings.ToLower(repoName)
496 l = l.With("repoName", repoName, "rkey", rkey)
497
498 defaultBranch := r.FormValue("branch")
499 if defaultBranch == "" {
500 defaultBranch = "main"
501 }
502 l = l.With("defaultBranch", defaultBranch)
503
504 description := r.FormValue("description")
505 if len([]rune(description)) > 140 {
506 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
507 return
508 }
509
510 // ACL validation
511 if !s.aclService.IsRepoCreateAllowed(r.Context(), domain, user.Did) {
512 l.Info("unauthorized")
513 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
514 return
515 }
516
517 // Check for existing repos
518 existingRepo, err := db.GetRepo(
519 s.db,
520 orm.FilterEq("did", user.Did),
521 orm.FilterEq("rkey", rkey),
522 )
523 if err == nil && existingRepo != nil {
524 l.Info("repo exists")
525 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
526 return
527 }
528
529 atpClient, err := s.oauth.AuthorizedClient(r)
530 if err != nil {
531 l.Error("failed to get authorized client", "err", err)
532 s.pages.Notice(w, "repo", "Failed to authorize. Try again later.")
533 return
534 }
535
536 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) {
537 l.Info("rkey occupied by prior rename alias")
538 s.pages.Notice(w, "repo", fmt.Sprintf("The name %q still has a record on your PDS from a prior rename. Pick a different name, or delete at://%s/%s/%s first.", rkey, user.Did, tangled.RepoNSID, rkey))
539 return
540 }
541
542 client, err := s.oauth.ServiceClient(
543 r,
544 oauth.WithService(domain),
545 oauth.WithLxm(tangled.RepoCreateNSID),
546 oauth.WithDev(s.config.Core.Dev),
547 )
548 if err != nil {
549 l.Error("service auth failed", "err", err)
550 s.pages.Notice(w, "repo", "Failed to authenticate. Please log out and log back in again.")
551 return
552 }
553
554 input := &tangled.RepoCreate_Input{
555 Rkey: rkey,
556 Name: rkey,
557 DefaultBranch: &defaultBranch,
558 }
559 createResp, err := tangled.RepoCreate(
560 r.Context(),
561 client,
562 input,
563 )
564 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
565 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err)
566 s.pages.Notice(w, "repo", err.Error())
567 return
568 }
569
570 var repoDid string
571 if createResp != nil && createResp.RepoDid != nil {
572 repoDid = *createResp.RepoDid
573 }
574 if repoDid == "" {
575 l.Error("knot returned empty repo DID")
576 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.")
577 return
578 }
579
580 repo := &models.Repo{
581 Did: user.Did,
582 Name: repoName,
583 Knot: domain,
584 Rkey: rkey,
585 Description: description,
586 Created: time.Now(),
587 Labels: s.config.Label.DefaultLabelDefs,
588 RepoDid: repoDid,
589 }
590 record := repo.AsRecord()
591
592 cleanupKnot := func() {
593 go func() {
594 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second}
595 for attempt, delay := range delays {
596 time.Sleep(delay)
597 deleteClient, dErr := s.oauth.ServiceClient(
598 r,
599 oauth.WithService(domain),
600 oauth.WithLxm(tangled.RepoDeleteNSID),
601 oauth.WithDev(s.config.Core.Dev),
602 )
603 if dErr != nil {
604 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr)
605 continue
606 }
607 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
608 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{
609 Did: user.Did,
610 Name: rkey,
611 Rkey: rkey,
612 }); dErr != nil {
613 cancel()
614 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr)
615 continue
616 }
617 cancel()
618 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1)
619 return
620 }
621 l.Error("exhausted retries for knot cleanup, repo may be orphaned",
622 "did", user.Did, "repo", repoName, "knot", domain)
623 }()
624 }
625
626 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{
627 Collection: tangled.RepoNSID,
628 Repo: user.Did,
629 Rkey: &rkey,
630 Record: &lexutil.LexiconTypeDecoder{
631 Val: &record,
632 },
633 })
634 if err != nil {
635 l.Info("PDS write failed", "err", err)
636 cleanupKnot()
637 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) {
638 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository named %q.", rkey))
639 } else {
640 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
641 }
642 return
643 }
644
645 aturi := fmt.Sprintf("at://%s/%s/%s", user.Did, tangled.RepoNSID, rkey)
646 l = l.With("aturi", aturi)
647 l.Info("wrote to PDS")
648
649 tx, err := s.db.BeginTx(r.Context(), nil)
650 if err != nil {
651 l.Info("txn failed", "err", err)
652 s.pages.Notice(w, "repo", "Failed to save repository information.")
653 return
654 }
655
656 rollback := func() {
657 err1 := tx.Rollback()
658 err2 := s.enforcer.E.LoadPolicy()
659 err3 := rollbackRecord(context.Background(), aturi, atpClient)
660
661 if errors.Is(err1, sql.ErrTxDone) {
662 err1 = nil
663 }
664
665 if errs := errors.Join(err1, err2, err3); errs != nil {
666 l.Error("failed to rollback changes", "errs", errs)
667 }
668
669 if aturi != "" {
670 cleanupKnot()
671 }
672 }
673 defer rollback()
674
675 err = db.AddRepo(tx, repo)
676 if err != nil {
677 l.Error("db write failed", "err", err)
678 s.pages.Notice(w, "repo", "Failed to save repository information.")
679 return
680 }
681
682 rbacPath := repo.RepoIdentifier()
683 err = s.enforcer.AddRepo(user.Did, domain, rbacPath)
684 if err != nil {
685 l.Error("acl setup failed", "err", err)
686 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
687 return
688 }
689
690 err = tx.Commit()
691 if err != nil {
692 l.Error("txn commit failed", "err", err)
693 http.Error(w, err.Error(), http.StatusInternalServerError)
694 return
695 }
696
697 err = s.enforcer.E.SavePolicy()
698 if err != nil {
699 l.Error("acl save failed", "err", err)
700 http.Error(w, err.Error(), http.StatusInternalServerError)
701 return
702 }
703
704 aturi = ""
705
706 s.notifier.NewRepo(r.Context(), repo)
707 switch {
708 case repoDid != "":
709 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid))
710 default:
711 handle := s.pages.DisplayHandle(r.Context(), user.Did)
712 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, rkey))
713 }
714 }
715}
716
717func rkeyOccupied(ctx context.Context, client *atclient.APIClient, did, rkey string) bool {
718 probeCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
719 defer cancel()
720 resp, err := comatproto.RepoGetRecord(probeCtx, client, "", tangled.RepoNSID, did, rkey)
721 return err == nil && resp != nil
722}
723
724// this is used to rollback changes made to the PDS
725//
726// it is a no-op if the provided ATURI is empty
727func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error {
728 if aturi == "" {
729 return nil
730 }
731
732 parsed := syntax.ATURI(aturi)
733
734 collection := parsed.Collection().String()
735 repo := parsed.Authority().String()
736 rkey := parsed.RecordKey().String()
737
738 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
739 Collection: collection,
740 Repo: repo,
741 Rkey: rkey,
742 })
743 return err
744}
745
746func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
747 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
748 if err != nil {
749 return err
750 }
751 // already present
752 if len(defaultLabels) == len(defaults) {
753 return nil
754 }
755
756 labelDefs, err := models.FetchLabelDefs(r, defaults)
757 if err != nil {
758 return err
759 }
760
761 // Insert each label definition to the database
762 for _, labelDef := range labelDefs {
763 _, err = db.AddLabelDefinition(e, &labelDef)
764 if err != nil {
765 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
766 }
767 }
768
769 return nil
770}
771
772func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
773 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
774 if err != nil {
775 logger.Error("failed to resolve tangled.org DID", "err", err)
776 return
777 }
778
779 pdsEndpoint := resolved.PDSEndpoint()
780 if pdsEndpoint == "" {
781 logger.Error("no PDS endpoint found for tangled.sh DID")
782 return
783 }
784
785 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger)
786 if err != nil {
787 logger.Error("failed to create appassword session... skipping fetch", "err", err)
788 return
789 }
790
791 l := log.SubLogger(logger, "bluesky")
792
793 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
794 defer ticker.Stop()
795
796 for {
797 // refresh session if necessary
798 if !session.IsValid() {
799 l.Debug("access token expired, refreshing session")
800 if err := session.RefreshSession(); err != nil {
801 l.Error("failed to refresh session, stopping bluesky updater", "err", err)
802 return
803 }
804 l.Debug("session refreshed")
805 }
806
807 // make client
808 client := xrpc.Client{
809 Auth: &xrpc.AuthInfo{
810 AccessJwt: session.AccessJwt,
811 Did: session.Did,
812 },
813 Host: session.PdsEndpoint,
814 }
815
816 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
817 if err != nil {
818 l.Error("failed to fetch bluesky posts", "err", err)
819 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
820 l.Error("failed to insert bluesky posts", "err", err)
821 } else {
822 l.Info("inserted bluesky posts", "count", len(posts))
823 }
824
825 select {
826 case <-ticker.C:
827 case <-ctx.Done():
828 l.Info("stopping bluesky updater")
829 return
830 }
831 }
832}