forked from
tangled.org/core
Monorepo for Tangled
1package repo
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "time"
11
12 "tangled.org/core/api/tangled"
13 "tangled.org/core/appview/db"
14 "tangled.org/core/appview/models"
15 "tangled.org/core/appview/pages"
16 "tangled.org/core/appview/xrpcclient"
17 "tangled.org/core/orm"
18 "tangled.org/core/tid"
19 "tangled.org/core/types"
20 "tangled.org/core/xrpc"
21
22 comatproto "github.com/bluesky-social/indigo/api/atproto"
23 lexutil "github.com/bluesky-social/indigo/lex/util"
24 indigoxrpc "github.com/bluesky-social/indigo/xrpc"
25 "github.com/dustin/go-humanize"
26 "github.com/go-chi/chi/v5"
27 "github.com/go-git/go-git/v5/plumbing"
28 "github.com/ipfs/go-cid"
29)
30
31// TODO: proper statuses here on early exit
32func (rp *Repo) AttachArtifact(w http.ResponseWriter, r *http.Request) {
33 l := rp.logger.With("handler", "AttachArtifact")
34
35 user := rp.oauth.GetMultiAccountUser(r)
36 tagParam := chi.URLParam(r, "tag")
37 f, err := rp.repoResolver.Resolve(r)
38 if err != nil {
39 l.Error("failed to get repo and knot", "err", err)
40 rp.pages.Notice(w, "upload", "failed to upload artifact, error in repo resolution")
41 return
42 }
43
44 tag, err := rp.resolveTag(r.Context(), f, tagParam)
45 if err != nil {
46 l.Error("failed to resolve tag", "err", err)
47 rp.pages.Notice(w, "upload", "failed to upload artifact, error in tag resolution")
48 return
49 }
50
51 file, header, err := r.FormFile("artifact")
52 if err != nil {
53 l.Error("failed to upload artifact", "err", err)
54 rp.pages.Notice(w, "upload", "failed to upload artifact")
55 return
56 }
57 defer file.Close()
58
59 client, err := rp.oauth.AuthorizedClient(r)
60 if err != nil {
61 l.Error("failed to get authorized client", "err", err)
62 rp.pages.Notice(w, "upload", "failed to get authorized client")
63 return
64 }
65
66 uploadBlobResp, err := xrpc.RepoUploadBlob(r.Context(), client, file, header.Header.Get("Content-Type"))
67 if err != nil {
68 l.Error("failed to upload blob", "err", err)
69 rp.pages.Notice(w, "upload", "Failed to upload blob to your PDS. Try again later.")
70 return
71 }
72
73 l.Info("uploaded blob", "size", humanize.Bytes(uint64(uploadBlobResp.Blob.Size)), "blobRef", uploadBlobResp.Blob.Ref.String())
74
75 rkey := tid.TID()
76 createdAt := time.Now()
77
78 putRecordResp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
79 Collection: tangled.RepoArtifactNSID,
80 Repo: user.Active.Did,
81 Rkey: rkey,
82 Record: &lexutil.LexiconTypeDecoder{
83 Val: repoArtifactRecord(f, uploadBlobResp.Blob, createdAt, header.Filename, tag.Tag.Hash[:]),
84 },
85 })
86 if err != nil {
87 l.Error("failed to create record", "err", err)
88 rp.pages.Notice(w, "upload", "Failed to create artifact record. Try again later.")
89 return
90 }
91
92 l.Debug("created record for blob", "aturi", putRecordResp.Uri)
93
94 tx, err := rp.db.BeginTx(r.Context(), nil)
95 if err != nil {
96 l.Error("failed to start tx")
97 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.")
98 return
99 }
100 defer tx.Rollback()
101
102 artifact := models.Artifact{
103 Did: user.Active.Did,
104 Rkey: rkey,
105 RepoAt: f.RepoAt(),
106 Tag: tag.Tag.Hash,
107 CreatedAt: createdAt,
108 BlobCid: cid.Cid(uploadBlobResp.Blob.Ref),
109 Name: header.Filename,
110 Size: uint64(uploadBlobResp.Blob.Size),
111 MimeType: uploadBlobResp.Blob.MimeType,
112 }
113
114 err = db.AddArtifact(tx, artifact)
115 if err != nil {
116 l.Error("failed to add artifact record to db", "err", err)
117 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.")
118 return
119 }
120
121 err = tx.Commit()
122 if err != nil {
123 l.Error("failed to add artifact record to db")
124 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.")
125 return
126 }
127
128 rp.pages.RepoArtifactFragment(w, pages.RepoArtifactParams{
129 LoggedInUser: user,
130 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
131 Artifact: artifact,
132 })
133}
134
135func (rp *Repo) DownloadArtifact(w http.ResponseWriter, r *http.Request) {
136 l := rp.logger.With("handler", "DownloadArtifact")
137
138 f, err := rp.repoResolver.Resolve(r)
139 if err != nil {
140 l.Error("failed to get repo and knot", "err", err)
141 http.Error(w, "failed to resolve repo", http.StatusInternalServerError)
142 return
143 }
144
145 tagParam := chi.URLParam(r, "tag")
146 filename := chi.URLParam(r, "file")
147
148 tag, err := rp.resolveTag(r.Context(), f, tagParam)
149 if err != nil {
150 l.Error("failed to resolve tag", "err", err)
151 rp.pages.Notice(w, "upload", "failed to upload artifact, error in tag resolution")
152 return
153 }
154
155 artifacts, err := db.GetArtifact(
156 rp.db,
157 orm.FilterEq("repo_at", f.RepoAt()),
158 orm.FilterEq("tag", tag.Tag.Hash[:]),
159 orm.FilterEq("name", filename),
160 )
161 if err != nil {
162 l.Error("failed to get artifacts", "err", err)
163 http.Error(w, "failed to get artifact", http.StatusInternalServerError)
164 return
165 }
166
167 if len(artifacts) != 1 {
168 l.Error("too many or too few artifacts found")
169 http.Error(w, "artifact not found", http.StatusNotFound)
170 return
171 }
172
173 artifact := artifacts[0]
174
175 ownerId, err := rp.idResolver.ResolveIdent(r.Context(), f.Did)
176 if err != nil {
177 l.Error("failed to resolve repo owner did", "did", f.Did, "err", err)
178 http.Error(w, "repository owner not found", http.StatusNotFound)
179 return
180 }
181
182 ownerPds := ownerId.PDSEndpoint()
183 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
184 q := url.Query()
185 q.Set("cid", artifact.BlobCid.String())
186 q.Set("did", artifact.Did)
187 url.RawQuery = q.Encode()
188
189 req, err := http.NewRequest(http.MethodGet, url.String(), nil)
190 if err != nil {
191 l.Error("failed to create request", "err", err)
192 http.Error(w, "failed to create request", http.StatusInternalServerError)
193 return
194 }
195 req.Header.Set("Content-Type", "application/json")
196
197 resp, err := http.DefaultClient.Do(req)
198 if err != nil {
199 l.Error("failed to make request", "err", err)
200 http.Error(w, "failed to make request to PDS", http.StatusInternalServerError)
201 return
202 }
203 defer resp.Body.Close()
204
205 // copy status code and relevant headers from upstream response
206 w.WriteHeader(resp.StatusCode)
207 for key, values := range resp.Header {
208 for _, v := range values {
209 w.Header().Add(key, v)
210 }
211 }
212
213 // stream the body directly to the client
214 if _, err := io.Copy(w, resp.Body); err != nil {
215 l.Error("error streaming response to client:", "err", err)
216 }
217}
218
219// TODO: proper statuses here on early exit
220func (rp *Repo) DeleteArtifact(w http.ResponseWriter, r *http.Request) {
221 l := rp.logger.With("handler", "DeleteArtifact")
222
223 user := rp.oauth.GetMultiAccountUser(r)
224 tagParam := chi.URLParam(r, "tag")
225 filename := chi.URLParam(r, "file")
226 f, err := rp.repoResolver.Resolve(r)
227 if err != nil {
228 l.Error("failed to get repo and knot", "err", err)
229 return
230 }
231
232 client, _ := rp.oauth.AuthorizedClient(r)
233
234 tag := plumbing.NewHash(tagParam)
235
236 artifacts, err := db.GetArtifact(
237 rp.db,
238 orm.FilterEq("repo_at", f.RepoAt()),
239 orm.FilterEq("tag", tag[:]),
240 orm.FilterEq("name", filename),
241 )
242 if err != nil {
243 l.Error("failed to get artifacts", "err", err)
244 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.")
245 return
246 }
247 if len(artifacts) != 1 {
248 rp.pages.Notice(w, "remove", "Unable to find artifact.")
249 return
250 }
251
252 artifact := artifacts[0]
253
254 if user.Active.Did != artifact.Did {
255 l.Error("user not authorized to delete artifact", "err", err)
256 rp.pages.Notice(w, "remove", "Unauthorized deletion of artifact.")
257 return
258 }
259
260 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{
261 Collection: tangled.RepoArtifactNSID,
262 Repo: user.Active.Did,
263 Rkey: artifact.Rkey,
264 })
265 if err != nil {
266 l.Error("failed to get blob from pds", "err", err)
267 rp.pages.Notice(w, "remove", "Failed to remove blob from PDS.")
268 return
269 }
270
271 tx, err := rp.db.BeginTx(r.Context(), nil)
272 if err != nil {
273 l.Error("failed to start tx")
274 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.")
275 return
276 }
277 defer tx.Rollback()
278
279 err = db.DeleteArtifact(tx,
280 orm.FilterEq("repo_at", f.RepoAt()),
281 orm.FilterEq("tag", artifact.Tag[:]),
282 orm.FilterEq("name", filename),
283 )
284 if err != nil {
285 l.Error("failed to remove artifact record from db", "err", err)
286 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.")
287 return
288 }
289
290 err = tx.Commit()
291 if err != nil {
292 l.Error("failed to remove artifact record from db")
293 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.")
294 return
295 }
296
297 l.Info("successfully deleted artifact", "tag", tagParam, "file", filename)
298
299 w.Write([]byte{})
300}
301
302func (rp *Repo) resolveTag(ctx context.Context, f *models.Repo, tagParam string) (*types.TagReference, error) {
303 l := rp.logger.With("handler", "resolveTag")
304
305 tagParam, err := url.QueryUnescape(tagParam)
306 if err != nil {
307 return nil, err
308 }
309
310 xrpcc := &indigoxrpc.Client{Host: rp.config.KnotMirror.Url}
311
312 xrpcBytes, err := tangled.GitTempListTags(ctx, xrpcc, "", 0, f.RepoAt().String())
313 if err != nil {
314 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
315 l.Error("failed to call XRPC repo.tags", "err", xrpcerr)
316 return nil, xrpcerr
317 }
318 l.Error("failed to reach knotserver", "err", err)
319 return nil, err
320 }
321
322 var result types.RepoTagsResponse
323 if err := json.Unmarshal(xrpcBytes, &result); err != nil {
324 l.Error("failed to decode XRPC tags response", "err", err)
325 return nil, err
326 }
327
328 var tag *types.TagReference
329 for _, t := range result.Tags {
330 if t.Tag != nil {
331 if t.Reference.Name == tagParam || t.Reference.Hash == tagParam {
332 tag = t
333 }
334 }
335 }
336
337 if tag == nil {
338 return nil, fmt.Errorf("invalid tag, only annotated tags are supported for artifacts")
339 }
340
341 if tag.Tag.Target.IsZero() {
342 return nil, fmt.Errorf("invalid tag, only annotated tags are supported for artifacts")
343 }
344
345 return tag, nil
346}
347
348func repoArtifactRecord(f *models.Repo, blob *lexutil.LexBlob, createdAt time.Time, name string, tag []byte) *tangled.RepoArtifact {
349 rec := &tangled.RepoArtifact{
350 Artifact: blob,
351 CreatedAt: createdAt.Format(time.RFC3339),
352 Name: name,
353 Tag: tag,
354 }
355 s := f.RepoAt().String()
356 rec.Repo = &s
357 if f.RepoDid != "" {
358 rec.RepoDid = &f.RepoDid
359 }
360 return rec
361}