Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/cache"
17 "tangled.org/core/appview/cloudflare"
18 "tangled.org/core/appview/codesearch"
19 "tangled.org/core/appview/config"
20 "tangled.org/core/appview/db"
21 "tangled.org/core/appview/email"
22 "tangled.org/core/appview/indexer"
23 "tangled.org/core/appview/knotacl"
24 "tangled.org/core/appview/knotcompat"
25 "tangled.org/core/appview/mentions"
26 "tangled.org/core/appview/models"
27 "tangled.org/core/appview/notify"
28 dbnotify "tangled.org/core/appview/notify/db"
29 lognotify "tangled.org/core/appview/notify/logging"
30 phnotify "tangled.org/core/appview/notify/posthog"
31 whnotify "tangled.org/core/appview/notify/webhook"
32 "tangled.org/core/appview/oauth"
33 "tangled.org/core/appview/pages"
34 "tangled.org/core/appview/pipelines"
35 pipelinessh "tangled.org/core/appview/pipelines/ssh"
36 "tangled.org/core/appview/reporesolver"
37 "tangled.org/core/appview/repoverify"
38 "tangled.org/core/appview/validator"
39 xrpcclient "tangled.org/core/appview/xrpcclient"
40 "tangled.org/core/consts"
41 "tangled.org/core/eventconsumer"
42 "tangled.org/core/idresolver"
43 "tangled.org/core/jetstream"
44 "tangled.org/core/log"
45 tlog "tangled.org/core/log"
46 "tangled.org/core/orm"
47 "tangled.org/core/rbac"
48
49 comatproto "github.com/bluesky-social/indigo/api/atproto"
50 "github.com/bluesky-social/indigo/atproto/atclient"
51 "github.com/bluesky-social/indigo/atproto/syntax"
52 lexutil "github.com/bluesky-social/indigo/lex/util"
53 "github.com/bluesky-social/indigo/xrpc"
54
55 "github.com/go-chi/chi/v5"
56 "github.com/posthog/posthog-go"
57)
58
59type State struct {
60 db *db.DB
61 notifier notify.Notifier
62 indexer *indexer.Indexer
63 oauth *oauth.OAuth
64 enforcer *rbac.Enforcer
65 pages *pages.Pages
66 idResolver *idresolver.Resolver
67 rdb *cache.Cache
68 mentionsResolver *mentions.Resolver
69 posthog posthog.Client
70 jc *jetstream.JetstreamClient
71 config *config.Config
72 repoResolver *reporesolver.RepoResolver
73 aclService *knotacl.Service
74 knotstream *eventconsumer.Consumer
75 spindlestream *eventconsumer.Consumer
76 pipelineNotifier *pipelines.StatusNotifier
77 logger *slog.Logger
78 validator *validator.Validator
79 cfClient *cloudflare.Client
80 codesearch *codesearch.CodeSearch
81}
82
83func Make(ctx context.Context, config *config.Config) (*State, error) {
84 logger := tlog.FromContext(ctx)
85
86 d, err := db.Make(ctx, config.Core.DbPath)
87 if err != nil {
88 return nil, fmt.Errorf("failed to create db: %w", err)
89 }
90
91 indexer := indexer.New(log.SubLogger(logger, "indexer"), d)
92 err = indexer.Init(ctx)
93 if err != nil {
94 return nil, fmt.Errorf("failed to create indexer: %w", err)
95 }
96
97 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
98 if err != nil {
99 return nil, fmt.Errorf("failed to create enforcer: %w", err)
100 }
101
102 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
103 if err != nil {
104 logger.Error("failed to create redis resolver", "err", err)
105 res = idresolver.DefaultResolver(config.Plc.PLCURL)
106 }
107
108 var rdb *cache.Cache
109 if config.Redis.Addr != "" {
110 rdb = cache.New(config.Redis.Addr)
111 }
112
113 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
114 if err != nil {
115 return nil, fmt.Errorf("failed to create posthog client: %w", err)
116 }
117
118 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages"))
119 knotcompat.UseNativeLatch(knotacl.NewLatch(d, log.SubLogger(logger, "knotacl-latch")))
120 aclService := knotacl.NewService(enforcer, d, config.Core.Dev, log.SubLogger(logger, "knotacl"))
121 oauth, err := oauth.New(config, posthog, d, enforcer, aclService, res, log.SubLogger(logger, "oauth"))
122 if err != nil {
123 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
124 }
125
126 validator := validator.New(d, res, aclService)
127
128 repoResolver := reporesolver.New(config, aclService, d, rdb)
129
130 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
131
132 jc, err := jetstream.NewJetstreamClient(
133 config.Jetstream.Endpoint,
134 "appview",
135 []string{
136 tangled.ActorProfileNSID,
137 tangled.FeedStarNSID,
138 tangled.FeedReactionNSID,
139 tangled.FeedCommentNSID,
140 tangled.GraphFollowNSID,
141 tangled.GraphVouchNSID,
142 tangled.KnotMemberNSID,
143 tangled.KnotNSID,
144 tangled.LabelDefinitionNSID,
145 tangled.LabelOpNSID,
146 tangled.PublicKeyNSID,
147 tangled.RepoArtifactNSID,
148 tangled.RepoIssueCommentNSID,
149 tangled.RepoIssueNSID,
150 tangled.RepoNSID,
151 tangled.RepoPullNSID,
152 tangled.RepoPullCommentNSID,
153 tangled.SpindleMemberNSID,
154 tangled.SpindleNSID,
155 tangled.StringNSID,
156 },
157 nil,
158 tlog.SubLogger(logger, "jetstream"),
159 d,
160 false,
161
162 // in-memory filter is inapplicable to appview so
163 // we'll never log dids anyway.
164 false,
165 )
166 if err != nil {
167 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
168 }
169
170 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
171 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
172 }
173
174 var notifiers []notify.Notifier
175
176 // Always add the database notifier
177 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
178
179 // Add other notifiers in production only
180 if !config.Core.Dev {
181 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
182 }
183 notifiers = append(notifiers, indexer)
184
185 notifiers = append(notifiers, whnotify.NewNotifier(d))
186
187 notifier := notify.NewMergedNotifier(notifiers)
188 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
189
190 ingester := appview.Ingester{
191 Ctx: ctx,
192 Db: d,
193 Enforcer: enforcer,
194 Acl: aclService,
195 IdResolver: res,
196 Cache: rdb,
197 Config: config,
198 Logger: log.SubLogger(logger, "ingester"),
199 Validator: validator,
200 MentionsResolver: mentionsResolver,
201 Notifier: notifier,
202 Verifier: repoverify.New(res, config.Core.Dev),
203 }
204 err = jc.StartJetstream(ctx, ingester.Ingest())
205 if err != nil {
206 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
207 }
208
209 go ingester.SweepPendingVerifications()
210
211 var cfClient *cloudflare.Client
212 if config.Cloudflare.ApiToken != "" {
213 cfClient, err = cloudflare.New(config)
214 if err != nil {
215 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err)
216 cfClient = nil
217 }
218 }
219
220 knotstream, err := Knotstream(ctx, config, d, aclService, enforcer, posthog, notifier, cfClient)
221 if err != nil {
222 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
223 }
224 knotstream.Start(ctx)
225
226 pipelineNotifier := pipelines.NewStatusNotifier()
227
228 spindlestream, err := Spindlestream(ctx, config, d, enforcer, pipelineNotifier)
229 if err != nil {
230 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
231 }
232 spindlestream.Start(ctx)
233
234 state := &State{
235 db: d,
236 notifier: notifier,
237 indexer: indexer,
238 oauth: oauth,
239 enforcer: enforcer,
240 pages: pages,
241 idResolver: res,
242 rdb: rdb,
243 mentionsResolver: mentionsResolver,
244 posthog: posthog,
245 jc: jc,
246 config: config,
247 repoResolver: repoResolver,
248 aclService: aclService,
249 knotstream: knotstream,
250 spindlestream: spindlestream,
251 pipelineNotifier: pipelineNotifier,
252 logger: logger,
253 validator: validator,
254 cfClient: cfClient,
255 codesearch: &codesearch.CodeSearch{Host: config.CodeSearch.ZoektUrl},
256 }
257
258 // fetch initial bluesky posts if configured
259 go fetchBskyPosts(ctx, res, config, d, logger)
260
261 return state, nil
262}
263
264func (s *State) Close() error {
265 // other close up logic goes here
266 return s.db.Close()
267}
268
269func (s *State) NewSSHServer() *pipelinessh.Server {
270 return pipelinessh.New(s.db, s.config, s.pipelineNotifier, log.SubLogger(s.logger, "pipelinessh"))
271}
272
273func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) {
274 w.Header().Set("Content-Type", "text/plain")
275 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
276
277 securityTxt := `Contact: mailto:security@tangled.org
278Preferred-Languages: en
279Canonical: https://tangled.org/.well-known/security.txt
280Expires: 2030-01-01T21:59:00.000Z
281`
282 w.Write([]byte(securityTxt))
283}
284
285func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
286 w.Header().Set("Content-Type", "text/plain")
287 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
288
289 robotsTxt := `# Hello, Tanglers!
290User-agent: *
291Allow: /
292Disallow: /*/*/settings
293Disallow: /settings
294Disallow: /*/*/compare
295Disallow: /*/*/fork
296
297Crawl-delay: 1
298`
299 w.Write([]byte(robotsTxt))
300}
301
302func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
303 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
304 BaseParams: pages.BaseParamsFromContext(r.Context()),
305 })
306}
307
308func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
309 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
310 BaseParams: pages.BaseParamsFromContext(r.Context()),
311 })
312}
313
314func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
315 s.pages.Brand(w, pages.BrandParams{
316 BaseParams: pages.BaseParamsFromContext(r.Context()),
317 })
318}
319
320func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
321 user := s.oauth.GetMultiAccountUser(r)
322 if user == nil {
323 return
324 }
325
326 l := s.logger.With("handler", "UpgradeBanner")
327 l = l.With("did", user.Did)
328
329 regs, err := db.GetRegistrations(
330 s.db,
331 orm.FilterEq("did", user.Did),
332 orm.FilterEq("needs_upgrade", 1),
333 )
334 if err != nil {
335 l.Error("non-fatal: failed to get registrations", "err", err)
336 }
337
338 spindles, err := db.GetSpindles(
339 r.Context(),
340 s.db,
341 orm.FilterEq("owner", user.Did),
342 orm.FilterEq("needs_upgrade", 1),
343 )
344 if err != nil {
345 l.Error("non-fatal: failed to get spindles", "err", err)
346 }
347
348 if regs == nil && spindles == nil {
349 return
350 }
351
352 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
353 Registrations: regs,
354 Spindles: spindles,
355 })
356}
357
358func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) {
359 // target is echoed back from the form via hx-vals so the response span's
360 // id matches the form's hx-target. Fallback keeps the handler useful if
361 // a caller forgets to send it.
362 target := strings.TrimSpace(r.FormValue("target"))
363 if target == "" {
364 target = "home"
365 }
366
367 w.Header().Set("Content-Type", "text/html")
368
369 emailAddr := strings.TrimSpace(r.FormValue("email"))
370 if !email.IsValidEmail(emailAddr) {
371 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{
372 Id: target,
373 Error: "Invalid email address.",
374 })
375 return
376 }
377
378 // For logged-in users, persist the signup locally so the widget stays
379 // hidden across devices. The DB row is the render-time source of truth;
380 // Resend still owns the mailing list itself.
381 if user := s.oauth.GetMultiAccountUser(r); user != nil {
382 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil {
383 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err)
384 }
385 }
386
387 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" {
388 go func() {
389 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil {
390 s.logger.Error("failed to add newsletter contact", "error", err)
391 }
392 }()
393 } else {
394 s.logger.Error(
395 "failed to add newsletter contact, missing resend config",
396 "isKeyPresent", s.config.Resend.ApiKey != "",
397 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "",
398 "emailAddr", emailAddr,
399 )
400 }
401
402 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target})
403}
404
405// NewsletterDismiss records that a logged-in user has dismissed the newsletter
406// widget so it stays hidden across their devices. Anonymous callers get a 204
407// with no DB write — localStorage handles the per-browser fallback.
408func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) {
409 user := s.oauth.GetMultiAccountUser(r)
410 if user == nil {
411 w.WriteHeader(http.StatusNoContent)
412 return
413 }
414
415 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil {
416 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err)
417 }
418 w.WriteHeader(http.StatusNoContent)
419}
420
421func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
422 user := chi.URLParam(r, "user")
423 user = strings.TrimPrefix(user, "@")
424 user = strings.TrimSuffix(user, ".keys")
425
426 if user == "" {
427 w.WriteHeader(http.StatusBadRequest)
428 return
429 }
430
431 id, err := s.idResolver.ResolveIdent(r.Context(), user)
432 if err != nil {
433 w.WriteHeader(http.StatusInternalServerError)
434 return
435 }
436
437 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
438 if err != nil {
439 s.logger.Error("failed to get public keys", "err", err)
440 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
441 return
442 }
443
444 if len(pubKeys) == 0 {
445 w.WriteHeader(http.StatusNoContent)
446 return
447 }
448
449 for _, k := range pubKeys {
450 key := strings.TrimRight(k.Key, "\n")
451 fmt.Fprintln(w, key)
452 }
453}
454
455func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
456 switch r.Method {
457 case http.MethodGet:
458 user := s.oauth.GetMultiAccountUser(r)
459 knots := s.aclService.KnotsForUser(r.Context(), user.Did)
460
461 s.pages.NewRepo(w, pages.NewRepoParams{
462 BaseParams: pages.BaseParamsFromContext(r.Context()),
463 Knots: knots,
464 })
465
466 case http.MethodPost:
467 l := s.logger.With("handler", "NewRepo")
468
469 user := s.oauth.GetMultiAccountUser(r)
470 l = l.With("did", user.Did)
471
472 // form validation
473 domain := r.FormValue("domain")
474 if domain == "" {
475 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
476 return
477 }
478 l = l.With("knot", domain)
479
480 repoName := r.FormValue("name")
481 if repoName == "" {
482 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
483 return
484 }
485
486 if err := models.ValidateRepoName(repoName); err != nil {
487 s.pages.Notice(w, "repo", err.Error())
488 return
489 }
490 repoName = models.StripGitExt(repoName)
491 rkey := strings.ToLower(repoName)
492 l = l.With("repoName", repoName, "rkey", rkey)
493
494 defaultBranch := r.FormValue("branch")
495 if defaultBranch == "" {
496 defaultBranch = "main"
497 }
498 l = l.With("defaultBranch", defaultBranch)
499
500 description := r.FormValue("description")
501 if len([]rune(description)) > 140 {
502 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
503 return
504 }
505
506 // ACL validation
507 if !s.aclService.IsRepoCreateAllowed(r.Context(), domain, user.Did) {
508 l.Info("unauthorized")
509 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
510 return
511 }
512
513 // Check for existing repos
514 existingRepo, err := db.GetRepo(
515 s.db,
516 orm.FilterEq("did", user.Did),
517 orm.FilterEq("rkey", rkey),
518 )
519 if err == nil && existingRepo != nil {
520 l.Info("repo exists")
521 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
522 return
523 }
524
525 atpClient, err := s.oauth.AuthorizedClient(r)
526 if err != nil {
527 l.Error("failed to get authorized client", "err", err)
528 s.pages.Notice(w, "repo", "Failed to authorize. Try again later.")
529 return
530 }
531
532 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) {
533 l.Info("rkey occupied by prior rename alias")
534 s.pages.Notice(w, "repo", fmt.Sprintf("The name %q still has a record on your PDS from a prior rename. Pick a different name, or delete at://%s/%s/%s first.", rkey, user.Did, tangled.RepoNSID, rkey))
535 return
536 }
537
538 client, err := s.oauth.ServiceClient(
539 r,
540 oauth.WithService(domain),
541 oauth.WithLxm(tangled.RepoCreateNSID),
542 oauth.WithDev(s.config.Core.Dev),
543 )
544 if err != nil {
545 l.Error("service auth failed", "err", err)
546 s.pages.Notice(w, "repo", "Failed to authenticate. Please log out and log back in again.")
547 return
548 }
549
550 input := &tangled.RepoCreate_Input{
551 Rkey: rkey,
552 Name: rkey,
553 DefaultBranch: &defaultBranch,
554 }
555 createResp, err := tangled.RepoCreate(
556 r.Context(),
557 client,
558 input,
559 )
560 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
561 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err)
562 s.pages.Notice(w, "repo", err.Error())
563 return
564 }
565
566 var repoDid string
567 if createResp != nil && createResp.RepoDid != nil {
568 repoDid = *createResp.RepoDid
569 }
570 if repoDid == "" {
571 l.Error("knot returned empty repo DID")
572 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.")
573 return
574 }
575
576 repo := &models.Repo{
577 Did: user.Did,
578 Name: repoName,
579 Knot: domain,
580 Rkey: rkey,
581 Description: description,
582 Created: time.Now(),
583 Labels: s.config.Label.DefaultLabelDefs,
584 RepoDid: repoDid,
585 }
586 record := repo.AsRecord()
587
588 cleanupKnot := func() {
589 go func() {
590 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second}
591 for attempt, delay := range delays {
592 time.Sleep(delay)
593 deleteClient, dErr := s.oauth.ServiceClient(
594 r,
595 oauth.WithService(domain),
596 oauth.WithLxm(tangled.RepoDeleteNSID),
597 oauth.WithDev(s.config.Core.Dev),
598 )
599 if dErr != nil {
600 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr)
601 continue
602 }
603 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
604 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{
605 Did: user.Did,
606 Name: rkey,
607 Rkey: rkey,
608 }); dErr != nil {
609 cancel()
610 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr)
611 continue
612 }
613 cancel()
614 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1)
615 return
616 }
617 l.Error("exhausted retries for knot cleanup, repo may be orphaned",
618 "did", user.Did, "repo", repoName, "knot", domain)
619 }()
620 }
621
622 _, err = comatproto.RepoCreateRecord(r.Context(), atpClient, &comatproto.RepoCreateRecord_Input{
623 Collection: tangled.RepoNSID,
624 Repo: user.Did,
625 Rkey: &rkey,
626 Record: &lexutil.LexiconTypeDecoder{
627 Val: &record,
628 },
629 })
630 if err != nil {
631 l.Info("PDS write failed", "err", err)
632 cleanupKnot()
633 if rkeyOccupied(r.Context(), atpClient, user.Did, rkey) {
634 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository named %q.", rkey))
635 } else {
636 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
637 }
638 return
639 }
640
641 aturi := fmt.Sprintf("at://%s/%s/%s", user.Did, tangled.RepoNSID, rkey)
642 l = l.With("aturi", aturi)
643 l.Info("wrote to PDS")
644
645 tx, err := s.db.BeginTx(r.Context(), nil)
646 if err != nil {
647 l.Info("txn failed", "err", err)
648 s.pages.Notice(w, "repo", "Failed to save repository information.")
649 return
650 }
651
652 rollback := func() {
653 err1 := tx.Rollback()
654 err2 := s.enforcer.E.LoadPolicy()
655 err3 := rollbackRecord(context.Background(), aturi, atpClient)
656
657 if errors.Is(err1, sql.ErrTxDone) {
658 err1 = nil
659 }
660
661 if errs := errors.Join(err1, err2, err3); errs != nil {
662 l.Error("failed to rollback changes", "errs", errs)
663 }
664
665 if aturi != "" {
666 cleanupKnot()
667 }
668 }
669 defer rollback()
670
671 err = db.AddRepo(tx, repo)
672 if err != nil {
673 l.Error("db write failed", "err", err)
674 s.pages.Notice(w, "repo", "Failed to save repository information.")
675 return
676 }
677
678 rbacPath := repo.RepoIdentifier()
679 err = s.enforcer.AddRepo(user.Did, domain, rbacPath)
680 if err != nil {
681 l.Error("acl setup failed", "err", err)
682 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
683 return
684 }
685
686 err = tx.Commit()
687 if err != nil {
688 l.Error("txn commit failed", "err", err)
689 http.Error(w, err.Error(), http.StatusInternalServerError)
690 return
691 }
692
693 err = s.enforcer.E.SavePolicy()
694 if err != nil {
695 l.Error("acl save failed", "err", err)
696 http.Error(w, err.Error(), http.StatusInternalServerError)
697 return
698 }
699
700 aturi = ""
701
702 s.notifier.NewRepo(r.Context(), repo)
703 switch {
704 case repoDid != "":
705 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid))
706 default:
707 handle := s.pages.DisplayHandle(r.Context(), user.Did)
708 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, rkey))
709 }
710 }
711}
712
713func rkeyOccupied(ctx context.Context, client *atclient.APIClient, did, rkey string) bool {
714 probeCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
715 defer cancel()
716 resp, err := comatproto.RepoGetRecord(probeCtx, client, "", tangled.RepoNSID, did, rkey)
717 return err == nil && resp != nil
718}
719
720// this is used to rollback changes made to the PDS
721//
722// it is a no-op if the provided ATURI is empty
723func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error {
724 if aturi == "" {
725 return nil
726 }
727
728 parsed := syntax.ATURI(aturi)
729
730 collection := parsed.Collection().String()
731 repo := parsed.Authority().String()
732 rkey := parsed.RecordKey().String()
733
734 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
735 Collection: collection,
736 Repo: repo,
737 Rkey: rkey,
738 })
739 return err
740}
741
742func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
743 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
744 if err != nil {
745 return err
746 }
747 // already present
748 if len(defaultLabels) == len(defaults) {
749 return nil
750 }
751
752 labelDefs, err := models.FetchLabelDefs(r, defaults)
753 if err != nil {
754 return err
755 }
756
757 // Insert each label definition to the database
758 for _, labelDef := range labelDefs {
759 _, err = db.AddLabelDefinition(e, &labelDef)
760 if err != nil {
761 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
762 }
763 }
764
765 return nil
766}
767
768func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
769 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
770 if err != nil {
771 logger.Error("failed to resolve tangled.org DID", "err", err)
772 return
773 }
774
775 pdsEndpoint := resolved.PDSEndpoint()
776 if pdsEndpoint == "" {
777 logger.Error("no PDS endpoint found for tangled.sh DID")
778 return
779 }
780
781 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger)
782 if err != nil {
783 logger.Error("failed to create appassword session... skipping fetch", "err", err)
784 return
785 }
786
787 l := log.SubLogger(logger, "bluesky")
788
789 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
790 defer ticker.Stop()
791
792 for {
793 // refresh session if necessary
794 if !session.IsValid() {
795 l.Debug("access token expired, refreshing session")
796 if err := session.RefreshSession(); err != nil {
797 l.Error("failed to refresh session, stopping bluesky updater", "err", err)
798 return
799 }
800 l.Debug("session refreshed")
801 }
802
803 // make client
804 client := xrpc.Client{
805 Auth: &xrpc.AuthInfo{
806 AccessJwt: session.AccessJwt,
807 Did: session.Did,
808 },
809 Host: session.PdsEndpoint,
810 }
811
812 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
813 if err != nil {
814 l.Error("failed to fetch bluesky posts", "err", err)
815 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
816 l.Error("failed to insert bluesky posts", "err", err)
817 } else {
818 l.Info("inserted bluesky posts", "count", len(posts))
819 }
820
821 select {
822 case <-ticker.C:
823 case <-ctx.Done():
824 l.Info("stopping bluesky updater")
825 return
826 }
827 }
828}