How to Design an OpenHarness-Style Agent Runtime with Tools, Memory, Permissions, Skills, and Multi-Agent Coordination

How to Design an OpenHarness-Style Agent Runtime with Tools, Memory, Permissions, Skills, and Multi-Agent Coordination


async def demo_memory():
   """Demo 4: write a note to MEMORY.md in one session and reload it in the next.

   Session 1 runs a scripted brain that calls the ``remember`` tool. Session 2
   opens a second MemoryStore on the same path and builds its system prompt
   from whatever that store reads back.
   """
   explain(
       "DEMO 4 — Memory: persistent MEMORY.md across sessions",
       """Long-term memory survives between runs by persisting to MEMORY.md. In
       session 1 the agent records a user preference; in a brand-new session 2
       (fresh engine, fresh transcript) that memory is injected into the system
       prompt, so the agent already 'knows' the user.""")
   pad = "      "
   store_path = os.path.join(tempfile.gettempdir(), "oh_demo4_MEMORY.md")
   memory = MemoryStore(store_path)
   memory.reset()
   registry = build_registry()

   def prompt_with(store):
       # System prompt whose memory section is whatever `store` reads right now.
       return assemble_system_prompt(
           base=BASE_SYSTEM, project_context="", memory=store.read(),
           skills_summary="(none)", tool_names=registry.names())

   def engine_for(script, context, prompt):
       # Both sessions share the registry but get their own brain, context and prompt.
       return QueryEngine(brain=ScriptedBrain(script), registry=registry,
                          ctx=context,
                          perms=PermissionChecker(PermissionMode.AUTO),
                          hooks=HookManager(), system_prompt=prompt)

   # --- Session 1: the scripted brain stores the preference, then replies. ---
   print("  ── Session 1 ──")
   first_ctx = ToolContext(vfs=VirtualFS(), memory=memory, skills=SkillLibrary())
   first_script = [
       Use("I'll remember the user's stated preferences.",
           [("remember", {"note": "User prefers metric units and concise answers."})]),
       lambda m: Say("Noted your preferences for next time."),
   ]
   first_engine = engine_for(first_script, first_ctx, prompt_with(memory))
   await first_engine.run("Remember that I like metric units and short answers.")
   saved_note = textwrap.indent(memory.read(), pad)
   print(f"  MEMORY.md is now:\n{saved_note}")

   # --- Session 2: a new store object on the same path, new context and engine. ---
   print("\n  ── Session 2 (new session, memory reloaded from disk) ──")
   reloaded = MemoryStore(store_path)
   second_ctx = ToolContext(vfs=VirtualFS(), memory=reloaded, skills=SkillLibrary())
   second_prompt = prompt_with(reloaded)
   print("  The new system prompt already contains:")
   memory_section = "## Long-term memory (MEMORY.md)\n" + reloaded.read()
   print(textwrap.indent(memory_section, pad))
   second_script = [
       lambda m: Say("Since you prefer metric and brevity: it's about 5 km. 🙂"),
   ]
   second_engine = engine_for(second_script, second_ctx, second_prompt)
   final = await second_engine.run("How far is a 5000 meter run, roughly?")
   print(f"\n  FINAL: {final}")
   print("\n  TAKEAWAY: state that should outlive a conversation goes to memory, "
         "then is re-injected at the start of future sessions.")
async def demo_compaction():
   """Demo 5: compact a synthetic 17-message transcript and report the savings.

   The transcript is one user message followed by eight assistant/tool pairs.
   It is passed to ``maybe_compact`` with a small ``max_tokens`` budget, and the
   message counts and estimated token totals are printed before and after.
   """
   explain(
       "DEMO 5 — Context auto-compaction (multi-day sessions without overflow)",
       """As a session grows, the transcript can blow past the context window.
       Auto-compaction summarizes the older middle of the conversation into a
       compact note while preserving the original task and the most recent
       turns — so long-running agents keep going. (We force a tiny threshold to
       trigger it; real OpenHarness asks the model to write the summary.)""")
   transcript = [Message(role="user", content="Build and verify a data pipeline.")]
   for step in range(8):
       call_id = f"c{step}"
       # Each step contributes an assistant tool call and its matching tool result.
       transcript.extend((
           Message(role="assistant", content=f"Step {step}: doing work...",
                   tool_calls=[ToolCall(call_id, "shell",
                                        {"command": f"process chunk {step}"})]),
           Message(role="tool", name="shell", tool_call_id=call_id,
                   content=f"chunk {step} processed: 1000 rows ok " * 4),
       ))
   tokens_before = estimate_messages_tokens(transcript)
   print(f"  Before: {len(transcript)} messages, ~{tokens_before} tokens")
   shrunk = maybe_compact(transcript, max_tokens=300, keep_last=4)
   tokens_after = estimate_messages_tokens(shrunk)
   saved_pct = 100 * (tokens_before - tokens_after) // tokens_before
   print(f"  After:  {len(shrunk)} messages, ~{tokens_after} tokens "
         f"({saved_pct}% smaller)")
   print("\n  The injected summary message:")
   # The demo treats the second message of the compacted list as the summary.
   summary_text = shrunk[1].content
   print(textwrap.indent(summary_text, "      "))
   print("\n  TAKEAWAY: the harness manages the context window so the agent can "
         "run far longer than a single window allows.")
async def demo_multi_agent():
   """Demo 6: a lead agent delegates work to subagents through a spawn callback.

   Defines two subagent profiles (a web-search "researcher" and a "writer"),
   wires a ``spawn`` coroutine into the lead agent's ToolContext, and runs a
   scripted lead that issues two researcher spawns in one turn followed by a
   writer spawn. Afterwards it prints the final answer, the elapsed time of
   the lead run, and one line per recorded subagent run.
   """
   explain(
       "DEMO 6 — Swarm coordination: spawning parallel subagents",
       """A lead agent decomposes a task and delegates to specialized subagents.
       Each subagent is its OWN harness (own loop, own brain, own tools). Two
       researchers run IN PARALLEL (issued in the same turn → asyncio.gather),
       then a writer synthesizes their findings. The team registry tracks who
       did what.""")

   def researcher_profile():
       # New brain and registry on every call. The first scripted step carries
       # a placeholder query that spawn() overwrites with the real task.
       reg = build_registry([WebSearchTool])
       script = [
           Use("Researching via web search.",
               [("web_search", {"query": "PLACEHOLDER"})]),
           lambda m: Say("Summary: " +
                         short(last_tool_results(m)[0]["content"], 160)),
       ]
       return ScriptedBrain(script), reg

   def writer_profile():
       # The writer answers in a single scripted turn without calling a tool.
       reg = build_registry([WriteFileTool])
       script = [lambda m: Say("Synthesized brief combining both research notes "
                               "into a coherent paragraph.")]
       return ScriptedBrain(script), reg

   profiles = {"researcher": researcher_profile, "writer": writer_profile}
   # The lead and every subagent share these three objects.
   vfs = VirtualFS()
   memory = MemoryStore(os.path.join(tempfile.gettempdir(), "oh_d6.md"))
   skills = SkillLibrary()
   # Team registry: one dict per finished subagent run, appended by spawn().
   team: list = []

   def make_spawn():
       async def spawn(role: str, task: str) -> str:
           """Run a subagent for ``role`` on ``task`` and return its final text."""
           factory = profiles.get(role)
           if factory is None:
               # Unknown roles come back as a plain string, not an exception.
               return f"(no such role: {role})"
           child_brain, child_reg = factory()
           if role == "researcher" and child_brain.script:
               # Swap the placeholder search for one that queries the actual task.
               child_brain.script[0] = Use(f"Researching: {task}",
                                           [("web_search", {"query": task})])
           # The child context is handed the same spawn callback.
           child_ctx = ToolContext(vfs=vfs, memory=memory, skills=skills,
                                   spawn=spawn)
           child_engine = QueryEngine(
               brain=child_brain, registry=child_reg, ctx=child_ctx,
               perms=PermissionChecker(PermissionMode.AUTO),
               hooks=HookManager(), system_prompt="(subagent)",
               approve=auto_approve, max_turns=6)
           print(f"        🧑‍🔧 spawned [{role}] for: {short(task, 60)}")
           result = await child_engine.run(task, on_event=None)
           team.append({"role": role, "task": task, "result": result})
           return result
       return spawn

   ctx = ToolContext(vfs=vfs, memory=memory, skills=skills, spawn=make_spawn())
   registry = build_registry()
   lead_script = [
       # Turn 1: both researcher spawns are issued in the same turn.
       Use("I'll split this: research vector databases AND agent harnesses in "
           "parallel, then have a writer combine the findings.",
           [("spawn_agent", {"role": "researcher",
                             "task": "vector database for RAG"}),
            ("spawn_agent", {"role": "researcher",
                             "task": "agent harness design"})]),
       # Turn 2: a single writer spawn.
       Use("Both research notes are in — delegating synthesis to the writer.",
           [("spawn_agent", {"role": "writer",
                             "task": "combine the two research notes"})]),
       lambda m: Say("Coordination complete: 2 researchers (parallel) + 1 "
                     "writer produced a combined brief."),
   ]
   engine = QueryEngine(brain=ScriptedBrain(lead_script), registry=registry,
                        ctx=ctx, perms=PermissionChecker(PermissionMode.AUTO),
                        hooks=HookManager(), system_prompt="(lead agent)",
                        max_turns=8)
   print("\n[running the lead agent]\n")
   # perf_counter() is monotonic and high-resolution; time.time() follows the
   # wall clock, which can be adjusted mid-run and skew a short interval.
   started = time.perf_counter()
   final = await engine.run("Produce a short brief on building RAG agents.")
   elapsed = time.perf_counter() - started
   print(f"\n  FINAL: {final}")
   print(f"\n  Team registry ({len(team)} subagent runs, total {elapsed:.3f}s):")
   for entry in team:
       print(f"    - [{entry['role']}] {short(entry['task'], 40)} -> "
             f"{short(entry['result'], 80)}")
   print("\n  TAKEAWAY: the same loop nests — a 'tool' can be an entire agent, "
         "enabling parallel teams and delegation.")
async def demo_real_provider():
   """Demo 7: run the harness on a live model when one is configured.

   Prints setup instructions, then asks ``make_real_brain`` for a live brain.
   If it returns ``None`` the demo prints a notice and returns. Otherwise it
   builds an engine around the live brain (wrapped in ``RetryingBrain``), runs
   a small coding task, and prints the final answer, the virtual file tree,
   and the cost summary.
   """
   explain(
       "DEMO 7 — Swap in a REAL model (Anthropic / OpenAI-compatible)",
       """Everything above ran on a deterministic mock brain — zero keys, zero
       cost. Going live changes exactly ONE thing: the brain. The engine, tools,
       permissions, hooks, skills, memory, and coordinator are untouched. This
       is the whole point of a harness: the model is pluggable.""")
   print(textwrap.dedent("""\
         To run the SAME harness on a real model, set environment variables and
         re-run (works with any OpenAI- or Anthropic-compatible endpoint that
         OpenHarness supports: Claude, GPT, Kimi, GLM, DeepSeek, Qwen, Groq,
         Ollama, OpenRouter, ...):
             import os
             os.environ["USE_REAL_LLM"]    = "1"
             # --- Anthropic-style ---
             os.environ["ANTHROPIC_API_KEY"] = "sk-ant-..."
             os.environ["MODEL"]             = "claude-sonnet-4-6"
             # --- or OpenAI-style (incl. local Ollama) ---
             # os.environ["OPENAI_API_KEY"]  = "sk-..."
             # os.environ["OPENAI_BASE_URL"] = "http://localhost:11434/v1"
             # os.environ["MODEL"]           = "llama-3.3-70b"
         Then build the engine with the real brain instead of the mock:
             brain = make_real_brain(system=system_prompt) or ScriptedBrain([...])
             engine = QueryEngine(brain=brain, registry=registry, ctx=ctx, ...)
             await engine.run("Refactor utils.py and add tests.")
   """))
   sysprompt = assemble_system_prompt(
       base=BASE_SYSTEM, project_context="", memory="",
       skills_summary="(none)", tool_names=build_registry().names())
   real = make_real_brain(system=sysprompt)
   if real is None:
       # No live brain available: skip the live run instead of failing.
       print("  [USE_REAL_LLM not set → staying on the mock brain. "
             "Set the env vars above and re-run to go live.]")
       return
   print(f"  [LIVE] Using real provider: {real.api_format} / {real.model}\n")
   vfs = VirtualFS()
   ctx = ToolContext(vfs=vfs, memory=MemoryStore(
       os.path.join(tempfile.gettempdir(), "oh_real.md")),
       skills=SkillLibrary(), canned_answers={})
   engine = QueryEngine(
       brain=RetryingBrain(real), registry=build_registry(), ctx=ctx,
       perms=PermissionChecker(PermissionMode.AUTO), hooks=HookManager(),
       system_prompt=sysprompt, cost=CostMeter(real.model), max_turns=12)
   # The expected greeting has to show where the argument goes; a bare
   # 'Hello, !' contradicts the greet(name) signature the task asks for.
   final = await engine.run(
       "Create greet.py with a function greet(name) that returns "
       "'Hello, <name>!', then write and run a quick test to prove it works.")
   print(f"\n  FINAL: {final}")
   print(f"\n  Files:\n{vfs.tree()}")
   print(f"\n  💰 {engine.cost.summary()}")
async def main():
   """Run the guided walkthrough: intro text, the seven demos in order, then a recap."""
   banner("OpenHarness From Scratch — guided walkthrough")
   roadmap = textwrap.dedent("""
        We will build up the harness one subsystem at a time:
          1. The agent loop  (tools, run/verify/fix, retries, cost)
          2. Permissions     (modes, sensitive paths, rules, hook veto)
          3. Skills          (on-demand knowledge)
          4. Memory          (persistent MEMORY.md across sessions)
          5. Compaction      (surviving long sessions)
          6. Multi-agent     (parallel subagent delegation)
          7. Real provider   (one-line swap to a live model)
        Architecture (what each piece is responsible for):
            User prompt
                 │
                 ▼
            QueryEngine ──► LLM brain (mock or real)   "WHAT to do"
                 │  ▲            │ tool_use
                 │  └────────────┘
                 ▼
            For each tool call:  Permission ─► PreHook ─► Execute ─► PostHook
                                      │           │          │          │
                                  deny/ask     veto/edit   sandbox    redact
                 │
                 ▼
            Tool result ──► back into the transcript ──► loop
   """).rstrip()
   print(roadmap)
   # Demos are awaited one after another, in the order listed in the roadmap.
   walkthrough = (
       demo_agent_loop,
       demo_permissions,
       demo_skills,
       demo_memory,
       demo_compaction,
       demo_multi_agent,
       demo_real_provider,
   )
   for demo in walkthrough:
       await demo()
   banner("All demos complete 🎉")
   recap = textwrap.dedent("""\
        You just built the core of an agent harness:
          • a streaming tool-call loop with retries & cost tracking
          • type-validated, self-describing tools
          • layered governance (permission modes + lifecycle hooks)
          • on-demand skills and persistent memory
          • context auto-compaction
          • nested multi-agent coordination
          • a one-line swap to a real LLM provider
        To go deeper, study the real project: https://github.com/HKUDS/OpenHarness
        (43+ tools, plugin ecosystem, MCP client, React/Ink TUI, the `oh` CLI,
        and the `ohmo` personal agent). "The model is the agent; the code is the
        harness."
   """)
   print(recap)
# Script entry point: hand the main() coroutine to run_async.
# NOTE(review): run_async is not defined in this section; presumably it drives
# the coroutine on an event loop — confirm against its definition.
run_async(main())



Source link