π0シリーズで使用されるaction expertをコードレベルで理解する
ABEJA は、Physical Intelligence 社の VLA モデル「π0 シリーズ」に採用されている Action Expert の実装をコードレベルで解説し、Flow Matching を用いた高精度な連続アクション生成の仕組みを実践的に理解できる内容を公開した。
キーポイント
Action Expert の役割と必要性
VLM の出力速度に依存するとロボット制御がカクつくため、外部モジュールとして Action Expert を追加し、高い周波数での連続値制御を実現している。
Flow Matching によるアクション生成原理
ノイズ分布から目標のアクション分布へ滑らかに遷移させる「ベクトル場」を学習することで、中間経路を含む高精度な動作軌道を生成する技術解説。
コードレベルでの実装詳細と検証
PyTorch を用いたシンプルなモデル定義と ODE ソルバーによるサンプリングステップを実装し、サンプルデータ(Moon 形状)で学習プロセスを可視化して原理を検証している。
π0 シリーズの技術的系譜
Physical Intelligence 社が開発した π0 を皮切りに、π0.5 や π*0.6 と続くシリーズにおいて、Action Expert が標準的に採用されている背景を説明している。
サンプリングによる三日月データ生成の実装
ノイズから開始し、学習済みのベクトル場に従って時間ステップごとに状態を更新することで、三日月形状のデータを生成するフローがコードで実装されている。
action_expert の抽出と独自検証
VLM 部分は除外し、openpi リポジトリから action_expert の実装のみを抜き出してサンプルデータで動作を確認するアプローチが取られている。
Pi0 Action Expert の基本アーキテクチャ
VLM を必要とせず、Transformer ベースの独自モジュールとして設計されており、RMSNorm と SwiGLU(GELU)を備えたMLP、およびRoPEを用いたAttentionで構成されています。
影響分析・編集コメントを表示
影響分析
この記事は、VLA モデルの実用化におけるボトルネックであった「制御の滑らかさ」の問題に対し、Flow Matching を活用した具体的な解決策とその実装コードを提供している。開発者や研究者にとって、理論的な理解だけでなく、実際のコードベースでの実装アプローチを学ぶことができるため、ロボティクス AI の実装精度向上に寄与する重要な技術資料となる。
編集コメント
VLA モデルの内部構造をコードレベルで解説する記事は貴重であり、特に Flow Matching の実装例が具体的に示されている点は、実務での応用を検討するエンジニアにとって非常に参考になります。

ABEJAアドベントカレンダー2025の24日目の記事です。
ABEJAでデータサイエンティストをしている大谷です。 最近VLAに触れ合うことが増えてきました。11/7にもMacStudioで動かすSO-ARM x π0.5 -解説から実機動作まで-
というわけで今回のブログではopenpiの実装からaction_expertの部分に絞ってコードレベルで見ていくことで理解を深めていきます。
action expertとは サンプルデータでFlow Matchingを試す
π0のaction_expert部分を見る
実装 action expertで使う諸々
action expert本体
サンプルデータを投入して流れを見る
π0から始まったπシリーズ(勝手にシリーズと呼んでいます)は、Physical Intelligence 社が開発した VLA モデルです。 手前味噌で恐縮ですが、π0の仕組みは弊社のテックブログを参考にいただける幸いです。
https://arxiv.org/pdf/2410.24164arxiv.org
その後、π0.5、π*0.6と続きます。後続モデルでもaction expertは使用されています。
https://arxiv.org/pdf/2504.16054arxiv.org
action expertとは
ざっくり連続したアクション生成をするために、VLMモデルに追加された部分です。ロボットはアームの回転角制御など可能な限り細かい周期で連続的に操作していきたいです。ただ出力するactionがVLMの出力速度にピン留めされてしまうと、ロボットの制御がカクついてしまいます。そこでaction expertをVLMの外に後付けし、高精度の連続アクション分布をFlow Matchingで生成することで、高い周波数での制御値を連続値として出力できるようにしました。 ちなみにFlow Matchingもわかるようでわかっていないことが多いので、ここも合わせてコードで理解していこうと思います。 Flow Matchingはざっくりいうと、適当な確率分布(ノイズ分布など)から欲しい確率分布まで、徐々に意味のある滑らかな動きに変化させる技術です。ロボットでいうとノイズから正解のアクションへ向かう中間アクションを学習する感じです。
「Flow Matching Guide and Code」の図2
https://arxiv.org/pdf/2412.06264arxiv.org
サンプルデータでFlow Matchingを試す
Flow Matching Guide and CodeのP7にサンプルコードがあるので、これを使って基礎から理解します。
import torch from torch import nn, Tensor import matplotlib.pyplot as plt from sklearn.datasets import make_moons class Flow(nn.Module): def __init__(self, dim: int = 2, h: int = 64): super().__init__() self.net = nn.Sequential( nn.Linear(dim + 1, h), nn.ELU(), nn.Linear(h, h), nn.ELU(), nn.Linear(h, h), nn.ELU(), nn.Linear(h, dim) ) # x_t: (B,2), t: (B,1) def forward(self, x_t: Tensor, t: Tensor) -> Tensor: return self.net(torch.cat((x_t, t), dim=-1)) # simple midpoint ODE solver step def step(self, x_t: Tensor, t_start: float, t_end: float) -> Tensor: t_start_tensor = torch.full((x_t.shape[0], 1), t_start) t_end_tensor = torch.full((x_t.shape[0], 1), t_end) dt = t_end - t_start # midpoint method k1 = self.forward(x_t, t_start_tensor) x_mid = x_t + 0.5 * dt * k1 t_mid = torch.full((x_t.shape[0], 1), (t_start + t_end) / 2) k2 = self.forward(x_mid, t_mid) return x_t + dt * k2
モデル定義は超シンプルですね。step関数ではx_tをt_startからt_endまで1ステップ分移動させます。 dt = t_end - t_start
モデルを定義 flow = Flow() optimizer = torch.optim.Adam(flow.parameters(), lr=1e-2) loss_fn = nn.MSELoss() # 学習 for step in range(10000): # いつも三日月(ターゲット) x_1_np, _ = make_moons(256, noise=0.05) x_1 = torch.tensor(x_1_np, dtype=torch.float32) # ノイズ x_0 = torch.randn_like(x_1) # x_1とx_0を直線で結ぶ t = torch.rand(len(x_1), 1) x_t = (1 - t) * x_0 + t * x_1 # 正解の速度ベクトル dx_t = x_1 - x_0 optimizer.zero_grad() pred = flow(x_t, t) # MSEで学習 loss = loss_fn(pred, dx_t) loss.backward() optimizer.step() if step % 500 == 0: print(f"step {step}, loss={loss.item():.5f}")
上記の学習で、ノイズ状態のデータを特定の方向に輸送するベクトル場を獲得できるようになります。 ここまでくると同じような原理でロボットのアクション生成において、特定のスタート地点から次の地点までの中間経路を獲得できそうというのがわかってきます。
実際にサンプリングしたノイズから三日月データの生成過程を見ると以下のようになります。
x = torch.randn(300, 2) # t=0からt=1まで8ステップに刻む n_steps = 8 fig, axes = plt.subplots(1, n_steps + 1, figsize=(30, 4), sharex=True, sharey=True) time_steps = torch.linspace(0, 1.0, n_steps + 1) axes[0].scatter(x[:, 0].detach(), x[:, 1].detach(), s=10) axes[0].set_title(f't = {time_steps[0]:.2f}') axes[0].set_xlim(-3.0, 3.0) axes[0].set_ylim(-3.0, 3.0) for i in range(n_steps): # 現在のベクトルと次の時刻tを入力し、学習済みのベクトル場に従って次の地点までxが進む x = flow.step(x, float(time_steps[i]), float(time_steps[i + 1])) axes[i + 1].scatter(x[:, 0].detach(), x[:, 1].detach(), s=10) axes[i + 1].set_title(f't = {time_steps[i + 1]:.2f}') plt.tight_layout() plt.show()
ノイズから三日月の生成過程
π0のaction_expert部分を見る
openpiのレポジトリでは以下のあたりにあります。
https://github.com/Physical-Intelligence/openpi/blob/main/src/openpi/models_pytorch/pi0_pytorch.py#L100
https://github.com/Physical-Intelligence/openpi/blob/main/src/openpi/models_pytorch/pi0_pytorch.py#L376
今回はVLM側には触れず、action_expertの部分だけ見たいので、必要な実装だけ抜き出してサンプルデータで動きを見ていこうと思います。
最後に全体コードを貼りますが、長くなるので説明のため必要な部分だけを記載しています。 本家実装を見つつ、claude codeと対話しつつ、action expertの部分だけ抜き出しました。気づいたら三日月サンプルから急にややこしくなってしまってました... コード内のコメントでも何をしているのかを補足していきます。
action expertのフロー図
action expertで使う諸々
ここはaction expert特有のものでもないので割愛していきます。
@dataclass class Pi0ActionConfig: """適当な設定""" # モデル hidden_dim: int = 1024 num_layers: int = 18 mlp_dim: int = 4096 num_heads: int = 8 num_kv_heads: int = 1 head_dim: int = 256 # アクション action_dim: int = 32 # 32個の制御対象 action_horizon: int = 50 # 50ステップ先まで出力 # 時間埋め込み time_min_period: float = 4e-3 time_max_period: float = 4.0 dropout: float = 0.0 rope_theta: float = 10000.0 @property def num_kv_groups(self) -> int: return self.num_heads // self.num_kv_heads class ActionExpertAttention(nn.Module): def __init__(self, config: Pi0ActionConfig, layer_idx: int): super().__init__() (action expertと直接関係ない普通のattentionなので割愛) def forward( self, hidden_states: Tensor, attention_mask: Optional[Tensor] = None, position_ids: Optional[Tensor] = None, cos: Optional[Tensor] = None, sin: Optional[Tensor] = None, ) -> Tensor: """Forward pass Args: hidden_states: [batch, seq, hidden_dim] attention_mask: [batch, 1, seq, seq] (0 = attend, -inf = mask) position_ids: [batch, seq] cos, sin: RoPE用 [batch, seq, 1, head_dim] Returns: [batch, seq, hidden_dim] """ (省略) class ActionExpertMLP(nn.Module): def __init__(self, config: Pi0ActionConfig): super().__init__() self.gate_proj = nn.Linear(config.hidden_dim, config.mlp_dim, bias=False) self.up_proj = nn.Linear(config.hidden_dim, config.mlp_dim, bias=False) self.down_proj = nn.Linear(config.mlp_dim, config.hidden_dim, bias=False) self.act_fn = nn.GELU(approximate="tanh") def forward(self, x: Tensor) -> Tensor: return self.down_proj(self.act_fn(self.gate_proj(x)) * self.up_proj(x)) class ActionExpertLayer(nn.Module): """Action Expert 1層分""" def __init__(self, config: Pi0ActionConfig, layer_idx: int): super().__init__() self.config = config self.input_layernorm = RMSNorm(config.hidden_dim) self.post_attention_layernorm = RMSNorm(config.hidden_dim) self.self_attn = ActionExpertAttention(config, layer_idx) self.mlp = ActionExpertMLP(config) def forward( self, hidden_states: Tensor, attention_mask: Optional[Tensor] = None, position_ids: Optional[Tensor] = None, cos: Optional[Tensor] = None, sin: Optional[Tensor] = None, ) -> Tensor: (Attention1層分の計算のみ)
action expert本体
ここがVLMなしで動くaction expertの本体です。とはいえ、基本的には普通のTransformerと同じです。 ロボットの現在状態とノイズが混ざったアクションと時間を受け取って、アクションをどの方向に修正していくかを出力します。 embed_suffix
class ActionExpert(nn.Module): """ VLMなしで動作する単体のaction expert 入力: - state: [B, 32] ロボット状態 - noisy_actions: [B, 50, 32] ノイズ付きアクション - timestep: [B] Flow Matchingのタイムステップ 出力: - velocity: [B, 50, 32] 予測されたベクトル場 """ def __init__(self, config: Pi0ActionConfig): super().__init__() self.config = config self.state_proj = nn.Linear(config.action_dim, config.hidden_dim) self.action_in_proj = nn.Linear(config.action_dim, config.hidden_dim) # アクション + 時間のMLP融合 self.action_time_mlp_in = nn.Linear( 2 * config.hidden_dim, config.hidden_dim ) self.action_time_mlp_out = nn.Linear(config.hidden_dim, config.hidden_dim) # RoPE self.rotary_emb = RotaryEmbedding( config.head_dim, max_seq_len=config.action_horizon + 10, theta=config.rope_theta, ) # Transformer層 self.layers = nn.ModuleList( [ActionExpertLayer(config, i) for i in range(config.num_layers)] ) # 正規化 self.norm = RMSNorm(config.hidden_dim) # 出力 self.action_out_proj = nn.Linear(config.hidden_dim, config.action_dim) def embed_suffix( self, state: Tensor, noisy_actions: Tensor, timestep: Tensor, ) -> Tuple[Tensor, Tensor]: """suffixトークンを埋め込み - state_emb: [B, 1, hidden_dim] - action_emb: [B, action_horizon, hidden_dim] + time_emb - suffix: [B, 1 + action_horizon, hidden_dim] Args: state: [batch, action_dim] noisy_actions: [batch, action_horizon, action_dim] t
原文を表示

ABEJAアドベントカレンダー2025の24日目の記事です。
ABEJAでデータサイエンティストをしている大谷です。 最近VLAに触れ合うことが増えてきました。11/7にもMacStudioで動かすSO-ARM x π0.5 -解説から実機動作まで-
というわけで今回のブログではopenpiの実装からaction_expertの部分に絞ってコードレベルで見ていくことで理解を深めていきます。
action expertとは サンプルデータでFlow Matchingを試す
π0のaction_expert部分を見る
実装 action expertで使う諸々
action expert本体
サンプルデータを投入して流れを見る
π0から始まったπシリーズ(勝手にシリーズと呼んでいます)は、Physical Intelligence 社が開発した VLA モデルです。 手前味噌で恐縮ですが、π0の仕組みは弊社のテックブログを参考にいただける幸いです。
https://arxiv.org/pdf/2410.24164arxiv.org
その後、π0.5、π*0.6と続きます。後続モデルでもaction expertは使用されています。
https://arxiv.org/pdf/2504.16054arxiv.org
action expertとは
ざっくり連続したアクション生成をするために、VLMモデルに追加された部分です。ロボットはアームの回転角制御など可能な限り細かい周期で連続的に操作していきたいです。ただ出力するactionがVLMの出力速度にピン留めされてしまうと、ロボットの制御がカクついてしまいます。そこでaction expertをVLMの外に後付けし、高精度の連続アクション分布をFlow Matchingで生成することで、高い周波数での制御値を連続値として出力できるようにしました。 ちなみにFlow Matchingもわかるようでわかっていないことが多いので、ここも合わせてコードで理解していこうと思います。 Flow Matchingはざっくりいうと、適当な確率分布(ノイズ分布など)から欲しい確率分布まで、徐々に意味のある滑らかな動きに変化させる技術です。ロボットでいうとノイズから正解のアクションへ向かう中間アクションを学習する感じです。
「Flow Matching Guide and Code」の図2
https://arxiv.org/pdf/2412.06264arxiv.org
サンプルデータでFlow Matchingを試す
Flow Matching Guide and CodeのP7にサンプルコードがあるので、これを使って基礎から理解します。
import torch from torch import nn, Tensor import matplotlib.pyplot as plt from sklearn.datasets import make_moons class Flow(nn.Module): def __init__(self, dim: int = 2, h: int = 64): super().__init__() self.net = nn.Sequential( nn.Linear(dim + 1, h), nn.ELU(), nn.Linear(h, h), nn.ELU(), nn.Linear(h, h), nn.ELU(), nn.Linear(h, dim) ) # x_t: (B,2), t: (B,1) def forward(self, x_t: Tensor, t: Tensor) -> Tensor: return self.net(torch.cat((x_t, t), dim=-1)) # simple midpoint ODE solver step def step(self, x_t: Tensor, t_start: float, t_end: float) -> Tensor: t_start_tensor = torch.full((x_t.shape[0], 1), t_start) t_end_tensor = torch.full((x_t.shape[0], 1), t_end) dt = t_end - t_start # midpoint method k1 = self.forward(x_t, t_start_tensor) x_mid = x_t + 0.5 * dt * k1 t_mid = torch.full((x_t.shape[0], 1), (t_start + t_end) / 2) k2 = self.forward(x_mid, t_mid) return x_t + dt * k2
モデル定義は超シンプルですね。step関数ではx_tをt_startからt_endまで1ステップ分移動させます。 dt = t_end - t_start
モデルを定義 flow = Flow() optimizer = torch.optim.Adam(flow.parameters(), lr=1e-2) loss_fn = nn.MSELoss() # 学習 for step in range(10000): # いつも三日月(ターゲット) x_1_np, _ = make_moons(256, noise=0.05) x_1 = torch.tensor(x_1_np, dtype=torch.float32) # ノイズ x_0 = torch.randn_like(x_1) # x_1とx_0を直線で結ぶ t = torch.rand(len(x_1), 1) x_t = (1 - t) * x_0 + t * x_1 # 正解の速度ベクトル dx_t = x_1 - x_0 optimizer.zero_grad() pred = flow(x_t, t) # MSEで学習 loss = loss_fn(pred, dx_t) loss.backward() optimizer.step() if step % 500 == 0: print(f"step {step}, loss={loss.item():.5f}")
上記の学習で、ノイズ状態のデータを特定の方向に輸送するベクトル場を獲得できるようになります。 ここまでくると同じような原理でロボットのアクション生成において、特定のスタート地点から次の地点までの中間経路を獲得できそうというのがわかってきます。
実際にサンプリングしたノイズから三日月データの生成過程を見ると以下のようになります。
x = torch.randn(300, 2) # t=0からt=1まで8ステップに刻む n_steps = 8 fig, axes = plt.subplots(1, n_steps + 1, figsize=(30, 4), sharex=True, sharey=True) time_steps = torch.linspace(0, 1.0, n_steps + 1) axes[0].scatter(x[:, 0].detach(), x[:, 1].detach(), s=10) axes[0].set_title(f't = {time_steps[0]:.2f}') axes[0].set_xlim(-3.0, 3.0) axes[0].set_ylim(-3.0, 3.0) for i in range(n_steps): # 現在のベクトルと次の時刻tを入力し、学習済みのベクトル場に従って次の地点までxが進む x = flow.step(x, float(time_steps[i]), float(time_steps[i + 1])) axes[i + 1].scatter(x[:, 0].detach(), x[:, 1].detach(), s=10) axes[i + 1].set_title(f't = {time_steps[i + 1]:.2f}') plt.tight_layout() plt.show()
ノイズから三日月の生成過程
π0のaction_expert部分を見る
openpiのレポジトリでは以下のあたりにあります。
https://github.com/Physical-Intelligence/openpi/blob/main/src/openpi/models_pytorch/pi0_pytorch.py#L100
https://github.com/Physical-Intelligence/openpi/blob/main/src/openpi/models_pytorch/pi0_pytorch.py#L376
今回はVLM側には触れず、action_expertの部分だけ見たいので、必要な実装だけ抜き出してサンプルデータで動きを見ていこうと思います。
最後に全体コードを貼りますが、長くなるので説明のため必要な部分だけを記載しています。 本家実装を見つつ、claude codeと対話しつつ、action expertの部分だけ抜き出しました。気づいたら三日月サンプルから急にややこしくなってしまってました... コード内のコメントでも何をしているのかを補足していきます。
action expertのフロー図
action expertで使う諸々
ここはaction expert特有のものでもないので割愛していきます。
@dataclass class Pi0ActionConfig: """適当な設定""" # モデル hidden_dim: int = 1024 num_layers: int = 18 mlp_dim: int = 4096 num_heads: int = 8 num_kv_heads: int = 1 head_dim: int = 256 # アクション action_dim: int = 32 # 32個の制御対象 action_horizon: int = 50 # 50ステップ先まで出力 # 時間埋め込み time_min_period: float = 4e-3 time_max_period: float = 4.0 dropout: float = 0.0 rope_theta: float = 10000.0 @property def num_kv_groups(self) -> int: return self.num_heads // self.num_kv_heads class ActionExpertAttention(nn.Module): def __init__(self, config: Pi0ActionConfig, layer_idx: int): super().__init__() (action expertと直接関係ない普通のattentionなので割愛) def forward( self, hidden_states: Tensor, attention_mask: Optional[Tensor] = None, position_ids: Optional[Tensor] = None, cos: Optional[Tensor] = None, sin: Optional[Tensor] = None, ) -> Tensor: """Forward pass Args: hidden_states: [batch, seq, hidden_dim] attention_mask: [batch, 1, seq, seq] (0 = attend, -inf = mask) position_ids: [batch, seq] cos, sin: RoPE用 [batch, seq, 1, head_dim] Returns: [batch, seq, hidden_dim] """ (省略) class ActionExpertMLP(nn.Module): def __init__(self, config: Pi0ActionConfig): super().__init__() self.gate_proj = nn.Linear(config.hidden_dim, config.mlp_dim, bias=False) self.up_proj = nn.Linear(config.hidden_dim, config.mlp_dim, bias=False) self.down_proj = nn.Linear(config.mlp_dim, config.hidden_dim, bias=False) self.act_fn = nn.GELU(approximate="tanh") def forward(self, x: Tensor) -> Tensor: return self.down_proj(self.act_fn(self.gate_proj(x)) * self.up_proj(x)) class ActionExpertLayer(nn.Module): """Action Expert 1層分""" def __init__(self, config: Pi0ActionConfig, layer_idx: int): super().__init__() self.config = config self.input_layernorm = RMSNorm(config.hidden_dim) self.post_attention_layernorm = RMSNorm(config.hidden_dim) self.self_attn = ActionExpertAttention(config, layer_idx) self.mlp = ActionExpertMLP(config) def forward( self, hidden_states: Tensor, attention_mask: Optional[Tensor] = None, position_ids: Optional[Tensor] = None, cos: Optional[Tensor] = None, sin: Optional[Tensor] = None, ) -> Tensor: (Attention1層分の計算のみ)
action expert本体
ここがVLMなしで動くaction expertの本体です。とはいえ、基本的には普通のTransformerと同じです。 ロボットの現在状態とノイズが混ざったアクションと時間を受け取って、アクションをどの方向に修正していくかを出力します。 embed_suffix
class ActionExpert(nn.Module): """ VLMなしで動作する単体のaction expert 入力: - state: [B, 32] ロボット状態 - noisy_actions: [B, 50, 32] ノイズ付きアクション - timestep: [B] Flow Matchingのタイムステップ 出力: - velocity: [B, 50, 32] 予測されたベクトル場 """ def __init__(self, config: Pi0ActionConfig): super().__init__() self.config = config self.state_proj = nn.Linear(config.action_dim, config.hidden_dim) self.action_in_proj = nn.Linear(config.action_dim, config.hidden_dim) # アクション + 時間のMLP融合 self.action_time_mlp_in = nn.Linear( 2 * config.hidden_dim, config.hidden_dim ) self.action_time_mlp_out = nn.Linear(config.hidden_dim, config.hidden_dim) # RoPE self.rotary_emb = RotaryEmbedding( config.head_dim, max_seq_len=config.action_horizon + 10, theta=config.rope_theta, ) # Transformer層 self.layers = nn.ModuleList( [ActionExpertLayer(config, i) for i in range(config.num_layers)] ) # 正規化 self.norm = RMSNorm(config.hidden_dim) # 出力 self.action_out_proj = nn.Linear(config.hidden_dim, config.action_dim) def embed_suffix( self, state: Tensor, noisy_actions: Tensor, timestep: Tensor, ) -> Tuple[Tensor, Tensor]: """suffixトークンを埋め込み - state_emb: [B, 1, hidden_dim] - action_emb: [B, action_horizon, hidden_dim] + time_emb - suffix: [B, 1 + action_horizon, hidden_dim] Args: state: [batch, action_dim] noisy_actions: [batch, action_horizon, action_dim] timestep: [batch] Returns: suffix_emb: [batch, 1 + action_horizon, hidden_dim] attention_mask: [batch, 1, seq, seq] Note: アクションを時間情報を入れて埋め込む π0論文の付録Bに記載がある部分 """ batch_size = state.shape[0] device = state.device # 状態埋め込み [batch, 1, hidden_dim] state_emb = self.state_proj(state).unsqueeze(1) # 時間埋め込み [batch, hidden_dim] time_emb = create_sinusoidal_pos_embedding( timestep, self.config.hidden_dim, self.config.time_min_period, self.config.time_max_period, ) # アクション埋め込み [batch, action_horizon, hidden_dim] action_emb = self.action_in_proj(noisy_actions) # アクション + 時間をMLP time_emb_expanded = time_emb[:, None, :].expand_as(action_emb) action_time_emb = torch.cat([action_emb, time_emb_expanded], dim=-1) action_time_emb = self.action_time_mlp_in(action_time_emb) action_time_emb = F.silu(action_time_emb) action_time_emb = self.action_time_mlp_out(action_time_emb) # 結合する [batch, 1 + action_horizon, hidden_dim] suffix_emb = torch.cat([state_emb, action_time_emb], dim=1) # Attention Mask構築(あっているはず...) # att_masks = [1] (状態) + [1] + [0]*(action_horizon-1) (アクション) # 1 = 新しいブロックの開始(前のトークンはこれ以降を見れない) # 0 = 同じブロック内(双方向Attention) seq_len = suffix_emb.shape[1] # 1 + action_horizon att_pattern = torch.zeros(seq_len, dtype=torch.long, device=device) att_pattern[0] = 1 # 状態トークン: 新ブロック att_pattern[1] = 1 # アクション最初のトークン: 新ブロック cumsum = torch.cumsum(att_pattern, dim=0) att_2d = cumsum[None, :] <= cumsum[:, None] # [seq, seq] attention_mask = att_2d[None, None, :, :].expand(batch_size, 1, seq_len, seq_len) attention_mask = torch.where( attention_mask, torch.zeros_like(attention_mask, dtype=suffix_emb.dtype), torch.full_like(attention_mask, float("-inf"), dtype=suffix_emb.dtype), ) return suffix_emb, attention_mask def forward( self, state: Tensor, noisy_actions: Tensor, timestep: Tensor, ) -> Tensor: """Forward pass Args: state: [batch, action_dim] noisy_actions: [batch, action_horizon, action_dim] timestep: [batch] Returns: 予測されたベクトル場 [batch, action_horizon, action_dim] """ # 入力埋め込み hidden_states, attention_mask = self.embed_suffix(state, noisy_actions, timestep) batch_size, seq_len, _ = hidden_states.shape # Position IDs position_ids = torch.arange(seq_len, device=hidden_states.device) position_ids = position_ids[None, :].expand(batch_size, -1) # RoPE cos, sin = self.rotary_emb(hidden_states, position_ids) # Transformer層 for layer in self.layers: hidden_states = layer( hidden_states, attention_mask=attention_mask, position_ids=position_ids, cos=cos, sin=sin, ) # 正規化 hidden_states = self.norm(hidden_states) # 状態トークンを除いてアクション部分のみ取得 action_hidden = hidden_states[:, 1:, :] # ベクトル場(velocity field)の予測 velocity = self.action_out_proj(action_hidden) return velocity
以下あたりに記載のあるFlow Matching部分だけを抜き取ったものです。
三日月を学習した時と同様に、ノイズからアクションに向かう方向を学習させていきます。 openpiの実装では三日月の時とは逆で、時刻1がノイズで時刻0がアクションになります。 1→0の中間状態をモデルに入力し、ノイズ空間からアクション空間への直線的な経路(u_t
def sample_noise(shape: Tuple[int, ...], device: torch.device) -> Tensor: return torch.randn(shape, device=device) def sample_time(batch_size: int, device: torch.device) -> Tensor: """Beta(1.5, 1.0)分布からtimestepをサンプリング openpiではtime = beta * 0.999 + 0.001として [0.001, 1.0]の範囲に収める。 """ beta_dist = torch.distributions.Beta(1.5, 1.0) time = beta_dist.sample((batch_size,)).to(device) time = time * 0.999 + 0.001 # [0.001, 1.0] return time def flow_matching_loss( model: ActionExpert, state: Tensor, actions: Tensor, noise: Optional[Tensor] = None, time: Optional[Tensor] = None, ) -> Tensor: """Flow Matching訓練損失 - x_t = t * noise + (1-t) * actions - u_t = noise - actions (target velocity) - v_t = model(x_t, t) (predicted velocity) - loss = MSE(v_t, u_t) Args: model: ActionExpertモデル state: [batch, action_dim] actions: [batch, action_horizon, action_dim] noise: オプションのノイズ time: オプションのタイムステップ Returns: [batch, action_horizon, action_dim] の損失 """ batch_size = state.shape[0] device = state.device if noise is None: noise = sample_noise(actions.shape, device) if time is None: time = sample_time(batch_size, device) # ノイズ混合: x_t = t * noise + (1-t) * a
関連記事
NVIDIA AI が自己改善型ロボットフレームワーク「ASPIRE」を発表、LIBERO-Pro の長期タスクでゼロショット成功率 31% を達成
作って終わりにしないための顧客検証ループづくり ~顧客ヒアリングを「価値検証フェーズ」として組み込んだ話
On-policy のはずが Off-policy になる:LLM 強化学習 の rollout mismatchと対策(rollout correction)
今日のまとめ
AI日報で今日の重要ニュースをまとめ読み