OpenAI のデータベース変更分析(28 分読)
OpenAI Agents SDK を活用した AI 支援データベース変更ワークフローの解説記事は、自然言語から構造化された分析・実装・検証までを自動化する実用的なパターンを示している。
キーポイント
エージェントによる多段階ワークフローの実現
単一のモデル呼び出しに頼らず、要件解析、影響度評価、ロールアウト計画策定、SQL 生成、検証という明確な工程を分割して実行するアーキテクチャを採用している。
RAG とファイル検索の活用
PDF ベースの文書検索(File Search)をコンテキストとして組み込むことで、ドキュメントに基づいた正確な影響分析と根拠のある判断を可能にしている。
安全な実装のためのガバナンス
決定論的なガードレールによる検証やロールバック手順の自動生成、そして再使用可能なアーティファクトの保存により、企業環境での信頼性を確保する仕組みを提示している。
業界横断的な適用可能性
小売業の事例を用いているが、製造、医療、金融など構造化データと運用推論が必要なあらゆる産業で応用可能なドメイン非依存なパターンとして設計されている。
データベース変更の隠れたリスクと構造化アプローチ
データ変更は手渡しプロセスで文脈が失われ、カラム伝播漏れや型不一致などの失敗モードを生むため、SchemaFlow は自然言語リクエストを構造化されたワークフローに変換してリスクを低減します。
専門エージェントと型安全な出力による分離責任
解析、影響分析、ロールアウト計画、SQL 生成に特化したエージェントを使用し、Pydantic モデルで出力を型指定することで、各工程の品質を保証します。
ガードレールと評価可能な設計
段階間の決定論的チェック(ガードレール)で早期に失敗を検出し、最終成果物を JSON として保存して Promptfoo を使用した自動評価を可能にする設計です。
影響分析・編集コメントを表示
影響分析
この記事は、LLM を単なるコード生成ツールとしてではなく、複雑な業務プロセスを管理・推論する「エージェント」として活用する具体的な実装例を提供しています。特に、企業インフラにおけるデータベース変更のような高リスクタスクにおいて、AI の判断根拠を可視化し、安全装置(ガードレール)を組み込むことで、実際の現場導入への障壁を下げる重要な指針となります。
編集コメント
LLM の活用が「コード生成」から「業務プロセスの自律的推論と管理」へと進化していることを示す、非常に質の高い実践例です。特に監査可能性を重視した設計は、企業導入における最大の課題である信頼性確保に直結する内容と言えます。
このクックブックでは、OpenAI Agents SDK を用いた AI 支援型データベース変更ワークフローの全体像を解説します。
これは、OpenAI のツールエコシステムが、現代のエンタープライズインフラストラクチャにわたる複雑でデータ集約型のワークフローをオーケストレーションするためにどのように適用できるかを示すものです。現在の実装は小売業向けのスキーマ変更と影響分析ユースケースに焦点を当てていますが、基盤となるアーキテクチャパターンはドメインに依存せず拡張可能です。このワークフロー設計は、構造化データワークフロー、運用推論、検索強化分析、自動検証が必要とされる製造、製薬、医療、物流、金融、サプライチェーン運営など、あらゆる業界に適応して利用できます。
実行例として取り上げられているのは小売業のロイヤリティティア変更ですが、このパターンは、チームが追跡可能な影響分析とレビュー可能な実装出力を必要とする多くのデータベース変更リクエストにも適用可能です。
ワークフローは自然言語によるデータベース変更要求から始まり、これを構造化 JSON 形式に変換します。必要に応じて PDF ベースのファイル検索コンテキストを用いて影響分析に根拠を持たせ、安全なロールアウトプランを生成し、データプラットフォーム層全体にわたる SQL を草案化します。その後、決定論的なガードレールを用いて出力を検証し、再利用可能なアーティファクトとして保存します。必要に応じて Promptfoo を用いたフロー評価も実施可能です。
このノートブックは意図的に自己完結型となっています:ワークフローの核心となるロジック、プロンプト、ガードレール、アーティファクト生成、および評価ランタイムファイルはすべてノートブックセルから作成されます。
スキーマ変更は、一見すると単純に見えます。「null 許容の列を追加してバックフィルする」といったリクエストは、着陸テーブル、ステージングモデル、次元テーブル、マート、レポートロジック、リンケージの前提条件、検証チェック、ロールバック手順、リリースシーケンシングに影響を及ぼす可能性があります。
例では小売顧客データを使用していますが、依存関係が視覚化しやすいためです。しかし、同様の引き継ぎは多くの分析チームやプラットフォームチームで発生します。
このクックブックは、データベースエンジニアリング作業においてエージェントを変更分析および実装アシスタントとして活用するための実践的なパターンを示しています。1 つのモデルに最終的な SQL スクリプトを直接生成させるのではなく、ワークフローはタスクを明示的なステージに分割します:
- 自然言語のリクエストを構造化された JSON に解析する。
- 影響を受けるオブジェクトと運用上のリスクを分析する。
- プリチェック、ポストチェック、ロールバックガイダンスを含む展開計画を作成する。
- プラットフォーム層全体で SQL を生成する。
- 決定論的な健全性チェックを実行する。
- マシン可読なアーティファクトを保存する。
- 必要に応じて現在のフローに対して Promptfoo 評価(evals)を実行する。
結果は単に生成された SQL スクリプトではありません。解釈された変更リクエスト、影響分析、計画、SQL、検証結果、オプションの RAG エビデンスサマリー、および評価出力を含む監査可能なバンドルです。
データベース変更要求は、多くの場合複数の引き継ぎプロセスを経ます。プロダクトオーナーが要件を説明し、データエンジニアがそれを解釈し、プラットフォームチームがリスクを評価し、アナリティクスエンジニアがフィールドを下流へ伝播させ、レビュー担当者が変更の安全性を確認します。各ステップで重要な文脈が失われる可能性があります。
SchemaFlow は、自由形式の変更要求を構造化され、検証可能なワークフローに変換することでこの課題に対処します。
これは重要です。なぜならデータベース変更は隠れた障害モードを引き起こす可能性があるからです:
- ODS に追加された列が、ステージング、コア、またはマートへ伝播されない場合があります。
- NULL 許容フィールドが誤って NOT NULL として生成される可能性があります。
- 要求で歴史的データの充満を求めているにもかかわらず、バックフィルロジックが省略されることがあります。
- インデックス要件が見落とされる可能性があります。
- リファレンスドキュメントを参照しない限り、下流のレポート依存関係が不明な場合があります。
- 生成された SQL は妥当に見えても、基本的な整合性チェックで失敗する可能性があります。
このクックブックは、段階的なエージェント推論、型付き出力、オプションのリトリーバルコンテキスト、決定論的ガードレール、保存されたアーティファクト、および反復可能な評価を用いてこれらのリスクを低減するためのパターンを示しています。
- 構造化解釈 – 自然言語のデータベース要求を変換し、正規化された change_json 契約に変換します。
- 責任の分離 – パース、影響分析、ロールアウト計画、SQL 生成のために専門的なエージェントを使用します。
- オプションな RAG グラウンディング – 影響分析エージェントが、アップロードされた PDF(IFD、スキーマ仕様、またはリンケージドキュメントなど)に対して File Search を使用できるようにします。
- Typed stage outputs – Pydantic モデルと Agents SDK の出力スキーマを使用して、解析(parse)、影響分析(impact)、計画(plan)の各ステージで構造化された出力を取得します。
- Guardrail-first workflow – 下流のステップが不良状態を消費する前に明らかな失敗を検出できるよう、ステージ間に決定論的なチェックを追加したワークフローです。
- Traceability – エージェント実行、ガードレール、アーティファクト生成、評価(eval)実行に関する OpenAI Agents SDK のトレースとスパンを出力します。
- Portable artifacts – 最終的なワークフローバンドルを JSON 形式で artifacts/notebook_runs/ ディレクトリに保存します。
- Eval-ready design – ライブなノートブックの状態から、Promptfoo プロバイダー、アサーション、設定(config)、および結果ファイル(result files)を生成する設計です。
- No database side effects – 生データベースに対して実行することなく、ドラフト SQL と検証出力を生成します。
このノートブックを終える頃には、以下の成果物を生成する動作中の SchemaFlow パイプラインが完成しています:
- 解析されたデータベース変更リクエスト:
タイトル(title)
ドメイン(domain)
対象スキーマ(target schema)
対象テーブル(target table)
正規化された操作(normalized operations)
注記(notes)
- 影響分析レポート:
影響を受けるテーブル、列、インデックス、ビュー、またはリレーションシップ
リスク(risks)
仮定(assumptions)
オプションの File Search エビデンスサマリー
- ロールアウト計画:
実装ステップ
事前チェック(prechecks)
事後チェック(postchecks)
ロールバックアクション
- 4 つの必須セクションを含むドラフト SQL スクリプト:
LANDING (ODS)
STAGING (STG)
CORE (DIM/FACT/VIEW)
MARTS (SERVING)
- 検証結果:
期待されるテーブルチェック(expected table checks)
期待される列チェック(expected column checks)
ALTER TABLE、UPDATE、CREATE INDEX などの必須キーワードチェック
- 保存された JSON アーティファクト:
変更リクエスト(change request)
影響分析(impact analysis)
- plan
- SQL
- validation
- optional RAG metadata
- A Promptfoo eval harness:
Python provider
- Python assertion file
- generated Promptfoo config
- parse-only eval case
- full-flow eval case
- timestamped JSON and HTML eval reports
このクックブックは、一般的なエンタープライズデータエンジニアリングのシナリオに焦点を当てています。ステークホルダーが自然言語でデータベーススキーマの変更を要求し、データチームがその要求を実装可能な計画に変換する必要があるというケースです。
ここでは小売ドメインは、ワークフローを具体的な形で示すための単なる手段に過ぎません。この段階的なアプローチは、他のソースシステム、データプロダクト、およびレビュープロセスにも適応可能です。
本ノートブックにおけるデフォルトの要求は以下の通りです:
LOYALTY_TIER VARCHAR(20) を ODS.ODS_CUSTOMER_PROFILE に NULL 許容として追加する。
CORE.DIM_CUSTOMER から CUSTOMER_ID および IS_CURRENT=true の条件でバックフィルを行う。
(CUSTOMER_ID, LOYALTY_TIER) に非一意インデックスを追加する。
人間のデータエンジニアが本番用の SQL を記述する前に、通常は以下のいくつかの質問に答える必要があります:
- どのテーブルとスキーマを変更するのか?
- 具体的にどの列、型、および NULL 許容性が要求されたのか?
- 歴史的データのバックフィルが必要か?
- この要求はインデックスの作成を意味しているか?
- どの下流レイヤーにこのフィールドを伝播させる必要があるか?
- レビュー担当者が注意すべきリスクは何らか?
- デプロイ前後に実行すべきチェックは何らか?
- どのようなロールバック手順が妥当か?
- 生成された SQL に必要な要素が含まれているか?
SchemaFlow は、段階的なエージェント・ワークフローとして実装されています。各ステージは、次のステージが消費する型付きの中間出力を作成します。その後、決定論的なチェックによって出力が検証され、ノートブックが最終バンドルを保存し、オプションで評価(evals)を実行します。
高レベルでは、SchemaFlow は以下のシーケンスに従います:

このノートブックは、読者がまずコア・ワークフローを実行し、その後、オプションの Promptfoo 評価セクションを実行するかどうかを判断できるように構成されています。
目次
概念的ガイド
- 概要
- なぜこれが重要なのか
- 主な利点
- 構築するもの
- イントロダクション:ユースケースとソリューション
- ワークフローの概要
- アーキテクチャ - デザイン・パターン
- システム設計
- 実行ワークフロー
ノートブックの実装
- 環境設定
- 入力
- オプションの PDF RAG コンテキスト(注:RAG は Retrieval-Augmented Generation の略)
- ステージ 1-2: 変更要求の解析とインパクト分析
影響ダッシュボードのプレビュー
- ステージ 1-2 の出力ガードレール
- ステージ 3-4: 実行計画と SQL 生成(注:SQL は Structured Query Language の略)
ステージ 3-4 の出力ガードレール
- ステージ 5: 軽量な SQL 健全性チェック
- 最終バンドル
- アーティファクトの保存
アーティファクト生成後の健全性チェック
- オプションのクリーンアップ
Promptfoo 実行前のチェック/ガードレール
- Promptfoo を使用したフローの評価
Promptfoo ランタイムディレクトリ設定
- Node.js および npm ランタイムの確認
- SchemaFlow コア・ランタイムのパブリッシュ
- Promptfoo プロバイダー・ランタイム
- Promptfoo アサーション・ランタイム(注:アサーションは検証や主張の文脈で使われる)
- Promptfoo テストケースと設定の構築
- Promptfoo Eval の実行
- Review Latest Promptfoo Results
Reference
- Notes, Assumptions, and Extension Points
SchemaFlow は、段階的で契約駆動型のエージェントアーキテクチャを採用しています。その目的は、モデルを単一のブラックボックスな SQL 生成器として扱うことを避けることです。代わりに、各ステージには狭い責任範囲があり、出力は検査・検証・追跡・再利用が可能になります。
1. エージェントの専門化
各エージェントは 1 つの主要タスクを実行します:
AgentResponsibilityMain Output
Parse AgentExtract structured fields from the natural-language requestchange_json
Impact AgentIdentify affected objects, assumptions, and risksimpact_json
Plan AgentConvert the change and impact into rollout stepsplan_json
SQL AgentDraft SQL across data platform layerssql_text
この専門化により、ワークフローのデバッグが容易になります。もし SQL に列が含まれていない場合でも、問題が発生したのが解析段階か、影響分析段階か、計画段階か、それとも SQL 生成段階かを検査して特定できます。
2. タイプ付き出力契約
このノートブックでは、構造化された各ステージに対して Pydantic モデルを定義しています:
- ChangeRequestModel
- ImpactModel
- PlanModel
これらのモデルは AgentOutputSchema でラップされており、Agents SDK が期待される出力形状を理解できるようになっています。また、ワークフローでは、モデル呼び出し後の出力を正規化し、下流のステージが実行される前に必要なキーが存在することを確認しています。
3. 検索拡張型影響分析
PDF RAG セクションはオプションです。PDF_PATH が設定されている場合、ノートブックは以下の処理を行います:
- OpenAI ベクトルストアを作成します。
- PDF をアップロードします。
- OpenAI に解析・チャンク化・埋め込み・インデックス作成を任せます。
- Impact Agent に FileSearchTool を提供します。
- 返されたファイル検索結果の要約をキャプチャします。
これは、変更要求が IFD(機能定義)、スキーマドキュメント、系図ファイル、データ契約、またはアーキテクチャ参照に基づいている必要がある場合に有用です。
4. ステージ間のガードレールゲート
ノートブックは主要なステージの後に決定論的なチェックを追加します:
- ステージ 1〜2 のガードレールは、パース結果と影響分析の結果を検証します。
- ステージ 3〜4 のガードレールは、計画の完全性、データ型の伝播、および nullability(null 値の扱い)の処理を検証します。
- ステージ 5 の SQL チェックは、期待されるテーブル、列、および SQL キーワードの存在を検証します。
- アートifact 後のチェックでは、保存された JSON アートファクトが存在し、完全なループ(round-trip)が可能であることを確認します。
- Promptfoo 実行前のチェックでは、ノートブックの状態が評価(evals)に準備できていることを確認します。
これらのチェックは人間のレビューを代替するものではなく、一般的な沈黙の失敗を早期に検出するためのものです。
5. アートファクト中心の実行
最終的なバンドルが主要なワークフローアートファクトとなります。これは、実行のレビューやデバッグに必要な状態をキャプチャします:
bundle = {
"summary": ...,
"rag": ...,
"change_json": ...,
"impact_json": ...,
"plan": ...,
"sql": ...,
"validation": ...
}
ノートブックはこのバンドルを artifacts/notebook_runs/ ディレクトリ下に保存します。
6. ノートブック状態から生成された Eval ランタイム
Promptfoo は別プロセスで実行されるため、アクティブなノートブックカーネルの変数を直接読み取ることはできません。これを解決するため、セクション 10 では、現在のノートブックの状態から再利用可能な小さな Python モジュールと Promptfoo のランタイムファイルを記述します。
これにより、プロンプトの編集、CHANGE_TEXT の編集、およびモデル設定の変更は、評価ファイルが再生成された際に反映されます。
コンポーネントアーキテクチャ

主要なランタイムオブジェクト
| オブジェクト | 作成元 | 目的 |
|---|---|---|
| CHANGE_TEXT | 入力セクション | 自然言語によるデータベース変更リクエスト |
| change_json | ステージ 1 | リクエストの構造化解釈 |
| rag_vector_store_id | オプション PDF RAG セクション | アップロードされた PDF コンテキスト用のホスト型ベクトルストア ID |
| rag_file_search_results | ステージ 2 | インパクトエージェントに返されるファイル検索結果のサマリー |
| impact_json | ステージ 2 | 影響を受けるオブジェクト、リスク、および前提条件 |
| plan_json | ステージ 3 | ロールアウト計画、チェック項目、ロールバックガイダンス |
| sql_text | ステージ 4 | SQL スクリプトのドラフト |
| validation | ステージ 5 | 決定論的な SQL の健全性チェック結果 |
| bundle | 最終バンドルセクション | 統合されたワークフロー出力 |
| out_path | アーティファクト保存セクション | 保存される JSON アーティファクトのパス |
| promptfoo_config | Promptfoo セクション | 生成された評価設定 |
重要な境界条件
SchemaFlow はドラフトの実装アーティファクトを生成します。データベースに対して SQL を実行したり、マイグレーションを適用したり、プルリクエストを開いたり、本番システムを変更したりすることはありません。
ノートブックは順序通りに実行してください。
コアワークフロー
- 環境設定
依存関係のインポート。
- OpenAI Agents SDK のバージョンを確認します。
- OPENAI_API_KEY を読み取ります。
- 追跡とモデル選択を設定します。
- 入力
CHANGE_TEXT を定義します。
- これはコアワークフローに必要な唯一のビジネス入力です。
- オプションの PDF RAG コンテキスト
PDF_PATH を None に設定して、検索機能なしで実行します。
- PDF_PATH にローカルの PDF ファイルを指定すると、影響分析のためのファイル検索コンテキストが有効になります。
- ステージ 1〜2
変更要求を解析します。
- 影響を分析します。
- 必要に応じて、影響分析中にファイル検索を使用できます。
- ステージ 1〜2 のガードレール
解析出力が適切に構造化されていることを確認します。
- 影響分析の出力に対象が含まれていることを確認します。
- 影響を受けるオブジェクトに必要なフィールドが含まれていることを確認します。
- ステージ 3〜4
実行計画を生成します。
- ランディング、ステージング、コア、マートの各レイヤーにわたって SQL を生成します。
- ステージ 3〜4 のガードレール
計画セクションが適切に埋められていることを確認します。
- データ型の伝播を確認します。
- nullability(null 許容性)の動作が要求と一致していることを確認します。
- ステージ 5 SQL 健全性チェック
空の SQL が存在しないか確認します。
- 期待される対象テーブルと列を確認します。
- 要求から推測される必要な SQL アクションを確認します。
- 最終的なバンドルとアーティファクト
完全な出力バンドルを組み立てます。
- JSON 形式で保存します。
- アーティファクトが正常に往復(round-trip)することを確認します。
オプションの評価ワークフロー
- プリプロンプトフューチェック
ノートブックの状態が評価準備完了であることを確認します。
- プロンプトフューランタイム生成
再利用可能な SchemaFlow コアモジュールを作成します。
- プロンプトフュープロバイダーを記述します。
- プロンプトフューアサートファイル(検証ファイル)を記述します。
- プロンプトフューテストケースと設定ファイルを生成します。
- プロンプトフュー評価実行
解析のみを行う評価と、フルフローの評価を実行します。
- タイムスタンプ付きの JSON および HTML レポートを保存します。
- schemaflow_cookbook_eval_latest.* のエイリアスを更新します。
このセクションでは、SchemaFlow ワークフローのためのランタイム環境を準備します。
セットアップセルは以下の処理を行います:
- ノートブック全体で使用される標準的な Python ユーティリティをインポートします。
- OpenAI クライアントをインポートします。
- OpenAI Agents SDK のプリミティブをインポートします:
Agent
- Runner
- RunConfig
- AgentOutputSchema
- FileSearchTool
- 追跡(tracing)およびスパン(span)のヘルパー関数
- インストールされた openai-agents パッケージが最小必要なバージョンを満たしているか確認します。
- 環境変数から OPENAI_API_KEY を読み込み、存在しない場合は入力プロンプトを表示して取得します。
- OPENAI_MODEL でモデルを指定し、デフォルトは gpt-5.5 と設定します。
- すべての関連するエージェント実行とガードレールスパンをグループ化できるよう、トレースグループ ID を作成します。
このデモでは意図的に機密性の高いトレースペイロードを有効にしており、プロンプト、出力、評価バンドル、およびツールのデータがトレース上で確認できるようにしています。本番環境での利用においては、個人データを扱う前にこの設定を見直す必要があります。
%pip install --quiet -U "openai" "openai-agents>=0.17.0"
import os
import json
import re
import uuid
from datetime import datetime, timezone
from getpass import getpass
from importlib.metadata import PackageNotFoundError, version
try:
from openai import OpenAI
except Exception as e:
raise RuntimeError("Install dependency first: pip install -U openai") from e
MIN_AGENTS_SDK_VERSION = "0.17.0"
try:
from agents import (
Agent,
AgentOutputSchema,
FileSearchTool,
Runner,
RunConfig,
custom_span,
flush_traces,
function_span,
guardrail_span,
trace,
)
except Exception as e:
raise RuntimeError(
'Install or upgrade the OpenAI Agents SDK first: pip install -U "openai-agents>=0.17.0"'
) from e
def _version_tuple(value):
match = re.match(r"^(\d+)\.(\d+)\.(\d+)", str(value or ""))
return tuple(int(part) for part in match.groups()) if match else (0, 0, 0)
try:
AGENTS_SDK_VERSION = version("openai-agents")
except PackageNotFoundError as e:
raise RuntimeError('Install the OpenAI Agents SDK first: pip install -U "openai-agents>=0.17.0"') from e
if _version_tuple(AGENTS_SDK_VERSION) < _version_tuple(MIN_AGENTS_SDK_VERSION):
raise RuntimeError(
f'OpenAI Agents SDK {MIN_AGENTS_SDK_VERSION}+ is required; found {AGENTS_SDK_VERSION}. '
'Upgrade with: pip install -U "openai-agents>=0.17.0"'
)
def _clean_openai_api_key(value):
key = (value or "").strip()
if not key:
raise RuntimeError("OPENAI_API_KEY is required.")
return key
if not os.getenv("OPENAI_API_KEY", "").strip():
os.environ["OPENAI_API_KEY"] = getpass("Enter your OpenAI API key: ")
os.environ["OPENAI_API_KEY"] = _clean_openai_api_key(os.getenv("OPENAI_API_KEY"))
OPENAI_ORG_ID = os.getenv("OPENAI_ORG_ID", "").strip()
if OPENAI_ORG_ID:
os.environ["OPENAI_ORG_ID"] = OPENAI_ORG_ID
MODEL = os.getenv("OPENAI_MODEL", "gpt-5.5")
TRACE_INCLUDE_SENSITIVE_DATA = os.getenv("OPENAI_AGENTS_TRACE_INCLUDE_SENSITIVE_DATA", "false").lower() in {"1", "true", "yes", "on"}
os.environ["OPENAI_AGENTS_TRACE_INCLUDE_SENSITIVE_DATA"] = "true" if TRACE_INCLUDE_SENSITIVE_DATA else "false"
SCHEMAFLOW_TRACE_GROUP_ID = os.getenv("SCHEMAFLOW_TRACE_GROUP_ID") or (
"schemaflow-cookbook-" + datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + "-" + uuid.uuid4().hex[:8]
)
os.environ["SCHEMAFLOW_TRACE_GROUP_ID"] = SCHEMAFLOW_TRACE_GROUP_ID
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
print("Using model:", MODEL)
print("OpenAI Agents SDK:", AGENTS_SDK_VERSION)
print("OpenAI organization:", os.getenv("OPENAI_ORG_ID") or "(default for API key)")
print("Trace group:", SCHEMAFLOW_TRACE_GROUP_ID)
print("Trace payloads include prompts/outputs:", TRACE_INCLUDE_SENSITIVE_DATA)
from concurrent.futures import ThreadPoolExecutor
from pydantic import BaseModel, ConfigDict, Field
class SchemaFlowBaseModel(BaseModel):
model_config = ConfigDict(extra="allow")
class OperationModel(SchemaFlowBaseModel):
op: str
details: dict = Field(default_factory=dict)
class ChangeRequestModel(SchemaFlowBaseModel):
domain: str | None = None
target_schema: str | None = None
target_table: str | None = None
operations: list[OperationModel] = Field(default_factory=list)
notes: list = Field(default_factory=list)
class ImpactObjectModel(SchemaFlowBaseModel):
type: str
name: str
reason: str
source: str
class ImpactModel(SchemaFlowBaseModel):
impacted_objects: list[ImpactObjectModel] = Field(default_factory=list)
risks: list[str] = Field(default_factory=list)
assumptions: list[str] = Field(default_factory=list)
class PlanStepModel(SchemaFlowBaseModel):
id: str
description: str
class PlanModel(SchemaFlowBaseModel):
plan_steps: list[PlanStepModel] = Field(default_factory=list)
prechecks: list[str] = Field(default_factory=list)
postchecks: list[str] = Field(default_factory=list)
rollback: list[str] = Field(default_factory=list)
CHANGE_OUTPUT_SCHEMA = AgentOutputSchema(ChangeRequestModel, strict_json_schema=False)
IMPACT_OUTPUT_SCHEMA = AgentOutputSchema(ImpactModel, strict_json_schema=False)
PLAN_OUTPUT_SCHEMA = AgentOutputSchema(PlanModel, strict_json_schema=False)
必ず JSON 形式で返してください。translation フィールドのみ。他のフィールド (technical_terms 等) は一切追加しないこと — 余計なフィールドを書こうとして本文翻訳がトークン上限で打ち切られる事故を防ぐため:
{"translation": "翻訳全文"}
def _parse_json_text(text: str):
text = (text or "{}").strip()
if text.startswith("`"):
text = re.sub(r"^`(?:json)?\s*", "", text)
text = re.sub(r"\s*`$", "", text).strip()
try:
return json.loads(text)
except json.JSONDecodeError:
match = re.search(r"\{.*\}", text, flags=re.DOTALL)
if not match:
raise
return json.loads(match.group(0))
def _model_dump(value):
if value is None or isinstance(value, (str, int, float, bool, bytes)):
return value
if isinstance(value, type):
return value
if hasattr(value, "model_dump"):
try:
return value.model_dump()
except TypeError:
pass
if hasattr(value, "to_dict"):
try:
return value.to_dict()
except TypeError:
pass
if hasattr(value, "__dict__"):
try:
return {k: v for k, v in vars(value).items() if not k.startswith("_")}
except TypeError:
pass
return value
def _agent_output_to_json(value):
value = _model_dump(value)
if isinstance(value, dict):
return value
if isinstance(value, str):
return _parse_json_text(value)
return json.loads(json.dumps(value, default=str))
def _agent_output_to_text(value):
value = _model_dump(value)
if isinstance(value, str):
return value.strip()
return json.dumps(value, ensure_ascii=False)
def _trace_metadata(metadata: dict | None = None):
cleaned = {}
for key, value in (metadata or {}).items():
if value is None:
cleaned[str(key)] = ""
elif isinstance(value, bool):
cleaned[str(key)] = "true" if value else "false"
elif isinstance(value, (dict, list, tuple, set)):
cleaned[str(key)] = json.dumps(value, ensure_ascii=False, default=str)
else:
cleaned[str(key)] = str(value)
return cleaned
def _schemaflow_run_config(workflow_name: str, metadata: dict | None = None):
return RunConfig(
workflow_name=workflow_name,
group_id=SCHEMAFLOW_TRACE_GROUP_ID,
trace_include_sensitive_data=TRACE_INCLUDE_SENSITIVE_DATA,
trace_metadata=_trace_metadata({"notebook": "schemaflow_cookbook", **(metadata or {})}),
)
def _runner_run_sync(agent, prompt: str, *, workflow_name: str, metadata: dict | None = None, max_turns: int = 4):
kwargs = {"run_config": _schemaflow_run_config(workflow_name, metadata), "max_turns": max_turns}
try:
return Runner.run_sync(agent, prompt, **kwargs)
except RuntimeError as exc:
if "event loop" not in str(exc).lower():
raise
with ThreadPoolExecutor(max_workers=1) as pool:
return pool.submit(lambda: Runner.run_sync(agent, prompt, **kwargs)).result()
def run_schemaflow_json_agent(*, name, instructions, prompt, output_schema, model=MODEL, tools=None, workflow_name=None, metadata=None):
agent = Agent(name=name, instructions=instructions, model=model, output_type=output_schema, tools=tools or [])
result = _runner_run_sync(agent, prompt, workflow_name=workflow_name or name, metadata={"agent": name, **(metadata or {})})
return _agent_output_to_json(result.final_output), result
def run_schemaflow_text_agent(*, name, instructions, prompt, model=MODEL, tools=None, workflow_name=None, metadata=None):
agent = Agent(name=name, instructions=instructions, model=model, tools=tools or [])
result = _runner_run_sync(agent, prompt, workflow_name=workflow_name or name, metadata={"agent": name, **(metadata or {})})
return _agent_output_to_text(result.final_output), result
def _collect_file_search_results(value):
results = []
seen = set()
def visit(node):
if node is None or isinstance(node, (str, int, float, bool, bytes)):
return
if isinstance(node, type) or callable(node):
return
node_id = id(node)
if node_id in seen:
return
seen.add(node_id)
node = _model_dump(node)
if node is None or isinstance(node, (str, int, float, bool, bytes)):
return
if isinstance(node, type) or callable(node):
return
必ず JSON 形式で返してください。translation フィールドのみ。他のフィールド(technical_terms 等)は一切追加しないこと — 余計なフィールドを書こうとして本文翻訳がトークン上限で打ち切られる事故を防ぐため:
{"translation": "翻訳全文"}
ノードが辞書型の場合:
ノードの"type"が"file_search_call"である場合、
結果リスト(存在しない場合は空リスト)を順に処理し、
各結果をモデル形式に変換します。
変換後の結果が辞書型であれば、
"text"または"content"フィールドからテキストを取得し、存在しない場合は空文字列とします。
テキストがリスト型の場合、改行で結合して文字列化します。
結果オブジェクトに"file_id"、"filename"("file_name"または"title"を優先)、"score"、およびテキストプレビュー(先頭1200文字)を追加して結果リストに格納します。
ノードのすべての子要素に対して再帰的に処理を行います。
elif ノードがリスト、タプル、セット型のいずれかの場合:
各要素に対して再帰的に処理を行います。
最終値に対して上記の訪問関数を呼び出します。
原文を表示
This cookbook walks through an end-to-end AI-assisted database change workflow using the OpenAI Agents SDK.
It demonstrates how OpenAI’s tooling ecosystem can be applied to orchestrate complex, data-intensive workflows across modern enterprise infrastructures. While the current implementation focuses on a retail-oriented schema change and impact-analysis use case, the underlying architectural patterns are domain-agnostic and extensible. The same workflow design can be adapted across industries such as manufacturing, pharmaceuticals, healthcare, logistics, finance, and supply chain operations — wherever structured data workflows, operational reasoning, retrieval-augmented analysis, and automated validation are required.
The running example is a retail loyalty-tier change, but the same pattern applies to many database-change requests where teams need traceable impact analysis and reviewable implementation output.
The workflow starts from a natural-language database change request, converts it into structured JSON, optionally grounds impact analysis with PDF-based File Search context, generates a safe rollout plan, drafts SQL across data platform layers, validates the output with deterministic guardrails, saves a reusable artifact, and optionally evaluates the flow with Promptfoo.
The notebook is intentionally self-contained: all core workflow logic, prompts, guardrails, artifact generation, and eval runtime files are created from notebook cells.
Schema changes are deceptively simple. A request like “add a nullable column and backfill it” can affect landing tables, staging models, dimensional tables, marts, reporting logic, lineage assumptions, validation checks, rollback procedures, and release sequencing.
The examples use retail customer data because the dependencies are easy to see, but the same kinds of handoffs show up in many analytics and platform teams.
This cookbook demonstrates a practical pattern for using agents as a change-analysis and implementation assistant for database engineering work. Instead of asking one model to produce a final SQL script directly, the workflow breaks the task into explicit stages:
- Parse the natural-language request into structured JSON.
- Analyze impacted objects and operational risks.
- Create a rollout plan with prechecks, postchecks, and rollback guidance.
- Generate SQL across platform layers.
- Run deterministic sanity checks.
- Save a machine-readable artifact.
- Optionally run Promptfoo evals against the current flow.
The result is not just a generated SQL script. It is an auditable bundle containing the interpreted change request, impact analysis, plan, SQL, validation results, optional RAG evidence summaries, and eval outputs.
Database change requests often move through several handoffs: product owners describe the need, data engineers interpret it, platform teams assess risk, analytics engineers propagate the field downstream, and reviewers check whether the change is safe. Important context can be lost at each step.
SchemaFlow addresses this by turning a free-form change request into a structured, inspectable workflow.
This matters because database changes can create hidden failure modes:
- A column added to ODS may not be propagated into staging, core, or marts.
- A nullable field may accidentally be generated as NOT NULL.
- Backfill logic may be omitted even though the request asks for historical population.
- Index requirements may be missed.
- Downstream reporting dependencies may be unknown unless reference documentation is consulted.
- Generated SQL may look plausible but fail basic consistency checks.
This cookbook shows a pattern for reducing those risks with staged agent reasoning, typed outputs, optional retrieval context, deterministic guardrails, saved artifacts, and repeatable evals.
- Structured interpretation – Converts natural-language database requests into a normalized change_json contract.
- Separation of responsibilities – Uses specialized agents for parse, impact analysis, rollout planning, and SQL generation.
- Optional RAG grounding – Lets the impact-analysis agent use File Search over an uploaded PDF, such as an IFD, schema spec, or lineage document.
- Typed stage outputs – Uses Pydantic models and Agents SDK output schemas for parse, impact, and plan stages.
- Guardrail-first workflow – Adds deterministic checks between stages so obvious failures are caught before downstream steps consume bad state.
- Traceability – Emits OpenAI Agents SDK traces and spans for agent runs, guardrails, artifact generation, and eval execution.
- Portable artifacts – Saves the final workflow bundle as JSON under artifacts/notebook_runs/.
- Eval-ready design – Generates Promptfoo provider, assertion, config, and result files from the live notebook state.
- No database side effects – Produces draft SQL and validation output without executing against a live database.
By the end of this notebook, you will have a working SchemaFlow pipeline that produces:
- A parsed database change request:
title
- domain
- target schema
- target table
- normalized operations
- notes
- An impact-analysis report:
impacted tables, columns, indexes, views, or relationships
- risks
- assumptions
- optional File Search evidence summaries
- A rollout plan:
implementation steps
- prechecks
- postchecks
- rollback actions
- A draft SQL script with four required sections:
LANDING (ODS)
- STAGING (STG)
- CORE (DIM/FACT/VIEW)
- MARTS (SERVING)
- A validation result:
expected table checks
- expected column checks
- required keyword checks such as ALTER TABLE, UPDATE, or CREATE INDEX
- A saved JSON artifact:
change request
- impact analysis
- plan
- SQL
- validation
- optional RAG metadata
- A Promptfoo eval harness:
Python provider
- Python assertion file
- generated Promptfoo config
- parse-only eval case
- full-flow eval case
- timestamped JSON and HTML eval reports
This cookbook focuses on a common enterprise data-engineering scenario: a stakeholder requests a database schema change in natural language, and the data team needs to turn that request into an implementation-ready plan.
Here, the retail domain is just a concrete way to make the workflow tangible. The same staged approach can be adapted to other source systems, data products, and review processes.
The default request in this notebook is:
Add LOYALTY_TIER VARCHAR(20) to ODS.ODS_CUSTOMER_PROFILE as nullable.
Backfill from CORE.DIM_CUSTOMER on CUSTOMER_ID where IS_CURRENT=true.
Add a non-unique index on (CUSTOMER_ID, LOYALTY_TIER).A human data engineer would typically need to answer several questions before writing production SQL:
- What table and schema are being changed?
- What exact column, type, and nullability were requested?
- Is historical backfill required?
- Does the request imply an index?
- Which downstream layers need the field propagated?
- What risks should reviewers look for?
- What checks should be run before and after deployment?
- What rollback steps are reasonable?
- Does the generated SQL include the required elements?
SchemaFlow implements this as a staged agent workflow. Each stage creates a typed intermediate output that the next stage consumes. Deterministic checks then validate the outputs before the notebook saves the final bundle and optionally runs evals.
At a high level, SchemaFlow follows this sequence:

The notebook is organized so readers can run the core workflow first and then decide whether they want to run the optional Promptfoo evaluation section.
Table of Contents
Conceptual Guide
- Overview
- Why This Matters
- Key Benefits
- What You’ll Build
- Introduction: Use Case and Solution
- Workflow Overview
- Architecture - Design Patterns
- System Design
- Execution Workflow
Notebook Implementation
- Environment Setup
- Input
- Optional PDF RAG Context
- Stages 1-2: Parse Change Request + Impact Analysis
Impact Dashboard Preview
- Stages 1-2 Output Guardrails
- Stages 3-4: Execution Plan + SQL Generation
Stages 3-4 Output Guardrails
- Stage 5: Lightweight SQL Sanity Checks
- Final Bundle
- Save Artifact
Post-Artifact Generation Sanity Check
- Optional Cleanup
Pre-Promptfoo Checks / Guardrails
- Evaluate the Flow with Promptfoo
Promptfoo Runtime Directory Setup
- Node.js and npm Runtime Check
- Publish SchemaFlow Core Runtime
- Promptfoo Provider Runtime
- Promptfoo Assertion Runtime
- Build Promptfoo Test Cases and Config
- Run Promptfoo Eval
- Review Latest Promptfoo Results
Reference
- Notes, Assumptions, and Extension Points
SchemaFlow uses a staged, contract-driven agent architecture. The goal is to avoid treating the model as a single black-box SQL generator. Instead, each stage has a narrow responsibility and produces an output that can be inspected, validated, traced, and reused.
1. Agent Specialization
Each agent performs one primary task:
AgentResponsibilityMain Output
Parse AgentExtract structured fields from the natural-language requestchange_json
Impact AgentIdentify affected objects, assumptions, and risksimpact_json
Plan AgentConvert the change and impact into rollout stepsplan_json
SQL AgentDraft SQL across data platform layerssql_text
This specialization makes the workflow easier to debug. If SQL is missing a column, you can inspect whether the issue started in parsing, impact analysis, planning, or SQL generation.
2. Typed Output Contracts
The notebook defines Pydantic models for the structured stages:
- ChangeRequestModel
- ImpactModel
- PlanModel
Those models are wrapped with AgentOutputSchema so the Agents SDK knows the expected output shape. The workflow also normalizes outputs after model calls to ensure expected keys exist before downstream stages run.
3. Retrieval-Augmented Impact Analysis
The PDF RAG section is optional. When PDF_PATH is set, the notebook:
- Creates an OpenAI vector store.
- Uploads the PDF.
- Lets OpenAI parse, chunk, embed, and index it.
- Gives the Impact Agent a FileSearchTool.
- Captures a summary of returned File Search results.
This is useful when the change request needs grounding in an IFD, schema document, lineage file, data contract, or architecture reference.
4. Guardrail Gates Between Stages
The notebook adds deterministic checks after major stages:
- Stages 1-2 guardrails validate parse and impact outputs.
- Stages 3-4 guardrails validate plan completeness, data type propagation, and nullability handling.
- Stage 5 SQL checks validate expected table, column, and SQL keyword presence.
- Post-artifact checks verify the saved JSON artifact exists and round-trips.
- Pre-Promptfoo checks verify the notebook state is ready for evals.
These checks do not replace human review, but they catch common silent failures early.
5. Artifact-Centered Execution
The final bundle is the main workflow artifact. It captures the state needed to review or debug the run:
bundle = {
"summary": ...,
"rag": ...,
"change_json": ...,
"impact_json": ...,
"plan": ...,
"sql": ...,
"validation": ...
}The notebook saves this bundle under artifacts/notebook_runs/.
6. Eval Runtime Generated from Notebook State
Promptfoo runs in a separate process, so it cannot directly read variables from the active notebook kernel. To solve this, Section 10 writes a small reusable Python module and Promptfoo runtime files from the current notebook state.
This ensures that prompt edits, CHANGE_TEXT edits, and model configuration changes are reflected when the eval files are regenerated.
Component Architecture

Primary Runtime Objects
ObjectCreated inPurpose
CHANGE_TEXTInput sectionThe natural-language database change request
change_jsonStage 1Structured interpretation of the request
rag_vector_store_idOptional PDF RAG sectionHosted vector store ID for uploaded PDF context
rag_file_search_resultsStage 2Summary of File Search results returned to the Impact Agent
impact_jsonStage 2Impacted objects, risks, and assumptions
plan_jsonStage 3Rollout plan, checks, and rollback guidance
sql_textStage 4Draft SQL script
validationStage 5Deterministic SQL sanity-check result
bundleFinal Bundle sectionConsolidated workflow output
out_pathSave Artifact sectionSaved JSON artifact path
promptfoo_configPromptfoo sectionGenerated eval configuration
Important Boundary
SchemaFlow generates draft implementation artifacts. It does not execute SQL against a database, apply migrations, open pull requests, or modify production systems.
Run the notebook in order.
Core Workflow
- Environment Setup
Imports dependencies.
- Verifies the OpenAI Agents SDK version.
- Reads OPENAI_API_KEY.
- Configures tracing and model selection.
- Input
Defines CHANGE_TEXT.
- This is the only required business input for the core workflow.
- Optional PDF RAG Context
Leave PDF_PATH = None to run without retrieval.
- Set PDF_PATH to a local PDF to enable File Search context for impact analysis.
- Stages 1-2
Parse the change request.
- Analyze impact.
- Optionally use File Search during impact analysis.
- Stages 1-2 Guardrails
Confirm parse output is well-formed.
- Confirm impact output includes the target.
- Confirm impacted objects contain required fields.
- Stages 3-4
Generate an execution plan.
- Generate SQL across landing, staging, core, and mart layers.
- Stages 3-4 Guardrails
Confirm plan sections are populated.
- Confirm data type propagation.
- Confirm nullability behavior matches the request.
- Stage 5 SQL Sanity Checks
Check for empty SQL.
- Check expected target table and columns.
- Check required SQL actions implied by the request.
- Final Bundle and Artifact
Assemble the full output bundle.
- Save it as JSON.
- Verify the artifact round-trips successfully.
Optional Eval Workflow
- Pre-Promptfoo Checks
Confirm the notebook state is ready for evals.
- Promptfoo Runtime Generation
Create a reusable SchemaFlow core module.
- Write a Promptfoo provider.
- Write a Promptfoo assertion file.
- Generate Promptfoo test cases and config.
- Promptfoo Eval Execution
Run parse-only and full-flow evals.
- Save timestamped JSON and HTML reports.
- Refresh schemaflow_cookbook_eval_latest.* aliases.
This section prepares the runtime for the SchemaFlow workflow.
The setup cell does the following:
- Imports standard Python utilities used throughout the notebook.
- Imports the OpenAI client.
- Imports the OpenAI Agents SDK primitives:
Agent
- Runner
- RunConfig
- AgentOutputSchema
- FileSearchTool
- tracing and span helpers
- Verifies that the installed openai-agents package meets the minimum required version.
- Reads OPENAI_API_KEY from the environment or prompts for it.
- Sets the model with OPENAI_MODEL, defaulting to gpt-5.5.
- Creates a trace group ID so all related agent runs and guardrail spans can be grouped together.
The workflow intentionally enables sensitive trace payloads for this demo so prompts, outputs, eval bundles, and tool data are visible in traces. For production usage, review this setting before handling private data.
%pip install --quiet -U "openai" "openai-agents>=0.17.0"import os
import json
import re
import uuid
from datetime import datetime, timezone
from getpass import getpass
from importlib.metadata import PackageNotFoundError, version
try:
from openai import OpenAI
except Exception as e:
raise RuntimeError("Install dependency first: pip install -U openai") from e
MIN_AGENTS_SDK_VERSION = "0.17.0"
try:
from agents import (
Agent,
AgentOutputSchema,
FileSearchTool,
Runner,
RunConfig,
custom_span,
flush_traces,
function_span,
guardrail_span,
trace,
)
except Exception as e:
raise RuntimeError(
'Install or upgrade the OpenAI Agents SDK first: pip install -U "openai-agents>=0.17.0"'
) from e
def _version_tuple(value):
match = re.match(r"^(\d+)\.(\d+)\.(\d+)", str(value or ""))
return tuple(int(part) for part in match.groups()) if match else (0, 0, 0)
try:
AGENTS_SDK_VERSION = version("openai-agents")
except PackageNotFoundError as e:
raise RuntimeError('Install the OpenAI Agents SDK first: pip install -U "openai-agents>=0.17.0"') from e
if _version_tuple(AGENTS_SDK_VERSION) < _version_tuple(MIN_AGENTS_SDK_VERSION):
raise RuntimeError(
f'OpenAI Agents SDK {MIN_AGENTS_SDK_VERSION}+ is required; found {AGENTS_SDK_VERSION}. '
'Upgrade with: pip install -U "openai-agents>=0.17.0"'
)
def _clean_openai_api_key(value):
key = (value or "").strip()
if not key:
raise RuntimeError("OPENAI_API_KEY is required.")
return key
if not os.getenv("OPENAI_API_KEY", "").strip():
os.environ["OPENAI_API_KEY"] = getpass("Enter your OpenAI API key: ")
os.environ["OPENAI_API_KEY"] = _clean_openai_api_key(os.getenv("OPENAI_API_KEY"))
OPENAI_ORG_ID = os.getenv("OPENAI_ORG_ID", "").strip()
if OPENAI_ORG_ID:
os.environ["OPENAI_ORG_ID"] = OPENAI_ORG_ID
MODEL = os.getenv("OPENAI_MODEL", "gpt-5.5")
TRACE_INCLUDE_SENSITIVE_DATA = os.getenv("OPENAI_AGENTS_TRACE_INCLUDE_SENSITIVE_DATA", "false").lower() in {"1", "true", "yes", "on"}
os.environ["OPENAI_AGENTS_TRACE_INCLUDE_SENSITIVE_DATA"] = "true" if TRACE_INCLUDE_SENSITIVE_DATA else "false"
SCHEMAFLOW_TRACE_GROUP_ID = os.getenv("SCHEMAFLOW_TRACE_GROUP_ID") or (
"schemaflow-cookbook-" + datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + "-" + uuid.uuid4().hex[:8]
)
os.environ["SCHEMAFLOW_TRACE_GROUP_ID"] = SCHEMAFLOW_TRACE_GROUP_ID
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
print("Using model:", MODEL)
print("OpenAI Agents SDK:", AGENTS_SDK_VERSION)
print("OpenAI organization:", os.getenv("OPENAI_ORG_ID") or "(default for API key)")
print("Trace group:", SCHEMAFLOW_TRACE_GROUP_ID)
print("Trace payloads include prompts/outputs:", TRACE_INCLUDE_SENSITIVE_DATA)from concurrent.futures import ThreadPoolExecutor
from pydantic import BaseModel, ConfigDict, Field
class SchemaFlowBaseModel(BaseModel):
model_config = ConfigDict(extra="allow")
class OperationModel(SchemaFlowBaseModel):
op: str
details: dict = Field(default_factory=dict)
class ChangeRequestModel(SchemaFlowBaseModel):
title: str | None = None
domain: str | None = None
target_schema: str | None = None
target_table: str | None = None
operations: list[OperationModel] = Field(default_factory=list)
notes: list = Field(default_factory=list)
class ImpactObjectModel(SchemaFlowBaseModel):
type: str
name: str
reason: str
source: str
class ImpactModel(SchemaFlowBaseModel):
impacted_objects: list[ImpactObjectModel] = Field(default_factory=list)
risks: list[str] = Field(default_factory=list)
assumptions: list[str] = Field(default_factory=list)
class PlanStepModel(SchemaFlowBaseModel):
id: str
description: str
class PlanModel(SchemaFlowBaseModel):
plan_steps: list[PlanStepModel] = Field(default_factory=list)
prechecks: list[str] = Field(default_factory=list)
postchecks: list[str] = Field(default_factory=list)
rollback: list[str] = Field(default_factory=list)
CHANGE_OUTPUT_SCHEMA = AgentOutputSchema(ChangeRequestModel, strict_json_schema=False)
IMPACT_OUTPUT_SCHEMA = AgentOutputSchema(ImpactModel, strict_json_schema=False)
PLAN_OUTPUT_SCHEMA = AgentOutputSchema(PlanModel, strict_json_schema=False)
def _parse_json_text(text: str):
text = (text or "{}").strip()
if text.startswith("`"</span><span>):</span></span>
<span><span> text </span><span>=</span><span> re.sub(</span><span>r</span><span>"</span><span>^</span><span>`(?:json)?\s*", "", text)
text = re.sub(r"\s*`$", "", text).strip()
try:
return json.loads(text)
except json.JSONDecodeError:
match = re.search(r"\{.*\}", text, flags=re.DOTALL)
if not match:
raise
return json.loads(match.group(0))
def _model_dump(value):
if value is None or isinstance(value, (str, int, float, bool, bytes)):
return value
if isinstance(value, type):
return value
if hasattr(value, "model_dump"):
try:
return value.model_dump()
except TypeError:
pass
if hasattr(value, "to_dict"):
try:
return value.to_dict()
except TypeError:
pass
if hasattr(value, "__dict__"):
try:
return {k: v for k, v in vars(value).items() if not k.startswith("_")}
except TypeError:
pass
return value
def _agent_output_to_json(value):
value = _model_dump(value)
if isinstance(value, dict):
return value
if isinstance(value, str):
return _parse_json_text(value)
return json.loads(json.dumps(value, default=str))
def _agent_output_to_text(value):
value = _model_dump(value)
if isinstance(value, str):
return value.strip()
return json.dumps(value, ensure_ascii=False)
def _trace_metadata(metadata: dict | None = None):
cleaned = {}
for key, value in (metadata or {}).items():
if value is None:
cleaned[str(key)] = ""
elif isinstance(value, bool):
cleaned[str(key)] = "true" if value else "false"
elif isinstance(value, (dict, list, tuple, set)):
cleaned[str(key)] = json.dumps(value, ensure_ascii=False, default=str)
else:
cleaned[str(key)] = str(value)
return cleaned
def _schemaflow_run_config(workflow_name: str, metadata: dict | None = None):
return RunConfig(
workflow_name=workflow_name,
group_id=SCHEMAFLOW_TRACE_GROUP_ID,
trace_include_sensitive_data=TRACE_INCLUDE_SENSITIVE_DATA,
trace_metadata=_trace_metadata({"notebook": "schemaflow_cookbook", **(metadata or {})}),
)
def _runner_run_sync(agent, prompt: str, *, workflow_name: str, metadata: dict | None = None, max_turns: int = 4):
kwargs = {"run_config": _schemaflow_run_config(workflow_name, metadata), "max_turns": max_turns}
try:
return Runner.run_sync(agent, prompt, **kwargs)
except RuntimeError as exc:
if "event loop" not in str(exc).lower():
raise
with ThreadPoolExecutor(max_workers=1) as pool:
return pool.submit(lambda: Runner.run_sync(agent, prompt, **kwargs)).result()
def run_schemaflow_json_agent(*, name, instructions, prompt, output_schema, model=MODEL, tools=None, workflow_name=None, metadata=None):
agent = Agent(name=name, instructions=instructions, model=model, output_type=output_schema, tools=tools or [])
result = _runner_run_sync(agent, prompt, workflow_name=workflow_name or name, metadata={"agent": name, **(metadata or {})})
return _agent_output_to_json(result.final_output), result
def run_schemaflow_text_agent(*, name, instructions, prompt, model=MODEL, tools=None, workflow_name=None, metadata=None):
agent = Agent(name=name, instructions=instructions, model=model, tools=tools or [])
result = _runner_run_sync(agent, prompt, workflow_name=workflow_name or name, metadata={"agent": name, **(metadata or {})})
return _agent_output_to_text(result.final_output), result
def _collect_file_search_results(value):
results = []
seen = set()
def visit(node):
if node is None or isinstance(node, (str, int, float, bool, bytes)):
return
if isinstance(node, type) or callable(node):
return
node_id = id(node)
if node_id in seen:
return
seen.add(node_id)
node = _model_dump(node)
if node is None or isinstance(node, (str, int, float, bool, bytes)):
return
if isinstance(node, type) or callable(node):
return
if isinstance(node, dict):
if node.get("type") == "file_search_call":
for result in node.get("results", []) or []:
result = _model_dump(result)
if isinstance(result, dict):
text = result.get("text") or result.get("content") or ""
if isinstance(text, list):
text = "\n".join(str(x) for x in text)
results.append({"file_id": result.get("file_id"), "filename": result.get("filename") or result.get("file_name") or result.get("title"), "score": result.get("score"), "text_preview": str(text)[:1200]})
for child in node.values():
visit(child)
elif isinstance(node, (list, tuple, set)):
for child in node:
visit(child)
visit(val
関連記事
Google NotebookLM に Gemini 3.5 と Antigravity が登場
Google は生成 AI ツール「NotebookLM」を大幅に更新し、最新モデル「Gemini 3.5」への移行、対応ファイル形式の拡大、Web ソース統合の簡素化を実施した。また、クエリ処理能力向上のため「Antigravity」機能を組み込んだと発表した。
NotebookLM の Gemini 3.5 アップグレードでクラウドコンピューターと情報源の検索機能が追加
Google がノートアプリ「NotebookLM」をアップデートし、新モデル「Gemini 3.5」を採用して情報の精度を向上させるとともに、クラウドコンピューター機能や情報源の探索機能を新たに追加した。
Google Research、Gemini Enterprise Agent Platform にアジェンティック RAG と多段クエリ対応の充分文脈エージェントを追加
Google Research チームは、企業検索の課題である複数ソース・多段クエリへの対応を目的とした新しいアジェンティック RAG 枠組みを Gemini Enterprise Agent Platform に実装し、クロスコーパス検索機能を公開プレビューとして提供開始した。
今日のまとめ
AI日報で今日の重要ニュースをまとめ読み