Monorepo for Tangled
tangled.org
1package repo
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "time"
11
12 "tangled.org/core/api/tangled"
13 "tangled.org/core/appview/db"
14 "tangled.org/core/appview/models"
15 "tangled.org/core/appview/pages"
16 "tangled.org/core/appview/xrpcclient"
17 "tangled.org/core/orm"
18 "tangled.org/core/tid"
19 "tangled.org/core/types"
20 "tangled.org/core/xrpc"
21
22 comatproto "github.com/bluesky-social/indigo/api/atproto"
23 "github.com/bluesky-social/indigo/atproto/syntax"
24 lexutil "github.com/bluesky-social/indigo/lex/util"
25 indigoxrpc "github.com/bluesky-social/indigo/xrpc"
26 "github.com/dustin/go-humanize"
27 "github.com/go-chi/chi/v5"
28 "github.com/go-git/go-git/v5/plumbing"
29 "github.com/ipfs/go-cid"
30)
31
32// TODO: proper statuses here on early exit
33func (rp *Repo) AttachArtifact(w http.ResponseWriter, r *http.Request) {
34 l := rp.logger.With("handler", "AttachArtifact")
35
36 user := rp.oauth.GetMultiAccountUser(r)
37 tagParam := chi.URLParam(r, "tag")
38 f, err := rp.repoResolver.Resolve(r)
39 if err != nil {
40 l.Error("failed to get repo and knot", "err", err)
41 rp.pages.Notice(w, "upload", "failed to upload artifact, error in repo resolution")
42 return
43 }
44
45 tag, err := rp.resolveTag(r.Context(), f, tagParam)
46 if err != nil {
47 l.Error("failed to resolve tag", "err", err)
48 rp.pages.Notice(w, "upload", "failed to upload artifact, error in tag resolution")
49 return
50 }
51
52 file, header, err := r.FormFile("artifact")
53 if err != nil {
54 l.Error("failed to upload artifact", "err", err)
55 rp.pages.Notice(w, "upload", "failed to upload artifact")
56 return
57 }
58 defer file.Close()
59
60 client, err := rp.oauth.AuthorizedClient(r)
61 if err != nil {
62 l.Error("failed to get authorized client", "err", err)
63 rp.pages.Notice(w, "upload", "failed to get authorized client")
64 return
65 }
66
67 uploadBlobResp, err := xrpc.RepoUploadBlob(r.Context(), client, file, header.Header.Get("Content-Type"))
68 if err != nil {
69 l.Error("failed to upload blob", "err", err)
70 rp.pages.Notice(w, "upload", "Failed to upload blob to your PDS. Try again later.")
71 return
72 }
73
74 l.Info("uploaded blob", "size", humanize.Bytes(uint64(uploadBlobResp.Blob.Size)), "blobRef", uploadBlobResp.Blob.Ref.String())
75
76 rkey := tid.TID()
77 createdAt := time.Now()
78
79 putRecordResp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
80 Collection: tangled.RepoArtifactNSID,
81 Repo: user.Did,
82 Rkey: rkey,
83 Record: &lexutil.LexiconTypeDecoder{
84 Val: repoArtifactRecord(f, uploadBlobResp.Blob, createdAt, header.Filename, tag.Tag.Hash[:]),
85 },
86 })
87 if err != nil {
88 l.Error("failed to create record", "err", err)
89 rp.pages.Notice(w, "upload", "Failed to create artifact record. Try again later.")
90 return
91 }
92
93 l.Debug("created record for blob", "aturi", putRecordResp.Uri)
94
95 tx, err := rp.db.BeginTx(r.Context(), nil)
96 if err != nil {
97 l.Error("failed to start tx")
98 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.")
99 return
100 }
101 defer tx.Rollback()
102
103 artifact := models.Artifact{
104 Did: user.Did,
105 Rkey: rkey,
106 RepoDid: syntax.DID(f.RepoDid),
107 Tag: tag.Tag.Hash,
108 CreatedAt: createdAt,
109 BlobCid: cid.Cid(uploadBlobResp.Blob.Ref),
110 Name: header.Filename,
111 Size: uint64(uploadBlobResp.Blob.Size),
112 MimeType: uploadBlobResp.Blob.MimeType,
113 }
114
115 err = db.AddArtifact(tx, artifact)
116 if err != nil {
117 l.Error("failed to add artifact record to db", "err", err)
118 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.")
119 return
120 }
121
122 err = tx.Commit()
123 if err != nil {
124 l.Error("failed to add artifact record to db")
125 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.")
126 return
127 }
128
129 rp.pages.RepoArtifactFragment(w, pages.RepoArtifactParams{
130 BaseParams: pages.BaseParamsFromContext(r.Context()),
131 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
132 Artifact: artifact,
133 })
134}
135
136func (rp *Repo) DownloadArtifact(w http.ResponseWriter, r *http.Request) {
137 l := rp.logger.With("handler", "DownloadArtifact")
138
139 f, err := rp.repoResolver.Resolve(r)
140 if err != nil {
141 l.Error("failed to get repo and knot", "err", err)
142 http.Error(w, "failed to resolve repo", http.StatusInternalServerError)
143 return
144 }
145
146 tagParam := chi.URLParam(r, "tag")
147 filename := chi.URLParam(r, "file")
148
149 tag, err := rp.resolveTag(r.Context(), f, tagParam)
150 if err != nil {
151 l.Error("failed to resolve tag", "err", err)
152 rp.pages.Notice(w, "upload", "failed to upload artifact, error in tag resolution")
153 return
154 }
155
156 artifacts, err := db.GetArtifact(
157 rp.db,
158 orm.FilterEq("repo_did", f.RepoDid),
159 orm.FilterEq("tag", tag.Tag.Hash[:]),
160 orm.FilterEq("name", filename),
161 )
162 if err != nil {
163 l.Error("failed to get artifacts", "err", err)
164 http.Error(w, "failed to get artifact", http.StatusInternalServerError)
165 return
166 }
167
168 if len(artifacts) != 1 {
169 l.Error("too many or too few artifacts found")
170 http.Error(w, "artifact not found", http.StatusNotFound)
171 return
172 }
173
174 artifact := artifacts[0]
175
176 ownerId, err := rp.idResolver.ResolveIdent(r.Context(), f.Did)
177 if err != nil {
178 l.Error("failed to resolve repo owner did", "did", f.Did, "err", err)
179 http.Error(w, "repository owner not found", http.StatusNotFound)
180 return
181 }
182
183 ownerPds := ownerId.PDSEndpoint()
184 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
185 q := url.Query()
186 q.Set("cid", artifact.BlobCid.String())
187 q.Set("did", artifact.Did)
188 url.RawQuery = q.Encode()
189
190 req, err := http.NewRequest(http.MethodGet, url.String(), nil)
191 if err != nil {
192 l.Error("failed to create request", "err", err)
193 http.Error(w, "failed to create request", http.StatusInternalServerError)
194 return
195 }
196 req.Header.Set("Content-Type", "application/json")
197
198 resp, err := http.DefaultClient.Do(req)
199 if err != nil {
200 l.Error("failed to make request", "err", err)
201 http.Error(w, "failed to make request to PDS", http.StatusInternalServerError)
202 return
203 }
204 defer resp.Body.Close()
205
206 // copy status code and relevant headers from upstream response
207 w.WriteHeader(resp.StatusCode)
208 for key, values := range resp.Header {
209 for _, v := range values {
210 w.Header().Add(key, v)
211 }
212 }
213
214 // stream the body directly to the client
215 if _, err := io.Copy(w, resp.Body); err != nil {
216 l.Error("error streaming response to client:", "err", err)
217 }
218}
219
220// TODO: proper statuses here on early exit
221func (rp *Repo) DeleteArtifact(w http.ResponseWriter, r *http.Request) {
222 l := rp.logger.With("handler", "DeleteArtifact")
223
224 user := rp.oauth.GetMultiAccountUser(r)
225 tagParam := chi.URLParam(r, "tag")
226 filename := chi.URLParam(r, "file")
227 f, err := rp.repoResolver.Resolve(r)
228 if err != nil {
229 l.Error("failed to get repo and knot", "err", err)
230 return
231 }
232
233 client, _ := rp.oauth.AuthorizedClient(r)
234
235 tag := plumbing.NewHash(tagParam)
236
237 artifacts, err := db.GetArtifact(
238 rp.db,
239 orm.FilterEq("repo_did", f.RepoDid),
240 orm.FilterEq("tag", tag[:]),
241 orm.FilterEq("name", filename),
242 )
243 if err != nil {
244 l.Error("failed to get artifacts", "err", err)
245 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.")
246 return
247 }
248 if len(artifacts) != 1 {
249 rp.pages.Notice(w, "remove", "Unable to find artifact.")
250 return
251 }
252
253 artifact := artifacts[0]
254
255 if user.Did != artifact.Did {
256 l.Error("user not authorized to delete artifact", "err", err)
257 rp.pages.Notice(w, "remove", "Unauthorized deletion of artifact.")
258 return
259 }
260
261 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{
262 Collection: tangled.RepoArtifactNSID,
263 Repo: user.Did,
264 Rkey: artifact.Rkey,
265 })
266 if err != nil {
267 l.Error("failed to get blob from pds", "err", err)
268 rp.pages.Notice(w, "remove", "Failed to remove blob from PDS.")
269 return
270 }
271
272 tx, err := rp.db.BeginTx(r.Context(), nil)
273 if err != nil {
274 l.Error("failed to start tx")
275 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.")
276 return
277 }
278 defer tx.Rollback()
279
280 err = db.DeleteArtifact(tx,
281 orm.FilterEq("repo_did", f.RepoDid),
282 orm.FilterEq("tag", artifact.Tag[:]),
283 orm.FilterEq("name", filename),
284 )
285 if err != nil {
286 l.Error("failed to remove artifact record from db", "err", err)
287 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.")
288 return
289 }
290
291 err = tx.Commit()
292 if err != nil {
293 l.Error("failed to remove artifact record from db")
294 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.")
295 return
296 }
297
298 l.Info("successfully deleted artifact", "tag", tagParam, "file", filename)
299
300 w.Write([]byte{})
301}
302
303func (rp *Repo) resolveTag(ctx context.Context, f *models.Repo, tagParam string) (*types.TagReference, error) {
304 l := rp.logger.With("handler", "resolveTag")
305
306 tagParam, err := url.QueryUnescape(tagParam)
307 if err != nil {
308 return nil, err
309 }
310
311 xrpcc := &indigoxrpc.Client{Host: rp.config.KnotMirror.Url}
312
313 xrpcBytes, err := tangled.GitTempListTags(ctx, xrpcc, "", 0, f.RepoDid)
314 if err != nil {
315 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
316 l.Error("failed to call XRPC repo.tags", "xrpcerr", xrpcerr, "err", err)
317 return nil, xrpcerr
318 }
319 l.Error("failed to reach knotserver", "err", err)
320 return nil, err
321 }
322
323 var result types.RepoTagsResponse
324 if err := json.Unmarshal(xrpcBytes, &result); err != nil {
325 l.Error("failed to decode XRPC tags response", "err", err)
326 return nil, err
327 }
328
329 var tag *types.TagReference
330 for _, t := range result.Tags {
331 if t.Tag != nil {
332 if t.Reference.Name == tagParam || t.Reference.Hash == tagParam {
333 tag = t
334 }
335 }
336 }
337
338 if tag == nil {
339 return nil, fmt.Errorf("invalid tag, only annotated tags are supported for artifacts")
340 }
341
342 if tag.Tag.Target.IsZero() {
343 return nil, fmt.Errorf("invalid tag, only annotated tags are supported for artifacts")
344 }
345
346 return tag, nil
347}
348
349func repoArtifactRecord(f *models.Repo, blob *lexutil.LexBlob, createdAt time.Time, name string, tag []byte) *tangled.RepoArtifact {
350 rec := &tangled.RepoArtifact{
351 Artifact: blob,
352 CreatedAt: createdAt.Format(time.RFC3339),
353 Name: name,
354 Tag: tag,
355 }
356 s := f.RepoAt().String()
357 rec.Repo = &s
358 if f.RepoDid != "" {
359 rec.RepoDid = &f.RepoDid
360 }
361 return rec
362}