This repository has no description
1#!/usr/bin/env python3
2"""LangGraph agent loop on Anthropic with prompt caching.
3
4Core pieces:
5 - ``AgentState`` — message list reducer state
6 - ``create_anthropic_model`` — ChatAnthropic factory
7 - ``cached_system_message`` / ``AnthropicCacheSettings`` — explicit + automatic caching
8 - ``build_agent_graph`` — agent → (tools) → agent loop
9 - ``run_agent`` — single-turn or threaded invoke helper
10"""
11
12from __future__ import annotations
13
14import json
15import os
16import sys
17from dataclasses import dataclass
18from pathlib import Path
19from typing import Annotated, Any, Literal, Sequence
20
21from dotenv import load_dotenv
22from langchain_anthropic import ChatAnthropic
23from langchain_core.messages import (
24 AIMessage,
25 BaseMessage,
26 HumanMessage,
27 SystemMessage,
28 ToolMessage,
29)
30from langchain_core.tools import BaseTool
31from langgraph.checkpoint.base import BaseCheckpointSaver
32from langgraph.graph import END, START, StateGraph
33from langgraph.graph.message import add_messages
34from langgraph.prebuilt import ToolNode
35from typing_extensions import TypedDict
36
37from agent.context import IssueSessionContext, build_issue_system_prompt
38from agent.load_issue import load_issue_context
39from agent.questionnaire_store import parse_questionnaire_json, save_questionnaire
40from agent.questionnaire_repo_store import publish_to_repo, publishing_enabled
41from agent.questionnaire_prompt import build_questionnaire_system_prompt
42from agent.tools import make_file_tools
43
# Parent of this module's directory; ``load_env`` looks here first for ``.env``.
REPO_ROOT = Path(__file__).resolve().parent.parent

# System prompt for the generic agent built by ``build_agent_graph`` when no
# ``system_prompt`` is passed (also used by ``run_agent`` via that default).
DEFAULT_SYSTEM_PROMPT = """\
You are a helpful assistant for the Sunstead / Tangled hackathon stack.

You can reason about:
- Tangled repos, issues, and README embeddings in Postgres
- The recommendation API (DID → ranked repos/issues)
- The daily scraper that ingests Tangled network data

Be concise and actionable. Use tools when they help answer factual questions.
"""

# Allowed values for the ``ttl`` field sent in ``cache_control`` payloads
# (see ``AnthropicCacheSettings`` and ``_cache_settings_from_env``).
CacheTTL = Literal["5m", "1h"]
58
59
@dataclass(frozen=True)
class AnthropicCacheSettings:
    """Immutable description of an Anthropic ``cache_control`` marker.

    Two caching layers are used together:

    * an explicit ``cache_control`` on the static system block, and
    * an invoke-level ``cache_control`` passed to ``model.invoke`` (only while
      no tool results are in the thread), so tool definitions and the
      conversation prefix are cached too; that breakpoint advances as the
      thread grows.

    Attributes:
        type: Cache kind; only ``"ephemeral"`` is used.
        ttl: Cache lifetime, ``"5m"`` or ``"1h"``.
    """

    type: Literal["ephemeral"] = "ephemeral"
    ttl: CacheTTL = "5m"

    def as_api_dict(self) -> dict[str, str]:
        """Return the ``cache_control`` payload, e.g. ``{"type": "ephemeral", "ttl": "5m"}``."""
        return dict(type=self.type, ttl=self.ttl)
76
77
class AgentState(TypedDict):
    """LangGraph state shared by every node of the agent graph.

    ``messages`` is annotated with the ``add_messages`` reducer, so a node
    returns only the messages it produced (e.g. ``{"messages": [response]}``)
    and LangGraph merges them into the existing history.
    """

    messages: Annotated[list[BaseMessage], add_messages]
82
83
def load_env() -> None:
    """Load environment variables from the first ``.env`` file found.

    Checks the repository root, then the directory of this module. If neither
    file exists, falls back to ``python-dotenv``'s default lookup.
    """
    candidates = (REPO_ROOT / ".env", Path(__file__).parent / ".env")
    found = next((path for path in candidates if path.exists()), None)
    if found is None:
        load_dotenv()
    else:
        load_dotenv(found)
90
91
def require_anthropic_api_key() -> str:
    """Return the whitespace-trimmed ``ANTHROPIC_API_KEY``.

    Raises:
        SystemExit: with status 1 if the variable is unset or blank; an error
            line is printed to stderr first.
    """
    api_key = (os.environ.get("ANTHROPIC_API_KEY") or "").strip()
    if api_key:
        return api_key
    print("ERROR: ANTHROPIC_API_KEY is not set", file=sys.stderr)
    raise SystemExit(1)
98
99
def cached_system_message(
    text: str,
    *,
    cache: AnthropicCacheSettings | None = None,
) -> SystemMessage:
    """Wrap *text* in a ``SystemMessage`` with an explicit cache breakpoint.

    The content is a single text block whose ``cache_control`` comes from
    *cache* (default: ``AnthropicCacheSettings()``).
    """
    settings = cache if cache is not None else AnthropicCacheSettings()
    block = {"type": "text", "text": text, "cache_control": settings.as_api_dict()}
    return SystemMessage(content=[block])
116
117
def _tag_last_content_block(
    message: BaseMessage,
    cache: AnthropicCacheSettings,
) -> BaseMessage:
    """Return a copy of *message* whose final text block carries ``cache_control``.

    This places an explicit Anthropic cache breakpoint at the end of the message.

    - String content becomes a single text block with ``cache_control``.
    - List content gets ``cache_control`` on its last block, but only when that
      block is a ``{"type": "text"}`` dict; the block dicts are copied so the
      original message is not mutated.
    - Empty or whitespace-only text is never tagged. Anthropic rejects empty
      text blocks, and a cache breakpoint on one is an error.
    - In every other case (non-text last block, empty list, other content
      types) *message* is returned unchanged.
    """
    content = message.content
    if isinstance(content, str):
        if not content.strip():
            return message
        tagged = {"type": "text", "text": content, "cache_control": cache.as_api_dict()}
        return message.model_copy(update={"content": [tagged]})
    if not isinstance(content, list) or not content:
        return message
    # Copy dict blocks so the caller's message (e.g. graph state) stays untouched.
    blocks = [dict(block) if isinstance(block, dict) else block for block in content]
    last = blocks[-1]
    if not isinstance(last, dict) or last.get("type") != "text":
        return message
    text = last.get("text")
    if not isinstance(text, str) or not text.strip():
        return message
    blocks[-1] = {**last, "cache_control": cache.as_api_dict()}
    return message.model_copy(update={"content": blocks})
144
145
def _can_tag_message_for_cache(message: BaseMessage) -> bool:
    """Return True if *message* may carry an explicit cache breakpoint.

    Only human and AI messages qualify. Tool messages are excluded explicitly
    because Anthropic forbids ``cache_control`` on ``tool_result`` blocks.
    """
    is_tool_result = isinstance(message, ToolMessage)
    return not is_tool_result and isinstance(message, (HumanMessage, AIMessage))
151
152
def prepare_messages_for_anthropic(
    messages: Sequence[BaseMessage],
    *,
    system_message: SystemMessage,
    cache: AnthropicCacheSettings | None = None,
    cache_conversation_tail: bool = True,
) -> list[BaseMessage]:
    """Assemble the exact message list sent to Claude.

    *system_message* always comes first. When *cache_conversation_tail* is
    set and the history holds no ``ToolMessage``, the most recent human/AI
    message is replaced (in the returned list only) by a copy carrying an
    explicit ``cache_control`` breakpoint on its last text block.

    Once tool results are present the history is passed through untouched,
    because Anthropic rejects ``cache_control`` nested inside ``tool_result``
    blocks. Any invoke-level ``cache_control`` the caller passes still applies
    to the whole request. *messages* itself is never modified.
    """
    settings = cache if cache is not None else AnthropicCacheSettings()
    history = list(messages)
    has_tool_results = any(isinstance(m, ToolMessage) for m in history)
    if cache_conversation_tail and history and not has_tool_results:
        target = next(
            (
                position
                for position in reversed(range(len(history)))
                if _can_tag_message_for_cache(history[position])
            ),
            None,
        )
        if target is not None:
            history[target] = _tag_last_content_block(history[target], settings)
    return [system_message, *history]
183
184
def extract_cache_usage(message: AIMessage) -> dict[str, int]:
    """Summarise token usage, including Anthropic prompt-cache counters.

    Reads ``message.response_metadata["usage"]``. Any counter that is missing,
    ``None`` or otherwise falsy is reported as ``0``.

    Returns:
        Mapping with ``input_tokens``, ``output_tokens``,
        ``cache_creation_input_tokens`` and ``cache_read_input_tokens``.
    """
    metadata = message.response_metadata or {}
    usage = metadata.get("usage") or {}
    counters = (
        "input_tokens",
        "output_tokens",
        "cache_creation_input_tokens",
        "cache_read_input_tokens",
    )
    return {name: int(usage.get(name) or 0) for name in counters}
196
197
def create_anthropic_model(
    *,
    model: str | None = None,
    temperature: float | None = None,
    max_tokens: int | None = None,
    api_key: str | None = None,
) -> ChatAnthropic:
    """Build a ``ChatAnthropic`` client, filling unset arguments from the environment.

    Environment defaults: ``ANTHROPIC_MODEL`` (``claude-sonnet-4-6``),
    ``ANTHROPIC_TEMPERATURE`` (``0``), ``ANTHROPIC_MAX_TOKENS`` (``4096``).
    A missing *api_key* is fetched with ``require_anthropic_api_key``, which
    exits the process when no key is configured.
    """
    load_env()
    model_name = model or os.getenv("ANTHROPIC_MODEL", "claude-sonnet-4-6")
    key = api_key or require_anthropic_api_key()
    raw_temperature = (
        os.getenv("ANTHROPIC_TEMPERATURE", "0") if temperature is None else temperature
    )
    raw_max_tokens = (
        os.getenv("ANTHROPIC_MAX_TOKENS", "4096") if max_tokens is None else max_tokens
    )
    return ChatAnthropic(
        model=model_name,
        api_key=key,
        temperature=float(raw_temperature),
        max_tokens=int(raw_max_tokens),
    )
217
218
def create_questionnaire_model(
    *,
    api_key: str | None = None,
) -> ChatAnthropic:
    """Build the model used for questionnaire generation (Opus by default).

    Questionnaire generation needs deep reasoning over the repository, hence
    the larger default model and token budget. Overrides:
    ``ANTHROPIC_QUESTIONNAIRE_MODEL`` (``claude-opus-4-6``),
    ``ANTHROPIC_QUESTIONNAIRE_MAX_TOKENS`` (``16384``),
    ``ANTHROPIC_QUESTIONNAIRE_TEMPERATURE`` (``0``).
    """
    # Load .env before reading the questionnaire-specific variables below.
    load_env()
    model_name = os.getenv("ANTHROPIC_QUESTIONNAIRE_MODEL", "claude-opus-4-6")
    token_budget = int(os.getenv("ANTHROPIC_QUESTIONNAIRE_MAX_TOKENS", "16384"))
    sampling_temperature = float(os.getenv("ANTHROPIC_QUESTIONNAIRE_TEMPERATURE", "0"))
    return create_anthropic_model(
        model=model_name,
        max_tokens=token_budget,
        temperature=sampling_temperature,
        api_key=api_key,
    )
231
232
def _cache_settings_from_env() -> AnthropicCacheSettings:
    """Return cache settings whose TTL comes from ``ANTHROPIC_CACHE_TTL``.

    Only ``"5m"`` and ``"1h"`` are honoured (after trimming whitespace); any
    other value silently falls back to ``"5m"``.
    """
    requested = os.getenv("ANTHROPIC_CACHE_TTL", "5m").strip()
    ttl = requested if requested in ("5m", "1h") else "5m"
    return AnthropicCacheSettings(ttl=ttl)  # type: ignore[arg-type]
238
239
def should_continue(state: AgentState) -> Literal["tools", "__end__"]:
    """Conditional edge taken after the ``agent`` node.

    Returns ``"tools"`` when the latest message is an ``AIMessage`` that
    requested tool calls; otherwise returns ``END`` so the run finishes.
    """
    latest = state["messages"][-1]
    wants_tools = isinstance(latest, AIMessage) and bool(latest.tool_calls)
    return "tools" if wants_tools else END
246
247
def extract_ai_text(message: BaseMessage) -> str:
    """Return the plain text of an ``AIMessage``; ``""`` for any other message.

    - String content is returned stripped.
    - For block-list content, bare strings and non-blank ``{"type": "text"}``
      blocks are joined with newlines and the result is stripped; other block
      types (e.g. tool use) are ignored.
    - Any other content type is converted with ``str()`` and stripped.
    """
    if not isinstance(message, AIMessage):
        return ""
    content = message.content
    if isinstance(content, str):
        return content.strip()
    if not isinstance(content, list):
        return str(content).strip()
    pieces: list[str] = []
    for block in content:
        if isinstance(block, str):
            pieces.append(block)
            continue
        if not isinstance(block, dict) or block.get("type") != "text":
            continue
        text = block.get("text")
        if isinstance(text, str) and text.strip():
            pieces.append(text)
    return "\n".join(pieces).strip()
266
267
def find_questionnaire_json_text(messages: Sequence[BaseMessage]) -> str:
    """Return the newest tool-free AI reply whose text parses as questionnaire JSON.

    Scans *messages* from the end, skipping non-AI messages, AI messages that
    carry tool calls, empty replies, and replies ``parse_questionnaire_json``
    rejects.

    Raises:
        ValueError: if no message qualifies; the message reports the history
            length and the type of the last message.
    """
    candidates = (
        m for m in reversed(messages) if isinstance(m, AIMessage) and not m.tool_calls
    )
    for candidate in candidates:
        text = extract_ai_text(candidate)
        if not text:
            continue
        try:
            parse_questionnaire_json(text)
        except (ValueError, json.JSONDecodeError):
            continue
        return text
    last = messages[-1] if messages else None
    last_name = type(last).__name__ if last else "none"
    raise ValueError(
        "Agent did not produce questionnaire JSON "
        f"(messages={len(messages)}, last={last_name})"
    )
285
286
# Maximum characters of a tool result echoed to stderr when verbose_tools is on.
_TOOL_RESULT_PREVIEW_CHARS = 120


def _log_cache_usage(response: AIMessage) -> None:
    """Print Anthropic cache counters to stderr when the call read or wrote the cache."""
    stats = extract_cache_usage(response)
    if not (stats["cache_creation_input_tokens"] or stats["cache_read_input_tokens"]):
        return
    # stderr, like every other diagnostic: stdout carries the agent's reply.
    print(
        "[anthropic-cache]",
        f"read={stats['cache_read_input_tokens']}",
        f"write={stats['cache_creation_input_tokens']}",
        f"in={stats['input_tokens']}",
        f"out={stats['output_tokens']}",
        file=sys.stderr,
    )


def _log_tool_calls(response: AIMessage) -> None:
    """Print each tool call the model requested (name and args) to stderr."""
    for call in response.tool_calls:
        if isinstance(call, dict):
            name, args = call.get("name", "?"), call.get("args", {})
        else:
            name, args = getattr(call, "name", "?"), getattr(call, "args", {})
        print(f"[agent] calling {name}({args})", file=sys.stderr)


def _log_tool_results(messages: Sequence[BaseMessage]) -> None:
    """Print the size and a one-line, truncated preview of each tool result to stderr."""
    for msg in messages:
        if not isinstance(msg, ToolMessage):
            continue
        full = str(msg.content)
        preview = full[:_TOOL_RESULT_PREVIEW_CHARS].replace("\n", " ")
        if len(full) > _TOOL_RESULT_PREVIEW_CHARS:
            preview += "…"
        print(f"[agent] tool result ({len(full)} chars): {preview}", file=sys.stderr)


def build_agent_graph(
    *,
    tools: Sequence[BaseTool] | None = None,
    system_prompt: str = DEFAULT_SYSTEM_PROMPT,
    model: ChatAnthropic | None = None,
    cache: AnthropicCacheSettings | None = None,
    checkpointer: BaseCheckpointSaver | None = None,
    min_tool_reads: int = 0,
    verbose_tools: bool = False,
):
    """Compile the LangGraph agent loop: ``agent`` ⟷ ``tools`` (when tools are given).

    Args:
        tools: Tools bound to the model. When empty, the graph is a single
            ``agent`` node that goes straight to END.
        system_prompt: Text of the cached system block prepended to every call.
        model: Chat model; defaults to ``create_anthropic_model()``.
        cache: Cache settings; defaults to ``_cache_settings_from_env()``.
        checkpointer: Optional LangGraph checkpointer for threaded runs.
        min_tool_reads: While the history holds fewer ``ToolMessage`` results
            than this, the model is forced to call a tool
            (``tool_choice={"type": "any"}``). ``0`` disables forcing.
        verbose_tools: Log requested tool calls and tool-result previews to stderr.

    Returns:
        The compiled graph.
    """
    load_env()
    tools = list(tools or [])
    model = model or create_anthropic_model()
    cache = cache or _cache_settings_from_env()
    system_message = cached_system_message(system_prompt, cache=cache)
    bound_model = model.bind_tools(tools) if tools else model
    tool_node = ToolNode(tools) if tools else None

    def call_model(state: AgentState) -> dict[str, list[BaseMessage]]:
        history = state["messages"]
        tool_read_count = sum(1 for m in history if isinstance(m, ToolMessage))
        payload = prepare_messages_for_anthropic(
            history,
            system_message=system_message,
            cache=cache,
        )
        invoke_kwargs: dict[str, Any] = {}
        # Invoke-level (automatic) caching is only requested before any tool
        # results exist; afterwards only the system-block breakpoint is used,
        # matching prepare_messages_for_anthropic's tool_result restriction.
        if tool_read_count == 0:
            invoke_kwargs["cache_control"] = cache.as_api_dict()

        model_to_invoke = bound_model
        if tools and min_tool_reads and tool_read_count < min_tool_reads:
            # Force a tool call until the minimum number of reads is reached.
            model_to_invoke = bound_model.bind(tool_choice={"type": "any"})

        response = model_to_invoke.invoke(payload, **invoke_kwargs)
        if isinstance(response, AIMessage):
            _log_cache_usage(response)
            if verbose_tools and response.tool_calls:
                _log_tool_calls(response)
        return {"messages": [response]}

    def run_tools(state: AgentState) -> dict[str, list[BaseMessage]]:
        assert tool_node is not None  # node is only registered when tools exist
        result = tool_node.invoke(state)
        if verbose_tools:
            _log_tool_results(result.get("messages", []))
        return result

    graph = StateGraph(AgentState)
    graph.add_node("agent", call_model)
    graph.add_edge(START, "agent")
    if tools:
        graph.add_node("tools", run_tools)
        graph.add_conditional_edges(
            "agent",
            should_continue,
            {"tools": "tools", END: END},
        )
        graph.add_edge("tools", "agent")
    else:
        graph.add_edge("agent", END)

    return graph.compile(checkpointer=checkpointer)
370
371
def build_issue_agent_graph(
    ctx: IssueSessionContext,
    *,
    model: ChatAnthropic | None = None,
    cache: AnthropicCacheSettings | None = None,
    checkpointer: BaseCheckpointSaver | None = None,
    include_list_tool: bool = False,
):
    """Compile the issue-investigator graph.

    The issue context goes into the system prompt up front
    (``build_issue_system_prompt``). The model only gets file tools: just
    ``read_repo_file`` unless *include_list_tool* is set, in which case every
    tool from ``make_file_tools`` is kept.
    """
    all_tools = make_file_tools(ctx)
    if include_list_tool:
        selected = all_tools
    else:
        selected = [tool for tool in all_tools if tool.name == "read_repo_file"]
    prompt = build_issue_system_prompt(ctx)
    return build_agent_graph(
        tools=selected,
        system_prompt=prompt,
        model=model,
        cache=cache,
        checkpointer=checkpointer,
    )
391
392
def build_questionnaire_agent_graph(
    ctx: IssueSessionContext,
    *,
    model: ChatAnthropic | None = None,
    cache: AnthropicCacheSettings | None = None,
    checkpointer: BaseCheckpointSaver | None = None,
    include_list_tool: bool = False,
):
    """Compile the graph that researches the repo for AI-solve questionnaire JSON.

    Uses the questionnaire contract prompt, ``create_questionnaire_model()``
    (Opus by default) unless *model* is given, and file tools (only
    ``read_repo_file`` unless *include_list_tool*).

    Environment knobs:
        QUESTIONNAIRE_MIN_TOOL_READS: tool results required before the model is
            no longer forced to call a tool (default ``0``, no forcing).
        AGENT_VERBOSE_TOOLS: log tool activity to stderr (default on;
            ``0``/``false``/``no`` disable it).
    """
    all_tools = make_file_tools(ctx)
    if include_list_tool:
        selected = all_tools
    else:
        selected = [tool for tool in all_tools if tool.name == "read_repo_file"]
    prompt = build_questionnaire_system_prompt(ctx)
    chosen_model = model or create_questionnaire_model()
    min_reads = int(os.getenv("QUESTIONNAIRE_MIN_TOOL_READS", "0"))
    verbose_flag = os.getenv("AGENT_VERBOSE_TOOLS", "1").strip().lower()
    return build_agent_graph(
        tools=selected,
        system_prompt=prompt,
        model=chosen_model,
        cache=cache,
        checkpointer=checkpointer,
        min_tool_reads=min_reads,
        verbose_tools=verbose_flag not in ("0", "false", "no"),
    )
415
416
def _finalize_questionnaire_json(
    ctx: IssueSessionContext,
    messages: Sequence[BaseMessage],
    *,
    model: ChatAnthropic | None = None,
    cache: AnthropicCacheSettings | None = None,
) -> str:
    """Ask the model, with no tools bound, to emit the questionnaire JSON.

    Replays the research transcript *messages* under the questionnaire system
    prompt, appends an instruction to output only the JSON object, and returns
    the reply text. The text is not validated here.

    Raises:
        ValueError: if the reply contains no text; the message lists the
            content block types that came back.
    """
    llm = model or create_questionnaire_model()
    settings = cache or _cache_settings_from_env()
    system_message = cached_system_message(
        build_questionnaire_system_prompt(ctx), cache=settings
    )
    instruction = HumanMessage(
        content=(
            "You have finished reading the repository. Output the complete questionnaire "
            "JSON object now (schema version 2). Single JSON object only — no markdown "
            "fences, no commentary, no tool calls."
        )
    )
    payload = prepare_messages_for_anthropic(
        [*messages, instruction], system_message=system_message, cache=settings
    )
    response = llm.invoke(payload)
    text = extract_ai_text(response)
    if text:
        return text
    content = response.content if isinstance(response, AIMessage) else None
    block_types = (
        [str(block.get("type", "?")) for block in content if isinstance(block, dict)]
        if isinstance(content, list)
        else []
    )
    raise ValueError(
        "Finalize turn returned empty text "
        f"(blocks={block_types or 'none'})"
    )
456
457
def run_questionnaire_agent(
    ctx: IssueSessionContext,
    *,
    graph=None,
    thread_id: str = "default",
    include_list_tool: bool = False,
) -> BaseMessage:
    """Generate questionnaire JSON for *ctx* in two phases.

    1. Research: *graph* (default: ``build_questionnaire_agent_graph``) explores
       the repository with tools, capped at ``QUESTIONNAIRE_RECURSION_LIMIT``
       (default ``50``) graph steps. The only user turn is the research
       kickoff; the output contract lives in the system prompt.
    2. Output: valid questionnaire JSON already in the transcript is reused;
       otherwise a dedicated tool-free finalize call produces it.

    Returns:
        An ``AIMessage`` whose content is the JSON text.
    """
    app = graph or build_questionnaire_agent_graph(
        ctx, include_list_tool=include_list_tool
    )
    config = {
        "configurable": {"thread_id": thread_id},
        "recursion_limit": int(os.getenv("QUESTIONNAIRE_RECURSION_LIMIT", "50")),
    }
    kickoff = HumanMessage(
        content=(
            "Phase 1 — research: use read_repo_file to explore the repository "
            "for as long as you need (README, issue-related source, tests, "
            "patterns). Stop calling tools when you have enough context. "
            "Do not output questionnaire JSON yet."
        )
    )
    transcript = app.invoke({"messages": [kickoff]}, config=config)["messages"]
    read_count = sum(isinstance(m, ToolMessage) for m in transcript)
    print(f"[agent] research done ({read_count} file reads)", file=sys.stderr)

    try:
        json_text = find_questionnaire_json_text(transcript)
    except ValueError:
        print("[agent] running JSON finalize turn", file=sys.stderr)
        json_text = _finalize_questionnaire_json(ctx, transcript)
    return AIMessage(content=json_text)
498
499
def generate_and_save_questionnaire(
    issue_uri: str,
    *,
    fetch_file_tree: bool = True,
    include_list_tool: bool = False,
    thread_id: str = "job",
    save: bool = True,
) -> dict[str, Any]:
    """Generate a questionnaire for *issue_uri* and optionally persist it.

    Loads the issue context, runs the questionnaire agent and parses its JSON.
    With *save*, the payload is stored via ``save_questionnaire`` (Postgres
    upsert) and then published to the repo on a best-effort basis.

    Returns:
        A summary with ``issue_uri``, ``version`` and ``question_count``. It
        also holds ``payload`` when ``save=False``, or ``created_at`` /
        ``updated_at`` (ISO strings or ``None``) when saved.

    Raises:
        ValueError: if the agent output is not valid questionnaire JSON; the
            message includes a 400-character preview of the response.
    """
    load_env()
    ctx = load_issue_context(issue_uri, fetch_file_tree=fetch_file_tree)
    reply = run_questionnaire_agent(
        ctx,
        thread_id=thread_id,
        include_list_tool=include_list_tool,
    )
    if isinstance(reply, AIMessage):
        text = extract_ai_text(reply)
    else:
        text = str(reply.content)
    try:
        payload = parse_questionnaire_json(text)
    except (json.JSONDecodeError, ValueError) as exc:
        snippet = (text or "")[:400].replace("\n", " ")
        raise ValueError(f"{exc} — response preview: {snippet!r}") from exc

    summary: dict[str, Any] = {
        "issue_uri": ctx.issue_uri,
        "version": payload.get("version"),
        "question_count": _count_questions(payload.get("items") or []),
    }
    if not save:
        summary["payload"] = payload
        print("[agent] --no-save: skipped DB write", file=sys.stderr)
        return summary

    row = save_questionnaire(ctx.issue_uri, payload)
    _maybe_publish_to_repo(ctx.issue_uri, payload, row)
    summary["issue_uri"] = row["issue_uri"]
    for column in ("created_at", "updated_at"):
        stamp = row.get(column)
        summary[column] = stamp.isoformat() if stamp else None
    return summary
543
544
def _maybe_publish_to_repo(issue_uri: str, payload: dict, row: dict | None = None) -> None:
    """Best-effort dual write of the questionnaire to the knot repo.

    Does nothing unless ``publishing_enabled()`` (per the original note: when
    ``QUESTIONNAIRE_PUBLISH_REPO`` is set). The timestamps from *row* (if any)
    are forwarded. Any exception is reported to stderr as a warning and
    swallowed, so the preceding DB write is never failed by this step.
    """
    if not publishing_enabled():
        return
    try:
        timestamps = row or {}
        rel = publish_to_repo(
            issue_uri,
            payload,
            timestamps.get("created_at"),
            timestamps.get("updated_at"),
        )
        print(f"[agent] published questionnaire to repo: {rel}", file=sys.stderr)
    except Exception as exc:  # noqa: BLE001 - publishing is best-effort
        print(f"[agent] warning: repo publish failed (DB write still ok): {exc}", file=sys.stderr)
560
561
def _count_questions(items: list) -> int:
    """Count the questions in *items*, including nested follow-ups.

    Every item counts as one question. Each option's ``followups`` list is
    counted recursively; a missing or ``None`` ``options`` / ``followups``
    counts as empty.
    """
    return sum(
        1
        + sum(
            _count_questions(option.get("followups") or [])
            for option in question.get("options") or []
        )
        for question in items
    )
569
570
def run_issue_agent(
    ctx: IssueSessionContext,
    user_input: str,
    *,
    graph=None,
    thread_id: str = "default",
    include_list_tool: bool = False,
) -> BaseMessage:
    """Send one user turn to the issue agent and return the final message.

    The issue metadata and file tree are already in the system prompt, so
    *user_input* only needs the question. Pass *graph* to reuse a compiled
    graph (e.g. one with a checkpointer, keyed by *thread_id*).
    """
    app = graph or build_issue_agent_graph(ctx, include_list_tool=include_list_tool)
    final_state = app.invoke(
        {"messages": [HumanMessage(content=user_input)]},
        config={"configurable": {"thread_id": thread_id}},
    )
    return final_state["messages"][-1]
587
588
def run_agent(
    user_input: str,
    *,
    graph=None,
    thread_id: str = "default",
) -> BaseMessage:
    """Run a single user turn through *graph* (default: ``build_agent_graph()``).

    *thread_id* is passed in the ``configurable`` config for checkpointed
    graphs. Returns the last message of the resulting state.
    """
    app = graph or build_agent_graph()
    final_state = app.invoke(
        {"messages": [HumanMessage(content=user_input)]},
        config={"configurable": {"thread_id": thread_id}},
    )
    return final_state["messages"][-1]
603
604
def _build_cli_parser():
    """Return the argument parser for the command-line entry point."""
    import argparse

    parser = argparse.ArgumentParser(description="Run the Tangled issue investigation agent.")
    parser.add_argument("prompt", nargs="?", help="User message (omit for stdin)")
    parser.add_argument(
        "--issue-uri",
        metavar="URI",
        help="Issue at:// URI — loads meta + file tree from DB/knot",
    )
    parser.add_argument(
        "--no-file-tree",
        action="store_true",
        help="Skip live knot tree walk (use with --list-tool)",
    )
    parser.add_argument(
        "--list-tool",
        action="store_true",
        help="Also expose list_repo_files (default: read_repo_file only)",
    )
    parser.add_argument(
        "--questionnaire",
        "--questionaire",
        action="store_true",
        help="Generate AI-solve questionnaire JSON (uses Opus)",
    )
    parser.add_argument(
        "--no-save",
        action="store_true",
        help="Do not write questionnaire JSON to Postgres (questionnaire mode)",
    )
    parser.add_argument("--thread-id", default="cli", help="Checkpoint thread id")
    return parser


def _save_cli_questionnaire(ctx: IssueSessionContext, text: str) -> None:
    """Parse, save and publish questionnaire JSON; any failure is only a stderr warning."""
    try:
        payload = parse_questionnaire_json(text)
        row = save_questionnaire(ctx.issue_uri, payload)
        _maybe_publish_to_repo(ctx.issue_uri, payload, row)
        print(
            f"[agent] saved questionnaire for {row['issue_uri']}",
            file=sys.stderr,
        )
    except Exception as exc:  # noqa: BLE001
        print(f"[agent] warning: could not save to DB: {exc}", file=sys.stderr)


def main(argv: list[str] | None = None) -> None:
    """CLI entry point: question the issue agent or generate a questionnaire.

    ``--issue-uri`` is required. Without ``--questionnaire``, the prompt comes
    from the positional argument or, when stdin is not a TTY, from stdin.
    Missing input exits with status 1. The reply text is printed to stdout;
    diagnostics go to stderr.
    """
    args = _build_cli_parser().parse_args(argv)

    text = args.prompt
    # Questionnaire mode never uses the prompt, so stdin is not read there: a
    # non-TTY stdin that is never closed (CI, containers) would block forever.
    if not text and not args.questionnaire and not sys.stdin.isatty():
        text = sys.stdin.read().strip()
    if not text and not args.questionnaire:
        print("ERROR: provide a prompt argument, stdin, or --questionnaire", file=sys.stderr)
        raise SystemExit(1)

    if not args.issue_uri:
        print("ERROR: --issue-uri is required", file=sys.stderr)
        raise SystemExit(1)

    load_env()
    ctx = load_issue_context(
        args.issue_uri.strip(),
        fetch_file_tree=not args.no_file_tree,
    )
    if args.questionnaire:
        print("[agent] questionnaire mode — will read repo via tools first", file=sys.stderr)
        reply = run_questionnaire_agent(
            ctx,
            thread_id=args.thread_id,
            include_list_tool=args.list_tool,
        )
        if not args.no_save:
            text_out = reply.content if isinstance(reply.content, str) else str(reply.content)
            _save_cli_questionnaire(ctx, text_out)
    else:
        reply = run_issue_agent(
            ctx,
            text,
            thread_id=args.thread_id,
            include_list_tool=args.list_tool,
        )
    # Print readable text for block-list content instead of its Python repr.
    print(extract_ai_text(reply) or reply.content)
685
686
if __name__ == "__main__":
    # Script entry point; argparse expects argv without the program name.
    main(sys.argv[1:])