CoeFontの機械学習推論を支える技術
株式会社 CoeFont は、MLOps の重要性を踏まえ、FastAPI の非同期処理を活用して機械学習推論のレスポンス速度と信頼性を向上させる具体的な技術実装と課題解決策を公開した。
キーポイント
MLOps における周辺システムの重要性
モデル自体よりもインフラや連携コードなど周辺システムが占める割合が大きく、これらを効率化・自動化する MLOps の役割が本番環境の安定性に直結すると指摘している。
FastAPI 非同期処理によるパフォーマンス向上
I/O バウンドな処理において、従来のスレッドベースの同期処理よりも低負荷なイベントループを用いた非同期処理(async/await)を採用し、レスポンス速度を劇的に改善する手法を解説している。
非同期環境下での同期コードのリスクと対策
非同期エンドポイント内で同期ライブラリ(例:boto3 の通常版)を使用するとイベントループがブロックされるため、aiohttp や aiboto3 などの非同期対応ライブラリへの切り替えやスレッドプールの活用が必要であると警告している。
同期処理の非同期化によるパフォーマンス最適化
重い同期処理をスレッドプールにオフロードして非同期実行することで、ブロッキングを防ぎつつ FastAPI の性能向上を図る手法が紹介されています。
Server Sent Event(SSE)を活用したストリーミング推論
推論完了を待たずに逐次的に結果を送信する SSE を採用することで、ユーザーエクスペリエンスの向上とリソース効率の改善を実現しています。
SSE におけるエラーハンドリングの実装パターン
接続確立後の通信エラーを HTTP ステータスコードではなく、JSON 形式のイベントデータとしてクライアントへ伝達する具体的な実装例が示されています。
クライアント直結によるパフォーマンス向上
従来のバックエンドを介さない直接通信へ変更し、JWS署名検証のみでリソースを提供することで、仲介層の排除と応答速度の大幅な改善を実現しています。
影響分析・編集コメントを表示
影響分析
この記事は、大規模な推論サービスを提供する現場において、単にモデル精度を高めるだけでなく、インフラ層の設計(特に非同期処理の実装)がユーザー体験に直結することを示唆しており、実務レベルでの MLOps 改善の具体的な指針となる。
編集コメント
モデル開発だけでなく、運用基盤の設計品質がサービス品質を決定づけるという視点が高く評価できる実務的な記事です。
翻訳全文
同期処理エンドポイントの例は下記になります。この例では、リクエストがあるたびに hello 関数が呼び出され、その結果がレスポンスとして順次返されます。
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
def hello():
return {"Hello": "World"}
下記の図は、FastAPI サーバーが複数の HTTP リクエストを受け取った場合の、ワーカースレッドによる同期処理の流れを示しています。この図解では
リクエスト 1 をワーカースレッド 1 で行い、レスポンスが返されます。
リクエスト 2 をワーカースレッド 2 で行い、レスポンスが返されます。
同期処理の場合、リクエストを受けたスレッドは処理が完了するまでブロックされます。たとえば、外部サービスとの通信やデータベースクエリといった I/O バウンド(I/O bound)の処理が発生すると、そのスレッドは応答が返るまで待機状態になり、他の処理は行われません。
FastAPI は、Gunicorn などの Web サーバーと組み合わせて利用することが推奨されており、公式ドキュメント参照ではワーカー数を「CPU コア数 × 2 + 1」に設定することが推奨されています
これにより、複数の FastAPI が立ちあげられ、並列でのリクエスト処理が可能になります。また、各ワーカーで設定されるスレッド数(Python3.12 だと min(32, (os.process_cpu_count() or 1) + 4)。参照)がパフォーマンスに影響を与えます。
しかし、プロセスやスレッド間のコンテキストスイッチには高いオーバーヘッドが伴うため、数を増やすことがパフォーマンスが向上するとは限らず、大量のリクエストを処理する際には限界が生じる可能性があります。
そこで使うのが、非同期処理です。同期処理エンドポイントの例は下記になります。この例では、hello 関数は非同期関数として定義され、await asyncio.sleep(1) により 1 秒間待機しています。
from fastapi import FastAPI
import asyncio
app = FastAPI()
@app.get("/async")
async def hello():
await asyncio.sleep(1)
return {"message": "This is an async response"}
FastAPI 上では、イベントループが各非同期タスクの進捗を管理します。例えば、I/O 待ち中に他のタスクを実行することで、全体のパフォーマンスが向上する可能性があります。また、タスク間の切り替えはコンテキストスイッチよりもはるかに低負荷です。
非同期処理エンドポイントの注意点
しかし、注意する点もあります。パフォーマンスの大幅な悪化を防ぐために、非同期エンドポイントの中で、同期処理を行ってはいけません。
下記の図は、FastAPI が複数の HTTP リクエスト(短い I/O と長い I/O)を受け、同期処理がイベントループ上でブロックするため、タスクの開始が遅れている状態を示しています。
非同期に対応していないライブラリ(例:boto3)などは、別のライブラリ(例:aiboto3)に切り替えたり、無理やりイベントループから外部スレッドプールのスレッドに処理を退避させて実行するなど、徹底的に非同期化する必要があります。外部スレッドプールのスレッドに処理を退避させる例は下記です。
import asyncio
import concurrent.futures
import time
def sync_inference(input_data):
# ここでは 2 秒間のスリープで重い処理をシミュレーション
time.sleep(2)
return f"同期推論結果:{input_data}"
async def run_sync_inference_in_threadpool(input_data):
loop = asyncio.get_running_loop()
# ThreadPoolExecutor を利用して、同期処理を非同期的に実行
with concurrent.futures.ThreadPoolExecutor() as executor:
result = await loop.run_in_executor(executor, sync_inference, input_data)
return result
async def main_sync():
result = await run_sync_inference_in_threadpool("サンプルデータ")
print(result)
非同期処理は、実装の複雑さやレビュー負荷の増大という側面や、実装ミスによるパフォーマンス悪化という側面もあるため、性能向上のメリットとこれらのコストとのバランスを考慮して、適切な手法を選択することが重要です。
FastAPI における同期処理と非同期処理の特徴やパフォーマンスの違いをより深く理解するために、以下の資料がおすすめです。
Pycon 2024 FastAPI での async def と def の使い分け
「FastAPI での async/def と def の違い:非同期を使うならいつ使わないのか」では、同期処理と非同期処理の基本概念やそれぞれの実装上のメリット・デメリットが丁寧に解説されています。
FastAPI で async def と def をちゃんと使い分ける
こちらの記事では、非同期処理内での同期処理がブロッキングを引き起こすのか、また同期処理をスレッドプールにオフロードすることでパフォーマンスが向上するかどうかの検証結果が示されています。実際の検証結果に基づいた実践的な知見が得られるため、パフォーマンス改善のヒントになります。
これらの資料を参照することで、FastAPI の同期処理と非同期処理の適材適所な使い分けや、パフォーマンス向上のための具体的な手法について、より深い理解を得ることができるでしょう。
Server Sent Event で通信する
Server Sent Event とは
従来の HTTP 通信は、リクエストごとに一度だけレスポンスを返す仕組みです。すべての機械学習による推論処理が完了してから、応答が返されます。
一方、Server Sent Event(以下 SSE)はサーバーからクライアントへの一方向のストリーミングに特化しており、実装がシンプルでリソース効率が高く、モデル推論結果などの逐次的なデータ送信に最適です。例えば ChatGPT などで使われています。下記は、SSE を利用して、サーバーから文字列がリアルタイムに送信される様子を示しています 
モデルの推論結果をストリーミングで返す場合、SSE を活用すると、部分的な結果を早期に表示でき、ユーザーエクスペリエンスが向上します。以下は、サーバーからクライアントへ文字列データをリアルタイムに送信する例です。
詳細な仕様については、MDN の公式ドキュメントなども参考にされると良いでしょう.
Server Sent Event のエラーハンドリング
SSE では、接続が正常に確立されるとサーバーは HTTP ステータスコード 200(OK)を返し、クライアントへ継続的なイベント送信が可能な状態となります。
一方、データの送信中にエラーが発生した場合は、HTTP ステータスコード以外の方法でエラー情報を伝える必要があります。弊社では、例えば以下のような形式でエラー情報を送信し、クライアント側で適切にハンドリングできるようにしています。
{"event": "error", "data": "invalid-request-error"}
全体のコード例は次のようになります。
必ず JSON 形式で返してください。translation フィールドのみ。他のフィールド (technical_terms 等)は一切追加しないこと — 余計なフィールドを書こうとして本文翻訳がトークン上限で打ち切られる事故を防ぐため:
{"translation": "翻訳全文"}
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import asyncio
app = FastAPI()
def fakeLLM_generator():
messages = [
"これは Server Sent Event のテスト文章です。",
"この文章は、LLM の出力を模倣するために作られた長文であり、",
"様々な情報が盛り込まれています。",
"ランダムなタイミングで切り取られることにより、",
"部分的な情報の伝達が行われます。",
"まるで実際の会話のように、途中で途切れることもありますが、",
"全体としては一貫した内容を持っています。",
"これにより、ユーザーは逐次的なデータの受信を体験することができます。",
"さらに、このテストはシステムのストリーミング機能を確認するためのものです。",
"出力が途中で切れても、後続のメッセージで続きが送られます。"
]
for msg in messages:
yield msg
async def event_generator():
try:
llm_iterator = fakeLLM_generator()
yield {"event": "start", "data": "開始します"}
for message in llm_iterator:
yield {"data": message}
await asyncio.sleep(1)
yield {"event": "complete", "data": "送信完了"}
# 何かしらのクライアントエラー
except CustomClientError as e:
yield {"event": "error", "data": "invalid-request-error"}
# それ以外のエラー
except Exception as e:
yield {"event": "error", "data": "unknown-error"}
@app.get("/sse")
async def sse_endpoint():
return EventSourceResponse(event_generator())
中間層なしでのリクエストの処理
システムアーキテクチャとして、よく見られる形態は「クライアント ⇔ バックエンド ⇔ 推論エンドポイント」という構成です。これは、従来のバックエンド(例:登録・認証・決済)と複雑な推論システムとの役割を明確に分離することで、システム全体の管理性や拡張性を向上させるアプローチとなっています。
弊社では、クライアントが直接 Resource サーバー(推論サーバー)からレスポンスを受け取る方式へ変更している箇所があります。初回の認証は Authorization サーバー(通常のバックエンド)で実施され、その後の各リクエストでは、Resource サーバーが JWS 署名の検証を行い、リソースを返却します。この仕組みにより、不要な仲介層を排除し、通信経路を短縮することで、応答速度および全体パフォーマンスが大幅に向上します。
なお、ユーザー認証には JWS 認証を採用し、高速な検証を実現しています。以下のシーケンス図は、新しいフローを示しています。
このアーキテクチャでは、各リクエストがデータベースにアクセスする頻度が減少するため、システム全体の応答速度が向上します。
推論の信頼性向上を実現する技術
システムの信頼性を向上させるためのモニタリング、エラーハンドリング、インフラ管理などの取り組みについて説明します。たとえ高速な推論結果を提供できるシステムであっても、信頼性が担保されなければユーザーの信頼を得ることは難しいため、信頼性向上は高速応答施策と連動して、全体のサービス品質を支える重要な要素となります。
各種モニタリングツールを活用する
アプリケーションモニタリング・インフラ監視・外形監視
Sentry はアプリケーションモニタリング(APM)ツールとして、発生したエラーをリアルタイムにキャッチし、詳細なコンテキスト(スタックトレース、ユーザー情報、環境情報など)を提供します。料金もかなり抑えめに利用できます。
エラー発生時のスタックトレースの表示
エラー時のリクエストペイロードの表示
一定の閾値を超えるエラーに対する通知
特定エンドポイントのエラー率・パフォーマンスの表示
リクエストのプロファイリングの表示
音声合成 AI の場合、本番環境でのユーザーからのさまざまな文字入力がなされ、予期せぬエラーを吐いているケースがあります。ペイロードを詳細に確認できる機能は、問題発生から発見・解決までの時間短縮に大いに役立っています。

また、弊社では Datadog も用いています。料金は相対的に高いですが、インフラ、アプリケーション、ログ、メトリクス、トレースなどを一元管理でき、さまざまな機能を提供しています。
監視という観点だと、Datadog Synthetic Monitoring を用いた AWS ネットワーク外からの監視により、システム全体が正常に動作しているかを継続的にチェックしています。
SLI SLI は、サービスのパフォーマンスや品質を数値で表現する指標です。具体的には、システムの稼働率、レスポンスタイム、エラーレートなどが該当します。
SLO SLO は、サービス提供側がユーザーに対して約束する具体的な性能目標です。たとえば、「99.9% の稼働率」や「平均レスポンス時間 500 ミリ秒以内」など、明確な数値目標が設定されます。
サービスレベルの目標(SLO)を策定し、その達成状況を示す指標(SLI)を継続的にモニタリングしています。GPU を使った計算コストが高い処理や動作が不安定な外部 API(LLM)の影響により、予期せぬ変動が生じる可能性があるため、常に数値を確認しています。
チームで、外形監視による稼働率やレイテンシの SLO と、実ユーザー向けの稼働率・レイテンシの SLO を設定し、毎週確認を行っています。エラーバジェットを考慮しながら、各タスクとして改善に取り組んだ結果、チーム発足以降、SLI は大幅に改善しました。

また、PO/PdM との共通指標としてシステムの状態を数値化することで、トラブル発生時の対応や改善策の議論がスムーズに進むようになっています。
また、機械学習システムでは、単にシステム自体の稼働状況をモニタリングするだけでなく、出力されるコンテンツの品質チェックも不可欠です。 たとえば、
音声生成の推論結果のロギング・モニタリング
必ず JSON 形式で返してください。translation フィールドのみ。他のフィールド (technical_terms 等) は一切追加しないこと — 余計なフィールドを書こうとして本文翻訳がトークン上限で打ち切られる事故を防ぐため:
{"translation": "推論の信頼性向上を実現する技術\n\nシステムの信頼性を向上させるためのモニタリング、エラーハンドリング、インフラ管理などの取り組みについて説明します。たとえ高速な推論結果を提供できるシステムであっても、信頼性が担保されなければユーザーの信頼を得ることは難しいため、信頼性向上は高速応答施策と連動して、全体のサービス品質を支える重要な要素となります。\n\n各種モニタリングツールを活用する\n\nアプリケーションモニタリング・インフラ監視・外形監視\n\nSentry はアプリケーションモニタリング(APM)ツールとして、発生したエラーをリアルタイムにキャッチし、詳細なコンテキスト(スタックトレース、ユーザー情報、環境情報など)を提供します。料金もかなり抑えめに利用できます。\n\nエラー発生時のスタックトレースの表示\n\nエラー時のリクエストペイロードの表示\n\n一定の閾値を超えるエラーに対する通知\n\n特定エンドポイントのエラー率・パフォーマンスの表示\n\nリクエストのプロファイリングの表示\n\n音声合成 AI の場合、本番環境でのユーザーからのさまざまな文字入力がなされ、予期せぬエラーを吐いているケースがあります。ペイロードを詳細に確認できる機能は、問題発生から発見・解決までの時間短縮に大いに役立っています。\n\n
\n\nまた、弊社では Datadog も用いています。料金は相対的に高いですが、インフラ、アプリケーション、ログ、メトリクス、トレースなどを一元管理でき、さまざまな機能を提供しています。\n\n監視という観点だと、Datadog Synthetic Monitoring を用いた AWS ネットワーク外からの監視により、システム全体が正常に動作しているかを継続的にチェックしています。\n\nSLI SLI は、サービスのパフォーマンスや品質を数値で表現する指標です。具体的には、システムの稼働率、レスポンスタイム、エラーレートなどが該当します。\n\nSLO SLO は、サービス提供側がユーザーに対して約束する具体的な性能目標です。たとえば、「99.9% の稼働率」や「平均レスポンス時間 500 ミリ秒以内」など、明確な数値目標が設定されます。\n\nサービスレベルの目標(SLO)を策定し、その達成状況を示す指標(SLI)を継続的にモニタリングしています。GPU を使った計算コストが高い処理や動作が不安定な外部 API(LLM)の影響により、予期せぬ変動が生じる可能性があるため、常に数値を確認しています。\n\nチームで、外形監視による稼働率やレイテンシの SLO と、実ユーザー向けの稼働率・レイテンシの SLO を設定し、毎週確認を行っています。エラーバジェットを考慮しながら、各タスクとして改善に取り組んだ結果、チーム発足以降、SLI は大幅に改善しました。\n\n
\n\nまた、PO/PdM との共通指標としてシステムの状態を数値化することで、トラブル発生時の対応や改善策の議論がスムーズに進むようになっています。\n\nまた、機械学習システムでは、単にシステム自体の稼働状況をモニタリングするだけでなく、出力されるコンテンツの品質チェックも不可欠です。 たとえば、\n\n音声生成の推論結果のロギング・モニタリング"}
LLM の監視(例:langfuse の利用)
などが求められます。弊社でも絶賛取組中の領域になっています。
現行の Text2Speech の仕様では、リクエスト数だけでなく、リクエストごとの文字数もサーバー負荷にほぼ線形で影響を与えます。同じ RPM でも、文字数の違いにより実際の負荷が 100 倍に達する可能性があるため、単純なリクエスト数のみでの負荷状況は把握できません。
そこで、各リクエストの文字数情報も Cloud Watch Metrics に流し込み、迅速かつ柔軟なスケーリングを行い、システム全体のパフォーマンスを改善しようとしています。
クライアントが直接リクエストを送信する設計は、不正アクセスのリスクを増加させる可能性があります。
そのため、AWS WAF を活用した DDoS 攻撃や脆弱性を狙った攻撃への防御、ライブラリの脆弱性管理、そして IDS(侵入検知システム)など、包括的なセキュリティ対策が不可欠となります。
本記事でご紹介した各技術と取り組みは、我々のサービスが迅速かつ安定して提供されるための基盤となっています。MLOps チームは、非同期処理の最適化やリアルタイム通信、そして継続的なモニタリングを通じて、システム全体のパフォーマンスと信頼性の向上に努めています。これからも最新技術の積極的な採用と運用改善により、さらなる品質向上を追求していきます。
CoeFontPublicationAI 音声プラットフォーム「CoeFont(コエフォント)」の公式テックブログです。

原文を表示
AI
Python
Sentry
MLOps tech はじめに
株式会社CoeFontで、エンジニアリングマネージャーをしているsugasugaです。
本ブログでは、下記の流れで、「推論結果を迅速に提供する技術」と「推論の信頼性向上を実現する技術」についてご紹介します。
推論の信頼性向上を実現する技術
MLOps(Machine Learning Operations) とは、機械学習モデルの開発、運用、継続的改善までのプロセスを効率化・自動化する手法です。実際の運用現場では、モデル自体のコードはシステム全体のごく一部に過ぎず、多くはインフラや連携を担うプログラミングコードなど、複雑に絡み合う周辺システムで構成されています。
以下の図は、機械学習システムにおけるコードの構成比率を示しており、機械学習アルゴリズムや学習プロセスに関する部分が小さな黒い箱で表現され、残りは周辺システムが占める部分が多い様子を視覚的に表現しています。

Hidden Technical Debt in Machine Learning Systemsのfigure1より引用
音声合成技術を基盤とするクラウドプラットフォームを提供し、ユーザーが高品質な音声サービスを手軽に利用できる環境を実現しています。
AI技術を活用した通訳ソリューションを展開し、言語の壁を越えたコミュニケーションをサポートするサービスを提供しています。
両サービスを提供するチームの裏側で、MLOpsチームが機械学習サービスを提供しています。
弊社においては、特にServing Infrastructure は、機械学習モデルを本番環境で安定的かつ効率的に提供するための中核部分として重要な役割を果たし、ソフトウェアエンジニアリング面で多くの課題に取り組んでいます。
今回は、推論結果を迅速に提供する技術と、推論の信頼性向上を実現する技術についてご紹介します。
推論結果を迅速に提供する技術を解説します。迅速なレスポンスは、ユーザー体験の向上に直結します。
FastAPIの非同期処理エンドポイントを活用する
Web APIを実装する際、リクエストに対するレスポンス速度は非常に重要です。 非同期処理を活用することで、外部サービスへのアクセス、ファイル操作、データベースとの通信など、I/O待ちが発生する処理を効率的に実行でき、全体のパフォーマンスを向上させることが可能です。 まずは、非同期処理ではなく、FastAPIの同期処理エンドポイントから解説します。
同期処理エンドポイントの例は下記になります。 この例では、リクエストがあるたびに hello 関数が呼び出され、その結果がレスポンスとして順次返されます。
from fastapi import FastAPI app = FastAPI() @app.get("/") def hello(): return {"Hello": "World"}
下記の図は、FastAPIサーバーが複数のHTTPリクエストを受け取った場合の、ワーカースレッドによる同期処理の流れを示しています。この図解では
リクエスト1をワーカースレッド1で行い、レスポンスが返されます。
リクエスト2をワーカースレッド2で行い、レスポンスが返されます。
同期処理の場合、リクエストを受けたスレッドは処理が完了するまでブロックされます。たとえば、外部サービスとの通信やデータベースクエリといったI/Oバウンドの処理が発生すると、そのスレッドは応答が返るまで待機状態になり、他の処理は行われません。
FastAPIは、GunicornなどのWebサーバーと組み合わせて利用することが推奨されており、公式ドキュメント参照ではワーカー数を「CPUコア数 × 2 + 1」に設定することが推奨されています
これにより、複数のFastAPIが立ちあげ、並列でのリクエスト処理が可能になります。また、各ワーカーで設定されるスレッド数(Python3.12だとmin(32, (os.process_cpu_count() or 1) + 4)。参照)がパフォーマンスに影響を与えます。
しかし、プロセスやスレッド間のコンテキストスイッチには高いオーバーヘッドが伴うため、数を増やすことがパフォーマンスが向上するとは限らず、大量のリクエストを処理する際には限界が生じる可能性があります。
そこで使うのが、非同期処理です。同期処理エンドポイントの例は下記になります。 この例では、hello 関数は非同期関数として定義され、await asyncio.sleep(1) により1秒間待機しています。
from fastapi import FastAPI import asyncio app = FastAPI() @app.get("/async") async def hello(): await asyncio.sleep(1) return {"message": "This is an async response"}
FastAPI上では、イベントループが各非同期タスクの進捗を管理します。例えば、I/O待ち中に他のタスクを実行することで、全体のパフォーマンスが向上する可能性があります。また、タスク間の切り替えはコンテキストスイッチよりもはるかに低負荷です。
非同期処理エンドポイントの注意点
しかし、注意する点もあります。パフォーマンスの大幅な悪化を防ぐために、非同期エンドポイントの中で、同期処理を行ってはいけません。
下記の図は、FastAPIが複数のHTTPリクエスト(短いI/Oと長いI/O)を受け、同期処理がイベントループ上でブロックするため、タスクの開始が遅れている状態を示しています。
非同期に対応していないライブラリ (ie. boto3)などは、別のライブラリ(ie. aiboto3)に切り替えたり、無理やりイベントループから、外部スレッドプールのスレッドに処理を退避させて実行するなど、徹底的に非同期にする必要があります。外部スレッドプールのスレッドに処理を退避させる例は下記です。
import asyncio import concurrent.futures import time def sync_inference(input_data): # ここでは2秒間のスリープで重い処理をシミュレーション time.sleep(2) return f"同期推論結果: {input_data}" async def run_sync_inference_in_threadpool(input_data): loop = asyncio.get_running_loop() # ThreadPoolExecutor を利用して、同期処理を非同期的に実行 with concurrent.futures.ThreadPoolExecutor() as executor: result = await loop.run_in_executor(executor, sync_inference, input_data) return result async def main_sync(): result = await run_sync_inference_in_threadpool("サンプルデータ") print(result)
非同期処理は、実装の複雑さやレビュー負荷の増大という側面や、実装ミスによるパフォーマンス悪化という側面もあるため、性能向上のメリットとこれらのコストとのバランスを考慮して、適切な手法を選択することが重要です。
FastAPIにおける同期処理と非同期処理の特徴やパフォーマンスの違いをより深く理解するために、以下の資料がおすすめです。
Pycon 2024 FastAPIでのasync defとdefの使い分け 「FastAPIでのasync/defとdefの違い:非同期を使うならいつ使わないのか」では、同期処理と非同期処理の基本概念やそれぞれの実装上のメリット・デメリットが丁寧に解説されています。
FastAPIでasync defとdefをちゃんと使い分ける こちらの記事では、非同期処理内での同期処理がブロッキングを引き起こすのか、また同期処理をスレッドプールにオフロードすることでパフォーマンスが向上するかどうかの検証結果が示されています。実際の検証結果に基づいた実践的な知見が得られるため、パフォーマンス改善のヒントになります。
これらの資料を参照することで、FastAPIの同期処理と非同期処理の適材適所な使い分けや、パフォーマンス向上のための具体的な手法について、より深い理解を得ることができるでしょう。
Server Sent Eventで通信する
Server Sent Eventとは
従来のHTTP通信は、リクエストごとに一度だけレスポンスを返す仕組みです。すべての機械学習による推論処理が完了してから、応答が返されます。
対して、Server Sent Event(以下SSE)はサーバーからクライアントへの一方向のストリーミングに特化しており、実装がシンプルでリソース効率が高く、モデル推論結果などの逐次的なデータ送信に最適です。例えばChatGPTなどで使われています。下記は、SSEを利用して、サーバーから文字列がリアルタイムに送信される様子を示しています 
モデルの推論結果をストリーミングで返す場合、SSE を活用すると、部分的な結果を早期に表示でき、ユーザーエクスペリエンスが向上します。以下は、サーバーからクライアントへ文字列データをリアルタイムに送信する例です。
詳細な仕様については、MDNの公式ドキュメントなども参考にされると良いでしょう.
Server Sent Eventのエラーハンドリング
SSE では、接続が正常に確立されるとサーバーは HTTP ステータスコード200(OK)を返し、クライアントへ継続的なイベント送信が可能な状態となります。
一方、データの送信中にエラーが発生した場合は、HTTPステータスコード以外の方法でエラー情報を伝える必要があります。弊社では、例えば以下のような形式でエラー情報を送信し、クライアント側で適切にハンドリングできるようにしています。
{"event": "error", "data": "invalid-request-error"}
全体のコード例は次のようになります。
from fastapi import FastAPI from sse_starlette.sse import EventSourceResponse import asyncio app = FastAPI() def fakeLLM_generator(): messages = [ "これはServer Sent Eventのテスト文章です。", "この文章は、LLMの出力を模倣するために作られた長文であり、", "様々な情報が盛り込まれています。", "ランダムなタイミングで切り取られることにより、", "部分的な情報の伝達が行われます。", "まるで実際の会話のように、途中で途切れることもありますが、", "全体としては一貫した内容を持っています。", "これにより、ユーザーは逐次的なデータの受信を体験することができます。", "さらに、このテストはシステムのストリーミング機能を確認するためのものです。", "出力が途中で切れても、後続のメッセージで続きが送られます。" ] for msg in messages: yield msg async def event_generator(): try: llm_iterator = fakeLLM_generator() yield {"event": "start", "data": "開始します"} for message in llm_iterator: yield {"data": message} await asyncio.sleep(1) yield {"event": "complete", "data": "送信完了"} # 何かしらのクライアントエラー except CustomClientError as e: yield {"event": "error", "data": "invalid-request-error"} # それ以外のエラー except Exception as e: yield {"event": "error", "data": "unknown-error"} @app.get("/sse") async def sse_endpoint(): return EventSourceResponse(event_generator())
中間層なしでのリクエストの処理
システムアーキテクチャとして、よく見られる形態は「クライアント ⇔ バックエンド ⇔ 推論エンドポイント」という構成です。 これは、従来のバックエンド ( ie. 登録・認証・決済)と複雑な推論システムとの役割を明確に分離することで、システム全体の管理性や拡張性を向上させるアプローチとなっています。
弊社では、クライアントが直接Resourceサーバー(推論サーバー)からレスポンスを受け取る方式へ変更している箇所があります。初回の認証はAuthorizationサーバー(通常のバックエンド)で実施され、その後の各リクエストでは、ResourceサーバーがJWS署名の検証を行い、リソースを返却します。この仕組みにより、不要な仲介層を排除し、通信経路を短縮することで、応答速度および全体パフォーマンスが大幅に向上します。
なお、ユーザー認証にはJWS認証を採用し、高速な検証を実現しています。以下のシーケンス図は、新しいフローを示しています。
このアーキテクチャでは、各リクエストがデータベースにアクセスする頻度が減少するため、システム全体の応答速度が向上します。
推論の信頼性向上を実現する技術
システムの信頼性を向上させるためのモニタリング、エラーハンドリング、インフラ管理などの取り組みについて説明します。たとえ高速な推論結果を提供できるシステムであっても、信頼性が担保されなければユーザーの信頼を得ることは難しいため、信頼性向上は高速応答施策と連動して、全体のサービス品質を支える重要な要素となります。
各種モニタリングツールを活用する
アプリケーションモニタリング・インフラ監視・外形監視
Sentryはアプリケーションモニタリング(APM)ツールとして、発生したエラーをリアルタイムにキャッチし、詳細なコンテキスト(スタックトレース、ユーザー情報、環境情報など)を提供します。料金もかなり抑えめに利用できます
エラー発生時のスタックトレースの表示
エラー時のリクエストペイロードの表示
一定の閾値を超えるエラーに対する通知
特定エンドポイントのエラー率・パフォーマンスの表示
リクエストのプロファイリングの表示
音声合成AIの場合、本番環境でのユーザーからのさまざまな文字入力がなされ、予期せぬエラーを吐いているケースがあります。ペイロードを詳細に確認できる機能は、問題発生から発見・解決までの時間短縮に大いに役立っています。

また、弊社ではDatadogも用いています。 料金は相対的に高いですが、インフラ、アプリケーション、ログ、メトリクス、トレースなどを一元管理でき、さまざまな機能を提供しています。
監視という観点だと、Datadog Synthetic Monitoringを用いたAWSネットワーク外からの監視により、システム全体が正常に動作しているかを継続的にチェックしています。
SLI SLIは、サービスのパフォーマンスや品質を数値で表現する指標です。具体的には、システムの稼働率、レスポンスタイム、エラーレートなどが該当します。
SLO SLOは、サービス提供側がユーザーに対して約束する具体的な性能目標です。たとえば、「99.9%の稼働率」や「平均レスポンス時間500ミリ秒以内」など、明確な数値目標が設定されます。
サービスレベルの目標(SLO)を策定し、その達成状況を示す指標(SLI)を継続的にモニタリングしています。GPUを使った計算コストが高い処理や動作が不安定な外部API(LLM)の影響により、予期せぬ変動が生じる可能性があるため、常に数値を確認しています。
チームで、外形監視による稼働率やレイテンシのSLOと、実ユーザー向けの稼働率・レイテンシのSLOを設定し、毎週確認を行っています。エラーバジェットを考慮しながら、各タスクとして改善に取り組んだ結果、チーム発足以降、SLIは大幅に改善しました。 
また、PO/PdMとの共通指標としてシステムの状態を数値化することで、トラブル発生時の対応や改善策の議論がスムーズに進むようになっています。
また、機械学習システムでは、単にシステム自体の稼働状況をモニタリングするだけでなく、出力されるコンテンツの品質チェックも不可欠です。 たとえば、
音声生成の推論結果のロギング・モニタリング
LLMの監視(例:langfuseの利用)
などが求められます。弊社でも絶賛取組中の領域になっています。
現行のText2Speechの仕様では、リクエスト数だけでなく、リクエストごとの文字数もサーバー負荷にほぼ線形で影響を与えます。同じRPMでも、文字数の違いにより実際の負荷が100倍に達する可能性があるため、単純なリクエスト数のみでの負荷状況は把握できません。
そこで、各リクエストの文字数情報もCloud Watch Metricsに流し込み、迅速かつ柔軟なスケーリングを行い、システム全体のパフォーマンスを改善しようとしています。
クライアントが直接リクエストを送信する設計は、不正アクセスのリスクを増加させる可能性があります。
そのため、AWS WAFを活用したDDoS攻撃や脆弱性を狙った攻撃への防御、ライブラリの脆弱性管理、そしてIDS(侵入検知システム)など、包括的なセキュリティ対策が不可欠となります。
本記事でご紹介した各技術と取り組みは、我々のサービスが迅速かつ安定して提供されるための基盤となっています。MLOpsチームは、非同期処理の最適化やリアルタイム通信、そして継続的なモニタリングを通じて、システム全体のパフォーマンスと信頼性の向上に努めています。これからも最新技術の積極的な採用と運用改善により、さらなる品質向上を追求していきます。
CoeFontPublicationAI音声プラットフォーム「CoeFont(コエフォント)」の公式テックブログです。

関連記事
今日のまとめ
AI日報で今日の重要ニュースをまとめ読み