This repository has no description
"""Daily sync pipeline: imports the stage runners from scraper/ and runs them in order."""
2
3from __future__ import annotations
4
5import os
6import sys
7import time
8from collections.abc import Callable
9from dataclasses import dataclass, field
10from pathlib import Path
11from typing import Any
12
# Repository root: two directory levels above this file.
REPO_ROOT = Path(__file__).resolve().parent.parent
SCRAPER_ROOT = REPO_ROOT.joinpath("scraper")
if os.fspath(SCRAPER_ROOT) not in sys.path:
    # Prepend (rather than append) so modules in scraper/ are found before
    # anything with the same name later on sys.path.
    sys.path.insert(0, os.fspath(SCRAPER_ROOT))
17
18from check_readmes import run_check_readmes # noqa: E402
19from db import connect, init_schema, set_crawl_state # noqa: E402
20from embed_issues import run_embed_issues # noqa: E402
21from embed_readmes import run_embed_readmes # noqa: E402
22from fetch_collaborators import run_fetch_collaborators # noqa: E402
23from fetch_issues import run_fetch_issues # noqa: E402
24from progress import banner, log, summary_block # noqa: E402
25from stage2_network import run_stage2_network # noqa: E402
26from stage2_pds import run_stage2_accounts_only, run_stage2_repos_only # noqa: E402
27
# Key under which the daily sync's status/meta is written via set_crawl_state().
CRAWL_KEY: str = "sync:daily"

# Signature of a stage runner: called with the DSN, returns a stats mapping.
StageFn = Callable[[str], dict[str, Any]]
31
32
@dataclass
class Stage:
    """One named step of the daily sync pipeline.

    ``run`` is called with the DSN. ``enabled`` decides whether the stage is
    kept when run_daily_sync selects which stages to execute.
    """

    key: str  # short identifier; matched against run_daily_sync's ``only`` set
    title: str  # human-readable label shown in the stage log line
    run: StageFn  # callable taking the DSN and returning a stats dict
    enabled: bool = field(default=True)  # False -> stage is filtered out
39
40
@dataclass
class SyncReport:
    """Accumulates per-stage outcomes and error messages for one sync run."""

    # Wall-clock start time (time.time(), seconds since the epoch).
    started_at: float = field(default_factory=time.time)
    # Stage key -> outcome dict (status, elapsed_s, and stats or error).
    stages: dict[str, dict[str, Any]] = field(default_factory=dict)
    # Human-readable error messages, one per failure.
    errors: list[str] = field(default_factory=list)

    @property
    def elapsed_s(self) -> float:
        """Seconds elapsed since ``started_at``."""
        now = time.time()
        return now - self.started_at
49 return time.time() - self.started_at
50
51
52def _env_flag(name: str, *, default: bool) -> bool:
53 raw = os.getenv(name, "").strip().lower()
54 if not raw:
55 return default
56 return raw in ("1", "true", "yes")
57
58
59def _configure_daily_env() -> None:
60 """Defaults tuned for scheduled daily runs (override via env)."""
61 os.environ.setdefault("TANGLED_ISSUE_REFRESH", "1")
62 os.environ.setdefault("TANGLED_ISSUE_ALL_USERS", "1")
63 os.environ.setdefault("TANGLED_STAGE2_NETWORK_REFRESH", "0")
64
65
66def _format_stats(stats: dict[str, Any]) -> str:
67 """One-line summary of rows processed for sync logs."""
68 if not stats:
69 return "(no stats)"
70 ordered = (
71 "repos_stored",
72 "already_in_db",
73 "account_count",
74 "users_scanned",
75 "issues_upserted",
76 "open_issues",
77 "found",
78 "missing",
79 "repos_fetched",
80 "collaborator_edges",
81 "embedded",
82 "batches",
83 "errors",
84 "resolve_failed",
85 "record_failed",
86 "already_synced",
87 "skipped",
88 "skipped_knot",
89 "error",
90 )
91 parts: list[str] = []
92 seen: set[str] = set()
93 for key in ordered:
94 if key in stats and stats[key] is not None:
95 parts.append(f"{key}={stats[key]}")
96 seen.add(key)
97 for key, value in stats.items():
98 if key not in seen and value is not None:
99 parts.append(f"{key}={value}")
100 return ", ".join(parts) if parts else "(no stats)"
101
102
def build_stages() -> list[Stage]:
    """Return every known sync stage, in execution order.

    Each stage's ``enabled`` flag is read from its own TANGLED_SYNC_* environment
    variable via _env_flag. The heavy tngl.sh repo-record scan is the only
    stage that is off when its variable is unset.
    """
    # (key, title, runner, controlling env var, enabled when the var is unset)
    table = (
        ("network", "Discover repos (tangled.org search)",
         run_stage2_network, "TANGLED_SYNC_NETWORK", True),
        ("accounts", "Refresh tngl.sh accounts",
         run_stage2_accounts_only, "TANGLED_SYNC_ACCOUNTS", True),
        ("repos", "Scan tngl.sh repo records (heavy)",
         run_stage2_repos_only, "TANGLED_SYNC_TNGL_REPOS", False),
        ("issues", "Re-scan all users for issues",
         run_fetch_issues, "TANGLED_SYNC_ISSUES", True),
        ("readmes", "Fetch missing READMEs from knots",
         run_check_readmes, "TANGLED_SYNC_READMES", True),
        ("collaborators", "Fetch repo collaborators",
         run_fetch_collaborators, "TANGLED_SYNC_COLLABORATORS", True),
        ("embed_readmes", "Embed READMEs (Gemini)",
         run_embed_readmes, "TANGLED_SYNC_EMBED_READMES", True),
        ("embed_issues", "Embed issues (Gemini)",
         run_embed_issues, "TANGLED_SYNC_EMBED_ISSUES", True),
    )
    return [
        Stage(
            key=key,
            title=title,
            run=runner,
            enabled=_env_flag(env_var, default=fallback),
        )
        for key, title, runner, env_var, fallback in table
    ]
154
155
def _record_sync_state(dsn: str, report: SyncReport, status: str) -> None:
    """Persist *status* plus elapsed time, per-stage results and errors under CRAWL_KEY."""
    with connect(dsn) as conn:
        set_crawl_state(
            conn,
            key=CRAWL_KEY,
            status=status,
            meta={
                "elapsed_s": round(report.elapsed_s, 1),
                "stages": report.stages,
                "errors": report.errors,
            },
        )
        conn.commit()


def _run_stage(stage: Stage, dsn: str, report: SyncReport) -> bool:
    """Run *stage*, record its outcome in *report*, and return True on success.

    Any ``Exception`` raised by the stage (or while formatting its stats) is
    recorded as a stage error instead of propagating, so later stages still run.
    """
    t0 = time.monotonic()  # monotonic: durations unaffected by clock changes
    try:
        stats = stage.run(dsn)
        elapsed = round(time.monotonic() - t0, 1)
        report.stages[stage.key] = {
            "status": "ok",
            "elapsed_s": elapsed,
            "stats": stats,
        }
        log("sync", f"✓ {stage.key} done in {elapsed}s — {_format_stats(stats)}")
        return True
    except Exception as exc:  # deliberate per-stage boundary: record and continue
        # str(exc) alone is empty for a bare ``raise Foo()`` and ambiguous for
        # e.g. KeyError (only the key), so keep the exception type as well.
        detail = f"{type(exc).__name__}: {exc}"
        msg = f"{stage.key}: {detail}"
        report.errors.append(msg)
        report.stages[stage.key] = {
            "status": "error",
            "elapsed_s": round(time.monotonic() - t0, 1),
            "error": detail,
        }
        log("sync", f"✗ {msg}")
        return False


def _summary_lines(report: SyncReport) -> list[str]:
    """Build the end-of-run summary lines handed to summary_block."""
    lines = [f"Elapsed: {report.elapsed_s:.0f}s", ""]
    for key, info in report.stages.items():
        mark = "OK" if info["status"] == "ok" else "ERR"
        line = f" [{mark}] {key} ({info['elapsed_s']}s)"
        if info["status"] == "ok" and info.get("stats"):
            line += f" — {_format_stats(info['stats'])}"
        lines.append(line)
    if report.errors:
        lines.append("")
        lines.append("Errors:")
        lines.extend(f" - {e}" for e in report.errors)
    return lines


def run_daily_sync(dsn: str, *, only: set[str] | None = None) -> SyncReport:
    """Run the enabled sync stages in order and record progress in the crawl state.

    Args:
        dsn: connection string passed to init_schema/connect and to every stage.
        only: if non-empty, restrict the run to these stage keys. A stage must
            still be enabled via its env flag; requested keys that are unknown
            or disabled are logged and skipped.

    Returns:
        The SyncReport with per-stage status, timings and stats.

    Raises:
        SystemExit: code 1 when any stage failed and TANGLED_SYNC_STRICT is on
            (the default). Exceptions that escape a stage's ``Exception``
            handler (e.g. KeyboardInterrupt) are re-raised after the crawl
            state is marked "partial".
    """
    _configure_daily_env()
    report = SyncReport()
    all_stages = build_stages()
    stages = [s for s in all_stages if s.enabled and (not only or s.key in only)]
    fail_fast = _env_flag("TANGLED_SYNC_FAIL_FAST", default=False)

    banner("DAILY SYNC — Tangled → Postgres")
    log("sync", f"Stages: {', '.join(s.key for s in stages) or '(none)'}")
    if only:
        # Surface requested keys that will not run instead of dropping them silently.
        known = {s.key for s in all_stages}
        unknown = sorted(set(only) - known)
        disabled = sorted(s.key for s in all_stages if s.key in only and not s.enabled)
        if unknown:
            log("sync", f"⚠ unknown stage key(s) ignored: {', '.join(unknown)}")
        if disabled:
            log("sync", f"⚠ requested stage(s) disabled by env flag: {', '.join(disabled)}")

    init_schema(dsn)

    with connect(dsn) as conn:
        set_crawl_state(
            conn,
            key=CRAWL_KEY,
            status="running",
            meta={"stages": [s.key for s in stages]},
        )
        conn.commit()

    try:
        for i, stage in enumerate(stages, start=1):
            log("sync", f"── Stage {i}/{len(stages)}: {stage.title} ({stage.key}) ──")
            if not _run_stage(stage, dsn, report) and fail_fast:
                break
    except BaseException as exc:
        # Something escaped the per-stage handler (KeyboardInterrupt, SystemExit
        # raised inside a stage, ...). Best effort: don't leave the crawl state
        # stuck at "running", then re-raise the original exception unchanged.
        report.errors.append(f"aborted: {type(exc).__name__}: {exc}")
        try:
            _record_sync_state(dsn, report, "partial")
        except Exception as state_exc:
            log("sync", f"✗ could not record aborted sync state: {state_exc}")
        raise

    final_status = "complete" if not report.errors else "partial"
    _record_sync_state(dsn, report, final_status)

    summary_block("Daily sync finished", _summary_lines(report))

    if report.errors and _env_flag("TANGLED_SYNC_STRICT", default=True):
        raise SystemExit(1)
    return report
231 return report