Monorepo for Tangled (tangled.org)
2

Configure Feed

Select the types of activity you want to include in your feed.

1package spindle 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "net/http" 12 "net/url" 13 "sync" 14 "time" 15 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 indigoxrpc "github.com/bluesky-social/indigo/xrpc" 18 "tangled.org/core/api/tangled" 19 avmodels "tangled.org/core/appview/models" 20 "tangled.org/core/eventconsumer" 21 "tangled.org/core/log" 22 "tangled.org/core/rbac" 23 "tangled.org/core/spindle/db" 24 "tangled.org/core/spindle/git" 25 "tangled.org/core/spindle/models" 26 "tangled.org/core/tapc" 27 "tangled.org/core/tid" 28 "tangled.org/core/workflow" 29) 30 31const ( 32 maxPendingPerRepo = 64 33 pendingCollabTTL = 10 * time.Minute 34) 35 36type pendingCollabEvent struct { 37 evt *tapc.RecordEventData 38 at time.Time 39} 40 41type Tap struct { 42 logger *slog.Logger 43 spindle *Spindle 44 tap tapc.Client 45 pendingMu sync.Mutex 46 pendingCollabs map[syntax.DID][]pendingCollabEvent 47} 48 49func NewTapClient(s *Spindle) *Tap { 50 return &Tap{ 51 logger: log.SubLogger(s.l, "tapclient"), 52 spindle: s, 53 tap: tapc.NewClient(s.cfg.Server.Tap.Url, s.cfg.Server.Tap.AdminPassword), 54 pendingCollabs: make(map[syntax.DID][]pendingCollabEvent), 55 } 56} 57 58func (t *Tap) AddOwnerDIDs(ctx context.Context, dids []syntax.DID) error { 59 if len(dids) == 0 { 60 return nil 61 } 62 return t.tap.AddRepos(ctx, dids) 63} 64 65func (t *Tap) Start(connCtx context.Context) { 66 go t.tap.Connect(connCtx, &tapc.SimpleIndexer{ 67 EventHandler: t.processEvent, 68 ConnectHandler: t.onConnect, 69 }) 70 go t.purgePendingCollabsLoop(t.spindle.rootCtx) 71} 72 73func (t *Tap) onConnect(ctx context.Context) { 74 t.spindle.declareTapInterest(ctx) 75} 76 77func (t *Tap) processEvent(ctx context.Context, evt tapc.Event) error { 78 if evt.Type != tapc.EvtRecord || evt.Record == nil { 79 return nil 80 } 81 switch evt.Record.Collection.String() { 82 case tangled.RepoNSID: 83 return t.processRepo(ctx, evt.Record) 84 case 
tangled.RepoCollaboratorNSID: 85 return t.processCollaborator(ctx, evt.Record) 86 case tangled.RepoPullNSID: 87 return t.processPull(ctx, evt.Record) 88 } 89 return nil 90} 91 92func (t *Tap) processRepo(ctx context.Context, evt *tapc.RecordEventData) error { 93 l := t.logger.With("collection", tangled.RepoNSID, "did", evt.Did, "rkey", evt.Rkey) 94 95 ownerDid := evt.Did 96 rkey := evt.Rkey 97 98 switch evt.Action { 99 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 100 record := tangled.Repo{} 101 if err := json.Unmarshal(evt.Record, &record); err != nil { 102 l.Warn("skipping invalid repo record", "err", err) 103 return nil 104 } 105 106 hostname := t.spindle.cfg.Server.Hostname 107 prior, priorErr := t.spindle.db.GetRepoByOwnerRkey(ownerDid, rkey) 108 knownRepo := priorErr == nil 109 110 if record.Spindle == nil || *record.Spindle != hostname { 111 if knownRepo { 112 l.Info("tearing down repo reassigned from this spindle", "newSpindle", record.Spindle) 113 return t.teardownRepo(l, prior, ownerDid, rkey) 114 } 115 return nil 116 } 117 118 if record.RepoDid == nil || *record.RepoDid == "" { 119 l.Warn("skipping repo record without repoDid") 120 return nil 121 } 122 repoDid, err := syntax.ParseDID(*record.RepoDid) 123 if err != nil { 124 l.Warn("skipping repo record with malformed repoDid", "value", *record.RepoDid, "err", err) 125 return nil 126 } 127 128 if err := t.spindle.e.AddRepo(ownerDid.String(), rbac.ThisServer, repoDid.String()); err != nil { 129 l.Error("failed to add repo policy", "err", err) 130 return fmt.Errorf("add repo policy: %w", err) 131 } 132 133 src := eventconsumer.NewKnotSource(record.Knot) 134 t.spindle.ks.AddSource(t.spindle.rootCtx, src) 135 136 repo := db.Repo{ 137 Knot: record.Knot, 138 Owner: ownerDid, 139 Rkey: rkey, 140 RepoDid: repoDid, 141 CreatedAt: record.CreatedAt, 142 } 143 144 if err := t.spindle.db.AddRepo(repo); err != nil { 145 l.Error("failed to add repo row", "err", err) 146 return fmt.Errorf("add repo: %w", err) 
147 } 148 149 // setup sparse sync 150 repoCloneUri := t.spindle.newRepoCloneUrl(repo.Knot, repo.RepoDid) 151 repoPath := t.spindle.newRepoPath(repo.RepoDid) 152 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, ""); err != nil { 153 return fmt.Errorf("setting up sparse-clone git repo: %w", err) 154 } 155 156 legacyName := "" 157 if record.Name != nil { 158 legacyName = *record.Name 159 } 160 migrateLegacyRepoSecrets(ctx, t.spindle.db, t.spindle.vault, l, ownerDid, legacyName, rkey, repoDid) 161 migrateLegacyRepoCasbin(ctx, t.spindle.db, t.spindle.e, l, ownerDid, legacyName, rkey, repoDid) 162 163 if removed, err := t.spindle.db.CollapseRepoSiblings(ownerDid, repoDid); err != nil { 164 l.Warn("collapse rename siblings failed", "err", err) 165 } else if removed > 0 { 166 l.Info("collapsed rename leftovers", "owner", ownerDid, "repo_did", repoDid, "removed", removed) 167 } 168 169 if e := t.spindle.embedTap; e == nil || !e.closed.Load() { 170 if err := t.tap.AddRepos(ctx, []syntax.DID{ownerDid}); err != nil { 171 l.Warn("tap AddRepos rejected", "did", ownerDid, "err", err) 172 } 173 } 174 t.spindle.jc.AddDid(ownerDid.String()) 175 176 t.drainPendingCollabs(ctx, repoDid) 177 178 case tapc.RecordDeleteAction: 179 repo, err := t.spindle.db.GetRepoByOwnerRkey(ownerDid, rkey) 180 if err != nil { 181 l.Info("skipping delete for unknown repo") 182 return nil 183 } 184 return t.teardownRepo(l, repo, ownerDid, rkey) 185 } 186 return nil 187} 188 189func (t *Tap) teardownRepo(l *slog.Logger, repo *db.Repo, ownerDid syntax.DID, rkey syntax.RecordKey) error { 190 if repo.RepoDid != "" { 191 collabs, err := t.spindle.db.ListCollaboratorsByRepoDid(repo.RepoDid) 192 if err != nil { 193 l.Error("failed to list collaborators for cleanup", "err", err) 194 return fmt.Errorf("list collaborators: %w", err) 195 } 196 for _, c := range collabs { 197 if err := t.spindle.e.RemoveCollaborator(c.Subject.String(), rbac.ThisServer, repo.RepoDid.String()); err != nil { 198 
l.Error("failed to remove collaborator policy", "subject", c.Subject, "err", err) 199 return fmt.Errorf("remove collaborator policy: %w", err) 200 } 201 } 202 if err := t.spindle.db.DeleteRepoCollaboratorsByRepoDid(repo.RepoDid); err != nil { 203 l.Error("failed to clear collaborator rows", "err", err) 204 return err 205 } 206 if err := t.spindle.e.RemoveRepo(ownerDid.String(), rbac.ThisServer, repo.RepoDid.String()); err != nil { 207 l.Error("failed to remove repo policy", "err", err) 208 return fmt.Errorf("remove repo policy: %w", err) 209 } 210 } 211 if err := t.spindle.db.DeleteRepoByOwnerRkey(ownerDid, rkey); err != nil { 212 l.Error("failed to delete repo row", "err", err) 213 return fmt.Errorf("delete repo row: %w", err) 214 } 215 // TODO: clear sparse-synced git repo 216 return nil 217} 218 219func (t *Tap) processCollaborator(ctx context.Context, evt *tapc.RecordEventData) error { 220 l := t.logger.With("collection", tangled.RepoCollaboratorNSID, "did", evt.Did, "rkey", evt.Rkey) 221 222 switch evt.Action { 223 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 224 record := tangled.RepoCollaborator{} 225 if err := json.Unmarshal(evt.Record, &record); err != nil { 226 l.Warn("skipping invalid collaborator record", "err", err) 227 return nil 228 } 229 230 actor := evt.Did 231 rkey := evt.Rkey 232 233 subjectDid, err := syntax.ParseDID(record.Subject) 234 if err != nil { 235 l.Info("skipping collaborator with malformed subject DID", "subject", record.Subject, "err", err) 236 return nil 237 } 238 if _, err := t.spindle.res.ResolveIdent(ctx, subjectDid.String()); err != nil { 239 l.Info("skipping unresolvable collaborator subject", "subject", subjectDid, "err", err) 240 return nil 241 } 242 243 repoRefDid, err := syntax.ParseDID(record.Repo) 244 if err != nil { 245 l.Info("skipping collaborator with non-DID repo ref", "repo", record.Repo, "err", err) 246 return nil 247 } 248 repo, lookupErr := t.spindle.db.GetRepoByDid(repoRefDid) 249 if 
errors.Is(lookupErr, sql.ErrNoRows) { 250 t.bufferCollab(repoRefDid, evt) 251 l.Info("buffering collaborator until repo arrives", "repo", repoRefDid) 252 return nil 253 } 254 if lookupErr != nil { 255 return fmt.Errorf("lookup repo %s: %w", repoRefDid, lookupErr) 256 } 257 repoDid := repo.RepoDid 258 ownerDid := repo.Owner 259 260 if actor != ownerDid { 261 l.Info("rejecting collaborator with non-owner actor", "actor", actor, "owner", ownerDid) 262 return nil 263 } 264 265 ok, err := t.spindle.e.IsCollaboratorInviteAllowed(ownerDid.String(), rbac.ThisServer, repoDid.String()) 266 if err != nil { 267 l.Error("invite permission check failed", "err", err) 268 return fmt.Errorf("invite check: %w", err) 269 } 270 if !ok { 271 l.Info("rejecting collaborator invite", "owner", ownerDid, "repo", repoDid) 272 return nil 273 } 274 275 prior, priorErr := t.spindle.db.GetRepoCollaborator(actor, rkey) 276 staleSubject := priorErr == nil && (prior.Subject != subjectDid || prior.RepoDid != repoDid) 277 278 if err := t.spindle.e.AddCollaborator(subjectDid.String(), rbac.ThisServer, repoDid.String()); err != nil { 279 l.Error("failed to add collaborator policy", "err", err) 280 return fmt.Errorf("add collaborator policy: %w", err) 281 } 282 if staleSubject { 283 if err := t.spindle.e.RemoveCollaborator(prior.Subject.String(), rbac.ThisServer, prior.RepoDid.String()); err != nil { 284 l.Error("failed to remove stale collaborator policy", "err", err) 285 return fmt.Errorf("remove stale collaborator: %w", err) 286 } 287 } 288 if err := t.spindle.db.AddRepoCollaborator(db.RepoCollaborator{ 289 OwnerDid: actor, 290 Rkey: rkey, 291 Subject: subjectDid, 292 RepoDid: repoDid, 293 }); err != nil { 294 l.Error("failed to persist collaborator row", "err", err) 295 return fmt.Errorf("track collaborator: %w", err) 296 } 297 298 case tapc.RecordDeleteAction: 299 actor := evt.Did 300 rkey := evt.Rkey 301 302 tracked, err := t.spindle.db.GetRepoCollaborator(actor, rkey) 303 if err != nil { 304 
l.Info("skipping delete for unknown collaborator record") 305 return nil 306 } 307 if err := t.spindle.e.RemoveCollaborator(tracked.Subject.String(), rbac.ThisServer, tracked.RepoDid.String()); err != nil { 308 l.Error("failed to remove collaborator policy", "err", err) 309 return fmt.Errorf("remove collaborator policy: %w", err) 310 } 311 if err := t.spindle.db.DeleteRepoCollaborator(actor, rkey); err != nil { 312 l.Error("failed to delete collaborator row", "err", err) 313 return fmt.Errorf("delete collaborator row: %w", err) 314 } 315 } 316 return nil 317} 318 319func (t *Tap) processPull(ctx context.Context, evt *tapc.RecordEventData) error { 320 l := t.logger.With("collection", evt.Collection, "did", evt.Did, "rkey", evt.Rkey) 321 322 // only listen to live events 323 if !evt.Live { 324 l.Info("skipping backfill event", "event", evt.AtUri()) 325 return nil 326 } 327 328 switch evt.Action { 329 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 330 record := tangled.RepoPull{} 331 if err := json.Unmarshal(evt.Record, &record); err != nil { 332 l.Error("invalid record", "err", err) 333 return fmt.Errorf("parsing record: %w", err) 334 } 335 336 // ignore legacy records 337 if record.Target == nil { 338 l.Info("ignoring pull record: target repo is nil") 339 return nil 340 } 341 342 // ignore patch-based and fork-based PRs 343 if record.Source == nil || record.Source.Repo != nil { 344 l.Info("ignoring pull record: not a branch-based pull request") 345 return nil 346 } 347 348 // skip if target repo is unknown 349 repo, err := t.spindle.db.GetRepoByDid(syntax.DID(record.Target.Repo)) 350 if err != nil { 351 l.Warn("target repo is not ingested yet", "repo", record.Target.Repo, "err", err) 352 return fmt.Errorf("target repo is unknown") 353 } 354 355 // only accept branch-based PR (excluding patch-based and fork-based) 356 if record.Source == nil || record.Source.Repo != nil { 357 l.Warn("skipping non-branch-based PR") 358 return nil 359 } 360 361 
latestSubmission, err := t.fetchLatestSubmission(ctx, evt.Did.String(), evt.Rkey.String(), &record) 362 if err != nil { 363 return err 364 } 365 sourceSha := latestSubmission.SourceRev 366 367 scheme := "https" 368 if t.spindle.cfg.Server.Dev { 369 scheme = "http" 370 } 371 client := &indigoxrpc.Client{Host: fmt.Sprintf("%s://%s", scheme, repo.Knot)} 372 373 // fetch current default branch 374 defaultBranch, _ := func(repo syntax.DID) (string, error) { 375 defaultBranchOut, err := tangled.RepoGetDefaultBranch(ctx, client, repo.String()) 376 if err != nil { 377 return "", err 378 } 379 return defaultBranchOut.Name, nil 380 }(repo.RepoDid) 381 382 compiler := workflow.Compiler{ 383 Trigger: tangled.Pipeline_TriggerMetadata{ 384 Kind: string(workflow.TriggerKindPullRequest), 385 PullRequest: &tangled.Pipeline_PullRequestTriggerData{ 386 Action: "create", 387 SourceBranch: record.Source.Branch, 388 SourceSha: sourceSha, 389 TargetBranch: record.Target.Branch, 390 }, 391 Repo: &tangled.Pipeline_TriggerRepo{ 392 Did: repo.Owner.String(), 393 Knot: repo.Knot, 394 Repo: (*string)(&repo.Rkey), 395 RepoDid: (*string)(&repo.RepoDid), 396 DefaultBranch: defaultBranch, 397 }, 398 }, 399 } 400 401 repoUri := t.spindle.newRepoCloneUrl(repo.Knot, repo.RepoDid) 402 repoPath := t.spindle.newRepoPath(repo.RepoDid) 403 404 // load workflow definitions from rev (without spindle context) 405 rawPipeline, err := t.spindle.loadPipeline(ctx, repoUri, repoPath, sourceSha) 406 if err != nil { 407 // don't retry 408 l.Error("failed loading pipeline", "err", err) 409 return nil 410 } 411 if len(rawPipeline) == 0 { 412 l.Info("no workflow definition find for the repo. 
skipping the event") 413 return nil 414 } 415 tpl := compiler.Compile(compiler.Parse(rawPipeline)) 416 // TODO: pass compile error to workflow log 417 for _, w := range compiler.Diagnostics.Errors { 418 l.Error(w.String()) 419 } 420 for _, w := range compiler.Diagnostics.Warnings { 421 l.Warn(w.String()) 422 } 423 if len(tpl.Workflows) == 0 { 424 l.Info("no workflow matching trigger 'pull_request'. skipping the event") 425 return nil 426 } 427 428 pipelineId := models.PipelineId{ 429 Knot: tpl.TriggerMetadata.Repo.Knot, 430 Rkey: tid.TID(), 431 } 432 if err := t.spindle.db.CreatePipelineEvent(pipelineId.Rkey, tpl, t.spindle.n); err != nil { 433 l.Error("failed to create pipeline event", "err", err) 434 return nil 435 } 436 err = t.spindle.processPipeline(ctx, repo.RepoDid, tpl, pipelineId) 437 if err != nil { 438 // don't retry 439 l.Error("failed processing pipeline", "err", err) 440 return nil 441 } 442 case tapc.RecordDeleteAction: 443 // no-op 444 } 445 return nil 446} 447 448func (t *Tap) bufferCollab(repoDid syntax.DID, evt *tapc.RecordEventData) { 449 t.pendingMu.Lock() 450 defer t.pendingMu.Unlock() 451 list := t.pendingCollabs[repoDid] 452 list = append(list, pendingCollabEvent{evt: evt, at: time.Now()}) 453 if len(list) > maxPendingPerRepo { 454 list = list[len(list)-maxPendingPerRepo:] 455 } 456 t.pendingCollabs[repoDid] = list 457} 458 459func (t *Tap) drainPendingCollabs(ctx context.Context, repoDid syntax.DID) { 460 t.pendingMu.Lock() 461 list := t.pendingCollabs[repoDid] 462 delete(t.pendingCollabs, repoDid) 463 t.pendingMu.Unlock() 464 if len(list) == 0 { 465 return 466 } 467 cutoff := time.Now().Add(-pendingCollabTTL) 468 for _, p := range list { 469 if p.at.Before(cutoff) { 470 continue 471 } 472 if err := t.processCollaborator(ctx, p.evt); err != nil { 473 t.logger.Warn("replaying buffered collaborator failed", "repo", repoDid, "rkey", p.evt.Rkey, "err", err) 474 } 475 } 476} 477 478func (t *Tap) purgePendingCollabsLoop(ctx context.Context) { 479 
ticker := time.NewTicker(pendingCollabTTL / 2) 480 defer ticker.Stop() 481 for { 482 select { 483 case <-ctx.Done(): 484 return 485 case <-ticker.C: 486 t.purgeStalePendingCollabs() 487 } 488 } 489} 490 491func (t *Tap) purgeStalePendingCollabs() { 492 cutoff := time.Now().Add(-pendingCollabTTL) 493 t.pendingMu.Lock() 494 defer t.pendingMu.Unlock() 495 expired := 0 496 for did, list := range t.pendingCollabs { 497 kept := list[:0] 498 for _, p := range list { 499 if !p.at.Before(cutoff) { 500 kept = append(kept, p) 501 } else { 502 expired++ 503 } 504 } 505 if len(kept) == 0 { 506 delete(t.pendingCollabs, did) 507 } else { 508 t.pendingCollabs[did] = kept 509 } 510 } 511 if expired > 0 { 512 t.logger.Warn("expired buffered collaborator events without matching repo arrival", "count", expired, "ttl", pendingCollabTTL) 513 } 514} 515 516func (t *Tap) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*avmodels.PullSubmission, error) { 517 // resolve the PR owner's identity to fetch the blob from their PDS 518 prOwnerIdent, err := t.spindle.res.ResolveIdent(ctx, did) 519 if err != nil || prOwnerIdent.Handle.IsInvalidHandle() { 520 return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err) 521 } 522 523 if len(record.Rounds) == 0 { 524 return nil, fmt.Errorf("failed to fetch latest submission, no rounds in record") 525 } 526 527 roundNumber := len(record.Rounds) - 1 528 round := record.Rounds[roundNumber] 529 530 // fetch the blob from the PR owner's PDS 531 prOwnerPds := prOwnerIdent.PDSEndpoint() 532 blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds)) 533 if err != nil { 534 return nil, fmt.Errorf("failed to construct blob URL: %w", err) 535 } 536 q := blobUrl.Query() 537 q.Set("cid", round.PatchBlob.Ref.String()) 538 q.Set("did", did) 539 blobUrl.RawQuery = q.Encode() 540 541 req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil) 542 if err != nil { 543 
return nil, fmt.Errorf("failed to create blob request: %w", err) 544 } 545 req.Header.Set("Content-Type", "application/json") 546 547 blobResp, err := http.DefaultClient.Do(req) 548 if err != nil { 549 return nil, fmt.Errorf("failed to fetch blob: %w", err) 550 } 551 defer blobResp.Body.Close() 552 553 blob := io.ReadCloser(blobResp.Body) 554 latestSubmission, err := avmodels.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob) 555 if err != nil { 556 return nil, fmt.Errorf("failed to parse submission: %w", err) 557 } 558 559 return latestSubmission, nil 560}