DagsterとMetaxyによるMLパイプラインのサンプルレベルバージョン管理
AnamのMLOpsエンジニアは、マルチモーダルデータパイプラインにおけるサンプルレベルのバージョン管理と不要な再計算を回避するため、DagsterとオープンソースフレームワークMetaxyを組み合わせた手法を紹介している。
キーポイント
従来のアセットレベルバージョン管理の限界
Dagsterなどのオーケストレーターは通常、テーブルやアセット全体のバージョン管理を行うため、一部の変更(例:ビデオ解像度の変更)に対して、関連しないオーディオ処理ステップまで再計算してしまう非効率な問題があった。
Metaxyによるサンプルレベルの制御
MetaxyはDagsterのようなオーケストレーターとRayのような低レベル処理エンジンを接続し、変更されたサンプルのみを正確に処理する「サンプルレベルのバージョン管理」を実現した。
マルチモーダルデータ処理の最適化
動画、音声、テキストなど多様なデータを扱うパイプラインにおいて、不要なMLモデル呼び出しやAPIコストを削減し、リアルタイムインタラクティブアバターの開発基盤を強化している。
マルチモーダルパイプラインの再実行コストと増分処理の必要性
マルチモーダルデータは計算量とI/Oコストが非常に高く、従来のテーブル型データとは異なり、再実行による高額な損失を防ぐため増分処理の実装が必須である。
Metaxyの役割と基本機能
MetaxyはDagster(データセット単位)と計算世界(個々のサンプル単位)を繋ぐツールであり、インフラ非依存で部分的なデータ更新を追跡する機能を持つ。
データフィールドに基づく粒度の細かいバージョン管理
Metaxyは単なる文字列ではなく辞書形式の「データフィールド」を用いることで、mp4ファイルなどのデータを記述し、数百万行バッチでの正確な増分バージョン管理を実現する。
フィールドレベルのバージョン管理による不要な再計算の回避
Metaxyは各フィールドごとに個別のデータバージョンを記録し、上位の変更が下位機能に影響しない場合、関連する機能の再計算をスキップすることで「小さな変更」の問題を解決します。
影響分析・編集コメントを表示
影響分析
この記事は、大規模なマルチモーダルAI開発において、データパイプラインの効率化とコスト削減を実現する具体的な技術的解決策を示しています。Dagsterエコシステムにおける「メタデータ管理」の重要性を再認識させると同時に、オープンソースツールによる既存インフラの拡張可能性を示す良い事例となっています。
編集コメント
マルチモーダルデータ処理における「不要な計算コスト」は実務上の大きな課題であり、メタデータの粒度を細かく制御するMetaxyのようなツールの存在は、大規模AI開発において無視できません。
Metaxyを使用して、Dagster上でサンプルレベルの粒度を持つマルチモーダルデータパイプラインを構築する方法を学びましょう
私はダニエル・ガフニと申します。Anam社のMLOpsエンジニアです。
Anamでは、リアルタイムでインタラクティブなアバターを構築するためのプラットフォームを開発しています。当社製品を支える重要なコンポーネントの一つが、自社開発のビデオ生成モデルです。
私たちは、ビデオやオーディオデータのあらゆる種類の前処理を必要とするカスタムトレーニングデータセットでこのモデルを訓練します。MLモデルで埋め込みを抽出したり、アノテーションやデータ合成のために外部APIを利用したりします。もちろん、これらのステップをデータアセットとしてオーケストレーションするためにDagsterを使用しています。
このブログ記事では、これらのパイプラインにおけるサンプルレベルバージョニングを解決するために、私たちが新しいオープンソースフレームワークであるMetaxyとDagsterをどのように組み合わせて使用したかを共有します。
Metaxyは、通常テーブル(またはアセット)レベルで動作するDagsterのようなオーケストレーターと、Rayのような低レベルの処理エンジンを接続し、各ステップで処理すべきサンプルを正確に処理し、余分なサンプルを処理しないようにすることを可能にします。
小さな変更
数ヶ月前、私たちはデータ準備パイプラインに小さな変更を加えることに決めました。当時、私たちは各サンプルのバージョンを追跡するカスタムデータバージョニングシステムを使用していました。このシステムは、手動で指定されたステップのcode_versionと上流ステップ(聞き覚えがありますか?)に基づいて、各ステップのフィンガープリントを計算することができました。Dagsterのデータバージョニングとの唯一の違いは粒度レベルでした:私たちはデータセットの各行に対して計算していましたが、Dagsterはアセットレベルのバージョニングしかサポートしていません。
私たちが導入したかった変更は非常に単純なものでした:ビデオを新しい解像度でクロップしたいと考えていました。これはクロップステージのcode_versionを変更することを意味し、下流のステップは自動的に再計算されるはずでした。しかし、私たちは不快な結果にも気づきました。クロップステップの直後、私たちのパイプラインは2つの領域に分岐していました:
下流ステップの半分は、クロップされたビデオフレームを全く使用していませんでした。それらはオーディオ部分のみを操作していました。しかし、私たちのデータバージョニングシステムはこの詳細を認識しておらず、とにかくそれらを再計算してしまうのです。これは、カスタムオーディオMLモデルをトレーニングデータセット全体で実行することを意味します:非常に高価で、絶対に不要な処理です。
この瞬間、私はデータバージョニングに対する私たちの素朴なアプローチに何か問題があることに気づきました。このブログ記事で議論するプロジェクト、Metaxyのアイデアはここで生まれました。
マルチモーダルデータへの一瞥
ソフトウェアの世界がAIに侵食されるにつれ、より多くのチームや組織がマルチモーダルデータパイプラインと関わり始めています。従来のデータエンジニアリングワークフローとは異なり、これらのパイプラインはテーブルだけでなく、テキスト、画像、オーディオ、ビデオ、ベクトル埋め込み、医療データなども扱います。
マルチモーダルデータパイプラインは非常にユニークであり、要件と複雑さはユースケースによって大きく異なります。HTTP経由でAI APIを呼び出す場合でも、ローカルでML推論を実行する場合でも、単にffmpegを呼び出す場合でも、共通点があります:計算とI/Oは非常に速く高価になります。
従来の(表形式)データパイプラインが再実行されるとき、通常はそれほどコストがかかりません。確かに、ビッグデータは存在し、Sparkジョブはペタバイト級の表形式データをクエリできますが、実際にはこれらの問題に直面するチームはほとんどありません。それがSmall Dataムーブメントが成功した大きな理由です:Snowflakeのスキャンの中央値は100MB未満のデータを読み取り、組織の80%は10TB未満のデータしか持っていません!したがって、表形式パイプラインを再実行することは通常問題ありません。また、増分処理を実装するよりもはるかに簡単です。
マルチモーダルパイプラインは全く別の獣です。それらは数桁多い計算とデータ移動を必要とします。データセット全体でWhisper音声文字起こしステップを誤って再実行してしまいましたか?おめでとうございます:1万ドルが無駄になりました!
そのため、マルチモーダルパイプラインでは、増分アプローチの実装は選択肢ではなく必要条件となります。そして、それは非常に複雑であることがわかります。
Metaxyの紹介
Metaxyは、データセット(またはそのパーティション)しか認識しないDagsterの世界と、個々のサンプルを扱わなければならない計算の世界をつなぐ、欠けていたピースです。
Metaxyには、それをユニークにする2つの特徴があります:
部分的なデータ更新を追跡することができます。
インフラストラクチャに依存せず、Pythonで書かれた任意のデータパイプラインに接続できます。
Dagsterがデータバージョンを計算するために採用しているのと同じアプローチを実装していますが、それを拡張しています:
一度に数百万行を処理するために、バッチで動作するように
リモートデータベースまたはローカルで実行できるように
データフレームエンジンやDBに依存しないように
データフィールドを認識するように:Metaxyの各データバージョンは文字列ではなく、実際には辞書です。
Metaxyの主な目標の一つは、粒度の細かい部分的なデータバージョニングを可能にし、部分的な更新が正しく認識されるようにすることです。Metaxyでは、通常のメタデータ列に加えて、ユーザーはデータフィールドを定義し、バージョン管理することもできます:
データフィールドは任意であり、ユーザーが適切と考えるように設計できます。重要なポイントは、データフィールドが表形式のメタデータではなく、データ(例:mp4ファイル)を記述することです。
その後、数行のPythonコードを実行できます:
with store:
increment = store.resolve_update("video/face_crop")これは裏側で多くの作業を行います:
上流ステップの状態テーブルを結合する
各行の期待されるデータバージョンを計算する - これが最も複雑なステップです。各バージョンが辞書であり、辞書の各フィールドが独自の上流フィールドのサブセットに依存する可能性があるため、複雑です
解決対象のステップの状態テーブルをロードし、バージョンを期待されるものと比較する
新規、陳腐化、孤立したサンプルをユーザーに返す
ユーザーがincrementオブジェクトを取得すると(ちなみに、これは遅延評価可能です!)、各カテゴリのサンプルをどうするかを決定できます:通常、新規と陳腐化は再処理され、孤立したものは削除される可能性があります。
Metaxyは、部分的なデータ依存関係を認識することで、「小さな変更」問題を解決します。
3つのMetaxyフィーチャー(Metaxyが各ステップで生成されるデータを呼ぶ方法)を考えてみましょう:ビデオファイル(video/full)、Whisper文字起こし(transcript/whisper)、顔の周りでクロップされたビデオファイル(video/face_crop):
オーディオとフレームの別々の情報パスは色分けされています。フィーチャー間に明確なフィールドレベル、または部分的なデータ依存関係があることに注目してください。各フィールドバージョンは、それが依存するフィールドのバージョンから計算されます。同じフィーチャーからのフィールドバージョンは、その後結合されてフィーチャーバージョンを生成します。
transcript/whisperフィーチャーのtextフィールドがvideo/fullフィーチャーのaudioフィールドにのみ依存していることは明らかです。もしvideo/fullのサイズを変更することに決めたとしても、transcript/whisperは再計算する必要はありません。
Metaxyはこの種の「無関係な」更新を検出し、上流の変更の影響を受けないフィールドを持つ下流フィーチャーの再計算をスキップします。これは、各フィーチャーの各サンプルの各フィールドに対して個別のデータバージョンを記録することで実現されます:
metaxy_provenance_by_field
{"audio": "a7f3c2d8", "frames": "b9e1f4a2"}
{"audio": "d4b8e9c1", "frames": "f2a6d7b3"}
{"audio": "c9f2a8e4", "frames": "e7d3b1c5"}
{"audio": "b1e4f9a7", "frames": "a8c2e6d9"}構成可能性の課題
先に述べたように、増分パイプラインは多様です:それらはしばしばユニークな環境で実行され、特定のインフラストラクチャ、異なるクラウドプロバイダー(Neocloudsを含む)、またはRayやModalのようなスケーリングエンジンを必要とします。
Metaxyの目標の一つは、Dagsterと同じくらい汎用的で依存性がなく、この多様なユースケースをサポートすることでした。Metaxyは、異なるユーザーや組織が使用できるようにするために、プラグ可能でなければなりませんでした。
そして、これは可能であることがわかりました!Metaxyによって行われるメタデータ管理作業の95%は、データベースに依存しない方法で実装されており、PolarsやDuckDBでローカルでも実行できます!
これは、IbisとNarwhalsプロジェクトに投入された信じられないほどの作業量によってのみ可能になりました。Ibisは同じPythonインターフェース(典型的なDataFrame APIではありません)を実装しています...
原文を表示
Learn how Metaxy can be used to build multimodal data pipelines with sample-level granularity on Dagster
My name is Daniel Gafni, and I'm an MLOps engineer at Anam.
At Anam, we are making a platform for building real-time interactive avatars. One of the key components powering our product is our own video generation model.
We train it on custom training datasets that require all sorts of pre-processing of video and audio data. We extract embeddings with ML models, use external APIs for annotation and data synthesis, and so on. Of course, we use Dagster to orchestrate these steps as data assets.
In this blog post, we're going to share how we used Dagster together with our new open source Metaxy framework to solve sample-level versioning for these pipelines.
Metaxy connects orchestrators such as Dagster that usually operate at table (or asset) level with low-level processing engines (such as Ray), allowing us to process exactly the samples that have to be processed at each step and not a sample more.
The Little Change
A few months ago we decided to introduce a little change into the data preparation pipeline. At that time, we were using a custom data versioning system which tracked a version for each sample. The system could compute a fingerprint for each step based on a manually specified code_version on the step and the upstream steps (sounds familiar?). The only difference with Dagster's data versioning was the granularity level: we computed it for each row in the dataset, while Dagster only supports asset-level versioning.
The change we wanted to introduce was very simple: we wanted to crop our videos at a new resolution. This implied changing the code_version of the cropping stage, and the downstream steps would be re-computed automatically. However, we also noticed an unpleasant outcome. Right after the cropping step, our pipeline branched into two regions:
Half of the downstream steps were not even using the cropped video frames. They only operated on the audio part. But our data versioning system was unaware of this detail and would re-compute them anyway. This means running our custom audio ML models on the entire training dataset: very expensive and absolutely unnecessary.
This was the moment I realized there was something wrong with our naive approach to data versioning. The idea of Metaxy - the project we are going to discuss in this blog post - was born.
A Glimpse Into Multimodal Data
As the software world is being eaten by AI, more teams and organizations are starting to interact with multimodal data pipelines. Unlike traditional data engineering workflows, these pipelines are not dealing just with tables, but also with text, images, audio, videos, vector embeddings, medical data, and so on.
Multimodal data pipelines can be very unique, with requirements and complexity varying from use case to use case. Whether calling AI APIs over HTTP, running local ML inference, or simply invoking ffmpeg, there is something in common: compute and I/O gets expensive very quickly.
When traditional (tabular) data pipelines are re-executed, they typically don't cost much. Sure, Big Data exists, and Spark jobs can query petabytes of tabular data, but in reality very few teams actually run into these issues. That's a big reason behind the Small Data movement's success: the median Snowflake scan reads less than 100MB of data, and 80% of organizations have less than 10TB of data! Therefore, re-running a tabular pipeline is usually fine. It's also much easier to do than to implement incremental processing.
Multimodal pipelines are a whole different beast. They require a few orders of magnitude more compute and data movement. Accidentally re-executed your Whisper voice transcription step on the whole dataset? Congratulations: $10k just wasted!
That's why with multimodal pipelines, implementing incremental approaches is a requirement rather than an option. And it turns out, it's damn complicated.
Introducing Metaxy
Metaxy is the missing piece connecting the Dagster world - that's only aware of datasets (or their partitions) - with the compute world, which has to deal with individual samples.
Metaxy has two features that make it unique:
It is able to track partial data updates.
It is agnostic to infrastructure and can be plugged into any data pipeline written in Python.
It implements the same approach Dagster takes for computing data versions, but extends it:
to work in batches, for millions of rows at a time
to run in a remote database or locally
to be agnostic to dataframe engines or DBs
to be aware of data fields: instead of being a string, each data version in Metaxy is actually a dictionary.
One of the main goals of Metaxy is to enable granular partial data versioning, so that partial updates are recognized correctly. In Metaxy, alongside the normal metadata columns, users can also define and version data fields:
Data fields can be arbitrary and designed as users see fit. The key takeaway is that data fields describe data (e.g. mp4 files) and not tabular metadata.
Then, you can run a few lines of Python code:
with store: increment = store.resolve_update("video/face_crop")
which does a lot of work behind the scenes:
joining state tables for upstream steps
computing expected data versions for each row - this is the most complicated step. It is complicated because each version is a dictionary, and each field in the dictionary may depend on its own subset of upstream fields
loading the state table for the step being resolved and comparing versions with the expected ones
returning new, stale and orphaned samples to the user
Once the user gets the increment object (by the way - it can be lazy!), they can decide what to do with each category of samples: typically new and stale are processed again, while orphaned may be deleted.
Metaxy solves the "little change" problem by being aware of partial data dependencies.
Consider 3 Metaxy features (that's how Metaxy calls the data produced at each step): video files (video/full), Whisper transcripts (transcript/whisper), and video files cropped around the face (video/face_crop):
Separate information paths for audio and frames are color-coded. Notice how there are clear field-level, or partial data dependencies between features. Each field version is computed from the versions of the fields it depends on. Field versions from the same feature are then combined together to produce a feature version.
It is obvious that the text field of the transcript/whisper feature only depends on the audio field of the video/full feature. If we decided to resize video/full, then transcript/whisper doesn't have to be recomputed.
Metaxy detects these kinds of "irrelevant" updates and skips re-omputation for downstream features that do not have fields affected by upstream changes. This is achieved by recording a separate data version for each field of every sample of every feature:
metaxy_provenance_by_field
{"audio": "a7f3c2d8", "frames": "b9e1f4a2"}
{"audio": "d4b8e9c1", "frames": "f2a6d7b3"}
{"audio": "c9f2a8e4", "frames": "e7d3b1c5"}
{"audio": "b1e4f9a7", "frames": "a8c2e6d9"}
The challenge of composability
As was mentioned earlier, incremental pipelines are diverse: they often run in unique environments, require specific infrastructure, different cloud providers (including Neoclouds), or scaling engines such as Ray or Modal.
One of the goals of Metaxy was to be as versatile and agnostic as Dagster and support this variety of use cases. Metaxy had to be pluggable in order to be usable by different users and organizations.
And it turns out, this is possible! 95% of metadata management work done by Metaxy is implemented in a way that's agnostic to databases, and can even run locally with Polars or DuckDB!
This is only possible due to the incredible amount of work that has been put into the Ibis and Narwhals projects. Ibis implements the same Python interface (not a typical DataFrame API) for 20+ databases, while Narwhals does the same for different DataFrame engines (Pandas, Polars, DuckDB, Ibis, and more), converging everything to a subset of the Polars API.
Narwhals (or Polars) expressions are the GOAT for programmatic query building. Most of Metaxy's versioning engine is implemented in Narwhals expressions, while a few narrow parts had to be pushed back to specific backends.
The importance of this cannot be stressed enough. Entire new generations of composable data tooling can be built on top of Narwhals - and of course, none of this would be possible without Apache Arrow.
Metaxy and Dagster
Of course, Metaxy was designed to be used with Dagster. Not only did it take inspiration from Dagster's data versioning design, but it also naturally inherited some of the other properties of Dagster: being declarative and asset-oriented. Just take a look at this API, which should look extremely familiar to any Dagster user:
import metaxy as mx spec = mx.FeatureSpec( key="video/crop", id_columns=["id"], deps=["video/raw"], description="Videos cropped to 720x480.", metadata={"team": "ML"}, )
Like Dagster, Metaxy has a declarative DSL for defining DAGs, where nodes represent data. They are called Features in Metaxy. Metaxy Features directly map into Dagster Assets.
The spec from above can be attached to a feature class:
class VideoCrop(mx.BaseFeature, spec=spec): path: str duration: float frame_count: int
Now, these feature definitions can be trivially integrated with Dagster assets:
import dagster as dg import metaxy as mx from metaxy.ext.dagster import metaxify @metaxify() @dg.asset(metadata={"metaxy/feature": "video/crop"}) def video_crops(store: dg.ResourceParam[mx.MetadataStore]): with mx.BufferedMetadataWriter(store) as writer: changes = store.resolve_update("video/crop") for sample in changes.new: # handle each sample here, e.g. run ffmpeg to crop videos # or do something else # once done, submit metadata to Metaxy # BufferedMetadataWriter flushes them automatically writer.put({"video/crop": results})
The @metaxify decorator here does a lot of heavy lifting, injecting all the information available to Metaxy - and Metaxy knows everything about the feature/asset graph - into the otherwise bare-bones Dagster asset. It gets correct lineage, Dagster's code_version, table schema metadata, and more.
Metaxy also integrates with Dagster in a few other ways. A notable mention would be the MetaxyIOManager, which allows doing I/O with any of the Metaxy-supported metadata stores, such as DuckDB, DeltaLake, ClickHouse - and you can even use the same code while swapping them for development and production on demand!
Because this integration is so non-invasive and simple, Metaxy acts as an extension to Dagster for orchestrating individual samples in multimodal datasets.
See the @metaxify documentation for the full details.
Until now, managing individual files in Dagster pipelines has been a very cumbersome task. Metaxy makes this easy, allowing Dagster users to focus on transformations. Dagster and Metaxy elegantly complement each other: the former manages asset-level orchestration and kicks off pipelines calling Metaxy code, while the latter handles row-level orchestration with sub-sample granularity.
Metaxy can also be used outside of Dagster! And surely we're going to discover a lot more of what's possible with it.
Read our docs here and uv pip install metaxy[dagster]!
We are thrilled to help more users solve their metadata management problems with Metaxy. Please do not hesitate to reach out on GitHub!
Acknowledgments
Thanks to Georg Heiler for contributing to the project with discussions and code, and thanks to the open source projects Metaxy is built on: Narwhals, Ibis, and others.
Have feedback or questions? Start a discussion in Slack or Github.
Interested in working with us? View our open roles.
Want more content like this? Follow us on LinkedIn.
![](https://cdn.prod.website-files.com/681399f654933b
関連記事
今日のまとめ
AI日報で今日の重要ニュースをまとめ読み