Monorepo for Tangled
tangled.org
1package pulls
2
3import (
4 "encoding/json"
5 "fmt"
6 "net/http"
7 "time"
8
9 "tangled.org/core/api/tangled"
10 "tangled.org/core/appview/db"
11 "tangled.org/core/appview/knotcompat"
12 "tangled.org/core/appview/models"
13 "tangled.org/core/appview/oauth"
14 "tangled.org/core/appview/pages"
15 "tangled.org/core/appview/reporesolver"
16 "tangled.org/core/appview/xrpcclient"
17 "tangled.org/core/orm"
18 "tangled.org/core/patchutil"
19 "tangled.org/core/types"
20 "tangled.org/core/xrpc"
21
22 comatproto "github.com/bluesky-social/indigo/api/atproto"
23 "github.com/bluesky-social/indigo/atproto/syntax"
24 lexutil "github.com/bluesky-social/indigo/lex/util"
25)
26
27func (s *Pulls) ResubmitPull(w http.ResponseWriter, r *http.Request) {
28 l := s.logger.With("handler", "ResubmitPull")
29
30 user := s.oauth.GetMultiAccountUser(r)
31 if user != nil {
32 l = l.With("user", user.Did)
33 }
34
35 pull, ok := r.Context().Value("pull").(*models.Pull)
36 if !ok {
37 l.Error("failed to get pull")
38 s.pages.Notice(w, "pull-error", "Failed to edit patch. Try again later.")
39 return
40 }
41 l = l.With("pull_id", pull.PullId, "pull_owner", pull.OwnerDid)
42
43 switch r.Method {
44 case http.MethodGet:
45 s.pages.PullResubmitFragment(w, pages.PullResubmitParams{
46 RepoInfo: s.repoResolver.GetRepoInfo(r, user),
47 Pull: pull,
48 })
49 return
50 case http.MethodPost:
51 if pull.IsPatchBased() {
52 s.resubmitPatch(w, r)
53 return
54 } else if pull.IsBranchBased() {
55 s.resubmitBranch(w, r)
56 return
57 } else if pull.IsForkBased() {
58 s.resubmitFork(w, r)
59 return
60 }
61 }
62}
63
64func (s *Pulls) resubmitPatch(w http.ResponseWriter, r *http.Request) {
65 l := s.logger.With("handler", "resubmitPatch")
66
67 user := s.oauth.GetMultiAccountUser(r)
68 if user != nil {
69 l = l.With("user", user.Did)
70 }
71
72 pull, ok := r.Context().Value("pull").(*models.Pull)
73 if !ok {
74 l.Error("failed to get pull")
75 s.pages.Notice(w, "pull-error", "Failed to edit patch. Try again later.")
76 return
77 }
78 l = l.With("pull_id", pull.PullId, "pull_owner", pull.OwnerDid)
79
80 if user == nil || user.Did != pull.OwnerDid {
81 l.Warn("unauthorized user", "actual_user", user.Did, "expected_owner", pull.OwnerDid)
82 w.WriteHeader(http.StatusUnauthorized)
83 return
84 }
85
86 f, err := s.repoResolver.Resolve(r)
87 if err != nil {
88 l.Error("failed to get repo and knot", "err", err)
89 return
90 }
91
92 patch := r.FormValue("patch")
93
94 s.resubmitPullHelper(w, r, f, syntax.DID(user.Did), pull, patch, "", "")
95}
96
97func (s *Pulls) resubmitBranch(w http.ResponseWriter, r *http.Request) {
98 l := s.logger.With("handler", "resubmitBranch")
99
100 user := s.oauth.GetMultiAccountUser(r)
101 if user != nil {
102 l = l.With("user", user.Did)
103 }
104
105 pull, ok := r.Context().Value("pull").(*models.Pull)
106 if !ok {
107 l.Error("failed to get pull")
108 s.pages.Notice(w, "resubmit-error", "Failed to edit patch. Try again later.")
109 return
110 }
111 l = l.With("pull_id", pull.PullId, "pull_owner", pull.OwnerDid, "target_branch", pull.TargetBranch)
112
113 if user == nil || user.Did != pull.OwnerDid {
114 l.Warn("unauthorized user", "actual_user", user.Did, "expected_owner", pull.OwnerDid)
115 w.WriteHeader(http.StatusUnauthorized)
116 return
117 }
118
119 f, err := s.repoResolver.Resolve(r)
120 if err != nil {
121 l.Error("failed to get repo and knot", "err", err)
122 return
123 }
124
125 roles := s.acl.RolesInRepo(r.Context(), f, user.Did)
126 if !roles.IsPushAllowed() {
127 l.Warn("unauthorized user - no push permission")
128 w.WriteHeader(http.StatusUnauthorized)
129 return
130 }
131
132 xrpcc := s.knotClient(f.Knot)
133
134 xrpcBytes, err := tangled.RepoCompare(r.Context(), xrpcc, f.RepoIdentifier(), pull.TargetBranch, pull.PullSource.Branch)
135 if err != nil {
136 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
137 l.Error("failed to call XRPC repo.compare", "xrpcerr", xrpcerr, "err", err, "source_branch", pull.PullSource.Branch)
138 s.pages.Notice(w, "resubmit-error", "Failed to create pull request. Try again later.")
139 return
140 }
141 l.Error("compare request failed", "err", err, "source_branch", pull.PullSource.Branch)
142 s.pages.Notice(w, "resubmit-error", err.Error())
143 return
144 }
145
146 var comparison types.RepoFormatPatchResponse
147 if err := json.Unmarshal(xrpcBytes, &comparison); err != nil {
148 l.Error("failed to decode XRPC compare response", "err", err)
149 s.pages.Notice(w, "resubmit-error", "Failed to create pull request. Try again later.")
150 return
151 }
152
153 sourceRev := comparison.Rev2
154 patch := comparison.FormatPatchRaw
155 combined := comparison.CombinedPatchRaw
156
157 s.resubmitPullHelper(w, r, f, syntax.DID(user.Did), pull, patch, combined, sourceRev)
158}
159
160func (s *Pulls) resubmitFork(w http.ResponseWriter, r *http.Request) {
161 l := s.logger.With("handler", "resubmitFork")
162
163 user := s.oauth.GetMultiAccountUser(r)
164 if user != nil {
165 l = l.With("user", user.Did)
166 }
167
168 pull, ok := r.Context().Value("pull").(*models.Pull)
169 if !ok {
170 l.Error("failed to get pull")
171 s.pages.Notice(w, "resubmit-error", "Failed to edit patch. Try again later.")
172 return
173 }
174 l = l.With("pull_id", pull.PullId, "pull_owner", pull.OwnerDid, "target_branch", pull.TargetBranch)
175
176 if user == nil || user.Did != pull.OwnerDid {
177 l.Warn("unauthorized user", "actual_user", user.Did, "expected_owner", pull.OwnerDid)
178 w.WriteHeader(http.StatusUnauthorized)
179 return
180 }
181
182 f, err := s.repoResolver.Resolve(r)
183 if err != nil {
184 l.Error("failed to get repo and knot", "err", err)
185 return
186 }
187
188 forkRepo, err := db.GetRepoByDid(s.db, string(*pull.PullSource.RepoDid))
189 if err != nil {
190 l.Error("failed to get source repo", "err", err, "repo_did", pull.PullSource.RepoDid.String())
191 s.pages.Notice(w, "resubmit-error", "Failed to create pull request. Try again later.")
192 return
193 }
194
195 // update the hidden tracking branch to latest
196 client, err := s.oauth.ServiceClient(
197 r,
198 oauth.WithService(forkRepo.Knot),
199 oauth.WithLxm(tangled.RepoHiddenRefNSID),
200 oauth.WithDev(s.config.Core.Dev),
201 )
202 if err != nil {
203 l.Error("failed to connect to knot server", "err", err, "fork_knot", forkRepo.Knot)
204 return
205 }
206
207 resp, err := tangled.RepoHiddenRef(
208 r.Context(),
209 client,
210 &tangled.RepoHiddenRef_Input{
211 ForkRef: pull.PullSource.Branch,
212 RemoteRef: pull.TargetBranch,
213 Repo: forkRepo.RepoAt().String(),
214 },
215 )
216 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
217 s.logger.Error("failed to set hidden ref", "xrpcerr", xrpcerr, "err", err)
218 s.pages.Notice(w, "resubmit-error", xrpcerr.Error())
219 return
220 }
221 if !resp.Success {
222 l.Error("failed to update tracking ref", "err", resp.Error, "fork_ref", pull.PullSource.Branch, "remote_ref", pull.TargetBranch)
223 s.pages.Notice(w, "resubmit-error", "Failed to update tracking ref.")
224 return
225 }
226
227 hiddenRef := fmt.Sprintf("hidden/%s/%s", pull.PullSource.Branch, pull.TargetBranch)
228 // extract patch by performing compare
229 forkXrpcBytes, err := tangled.RepoCompare(r.Context(), s.knotClient(forkRepo.Knot), forkRepo.RepoIdentifier(), hiddenRef, pull.PullSource.Branch)
230 if err != nil {
231 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
232 l.Error("failed to call XRPC repo.compare for fork", "xrpcerr", xrpcerr, "err", err, "hidden_ref", hiddenRef, "source_branch", pull.PullSource.Branch)
233 s.pages.Notice(w, "resubmit-error", "Failed to create pull request. Try again later.")
234 return
235 }
236 l.Error("failed to compare branches", "err", err, "hidden_ref", hiddenRef, "source_branch", pull.PullSource.Branch)
237 s.pages.Notice(w, "resubmit-error", "Failed to create pull request. Try again later.")
238 return
239 }
240
241 var forkComparison types.RepoFormatPatchResponse
242 if err := json.Unmarshal(forkXrpcBytes, &forkComparison); err != nil {
243 l.Error("failed to decode XRPC compare response for fork", "err", err)
244 s.pages.Notice(w, "resubmit-error", "Failed to create pull request. Try again later.")
245 return
246 }
247
248 // Use the fork comparison we already made
249 comparison := forkComparison
250
251 sourceRev := comparison.Rev2
252 patch := comparison.FormatPatchRaw
253 combined := comparison.CombinedPatchRaw
254
255 s.resubmitPullHelper(w, r, f, syntax.DID(user.Did), pull, patch, combined, sourceRev)
256}
257
258func (s *Pulls) resubmitPullHelper(
259 w http.ResponseWriter,
260 r *http.Request,
261 repo *models.Repo,
262 userDid syntax.DID,
263 pull *models.Pull,
264 patch string,
265 combined string,
266 sourceRev string,
267) {
268 l := s.logger.With("handler", "resubmitPullHelper", "user", userDid, "pull_id", pull.PullId, "target_branch", pull.TargetBranch)
269
270 stack := r.Context().Value("stack").(models.Stack)
271 if stack != nil && len(stack) != 1 {
272 l.Info("resubmitting stacked PR", "stack_size", len(stack))
273 s.resubmitStackedPullHelper(w, r, repo, userDid, pull, patch)
274 return
275 }
276
277 if err := s.validator.ValidatePatch(&patch); err != nil {
278 s.pages.Notice(w, "resubmit-error", err.Error())
279 return
280 }
281
282 if patch == pull.LatestPatch() {
283 s.pages.Notice(w, "resubmit-error", "Patch is identical to previous submission.")
284 return
285 }
286
287 // validate sourceRev if branch/fork based
288 if pull.IsBranchBased() || pull.IsForkBased() {
289 if sourceRev == pull.LatestSha() {
290 s.pages.Notice(w, "resubmit-error", "This branch has not changed since the last submission.")
291 return
292 }
293 }
294
295 pullAt := pull.AtUri()
296 newRoundNumber := len(pull.Submissions)
297 newPatch := patch
298 newSourceRev := sourceRev
299 combinedPatch := combined
300
301 client, err := s.oauth.AuthorizedClient(r)
302 if err != nil {
303 l.Error("failed to authorize client", "err", err)
304 s.pages.Notice(w, "resubmit-error", "Failed to create pull request. Try again later.")
305 return
306 }
307
308 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoPullNSID, userDid.String(), pull.Rkey)
309 if err != nil {
310 // failed to get record
311 l.Error("failed to get record from PDS", "err", err, "rkey", pull.Rkey)
312 s.pages.Notice(w, "resubmit-error", "Failed to update pull, no record found on PDS.")
313 return
314 }
315
316 blob, err := xrpc.RepoUploadBlob(r.Context(), client, gz(patch), ApplicationGzip)
317 if err != nil {
318 l.Error("failed to upload patch blob", "err", err)
319 s.pages.Notice(w, "resubmit-error", "Failed to update pull request on the PDS. Try again later.")
320 return
321 }
322 record := pull.AsRecord()
323 record.Rounds = append(record.Rounds, &tangled.RepoPull_Round{
324 CreatedAt: time.Now().Format(time.RFC3339),
325 PatchBlob: blob.Blob,
326 })
327
328 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
329 Collection: tangled.RepoPullNSID,
330 Repo: userDid.String(),
331 Rkey: pull.Rkey,
332 SwapRecord: ex.Cid,
333 Record: knotcompat.Pull(&record),
334 })
335 if err != nil {
336 l.Error("failed to update record on PDS", "err", err, "rkey", pull.Rkey)
337 s.pages.Notice(w, "resubmit-error", "Failed to update pull request on the PDS. Try again later.")
338 return
339 }
340
341 err = db.ResubmitPull(s.db, pullAt, newRoundNumber, newPatch, combinedPatch, newSourceRev, blob.Blob)
342 if err != nil {
343 l.Error("failed to resubmit pull request in database", "err", err, "round_number", newRoundNumber)
344 s.pages.Notice(w, "resubmit-error", "Failed to create pull request. Try again later.")
345 return
346 }
347
348 ownerSlashRepo := reporesolver.GetBaseRepoPath(r, repo)
349 s.pages.HxLocation(w, fmt.Sprintf("/%s/pulls/%d", ownerSlashRepo, pull.PullId))
350}
351
352func (s *Pulls) resubmitStackedPullHelper(
353 w http.ResponseWriter,
354 r *http.Request,
355 repo *models.Repo,
356 userDid syntax.DID,
357 pull *models.Pull,
358 patch string,
359) {
360 l := s.logger.With("handler", "resubmitStackedPullHelper", "user", userDid, "pull_id", pull.PullId, "target_branch", pull.TargetBranch)
361
362 targetBranch := pull.TargetBranch
363
364 origStack, _ := r.Context().Value("stack").(models.Stack)
365
366 formatPatches, err := patchutil.ExtractPatches(patch)
367 if err != nil {
368 l.Error("failed to extract patches", "err", err)
369 s.pages.Notice(w, "pull-resubmit-error", "Failed to resubmit pull request. Failed to parse patches.")
370 return
371 }
372
373 // must have atleast 1 patch to begin with
374 if len(formatPatches) == 0 {
375 l.Error("no patches found in the generated format-patch")
376 s.pages.Notice(w, "pull-resubmit-error", "Failed to resubmit pull request: No patches found in the generated patch.")
377 return
378 }
379
380 client, err := s.oauth.AuthorizedClient(r)
381 if err != nil {
382 l.Error("failed to get authorized client", "err", err)
383 s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.")
384 return
385 }
386
387 // first upload all blobs
388 blobs := make([]*lexutil.LexBlob, len(formatPatches))
389 for i, p := range formatPatches {
390 blob, err := xrpc.RepoUploadBlob(r.Context(), client, gz(p.Raw), ApplicationGzip)
391 if err != nil {
392 l.Error("failed to upload patch blob", "err", err, "patch_index", i)
393 s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.")
394 return
395 }
396 l.Info("uploaded blob", "idx", i+1, "total", len(formatPatches))
397 blobs[i] = blob.Blob
398 }
399
400 newStack, err := s.newStack(r.Context(), repo, userDid, targetBranch, pull.PullSource, formatPatches, blobs, nil, nil)
401 if err != nil {
402 l.Error("failed to create resubmitted stack", "err", err)
403 s.pages.Notice(w, "pull-merge-error", "Failed to merge pull request. Try again later.")
404 return
405 }
406
407 // find the diff between the stacks, first, map them by changeId
408 origById := make(map[string]*models.Pull)
409 newById := make(map[string]*models.Pull)
410 for _, p := range origStack {
411 origById[p.LatestSubmission().ChangeId()] = p
412 }
413 for _, p := range newStack {
414 newById[p.LatestSubmission().ChangeId()] = p
415 }
416
417 // commits that got deleted: corresponding pull is closed
418 // commits that got added: new pull is created
419 // commits that got updated: corresponding pull is resubmitted & new round begins
420 additions := make(map[string]*models.Pull)
421 deletions := make(map[string]*models.Pull)
422 updated := make(map[string]struct{})
423
424 // pulls in original stack but not in new one
425 for _, op := range origStack {
426 if _, ok := newById[op.LatestSubmission().ChangeId()]; !ok {
427 deletions[op.LatestSubmission().ChangeId()] = op
428 }
429 }
430
431 // pulls in new stack but not in original one
432 for _, np := range newStack {
433 if _, ok := origById[np.LatestSubmission().ChangeId()]; !ok {
434 additions[np.LatestSubmission().ChangeId()] = np
435 }
436 }
437
438 // NOTE: this loop can be written in any of above blocks,
439 // but is written separately in the interest of simpler code
440 for _, np := range newStack {
441 if op, ok := origById[np.LatestSubmission().ChangeId()]; ok {
442 // pull exists in both stacks
443 updated[op.LatestSubmission().ChangeId()] = struct{}{}
444 }
445 }
446
447 // NOTE: we can go through the newStack and update dependent relations and
448 // rkeys now that we know which ones have been updated
449 // update dependentOn relations for the entire stack
450 var parentAt *syntax.ATURI
451 for _, np := range newStack {
452 if op, ok := origById[np.LatestSubmission().ChangeId()]; ok {
453 // pull exists in both stacks
454 np.Rkey = op.Rkey
455 }
456 np.DependentOn = parentAt
457 x := np.AtUri()
458 parentAt = &x
459 }
460
461 l = l.With("additions", len(additions), "deletions", len(deletions), "updates", len(updated))
462
463 tx, err := s.db.Begin()
464 if err != nil {
465 l.Error("failed to start transaction", "err", err)
466 s.pages.Notice(w, "pull-resubmit-error", "Failed to resubmit pull request. Try again later.")
467 return
468 }
469 defer tx.Rollback()
470
471 // pds updates to make
472 var writes []*comatproto.RepoApplyWrites_Input_Writes_Elem
473
474 // deleted pulls are marked as deleted in the DB
475 for _, p := range deletions {
476 // do not do delete already merged PRs
477 if p.State == models.PullMerged {
478 continue
479 }
480
481 err := db.AbandonPulls(tx, orm.FilterEq("repo_did", string(p.RepoDid)), orm.FilterEq("at_uri", p.AtUri()))
482 if err != nil {
483 l.Error("failed to delete pull", "err", err, "pull_id", p.PullId)
484 s.pages.Notice(w, "pull-resubmit-error", "Failed to resubmit pull request. Try again later.")
485 return
486 }
487 writes = append(writes, &comatproto.RepoApplyWrites_Input_Writes_Elem{
488 RepoApplyWrites_Delete: &comatproto.RepoApplyWrites_Delete{
489 Collection: tangled.RepoPullNSID,
490 Rkey: p.Rkey,
491 },
492 })
493 }
494
495 // new pulls are created
496 for _, p := range additions {
497 blob, err := xrpc.RepoUploadBlob(r.Context(), client, gz(p.LatestPatch()), ApplicationGzip)
498 if err != nil {
499 l.Error("failed to upload patch blob for new pull", "err", err, "change_id", p.LatestSubmission().ChangeId())
500 s.pages.Notice(w, "resubmit-error", "Failed to update pull request on the PDS. Try again later.")
501 return
502 }
503 p.Submissions[0].Blob = *blob.Blob
504
505 if err = db.PutPull(tx, p); err != nil {
506 l.Error("failed to create pull", "err", err, "pull_id", p.PullId, "change_id", p.LatestSubmission().ChangeId())
507 s.pages.Notice(w, "pull-resubmit-error", "Failed to resubmit pull request. Try again later.")
508 return
509 }
510
511 record := p.AsRecord()
512 record.Rounds = []*tangled.RepoPull_Round{
513 {
514 CreatedAt: time.Now().Format(time.RFC3339),
515 PatchBlob: blob.Blob,
516 },
517 }
518 writes = append(writes, &comatproto.RepoApplyWrites_Input_Writes_Elem{
519 RepoApplyWrites_Create: &comatproto.RepoApplyWrites_Create{
520 Collection: tangled.RepoPullNSID,
521 Rkey: &p.Rkey,
522 Value: knotcompat.Pull(&record),
523 },
524 })
525 }
526
527 // updated pulls are, well, updated; to start a new round
528 for id := range updated {
529 op, _ := origById[id]
530 np, _ := newById[id]
531
532 // do not update already merged PRs
533 if op.State == models.PullMerged {
534 continue
535 }
536
537 // resubmit the new pull
538 np.Rkey = op.Rkey
539 pullAt := op.AtUri()
540 newRoundNumber := len(op.Submissions)
541 newPatch := np.LatestPatch()
542 combinedPatch := np.LatestSubmission().Combined
543 newSourceRev := np.LatestSha()
544
545 blob, err := xrpc.RepoUploadBlob(r.Context(), client, gz(newPatch), ApplicationGzip)
546 if err != nil {
547 l.Error("failed to upload patch blob for update", "err", err, "change_id", id, "pull_id", op.PullId)
548 s.pages.Notice(w, "resubmit-error", "Failed to update pull request on the PDS. Try again later.")
549 return
550 }
551
552 // create new round
553 err = db.ResubmitPull(tx, pullAt, newRoundNumber, newPatch, combinedPatch, newSourceRev, blob.Blob)
554 if err != nil {
555 l.Error("failed to update pull in database", "err", err, "pull_id", op.PullId, "round_number", newRoundNumber)
556 s.pages.Notice(w, "pull-resubmit-error", "Failed to resubmit pull request. Try again later.")
557 return
558 }
559
560 // update dependent-on relation
561 if np.DependentOn != nil {
562 err := db.SetDependentOn(tx, *np.DependentOn, orm.FilterEq("at_uri", np.AtUri()))
563 if err != nil {
564 l.Error("failed to update pull in database", "err", err, "pull_id", op.PullId, "round_number", newRoundNumber)
565 s.pages.Notice(w, "pull-resubmit-error", "Failed to resubmit pull request. Try again later.")
566 return
567 }
568 }
569
570 record := np.AsRecord()
571 record.Rounds = op.AsRecord().Rounds
572 record.Rounds = append(record.Rounds, &tangled.RepoPull_Round{
573 CreatedAt: time.Now().Format(time.RFC3339),
574 PatchBlob: blob.Blob,
575 })
576 writes = append(writes, &comatproto.RepoApplyWrites_Input_Writes_Elem{
577 RepoApplyWrites_Update: &comatproto.RepoApplyWrites_Update{
578 Collection: tangled.RepoPullNSID,
579 Rkey: op.Rkey,
580 Value: knotcompat.Pull(&record),
581 },
582 })
583 }
584
585 _, err = comatproto.RepoApplyWrites(r.Context(), client, &comatproto.RepoApplyWrites_Input{
586 Repo: userDid.String(),
587 Writes: writes,
588 })
589 if err != nil {
590 l.Error("failed to apply writes for stacked pull request", "err", err, "writes_count", len(writes))
591 s.pages.Notice(w, "pull", "Failed to create stacked pull request. Try again later.")
592 return
593 }
594
595 err = tx.Commit()
596 if err != nil {
597 l.Error("failed to commit resubmit transaction", "err", err)
598 s.pages.Notice(w, "pull-resubmit-error", "Failed to resubmit pull request. Try again later.")
599 return
600 }
601
602 ownerSlashRepo := reporesolver.GetBaseRepoPath(r, repo)
603 s.pages.HxLocation(w, fmt.Sprintf("/%s/pulls/%d", ownerSlashRepo, pull.PullId))
604}