ツール、メモリ、権限、スキル、マルチエージェント協調を備えた OpenHarness スタイルのエージェントランタイム設計方法
このチュートリアルは、ツール使用、メモリ管理、権限制御、マルチエージェント調整などを備えた「OpenHarness」というエージェントランタイムをゼロから実装する詳細なガイドを提供し、ブラックボックス化されたフレームワークの内部動作を可視化する。
キーポイント
エージェントシステムの構成要素の再構築
ツール使用、型付きスキーマ、ライフサイクルフック、メモリ、スキル、コンテキスト圧縮などの主要なビルディングブロックを実装し、システムがどのように機能するかを詳細に解説しています。
制御フローの可視化とブラックボックス排除
ユーザータスクからモデルによるアクション決定、ツール呼び出しの検証・実行、観測の返却までの一連の流れを明示的に追跡し、エージェントフレームワークの内部動作を隠蔽せず理解することを目的としています。
実装可能なランタイム環境
API キーや複雑なインフラストラクチャを必要とせず、アーキテクチャの実験が可能となるよう、実行可能なコードベースとして提供されています。
動的な非同期実行環境の構築
ネストされたループやスレッド内でコルーチンを安全に実行するための `run_async` 関数が実装されており、複雑なランタイム環境での並行処理を可能にしています。
型安全なツール引数の自動変換と検証
Python のデータクラス定義から自動的に JSON スキーマを生成し、入力された辞書データを厳密な型(文字列、数値、リストなど)に強制変換・検証する仕組みが実装されています。
階層的なパーミッションとライフサイクル管理
ツール実行前にパスやコマンドの危険性をチェックする `PermissionChecker` と、実行前後のフックを定義して処理を制御・監視する `HookManager` が統合されています。
基盤アーキテクチャの確立
インポート、非同期実行サポート、ヘルパー関数、およびコアデータモデルを含む基礎的なチュートリアルの土台を構築します。
影響分析・編集コメントを表示
影響分析
この記事は、現在の AI エージェント開発における「ブラックボックス化」への対抗策として、内部ロジックの透明性を高めるアプローチを提示しています。開発者がフレームワークの挙動を深く理解し、カスタマイズされた堅牢なエージェントシステムを構築するための実践的な指針となるため、エンジニアリングコミュニティに大きな影響を与える可能性があります。
編集コメント
エージェント開発の現場では、高機能なフレームワークが普及する一方で内部ロジックの理解が追いつかないケースが多く見られます。この記事は、そのギャップを埋めるための貴重な実践的リソースです。
このチュートリアルでは、実用的なエージェントハッチがどのように機能するかをより深く理解するために、OpenHarness をゼロから構築します。ツール使用、型付きツールスキーマ、権限、ライフサイクルフック、メモリ、スキル、コンテキスト圧縮、リトライロジック、コスト追跡、マルチエージェント調整など、エージェントシステムを実用的にする主要な構成要素を再現します。エージェントフレームワークをブラックボックスとして扱うのではなく、フルコントロールフローを公開し、ハッチがユーザータスクを受け取り、モデルに次のアクションを決定させ、ツール呼び出しを検証・実行し、観測結果を返し、タスク完了までループを続ける様子を追跡します。また、実装は実行可能に保ち、API キーや複雑なインフラストラクチャを必要とせずにアーキテクチャを実験できるようにしています。
OpenHarness コアのセットアップ
コードをコピーしました
別のブラウザを使用してください
from __future__ import annotations
import asyncio
import contextlib
import dataclasses
import fnmatch
import io
import json
import os
import re
import tempfile
import textwrap
import time
import traceback
import types
import typing
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from enum import Enum
MISSING = dataclasses.MISSING
UnionType = getattr(types, "UnionType", None)
def run_async(coro):
"""同期コードから、生きているループ内であってもコルーチンを完了まで実行する。"""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop is not None and loop.is_running():
try:
import nest_asyncio
nest_asyncio.apply()
return loop.run_until_complete(coro)
except Exception:
import threading
box: dict = {}
def _runner():
new_loop = asyncio.new_event_loop()
try:
box["value"] = new_loop.run_until_complete(coro)
finally:
new_loop.close()
t = threading.Thread(target=_runner)
t.start()
t.join()
return box["value"]
return asyncio.run(coro)
BANNER = "═" * 78
def banner(title: str) -> None:
print("\n" + BANNER)
print(f" {title}")
print(BANNER)
def explain(title: str, body: str) -> None:
banner(title)
print(textwrap.fill(textwrap.dedent(body).strip(), width=78))
print("-" * 78)
def short(text: str, n: int = 240) -> str:
text = " ".join(str(text).split())
return text if len(text) <= n else text[:n] + "..."
class Usage:
def __init__(self, input_tokens: int = 0, output_tokens: int = 0):
self.input_tokens = input_tokens
self.output_tokens = output_tokens
def __add__(self, other) -> "Usage":
if not isinstance(other, Usage):
return NotImplemented
return Usage(self.input_tokens + other.input_tokens,
self.output_tokens + other.output_tokens)
@dataclass
class ToolCall:
id: str
name: str
arguments: dict
@dataclass
class AssistantTurn:
"""モデルによって生成された 1 ターン:テキストと、0 個以上のツール呼び出し。"""
text: str = ""
tool_calls: list = field(default_factory=list)
stop_reason: str = "end_turn"
usage: Usage = field(default_factory=Usage)
@dataclass
class Message:
"""実行中の会話トランスクリプトにおける 1 つのメッセージ。"""
role: str
content: str = ""
tool_calls: list = field(default_factory=list)
tool_call_id: str = ""
name: str = ""
def count_tokens(text: str) -> int:
"""安価でプロバイダー非依存なトークン推定(約 4 文字/1 トークン)。"""
if not text:
return 0
return max(1, round(len(text) / 4))
PRICE_BOOK = {
"mock-sonnet": (3.00, 15.00),
"claude-sonnet-4": (3.00, 15.00),
"gpt-4.1": (2.00, 8.00),
"default": (1.00, 3.00),
}
class CostMeter:
"""トークン使用量を累積し、推定ドルコストに変換する。"""
def __init__(self, model: str):
self.model = model
self.total = Usage()
self.calls = 0
def add(self, usage: Usage) -> None:
self.total = self.total + usage
self.calls += 1
@property
def dollars(self) -> float:
pin, pout = PRICE_BOOK.get(self.model, PRICE_BOOK["default"])
return (self.total.input_tokens / 1e6) * pin + \
(self.total.output_tokens / 1e6) * pout
def summary(self) -> str:
return (f"{self.calls} model call(s) | "
f"in={self.total.input_tokens} out={self.total.output_tokens} tok | "
f"~${self.dollars:.5f} ({self.model})")
def fld(description: str = "", default=MISSING, default_factory=MISSING):
"""説明(およびオプションのデフォルト値)付きのツール入力フィールドを宣言する。"""
md = {"description": description}
if default_factory is not MISSING:
return field(default_factory=default_factory, metadata=md)
if default is not MISSING:
return field(default=default, metadata=md)
return field(metadata=md)
def _is_optional(t) -> bool:
origin = typing.get_origin(t)
if origin is typing.Union or (UnionType is not None and origin is UnionType):
return type(None) in typing.get_args(t)
return False
def _py_to_json_type(t) -> dict:
origin = typing.get_origin(t)
if origin is typing.Union or (UnionType is not None and origin is UnionType):
args = [a for a in typing.get_args(t) if a is not type(None)]
return _py_to_json_type(args[0]) if args else {"type": "string"}
if t is str:
return {"type": "string"}
if t is bool:
return {"type": "boolean"}
if t is int:
return {"type": "integer"}
if t is float:
return {"type": "number"}
if origin is list or t is list:
args = typing.get_args(t)
item = _py_to_json_type(args[0]) if args else {"type": "string"}
return {"type": "array", "items": item}
if origin is dict or t is dict:
return {"type": "object"}
return {"type": "string"}
def build_json_schema(model_cls) -> dict:
"""dataclass 入力モデルを JSON Schema(プロパティを持つオブジェクト)に変換する。"""
hints = typing.get_type_hints(model_cls)
props, required = {}, []
for f in dataclasses.fields(model_cls):
t = hints.get(f.name, str)
js = dict(_py_to_json_type(t))
desc = f.metadata.get("description", "")
if desc:
js["description"] = desc
props[f.name] = js
has_default = (f.default is not MISSING) or (f.default_factory is not MISSING)
if not has_default and not _is_optional(t):
required.append(f.name)
schema = {"type": "object", "properties": props}
if required:
schema["required"] = required
return schema
def _coerce(v, t):
origin = typing.get_origin(t)
if origin is typing.Union or (UnionType is not None and origin is UnionType):
if v is None:
return None
args = [a for a in typing.get_args(t) if a is not type(None)]
return _coerce(v, args[0]) if args else v
if t is str:
return v if isinstance(v, str) else str(v)
if t is bool:
if isinstance(v, bool):
return v
if isinstance(v, str):
return v.strip().lower() in ("1", "true", "yes", "y", "on")
return bool(v)
if t is int:
return int(v)
if t is float:
return float(v)
if origin is list or t is list:
args = typing.get_args(t)
it = args[0] if args else str
if not isinstance(v, list):
v = [v]
return [_coerce(x, it) for x in v]
if origin is dict or t is dict:
return dict(v) if v else {}
return v
def instantiate(model_cls, raw: dict):
"""検証および型付き入力インスタンスへの生の JSON 引数の強制変換。"""
hints = typing.get_type_hints(model_cls)
raw = raw or {}
kwargs = {}
for f in dataclasses.fields(model_cls):
t = hints.get(f.name, str)
if f.name in raw and raw[f.name] is not None:
try:
kwargs[f.name] = _coerce(raw[f.name], t)
except (TypeError, ValueError) as e:
raise ValueError(f"'{f.name}' の値が不正です: {e}")
elif f.default is not MISSING:
kwargs[f.name] = f.default
elif f.default_factory is not MISSING:
kwargs[f.name] = f.default_factory()
elif _is_optional(t):
kwargs[f.name] = None
else:
raise ValueError(f"必須引数 '{f.name}' が不足しています")
return model_cls(**kwargs)
class PermissionKind(Enum):
"""ツールの危険度 — デフォルトの権限ポリシーを決定する。"""
READ = "read"
WRITE = "write"
EXECUTE = "execute"
META = "meta"
@dataclass
class ToolResult:
output: str
is_error: bool = False
metadata: dict = field(default_factory=dict)
class ToolContext:
"""ツールが実行時に必要とするすべてのもの(サービスと共有状態)。"""
def __init__(self, **services):
self.__dict__.update(services)
class BaseTool:
"""すべてのツールの基底クラス。サブクラスは name/description/InputModel/kind を設定し、execute を実装する。
スキーマと検証はここで処理される。"""
name: str = "base"
description: str = ""
InputModel = None
kind: PermissionKind = PermissionKind.READ
def schema(self) -> dict:
return {
"name": self.name,
"description": self.description,
"kind": self.kind.value,
"input_schema": (build_json_schema(self.InputModel)
if self.InputModel else
{"type": "object", "properties": {}}),
}
async def run(self, raw_args: dict, ctx: ToolContext) -> ToolResult:
args = instantiate(self.InputModel, raw_args) if self.InputModel else None
return await self.execute(args, ctx)
async def execute(self, args, ctx: ToolContext) -> ToolResult:
raise NotImplementedError
class ToolRegistry:
def __init__(self):
self._tools: dict = {}
def register(self, tool: BaseTool) -> "ToolRegistry":
self._tools[tool.name] = tool
return self
def get(self, name: str) -> BaseTool | None:
return self._tools.get(name)
def schemas(self) -> list:
return [t.schema() for t in self._tools.values()]
def names(self) -> list:
return list(self._tools)
class VirtualFS:
"""メモリ上のファイルシステム。チュートリアルを安全かつ Colab で決定論的に保つ。"""
def __init__(self):
self.files: dict = {}
@staticmethod
def norm(path: str) -> str:
return path.lstrip("./").strip()
def write(self, path: str, content: str) -> None:
self.files[self.norm(path)] = content
def read(self, path: str) -> str:
return self.files[self.norm(path)]
def exists(self, path: str) -> bool:
return self.norm(path) in self.files
def list(self, pattern: str = "*") -> list:
return sorted(p for p in self.files if fnmatch.fnmatch(p, pattern))
def tree(self) -> str:
if not self.files:
return "(empty)"
return "\n".join(f" {p} ({len(c)} bytes)"
for p, c in sorted(self.files.items()))
class PermissionMode(Enum):
DEFAULT = "default"
AUTO = "auto"
PLAN = "plan"
@dataclass
class PermissionDecision:
action: str
reason: str = ""
SENSITIVE_PATTERNS = [
"/etc/*", "*/.ssh/*", "*.pem", "*id_rsa*", "*/.aws/*",
"*credentials*", "*.env", "*/secrets/*",
]
class PermissionChecker:
def __init__(self, mode: PermissionMode = PermissionMode.DEFAULT,
path_rules: list | None = None,
denied_commands: list | None = None):
self.mode = mode
self.path_rules = path_rules or []
self.denied_commands = denied_commands or []
def _check_path(self, path: str) -> PermissionDecision | None:
for pat in SENSITIVE_PATTERNS:
if fnmatch.fnmatch(path, pat):
return PermissionDecision("deny", f"機密パス '{path}' ({pat})")
for rule in self.path_rules:
if fnmatch.fnmatch(path, rule["pattern"]):
if rule.get("allow", True):
return PermissionDecision("allow", f"パスルールが '{rule['pattern']}' を許可します")
return PermissionDecision("deny", f"パスルールが '{rule['pattern']}' をブロックします")
return None
def _check_command(self, command: str) -> PermissionDecision | None:
for pat in self.denied_commands:
if re.search(pat, command):
return PermissionDecision("deny", f"拒否されたコマンドが一致しました /{pat}/")
return None
def check(self, tool: BaseTool, args: dict) -> PermissionDecision:
if "path" in args and tool.kind in (PermissionKind.WRITE, PermissionKind.EXECUTE):
d = self._check_path(str(args["path"]))
if d:
return d
if "command" in args:
d = self._check_command(str(args["command"]))
if d:
return d
if self.mode is PermissionMode.AUTO:
return PermissionDecision("allow", "自動モード")
if self.mode is PermissionMode.PLAN:
if tool.kind in (PermissionKind.WRITE, PermissionKind.EXECUTE):
return PermissionDecision("deny", "プランモードは書き込み/実行をブロックします")
return PermissionDecision("allow", "プランモードは読み取りを許可します")
if tool.kind in (PermissionKind.READ, PermissionKind.META):
return PermissionDecision("allow", "安全なツール")
return PermissionDecision("ask", f"{tool.kind.value} には承認が必要です")
async def auto_approve(tool, args, reason) -> bool:
print(f"
image 承認が必要: {tool.name} ({reason}) -> [自動承認済み]")
return True
async def interactive_approve(tool, args, reason) -> bool:
ans = input(f"
image {tool.name}({short(json.dumps(args), 80)}) を許可しますか? [y/N] ")
return ans.strip().lower().startswith("y")
@dataclass
class HookOutcome:
blocked: bool = False
reason: str = ""
arguments: dict | None = None
class HookManager:
"""すべてのツール呼び出し周りのライフサイクルイベント(PreToolUse/PostToolUse など)。"""
def __init__(self):
self.pre: list = []
self.post: list = []
def add_pre(self, fn):
self.pre.append(fn); return self
def add_post(self, fn):
self.post.append(fn); return self
def run_pre(self, call: ToolCall, tool: BaseTool, ctx: ToolContext) -> HookOutcome:
args = dict(call.arguments)
for fn in self.pre:
out = fn(call, tool, ctx)
if out is None:
continue
if out.blocked:
return out
if out.arguments is not None:
args = out.arguments
return HookOutcome(arguments=args)
def run_post(self, call, tool, result: ToolResult, ctx) -> ToolResult:
for fn in self.post:
new = fn(call, tool, result, ctx)
if new is not None:
result = new
return result
OpenHarness スタイルのチュートリアルの基盤を確立することから始めます。これには、インポート、非同期実行サポート、ヘルパー関数、およびコアデータモデルが含まれます。メッセージ、ツール呼び出し、使用状況追跡、トークンカウント、コスト推定、権限モード、フック、そして実行の安全性を保つ仮想ファイルシステムを定義します。このスニペットは、その後のすべてのツール、エージェントループ、デモに依存する基本アーキテクチャを確立するために使用されます。
ツールレイヤーの構築
コードをコピーしました
別のブラウザを使用してください
@dataclass
class WriteFileInput:
path: str = fld("ファイルの書き込み先パス")
content: str = fld("ファイル全体のコンテンツ")
class WriteFileTool(BaseTool):
name = "write_file"
description = "指定されたコンテンツでファイルを作成または上書きします。"
InputModel = WriteFileInput
kind = PermissionKind.WRITE
async def execute(self, args: WriteFileInput, ctx) -> ToolResult:
ctx.vfs.write(args.path, args.content)
return ToolResult(f"{len(args.content)} バイトを {args.path} に書き込みました")
@dataclass
class ReadFileInput:
path: str = fld("読み込むファイルのパス")
class ReadFileTool(BaseTool):
name = "read_file"
description = "ファイル全体のコンテンツを読み取ります。"
InputModel = ReadFileInput
kind = PermissionKind.READ
async def execute(self, args: ReadFileInput, ctx) -> ToolResult:
if not ctx.vfs.exists(args.path):
return ToolResult(f"そのようなファイルはありません:{args.path}", is_error=True)
return ToolResult(ctx.vfs.read(args.path))
@dataclass
class EditInput:
path: str = fld("編集対象のファイル")
old: str = fld("置換する正確な部分文字列")
new: str = fld("置換後のテキスト")
class EditTool(BaseTool):
name = "edit"
description = "ファイル内の old の最初の出現箇所を new に置き換えます。"
InputModel = EditInput
kind = PermissionKind.WRITE
async def execute(self, args: EditInput, ctx) -> ToolResult:
if not ctx.vfs.exists(args.path):
return ToolResult(f"そのようなファイルはありません:{args.path}", is_error=True)
text = ctx.vfs.read(args.path)
if args.old not in text:
return ToolResult(f"old が {args.path} に見つかりません", is_error=True)
ctx.vfs.write(args.path, text.replace(args.old, args.new, 1))
return ToolResult(f"{args.path} を編集しました:出現箇所を 1 つ置換しました。")
@dataclass
class ListFilesInput:
pattern: str = fld("グロブパターン", default="*")
class ListFilesTool(BaseTool):
name = "list_files"
description = "グロブパターンに一致するファイルをリストアップします。"
InputModel = ListFilesInput
kind = PermissionKind.READ
async def execute(self, args: ListFilesInput, ctx) -> ToolResult:
files = ctx.vfs.list(args.pattern)
return ToolResult("\n".join(files) if files else "(一致なし)")
@dataclass
class GrepInput:
pattern: str = fld("検索する正規表現")
path_glob: str = fld("対象ファイル", default="*")
class GrepTool(BaseTool):
name = "grep"
description = "ファイルコンテンツを正規表現で検索します。"
InputModel = GrepInput
kind = PermissionKind.READ
async def execute(self, args: GrepInput, ctx) -> ToolResult:
rx = re.compile(args.pattern)
hits = []
for p in ctx.vfs.list(args.path_glob):
for i, line in enumerate(ctx.vfs.read(p).splitlines(), 1):
if rx.search(line):
hits.append(f"{p}:{i}: {line.strip()}")
return ToolResult("\n".join(hits) if hits else "(一致なし)")
@dataclass
class RunPythonInput:
files: list = fld("名前空間内で順番に実行する VFS ファイル", default_factory=list)
code: str = fld("ファイル実行後に追加で実行する Python コード", default="")
class RunPythonTool(BaseTool):
name = "run_python"
description = ("仮想ファイルシステム(および/またはインラインスニペット)から Python を実行し、標準出力をキャプチャします。テストを実行するために使用されます。")
InputModel = RunPythonInput
kind = PermissionKind.EXECUTE
async def execute(self, args: RunPythonInput, ctx) -> ToolResult:
source_parts = []
for p in args.files:
if not ctx.vfs.exists(p):
return ToolResult(f"そのようなファイルはありません:{p}", is_error=True)
source_parts.append(ctx.vfs.read(p))
if args.code:
source_parts.append(args.code)
source = "\n\n".join(source_parts)
buf = io.StringIO()
sandbox_globals = {"__name__": "__main__", "__builtins__": __builtins__}
try:
with contextlib.redirect_stdout(buf):
exec(compile(source, "", "exec"), sandbox_globals)
except Exception as e:
frames = [f for f in traceback.extract_tb(e.__traceback__)
if f.filename == ""]
loc = ""
if frames:
last = frames[-1]
src_lines = source.splitlines()
text = (src_lines[last.lineno - 1].strip()
if 0 < last.lineno <= len(src_lines) else "")
loc = f" (line {last.lineno}: {text})" if text else f" (line {last.lineno})"
out = buf.getvalue()
msg = f"{type(e).__name__}: {e}{loc}"
return ToolResult(f"{out}\n{msg}".strip(), is_error=True)
return ToolResult(buf.getvalue().strip() or "(実行しましたが出力はありません)")
@dataclass
class
原文を表示
In this tutorial, we build OpenHarness from scratch to better understand how a practical agent harness works. We recreate the major building blocks that make an agent system useful, including tool use, typed tool schemas, permissions, lifecycle hooks, memory, skills, context compaction, retry logic, cost tracking, and multi-agent coordination. Instead of treating an agent framework as a black box, we expose the full control flow and watch how the harness receives a user task, lets the model decide the next action, validates and executes tool calls, returns observations, and continues the loop until the task is complete. We also keep the implementation runnable so we can experiment with the architecture without needing API keys or complex infrastructure.
Setting Up the OpenHarness Core
Copy CodeCopiedUse a different Browser
from __future__ import annotations
import asyncio
import contextlib
import dataclasses
import fnmatch
import io
import json
import os
import re
import tempfile
import textwrap
import time
import traceback
import types
import typing
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from enum import Enum
MISSING = dataclasses.MISSING
UnionType = getattr(types, "UnionType", None)
def run_async(coro):
"""Run a coroutine to completion from sync code, even inside a live loop."""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop is not None and loop.is_running():
try:
import nest_asyncio
nest_asyncio.apply()
return loop.run_until_complete(coro)
except Exception:
import threading
box: dict = {}
def _runner():
new_loop = asyncio.new_event_loop()
try:
box["value"] = new_loop.run_until_complete(coro)
finally:
new_loop.close()
t = threading.Thread(target=_runner)
t.start()
t.join()
return box["value"]
return asyncio.run(coro)
BANNER = "═" * 78
def banner(title: str) -> None:
print("\n" + BANNER)
print(f" {title}")
print(BANNER)
def explain(title: str, body: str) -> None:
banner(title)
print(textwrap.fill(textwrap.dedent(body).strip(), width=78))
print("-" * 78)
def short(text: str, n: int = 240) -> str:
text = " ".join(str(text).split())
return text if len(text) <= n else text[: n - 1] + "…"
@dataclass
class Usage:
input_tokens: int = 0
output_tokens: int = 0
def __add__(self, other: "Usage") -> "Usage":
return Usage(self.input_tokens + other.input_tokens,
self.output_tokens + other.output_tokens)
@dataclass
class ToolCall:
id: str
name: str
arguments: dict
@dataclass
class AssistantTurn:
"""One turn produced by the model: some text + zero or more tool calls."""
text: str = ""
tool_calls: list = field(default_factory=list)
stop_reason: str = "end_turn"
usage: Usage = field(default_factory=Usage)
@dataclass
class Message:
"""A single message in the running conversation transcript."""
role: str
content: str = ""
tool_calls: list = field(default_factory=list)
tool_call_id: str = ""
name: str = ""
def count_tokens(text: str) -> int:
"""Cheap, provider-agnostic token estimate (~4 chars/token)."""
if not text:
return 0
return max(1, round(len(text) / 4))
PRICE_BOOK = {
"mock-sonnet": (3.00, 15.00),
"claude-sonnet-4": (3.00, 15.00),
"gpt-4.1": (2.00, 8.00),
"default": (1.00, 3.00),
}
class CostMeter:
"""Accumulates token usage and converts it to an estimated dollar cost."""
def __init__(self, model: str):
self.model = model
self.total = Usage()
self.calls = 0
def add(self, usage: Usage) -> None:
self.total = self.total + usage
self.calls += 1
@property
def dollars(self) -> float:
pin, pout = PRICE_BOOK.get(self.model, PRICE_BOOK["default"])
return (self.total.input_tokens / 1e6) * pin + \
(self.total.output_tokens / 1e6) * pout
def summary(self) -> str:
return (f"{self.calls} model call(s) | "
f"in={self.total.input_tokens} out={self.total.output_tokens} tok | "
f"~${self.dollars:.5f} ({self.model})")
def fld(description: str = "", default=MISSING, default_factory=MISSING):
"""Declare a tool-input field with a description (and optional default)."""
md = {"description": description}
if default_factory is not MISSING:
return field(default_factory=default_factory, metadata=md)
if default is not MISSING:
return field(default=default, metadata=md)
return field(metadata=md)
def _is_optional(t) -> bool:
origin = typing.get_origin(t)
if origin is typing.Union or (UnionType is not None and origin is UnionType):
return type(None) in typing.get_args(t)
return False
def _py_to_json_type(t) -> dict:
origin = typing.get_origin(t)
if origin is typing.Union or (UnionType is not None and origin is UnionType):
args = [a for a in typing.get_args(t) if a is not type(None)]
return _py_to_json_type(args[0]) if args else {"type": "string"}
if t is str:
return {"type": "string"}
if t is bool:
return {"type": "boolean"}
if t is int:
return {"type": "integer"}
if t is float:
return {"type": "number"}
if origin is list or t is list:
args = typing.get_args(t)
item = _py_to_json_type(args[0]) if args else {"type": "string"}
return {"type": "array", "items": item}
if origin is dict or t is dict:
return {"type": "object"}
return {"type": "string"}
def build_json_schema(model_cls) -> dict:
"""Turn a dataclass input model into a JSON Schema (object with properties)."""
hints = typing.get_type_hints(model_cls)
props, required = {}, []
for f in dataclasses.fields(model_cls):
t = hints.get(f.name, str)
js = dict(_py_to_json_type(t))
desc = f.metadata.get("description", "")
if desc:
js["description"] = desc
props[f.name] = js
has_default = (f.default is not MISSING) or (f.default_factory is not MISSING)
if not has_default and not _is_optional(t):
required.append(f.name)
schema = {"type": "object", "properties": props}
if required:
schema["required"] = required
return schema
def _coerce(v, t):
origin = typing.get_origin(t)
if origin is typing.Union or (UnionType is not None and origin is UnionType):
if v is None:
return None
args = [a for a in typing.get_args(t) if a is not type(None)]
return _coerce(v, args[0]) if args else v
if t is str:
return v if isinstance(v, str) else str(v)
if t is bool:
if isinstance(v, bool):
return v
if isinstance(v, str):
return v.strip().lower() in ("1", "true", "yes", "y", "on")
return bool(v)
if t is int:
return int(v)
if t is float:
return float(v)
if origin is list or t is list:
args = typing.get_args(t)
it = args[0] if args else str
if not isinstance(v, list):
v = [v]
return [_coerce(x, it) for x in v]
if origin is dict or t is dict:
return dict(v) if v else {}
return v
def instantiate(model_cls, raw: dict):
"""Validate + coerce raw JSON args into a typed input instance."""
hints = typing.get_type_hints(model_cls)
raw = raw or {}
kwargs = {}
for f in dataclasses.fields(model_cls):
t = hints.get(f.name, str)
if f.name in raw and raw[f.name] is not None:
try:
kwargs[f.name] = _coerce(raw[f.name], t)
except (TypeError, ValueError) as e:
raise ValueError(f"Bad value for '{f.name}': {e}")
elif f.default is not MISSING:
kwargs[f.name] = f.default
elif f.default_factory is not MISSING:
kwargs[f.name] = f.default_factory()
elif _is_optional(t):
kwargs[f.name] = None
else:
raise ValueError(f"Missing required argument '{f.name}'")
return model_cls(**kwargs)
class PermissionKind(Enum):
"""How dangerous a tool is — drives the default permission policy."""
READ = "read"
WRITE = "write"
EXECUTE = "execute"
META = "meta"
@dataclass
class ToolResult:
output: str
is_error: bool = False
metadata: dict = field(default_factory=dict)
class ToolContext:
"""Everything a tool may need at runtime (services + shared state)."""
def __init__(self, **services):
self.__dict__.update(services)
class BaseTool:
"""Base class for all tools. Subclasses set name/description/InputModel/kind
and implement execute. Schema + validation are handled here."""
name: str = "base"
description: str = ""
InputModel = None
kind: PermissionKind = PermissionKind.READ
def schema(self) -> dict:
return {
"name": self.name,
"description": self.description,
"kind": self.kind.value,
"input_schema": (build_json_schema(self.InputModel)
if self.InputModel else
{"type": "object", "properties": {}}),
}
async def run(self, raw_args: dict, ctx: ToolContext) -> ToolResult:
args = instantiate(self.InputModel, raw_args) if self.InputModel else None
return await self.execute(args, ctx)
async def execute(self, args, ctx: ToolContext) -> ToolResult:
raise NotImplementedError
class ToolRegistry:
def __init__(self):
self._tools: dict = {}
def register(self, tool: BaseTool) -> "ToolRegistry":
self._tools[tool.name] = tool
return self
def get(self, name: str) -> BaseTool | None:
return self._tools.get(name)
def schemas(self) -> list:
return [t.schema() for t in self._tools.values()]
def names(self) -> list:
return list(self._tools)
class VirtualFS:
"""In-memory filesystem. Keeps the tutorial safe & deterministic in Colab."""
def __init__(self):
self.files: dict = {}
@staticmethod
def norm(path: str) -> str:
return path.lstrip("./").strip()
def write(self, path: str, content: str) -> None:
self.files[self.norm(path)] = content
def read(self, path: str) -> str:
return self.files[self.norm(path)]
def exists(self, path: str) -> bool:
return self.norm(path) in self.files
def list(self, pattern: str = "*") -> list:
return sorted(p for p in self.files if fnmatch.fnmatch(p, pattern))
def tree(self) -> str:
if not self.files:
return "(empty)"
return "\n".join(f" {p} ({len(c)} bytes)"
for p, c in sorted(self.files.items()))
class PermissionMode(Enum):
DEFAULT = "default"
AUTO = "auto"
PLAN = "plan"
@dataclass
class PermissionDecision:
action: str
reason: str = ""
SENSITIVE_PATTERNS = [
"/etc/*", "*/.ssh/*", "*.pem", "*id_rsa*", "*/.aws/*",
"*credentials*", "*.env", "*/secrets/*",
]
class PermissionChecker:
def __init__(self, mode: PermissionMode = PermissionMode.DEFAULT,
path_rules: list | None = None,
denied_commands: list | None = None):
self.mode = mode
self.path_rules = path_rules or []
self.denied_commands = denied_commands or []
def _check_path(self, path: str) -> PermissionDecision | None:
for pat in SENSITIVE_PATTERNS:
if fnmatch.fnmatch(path, pat):
return PermissionDecision("deny", f"sensitive path '{path}' ({pat})")
for rule in self.path_rules:
if fnmatch.fnmatch(path, rule["pattern"]):
if rule.get("allow", True):
return PermissionDecision("allow", f"path rule allows '{rule['pattern']}'")
return PermissionDecision("deny", f"path rule blocks '{rule['pattern']}'")
return None
def _check_command(self, command: str) -> PermissionDecision | None:
for pat in self.denied_commands:
if re.search(pat, command):
return PermissionDecision("deny", f"denied command matched /{pat}/")
return None
def check(self, tool: BaseTool, args: dict) -> PermissionDecision:
if "path" in args and tool.kind in (PermissionKind.WRITE, PermissionKind.EXECUTE):
d = self._check_path(str(args["path"]))
if d:
return d
if "command" in args:
d = self._check_command(str(args["command"]))
if d:
return d
if self.mode is PermissionMode.AUTO:
return PermissionDecision("allow", "auto mode")
if self.mode is PermissionMode.PLAN:
if tool.kind in (PermissionKind.WRITE, PermissionKind.EXECUTE):
return PermissionDecision("deny", "plan mode blocks writes/executes")
return PermissionDecision("allow", "plan mode allows reads")
if tool.kind in (PermissionKind.READ, PermissionKind.META):
return PermissionDecision("allow", "safe tool")
return PermissionDecision("ask", f"{tool.kind.value} requires approval")
async def auto_approve(tool, args, reason) -> bool:
print(f"
image approval needed: {tool.name} ({reason}) -> [auto-approved]")
return True
async def interactive_approve(tool, args, reason) -> bool:
ans = input(f"
image Allow {tool.name}({short(json.dumps(args), 80)})? [y/N] ")
return ans.strip().lower().startswith("y")
@dataclass
class HookOutcome:
blocked: bool = False
reason: str = ""
arguments: dict | None = None
class HookManager:
"""Lifecycle events around every tool call (like PreToolUse/PostToolUse)."""
def __init__(self):
self.pre: list = []
self.post: list = []
def add_pre(self, fn):
self.pre.append(fn); return self
def add_post(self, fn):
self.post.append(fn); return self
def run_pre(self, call: ToolCall, tool: BaseTool, ctx: ToolContext) -> HookOutcome:
args = dict(call.arguments)
for fn in self.pre:
out = fn(call, tool, ctx)
if out is None:
continue
if out.blocked:
return out
if out.arguments is not None:
args = out.arguments
return HookOutcome(arguments=args)
def run_post(self, call, tool, result: ToolResult, ctx) -> ToolResult:
for fn in self.post:
new = fn(call, tool, result, ctx)
if new is not None:
result = new
return result
We begin by establishing the foundation for the OpenHarness-style tutorial, including imports, async execution support, helper functions, and core data models. We define messages, tool calls, usage tracking, token counting, cost estimation, permission modes, hooks, and the virtual filesystem that keeps execution safe. We use this snippet to establish the basic architecture on which all subsequent tools, agent loops, and demos depend.
Building the Tool Layer
Copy CodeCopiedUse a different Browser
@dataclass
class WriteFileInput:
path: str = fld("File path to write")
content: str = fld("Full file content")
class WriteFileTool(BaseTool):
name = "write_file"
description = "Create or overwrite a file with the given content."
InputModel = WriteFileInput
kind = PermissionKind.WRITE
async def execute(self, args: WriteFileInput, ctx) -> ToolResult:
ctx.vfs.write(args.path, args.content)
return ToolResult(f"Wrote {len(args.content)} bytes to {args.path}")
@dataclass
class ReadFileInput:
path: str = fld("File path to read")
class ReadFileTool(BaseTool):
name = "read_file"
description = "Read the full contents of a file."
InputModel = ReadFileInput
kind = PermissionKind.READ
async def execute(self, args: ReadFileInput, ctx) -> ToolResult:
if not ctx.vfs.exists(args.path):
return ToolResult(f"No such file: {args.path}", is_error=True)
return ToolResult(ctx.vfs.read(args.path))
@dataclass
class EditInput:
path: str = fld("File to edit")
old: str = fld("Exact substring to replace")
new: str = fld("Replacement text")
class EditTool(BaseTool):
name = "edit"
description = "Replace the first occurrence of old with new in a file."
InputModel = EditInput
kind = PermissionKind.WRITE
async def execute(self, args: EditInput, ctx) -> ToolResult:
if not ctx.vfs.exists(args.path):
return ToolResult(f"No such file: {args.path}", is_error=True)
text = ctx.vfs.read(args.path)
if args.old not in text:
return ToolResult(f"old not found in {args.path}", is_error=True)
ctx.vfs.write(args.path, text.replace(args.old, args.new, 1))
return ToolResult(f"Edited {args.path}: replaced 1 occurrence.")
@dataclass
class ListFilesInput:
pattern: str = fld("Glob pattern", default="*")
class ListFilesTool(BaseTool):
name = "list_files"
description = "List files matching a glob pattern."
InputModel = ListFilesInput
kind = PermissionKind.READ
async def execute(self, args: ListFilesInput, ctx) -> ToolResult:
files = ctx.vfs.list(args.pattern)
return ToolResult("\n".join(files) if files else "(no matches)")
@dataclass
class GrepInput:
pattern: str = fld("Regex to search for")
path_glob: str = fld("Which files to search", default="*")
class GrepTool(BaseTool):
name = "grep"
description = "Search file contents with a regular expression."
InputModel = GrepInput
kind = PermissionKind.READ
async def execute(self, args: GrepInput, ctx) -> ToolResult:
rx = re.compile(args.pattern)
hits = []
for p in ctx.vfs.list(args.path_glob):
for i, line in enumerate(ctx.vfs.read(p).splitlines(), 1):
if rx.search(line):
hits.append(f"{p}:{i}: {line.strip()}")
return ToolResult("\n".join(hits) if hits else "(no matches)")
@dataclass
class RunPythonInput:
files: list = fld("VFS files to exec in order in one namespace",
default_factory=list)
code: str = fld("Extra Python code to run after the files", default="")
class RunPythonTool(BaseTool):
name = "run_python"
description = ("Execute Python from the virtual filesystem (and/or an inline "
"snippet) and capture stdout. Used to run tests.")
InputModel = RunPythonInput
kind = PermissionKind.EXECUTE
async def execute(self, args: RunPythonInput, ctx) -> ToolResult:
source_parts = []
for p in args.files:
if not ctx.vfs.exists(p):
return ToolResult(f"No such file: {p}", is_error=True)
source_parts.append(ctx.vfs.read(p))
if args.code:
source_parts.append(args.code)
source = "\n\n".join(source_parts)
buf = io.StringIO()
sandbox_globals = {"__name__": "__main__", "__builtins__": __builtins__}
try:
with contextlib.redirect_stdout(buf):
exec(compile(source, "<agent_code>", "exec"), sandbox_globals)
except Exception as e:
frames = [f for f in traceback.extract_tb(e.__traceback__)
if f.filename == "<agent_code>"]
loc = ""
if frames:
last = frames[-1]
src_lines = source.splitlines()
text = (src_lines[last.lineno - 1].strip()
if 0 < last.lineno <= len(src_lines) else "")
loc = f" (line {last.lineno}: {text})" if text else f" (line {last.lineno})"
out = buf.getvalue()
msg = f"{type(e).__name__}: {e}{loc}"
return ToolResult(f"{out}\n{msg}".strip(), is_error=True)
return ToolResult(buf.getvalue().strip() or "(ran with no output)")
@dataclass
class
関連記事
[AINews] メタハーネスの夏が到来
メタハーネスの歴史を振り返り、ダatabricks の CTO マタイ・ザハリヤ氏が、あらゆるコーディングや知識作業を取り込むためのオープンソースでプラグ可能アーキテクチャ「オムニジェント」に賭けていると報じる。
ブラウザ互換性データベースをSQLite化
Simon Willison氏が、MozillaのMDNが提供する包括的なブラウザ互換性情報データを基に、SQLiteデータベース形式に変換するプロジェクト「simonw/browser-compat-db」を開始した。
2026 年版生成 AI コーディングツール比較:16 の最良ツールと特徴、最適な用途
MarkTechPost は、ソフトウェア構築を再定義した生成 AI ツールの中から、2026 年に最も優れた 16 のツールを選定し、各機能や開発者への適合性を比較して紹介している。
今日のまとめ
AI日報で今日の重要ニュースをまとめ読み