Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

at master 2.8 kB View raw
1package xrpc 2 3import ( 4 "encoding/json" 5 "fmt" 6 "net/http" 7 "strings" 8 9 "github.com/bluesky-social/indigo/api/atproto" 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/bluesky-social/indigo/xrpc" 12 "tangled.org/core/api/tangled" 13 "tangled.org/core/rbac" 14 "tangled.org/core/spindle/models" 15 xrpcerr "tangled.org/core/xrpc/errors" 16) 17 18func (x *Xrpc) CancelPipeline(w http.ResponseWriter, r *http.Request) { 19 l := x.Logger 20 fail := func(e xrpcerr.XrpcError) { 21 l.Error("failed", "kind", e.Tag, "error", e.Message) 22 writeError(w, e, http.StatusBadRequest) 23 } 24 l.Debug("cancel pipeline") 25 26 actorDid, ok := r.Context().Value(ActorDid).(syntax.DID) 27 if !ok { 28 fail(xrpcerr.MissingActorDidError) 29 return 30 } 31 32 var input tangled.PipelineCancelPipeline_Input 33 if err := json.NewDecoder(r.Body).Decode(&input); err != nil { 34 fail(xrpcerr.GenericError(err)) 35 return 36 } 37 38 aturi := syntax.ATURI(input.Pipeline) 39 wid := models.WorkflowId{ 40 PipelineId: models.PipelineId{ 41 Knot: strings.TrimPrefix(aturi.Authority().String(), "did:web:"), 42 Rkey: aturi.RecordKey().String(), 43 }, 44 Name: input.Workflow, 45 } 46 l.Debug("cancel pipeline", "wid", wid) 47 48 // unfortunately we have to resolve repo-at here 49 repoAt, err := syntax.ParseATURI(input.Repo) 50 if err != nil { 51 fail(xrpcerr.InvalidRepoError(input.Repo)) 52 return 53 } 54 55 ident, err := x.Resolver.ResolveIdent(r.Context(), repoAt.Authority().String()) 56 if err != nil || ident.Handle.IsInvalidHandle() { 57 fail(xrpcerr.GenericError(fmt.Errorf("failed to resolve handle: %w", err))) 58 return 59 } 60 61 xrpcc := xrpc.Client{Host: ident.PDSEndpoint()} 62 resp, err := atproto.RepoGetRecord(r.Context(), &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 63 if err != nil { 64 fail(xrpcerr.GenericError(err)) 65 return 66 } 67 68 repoRec, ok := resp.Value.Val.(*tangled.Repo) 69 if !ok { 70 fail(xrpcerr.RepoNotFoundError) 71 return 72 } 73 if repoRec.RepoDid == nil || *repoRec.RepoDid == "" { 74 fail(xrpcerr.GenericError(fmt.Errorf("repo record %s has no repoDid", repoAt))) 75 return 76 } 77 repoDid := *repoRec.RepoDid 78 79 // TODO: fine-grained role based control 80 isRepoOwner, err := x.Enforcer.IsRepoOwner(actorDid.String(), rbac.ThisServer, repoDid) 81 if err != nil || !isRepoOwner { 82 fail(xrpcerr.AccessControlError(actorDid.String())) 83 return 84 } 85 for _, engine := range x.Engines { 86 l.Debug("destroying workflow", "wid", wid) 87 err = engine.DestroyWorkflow(r.Context(), wid) 88 if err != nil { 89 fail(xrpcerr.GenericError(fmt.Errorf("failed to destroy workflow: %w", err))) 90 return 91 } 92 err = x.Db.StatusCancelled(wid, "User canceled the workflow", -1, x.Notifier) 93 if err != nil { 94 fail(xrpcerr.GenericError(fmt.Errorf("failed to emit status failed: %w", err))) 95 return 96 } 97 } 98 99 w.WriteHeader(http.StatusOK) 100}