This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

"""Daily sync pipeline — imports stage runners from scraper/."""

from __future__ import annotations

import os
import sys
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

REPO_ROOT = Path(__file__).resolve().parent.parent
SCRAPER_ROOT = REPO_ROOT / "scraper"
# The stage runners live in scraper/ as top-level modules, so that directory
# has to be importable before the imports below run.
if str(SCRAPER_ROOT) not in sys.path:
    sys.path.insert(0, str(SCRAPER_ROOT))

from check_readmes import run_check_readmes  # noqa: E402
from db import connect, init_schema, set_crawl_state  # noqa: E402
from embed_issues import run_embed_issues  # noqa: E402
from embed_readmes import run_embed_readmes  # noqa: E402
from fetch_collaborators import run_fetch_collaborators  # noqa: E402
from fetch_issues import run_fetch_issues  # noqa: E402
from progress import banner, log, summary_block  # noqa: E402
from stage2_network import run_stage2_network  # noqa: E402
from stage2_pds import run_stage2_accounts_only, run_stage2_repos_only  # noqa: E402

CRAWL_KEY = "sync:daily"

# A stage runner takes the database DSN and returns a stats mapping.
StageFn = Callable[[str], dict[str, Any]]

# Stat keys shown first (in this order) by _format_stats; any other keys a
# stage reports follow in the stage's own order.
_STAT_ORDER: tuple[str, ...] = (
    "repos_stored",
    "already_in_db",
    "account_count",
    "users_scanned",
    "issues_upserted",
    "open_issues",
    "found",
    "missing",
    "repos_fetched",
    "collaborator_edges",
    "embedded",
    "batches",
    "errors",
    "resolve_failed",
    "record_failed",
    "already_synced",
    "skipped",
    "skipped_knot",
    "error",
)


@dataclass
class Stage:
    """One step of the daily sync pipeline."""

    key: str  # short identifier; used in logs, in `only` filters and in the report
    title: str  # human-readable description printed when the stage starts
    run: StageFn  # callable invoked with the DSN
    enabled: bool = True  # disabled stages are skipped by run_daily_sync


@dataclass
class SyncReport:
    """Outcome of one ``run_daily_sync`` call."""

    # Wall-clock start time (epoch seconds).
    started_at: float = field(default_factory=time.time)
    # Per-stage result keyed by Stage.key: "status", "elapsed_s" and either
    # "stats" (on success) or "error" (on failure).
    stages: dict[str, dict[str, Any]] = field(default_factory=dict)
    # One "<stage key>: <message>" entry per failed stage.
    errors: list[str] = field(default_factory=list)

    @property
    def elapsed_s(self) -> float:
        """Seconds elapsed since ``started_at``."""
        return time.time() - self.started_at


def _env_flag(name: str, *, default: bool) -> bool:
    """Read a boolean flag from the environment.

    Returns ``default`` when the variable is unset or blank. Otherwise the
    value is true only for ``1``, ``true`` or ``yes`` (case-insensitive,
    surrounding whitespace ignored); any other text is false.
    """
    raw = os.getenv(name, "").strip().lower()
    if not raw:
        return default
    return raw in ("1", "true", "yes")


def _configure_daily_env() -> None:
    """Defaults tuned for scheduled daily runs (override via env)."""
    # setdefault leaves any value already present in the environment alone.
    os.environ.setdefault("TANGLED_ISSUE_REFRESH", "1")
    os.environ.setdefault("TANGLED_ISSUE_ALL_USERS", "1")
    os.environ.setdefault("TANGLED_STAGE2_NETWORK_REFRESH", "0")


def _format_stats(stats: dict[str, Any]) -> str:
    """One-line summary of rows processed for sync logs.

    Keys listed in ``_STAT_ORDER`` come first, in that order; remaining keys
    follow in the mapping's own order. Entries whose value is ``None`` are
    omitted. Returns ``"(no stats)"`` when nothing is left to show.
    """
    if not stats:
        return "(no stats)"
    leading = [key for key in _STAT_ORDER if stats.get(key) is not None]
    shown = set(leading)
    trailing = [
        key
        for key, value in stats.items()
        if key not in shown and value is not None
    ]
    parts = [f"{key}={stats[key]}" for key in (*leading, *trailing)]
    return ", ".join(parts) if parts else "(no stats)"


def build_stages() -> list[Stage]:
    """Return every pipeline stage in execution order.

    Each stage's ``enabled`` flag is read from its ``TANGLED_SYNC_*``
    environment variable at call time; only the "repos" stage defaults to
    disabled.
    """
    return [
        Stage(
            key="network",
            title="Discover repos (tangled.org search)",
            run=run_stage2_network,
            enabled=_env_flag("TANGLED_SYNC_NETWORK", default=True),
        ),
        Stage(
            key="accounts",
            title="Refresh tngl.sh accounts",
            run=run_stage2_accounts_only,
            enabled=_env_flag("TANGLED_SYNC_ACCOUNTS", default=True),
        ),
        Stage(
            key="repos",
            title="Scan tngl.sh repo records (heavy)",
            run=run_stage2_repos_only,
            enabled=_env_flag("TANGLED_SYNC_TNGL_REPOS", default=False),
        ),
        Stage(
            key="issues",
            title="Re-scan all users for issues",
            run=run_fetch_issues,
            enabled=_env_flag("TANGLED_SYNC_ISSUES", default=True),
        ),
        Stage(
            key="readmes",
            title="Fetch missing READMEs from knots",
            run=run_check_readmes,
            enabled=_env_flag("TANGLED_SYNC_READMES", default=True),
        ),
        Stage(
            key="collaborators",
            title="Fetch repo collaborators",
            run=run_fetch_collaborators,
            enabled=_env_flag("TANGLED_SYNC_COLLABORATORS", default=True),
        ),
        Stage(
            key="embed_readmes",
            title="Embed READMEs (Gemini)",
            run=run_embed_readmes,
            enabled=_env_flag("TANGLED_SYNC_EMBED_READMES", default=True),
        ),
        Stage(
            key="embed_issues",
            title="Embed issues (Gemini)",
            run=run_embed_issues,
            enabled=_env_flag("TANGLED_SYNC_EMBED_ISSUES", default=True),
        ),
    ]


def _save_crawl_state(dsn: str, status: str, meta: dict[str, Any]) -> None:
    """Write the pipeline's crawl-state row under ``CRAWL_KEY`` and commit."""
    with connect(dsn) as conn:
        set_crawl_state(conn, key=CRAWL_KEY, status=status, meta=meta)
        conn.commit()


def _run_stage(stage: Stage, dsn: str) -> dict[str, Any]:
    """Run one stage and return its report entry; never raises ``Exception``."""
    # perf_counter is monotonic, so durations are unaffected by wall-clock
    # adjustments that happen while a stage is running.
    t0 = time.perf_counter()
    try:
        stats = stage.run(dsn)
    except Exception as exc:  # pipeline boundary: one bad stage must not stop the rest
        return {
            "status": "error",
            "elapsed_s": round(time.perf_counter() - t0, 1),
            # Some exceptions carry no message; fall back to the class name
            # so the log entry is never blank.
            "error": str(exc) or type(exc).__name__,
        }
    return {
        "status": "ok",
        "elapsed_s": round(time.perf_counter() - t0, 1),
        "stats": stats,
    }


def _summary_lines(report: SyncReport) -> list[str]:
    """Build the lines of the end-of-run summary block."""
    lines = [f"Elapsed: {report.elapsed_s:.0f}s", ""]
    for key, info in report.stages.items():
        succeeded = info["status"] == "ok"
        line = f" [{'OK' if succeeded else 'ERR'}] {key} ({info['elapsed_s']}s)"
        if succeeded and info.get("stats"):
            # Separator keeps the stats from running into the elapsed time.
            line += f" — {_format_stats(info['stats'])}"
        lines.append(line)
    if report.errors:
        lines += ["", "Errors:", *(f" - {e}" for e in report.errors)]
    return lines


def run_daily_sync(dsn: str, *, only: set[str] | None = None) -> SyncReport:
    """Run the enabled sync stages in order and record the outcome.

    Args:
        dsn: Database connection string passed to ``init_schema``,
            ``connect`` and every stage runner.
        only: If given and non-empty, run only the enabled stages whose key
            is in this set. ``None`` or an empty set runs every enabled stage.

    Returns:
        A ``SyncReport`` with per-stage results and collected error messages.

    Raises:
        SystemExit: With code 1 when at least one stage failed and
            ``TANGLED_SYNC_STRICT`` is true (the default).

    A failing stage is recorded and the pipeline continues, unless
    ``TANGLED_SYNC_FAIL_FAST`` is set, in which case the remaining stages are
    skipped. The crawl state under ``CRAWL_KEY`` is set to "running" at the
    start and to "complete" or "partial" at the end.
    """
    _configure_daily_env()
    report = SyncReport()
    all_stages = build_stages()
    stages = [
        s for s in all_stages if s.enabled and (not only or s.key in only)
    ]

    banner("DAILY SYNC — Tangled → Postgres")
    log("sync", f"Stages: {', '.join(s.key for s in stages) or '(none)'}")
    if only:
        # A mistyped key would otherwise silently select nothing.
        unknown = sorted(set(only) - {s.key for s in all_stages})
        if unknown:
            log("sync", f"Unknown stage keys ignored: {', '.join(unknown)}")

    init_schema(dsn)
    _save_crawl_state(dsn, "running", {"stages": [s.key for s in stages]})

    fail_fast = _env_flag("TANGLED_SYNC_FAIL_FAST", default=False)
    for i, stage in enumerate(stages, start=1):
        log("sync", f"── Stage {i}/{len(stages)}: {stage.title} ({stage.key}) ──")
        result = _run_stage(stage, dsn)
        report.stages[stage.key] = result
        if result["status"] == "ok":
            log(
                "sync",
                f"{stage.key} done in {result['elapsed_s']}s — {_format_stats(result['stats'])}",
            )
            continue
        msg = f"{stage.key}: {result['error']}"
        report.errors.append(msg)
        log("sync", msg)
        if fail_fast:
            break

    _save_crawl_state(
        dsn,
        "complete" if not report.errors else "partial",
        {
            "elapsed_s": round(report.elapsed_s, 1),
            "stages": report.stages,
            "errors": report.errors,
        },
    )

    summary_block("Daily sync finished", _summary_lines(report))

    if report.errors and _env_flag("TANGLED_SYNC_STRICT", default=True):
        raise SystemExit(1)
    return report