Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

knotmirror: use repoDid for resync identifier

Signed-off-by: Seongmin Lee <git@boltless.me>

author
Seongmin Lee
committer
Tangled
date (Jun 11, 2026, 10:52 AM +0300) commit 07299d86 parent 3e84928d change-id uturmroo
+36 -48
+4 -4
knotmirror/adminpage.go
··· 182 182 return func(w http.ResponseWriter, r *http.Request) { 183 183 var repoQuery = r.FormValue("repo") 184 184 185 - repo, err := syntax.ParseATURI(repoQuery) 186 - if err != nil || repo.RecordKey() == "" { 185 + repo, err := syntax.ParseDID(repoQuery) 186 + if err != nil { 187 187 writeNotif(w, http.StatusBadRequest, fmt.Sprintf("repo parameter invalid: %s", repoQuery)) 188 188 return 189 189 } ··· 201 201 return func(w http.ResponseWriter, r *http.Request) { 202 202 var repoQuery = r.FormValue("repo") 203 203 204 - repo, err := syntax.ParseATURI(repoQuery) 205 - if err != nil || repo.RecordKey() == "" { 204 + repo, err := syntax.ParseDID(repoQuery) 205 + if err != nil { 206 206 writeNotif(w, http.StatusBadRequest, fmt.Sprintf("repo parameter invalid: %s", repoQuery)) 207 207 return 208 208 }
+30 -30
knotmirror/resyncer.go
··· 30 30 31 31 claimJobMu sync.Mutex 32 32 33 - runningJobs map[syntax.ATURI]context.CancelFunc 33 + runningJobs map[syntax.DID]context.CancelFunc 34 34 runningJobsMu sync.Mutex 35 35 36 36 repoFetchTimeout time.Duration ··· 51 51 cfg: cfg, 52 52 indexer: indexer, 53 53 54 - runningJobs: make(map[syntax.ATURI]context.CancelFunc), 54 + runningJobs: make(map[syntax.DID]context.CancelFunc), 55 55 56 56 repoFetchTimeout: cfg.GitRepoFetchTimeout, 57 57 manualResyncTimeout: 30 * time.Minute, ··· 78 78 return 79 79 default: 80 80 } 81 - repoAt, found, err := r.claimResyncJob(ctx) 81 + repoDid, found, err := r.claimResyncJob(ctx) 82 82 if err != nil { 83 83 l.Error("failed to claim resync job", "error", err) 84 84 time.Sleep(time.Second) ··· 88 88 time.Sleep(time.Second) 89 89 continue 90 90 } 91 - l.Info("processing resync", "aturi", repoAt) 92 - if err := r.resyncRepo(ctx, repoAt); err != nil { 93 - l.Error("resync failed", "aturi", repoAt, "error", err) 91 + l.Info("processing resync", "did", repoDid) 92 + if err := r.resyncRepo(ctx, repoDid); err != nil { 93 + l.Error("resync failed", "did", repoDid, "error", err) 94 94 } 95 95 } 96 96 } 97 97 98 - func (r *Resyncer) registerRunning(repo syntax.ATURI, cancel context.CancelFunc) { 98 + func (r *Resyncer) registerRunning(repo syntax.DID, cancel context.CancelFunc) { 99 99 r.runningJobsMu.Lock() 100 100 defer r.runningJobsMu.Unlock() 101 101 ··· 105 105 r.runningJobs[repo] = cancel 106 106 } 107 107 108 - func (r *Resyncer) unregisterRunning(repo syntax.ATURI) { 108 + func (r *Resyncer) unregisterRunning(repo syntax.DID) { 109 109 r.runningJobsMu.Lock() 110 110 defer r.runningJobsMu.Unlock() 111 111 112 112 delete(r.runningJobs, repo) 113 113 } 114 114 115 - func (r *Resyncer) CancelResyncJob(repo syntax.ATURI) { 115 + func (r *Resyncer) CancelResyncJob(repo syntax.DID) { 116 116 r.runningJobsMu.Lock() 117 117 defer r.runningJobsMu.Unlock() 118 118 ··· 125 125 } 126 126 127 127 // TriggerResyncJob manually triggers the resync job 128 - func (r *Resyncer) TriggerResyncJob(ctx context.Context, repoAt syntax.ATURI) error { 129 - repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 128 + func (r *Resyncer) TriggerResyncJob(ctx context.Context, repoDid syntax.DID) error { 129 + repo, err := db.GetRepoByRepoDid(ctx, r.db, repoDid) 130 130 if err != nil { 131 131 return fmt.Errorf("failed to get repo: %w", err) 132 132 } 133 133 if repo == nil { 134 - return fmt.Errorf("repo not found: %s", repoAt) 134 + return fmt.Errorf("repo not found: %s", repoDid) 135 135 } 136 136 137 137 if repo.State == models.RepoStateResyncing { ··· 147 147 return nil 148 148 } 149 149 150 - func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) { 150 + func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.DID, bool, error) { 151 151 // use mutex to prevent duplicated jobs 152 152 r.claimJobMu.Lock() 153 153 defer r.claimJobMu.Unlock() 154 154 155 - var repoAt syntax.ATURI 155 + var repoDid syntax.DID 156 156 now := time.Now().Unix() 157 157 if err := r.db.QueryRowContext(ctx, 158 158 `update repos 159 159 set state = $1 160 - where at_uri = ( 161 - select at_uri from repos 160 + where repo_did = ( 161 + select repo_did from repos 162 162 where state in ($2, $3, $4) 163 163 and (retry_after = -1 or retry_after = 0 or retry_after < $5) 164 164 order by ··· 167 167 retry_after 168 168 limit 1 169 169 ) 170 - returning at_uri 170 + returning repo_did 171 171 `, 172 172 models.RepoStateResyncing, 173 173 models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError, 174 174 now, 175 - ).Scan(&repoAt); err != nil { 175 + ).Scan(&repoDid); err != nil { 176 176 if errors.Is(err, sql.ErrNoRows) { 177 177 return "", false, nil 178 178 } 179 179 return "", false, err 180 180 } 181 181 182 - return repoAt, true, nil 182 + return repoDid, true, nil 183 183 } 184 184 185 - func (r *Resyncer) resyncRepo(ctx context.Context, repoAt syntax.ATURI) error { 185 + func (r *Resyncer) resyncRepo(ctx context.Context, repoDid syntax.DID) error { 186 186 // ctx, span := tracer.Start(ctx, "resyncRepo") 187 187 // span.SetAttributes(attribute.String("aturi", repoAt)) 188 188 // defer span.End() ··· 191 191 startTime := time.Now() 192 192 193 193 jobCtx, cancel := context.WithCancel(ctx) 194 - r.registerRunning(repoAt, cancel) 195 - defer r.unregisterRunning(repoAt) 194 + r.registerRunning(repoDid, cancel) 195 + defer r.unregisterRunning(repoDid) 196 196 197 - success, err := r.doResync(jobCtx, repoAt) 197 + success, err := r.doResync(jobCtx, repoDid) 198 198 if !success { 199 199 resyncsFailed.Inc() 200 200 resyncDuration.Observe(time.Since(startTime).Seconds()) 201 - return r.handleResyncFailure(ctx, repoAt, err) 201 + return r.handleResyncFailure(ctx, repoDid, err) 202 202 } 203 203 204 204 resyncsCompleted.Inc() ··· 206 206 return nil 207 207 } 208 208 209 - func (r *Resyncer) doResync(ctx context.Context, repoAt syntax.ATURI) (bool, error) { 209 + func (r *Resyncer) doResync(ctx context.Context, repoDid syntax.DID) (bool, error) { 210 210 // ctx, span := tracer.Start(ctx, "doResync") 211 211 // span.SetAttributes(attribute.String("aturi", repoAt)) 212 212 // defer span.End() 213 213 214 - repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 214 + repo, err := db.GetRepoByRepoDid(ctx, r.db, repoDid) 215 215 if err != nil { 216 216 return false, fmt.Errorf("failed to get repo: %w", err) 217 217 } ··· 323 323 return nil 324 324 } 325 325 326 - func (r *Resyncer) handleResyncFailure(ctx context.Context, repoAt syntax.ATURI, err error) error { 327 - r.logger.Debug("handleResyncFailure", "at_uri", repoAt, "err", err) 326 + func (r *Resyncer) handleResyncFailure(ctx context.Context, repoDid syntax.DID, err error) error { 327 + r.logger.Debug("handleResyncFailure", "at_uri", repoDid, "err", err) 328 328 var state models.RepoState 329 329 var errMsg string 330 330 if err == nil { ··· 335 335 errMsg = err.Error() 336 336 } 337 337 338 - repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 338 + repo, err := db.GetRepoByRepoDid(ctx, r.db, repoDid) 339 339 if err != nil { 340 340 return fmt.Errorf("failed to get repo: %w", err) 341 341 } 342 342 if repo == nil { 343 - return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoAt) 343 + return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoDid) 344 344 } 345 345 346 346 // start a 1 min & go up to 1 hr between retries
+1 -13
knotmirror/tapclient.go
··· 133 133 } 134 134 135 135 case tapc.RecordDeleteAction: 136 - aturi := syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", evt.Did, tangled.RepoNSID, evt.Rkey)) 137 - repo, err := db.GetRepoByAtUri(ctx, t.db, aturi) 138 - if err != nil { 139 - return fmt.Errorf("looking up repo before delete: %w", err) 140 - } 141 - if repo != nil { 142 - if err := t.gitm.Delete(repo); err != nil { 143 - return fmt.Errorf("removing mirror dir: %w", err) 144 - } 145 - } 146 - if err := db.DeleteRepo(ctx, t.db, evt.Did, evt.Rkey); err != nil { 147 - return fmt.Errorf("deleting repo from db: %w", err) 148 - } 136 + // no-op. deletion of sh.tangled.repo record doesn't mean repository deletion 149 137 } 150 138 return nil 151 139 }
+1 -1
knotmirror/templates/repos.html
··· 65 65 hx-swap="none" 66 66 hx-disabled-elt="find button" 67 67 > 68 - <input type="hidden" name="repo" value="{{.AtUri}}"> 68 + <input type="hidden" name="repo" value="{{.RepoDid}}"> 69 69 <button type="submit">{{ if .State.IsResyncing }}cancel{{ else }}resync{{ end }}</button> 70 70 </form> 71 71 </td>