This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 24 kB View raw
1#!/usr/bin/env python3 2"""LangGraph agent loop on Anthropic with prompt caching. 3 4Core pieces: 5 - ``AgentState`` — message list reducer state 6 - ``create_anthropic_model`` — ChatAnthropic factory 7 - ``cached_system_message`` / ``AnthropicCacheSettings`` — explicit + automatic caching 8 - ``build_agent_graph`` — agent → (tools) → agent loop 9 - ``run_agent`` — single-turn or threaded invoke helper 10""" 11 12from __future__ import annotations 13 14import json 15import os 16import sys 17from dataclasses import dataclass 18from pathlib import Path 19from typing import Annotated, Any, Literal, Sequence 20 21from dotenv import load_dotenv 22from langchain_anthropic import ChatAnthropic 23from langchain_core.messages import ( 24 AIMessage, 25 BaseMessage, 26 HumanMessage, 27 SystemMessage, 28 ToolMessage, 29) 30from langchain_core.tools import BaseTool 31from langgraph.checkpoint.base import BaseCheckpointSaver 32from langgraph.graph import END, START, StateGraph 33from langgraph.graph.message import add_messages 34from langgraph.prebuilt import ToolNode 35from typing_extensions import TypedDict 36 37from agent.context import IssueSessionContext, build_issue_system_prompt 38from agent.load_issue import load_issue_context 39from agent.questionnaire_store import parse_questionnaire_json, save_questionnaire 40from agent.questionnaire_repo_store import publish_to_repo, publishing_enabled 41from agent.questionnaire_prompt import build_questionnaire_system_prompt 42from agent.tools import make_file_tools 43 44REPO_ROOT = Path(__file__).resolve().parent.parent 45 46DEFAULT_SYSTEM_PROMPT = """\ 47You are a helpful assistant for the Sunstead / Tangled hackathon stack. 48 49You can reason about: 50- Tangled repos, issues, and README embeddings in Postgres 51- The recommendation API (DID → ranked repos/issues) 52- The daily scraper that ingests Tangled network data 53 54Be concise and actionable. Use tools when they help answer factual questions. 55""" 56 57CacheTTL = Literal["5m", "1h"] 58 59 60@dataclass(frozen=True) 61class AnthropicCacheSettings: 62 """Anthropic prompt cache configuration. 63 64 We use two layers (both are valid together): 65 1. Explicit ``cache_control`` on the static system block (always cached). 66 2. Automatic ``cache_control`` on each ``model.invoke`` call so tools + 67 conversation prefix are cached on Anthropic's side (breakpoint moves 68 forward as the thread grows). 69 """ 70 71 type: Literal["ephemeral"] = "ephemeral" 72 ttl: CacheTTL = "5m" 73 74 def as_api_dict(self) -> dict[str, str]: 75 return {"type": self.type, "ttl": self.ttl} 76 77 78class AgentState(TypedDict): 79 """Graph state: append-only message history.""" 80 81 messages: Annotated[list[BaseMessage], add_messages] 82 83 84def load_env() -> None: 85 for candidate in (REPO_ROOT / ".env", Path(__file__).parent / ".env"): 86 if candidate.exists(): 87 load_dotenv(candidate) 88 return 89 load_dotenv() 90 91 92def require_anthropic_api_key() -> str: 93 key = os.getenv("ANTHROPIC_API_KEY", "").strip() 94 if not key: 95 print("ERROR: ANTHROPIC_API_KEY is not set", file=sys.stderr) 96 raise SystemExit(1) 97 return key 98 99 100def cached_system_message( 101 text: str, 102 *, 103 cache: AnthropicCacheSettings | None = None, 104) -> SystemMessage: 105 """System prompt block with explicit Anthropic ``cache_control``.""" 106 cache = cache or AnthropicCacheSettings() 107 return SystemMessage( 108 content=[ 109 { 110 "type": "text", 111 "text": text, 112 "cache_control": cache.as_api_dict(), 113 } 114 ] 115 ) 116 117 118def _tag_last_content_block( 119 message: BaseMessage, 120 cache: AnthropicCacheSettings, 121) -> BaseMessage: 122 """Add ``cache_control`` to the last text block of a message (explicit breakpoint).""" 123 content = message.content 124 if isinstance(content, str): 125 return message.model_copy( 126 update={ 127 "content": [ 128 { 129 "type": "text", 130 "text": content, 131 "cache_control": cache.as_api_dict(), 132 } 133 ] 134 } 135 ) 136 if not isinstance(content, list) or not content: 137 return message 138 blocks = [dict(block) if isinstance(block, dict) else block for block in content] 139 last = blocks[-1] 140 if isinstance(last, dict) and last.get("type") == "text": 141 blocks[-1] = {**last, "cache_control": cache.as_api_dict()} 142 return message.model_copy(update={"content": blocks}) 143 return message 144 145 146def _can_tag_message_for_cache(message: BaseMessage) -> bool: 147 """Anthropic forbids cache_control on tool_result blocks.""" 148 if isinstance(message, ToolMessage): 149 return False 150 return isinstance(message, (HumanMessage, AIMessage)) 151 152 153def prepare_messages_for_anthropic( 154 messages: Sequence[BaseMessage], 155 *, 156 system_message: SystemMessage, 157 cache: AnthropicCacheSettings | None = None, 158 cache_conversation_tail: bool = True, 159) -> list[BaseMessage]: 160 """Build the message list sent to Claude. 161 162 - Prepends the cached system message. 163 - Optionally marks the last non-tool message for explicit prefix caching. 164 Invoke-level ``cache_control`` still applies to the full request. 165 """ 166 cache = cache or AnthropicCacheSettings() 167 history = list(messages) 168 # After tool rounds, only invoke-level cache_control is safe — Anthropic 169 # rejects cache_control nested inside tool_result content blocks. 170 if ( 171 cache_conversation_tail 172 and history 173 and not any(isinstance(m, ToolMessage) for m in history) 174 ): 175 idx = None 176 for i in range(len(history) - 1, -1, -1): 177 if _can_tag_message_for_cache(history[i]): 178 idx = i 179 break 180 if idx is not None: 181 history[idx] = _tag_last_content_block(history[idx], cache) 182 return [system_message, *history] 183 184 185def extract_cache_usage(message: AIMessage) -> dict[str, int]: 186 """Pull Anthropic cache token stats from a model response, if present.""" 187 usage = (message.response_metadata or {}).get("usage") or {} 188 return { 189 "input_tokens": int(usage.get("input_tokens") or 0), 190 "output_tokens": int(usage.get("output_tokens") or 0), 191 "cache_creation_input_tokens": int( 192 usage.get("cache_creation_input_tokens") or 0 193 ), 194 "cache_read_input_tokens": int(usage.get("cache_read_input_tokens") or 0), 195 } 196 197 198def create_anthropic_model( 199 *, 200 model: str | None = None, 201 temperature: float | None = None, 202 max_tokens: int | None = None, 203 api_key: str | None = None, 204) -> ChatAnthropic: 205 """Construct ``ChatAnthropic`` from env defaults.""" 206 load_env() 207 return ChatAnthropic( 208 model=model or os.getenv("ANTHROPIC_MODEL", "claude-sonnet-4-6"), 209 api_key=api_key or require_anthropic_api_key(), 210 temperature=float( 211 temperature if temperature is not None else os.getenv("ANTHROPIC_TEMPERATURE", "0") 212 ), 213 max_tokens=int( 214 max_tokens if max_tokens is not None else os.getenv("ANTHROPIC_MAX_TOKENS", "4096") 215 ), 216 ) 217 218 219def create_questionnaire_model( 220 *, 221 api_key: str | None = None, 222) -> ChatAnthropic: 223 """Opus by default — questionnaire generation needs deep repo reasoning.""" 224 load_env() 225 return create_anthropic_model( 226 model=os.getenv("ANTHROPIC_QUESTIONNAIRE_MODEL", "claude-opus-4-6"), 227 max_tokens=int(os.getenv("ANTHROPIC_QUESTIONNAIRE_MAX_TOKENS", "16384")), 228 temperature=float(os.getenv("ANTHROPIC_QUESTIONNAIRE_TEMPERATURE", "0")), 229 api_key=api_key, 230 ) 231 232 233def _cache_settings_from_env() -> AnthropicCacheSettings: 234 ttl = os.getenv("ANTHROPIC_CACHE_TTL", "5m").strip() 235 if ttl not in ("5m", "1h"): 236 ttl = "5m" 237 return AnthropicCacheSettings(ttl=ttl) # type: ignore[arg-type] 238 239 240def should_continue(state: AgentState) -> Literal["tools", "__end__"]: 241 """Route to tools when the model emitted tool calls.""" 242 last = state["messages"][-1] 243 if isinstance(last, AIMessage) and last.tool_calls: 244 return "tools" 245 return END 246 247 248def extract_ai_text(message: BaseMessage) -> str: 249 """Pull plain text from an AIMessage (string or block content).""" 250 if not isinstance(message, AIMessage): 251 return "" 252 content = message.content 253 if isinstance(content, str): 254 return content.strip() 255 if isinstance(content, list): 256 parts: list[str] = [] 257 for block in content: 258 if isinstance(block, str): 259 parts.append(block) 260 elif isinstance(block, dict) and block.get("type") == "text": 261 text = block.get("text") 262 if isinstance(text, str) and text.strip(): 263 parts.append(text) 264 return "\n".join(parts).strip() 265 return str(content).strip() 266 267 268def find_questionnaire_json_text(messages: Sequence[BaseMessage]) -> str: 269 """Last non-tool AIMessage whose text is valid questionnaire JSON.""" 270 for message in reversed(messages): 271 if isinstance(message, AIMessage) and not message.tool_calls: 272 text = extract_ai_text(message) 273 if not text: 274 continue 275 try: 276 parse_questionnaire_json(text) 277 except (ValueError, json.JSONDecodeError): 278 continue 279 return text 280 last = messages[-1] if messages else None 281 raise ValueError( 282 "Agent did not produce questionnaire JSON " 283 f"(messages={len(messages)}, last={type(last).__name__ if last else 'none'})" 284 ) 285 286 287def build_agent_graph( 288 *, 289 tools: Sequence[BaseTool] | None = None, 290 system_prompt: str = DEFAULT_SYSTEM_PROMPT, 291 model: ChatAnthropic | None = None, 292 cache: AnthropicCacheSettings | None = None, 293 checkpointer: BaseCheckpointSaver | None = None, 294 min_tool_reads: int = 0, 295 verbose_tools: bool = False, 296): 297 """Compile the LangGraph agent loop: agent ⟷ tools (optional).""" 298 load_env() 299 tools = list(tools or []) 300 model = model or create_anthropic_model() 301 cache = cache or _cache_settings_from_env() 302 system_message = cached_system_message(system_prompt, cache=cache) 303 bound_model = model.bind_tools(tools) if tools else model 304 tool_node = ToolNode(tools) if tools else None 305 306 def call_model(state: AgentState) -> dict[str, list[BaseMessage]]: 307 history = state["messages"] 308 tool_read_count = sum(1 for m in history if isinstance(m, ToolMessage)) 309 has_tool_results = tool_read_count > 0 310 payload = prepare_messages_for_anthropic( 311 history, 312 system_message=system_message, 313 cache=cache, 314 ) 315 invoke_kwargs: dict[str, Any] = {} 316 if not has_tool_results: 317 invoke_kwargs["cache_control"] = cache.as_api_dict() 318 319 model_to_invoke = bound_model 320 if tools and min_tool_reads and tool_read_count < min_tool_reads: 321 model_to_invoke = bound_model.bind(tool_choice={"type": "any"}) 322 323 response = model_to_invoke.invoke(payload, **invoke_kwargs) 324 if isinstance(response, AIMessage): 325 stats = extract_cache_usage(response) 326 if any(stats[k] for k in ("cache_creation_input_tokens", "cache_read_input_tokens")): 327 print( 328 "[anthropic-cache]", 329 f"read={stats['cache_read_input_tokens']}", 330 f"write={stats['cache_creation_input_tokens']}", 331 f"in={stats['input_tokens']}", 332 f"out={stats['output_tokens']}", 333 ) 334 if verbose_tools and response.tool_calls: 335 for tc in response.tool_calls: 336 name = tc.get("name", "?") if isinstance(tc, dict) else getattr(tc, "name", "?") 337 args = tc.get("args", {}) if isinstance(tc, dict) else getattr(tc, "args", {}) 338 print(f"[agent] calling {name}({args})", file=sys.stderr) 339 return {"messages": [response]} 340 341 def run_tools(state: AgentState) -> dict[str, list[BaseMessage]]: 342 assert tool_node is not None 343 result = tool_node.invoke(state) 344 if verbose_tools: 345 for msg in result.get("messages", []): 346 if isinstance(msg, ToolMessage): 347 preview = msg.content 348 if isinstance(preview, str) and len(preview) > 120: 349 preview = preview[:120] + "" 350 print(f"[agent] tool result ({len(str(msg.content))} chars)", file=sys.stderr) 351 return result 352 353 graph = StateGraph(AgentState) 354 graph.add_node("agent", call_model) 355 356 if tools: 357 graph.add_node("tools", run_tools) 358 graph.add_edge(START, "agent") 359 graph.add_conditional_edges( 360 "agent", 361 should_continue, 362 {"tools": "tools", END: END}, 363 ) 364 graph.add_edge("tools", "agent") 365 else: 366 graph.add_edge(START, "agent") 367 graph.add_edge("agent", END) 368 369 return graph.compile(checkpointer=checkpointer) 370 371 372def build_issue_agent_graph( 373 ctx: IssueSessionContext, 374 *, 375 model: ChatAnthropic | None = None, 376 cache: AnthropicCacheSettings | None = None, 377 checkpointer: BaseCheckpointSaver | None = None, 378 include_list_tool: bool = False, 379): 380 """Issue investigator: context upfront, file tools only.""" 381 tools = make_file_tools(ctx) 382 if not include_list_tool: 383 tools = [t for t in tools if t.name == "read_repo_file"] 384 return build_agent_graph( 385 tools=tools, 386 system_prompt=build_issue_system_prompt(ctx), 387 model=model, 388 cache=cache, 389 checkpointer=checkpointer, 390 ) 391 392 393def build_questionnaire_agent_graph( 394 ctx: IssueSessionContext, 395 *, 396 model: ChatAnthropic | None = None, 397 cache: AnthropicCacheSettings | None = None, 398 checkpointer: BaseCheckpointSaver | None = None, 399 include_list_tool: bool = False, 400): 401 """Generate AI-solve questionnaire JSON: Opus + file tools + contract prompt.""" 402 tools = make_file_tools(ctx) 403 if not include_list_tool: 404 tools = [t for t in tools if t.name == "read_repo_file"] 405 return build_agent_graph( 406 tools=tools, 407 system_prompt=build_questionnaire_system_prompt(ctx), 408 model=model or create_questionnaire_model(), 409 cache=cache, 410 checkpointer=checkpointer, 411 min_tool_reads=int(os.getenv("QUESTIONNAIRE_MIN_TOOL_READS", "0")), 412 verbose_tools=os.getenv("AGENT_VERBOSE_TOOLS", "1").strip().lower() 413 not in ("0", "false", "no"), 414 ) 415 416 417def _finalize_questionnaire_json( 418 ctx: IssueSessionContext, 419 messages: Sequence[BaseMessage], 420 *, 421 model: ChatAnthropic | None = None, 422 cache: AnthropicCacheSettings | None = None, 423) -> str: 424 """Dedicated JSON-only model call after research (no tools).""" 425 model = model or create_questionnaire_model() 426 cache = cache or _cache_settings_from_env() 427 system_message = cached_system_message( 428 build_questionnaire_system_prompt(ctx), cache=cache 429 ) 430 history = [ 431 *messages, 432 HumanMessage( 433 content=( 434 "You have finished reading the repository. Output the complete questionnaire " 435 "JSON object now (schema version 2). Single JSON object only — no markdown " 436 "fences, no commentary, no tool calls." 437 ) 438 ), 439 ] 440 payload = prepare_messages_for_anthropic( 441 history, system_message=system_message, cache=cache 442 ) 443 response = model.invoke(payload) 444 text = extract_ai_text(response) 445 if text: 446 return text 447 block_types: list[str] = [] 448 if isinstance(response, AIMessage) and isinstance(response.content, list): 449 for block in response.content: 450 if isinstance(block, dict): 451 block_types.append(str(block.get("type", "?"))) 452 raise ValueError( 453 "Finalize turn returned empty text " 454 f"(blocks={block_types or 'none'})" 455 ) 456 457 458def run_questionnaire_agent( 459 ctx: IssueSessionContext, 460 *, 461 graph=None, 462 thread_id: str = "default", 463 include_list_tool: bool = False, 464) -> BaseMessage: 465 """Run questionnaire generation (no user prompt — instructions are in the system prompt).""" 466 app = graph or build_questionnaire_agent_graph( 467 ctx, include_list_tool=include_list_tool 468 ) 469 config = { 470 "configurable": {"thread_id": thread_id}, 471 "recursion_limit": int(os.getenv("QUESTIONNAIRE_RECURSION_LIMIT", "50")), 472 } 473 result = app.invoke( 474 { 475 "messages": [ 476 HumanMessage( 477 content=( 478 "Phase 1 — research: use read_repo_file to explore the repository " 479 "for as long as you need (README, issue-related source, tests, " 480 "patterns). Stop calling tools when you have enough context. " 481 "Do not output questionnaire JSON yet." 482 ) 483 ) 484 ] 485 }, 486 config=config, 487 ) 488 messages = result["messages"] 489 tool_reads = sum(1 for m in messages if isinstance(m, ToolMessage)) 490 print(f"[agent] research done ({tool_reads} file reads)", file=sys.stderr) 491 492 try: 493 text = find_questionnaire_json_text(messages) 494 except ValueError: 495 print("[agent] running JSON finalize turn", file=sys.stderr) 496 text = _finalize_questionnaire_json(ctx, messages) 497 return AIMessage(content=text) 498 499 500def generate_and_save_questionnaire( 501 issue_uri: str, 502 *, 503 fetch_file_tree: bool = True, 504 include_list_tool: bool = False, 505 thread_id: str = "job", 506 save: bool = True, 507) -> dict[str, Any]: 508 """Load issue, run questionnaire agent, parse JSON, optionally upsert to Postgres.""" 509 load_env() 510 ctx = load_issue_context(issue_uri, fetch_file_tree=fetch_file_tree) 511 reply = run_questionnaire_agent( 512 ctx, 513 thread_id=thread_id, 514 include_list_tool=include_list_tool, 515 ) 516 text = extract_ai_text(reply) if isinstance(reply, AIMessage) else str(reply.content) 517 try: 518 payload = parse_questionnaire_json(text) 519 except (json.JSONDecodeError, ValueError) as exc: 520 preview = (text or "")[:400].replace("\n", " ") 521 raise ValueError(f"{exc} — response preview: {preview!r}") from exc 522 523 result: dict[str, Any] = { 524 "issue_uri": ctx.issue_uri, 525 "version": payload.get("version"), 526 "question_count": _count_questions(payload.get("items") or []), 527 } 528 if not save: 529 result["payload"] = payload 530 print("[agent] --no-save: skipped DB write", file=sys.stderr) 531 return result 532 533 row = save_questionnaire(ctx.issue_uri, payload) 534 _maybe_publish_to_repo(ctx.issue_uri, payload, row) 535 result.update( 536 { 537 "issue_uri": row["issue_uri"], 538 "created_at": row["created_at"].isoformat() if row.get("created_at") else None, 539 "updated_at": row["updated_at"].isoformat() if row.get("updated_at") else None, 540 } 541 ) 542 return result 543 544 545def _maybe_publish_to_repo(issue_uri: str, payload: dict, row: dict | None = None) -> None: 546 """Best-effort dual-write: also publish the questionnaire to the knot repo when 547 QUESTIONNAIRE_PUBLISH_REPO is set. A failure here never fails the DB write.""" 548 if not publishing_enabled(): 549 return 550 try: 551 rel = publish_to_repo( 552 issue_uri, 553 payload, 554 (row or {}).get("created_at"), 555 (row or {}).get("updated_at"), 556 ) 557 print(f"[agent] published questionnaire to repo: {rel}", file=sys.stderr) 558 except Exception as exc: # noqa: BLE001 - publishing is best-effort 559 print(f"[agent] warning: repo publish failed (DB write still ok): {exc}", file=sys.stderr) 560 561 562def _count_questions(items: list) -> int: 563 n = 0 564 for q in items: 565 n += 1 566 for opt in q.get("options") or []: 567 n += _count_questions(opt.get("followups") or []) 568 return n 569 570 571def run_issue_agent( 572 ctx: IssueSessionContext, 573 user_input: str, 574 *, 575 graph=None, 576 thread_id: str = "default", 577 include_list_tool: bool = False, 578) -> BaseMessage: 579 """Run the issue agent with metadata + file tree already in the system prompt.""" 580 app = graph or build_issue_agent_graph(ctx, include_list_tool=include_list_tool) 581 config = {"configurable": {"thread_id": thread_id}} 582 result = app.invoke( 583 {"messages": [HumanMessage(content=user_input)]}, 584 config=config, 585 ) 586 return result["messages"][-1] 587 588 589def run_agent( 590 user_input: str, 591 *, 592 graph=None, 593 thread_id: str = "default", 594) -> BaseMessage: 595 """Invoke the compiled graph with a single user turn.""" 596 app = graph or build_agent_graph() 597 config = {"configurable": {"thread_id": thread_id}} 598 result = app.invoke( 599 {"messages": [HumanMessage(content=user_input)]}, 600 config=config, 601 ) 602 return result["messages"][-1] 603 604 605def main(argv: list[str] | None = None) -> None: 606 import argparse 607 608 parser = argparse.ArgumentParser(description="Run the Tangled issue investigation agent.") 609 parser.add_argument("prompt", nargs="?", help="User message (omit for stdin)") 610 parser.add_argument( 611 "--issue-uri", 612 metavar="URI", 613 help="Issue at:// URI — loads meta + file tree from DB/knot", 614 ) 615 parser.add_argument( 616 "--no-file-tree", 617 action="store_true", 618 help="Skip live knot tree walk (use with --list-tool)", 619 ) 620 parser.add_argument( 621 "--list-tool", 622 action="store_true", 623 help="Also expose list_repo_files (default: read_repo_file only)", 624 ) 625 parser.add_argument( 626 "--questionnaire", 627 "--questionaire", 628 action="store_true", 629 help="Generate AI-solve questionnaire JSON (uses Opus)", 630 ) 631 parser.add_argument( 632 "--no-save", 633 action="store_true", 634 help="Do not write questionnaire JSON to Postgres (questionnaire mode)", 635 ) 636 parser.add_argument("--thread-id", default="cli", help="Checkpoint thread id") 637 args = parser.parse_args(argv) 638 639 text = args.prompt 640 if not text and not sys.stdin.isatty(): 641 text = sys.stdin.read().strip() 642 if not text and not args.questionnaire: 643 print("ERROR: provide a prompt argument, stdin, or --questionnaire", file=sys.stderr) 644 raise SystemExit(1) 645 646 if not args.issue_uri: 647 print("ERROR: --issue-uri is required", file=sys.stderr) 648 raise SystemExit(1) 649 650 load_env() 651 ctx = load_issue_context( 652 args.issue_uri.strip(), 653 fetch_file_tree=not args.no_file_tree, 654 ) 655 if args.questionnaire: 656 print("[agent] questionnaire mode — will read repo via tools first", file=sys.stderr) 657 reply = run_questionnaire_agent( 658 ctx, 659 thread_id=args.thread_id, 660 include_list_tool=args.list_tool, 661 ) 662 text_out = reply.content if isinstance(reply.content, str) else str(reply.content) 663 if not args.no_save: 664 try: 665 payload = parse_questionnaire_json(text_out) 666 row = save_questionnaire(ctx.issue_uri, payload) 667 _maybe_publish_to_repo(ctx.issue_uri, payload, row) 668 print( 669 f"[agent] saved questionnaire for {row['issue_uri']}", 670 file=sys.stderr, 671 ) 672 except Exception as exc: # noqa: BLE001 673 print(f"[agent] warning: could not save to DB: {exc}", file=sys.stderr) 674 else: 675 reply = run_issue_agent( 676 ctx, 677 text, 678 thread_id=args.thread_id, 679 include_list_tool=args.list_tool, 680 ) 681 if isinstance(reply.content, str): 682 print(reply.content) 683 else: 684 print(reply.content) 685 686 687if __name__ == "__main__": 688 main()