Monorepo for Tangled
tangled.org
1package xrpc
2
3import (
4 "encoding/json"
5 "fmt"
6 "net/http"
7 "strings"
8
9 "github.com/bluesky-social/indigo/api/atproto"
10 "github.com/bluesky-social/indigo/atproto/syntax"
11 "github.com/bluesky-social/indigo/xrpc"
12 "tangled.org/core/api/tangled"
13 "tangled.org/core/rbac"
14 "tangled.org/core/spindle/models"
15 xrpcerr "tangled.org/core/xrpc/errors"
16)
17
18func (x *Xrpc) CancelPipeline(w http.ResponseWriter, r *http.Request) {
19 l := x.Logger
20 fail := func(e xrpcerr.XrpcError) {
21 l.Error("failed", "kind", e.Tag, "error", e.Message)
22 writeError(w, e, http.StatusBadRequest)
23 }
24 l.Debug("cancel pipeline")
25
26 actorDid, ok := r.Context().Value(ActorDid).(syntax.DID)
27 if !ok {
28 fail(xrpcerr.MissingActorDidError)
29 return
30 }
31
32 var input tangled.PipelineCancelPipeline_Input
33 if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
34 fail(xrpcerr.GenericError(err))
35 return
36 }
37
38 aturi := syntax.ATURI(input.Pipeline)
39 wid := models.WorkflowId{
40 PipelineId: models.PipelineId{
41 Knot: strings.TrimPrefix(aturi.Authority().String(), "did:web:"),
42 Rkey: aturi.RecordKey().String(),
43 },
44 Name: input.Workflow,
45 }
46 l.Debug("cancel pipeline", "wid", wid)
47
48 // unfortunately we have to resolve repo-at here
49 repoAt, err := syntax.ParseATURI(input.Repo)
50 if err != nil {
51 fail(xrpcerr.InvalidRepoError(input.Repo))
52 return
53 }
54
55 ident, err := x.Resolver.ResolveIdent(r.Context(), repoAt.Authority().String())
56 if err != nil || ident.Handle.IsInvalidHandle() {
57 fail(xrpcerr.GenericError(fmt.Errorf("failed to resolve handle: %w", err)))
58 return
59 }
60
61 xrpcc := xrpc.Client{Host: ident.PDSEndpoint()}
62 resp, err := atproto.RepoGetRecord(r.Context(), &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
63 if err != nil {
64 fail(xrpcerr.GenericError(err))
65 return
66 }
67
68 repoRec, ok := resp.Value.Val.(*tangled.Repo)
69 if !ok {
70 fail(xrpcerr.RepoNotFoundError)
71 return
72 }
73 if repoRec.RepoDid == nil || *repoRec.RepoDid == "" {
74 fail(xrpcerr.GenericError(fmt.Errorf("repo record %s has no repoDid", repoAt)))
75 return
76 }
77 repoDid := *repoRec.RepoDid
78
79 // TODO: fine-grained role based control
80 isRepoOwner, err := x.Enforcer.IsRepoOwner(actorDid.String(), rbac.ThisServer, repoDid)
81 if err != nil || !isRepoOwner {
82 fail(xrpcerr.AccessControlError(actorDid.String()))
83 return
84 }
85 for _, engine := range x.Engines {
86 l.Debug("destroying workflow", "wid", wid)
87 err = engine.DestroyWorkflow(r.Context(), wid)
88 if err != nil {
89 fail(xrpcerr.GenericError(fmt.Errorf("failed to destroy workflow: %w", err)))
90 return
91 }
92 err = x.Db.StatusCancelled(wid, "User canceled the workflow", -1, x.Notifier)
93 if err != nil {
94 fail(xrpcerr.GenericError(fmt.Errorf("failed to emit status failed: %w", err)))
95 return
96 }
97 }
98
99 w.WriteHeader(http.StatusOK)
100}