Monorepo for Tangled
tangled.org
1package migration
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7
8 "github.com/bluesky-social/indigo/api/agnostic"
9 comatproto "github.com/bluesky-social/indigo/api/atproto"
10 "github.com/bluesky-social/indigo/atproto/atclient"
11 "github.com/bluesky-social/indigo/atproto/syntax"
12 "github.com/bluesky-social/indigo/lex/util"
13 "github.com/bluesky-social/indigo/xrpc"
14 "github.com/ipfs/go-cid"
15 "github.com/multiformats/go-multihash"
16 "tangled.org/core/api/tangled"
17 "tangled.org/core/appview/db"
18 "tangled.org/core/appview/models"
19 "tangled.org/core/orm"
20)
21
22func (s *Migration) migrateUseFeedComment(ctx context.Context, client *atclient.APIClient, did syntax.DID, record syntax.ATURI) error {
23 l := s.logger.With("aturi", record)
24 l.Debug("migrating record")
25
26 switch record.Collection() {
27 case tangled.RepoIssueCommentNSID:
28 case tangled.RepoPullCommentNSID:
29 default:
30 return fmt.Errorf("unexpected collection: '%s'", record.Collection())
31 }
32
33 comments, err := db.GetComments(s.db, orm.FilterEq("at_uri", record))
34 if err != nil {
35 return fmt.Errorf("db: %w", err)
36 }
37 if len(comments) < 1 {
38 l.Info("can't found legacy record from db. skipping migration")
39 return nil
40 }
41 comment := comments[0]
42
43 comment.Collection = tangled.FeedCommentNSID
44
45 // only update from DB if comment is deleted
46 if comment.Deleted != nil {
47 l.Info("skipping pds migration for deleted record")
48
49 return nil
50 }
51
52 // fill missing reference CIDs
53 if comment.Subject.Cid == "" {
54 cid, err := s.getRecordCid(ctx, syntax.ATURI(comment.Subject.Uri))
55 if err != nil {
56 return fmt.Errorf("pds: getRecordCid for subject.uri: %w", err)
57 }
58 comment.Subject.Cid = cid.String()
59 }
60 if comment.ReplyTo != nil && comment.ReplyTo.Cid == "" {
61 uri, err := syntax.ParseATURI(comment.ReplyTo.Uri)
62 if err != nil {
63 return fmt.Errorf("invalid replyTo.uri: %w", err)
64 }
65
66 // assume parent comment is already migrated to `sh.tangled.feed.comment`.
67 // fail if it isn't ready
68 uri = syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", uri.Authority(), tangled.FeedCommentNSID, uri.RecordKey()))
69
70 cid, err := s.guessParentCommentCid(uri, &comment)
71 if err != nil {
72 return fmt.Errorf("cbor: guessParentCommentCid for replyTo.uri: %w", err)
73 }
74 comment.ReplyTo.Uri = uri.String()
75 comment.ReplyTo.Cid = cid.String()
76 }
77
78 // use same rkey for new record
79 rkey := record.RecordKey().String()
80
81 // ensure new record is missing in PDS
82 if _, err := agnostic.RepoGetRecord(ctx, client, "", tangled.FeedCommentNSID, did.String(), rkey); err == nil {
83 l.Info("New comment record already exists")
84 } else {
85 // insert new record
86 if _, err := comatproto.RepoCreateRecord(ctx, client, &comatproto.RepoCreateRecord_Input{
87 Repo: did.String(),
88 Collection: tangled.FeedCommentNSID,
89 Rkey: &rkey,
90 Record: &util.LexiconTypeDecoder{Val: comment.AsRecord()},
91 }); err != nil {
92 return fmt.Errorf("pds: putRecord: %w", err)
93 }
94 }
95
96 if _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
97 Repo: did.String(),
98 Collection: record.Collection().String(),
99 Rkey: rkey,
100 }); err != nil {
101 l.Info("Failed to cleanup old record. Proceeding migration...", "err", err)
102 }
103
104 return nil
105}
106
107func (s *Migration) getRecordCid(ctx context.Context, uri syntax.ATURI) (syntax.CID, error) {
108 ident, err := s.dir.Lookup(ctx, uri.Authority())
109 if err != nil {
110 return "", err
111 }
112
113 xrpcc := xrpc.Client{Host: ident.PDSEndpoint()}
114 out, err := agnostic.RepoGetRecord(ctx, &xrpcc, "", uri.Collection().String(), ident.DID.String(), uri.RecordKey().String())
115 if err != nil {
116 return "", err
117 }
118 if out.Cid == nil {
119 return "", fmt.Errorf("record CID is empty")
120 }
121
122 cid, err := syntax.ParseCID(*out.Cid)
123 if err != nil {
124 return "", err
125 }
126
127 return cid, nil
128}
129
130func (s *Migration) guessParentCommentCid(uri syntax.ATURI, comment *models.Comment) (syntax.CID, error) {
131 parent, err := db.GetComment(s.db, orm.FilterEq("did", uri.Authority()), orm.FilterEq("rkey", uri.RecordKey()))
132 if err != nil {
133 return "", fmt.Errorf("db: failed to queyr subject comment: %w", err)
134 }
135 if parent.Deleted != nil {
136 // leave cid empty. reply comment won't pass the schema validation.
137 return "", nil
138 }
139
140 // since parent comment is also migrating, parent comments subject.cid might be empty
141 if parent.Subject.Cid == "" {
142 parent.Subject.Cid = comment.Subject.Cid
143 }
144
145 buf := new(bytes.Buffer)
146 if err := parent.AsRecord().MarshalCBOR(buf); err != nil {
147 return "", fmt.Errorf("MarshalCBOR: %w", err)
148 }
149 c, err := cid.NewPrefixV1(cid.DagCBOR, multihash.SHA2_256).Sum(buf.Bytes())
150 if err != nil {
151 return "", fmt.Errorf("cid: sum: %w", err)
152 }
153 return syntax.CID(c.String()), nil
154}