ClawTeamのマルチエージェント群集オーケストレーションとOpenAI関数呼び出しを実装したコーディング実演
香港大学データサイエンス研究所(HKUDS)が開発したオープンソースのマルチエージェントフレームワーク「ClawTeam」のコアアーキテクチャと、OpenAI Function Callingを活用したマルチエージェント・オーケストレーションの実装チュートリアルを紹介する記事です。
キーポイント
ClawTeamのコアアーキテクチャ
リーダーエージェントが複雑な目標をサブタスクに分解し、専門的なワーカーエージェントが自律的に実行するマルチエージェントフレームワークで、依存関係解決機能付きの共有タスクボードとリアルタイム調整のためのエージェント間メッセージングシステムを備えています。
実装チュートリアルの特徴
Colab環境で動作し、OpenAI APIキーのみで実行可能な設計で、ローカルインフラ(tmux、git worktrees、ファイルシステムベースのメッセージキュー)のセットアップ不要でマルチエージェント・オーケストレーションを体験できます。
技術的実装の詳細
OpenAIとrichパッケージのインストール、APIキーの安全な取得、スレッドによる並行処理、dataclassesによるデータモデリング、Richによるターミナル出力の美装など、実用的な実装コードを提供しています。
コスト効率の考慮
デフォルトモデルとしてgpt-4o-miniを使用することで、スウォーム実行のコスト効率化を図っています。
影響分析・編集コメントを表示
影響分析
この記事は、マルチエージェントAIシステムの実装障壁を下げる実用的なチュートリアルを提供することで、研究者や開発者が複雑なタスク分解と協調実行の概念を実際に体験する機会を創出します。オープンソースフレームワークの普及と実装知識の共有を通じて、AIエージェント技術の民主化と実用化を促進する可能性があります。
編集コメント
マルチエージェントAIの実装チュートリアルとして実用的な価値がありますが、技術的な新規性よりも既存技術の応用・実装解説に重点が置かれており、業界を変えるほどの革新性は限定的です。
この包括的なチュートリアルでは、HKUDSによって開発されたオープンソースのAgent Swarm IntelligenceフレームワークであるClawTeamのコア・アーキテクチャを紹介します。ClawTeamを強力にする基本概念を実装します。具体的には、複雑な目標をサブタスクに分解するリーダーエージェント、それらのタスクを自律的に実行する専門ワーカーエージェント、自動依存関係解決機能を持つ共有タスクボード、そしてリアルタイム調整を可能にするエージェント間メッセージングシステムです。このチュートリアルはColabでシームレスに実行できるように設計されており、OpenAI APIキーのみが必要です。これにより、元のClawTeam CLIが必要とするtmux、git worktrees、ファイルシステムベースのメッセージキューなどのローカルインフラストラクチャを設定することなく、誰でもマルチエージェント・オーケストレーションを体験できます。
必要なパッケージであるOpenAIとrichをインストールし、Colab Secretsまたはターミナル入力を通じてOpenAI APIキーを安全に収集することから始めます。次に、並行処理のためのthreading、クリーンなデータモデリングのためのdataclasses、美しいターミナル出力のためのRichなど、チュートリアル全体で必要なすべてのライブラリをインポートします。グローバルなOpenAIクライアント、Richコンソールを初期化し、コスト効率の良いスウォーム実行のためにgpt-4o-miniをデフォルトモデルとして設定します。
ClawTeamのファイルシステムベースのアーキテクチャを反映する3つの基盤システムを構築します。完全なステータス・ライフサイクルと自動依存関係ブロック解除機能を持つTaskBoard、エージェント間のポイントツーポイントおよびブロードキャストメッセージングをサポートするInboxSystem、そして各エージェントの役割、ステータス、完了タスク数を追跡するTeamRegistryです。複数のエージェントが同時に読み書きできるように、これら3つのシステムすべてにスレッドセーフ・ロックを実装します。ブロックされたタスクがその前提条件完了時に保留中に自動的に移行する、blocked_by依存関係チェーンでタスクをモデル化します。これは、ClawTeamのCLIがタスク依存関係を解決する方法とまったく同じです。
SWARM_TOOLS = [
{
"type": "function",
"function": {
"name": "task_update",
"description": "タスクのステータスを更新します。開始時は'in_progress'、完了時は'completed'を使用してください。",
"parameters": {
"type": "object",
"properties": {
"task_id": {"type": "string", "description": "タスクID"},
"status": {"type": "string", "enum": ["in_progress", "completed", "failed"]},
"result": {"type": "string", "description": "タスクの結果または出力"},
},
"required": ["task_id", "status"],
},
},
},
{
"type": "function",
"function": {
"name": "inbox_send",
"description": "他のエージェント(例:'leader'またはワーカー名)にメッセージを送信します。",
"parameters": {
"type": "object",
"properties": {
"to": {"type": "string", "description": "受信者エージェント名"},
"message": {"type": "string", "description": "メッセージ内容"},
},
"required": ["to", "message"],
},
},
},
{
"type": "function",
"function": {
"name": "inbox_receive",
"description": "受信トレイのすべてのメッセージを確認して消費します。",
"parameters": {
"type": "object",
"properties": {},
},
},
},
{
"type": "function",
"function": {
"name": "task_list",
"description": "自分に割り当てられたタスクまたはチーム全体のタスクを一覧表示します。",
"parameters": {
"type": "object",
"properties": {
"owner": {"type": "string", "description": "所有者名でフィルタリング(オプション)"},
},
},
},
},
]
class SwarmAgent:
def __init__(
self,
name: str,
role: str,
system_prompt: str,
task_board: TaskBoard,
inbox: InboxSystem,
registry: TeamRegistry,
):
self.name = name
self.role = role
self.system_prompt = system_prompt
self.task_board = task_board
self.inbox = inbox
self.registry = registry
self.conversation_history: list[dict] = []
self.inbox.register(name)
self.registry.register(name, role)
def _build_system_prompt(self) -> str:
coord_protocol = f"""
調整プロトコル(自動挿入 — あなたはエージェント'{self.name}'です)
あなたはAIエージェント・スウォームの一員です。あなたの役割: {self.role}
あなたの名前: {self.name}
利用可能なツール(ClawTeam CLIと同等):
- task_list: 割り当てられたタスクを確認(
clawteam task listと同様) - task_update: タスクステータスをin_progress/completed/failedに更新(
clawteam task updateと同様) - inbox_send: 他のエージェントにメッセージを送信(
clawteam inbox sendと同様) - inbox_receive: 受信トレイのメッセージを確認(
clawteam inbox receiveと同様)
ワークフロー:
- task_listでタスクを確認
- 開始時にタスクをin_progressとしてマーク
- 作業を実行(思考、分析、出力生成)
- 結果とともにタスクをcompletedとしてマーク
- 完了時に'leader'に要約メッセージを送信
"""
return self.system_prompt + "\n" + coord_protocol
def _handle_tool_call(self, tool_name: str, args: dict) -> str:
if tool_name == "task_update":
status = TaskStatus(args["status"])
result = args.get("result", "")
self.task_board.update_status(args["task_id"], status, result)
if status == TaskStatus.COMPLETED:
self.registry.increment_completed(self.name)
return json.dumps({"ok": True, "task_id": args["task_id"], "new_status": args["status"]})
elif tool_name == "inbox_send":
self.inbox.send(self.name, args["to"], args["message"])
return json.dumps({"ok": True, "sent_to": args["to"]})
elif tool_name == "inbox_receive":
msgs = self.inbox.receive(self.name)
if not msgs:
return json.dumps({"messages": [], "note": "新しいメッセージはありません"})
return json.dumps({
"messages": [
{"from": m.sender, "content": m.content, "time": m.timestamp}
for m in msgs
]
})
elif tool_name == "task_list":
owner = args.get("owner", self.name)
tasks = self.task_board.get_tasks(owner=owner)
return json.dumps({"tasks": [t.to_dict() for t in tasks]})
return json.dumps({"error": f"不明なツール: {tool_name}"})
def run(self, user_message: str, max_iterations: int = 6) -> str:
self.conversation_history.append({"role": "user", "content": user_message})
for iteration in range(max_iterations):
try:
response = client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": self._build_system_prompt()},
*self.conversation_history,
],
tools=SWARM_TOOLS,
tool_choice="auto",
temperature=0.4,
)
except Exception as e:
return f"[APIエラー] {e}"
choice = response.choices[0]
msg = choice.message
assistant_msg = {"role": "assistant", "content": msg.content or ""}
if msg.tool_calls:
assistant_msg["tool_calls"] = [
{
"id": tc.id,
"type": "function",
"function": {"name": tc.function.name, "arguments": tc.function.arguments},
}
for tc in msg.tool_calls
]
self.conversation_history.append(assistant_msg)
if not msg.tool_calls:
return msg.content or "(応答なし)"
for tc in msg.tool_calls:
fn_name = tc.function.name
fn_args = json.loads(tc.function.arguments)
result = self._handle_tool_call(fn_name, fn_args)
self.conversation_history.append({
"role": "tool",
"tool_call_id": tc.id,
"content": result,
})
return "(エージェントが最大反復回数に達しました)"
4つのOpenAI Function Callingツール、task_update、inbox_send、inbox_receive、task_listを定義します。これらはエージェントのClawTeam CLIコマンドと同等の機能を提供します。SwarmAgentクラスを構築し、これはLLMの推論ループをラップして、エージェントが最終的な回答を生成する前に複数回ツールを呼び出すことを可能にします。これは、実際のClawTeamエージェントが反復的にタスクを確認し、作業を実行し、結果を報告する方法を模倣しています。すべてのエージェントのシステムプロンプトに調整プロトコルを自動挿入し、その名前、役割、および従わなければならない正確なワークフローを認識させます。これはClawTeamがすべての生成エージェントに指示を挿入するのと同様です。
LEADER_SYSTEM_PROMPT = """あなたはAIエージェント・スウォーム(ClawTeamスタイル)のリーダーです。
あなたの仕事:
- 人間から高レベルの目標を受け取る
- 明確な成果物を持つ具体的なサブタスクに分解する
- 専門的なワーカーエージェントにタスクを割り当てる
- 進捗を監視し、最終結果を統合する
あなたは戦略的に考え、効果的に委任します。詳細な作業は自分では行いません —
スウォームを調整します。
タスク計画を作成する際は、この正確な構造を持つ有効なJSONを出力してください:
{
"team_description": "このチームが行うことの簡潔な説明",
"tasks": [
{
"subject": "短いタスクタイトル",
"description": "ワーカーへの詳細な指示",
"worker_role": "必要な専門家の役割(例:'市場調査アナリスト')",
"worker_name": "短いスネークケース名(例:'market_analyst')",
"blocked_by_indices": []
}
]
}
blocked_by_indicesは、先に完了しなければならない他のタスクの0ベースのインデックスのリストです。
3-5個のタスクを作成してください。各タスクは専門家によって独立して実行可能であるべきです。
"""
def leader_plan_tasks(goal: str) -> dict:
console.print(Panel(
f"[bold cyan]
image リーダーエージェント起動[/bold cyan]\n\n"
f"[white]目標:[/white] {goal}\n"
f"[dim]タスク分解を計画中...[/dim]",
border_style="cyan",
))
response = client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": LEADER_SYSTEM_PROMPT},
{"role": "user", "content": f"この目標をAIエージェント・スウォームのサブタスクに分解してください:\n\n{goal}"},
],
temperature=0.3,
response_format={"type": "json_object"},
)
plan_text = response.choices[0].message.content
plan = json.loads(plan_text)
return plan
def display_task_board(task_board: TaskBoard, registry: TeamRegistry):
table = Table(title="
image タスクボード(かんばん)", show_lines=True, expand=True)
table.add_column("ID", style="dim", width=8)
table.add_column("件名", style="bold")
table.add_column("所有者", style="cyan")
table.add_column("ステータス", width=14)
table.add_column("結果プレビュー", max_width=50)
status_colors = {
TaskStatus.PENDING: "yellow",
TaskStatus.IN_PROGRESS: "blue",
TaskStatus.COMPLETED: "green",
TaskStatus.BLOCKED: "red",
TaskStatus.FAILED: "magenta",
}
for t in task_board.get_tasks():
color = status_colors.get(t.status, "white")
status_str = f"[{color}]●[/{color}] {t.status.value}"
result_preview = (t.result[:47] + "...") if len(t.result) > 50 else t.result
table.add_row(t.id, t.subject, t.owner, status_str, result_preview)
console.print(table)
agent_table = Table(title="
image エージェント名簿", show_lines=True)
agent_table.add_column("エージェント", style="bold cyan")
agent_table.add_column("役割")
agent_table.add_column("ステータス")
agent_table.add_column("完了タスク", justify="center")
for a in registry.agents.values():
status_emoji = {"active": "
image", "idle": "
image", "completed": "
image", "failed": "
image"}.get(a.status, "
image")
agent_table.add_row(a.name, a.role, f"{status_emoji} {a.status}", str(a.tasks_completed))
console.print(agent_table)
def run_swarm(goal: str):
console.print(Panel(
"[bold white on blue]
image ClawTeam スウォーム・インテリジェンス — チュートリアル実行 [/bold white on blue]",
expand=True,
))
console.print(f"\n[bold]人間の目標:[/bold] {goal}\n")
console.rule("[bold cyan]フェーズ1: リーダーが目標をタスクに分解")
plan = leader_plan_tasks(goal)
console.print(f"\n[bold green]チーム:[/bold green] {plan.get('team_description', 'N/A')}")
console.print(f"[bold green]計画されたタスク:[/bold green] {len(plan
原文を表示
In this comprehensive tutorial, we present the core architecture of ClawTeam, an open-source Agent Swarm Intelligence framework developed by HKUDS. We implement the fundamental concepts that make ClawTeam powerful: a leader agent that decomposes complex goals into sub-tasks, specialized worker agents that execute those tasks autonomously, a shared task board with automatic dependency resolution, and an inter-agent messaging system that enables real-time coordination. We designed this tutorial to run seamlessly in Colab, requiring only an OpenAI API key, so anyone can experience multi-agent orchestration without setting up local infrastructure like tmux, git worktrees, or filesystem-based message queues that the original ClawTeam CLI requires.
Copy CodeCopiedUse a different Browser
import subprocess
import sys
def install_packages():
packages = ["openai", "rich"]
for pkg in packages:
subprocess.check_call(
[sys.executable, "-m", "pip", "install", "-q", pkg],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
print("
image All packages installed.")
install_packages()
import os
import getpass
try:
from google.colab import userdata
OPENAI_API_KEY = userdata.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
raise ValueError("Key not set in Colab secrets")
print("
image API key loaded from Colab Secrets.")
except Exception:
OPENAI_API_KEY = getpass.getpass("
image Enter your OpenAI API key: ")
print("
image API key received (hidden).")
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
import json
import uuid
import time
import threading
from enum import Enum
from datetime import datetime
from dataclasses import dataclass, field
from typing import Optional
from collections import defaultdict
from openai import OpenAI
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.tree import Tree
from rich.live import Live
from rich.layout import Layout
from rich.text import Text
console = Console()
client = OpenAI(api_key=OPENAI_API_KEY)
MODEL = "gpt-4o-mini"
We begin by installing the required packages, OpenAI and rich, and securely collecting the OpenAI API key through either Colab Secrets or terminal input. We then import all the libraries we need throughout the tutorial, including threading for concurrency, dataclasses for clean data modeling, and Rich for beautiful terminal output. We initialize the global OpenAI client, the Rich console, and set gpt-4o-mini as our default model for cost-efficient swarm execution.
Copy CodeCopiedUse a different Browser
class TaskStatus(str, Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
BLOCKED = "blocked"
FAILED = "failed"
@dataclass
class Task:
id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
subject: str = ""
description: str = ""
owner: str = ""
status: TaskStatus = TaskStatus.PENDING
blocked_by: list = field(default_factory=list)
result: str = ""
created_at: str = field(default_factory=lambda: datetime.now().isoformat())
completed_at: Optional[str] = None
def to_dict(self):
return {
"id": self.id,
"subject": self.subject,
"description": self.description,
"owner": self.owner,
"status": self.status.value,
"blocked_by": self.blocked_by,
"result": self.result[:200] if self.result else "",
}
class TaskBoard:
def __init__(self):
self._tasks: dict[str, Task] = {}
self._lock = threading.Lock()
def create(self, subject: str, description: str = "", owner: str = "",
blocked_by: list = None) -> Task:
task = Task(
subject=subject,
description=description,
owner=owner,
blocked_by=blocked_by or [],
)
if task.blocked_by:
task.status = TaskStatus.BLOCKED
with self._lock:
self._tasks[task.id] = task
return task
def update_status(self, task_id: str, status: TaskStatus, result: str = ""):
with self._lock:
if task_id not in self._tasks:
return
task = self._tasks[task_id]
task.status = status
if result:
task.result = result
if status == TaskStatus.COMPLETED:
task.completed_at = datetime.now().isoformat()
self._resolve_dependencies(task_id)
def _resolve_dependencies(self, completed_id: str):
for t in self._tasks.values():
if completed_id in t.blocked_by:
t.blocked_by.remove(completed_id)
if not t.blocked_by and t.status == TaskStatus.BLOCKED:
t.status = TaskStatus.PENDING
def get_tasks(self, owner: str = None, status: TaskStatus = None) -> list[Task]:
with self._lock:
tasks = list(self._tasks.values())
if owner:
tasks = [t for t in tasks if t.owner == owner]
if status:
tasks = [t for t in tasks if t.status == status]
return tasks
def get(self, task_id: str) -> Optional[Task]:
return self._tasks.get(task_id)
def all_completed(self) -> bool:
with self._lock:
return all(
t.status == TaskStatus.COMPLETED
for t in self._tasks.values()
)
def summary_json(self) -> str:
return json.dumps(
[t.to_dict() for t in self._tasks.values()],
indent=2,
)
@dataclass
class Message:
sender: str
recipient: str
content: str
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
class InboxSystem:
def __init__(self):
self._inboxes: dict[str, list[Message]] = defaultdict(list)
self._lock = threading.Lock()
def send(self, sender: str, recipient: str, content: str):
msg = Message(sender=sender, recipient=recipient, content=content)
with self._lock:
self._inboxes[recipient].append(msg)
def broadcast(self, sender: str, content: str, exclude: list = None):
exclude = set(exclude or [])
exclude.add(sender)
with self._lock:
recipients = [r for r in self._inboxes.keys() if r not in exclude]
for r in recipients:
self.send(sender, r, content)
def receive(self, agent_name: str) -> list[Message]:
with self._lock:
msgs = self._inboxes.pop(agent_name, [])
return msgs
def peek(self, agent_name: str) -> list[Message]:
with self._lock:
return list(self._inboxes.get(agent_name, []))
def register(self, agent_name: str):
with self._lock:
if agent_name not in self._inboxes:
self._inboxes[agent_name] = []
@dataclass
class AgentInfo:
name: str
role: str
status: str = "active"
tasks_completed: int = 0
spawned_at: str = field(default_factory=lambda: datetime.now().isoformat())
class TeamRegistry:
def __init__(self, team_name: str, description: str = ""):
self.team_name = team_name
self.description = description
self.agents: dict[str, AgentInfo] = {}
self._lock = threading.Lock()
def register(self, name: str, role: str):
with self._lock:
self.agents[name] = AgentInfo(name=name, role=role)
def update_status(self, name: str, status: str):
with self._lock:
if name in self.agents:
self.agents[name].status = status
def increment_completed(self, name: str):
with self._lock:
if name in self.agents:
self.agents[name].tasks_completed += 1
We build the three foundational systems that mirror ClawTeam’s filesystem-based architecture: a TaskBoard with full status lifecycle and automatic dependency unblocking, an InboxSystem supporting point-to-point and broadcast messaging between agents, and a TeamRegistry that tracks every agent’s role, status, and completed task count. We implement thread-safe locking across all three systems to allow multiple agents to read and write concurrently. We model tasks with a blocked_by dependency chain that automatically transitions blocked tasks to pending when their prerequisites complete, exactly how ClawTeam’s CLI resolves task dependencies.
Copy CodeCopiedUse a different Browser
SWARM_TOOLS = [
{
"type": "function",
"function": {
"name": "task_update",
"description": "Update the status of a task. Use 'in_progress' when starting, 'completed' when done.",
"parameters": {
"type": "object",
"properties": {
"task_id": {"type": "string", "description": "The task ID"},
"status": {"type": "string", "enum": ["in_progress", "completed", "failed"]},
"result": {"type": "string", "description": "Result or output of the task"},
},
"required": ["task_id", "status"],
},
},
},
{
"type": "function",
"function": {
"name": "inbox_send",
"description": "Send a message to another agent (e.g., 'leader' or a worker name).",
"parameters": {
"type": "object",
"properties": {
"to": {"type": "string", "description": "Recipient agent name"},
"message": {"type": "string", "description": "Message content"},
},
"required": ["to", "message"],
},
},
},
{
"type": "function",
"function": {
"name": "inbox_receive",
"description": "Check and consume all messages in your inbox.",
"parameters": {
"type": "object",
"properties": {},
},
},
},
{
"type": "function",
"function": {
"name": "task_list",
"description": "List tasks assigned to you or all team tasks.",
"parameters": {
"type": "object",
"properties": {
"owner": {"type": "string", "description": "Filter by owner name (optional)"},
},
},
},
},
]
class SwarmAgent:
def __init__(
self,
name: str,
role: str,
system_prompt: str,
task_board: TaskBoard,
inbox: InboxSystem,
registry: TeamRegistry,
):
self.name = name
self.role = role
self.system_prompt = system_prompt
self.task_board = task_board
self.inbox = inbox
self.registry = registry
self.conversation_history: list[dict] = []
self.inbox.register(name)
self.registry.register(name, role)
def _build_system_prompt(self) -> str:
coord_protocol = f"""
Coordination Protocol (auto-injected — you are agent '{self.name}')
You are part of an AI agent swarm. Your role: {self.role}
Your name: {self.name}
Available tools (equivalent to ClawTeam CLI):
- task_list: Check your assigned tasks (like
clawteam task list) - task_update: Update task status to in_progress/completed/failed (like
clawteam task update) - inbox_send: Send messages to other agents (like
clawteam inbox send) - inbox_receive: Check your inbox for messages (like
clawteam inbox receive)
WORKFLOW:
- Check your tasks with task_list
- Mark a task as in_progress when you start
- Do the work (think, analyze, produce output)
- Mark the task as completed with your result
- Send a summary message to 'leader' when done
"""
return self.system_prompt + "\n" + coord_protocol
def _handle_tool_call(self, tool_name: str, args: dict) -> str:
if tool_name == "task_update":
status = TaskStatus(args["status"])
result = args.get("result", "")
self.task_board.update_status(args["task_id"], status, result)
if status == TaskStatus.COMPLETED:
self.registry.increment_completed(self.name)
return json.dumps({"ok": True, "task_id": args["task_id"], "new_status": args["status"]})
elif tool_name == "inbox_send":
self.inbox.send(self.name, args["to"], args["message"])
return json.dumps({"ok": True, "sent_to": args["to"]})
elif tool_name == "inbox_receive":
msgs = self.inbox.receive(self.name)
if not msgs:
return json.dumps({"messages": [], "note": "No new messages"})
return json.dumps({
"messages": [
{"from": m.sender, "content": m.content, "time": m.timestamp}
for m in msgs
]
})
elif tool_name == "task_list":
owner = args.get("owner", self.name)
tasks = self.task_board.get_tasks(owner=owner)
return json.dumps({"tasks": [t.to_dict() for t in tasks]})
return json.dumps({"error": f"Unknown tool: {tool_name}"})
def run(self, user_message: str, max_iterations: int = 6) -> str:
self.conversation_history.append({"role": "user", "content": user_message})
for iteration in range(max_iterations):
try:
response = client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": self._build_system_prompt()},
*self.conversation_history,
],
tools=SWARM_TOOLS,
tool_choice="auto",
temperature=0.4,
)
except Exception as e:
return f"[API Error] {e}"
choice = response.choices[0]
msg = choice.message
assistant_msg = {"role": "assistant", "content": msg.content or ""}
if msg.tool_calls:
assistant_msg["tool_calls"] = [
{
"id": tc.id,
"type": "function",
"function": {"name": tc.function.name, "arguments": tc.function.arguments},
}
for tc in msg.tool_calls
]
self.conversation_history.append(assistant_msg)
if not msg.tool_calls:
return msg.content or "(No response)"
for tc in msg.tool_calls:
fn_name = tc.function.name
fn_args = json.loads(tc.function.arguments)
result = self._handle_tool_call(fn_name, fn_args)
self.conversation_history.append({
"role": "tool",
"tool_call_id": tc.id,
"content": result,
})
return "(Agent reached max iterations)"
We define four OpenAI function-calling tools, task_update, inbox_send, inbox_receive, and task_list, that serve as the agent’s equivalent of ClawTeam’s CLI commands. We construct the SwarmAgent class, which wraps an LLM reasoning loop that allows the agent to call tools multiple times before producing a final answer, mimicking how a real ClawTeam agent iteratively checks tasks, performs work, and reports results. We auto-inject a coordination protocol into every agent’s system prompt, giving it awareness of its name, role, and the exact workflow it must follow, just as ClawTeam injects instructions into every spawned agent.
Copy CodeCopiedUse a different Browser
LEADER_SYSTEM_PROMPT = """You are the LEADER of an AI agent swarm (ClawTeam style).
Your job:
- Receive a high-level goal from the human
- Decompose it into concrete sub-tasks with clear deliverables
- Assign tasks to specialized worker agents
- Monitor progress and synthesize the final result
You think strategically and delegate effectively. You don't do the detail work yourself —
you coordinate the swarm.
When producing the task plan, output valid JSON with this exact structure:
{
"team_description": "Brief description of what this team does",
"tasks": [
{
"subject": "Short task title",
"description": "Detailed instructions for the worker",
"worker_role": "The specialist role needed (e.g., 'Market Research Analyst')",
"worker_name": "Short snake_case name (e.g., 'market_analyst')",
"blocked_by_indices": []
}
]
}
blocked_by_indices is a list of 0-based indices of other tasks that must complete first.
Create 3-5 tasks. Each task should be independently executable by a specialist.
"""
def leader_plan_tasks(goal: str) -> dict:
console.print(Panel(
f"[bold cyan]
image Leader Agent Activated[/bold cyan]\n\n"
f"[white]Goal:[/white] {goal}\n"
f"[dim]Planning task decomposition...[/dim]",
border_style="cyan",
))
response = client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": LEADER_SYSTEM_PROMPT},
{"role": "user", "content": f"Decompose this goal into sub-tasks for a swarm of AI agents:\n\n{goal}"},
],
temperature=0.3,
response_format={"type": "json_object"},
)
plan_text = response.choices[0].message.content
plan = json.loads(plan_text)
return plan
def display_task_board(task_board: TaskBoard, registry: TeamRegistry):
table = Table(title="
image Task Board (Kanban)", show_lines=True, expand=True)
table.add_column("ID", style="dim", width=8)
table.add_column("Subject", style="bold")
table.add_column("Owner", style="cyan")
table.add_column("Status", width=14)
table.add_column("Result Preview", max_width=50)
status_colors = {
TaskStatus.PENDING: "yellow",
TaskStatus.IN_PROGRESS: "blue",
TaskStatus.COMPLETED: "green",
TaskStatus.BLOCKED: "red",
TaskStatus.FAILED: "magenta",
}
for t in task_board.get_tasks():
color = status_colors.get(t.status, "white")
status_str = f"[{color}]●[/{color}] {t.status.value}"
result_preview = (t.result[:47] + "...") if len(t.result) > 50 else t.result
table.add_row(t.id, t.subject, t.owner, status_str, result_preview)
console.print(table)
agent_table = Table(title="
image Agent Roster", show_lines=True)
agent_table.add_column("Agent", style="bold cyan")
agent_table.add_column("Role")
agent_table.add_column("Status")
agent_table.add_column("Tasks Done", justify="center")
for a in registry.agents.values():
status_emoji = {"active": "
image", "idle": "
image", "completed": "
image", "failed": "
image"}.get(a.status, "
image")
agent_table.add_row(a.name, a.role, f"{status_emoji} {a.status}", str(a.tasks_completed))
console.print(agent_table)
def run_swarm(goal: str):
console.print(Panel(
"[bold white on blue]
image ClawTeam Swarm Intelligence — Tutorial Run [/bold white on blue]",
expand=True,
))
console.print(f"\n[bold]Human Goal:[/bold] {goal}\n")
console.rule("[bold cyan]Phase 1: Leader Decomposes Goal into Tasks")
plan = leader_plan_tasks(goal)
console.print(f"\n[bold green]Team:[/bold green] {plan.get('team_description', 'N/A')}")
console.print(f"[bold green]Tasks planned:[/bold green] {len(plan
関連記事
今日のまとめ
AI日報で今日の重要ニュースをまとめ読み