Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1package migration 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 8 "github.com/bluesky-social/indigo/api/agnostic" 9 comatproto "github.com/bluesky-social/indigo/api/atproto" 10 "github.com/bluesky-social/indigo/atproto/atclient" 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 "github.com/bluesky-social/indigo/lex/util" 13 "github.com/bluesky-social/indigo/xrpc" 14 "github.com/ipfs/go-cid" 15 "github.com/multiformats/go-multihash" 16 "tangled.org/core/api/tangled" 17 "tangled.org/core/appview/db" 18 "tangled.org/core/appview/models" 19 "tangled.org/core/orm" 20) 21 22func (s *Migration) migrateUseFeedComment(ctx context.Context, client *atclient.APIClient, did syntax.DID, record syntax.ATURI) error { 23 l := s.logger.With("aturi", record) 24 l.Debug("migrating record") 25 26 switch record.Collection() { 27 case tangled.RepoIssueCommentNSID: 28 case tangled.RepoPullCommentNSID: 29 default: 30 return fmt.Errorf("unexpected collection: '%s'", record.Collection()) 31 } 32 33 comments, err := db.GetComments(s.db, orm.FilterEq("at_uri", record)) 34 if err != nil { 35 return fmt.Errorf("db: %w", err) 36 } 37 if len(comments) < 1 { 38 l.Info("can't found legacy record from db. skipping migration") 39 return nil 40 } 41 comment := comments[0] 42 43 comment.Collection = tangled.FeedCommentNSID 44 45 // only update from DB if comment is deleted 46 if comment.Deleted != nil { 47 l.Info("skipping pds migration for deleted record") 48 49 return nil 50 } 51 52 // fill missing reference CIDs 53 if comment.Subject.Cid == "" { 54 cid, err := s.getRecordCid(ctx, syntax.ATURI(comment.Subject.Uri)) 55 if err != nil { 56 return fmt.Errorf("pds: getRecordCid for subject.uri: %w", err) 57 } 58 comment.Subject.Cid = cid.String() 59 } 60 if comment.ReplyTo != nil && comment.ReplyTo.Cid == "" { 61 uri, err := syntax.ParseATURI(comment.ReplyTo.Uri) 62 if err != nil { 63 return fmt.Errorf("invalid replyTo.uri: %w", err) 64 } 65 66 // assume parent comment is already migrated to `sh.tangled.feed.comment`. 67 // fail if it isn't ready 68 uri = syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", uri.Authority(), tangled.FeedCommentNSID, uri.RecordKey())) 69 70 cid, err := s.guessParentCommentCid(uri, &comment) 71 if err != nil { 72 return fmt.Errorf("cbor: guessParentCommentCid for replyTo.uri: %w", err) 73 } 74 comment.ReplyTo.Uri = uri.String() 75 comment.ReplyTo.Cid = cid.String() 76 } 77 78 // use same rkey for new record 79 rkey := record.RecordKey().String() 80 81 // ensure new record is missing in PDS 82 if _, err := agnostic.RepoGetRecord(ctx, client, "", tangled.FeedCommentNSID, did.String(), rkey); err == nil { 83 l.Info("New comment record already exists") 84 } else { 85 // insert new record 86 if _, err := comatproto.RepoCreateRecord(ctx, client, &comatproto.RepoCreateRecord_Input{ 87 Repo: did.String(), 88 Collection: tangled.FeedCommentNSID, 89 Rkey: &rkey, 90 Record: &util.LexiconTypeDecoder{Val: comment.AsRecord()}, 91 }); err != nil { 92 return fmt.Errorf("pds: putRecord: %w", err) 93 } 94 } 95 96 if _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 97 Repo: did.String(), 98 Collection: record.Collection().String(), 99 Rkey: rkey, 100 }); err != nil { 101 l.Info("Failed to cleanup old record. Proceeding migration...", "err", err) 102 } 103 104 return nil 105} 106 107func (s *Migration) getRecordCid(ctx context.Context, uri syntax.ATURI) (syntax.CID, error) { 108 ident, err := s.dir.Lookup(ctx, uri.Authority()) 109 if err != nil { 110 return "", err 111 } 112 113 xrpcc := xrpc.Client{Host: ident.PDSEndpoint()} 114 out, err := agnostic.RepoGetRecord(ctx, &xrpcc, "", uri.Collection().String(), ident.DID.String(), uri.RecordKey().String()) 115 if err != nil { 116 return "", err 117 } 118 if out.Cid == nil { 119 return "", fmt.Errorf("record CID is empty") 120 } 121 122 cid, err := syntax.ParseCID(*out.Cid) 123 if err != nil { 124 return "", err 125 } 126 127 return cid, nil 128} 129 130func (s *Migration) guessParentCommentCid(uri syntax.ATURI, comment *models.Comment) (syntax.CID, error) { 131 parent, err := db.GetComment(s.db, orm.FilterEq("did", uri.Authority()), orm.FilterEq("rkey", uri.RecordKey())) 132 if err != nil { 133 return "", fmt.Errorf("db: failed to queyr subject comment: %w", err) 134 } 135 if parent.Deleted != nil { 136 // leave cid empty. reply comment won't pass the schema validation. 137 return "", nil 138 } 139 140 // since parent comment is also migrating, parent comments subject.cid might be empty 141 if parent.Subject.Cid == "" { 142 parent.Subject.Cid = comment.Subject.Cid 143 } 144 145 buf := new(bytes.Buffer) 146 if err := parent.AsRecord().MarshalCBOR(buf); err != nil { 147 return "", fmt.Errorf("MarshalCBOR: %w", err) 148 } 149 c, err := cid.NewPrefixV1(cid.DagCBOR, multihash.SHA2_256).Sum(buf.Bytes()) 150 if err != nil { 151 return "", fmt.Errorf("cid: sum: %w", err) 152 } 153 return syntax.CID(c.String()), nil 154}