Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/accountmigration"
16 "tangled.org/core/appview/bsky"
17 "tangled.org/core/appview/cache"
18 "tangled.org/core/appview/cloudflare"
19 "tangled.org/core/appview/config"
20 "tangled.org/core/appview/db"
21 "tangled.org/core/appview/email"
22 "tangled.org/core/appview/indexer"
23 "tangled.org/core/appview/knotacl"
24 "tangled.org/core/appview/knotcompat"
25 "tangled.org/core/appview/mentions"
26 "tangled.org/core/appview/models"
27 "tangled.org/core/appview/notify"
28 dbnotify "tangled.org/core/appview/notify/db"
29 lognotify "tangled.org/core/appview/notify/logging"
30 phnotify "tangled.org/core/appview/notify/posthog"
31 whnotify "tangled.org/core/appview/notify/webhook"
32 "tangled.org/core/appview/oauth"
33 "tangled.org/core/appview/pages"
34 "tangled.org/core/appview/pipelines"
35 pipelinessh "tangled.org/core/appview/pipelines/ssh"
36 "tangled.org/core/appview/reporesolver"
37 "tangled.org/core/appview/repoverify"
38 "tangled.org/core/appview/validator"
39 xrpcclient "tangled.org/core/appview/xrpcclient"
40 "tangled.org/core/consts"
41 "tangled.org/core/eventconsumer"
42 "tangled.org/core/idresolver"
43 "tangled.org/core/jetstream"
44 "tangled.org/core/log"
45 tlog "tangled.org/core/log"
46 "tangled.org/core/orm"
47 "tangled.org/core/rbac"
48
49 comatproto "github.com/bluesky-social/indigo/api/atproto"
50 "github.com/bluesky-social/indigo/atproto/atclient"
51 "github.com/bluesky-social/indigo/atproto/syntax"
52 lexutil "github.com/bluesky-social/indigo/lex/util"
53 "github.com/bluesky-social/indigo/xrpc"
54
55 "github.com/go-chi/chi/v5"
56 "github.com/posthog/posthog-go"
57)
58
59type State struct {
60 db *db.DB
61 notifier notify.Notifier
62 indexer *indexer.Indexer
63 oauth *oauth.OAuth
64 enforcer *rbac.Enforcer
65 pages *pages.Pages
66 idResolver *idresolver.Resolver
67 rdb *cache.Cache
68 mentionsResolver *mentions.Resolver
69 posthog posthog.Client
70 jc *jetstream.JetstreamClient
71 config *config.Config
72 repoResolver *reporesolver.RepoResolver
73 aclService *knotacl.Service
74 knotstream *eventconsumer.Consumer
75 spindlestream *eventconsumer.Consumer
76 pipelineNotifier *pipelines.StatusNotifier
77 logger *slog.Logger
78 validator *validator.Validator
79 cfClient *cloudflare.Client
80
81 accountMigrationWorker *accountmigration.Worker
82}
83
84func Make(ctx context.Context, config *config.Config) (*State, error) {
85 logger := tlog.FromContext(ctx)
86
87 d, err := db.Make(ctx, config.Core.DbPath)
88 if err != nil {
89 return nil, fmt.Errorf("failed to create db: %w", err)
90 }
91
92 indexer := indexer.New(log.SubLogger(logger, "indexer"), d)
93 err = indexer.Init(ctx)
94 if err != nil {
95 return nil, fmt.Errorf("failed to create indexer: %w", err)
96 }
97
98 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
99 if err != nil {
100 return nil, fmt.Errorf("failed to create enforcer: %w", err)
101 }
102
103 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
104 if err != nil {
105 logger.Error("failed to create redis resolver", "err", err)
106 res = idresolver.DefaultResolver(config.Plc.PLCURL)
107 }
108
109 var rdb *cache.Cache
110 if config.Redis.Addr != "" {
111 rdb = cache.New(config.Redis.Addr)
112 }
113
114 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
115 if err != nil {
116 return nil, fmt.Errorf("failed to create posthog client: %w", err)
117 }
118
119 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages"))
120 knotcompat.UseNativeLatch(knotacl.NewLatch(d, log.SubLogger(logger, "knotacl-latch")))
121 aclService := knotacl.NewService(enforcer, d, config.Core.Dev, log.SubLogger(logger, "knotacl"))
122 oauth, err := oauth.New(config, posthog, d, enforcer, aclService, res, log.SubLogger(logger, "oauth"))
123 if err != nil {
124 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
125 }
126
127 validator := validator.New(d, res, aclService)
128
129 repoResolver := reporesolver.New(config, aclService, d, rdb)
130
131 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
132
133 jc, err := jetstream.NewJetstreamClient(
134 config.Jetstream.Endpoint,
135 "appview",
136 []string{
137 tangled.ActorProfileNSID,
138 tangled.FeedStarNSID,
139 tangled.FeedReactionNSID,
140 tangled.FeedCommentNSID,
141 tangled.GraphFollowNSID,
142 tangled.GraphVouchNSID,
143 tangled.KnotMemberNSID,
144 tangled.KnotNSID,
145 tangled.LabelDefinitionNSID,
146 tangled.LabelOpNSID,
147 tangled.PublicKeyNSID,
148 tangled.RepoArtifactNSID,
149 tangled.RepoIssueCommentNSID,
150 tangled.RepoIssueNSID,
151 tangled.RepoNSID,
152 tangled.RepoPullNSID,
153 tangled.RepoPullCommentNSID,
154 tangled.SpindleMemberNSID,
155 tangled.SpindleNSID,
156 tangled.StringNSID,
157 },
158 nil,
159 tlog.SubLogger(logger, "jetstream"),
160 d,
161 false,
162
163 // in-memory filter is inapplicable to appview so
164 // we'll never log dids anyway.
165 false,
166 )
167 if err != nil {
168 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
169 }
170
171 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
172 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
173 }
174
175 var notifiers []notify.Notifier
176
177 // Always add the database notifier
178 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
179
180 // Add other notifiers in production only
181 if !config.Core.Dev {
182 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
183 }
184 notifiers = append(notifiers, indexer)
185
186 notifiers = append(notifiers, whnotify.NewNotifier(d))
187
188 notifier := notify.NewMergedNotifier(notifiers)
189 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
190
191 ingester := appview.Ingester{
192 Ctx: ctx,
193 Db: d,
194 Enforcer: enforcer,
195 Acl: aclService,
196 IdResolver: res,
197 Cache: rdb,
198 Config: config,
199 Logger: log.SubLogger(logger, "ingester"),
200 Validator: validator,
201 MentionsResolver: mentionsResolver,
202 Notifier: notifier,
203 Verifier: repoverify.New(res, config.Core.Dev),
204 }
205 err = jc.StartJetstream(ctx, ingester.Ingest())
206 if err != nil {
207 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
208 }
209
210 go ingester.SweepPendingVerifications()
211
212 var cfClient *cloudflare.Client
213 if config.Cloudflare.ApiToken != "" {
214 cfClient, err = cloudflare.New(config)
215 if err != nil {
216 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err)
217 cfClient = nil
218 }
219 }
220
221 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient)
222 if err != nil {
223 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
224 }
225 knotstream.Start(ctx)
226
227 pipelineNotifier := pipelines.NewStatusNotifier()
228
229 spindlestream, err := Spindlestream(ctx, config, d, enforcer, pipelineNotifier)
230 if err != nil {
231 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
232 }
233 spindlestream.Start(ctx)
234
235 if err := db.ReapStaleRunningGitRepoMigrations(ctx, d); err != nil {
236 logger.Warn("failed to reap stale gitrepo migrations", "err", err)
237 }
238 amWorker := accountmigration.NewWorker(d, oauth, config.Core.Dev, log.SubLogger(logger, "accountmigration"))
239 amWorker.Start(ctx)
240
241 state := &State{
242 db: d,
243 notifier: notifier,
244 indexer: indexer,
245 oauth: oauth,
246 enforcer: enforcer,
247 pages: pages,
248 idResolver: res,
249 rdb: rdb,
250 mentionsResolver: mentionsResolver,
251 posthog: posthog,
252 jc: jc,
253 config: config,
254 repoResolver: repoResolver,
255 aclService: aclService,
256 knotstream: knotstream,
257 spindlestream: spindlestream,
258 pipelineNotifier: pipelineNotifier,
259 logger: logger,
260 validator: validator,
261 cfClient: cfClient,
262
263 accountMigrationWorker: amWorker,
264 }
265
266 // fetch initial bluesky posts if configured
267 go fetchBskyPosts(ctx, res, config, d, logger)
268
269 return state, nil
270}
271
272func (s *State) Close() error {
273 // other close up logic goes here
274 return s.db.Close()
275}
276
277func (s *State) NewSSHServer() *pipelinessh.Server {
278 return pipelinessh.New(s.db, s.config, s.pipelineNotifier, log.SubLogger(s.logger, "pipelinessh"))
279}
280
281func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) {
282 w.Header().Set("Content-Type", "text/plain")
283 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
284
285 securityTxt := `Contact: mailto:security@tangled.org
286Preferred-Languages: en
287Canonical: https://tangled.org/.well-known/security.txt
288Expires: 2030-01-01T21:59:00.000Z
289`
290 w.Write([]byte(securityTxt))
291}
292
293func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
294 w.Header().Set("Content-Type", "text/plain")
295 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
296
297 robotsTxt := `# Hello, Tanglers!
298User-agent: *
299Allow: /
300Disallow: /*/*/settings
301Disallow: /settings
302Disallow: /*/*/compare
303Disallow: /*/*/fork
304
305Crawl-delay: 1
306`
307 w.Write([]byte(robotsTxt))
308}
309
310func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
311 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
312 BaseParams: pages.BaseParamsFromContext(r.Context()),
313 })
314}
315
316func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
317 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
318 BaseParams: pages.BaseParamsFromContext(r.Context()),
319 })
320}
321
322func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
323 s.pages.Brand(w, pages.BrandParams{
324 BaseParams: pages.BaseParamsFromContext(r.Context()),
325 })
326}
327
328func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
329 user := s.oauth.GetMultiAccountUser(r)
330 if user == nil {
331 return
332 }
333
334 l := s.logger.With("handler", "UpgradeBanner")
335 l = l.With("did", user.Did)
336
337 regs, err := db.GetRegistrations(
338 s.db,
339 orm.FilterEq("did", user.Did),
340 orm.FilterEq("needs_upgrade", 1),
341 )
342 if err != nil {
343 l.Error("non-fatal: failed to get registrations", "err", err)
344 }
345
346 spindles, err := db.GetSpindles(
347 r.Context(),
348 s.db,
349 orm.FilterEq("owner", user.Did),
350 orm.FilterEq("needs_upgrade", 1),
351 )
352 if err != nil {
353 l.Error("non-fatal: failed to get spindles", "err", err)
354 }
355
356 if regs == nil && spindles == nil {
357 return
358 }
359
360 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
361 Registrations: regs,
362 Spindles: spindles,
363 })
364}
365
366func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) {
367 // target is echoed back from the form via hx-vals so the response span's
368 // id matches the form's hx-target. Fallback keeps the handler useful if
369 // a caller forgets to send it.
370 target := strings.TrimSpace(r.FormValue("target"))
371 if target == "" {
372 target = "home"
373 }
374
375 w.Header().Set("Content-Type", "text/html")
376
377 emailAddr := strings.TrimSpace(r.FormValue("email"))
378 if !email.IsValidEmail(emailAddr) {
379 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{
380 Id: target,
381 Error: "Invalid email address.",
382 })
383 return
384 }
385
386 // For logged-in users, persist the signup locally so the widget stays
387 // hidden across devices. The DB row is the render-time source of truth;
388 // Resend still owns the mailing list itself.
389 if user := s.oauth.GetMultiAccountUser(r); user != nil {
390 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil {
391 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err)
392 }
393 }
394
395 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" {
396 go func() {
397 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil {
398 s.logger.Error("failed to add newsletter contact", "error", err)
399 }
400 }()
401 } else {
402 s.logger.Error(
403 "failed to add newsletter contact, missing resend config",
404 "isKeyPresent", s.config.Resend.ApiKey != "",
405 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "",
406 "emailAddr", emailAddr,
407 )
408 }
409
410 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target})
411}
412
413// NewsletterDismiss records that a logged-in user has dismissed the newsletter
414// widget so it stays hidden across their devices. Anonymous callers get a 204
415// with no DB write — localStorage handles the per-browser fallback.
416func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) {
417 user := s.oauth.GetMultiAccountUser(r)
418 if user == nil {
419 w.WriteHeader(http.StatusNoContent)
420 return
421 }
422
423 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil {
424 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err)
425 }
426 w.WriteHeader(http.StatusNoContent)
427}
428
429func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
430 user := chi.URLParam(r, "user")
431 user = strings.TrimPrefix(user, "@")
432
433 if user == "" {
434 w.WriteHeader(http.StatusBadRequest)
435 return
436 }
437
438 id, err := s.idResolver.ResolveIdent(r.Context(), user)
439 if err != nil {
440 w.WriteHeader(http.StatusInternalServerError)
441 return
442 }
443
444 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
445 if err != nil {
446 s.logger.Error("failed to get public keys", "err", err)
447 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
448 return
449 }
450
451 if len(pubKeys) == 0 {
452 w.WriteHeader(http.StatusNoContent)
453 return
454 }
455
456 for _, k := range pubKeys {
457 key := strings.TrimRight(k.Key, "\n")
458 fmt.Fprintln(w, key)
459 }
460}
461
462func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
463 switch r.Method {
464 case http.MethodGet:
465 user := s.oauth.GetMultiAccountUser(r)
466 knots := s.aclService.KnotsForUser(r.Context(), user.Did)
467
468 s.pages.NewRepo(w, pages.NewRepoParams{
469 BaseParams: pages.BaseParamsFromContext(r.Context()),
470 Knots: knots,
471 })
472
473 case http.MethodPost:
474 l := s.logger.With("handler", "NewRepo")
475
476 user := s.oauth.GetMultiAccountUser(r)
477 l = l.With("did", user.Did)
478
479 // form validation
480 domain := r.FormValue("domain")
481 if domain == "" {
482 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
483 return
484 }
485 l = l.With("knot", domain)
486
487 repoName := r.FormValue("name")
488 if repoName == "" {
489 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
490 return
491 }
492
493 if err := models.ValidateRepoName(repoName); err != nil {
494 s.pages.Notice(w, "repo", err.Error())
495 return
496 }
497 repoName = models.StripGitExt(repoName)
498 rkey := strings.ToLower(repoName)
499 l = l.With("repoName", repoName, "rkey", rkey)
500
501 defaultBranch := r.FormValue("branch")
502 if defaultBranch == "" {
503 defaultBranch = "main"
504 }
505 l = l.With("defaultBranch", defaultBranch)
506
507 description := r.FormValue("description")
508 if len([]rune(description)) > 140 {
509 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
510 return
511 }
512
513 // ACL validation
514 if !s.aclService.IsRepoCreateAllowed(r.Context(), domain, user.Did) {
515 l.Info("unauthorized")
516 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
517 return
518 }
519
520 // Check for existing repos
521 existingRepo, err := db.GetRepo(
522 s.db,
523 orm.FilterEq("did", user.Did),
524 orm.FilterEq("rkey", rkey),
525 )
526 if err == nil && existingRepo != nil {
527 l.Info("repo exists")
528 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
529 return
530 }
531
532 atpClient, err := s.oauth.AuthorizedClient(r)
533 if err != nil {
534 l.Error("failed to get authorized client", "err", err)
535 s.pages.Notice(w, "repo", "Failed to authorize. Try again later.")
536 return
537 }
538
539 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) {
540 l.Info("rkey occupied by prior rename alias")
541 s.pages.Notice(w, "repo", fmt.Sprintf("The name %q still has a record on your PDS from a prior rename. Pick a different name, or delete at://%s/%s/%s first.", rkey, user.Did, tangled.RepoNSID, rkey))
542 return
543 }
544
545 client, err := s.oauth.ServiceClient(
546 r,
547 oauth.WithService(domain),
548 oauth.WithLxm(tangled.RepoCreateNSID),
549 oauth.WithDev(s.config.Core.Dev),
550 )
551 if err != nil {
552 l.Error("service auth failed", "err", err)
553 s.pages.Notice(w, "repo", "Failed to authenticate. Please log out and log back in again.")
554 return
555 }
556
557 input := &tangled.RepoCreate_Input{
558 Rkey: rkey,
559 Name: rkey,
560 DefaultBranch: &defaultBranch,
561 }
562 createResp, err := tangled.RepoCreate(
563 r.Context(),
564 client,
565 input,
566 )
567 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
568 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err)
569 s.pages.Notice(w, "repo", err.Error())
570 return
571 }
572
573 var repoDid string
574 if createResp != nil && createResp.RepoDid != nil {
575 repoDid = *createResp.RepoDid
576 }
577 if repoDid == "" {
578 l.Error("knot returned empty repo DID")
579 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.")
580 return
581 }
582
583 repo := &models.Repo{
584 Did: user.Did,
585 Name: repoName,
586 Knot: domain,
587 Rkey: rkey,
588 Description: description,
589 Created: time.Now(),
590 Labels: s.config.Label.DefaultLabelDefs,
591 RepoDid: repoDid,
592 }
593 record := repo.AsRecord()
594
595 cleanupKnot := func() {
596 go func() {
597 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second}
598 for attempt, delay := range delays {
599 time.Sleep(delay)
600 deleteClient, dErr := s.oauth.ServiceClient(
601 r,
602 oauth.WithService(domain),
603 oauth.WithLxm(tangled.RepoDeleteNSID),
604 oauth.WithDev(s.config.Core.Dev),
605 )
606 if dErr != nil {
607 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr)
608 continue
609 }
610 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
611 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{
612 Did: user.Did,
613 Name: rkey,
614 Rkey: rkey,
615 }); dErr != nil {
616 cancel()
617 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr)
618 continue
619 }
620 cancel()
621 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1)
622 return
623 }
624 l.Error("exhausted retries for knot cleanup, repo may be orphaned",
625 "did", user.Did, "repo", repoName, "knot", domain)
626 }()
627 }
628
629 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{
630 Collection: tangled.RepoNSID,
631 Repo: user.Did,
632 Rkey: &rkey,
633 Record: &lexutil.LexiconTypeDecoder{
634 Val: &record,
635 },
636 })
637 if err != nil {
638 l.Info("PDS write failed", "err", err)
639 cleanupKnot()
640 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) {
641 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository named %q.", rkey))
642 } else {
643 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
644 }
645 return
646 }
647
648 aturi := fmt.Sprintf("at://%s/%s/%s", user.Did, tangled.RepoNSID, rkey)
649 l = l.With("aturi", aturi)
650 l.Info("wrote to PDS")
651
652 tx, err := s.db.BeginTx(r.Context(), nil)
653 if err != nil {
654 l.Info("txn failed", "err", err)
655 s.pages.Notice(w, "repo", "Failed to save repository information.")
656 return
657 }
658
659 rollback := func() {
660 err1 := tx.Rollback()
661 err2 := s.enforcer.E.LoadPolicy()
662 err3 := rollbackRecord(context.Background(), aturi, atpClient)
663
664 if errors.Is(err1, sql.ErrTxDone) {
665 err1 = nil
666 }
667
668 if errs := errors.Join(err1, err2, err3); errs != nil {
669 l.Error("failed to rollback changes", "errs", errs)
670 }
671
672 if aturi != "" {
673 cleanupKnot()
674 }
675 }
676 defer rollback()
677
678 err = db.AddRepo(tx, repo)
679 if err != nil {
680 l.Error("db write failed", "err", err)
681 s.pages.Notice(w, "repo", "Failed to save repository information.")
682 return
683 }
684
685 rbacPath := repo.RepoIdentifier()
686 err = s.enforcer.AddRepo(user.Did, domain, rbacPath)
687 if err != nil {
688 l.Error("acl setup failed", "err", err)
689 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
690 return
691 }
692
693 err = tx.Commit()
694 if err != nil {
695 l.Error("txn commit failed", "err", err)
696 http.Error(w, err.Error(), http.StatusInternalServerError)
697 return
698 }
699
700 err = s.enforcer.E.SavePolicy()
701 if err != nil {
702 l.Error("acl save failed", "err", err)
703 http.Error(w, err.Error(), http.StatusInternalServerError)
704 return
705 }
706
707 aturi = ""
708
709 s.notifier.NewRepo(r.Context(), repo)
710 switch {
711 case repoDid != "":
712 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid))
713 default:
714 handle := s.pages.DisplayHandle(r.Context(), user.Did)
715 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, rkey))
716 }
717 }
718}
719
720func rkeyOccupied(ctx context.Context, client *atclient.APIClient, did, rkey string) bool {
721 probeCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
722 defer cancel()
723 resp, err := comatproto.RepoGetRecord(probeCtx, client, "", tangled.RepoNSID, did, rkey)
724 return err == nil && resp != nil
725}
726
727// this is used to rollback changes made to the PDS
728//
729// it is a no-op if the provided ATURI is empty
730func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error {
731 if aturi == "" {
732 return nil
733 }
734
735 parsed := syntax.ATURI(aturi)
736
737 collection := parsed.Collection().String()
738 repo := parsed.Authority().String()
739 rkey := parsed.RecordKey().String()
740
741 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
742 Collection: collection,
743 Repo: repo,
744 Rkey: rkey,
745 })
746 return err
747}
748
749func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
750 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
751 if err != nil {
752 return err
753 }
754 // already present
755 if len(defaultLabels) == len(defaults) {
756 return nil
757 }
758
759 labelDefs, err := models.FetchLabelDefs(r, defaults)
760 if err != nil {
761 return err
762 }
763
764 // Insert each label definition to the database
765 for _, labelDef := range labelDefs {
766 _, err = db.AddLabelDefinition(e, &labelDef)
767 if err != nil {
768 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
769 }
770 }
771
772 return nil
773}
774
775func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
776 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
777 if err != nil {
778 logger.Error("failed to resolve tangled.org DID", "err", err)
779 return
780 }
781
782 pdsEndpoint := resolved.PDSEndpoint()
783 if pdsEndpoint == "" {
784 logger.Error("no PDS endpoint found for tangled.sh DID")
785 return
786 }
787
788 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger)
789 if err != nil {
790 logger.Error("failed to create appassword session... skipping fetch", "err", err)
791 return
792 }
793
794 l := log.SubLogger(logger, "bluesky")
795
796 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
797 defer ticker.Stop()
798
799 for {
800 // refresh session if necessary
801 if !session.IsValid() {
802 l.Debug("access token expired, refreshing session")
803 if err := session.RefreshSession(); err != nil {
804 l.Error("failed to refresh session, stopping bluesky updater", "err", err)
805 return
806 }
807 l.Debug("session refreshed")
808 }
809
810 // make client
811 client := xrpc.Client{
812 Auth: &xrpc.AuthInfo{
813 AccessJwt: session.AccessJwt,
814 Did: session.Did,
815 },
816 Host: session.PdsEndpoint,
817 }
818
819 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
820 if err != nil {
821 l.Error("failed to fetch bluesky posts", "err", err)
822 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
823 l.Error("failed to insert bluesky posts", "err", err)
824 } else {
825 l.Info("inserted bluesky posts", "count", len(posts))
826 }
827
828 select {
829 case <-ticker.C:
830 case <-ctx.Done():
831 l.Info("stopping bluesky updater")
832 return
833 }
834 }
835}