AIニュース最前線
最新ニュースAI日報Hacker日報週報動画AIツールトレンド企業

AIニュース最前線

世界中のAI最新情報を日本語で毎時更新

最新ニュース日報トレンド企業このサイトについてRSS
© 2026 ainew.jp
お問い合わせ特定商取引法に基づく表記
ニュース一覧元記事を開く
MarkTechPost·2026年6月25日 04:08·約13分で読める

ツール、メモリ、権限、スキル、マルチエージェント協調を備えた OpenHarness スタイルのエージェントランタイム設計方法

#Agent Frameworks#LLM Orchestration#Open Source#Tool Use#Multi-Agent Systems
TL;DR

このチュートリアルは、ツール使用、メモリ管理、権限制御、マルチエージェント調整などを備えた「OpenHarness」というエージェントランタイムをゼロから実装する詳細なガイドを提供し、ブラックボックス化されたフレームワークの内部動作を可視化する。

AI深層分析2026年6月24日 20:02
4
重要/ 5段階
深度40%
5
関連度30%
5
実用性20%
4
革新性10%
3

キーポイント

1

エージェントシステムの構成要素の再構築

ツール使用、型付きスキーマ、ライフサイクルフック、メモリ、スキル、コンテキスト圧縮などの主要なビルディングブロックを実装し、システムがどのように機能するかを詳細に解説しています。

2

制御フローの可視化とブラックボックス排除

ユーザータスクからモデルによるアクション決定、ツール呼び出しの検証・実行、観測の返却までの一連の流れを明示的に追跡し、エージェントフレームワークの内部動作を隠蔽せず理解することを目的としています。

3

実装可能なランタイム環境

API キーや複雑なインフラストラクチャを必要とせず、アーキテクチャの実験が可能となるよう、実行可能なコードベースとして提供されています。

4

動的な非同期実行環境の構築

ネストされたループやスレッド内でコルーチンを安全に実行するための `run_async` 関数が実装されており、複雑なランタイム環境での並行処理を可能にしています。

5

型安全なツール引数の自動変換と検証

Python のデータクラス定義から自動的に JSON スキーマを生成し、入力された辞書データを厳密な型(文字列、数値、リストなど)に強制変換・検証する仕組みが実装されています。

6

階層的なパーミッションとライフサイクル管理

ツール実行前にパスやコマンドの危険性をチェックする `PermissionChecker` と、実行前後のフックを定義して処理を制御・監視する `HookManager` が統合されています。

7

基盤アーキテクチャの確立

インポート、非同期実行サポート、ヘルパー関数、およびコアデータモデルを含む基礎的なチュートリアルの土台を構築します。

影響分析・編集コメントを表示

影響分析

この記事は、現在の AI エージェント開発における「ブラックボックス化」への対抗策として、内部ロジックの透明性を高めるアプローチを提示しています。開発者がフレームワークの挙動を深く理解し、カスタマイズされた堅牢なエージェントシステムを構築するための実践的な指針となるため、エンジニアリングコミュニティに大きな影響を与える可能性があります。

編集コメント

エージェント開発の現場では、高機能なフレームワークが普及する一方で内部ロジックの理解が追いつかないケースが多く見られます。この記事は、そのギャップを埋めるための貴重な実践的リソースです。

このチュートリアルでは、実用的なエージェントハッチがどのように機能するかをより深く理解するために、OpenHarness をゼロから構築します。ツール使用、型付きツールスキーマ、権限、ライフサイクルフック、メモリ、スキル、コンテキスト圧縮、リトライロジック、コスト追跡、マルチエージェント調整など、エージェントシステムを実用的にする主要な構成要素を再現します。エージェントフレームワークをブラックボックスとして扱うのではなく、フルコントロールフローを公開し、ハッチがユーザータスクを受け取り、モデルに次のアクションを決定させ、ツール呼び出しを検証・実行し、観測結果を返し、タスク完了までループを続ける様子を追跡します。また、実装は実行可能に保ち、API キーや複雑なインフラストラクチャを必要とせずにアーキテクチャを実験できるようにしています。

OpenHarness コアのセットアップ

コードをコピーしました

別のブラウザを使用してください

from __future__ import annotations

import asyncio

import contextlib

import dataclasses

import fnmatch

import io

import json

import os

import re

import tempfile

import textwrap

import time

import traceback

import types

import typing

import urllib.error

import urllib.request

from dataclasses import dataclass, field

from enum import Enum

MISSING = dataclasses.MISSING

UnionType = getattr(types, "UnionType", None)

def run_async(coro):

"""同期コードから、生きているループ内であってもコルーチンを完了まで実行する。"""

try:

loop = asyncio.get_running_loop()

except RuntimeError:

loop = None

if loop is not None and loop.is_running():

try:

import nest_asyncio

nest_asyncio.apply()

return loop.run_until_complete(coro)

except Exception:

import threading

box: dict = {}

def _runner():

new_loop = asyncio.new_event_loop()

try:

box["value"] = new_loop.run_until_complete(coro)

finally:

new_loop.close()

t = threading.Thread(target=_runner)

t.start()

t.join()

return box["value"]

return asyncio.run(coro)

BANNER = "═" * 78

def banner(title: str) -> None:

print("\n" + BANNER)

print(f" {title}")

print(BANNER)

def explain(title: str, body: str) -> None:

banner(title)

print(textwrap.fill(textwrap.dedent(body).strip(), width=78))

print("-" * 78)

def short(text: str, n: int = 240) -> str:

text = " ".join(str(text).split())

return text if len(text) <= n else text[:n] + "..."

class Usage:

def __init__(self, input_tokens: int = 0, output_tokens: int = 0):

self.input_tokens = input_tokens

self.output_tokens = output_tokens

def __add__(self, other) -> "Usage":

if not isinstance(other, Usage):

return NotImplemented

return Usage(self.input_tokens + other.input_tokens,

self.output_tokens + other.output_tokens)

@dataclass

class ToolCall:

id: str

name: str

arguments: dict

@dataclass

class AssistantTurn:

"""モデルによって生成された 1 ターン:テキストと、0 個以上のツール呼び出し。"""

text: str = ""

tool_calls: list = field(default_factory=list)

stop_reason: str = "end_turn"

usage: Usage = field(default_factory=Usage)

@dataclass

class Message:

"""実行中の会話トランスクリプトにおける 1 つのメッセージ。"""

role: str

content: str = ""

tool_calls: list = field(default_factory=list)

tool_call_id: str = ""

name: str = ""

def count_tokens(text: str) -> int:

"""安価でプロバイダー非依存なトークン推定(約 4 文字/1 トークン)。"""

if not text:

return 0

return max(1, round(len(text) / 4))

PRICE_BOOK = {

"mock-sonnet": (3.00, 15.00),

"claude-sonnet-4": (3.00, 15.00),

"gpt-4.1": (2.00, 8.00),

"default": (1.00, 3.00),

}

class CostMeter:

"""トークン使用量を累積し、推定ドルコストに変換する。"""

def __init__(self, model: str):

self.model = model

self.total = Usage()

self.calls = 0

def add(self, usage: Usage) -> None:

self.total = self.total + usage

self.calls += 1

@property

def dollars(self) -> float:

pin, pout = PRICE_BOOK.get(self.model, PRICE_BOOK["default"])

return (self.total.input_tokens / 1e6) * pin + \

(self.total.output_tokens / 1e6) * pout

def summary(self) -> str:

return (f"{self.calls} model call(s) | "

f"in={self.total.input_tokens} out={self.total.output_tokens} tok | "

f"~${self.dollars:.5f} ({self.model})")

def fld(description: str = "", default=MISSING, default_factory=MISSING):

"""説明(およびオプションのデフォルト値)付きのツール入力フィールドを宣言する。"""

md = {"description": description}

if default_factory is not MISSING:

return field(default_factory=default_factory, metadata=md)

if default is not MISSING:

return field(default=default, metadata=md)

return field(metadata=md)

def _is_optional(t) -> bool:

origin = typing.get_origin(t)

if origin is typing.Union or (UnionType is not None and origin is UnionType):

return type(None) in typing.get_args(t)

return False

def _py_to_json_type(t) -> dict:

origin = typing.get_origin(t)

if origin is typing.Union or (UnionType is not None and origin is UnionType):

args = [a for a in typing.get_args(t) if a is not type(None)]

return _py_to_json_type(args[0]) if args else {"type": "string"}

if t is str:

return {"type": "string"}

if t is bool:

return {"type": "boolean"}

if t is int:

return {"type": "integer"}

if t is float:

return {"type": "number"}

if origin is list or t is list:

args = typing.get_args(t)

item = _py_to_json_type(args[0]) if args else {"type": "string"}

return {"type": "array", "items": item}

if origin is dict or t is dict:

return {"type": "object"}

return {"type": "string"}

def build_json_schema(model_cls) -> dict:

"""dataclass 入力モデルを JSON Schema(プロパティを持つオブジェクト)に変換する。"""

hints = typing.get_type_hints(model_cls)

props, required = {}, []

for f in dataclasses.fields(model_cls):

t = hints.get(f.name, str)

js = dict(_py_to_json_type(t))

desc = f.metadata.get("description", "")

if desc:

js["description"] = desc

props[f.name] = js

has_default = (f.default is not MISSING) or (f.default_factory is not MISSING)

if not has_default and not _is_optional(t):

required.append(f.name)

schema = {"type": "object", "properties": props}

if required:

schema["required"] = required

return schema

def _coerce(v, t):

origin = typing.get_origin(t)

if origin is typing.Union or (UnionType is not None and origin is UnionType):

if v is None:

return None

args = [a for a in typing.get_args(t) if a is not type(None)]

return _coerce(v, args[0]) if args else v

if t is str:

return v if isinstance(v, str) else str(v)

if t is bool:

if isinstance(v, bool):

return v

if isinstance(v, str):

return v.strip().lower() in ("1", "true", "yes", "y", "on")

return bool(v)

if t is int:

return int(v)

if t is float:

return float(v)

if origin is list or t is list:

args = typing.get_args(t)

it = args[0] if args else str

if not isinstance(v, list):

v = [v]

return [_coerce(x, it) for x in v]

if origin is dict or t is dict:

return dict(v) if v else {}

return v

def instantiate(model_cls, raw: dict):

"""検証および型付き入力インスタンスへの生の JSON 引数の強制変換。"""

hints = typing.get_type_hints(model_cls)

raw = raw or {}

kwargs = {}

for f in dataclasses.fields(model_cls):

t = hints.get(f.name, str)

if f.name in raw and raw[f.name] is not None:

try:

kwargs[f.name] = _coerce(raw[f.name], t)

except (TypeError, ValueError) as e:

raise ValueError(f"'{f.name}' の値が不正です: {e}")

elif f.default is not MISSING:

kwargs[f.name] = f.default

elif f.default_factory is not MISSING:

kwargs[f.name] = f.default_factory()

elif _is_optional(t):

kwargs[f.name] = None

else:

raise ValueError(f"必須引数 '{f.name}' が不足しています")

return model_cls(**kwargs)

class PermissionKind(Enum):

"""ツールの危険度 — デフォルトの権限ポリシーを決定する。"""

READ = "read"

WRITE = "write"

EXECUTE = "execute"

META = "meta"

@dataclass

class ToolResult:

output: str

is_error: bool = False

metadata: dict = field(default_factory=dict)

class ToolContext:

"""ツールが実行時に必要とするすべてのもの(サービスと共有状態)。"""

def __init__(self, **services):

self.__dict__.update(services)

class BaseTool:

"""すべてのツールの基底クラス。サブクラスは name/description/InputModel/kind を設定し、execute を実装する。

スキーマと検証はここで処理される。"""

name: str = "base"

description: str = ""

InputModel = None

kind: PermissionKind = PermissionKind.READ

def schema(self) -> dict:

return {

"name": self.name,

"description": self.description,

"kind": self.kind.value,

"input_schema": (build_json_schema(self.InputModel)

if self.InputModel else

{"type": "object", "properties": {}}),

}

async def run(self, raw_args: dict, ctx: ToolContext) -> ToolResult:

args = instantiate(self.InputModel, raw_args) if self.InputModel else None

return await self.execute(args, ctx)

async def execute(self, args, ctx: ToolContext) -> ToolResult:

raise NotImplementedError

class ToolRegistry:

def __init__(self):

self._tools: dict = {}

def register(self, tool: BaseTool) -> "ToolRegistry":

self._tools[tool.name] = tool

return self

def get(self, name: str) -> BaseTool | None:

return self._tools.get(name)

def schemas(self) -> list:

return [t.schema() for t in self._tools.values()]

def names(self) -> list:

return list(self._tools)

class VirtualFS:

"""メモリ上のファイルシステム。チュートリアルを安全かつ Colab で決定論的に保つ。"""

def __init__(self):

self.files: dict = {}

@staticmethod

def norm(path: str) -> str:

return path.lstrip("./").strip()

def write(self, path: str, content: str) -> None:

self.files[self.norm(path)] = content

def read(self, path: str) -> str:

return self.files[self.norm(path)]

def exists(self, path: str) -> bool:

return self.norm(path) in self.files

def list(self, pattern: str = "*") -> list:

return sorted(p for p in self.files if fnmatch.fnmatch(p, pattern))

def tree(self) -> str:

if not self.files:

return "(empty)"

return "\n".join(f" {p} ({len(c)} bytes)"

for p, c in sorted(self.files.items()))

class PermissionMode(Enum):

DEFAULT = "default"

AUTO = "auto"

PLAN = "plan"

@dataclass

class PermissionDecision:

action: str

reason: str = ""

SENSITIVE_PATTERNS = [

"/etc/*", "*/.ssh/*", "*.pem", "*id_rsa*", "*/.aws/*",

"*credentials*", "*.env", "*/secrets/*",

]

class PermissionChecker:

def __init__(self, mode: PermissionMode = PermissionMode.DEFAULT,

path_rules: list | None = None,

denied_commands: list | None = None):

self.mode = mode

self.path_rules = path_rules or []

self.denied_commands = denied_commands or []

def _check_path(self, path: str) -> PermissionDecision | None:

for pat in SENSITIVE_PATTERNS:

if fnmatch.fnmatch(path, pat):

return PermissionDecision("deny", f"機密パス '{path}' ({pat})")

for rule in self.path_rules:

if fnmatch.fnmatch(path, rule["pattern"]):

if rule.get("allow", True):

return PermissionDecision("allow", f"パスルールが '{rule['pattern']}' を許可します")

return PermissionDecision("deny", f"パスルールが '{rule['pattern']}' をブロックします")

return None

def _check_command(self, command: str) -> PermissionDecision | None:

for pat in self.denied_commands:

if re.search(pat, command):

return PermissionDecision("deny", f"拒否されたコマンドが一致しました /{pat}/")

return None

def check(self, tool: BaseTool, args: dict) -> PermissionDecision:

if "path" in args and tool.kind in (PermissionKind.WRITE, PermissionKind.EXECUTE):

d = self._check_path(str(args["path"]))

if d:

return d

if "command" in args:

d = self._check_command(str(args["command"]))

if d:

return d

if self.mode is PermissionMode.AUTO:

return PermissionDecision("allow", "自動モード")

if self.mode is PermissionMode.PLAN:

if tool.kind in (PermissionKind.WRITE, PermissionKind.EXECUTE):

return PermissionDecision("deny", "プランモードは書き込み/実行をブロックします")

return PermissionDecision("allow", "プランモードは読み取りを許可します")

if tool.kind in (PermissionKind.READ, PermissionKind.META):

return PermissionDecision("allow", "安全なツール")

return PermissionDecision("ask", f"{tool.kind.value} には承認が必要です")

async def auto_approve(tool, args, reason) -> bool:

print(f" imageimage 承認が必要: {tool.name} ({reason}) -> [自動承認済み]")

return True

async def interactive_approve(tool, args, reason) -> bool:

ans = input(f" imageimage {tool.name}({short(json.dumps(args), 80)}) を許可しますか? [y/N] ")

return ans.strip().lower().startswith("y")

@dataclass

class HookOutcome:

blocked: bool = False

reason: str = ""

arguments: dict | None = None

class HookManager:

"""すべてのツール呼び出し周りのライフサイクルイベント(PreToolUse/PostToolUse など)。"""

def __init__(self):

self.pre: list = []

self.post: list = []

def add_pre(self, fn):

self.pre.append(fn); return self

def add_post(self, fn):

self.post.append(fn); return self

def run_pre(self, call: ToolCall, tool: BaseTool, ctx: ToolContext) -> HookOutcome:

args = dict(call.arguments)

for fn in self.pre:

out = fn(call, tool, ctx)

if out is None:

continue

if out.blocked:

return out

if out.arguments is not None:

args = out.arguments

return HookOutcome(arguments=args)

def run_post(self, call, tool, result: ToolResult, ctx) -> ToolResult:

for fn in self.post:

new = fn(call, tool, result, ctx)

if new is not None:

result = new

return result

OpenHarness スタイルのチュートリアルの基盤を確立することから始めます。これには、インポート、非同期実行サポート、ヘルパー関数、およびコアデータモデルが含まれます。メッセージ、ツール呼び出し、使用状況追跡、トークンカウント、コスト推定、権限モード、フック、そして実行の安全性を保つ仮想ファイルシステムを定義します。このスニペットは、その後のすべてのツール、エージェントループ、デモに依存する基本アーキテクチャを確立するために使用されます。

ツールレイヤーの構築

コードをコピーしました

別のブラウザを使用してください

@dataclass

class WriteFileInput:

path: str = fld("ファイルの書き込み先パス")

content: str = fld("ファイル全体のコンテンツ")

class WriteFileTool(BaseTool):

name = "write_file"

description = "指定されたコンテンツでファイルを作成または上書きします。"

InputModel = WriteFileInput

kind = PermissionKind.WRITE

async def execute(self, args: WriteFileInput, ctx) -> ToolResult:

ctx.vfs.write(args.path, args.content)

return ToolResult(f"{len(args.content)} バイトを {args.path} に書き込みました")

@dataclass

class ReadFileInput:

path: str = fld("読み込むファイルのパス")

class ReadFileTool(BaseTool):

name = "read_file"

description = "ファイル全体のコンテンツを読み取ります。"

InputModel = ReadFileInput

kind = PermissionKind.READ

async def execute(self, args: ReadFileInput, ctx) -> ToolResult:

if not ctx.vfs.exists(args.path):

return ToolResult(f"そのようなファイルはありません:{args.path}", is_error=True)

return ToolResult(ctx.vfs.read(args.path))

@dataclass

class EditInput:

path: str = fld("編集対象のファイル")

old: str = fld("置換する正確な部分文字列")

new: str = fld("置換後のテキスト")

class EditTool(BaseTool):

name = "edit"

description = "ファイル内の old の最初の出現箇所を new に置き換えます。"

InputModel = EditInput

kind = PermissionKind.WRITE

async def execute(self, args: EditInput, ctx) -> ToolResult:

if not ctx.vfs.exists(args.path):

return ToolResult(f"そのようなファイルはありません:{args.path}", is_error=True)

text = ctx.vfs.read(args.path)

if args.old not in text:

return ToolResult(f"old が {args.path} に見つかりません", is_error=True)

ctx.vfs.write(args.path, text.replace(args.old, args.new, 1))

return ToolResult(f"{args.path} を編集しました:出現箇所を 1 つ置換しました。")

@dataclass

class ListFilesInput:

pattern: str = fld("グロブパターン", default="*")

class ListFilesTool(BaseTool):

name = "list_files"

description = "グロブパターンに一致するファイルをリストアップします。"

InputModel = ListFilesInput

kind = PermissionKind.READ

async def execute(self, args: ListFilesInput, ctx) -> ToolResult:

files = ctx.vfs.list(args.pattern)

return ToolResult("\n".join(files) if files else "(一致なし)")

@dataclass

class GrepInput:

pattern: str = fld("検索する正規表現")

path_glob: str = fld("対象ファイル", default="*")

class GrepTool(BaseTool):

name = "grep"

description = "ファイルコンテンツを正規表現で検索します。"

InputModel = GrepInput

kind = PermissionKind.READ

async def execute(self, args: GrepInput, ctx) -> ToolResult:

rx = re.compile(args.pattern)

hits = []

for p in ctx.vfs.list(args.path_glob):

for i, line in enumerate(ctx.vfs.read(p).splitlines(), 1):

if rx.search(line):

hits.append(f"{p}:{i}: {line.strip()}")

return ToolResult("\n".join(hits) if hits else "(一致なし)")

@dataclass

class RunPythonInput:

files: list = fld("名前空間内で順番に実行する VFS ファイル", default_factory=list)

code: str = fld("ファイル実行後に追加で実行する Python コード", default="")

class RunPythonTool(BaseTool):

name = "run_python"

description = ("仮想ファイルシステム(および/またはインラインスニペット)から Python を実行し、標準出力をキャプチャします。テストを実行するために使用されます。")

InputModel = RunPythonInput

kind = PermissionKind.EXECUTE

async def execute(self, args: RunPythonInput, ctx) -> ToolResult:

source_parts = []

for p in args.files:

if not ctx.vfs.exists(p):

return ToolResult(f"そのようなファイルはありません:{p}", is_error=True)

source_parts.append(ctx.vfs.read(p))

if args.code:

source_parts.append(args.code)

source = "\n\n".join(source_parts)

buf = io.StringIO()

sandbox_globals = {"__name__": "__main__", "__builtins__": __builtins__}

try:

with contextlib.redirect_stdout(buf):

exec(compile(source, "", "exec"), sandbox_globals)

except Exception as e:

frames = [f for f in traceback.extract_tb(e.__traceback__)

if f.filename == ""]

loc = ""

if frames:

last = frames[-1]

src_lines = source.splitlines()

text = (src_lines[last.lineno - 1].strip()

if 0 < last.lineno <= len(src_lines) else "")

loc = f" (line {last.lineno}: {text})" if text else f" (line {last.lineno})"

out = buf.getvalue()

msg = f"{type(e).__name__}: {e}{loc}"

return ToolResult(f"{out}\n{msg}".strip(), is_error=True)

return ToolResult(buf.getvalue().strip() or "(実行しましたが出力はありません)")

@dataclass

class

原文を表示

In this tutorial, we build OpenHarness from scratch to better understand how a practical agent harness works. We recreate the major building blocks that make an agent system useful, including tool use, typed tool schemas, permissions, lifecycle hooks, memory, skills, context compaction, retry logic, cost tracking, and multi-agent coordination. Instead of treating an agent framework as a black box, we expose the full control flow and watch how the harness receives a user task, lets the model decide the next action, validates and executes tool calls, returns observations, and continues the loop until the task is complete. We also keep the implementation runnable so we can experiment with the architecture without needing API keys or complex infrastructure.

Setting Up the OpenHarness Core

Copy CodeCopiedUse a different Browser

from __future__ import annotations

import asyncio

import contextlib

import dataclasses

import fnmatch

import io

import json

import os

import re

import tempfile

import textwrap

import time

import traceback

import types

import typing

import urllib.error

import urllib.request

from dataclasses import dataclass, field

from enum import Enum

MISSING = dataclasses.MISSING

UnionType = getattr(types, "UnionType", None)

def run_async(coro):

"""Run a coroutine to completion from sync code, even inside a live loop."""

try:

loop = asyncio.get_running_loop()

except RuntimeError:

loop = None

if loop is not None and loop.is_running():

try:

import nest_asyncio

nest_asyncio.apply()

return loop.run_until_complete(coro)

except Exception:

import threading

box: dict = {}

def _runner():

new_loop = asyncio.new_event_loop()

try:

box["value"] = new_loop.run_until_complete(coro)

finally:

new_loop.close()

t = threading.Thread(target=_runner)

t.start()

t.join()

return box["value"]

return asyncio.run(coro)

BANNER = "═" * 78

def banner(title: str) -> None:

print("\n" + BANNER)

print(f" {title}")

print(BANNER)

def explain(title: str, body: str) -> None:

banner(title)

print(textwrap.fill(textwrap.dedent(body).strip(), width=78))

print("-" * 78)

def short(text: str, n: int = 240) -> str:

text = " ".join(str(text).split())

return text if len(text) <= n else text[: n - 1] + "…"

@dataclass

class Usage:

input_tokens: int = 0

output_tokens: int = 0

def __add__(self, other: "Usage") -> "Usage":

return Usage(self.input_tokens + other.input_tokens,

self.output_tokens + other.output_tokens)

@dataclass

class ToolCall:

id: str

name: str

arguments: dict

@dataclass

class AssistantTurn:

"""One turn produced by the model: some text + zero or more tool calls."""

text: str = ""

tool_calls: list = field(default_factory=list)

stop_reason: str = "end_turn"

usage: Usage = field(default_factory=Usage)

@dataclass

class Message:

"""A single message in the running conversation transcript."""

role: str

content: str = ""

tool_calls: list = field(default_factory=list)

tool_call_id: str = ""

name: str = ""

def count_tokens(text: str) -> int:

"""Cheap, provider-agnostic token estimate (~4 chars/token)."""

if not text:

return 0

return max(1, round(len(text) / 4))

PRICE_BOOK = {

"mock-sonnet": (3.00, 15.00),

"claude-sonnet-4": (3.00, 15.00),

"gpt-4.1": (2.00, 8.00),

"default": (1.00, 3.00),

}

class CostMeter:

"""Accumulates token usage and converts it to an estimated dollar cost."""

def __init__(self, model: str):

self.model = model

self.total = Usage()

self.calls = 0

def add(self, usage: Usage) -> None:

self.total = self.total + usage

self.calls += 1

@property

def dollars(self) -> float:

pin, pout = PRICE_BOOK.get(self.model, PRICE_BOOK["default"])

return (self.total.input_tokens / 1e6) * pin + \

(self.total.output_tokens / 1e6) * pout

def summary(self) -> str:

return (f"{self.calls} model call(s) | "

f"in={self.total.input_tokens} out={self.total.output_tokens} tok | "

f"~${self.dollars:.5f} ({self.model})")

def fld(description: str = "", default=MISSING, default_factory=MISSING):

"""Declare a tool-input field with a description (and optional default)."""

md = {"description": description}

if default_factory is not MISSING:

return field(default_factory=default_factory, metadata=md)

if default is not MISSING:

return field(default=default, metadata=md)

return field(metadata=md)

def _is_optional(t) -> bool:

origin = typing.get_origin(t)

if origin is typing.Union or (UnionType is not None and origin is UnionType):

return type(None) in typing.get_args(t)

return False

def _py_to_json_type(t) -> dict:

origin = typing.get_origin(t)

if origin is typing.Union or (UnionType is not None and origin is UnionType):

args = [a for a in typing.get_args(t) if a is not type(None)]

return _py_to_json_type(args[0]) if args else {"type": "string"}

if t is str:

return {"type": "string"}

if t is bool:

return {"type": "boolean"}

if t is int:

return {"type": "integer"}

if t is float:

return {"type": "number"}

if origin is list or t is list:

args = typing.get_args(t)

item = _py_to_json_type(args[0]) if args else {"type": "string"}

return {"type": "array", "items": item}

if origin is dict or t is dict:

return {"type": "object"}

return {"type": "string"}

def build_json_schema(model_cls) -> dict:

"""Turn a dataclass input model into a JSON Schema (object with properties)."""

hints = typing.get_type_hints(model_cls)

props, required = {}, []

for f in dataclasses.fields(model_cls):

t = hints.get(f.name, str)

js = dict(_py_to_json_type(t))

desc = f.metadata.get("description", "")

if desc:

js["description"] = desc

props[f.name] = js

has_default = (f.default is not MISSING) or (f.default_factory is not MISSING)

if not has_default and not _is_optional(t):

required.append(f.name)

schema = {"type": "object", "properties": props}

if required:

schema["required"] = required

return schema

def _coerce(v, t):

origin = typing.get_origin(t)

if origin is typing.Union or (UnionType is not None and origin is UnionType):

if v is None:

return None

args = [a for a in typing.get_args(t) if a is not type(None)]

return _coerce(v, args[0]) if args else v

if t is str:

return v if isinstance(v, str) else str(v)

if t is bool:

if isinstance(v, bool):

return v

if isinstance(v, str):

return v.strip().lower() in ("1", "true", "yes", "y", "on")

return bool(v)

if t is int:

return int(v)

if t is float:

return float(v)

if origin is list or t is list:

args = typing.get_args(t)

it = args[0] if args else str

if not isinstance(v, list):

v = [v]

return [_coerce(x, it) for x in v]

if origin is dict or t is dict:

return dict(v) if v else {}

return v

def instantiate(model_cls, raw: dict):

"""Validate + coerce raw JSON args into a typed input instance."""

hints = typing.get_type_hints(model_cls)

raw = raw or {}

kwargs = {}

for f in dataclasses.fields(model_cls):

t = hints.get(f.name, str)

if f.name in raw and raw[f.name] is not None:

try:

kwargs[f.name] = _coerce(raw[f.name], t)

except (TypeError, ValueError) as e:

raise ValueError(f"Bad value for '{f.name}': {e}")

elif f.default is not MISSING:

kwargs[f.name] = f.default

elif f.default_factory is not MISSING:

kwargs[f.name] = f.default_factory()

elif _is_optional(t):

kwargs[f.name] = None

else:

raise ValueError(f"Missing required argument '{f.name}'")

return model_cls(**kwargs)

class PermissionKind(Enum):

"""How dangerous a tool is — drives the default permission policy."""

READ = "read"

WRITE = "write"

EXECUTE = "execute"

META = "meta"

@dataclass

class ToolResult:

output: str

is_error: bool = False

metadata: dict = field(default_factory=dict)

class ToolContext:

"""Everything a tool may need at runtime (services + shared state)."""

def __init__(self, **services):

self.__dict__.update(services)

class BaseTool:

"""Base class for all tools. Subclasses set name/description/InputModel/kind

and implement execute. Schema + validation are handled here."""

name: str = "base"

description: str = ""

InputModel = None

kind: PermissionKind = PermissionKind.READ

def schema(self) -> dict:

return {

"name": self.name,

"description": self.description,

"kind": self.kind.value,

"input_schema": (build_json_schema(self.InputModel)

if self.InputModel else

{"type": "object", "properties": {}}),

}

async def run(self, raw_args: dict, ctx: ToolContext) -> ToolResult:

args = instantiate(self.InputModel, raw_args) if self.InputModel else None

return await self.execute(args, ctx)

async def execute(self, args, ctx: ToolContext) -> ToolResult:

raise NotImplementedError

class ToolRegistry:

def __init__(self):

self._tools: dict = {}

def register(self, tool: BaseTool) -> "ToolRegistry":

self._tools[tool.name] = tool

return self

def get(self, name: str) -> BaseTool | None:

return self._tools.get(name)

def schemas(self) -> list:

return [t.schema() for t in self._tools.values()]

def names(self) -> list:

return list(self._tools)

class VirtualFS:

"""In-memory filesystem. Keeps the tutorial safe & deterministic in Colab."""

def __init__(self):

self.files: dict = {}

@staticmethod

def norm(path: str) -> str:

return path.lstrip("./").strip()

def write(self, path: str, content: str) -> None:

self.files[self.norm(path)] = content

def read(self, path: str) -> str:

return self.files[self.norm(path)]

def exists(self, path: str) -> bool:

return self.norm(path) in self.files

def list(self, pattern: str = "*") -> list:

return sorted(p for p in self.files if fnmatch.fnmatch(p, pattern))

def tree(self) -> str:

if not self.files:

return "(empty)"

return "\n".join(f" {p} ({len(c)} bytes)"

for p, c in sorted(self.files.items()))

class PermissionMode(Enum):

DEFAULT = "default"

AUTO = "auto"

PLAN = "plan"

@dataclass

class PermissionDecision:

action: str

reason: str = ""

SENSITIVE_PATTERNS = [

"/etc/*", "*/.ssh/*", "*.pem", "*id_rsa*", "*/.aws/*",

"*credentials*", "*.env", "*/secrets/*",

]

class PermissionChecker:

def __init__(self, mode: PermissionMode = PermissionMode.DEFAULT,

path_rules: list | None = None,

denied_commands: list | None = None):

self.mode = mode

self.path_rules = path_rules or []

self.denied_commands = denied_commands or []

def _check_path(self, path: str) -> PermissionDecision | None:

for pat in SENSITIVE_PATTERNS:

if fnmatch.fnmatch(path, pat):

return PermissionDecision("deny", f"sensitive path '{path}' ({pat})")

for rule in self.path_rules:

if fnmatch.fnmatch(path, rule["pattern"]):

if rule.get("allow", True):

return PermissionDecision("allow", f"path rule allows '{rule['pattern']}'")

return PermissionDecision("deny", f"path rule blocks '{rule['pattern']}'")

return None

def _check_command(self, command: str) -> PermissionDecision | None:

for pat in self.denied_commands:

if re.search(pat, command):

return PermissionDecision("deny", f"denied command matched /{pat}/")

return None

def check(self, tool: BaseTool, args: dict) -> PermissionDecision:

if "path" in args and tool.kind in (PermissionKind.WRITE, PermissionKind.EXECUTE):

d = self._check_path(str(args["path"]))

if d:

return d

if "command" in args:

d = self._check_command(str(args["command"]))

if d:

return d

if self.mode is PermissionMode.AUTO:

return PermissionDecision("allow", "auto mode")

if self.mode is PermissionMode.PLAN:

if tool.kind in (PermissionKind.WRITE, PermissionKind.EXECUTE):

return PermissionDecision("deny", "plan mode blocks writes/executes")

return PermissionDecision("allow", "plan mode allows reads")

if tool.kind in (PermissionKind.READ, PermissionKind.META):

return PermissionDecision("allow", "safe tool")

return PermissionDecision("ask", f"{tool.kind.value} requires approval")

async def auto_approve(tool, args, reason) -> bool:

print(f" imageimage approval needed: {tool.name} ({reason}) -> [auto-approved]")

return True

async def interactive_approve(tool, args, reason) -> bool:

ans = input(f" imageimage Allow {tool.name}({short(json.dumps(args), 80)})? [y/N] ")

return ans.strip().lower().startswith("y")

@dataclass

class HookOutcome:

blocked: bool = False

reason: str = ""

arguments: dict | None = None

class HookManager:

"""Lifecycle events around every tool call (like PreToolUse/PostToolUse)."""

def __init__(self):

self.pre: list = []

self.post: list = []

def add_pre(self, fn):

self.pre.append(fn); return self

def add_post(self, fn):

self.post.append(fn); return self

def run_pre(self, call: ToolCall, tool: BaseTool, ctx: ToolContext) -> HookOutcome:

args = dict(call.arguments)

for fn in self.pre:

out = fn(call, tool, ctx)

if out is None:

continue

if out.blocked:

return out

if out.arguments is not None:

args = out.arguments

return HookOutcome(arguments=args)

def run_post(self, call, tool, result: ToolResult, ctx) -> ToolResult:

for fn in self.post:

new = fn(call, tool, result, ctx)

if new is not None:

result = new

return result

We begin by establishing the foundation for the OpenHarness-style tutorial, including imports, async execution support, helper functions, and core data models. We define messages, tool calls, usage tracking, token counting, cost estimation, permission modes, hooks, and the virtual filesystem that keeps execution safe. We use this snippet to establish the basic architecture on which all subsequent tools, agent loops, and demos depend.

Building the Tool Layer

Copy CodeCopiedUse a different Browser

@dataclass

class WriteFileInput:

path: str = fld("File path to write")

content: str = fld("Full file content")

class WriteFileTool(BaseTool):

name = "write_file"

description = "Create or overwrite a file with the given content."

InputModel = WriteFileInput

kind = PermissionKind.WRITE

async def execute(self, args: WriteFileInput, ctx) -> ToolResult:

ctx.vfs.write(args.path, args.content)

return ToolResult(f"Wrote {len(args.content)} bytes to {args.path}")

@dataclass

class ReadFileInput:

path: str = fld("File path to read")

class ReadFileTool(BaseTool):

name = "read_file"

description = "Read the full contents of a file."

InputModel = ReadFileInput

kind = PermissionKind.READ

async def execute(self, args: ReadFileInput, ctx) -> ToolResult:

if not ctx.vfs.exists(args.path):

return ToolResult(f"No such file: {args.path}", is_error=True)

return ToolResult(ctx.vfs.read(args.path))

@dataclass

class EditInput:

path: str = fld("File to edit")

old: str = fld("Exact substring to replace")

new: str = fld("Replacement text")

class EditTool(BaseTool):

name = "edit"

description = "Replace the first occurrence of old with new in a file."

InputModel = EditInput

kind = PermissionKind.WRITE

async def execute(self, args: EditInput, ctx) -> ToolResult:

if not ctx.vfs.exists(args.path):

return ToolResult(f"No such file: {args.path}", is_error=True)

text = ctx.vfs.read(args.path)

if args.old not in text:

return ToolResult(f"old not found in {args.path}", is_error=True)

ctx.vfs.write(args.path, text.replace(args.old, args.new, 1))

return ToolResult(f"Edited {args.path}: replaced 1 occurrence.")

@dataclass

class ListFilesInput:

pattern: str = fld("Glob pattern", default="*")

class ListFilesTool(BaseTool):

name = "list_files"

description = "List files matching a glob pattern."

InputModel = ListFilesInput

kind = PermissionKind.READ

async def execute(self, args: ListFilesInput, ctx) -> ToolResult:

files = ctx.vfs.list(args.pattern)

return ToolResult("\n".join(files) if files else "(no matches)")

@dataclass

class GrepInput:

pattern: str = fld("Regex to search for")

path_glob: str = fld("Which files to search", default="*")

class GrepTool(BaseTool):

name = "grep"

description = "Search file contents with a regular expression."

InputModel = GrepInput

kind = PermissionKind.READ

async def execute(self, args: GrepInput, ctx) -> ToolResult:

rx = re.compile(args.pattern)

hits = []

for p in ctx.vfs.list(args.path_glob):

for i, line in enumerate(ctx.vfs.read(p).splitlines(), 1):

if rx.search(line):

hits.append(f"{p}:{i}: {line.strip()}")

return ToolResult("\n".join(hits) if hits else "(no matches)")

@dataclass

class RunPythonInput:

files: list = fld("VFS files to exec in order in one namespace",

default_factory=list)

code: str = fld("Extra Python code to run after the files", default="")

class RunPythonTool(BaseTool):

name = "run_python"

description = ("Execute Python from the virtual filesystem (and/or an inline "

"snippet) and capture stdout. Used to run tests.")

InputModel = RunPythonInput

kind = PermissionKind.EXECUTE

async def execute(self, args: RunPythonInput, ctx) -> ToolResult:

source_parts = []

for p in args.files:

if not ctx.vfs.exists(p):

return ToolResult(f"No such file: {p}", is_error=True)

source_parts.append(ctx.vfs.read(p))

if args.code:

source_parts.append(args.code)

source = "\n\n".join(source_parts)

buf = io.StringIO()

sandbox_globals = {"__name__": "__main__", "__builtins__": __builtins__}

try:

with contextlib.redirect_stdout(buf):

exec(compile(source, "<agent_code>", "exec"), sandbox_globals)

except Exception as e:

frames = [f for f in traceback.extract_tb(e.__traceback__)

if f.filename == "<agent_code>"]

loc = ""

if frames:

last = frames[-1]

src_lines = source.splitlines()

text = (src_lines[last.lineno - 1].strip()

if 0 < last.lineno <= len(src_lines) else "")

loc = f" (line {last.lineno}: {text})" if text else f" (line {last.lineno})"

out = buf.getvalue()

msg = f"{type(e).__name__}: {e}{loc}"

return ToolResult(f"{out}\n{msg}".strip(), is_error=True)

return ToolResult(buf.getvalue().strip() or "(ran with no output)")

@dataclass

class

この記事をシェア

関連記事

Latent Space★42026年6月25日 11:14

[AINews] メタハーネスの夏が到来

メタハーネスの歴史を振り返り、ダatabricks の CTO マタイ・ザハリヤ氏が、あらゆるコーディングや知識作業を取り込むためのオープンソースでプラグ可能アーキテクチャ「オムニジェント」に賭けていると報じる。

Simon Willison Blog★32026年6月25日 08:59

ブラウザ互換性データベースをSQLite化

Simon Willison氏が、MozillaのMDNが提供する包括的なブラウザ互換性情報データを基に、SQLiteデータベース形式に変換するプロジェクト「simonw/browser-compat-db」を開始した。

MarkTechPost★42026年6月24日 17:12

2026 年版生成 AI コーディングツール比較:16 の最良ツールと特徴、最適な用途

MarkTechPost は、ソフトウェア構築を再定義した生成 AI ツールの中から、2026 年に最も優れた 16 のツールを選定し、各機能や開発者への適合性を比較して紹介している。

今日のまとめ

AI日報で今日の重要ニュースをまとめ読み

ニュース一覧に戻る元記事を読む