AIニュース最前線
最新ニュースAI日報Hacker日報週報動画AIツールトレンド企業

AIニュース最前線

世界中のAI最新情報を日本語で毎時更新

最新ニュース日報トレンド企業プレミアムRSS
© 2026 ainew.jp特定商取引法に基づく表記
ニュース一覧元記事を開く
Mercari Engineering·2026年6月19日 10:00·約24分で読める

TiDB や AlloyDB の大規模テーブルを BigQuery に高速同期するための工夫

#Data Engineering#Distributed Systems#TiDB#Cloud Spanner#BigQuery
TL;DR

Mercari Engineering チームは、TiDB や AlloyDB の大規模テーブルを BigQuery に高速同期するために、汎用 JDBC のボトルネックを解消する専用 Source モジュールを開発し、分散処理とネイティブ機能活用によるスループット向上を実現した。

AI深層分析2026年6月19日 01:02
3
注目/ 5段階
深度40%
4
関連度30%
2
実用性20%
5
革新性10%
3

キーポイント

1

汎用 JDBC のボトルネック克服

1 行ごとの SQL 処理オーバーヘッドや OFFSET ベースの分割による遅延、並列化時の一貫性維持の難しさなど、汎用ライブラリでは大規模テーブル同期が困難な課題を指摘し、専用モジュールの必要性を論証している。

2

各 DB のネイティブ機能活用

TiDB や AlloyDB が持つバルク転送機構や物理データ配置情報を活用し、Cloud Spanner で成功した「Split ベースの自動分割」アプローチを他データベースへ移植することで並列取得を実現した。

3

自動化された分割キー抽出

数百億件に達する多数のテーブルに対し、人手で分割条件を指定する非現実的な運用を防ぐため、主キーや暗黙の行 ID、物理ブロック位置をメタデータから自動検出・絞り込みを行う仕組みを実装した。

4

Cloud Dataflow 上での実装

Apache Beam を基盤とした Cloud Dataflow 環境に `TiDBSource` と `PostgresSource`(AlloyDB 対応)を構築し、Beam Worker による分散並列実行とスナップショット一貫性を担保するアーキテクチャを採用した。

影響分析・編集コメントを表示

影響分析

この記事は、大規模データウェアハウスへのデータ取り込みにおいて、汎用ライブラリに依存しない「DB 固有最適化」アプローチの有効性を示す実証事例です。特に、数百億件規模のテーブルを扱う企業にとって、スループットと一貫性の両立を実現する具体的なアーキテクチャパターンを提供しており、データインフラ設計のベストプラクティスとして広く参照される可能性があります。

編集コメント

AI や LLM の直接的な話題ではありませんが、大規模データ基盤の構築において「汎用ツール」から「最適化された専用実装」へ移行する重要性を示す、エンジニアリングレベルで極めて価値の高い技術記事です。

こんにちは。Data Ingestion チームの Data Engineer をしている @orfeon です。この記事は「Merpay & Mercoin Tech Openness Month 2026」の 14 日目の記事です。

はじめに

Data Ingestion(旧 Data Platform) チームでは、多数のマイクロサービスが管理するデータベース・テーブルから、大量のデータを継続的に DWH(データウェアハウス)へ同期する必要があります。同期対象には数億〜数百億件に達する大規模なテーブルも含まれ、これらをいかに速く・安全に・一貫性を保ったまま抽出するかが、DWH の鮮度や安定性にとって大事になります。

これまで Cloud Spanner からのデータ取得では、Spanner の分散 DB 特有の機能(後述)を活用することで、大規模テーブルでも高いスループットでの取得を実現できていました。一方、社内には TiDB や AlloyDB といった Spanner 以外のデータベースも多く利用されており、その中には数百億件以上に達するテーブルもあります。これらのテーブルは従来、主キーなどでシーク方式で取得していましたが、単一コネクションでのシーケンシャルなデータ取得になるため、大規模テーブルでは取得に非常に時間がかかっていました。

そこで今回、Spanner と同じように、それぞれの DB に特有の機能を活用して並列取得などでスループットを上げるよう工夫しました。具体的には、TiDB と AlloyDB の大規模テーブルを DWH へ同期する仕組みを Cloud Dataflow(Apache Beam)上に構築しました。本記事では、その中核となる 2 つの Source モジュール TiDBSource と PostgresSource について、高いスループットを実現するための工夫を解説します。

なぜ汎用 JDBC ではなく専用モジュールなのか

Beam/Dataflow には汎用的な JdbcIO が既に存在します。しかし汎用 JDBC は「SELECT を実行して結果を 1 行ずつ読む」という標準的な経路をたどるため、大規模テーブルでは以下のボトルネックが発生します。

  • 1 行ごとの SQL 処理オーバーヘッド:通常のクエリ実行では、サーバ側でのタプルのテキスト/プロトコル変換などが行ごとに発生する。
  • 並列化の難しさ:テーブルを並列に読むには「どこで分割するか」を決める必要があるが、OFFSET ベースの分割はオフセットが大きくなるほど遅くなり、フルスキャンを誘発する。
  • 一貫性の確保:並列に複数コネクションから読む場合、各コネクションが別々の時点を読むと整合性が崩れる。

そこで今回のモジュールでは、それぞれのデータベースが持つネイティブなバルク転送機構と物理的なデータ配置情報を活用し、汎用 JDBC のボトルネックを回避する設計にしました。

加えて運用上の大きなメリットとして、分割キー(フィルタ条件)の自動抽出があります。マイクロサービスごとに膨大なテーブルを扱う環境では、テーブル 1 つひとつに対して「どのカラムで分割するか」を手作業で指定するのは現実的ではありません。両モジュールはテーブルのメタデータから主キー(PK)や暗黙の行 ID、物理ブロック位置を自動で見つけ出し、分割範囲の絞り込み条件を組み立てます。利用者は接続先とテーブル名を指定するだけで、同じ設定が多数のテーブルに横展開することができます。

なぜ Cloud Spanner では高いスループットでデータ取得が可能なのか

今回の設計の発想は、既にうまくいっていた Spanner からの取得方法を、TiDB や AlloyDB にも持ち込むことにありました。そこでまず Spanner が大規模テーブルでも高いスループットを出せている理由を説明します。Spanner は分散データベースとして、以下の機能を組み合わせています。

  • PartitionQuery / PartitionRead(Split ベースの自動分割): Spanner はデータを内部的に Split(キー範囲+負荷ベース)へ分割して保持しています。PartitionQuery はこの Split 境界に基づいてクエリを複数のパーティションに自動分割します。クライアントはキー範囲など Split の内部構造を意識する必要がありません。
  • BatchReadOnlyTransaction(スナップショット一貫性): 全パーティションの読み取りが、TimestampBound で指定した同一スナップショットを参照することを保証します。ロックを取らずに一貫した読み取りができます。
  • Partition Token の分散・並列実行:分割結果はシリアライズ可能な Partition Token として返されるため、複数プロセス・複数マシン、そして Beam Worker に配布してそのまま並列実行できます。Apache Beam の SpannerIO も内部でこの仕組みを使っています。
  • Partition Token による自動バージョン保持: Token が有効な間は対象バージョンが GC(ガベージコレクション)されないことが保証されるため、クライアント側で明示的なバージョン保護(SafePoint 管理)が不要です。
  • Data Boost(Spanner 固有): Google 管理の独立した計算リソースで読み取るオプションで、本番ワークロードへの影響をほぼゼロにしつつ弾力的にスケールできます。

これらは「物理的なデータ配置に沿った自動分割」「スナップショット一貫性」「分割単位の分散ワーカーへの配布と並列実行」という構図で成り立っています。Spanner ではこれらが高度に抽象化された API として提供されていますが、TiDB や AlloyDB(PostgreSQL)にもそれに近い DB 固有の機能が存在します。

この Spanner の機能と TiDB や PostgreSQL の機能は以下のように対応します。

Spanner

TiDB(dumpling 相当)

AlloyDB(PostgreSQL)

PartitionQuery(Split 境界で自動分割)

TABLESAMPLE REGIONS()(TiKV Region 境界)

ctid 物理ブロック範囲(pg_relation_size)

バッチ読み取り専用トランザクション(スナップショット)

tidb_snapshot MVCC(マルチバージョン同時実行制御) + TSO(タイムスタンプオラクル)

バッチ読み取り(ctid スナップショットのズレは許容)

パーティショントークンによる分散実行

範囲条件の分散実行(本記事の設計)

範囲条件の分散実行(本記事の設計)

パーティショントークンによる自動 GC 保護

tidb_gc_life_time の引き上げで代替

(該当なし)

Spanner の SpannerIO で提供されている「分割 → 配布 → 並列スナップショット読み取り」を、TiDB/AlloyDB では DB 固有の機能を組み合わせて自前で構築する、というのが本記事のモジュールの狙いです。以降その共通の仕組みと各 DB 向けの実装を見ていきます。

共通アーキテクチャ

両モジュールに共通する基本戦略は次の 3 ステップです。TiDBSource/PostgresSource は Cloud Dataflow バッチジョブとして実行され、以下 3 つのステップで役割が分かれます。

  • テーブルの範囲分割:1 本のコネクションでメタデータだけを取得し、テーブルを物理的な分割単位(Range)のリストに列挙する
  • 再分配:分割単位を PCollection の「種」として撒き、Reshuffle でワーカーに再分配する
  • 並列読み込み:各ワーカーが担当 Range をネイティブのバルク転送機構で並列に読み取る
image
image

以降、TiDB と PostgreSQL それぞれについて、この 3 ステップの中身を掘り下げます。まず TiDB から、この 3 つのステップがどのように実装されるかを見ていきます。

TiDB テーブルからのデータ抽出

TiDB 公式ツール dumpling に学ぶ

TiDB には dumpling という高速なエクスポートツールが公式に提供されています。TiDBSource の設計は、この dumpling が高スループットを実現している仕組みを参考にしています。まずは dumpling 側の要点を整理します。

テーブルのチャンク分割と並列読み取り

dumpling は、1 テーブルを丸ごと 1 クエリで読むのではなく、テーブルをチャンク(範囲)に分割し、各チャンクを独立した SELECT クエリとして並列実行します。チャンク分割は 3 段階のフォールバック構造になっています。

戦略

方式

概要

A(最優先)

TiKV Region ベース分割

TABLESAMPLE REGIONS() で Region 境界をチャンク境界にする

B(フォールバック)

数値インデックスベース分割

数値型 PK/インデックスの MIN/MAX から均等分割

C(最終)

テーブル全体ダンプ

分割可能なフィールドがない場合は 1 クエリ

特に重要なのが戦略Aです。TiDBではデータがTiKV上でRegion(デフォルト96MB単位)に分散配置されます。dumplingはこのRegion境界をそのままチャンク境界として利用するため、各チャンクが異なるTiKVノードへの読み取りリクエストに分散され、クラスタ全体のI/O帯域を引き出せます。

image
image

dumplingの並列実行の仕組み: Producer-Consumer

分割したチャンクを並列に読み出すために、dumplingは内部で Producer-Consumer という構造をとります。登場人物は次の3つです(いずれもdumplingの実装に出てくる用語です)。

  • Producer(プロデューサ): テーブルをチャンクに分割し、「このチャンクを読め」というタスクを作り続ける係。dumplingではメインのgoroutineが担当します。先ほどのRegion境界などをもとにタスクを生成します。
  • Writer(ライター): 生成されたタスクを受け取り、実際にSELECTを発行してデータを読み出す係。--threads で指定した数だけ並列に動き、それぞれが独立したDB接続を持ちます。タスクを消費するConsumer側にあたります。
  • infiniteChan(無制限チャネル): ProducerとWriterの間をつなぐ、容量に上限のないキュー(待ち行列)。Writerの処理が詰まってもProducerがブロックされず、生成済みのタスクをいくらでも貯めておけます。
image
image

このように、タスクを作成する人(Producer)とタスクを実行する人(Writer)を分離し、その間を待ち行列(infiniteChan)でつなぐことで、分割と読み取りを互いに待たせずに並列で回す基本構造です。後述のTiDBSourceは、この役割分担をそのままDataflowの分散モデルに置き換えています。

Snapshot読み取り

dumplingはTiDBのMVCC (Multi-Version Concurrency Control)機構を利用し、特定のTSO(Timestamp Oracle)時点の スナップショット から一貫したデータを読み取ります。

  • ロック不要: FLUSH TABLES WITH READ LOCKのような排他ロックが不要で、書き込みをブロックしない。
  • 一貫性保証: 全Writerが同一時点のデータを読むため整合性が保たれる。
  • 高スループット: ロック競合がないため並列度を上げられる。

加えてdumplingは、長時間のダンプ中にTiDBのGC(Garbage Collection)がスナップショット時点の古いバージョンを回収しないよう、PD(Placement Driver)に対してGC SafePointを登録します。

TiDBSource は、dumpling のこれらのアイデアを Apache Beam / Dataflow のモデルに移植したものです。dumpling が goroutine で実現していた並列性を、Dataflow の分散ワーカーによる並列性に置き換えています。対応関係は次の通りです。

dumpling

TiDBSource (Dataflow)

Producer がチャンクタスクを生成

パイプライン構築時に Range のリストを生成

infiniteChan + 複数 Writer goroutine

Reshuffle + ParDo による分散ワーカー並列処理

各 Writer が独立 DB 接続で SELECT

各ワーカーが@Setup で独自コネクションを確立

TSO スナップショット読み取り

TSO を一度取得し全ワーカーに配布

ステップ 1: 分割キーの決定と Range の列挙

パイプラインの初期起動時に 1 本のコネクションを張り、出力スキーマの確定・スナップショット TSO の取得・テーブルの分割を行います。ここではメタデータと境界値だけを読み、実データのスキャンは行いません。

分割キーの自動解決 は次の優先順位で行われます。利用者がカラムを指定しなくても、テーブルのメタデータから自動的に決定されます。

  • 利用者が splitField を明示指定していればそれを使う
  • なければ単一カラムの主キー(PK)
  • それもなければ暗黙の行 ID _tidb_rowid(クラスタードキーを持たないテーブル向け)

_tidb_rowid は、明示的な主キーを持たないテーブルで TiDB が内部的に振る暗黙の行 ID です。これを分割キーに使えるため、主キー設計に依存せず、どんなテーブルでも分割の足がかりを得られます。

Range の列挙 は、先述の dumpling の戦略 A→B→C と同じ 3 段階フォールバックで行います。

戦略 A は、次の SQL で TiKV の Region 境界を取り出します。

SELECT pk FROM table TABLESAMPLE REGIONS() ORDER BY pk

TABLESAMPLE REGIONS() は各 Region の先頭行を返すため、結果の各値が「次のチャンクの下限」になります。境界値の列 b[1], b[2], …, b[n] から、隣り合う境界で挟まれた半開区間を生成します。取りこぼしを防ぐため、最初の区間は下側を、最後の区間は上側を開いておきます。

chunk[ -∞, b[1] ), chunk[ b[1], b[2] ), …, chunk[ b[n], +∞ )

TABLESAMPLE REGIONS() は TiDB v5.0 以降の構文です。非 TiDB の MySQL や古い TiDB ではこのクエリが失敗するため、自動的に戦略 B(数値 MIN/MAX 均等分割)へフォールバックします。戦略 B は、SELECT MIN(pk), MAX(pk) で取得した範囲を、推定行数とチャンクあたりの目標行数 splitSize から決めた個数で等分します。

chunks = ⌈ 推定行数 / splitSize ⌉

step = (max − min) / chunks + 1

区間 = [min, min+step), [min+step, min+2·step), … , [ …, max]

(stepの計算では厳密な切り上げ ⌈(max−min)/chunks⌉ ではなく+1 としています。半開区間 [cutoff, cutoff+step) で走査するため、割り切れるケースでもmax が最終チャンクに確実に含まれるようstep を 1 大きく取っており、実際のチャンク数は chunks 以下になります)

ステップ2: Rangeの再分配(Reshuffle)

範囲が決まったら次にワーカーに範囲ごとの処理を並列にさせる必要があります。列挙したRangeのリストを並列実行するよう明示的に指定するためにPCollection化したRange の後に、 Reshuffle.viaRandomKey() を挟みます。 Reshuffle には2つの重要な役割があります。

  • fusionの分断: Dataflowは連続する処理を一つの処理Stageとして結合してしまうことがあります。これがないとDataflowはCreateと後続の読み取りParDoを融合し、Rangeが1ワーカーに偏って並列性が出ません。
  • ランダムキーによる再分配: 全Rangeが利用可能なワーカーへ均等にばらまかれ、クラスタ全体に読み取り負荷が分散されます。

これがdumplingの「infiniteChanから複数Writerがタスクを取り出す」構造に相当します。

ステップ3: 各ワーカーでのスナップショット並列読み取り

各ワーカーは処理開始時(@Setup)に自前のコネクションを確立し、パイプライン構築時に取得・配布されたTSOで tidb_snapshot をセットします。 全ワーカーが同一TSOを使うことで、分散読み取りでも単一時点の一貫したスナップショットになります。

TSOの取得は、一貫スナップショットを開始してその時点の論理時刻を読むだけで完了します(トランザクション自体はすぐロールバックします)。

START TRANSACTION WITH CONSISTENT SNAPSHOT;SELECT @@tidb_current_ts; -- ← この値(TSO)を全ワーカーに配布

各ワーカー側では、読み取り前にセッション変数を設定します。

SET @@tidb_snapshot = ''; -- ロックなしで同一MVCC版を読む

SET @@session.tidb_enable_paging = ON; -- 大量スキャン時のメモリ使用量を抑制

tidb_enable_paging はCoprocessorリクエストのメモリ使用量を抑える設定です(TiDB v6.2.0以降はデフォルト有効。変数を知らないDBではスキップ)。

実際の読み取りでは、担当Rangeを分割キーへの値の範囲条件に変換してSELECTに組み込みます。

SELECT * FROM table WHERE pk >= 1000 AND pk

ここで重要なのは、この条件が主キー(インデックス)に対する値の比較である点です。データベースはこの行値比較を使ってインデックスをシークし、該当範囲の先頭へ直接ジャンプして必要な行だけを読みます。OFFSETのように先頭から数え直す必要がないため、どのチャンクも一定コストで読み出せます。

また、読み取りはMySQL Connector/Jの行単位ストリーミングモード(fetchSize = Integer.MIN_VALUE)で行います。これは結果セット全体をワーカー側メモリにバッファせず1行ずつ取り出す特別な設定で、巨大チャンクでもワーカーのメモリ消費が一定に保たれます。dumplingがgo-sql-driver/mysqlのストリーミング動作で実現しているのと同じ効果を、JDBC側で引き出している形です。

制約: GC SafePoint

dumplingはPDクライアント経由でGC SafePointを登録しますが、これは JDBCコネクションからは到達できません。そのためTiDBSourceではSafePoint登録を行わず、長時間の読み取りに対しては運用側でクラスタの tidb_gc_life_time を引き上げ、読み取り中にGCがスナップショットのバージョンを回収しないようにする運用方針としています。

AlloyDB(PostgreSQL)テーブルからのデータ抽出

AlloyDBとPostgreSQL互換性

AlloyDBは、Google CloudがPostgreSQL互換で提供するフルマネージドのデータベースサービスです。ワイヤプロトコルもSQLの方言もPostgreSQLと互換であるため、クライアントから見ればPostgreSQLそのものとして扱え、標準のPostgreSQL JDBCドライバがそのまま使えます。さらに ctid(物理行位置)や COPY、pg_relation_size といったPostgreSQLの内部機能・システムカタログも利用できます。

したがって、AlloyDBからのデータ抽出はPostgreSQLの機能を前提に設計できます。以下ではPostgresSourceがPostgreSQLのどの機能を使っているかを説明しますが、その内容は基本的にAlloyDBにも当てはまります。

通常のJDBCクエリ取得との違い

PostgresSource も「物理的な分割 → 並列読み取り」という骨格はTiDBと同じですが、データ転送経路とテーブル分割の方式 がPostgreSQL固有の機能に最適化されています。まず、通常のJDBC経由のクエリ取得との違いを整理します。

観点

通常のJDBC (SELECT)

PostgresSource (COPY ... TO STDOUT BINARY)

転送経路

拡張クエリプロトコル

COPYプロトコル(バルク転送専用)

行ごとの処理

パース/プラン/結果整形が走りうる

クエリは1回プラン。あとはタプルを連続送出

データ形式

テキスト or バイナリのフィールド単位

バイナリのタプルストリームを直接デコード

並列分割

OFFSET/LIMIT等(大きいほど低速)

ctid物理ブロック範囲(フルスキャン不要)

PostgreSQL の COPY は、もともと大量データのインポート/エクスポートのために用意された専用経路で、通常のクエリ実行に伴う 1 行ごとのオーバーヘッドを回避できます。PostgresSource は JDBC ドライバの CopyManager API(PGCopyInputStream)を使い、COPY (SELECT …) TO STDOUT (FORMAT BINARY) のバイナリ出力ストリームを受け取って、Avro のレコードへ直接デコードします。中間のテキスト変換を挟まないぶん、CPU 負荷とアロケーションを抑えられます。

PostgreSQL の機能を活用した Dataflow での実装

ステップ 1: ctid ブロック範囲の列挙

PostgreSQL の全タプルには ctid(物理的な行ロケーション = (ブロック番号,ブロック内オフセット))が付与されています。PostgresSource はテーブルを物理ブロック範囲で分割し、各範囲を TID range scan で読みます。分割の計画には実データのスキャンが一切不要です。

image
image

ブロック数は、テーブルの実ディスクサイズをブロックサイズで割って求めます。

SELECT pg_relation_size('table'::regclass) / current_setting('block_size')::bigint

pg_relation_size は実ディスクサイズを返すため、統計情報の pg_class.relpages 推定値よりも正確で、しかも stat 呼び出し 1 回で済みます。

1 ブロックあたりの行密度は推定行数(pg_class.reltuples)から求め、目標行数 splitSize に対応するブロック幅を機械的に算出します。[0, blockCount) をこの幅で割っていくだけなので、フルスキャンも OFFSET も不要です。

行密度 = 推定行数 / ブロック数

1 範囲のブロック幅 = max(1, round(splitSize / 行密度))

範囲 = [0, w), [w, 2w), … , [kw, blockCount) ※最後の範囲は上限を開く

最後の範囲を上限なし(オープンエンド)にしておくことで、分割を計画した後に追記された行も読み取れます。各範囲は ctid の半開区間条件に変換されます。

WHERE ctid >= '(0,0)'::tid AND ctid < …

この条件も、TiDB の主キー範囲と同様に物理位置の値による範囲比較です。PostgreSQL 14 以降ではこれが TID range scan として処理され、該当ブロックへ直接シークして必要な範囲だけを読みます。

注意: ctid は物理的な行位置であるため、読み取り中に INSERT/UPDATE(別ページへの移動)や VACUUM が発生すると、同じ行を二重に読んだり取りこぼしたりする可能性があります。同期中に更新されないテーブルに対して使うか、バッチ読み取りで一般的なスナップショットのズレを許容する前提で利用します。また、TID range scan は PostgreSQL 14 以降のサポート機能であり、それ以前のバージョンでは各範囲がシーケンシャルスキャンにフォールバックします。

ステップ 2: Range の再分配

TiDB と同様、Range のリストを Create で撒き、Reshuffle.viaRandomKey() でワーカーへ再分配します。fusion 分断と負荷分散の狙いは同じです。

ステップ 3: 各ワーカーでのバイナリ COPY 並列読み取り

各ワーカーは、担当 Range の ctid 条件を組み込んだ SELECT を COPY (...) TO STDOUT (FORMAT BINARY) でラップし、バイナリストリームを受け取ります。

COPY (SELECT * FROM table WHERE ctid >= '(0,0)'::tid AND ctid

返ってくるのは PostgreSQL の COPY バイナリフォーマットです。PostgresSource はこれを直接パースします。先頭の固定シグネチャ(PGCOPY\n\377\r\n\0)とヘッダを読み飛ばし、以降はタプルを 1 件ずつ読みます。各タプルは「フィールド数」に続いて、フィールドごとに「長さ(-1 は NULL)+値のバイト列」が並ぶ構造で、終端は番兵値(フィールド数 = -1)で示されます。

値のデコードは、PostgreSQL のバイナリ表現を Avro の値へ型ごとに変換します。たとえば次のような処理です。

  • 整数・浮動小数: ビッグエンディアンでそのまま読む
  • numeric: base-10000 の digit 配列から十進数を復元
  • date / timestamp: PostgreSQL エポック(2000-01-01)基準の値を Unix エポック基準へ補正
  • uuid / json / jsonb / bytea: それぞれの専用処理

これらをドライバのテキスト変換を介さず自前でデコードすることで、転送経路を最短化しています。

なお、IAM 認証(Cloud SQL / AlloyDB)にも対応しており、user 未指定時はワーカーのサービスアカウントを DB ユーザとして使い、接続 URL に enableIamAuth=true を自動付与します。

検証用の 12 つのカラムを持つ 6 億件のダミーテーブルデータを Avro ファイルとして GCS に出力するタスクで、6 コア並列で 8 分で処理完了するようになりました。

制約: なぜ TiDBSource のようにワーカー間の一貫性を担保できないのか

TiDBSource では tidb_snapshot によって全ワーカーが同一時点を読み、ワーカー間の一貫性を担保していました。一方の PostgresSource では、各 ctid レンジがそれぞれ独立したトランザクションで読まれるため、レンジ間(別接続・別時刻)での一貫性は保証されません。読み取り中に INSERT/UPDATE(ctid が別ページへ移動)/VACUUM が起きると、レンジをまたいで行の重複や欠落が起こりえます。

PostgreSQL にも一見すると同等の仕組みがあります。pg_dump の並列モードは、pg_export_snapshot() でスナップショットをエクスポートし、各ワーカーが SET TRANSACTION SNAPSHOT でそれを取り込むことで、ワーカー間の一貫性を担保しています。**

PostgresSource で同じことを今回実現できなかった理由は スナップショットの「寿命」の違い** にあります。

TiDB(tidb_snapshot)

PostgreSQL(pg_export_snapshot() )

実体

TSO(論理タイムスタンプ=ただの数値)

エクスポート元トランザクションに紐づくスナップショット ID

寿命

永続的(GC されるまで。トランザクション非依存)

一時的(エクスポート元トランザクションが開いている間だけ有効)

共有方法

数値を渡し、各セッションが独立に SET

元トランザクションが生存中に各セッションが SET TRANSACTION SNAPSHOT で取り込む

PostgreSQL のエクスポートされたスナップショットは、それをエクスポートしたトランザクションが終了するまでしかインポートできません。pg_dump の並列モードが成立するのは、単一プロセスのリーダーがダンプ全体の間ずっとエクスポート元トランザクションを開いたまま保持し、自身でワーカーを起動するからです。PostgresSource が動く Cloud Dataflow の実行モデルでは、元トランザクションを並列読み取りの全期間にわたって保持する場所を確保するのが難しかったのです。TiDBSource ではランチャーで TSO(数値)がトランザクションと無関係に有効だったためワーカーに配るだけで済みました。

そのため PostgresSource では各レンジ独立読み取りを前提とし、「同期中に更新されないテーブルに対して使う/バッチ読み取りで一般的なスナップショットのズレを許容する」という運用方針にしています。

まとめ:高スループットを支える設計要素

両モジュールでは、「テーブルの物理的なデータ配置に沿って分割し、その分割単位を分散ワーカーで並列に読み取る」 という共通する設計思想で実装しました。

以下 SpannerSource(Spanner の PartitionQuery 等を活用したモジュール)も加えた各設計要素の比較表です。すでにいずれかの DB に親しんでいる人は別の DB の機能と比較することで関心・理解が深まるかもしれません。

要素

SpannerSource

TiDBSource

PostgresSource

物理分割の基準

Split 境界(PartitionQuery)

TiKV Region 境界(TABLESAMPLE REGIONS())

ctid 物理ブロック範囲(pg_relation_size)

分割キーの自動抽出

Spanner が自動分割(指定不要)

PK / _tidb_rowid を自動解決

ctid(全テーブル共通)

分割計画のコスト

API 呼び出しのみ(フルスキャン不要)

境界キーの列挙のみ(フルスキャン不要)

stat 呼び出しのみ(フルスキャン・OFFSET 不要)

範囲の読み取り

Partition Token を実行

PK 値の範囲比較でインデックスをシーク

ctid の範囲比較で TID range scan

転送機構

gRPC ストリーミング(executeStreamingSql)

ストリーミング ResultSet(fetchSize=MIN_VALUE)

バイナリ COPY(COPY ... TO STDOUT BINARY)

一貫性

BatchReadOnlyTransaction(TimestampBound)

tidb_snapshot による MVCC スナップショット

バッチ読み取り(ctid スナップショットのズレは許容)

並列化

Partition Token を Reshuffle + ParDo で分散

Reshuffle + ParDo(dumpling の Write

原文を表示

こんにちは。Data Ingestion チームでData Engineerをしている @orfeon です。この記事は「Merpay & Mercoin Tech Openness Month 2026」の14日目の記事です。

はじめに

Data Ingestion(旧Data Platform)チームでは、多数のマイクロサービスが管理するデータベース・テーブルから、大量のデータを継続的にDWH(データウェアハウス)へ同期する必要があります。同期対象には数億〜数百億件に達する大規模なテーブルも含まれ、これらをいかに速く・安全に・一貫性を保ったまま抽出するかが、DWHの鮮度や安定性にとって大事になります。

これまで Cloud Spanner からのデータ取得では、Spannerの分散DB特有の機能(後述)を活用することで、大規模テーブルでも高いスループットでの取得を実現できていました。 一方、社内にはTiDBやAlloyDBといったSpanner以外のデータベースも多く利用されており、その中には数百億件以上に達するテーブルもあります。 これらのテーブルは従来、主キーなどでシーク方式で取得していましたが、単一コネクションでのシーケンシャルなデータ取得になるため、大規模テーブルでは取得に非常に時間がかかっていました。

そこで今回、Spannerと同じように、それぞれのDBに特有の機能を活用して並列取得などでスループットを上げるよう工夫しました。 具体的には、TiDB と AlloyDB の大規模テーブルをDWHへ同期する仕組みを Cloud Dataflow(Apache Beam) 上に構築しました。 本記事では、その中核となる2つのSourceモジュール  TiDBSource と PostgresSource  について、高いスループットを実現するための工夫を解説します。

なぜ汎用JDBCではなく専用モジュールなのか

Beam/Dataflowには汎用的な JdbcIO が既に存在します。 しかし汎用JDBCは「SELECTを実行して結果を1行ずつ読む」という標準的な経路をたどるため、大規模テーブルでは以下のボトルネックが発生します。

  • 1行ごとのSQL処理オーバーヘッド: 通常のクエリ実行では、サーバ側でのタプルのテキスト/プロトコル変換などが行ごとに発生する。
  • 並列化の難しさ: テーブルを並列に読むには「どこで分割するか」を決める必要があるが、OFFSETベースの分割はオフセットが大きくなるほど遅くなり、フルスキャンを誘発する。
  • 一貫性の確保: 並列に複数コネクションから読む場合、各コネクションが別々の時点を読むと整合性が崩れる。

そこで今回のモジュールでは、それぞれのデータベースが持つネイティブなバルク転送機構と物理的なデータ配置情報を活用し、汎用JDBCのボトルネックを回避する設計にしました。

加えて運用上の大きなメリットとして 分割キー(フィルタ条件)の自動抽出 があります。 マイクロサービスごとに膨大なテーブルを扱う環境では、テーブル1つひとつに対して「どのカラムで分割するか」を人手で指定するのは現実的ではありません。 両モジュールはテーブルのメタデータから主キー(PK)や暗黙の行ID、物理ブロック位置を自動で見つけ出し、分割範囲の絞り込み条件を組み立てます。 利用者は接続先とテーブル名を指定するだけで、同じ設定が多数のテーブルに横展開することができます。

なぜCloud Spannerでは高いスループットでデータ取得が可能なのか

今回の設計の発想は、既にうまくいっていたSpannerからの取得方法を、TiDBやAlloyDBにも持ち込むことにありました。 そこでまずSpannerが大規模テーブルでも高いスループットを出せている理由を説明します。 Spannerは分散データベースとして、以下の機能を組み合わせています。

  • PartitionQuery / PartitionRead(Splitベースの自動分割): Spannerはデータを内部的に Split(キー範囲+負荷ベース)へ分割して保持しています。PartitionQuery はこのSplit境界に基づいてクエリを複数のパーティションに自動分割します。クライアントはキー範囲などSplitの内部構造を意識する必要がありません。
  • BatchReadOnlyTransaction(スナップショット一貫性): 全パーティションの読み取りが、TimestampBound で指定した同一スナップショットを参照することを保証します。ロックを取らずに一貫した読み取りができます。
  • Partition Tokenの分散・並列実行: 分割結果はシリアライズ可能なPartition Tokenとして返されるため、複数プロセス・複数マシン、そしてBeam Workerに配布してそのまま並列実行できます。Apache Beamの SpannerIO も内部でこの仕組みを使っています。
  • Partition Tokenによる自動バージョン保持: Tokenが有効な間は対象バージョンがGCされないことが保証されるため、クライアント側で明示的なバージョン保護(SafePoint管理)が不要です。
  • Data Boost(Spanner固有): Google管理の独立した計算リソースで読み取るオプションで、本番ワークロードへの影響をほぼゼロにしつつ弾力的にスケールできます。

これらは「物理的なデータ配置に沿った自動分割」「スナップショット一貫性」「分割単位の分散ワーカーへの配布と並列実行」という構図で成り立っています。Spannerではこれらが高度に抽象化されたAPIとして提供されていますが、TiDBやAlloyDB(PostgreSQL)にもそれに近いDB固有の機能が存在します。

このSpannerの機能とTiDBやPostgreSQLの機能は以下のように対応します。

Spanner

TiDB(dumpling相当)

AlloyDB(PostgreSQL)

PartitionQuery(Split境界で自動分割)

TABLESAMPLE REGIONS()(TiKV Region境界)

ctid物理ブロック範囲(pg_relation_size)

BatchReadOnlyTransaction(スナップショット)

tidb_snapshot MVCC(Multi-Version Concurrency Control) + TSO(Timestamp Oracle)

バッチ読み取り(ctidスナップショットのズレは許容)

Partition Tokenの分散実行

Range条件の分散実行(本記事の設計)

Range条件の分散実行(本記事の設計)

Partition Tokenによる自動GC保護

tidb_gc_life_time の引き上げで代替

(該当なし)

SpannerのSpannerIOで提供されている「分割 → 配布 → 並列スナップショット読み取り」を、TiDB/AlloyDBではDB固有の機能を組み合わせて自前で構築する、というのが本記事のモジュールの狙いです。 以降その共通の仕組みと各DB向けの実装を見ていきます。

共通アーキテクチャ

両モジュールに共通する基本戦略は次の3ステップです。 TiDBSource/PostgresSourceはCloud Dataflow バッチジョブとして実行され、以下3つのステップで役割が分かれます。

  • テーブルの範囲分割: 1本のコネクションでメタデータだけを取得し、テーブルを物理的な分割単位(Range)のリストに列挙する
  • 再分配: 分割単位をPCollectionの「種」として撒き、Reshuffleでワーカーに再分配する
  • 並列読み込み: 各ワーカーが担当Rangeをネイティブのバルク転送機構で並列に読み取る

以降、TiDBとPostgreSQLそれぞれについて、この3ステップの中身を掘り下げます。まずTiDBから、この3つのステップがどのように実装されるかを見ていきます。

TiDBテーブルからのデータ抽出

TiDB公式ツール dumpling に学ぶ

TiDBには dumpling という高速なエクスポートツールが公式に提供されています。 TiDBSource の設計は、このdumplingが高スループットを実現している仕組みを参考にしています。 まずはdumpling側の要点を整理します。

テーブルのチャンク分割と並列読み取り

dumplingは、1テーブルを丸ごと1クエリで読むのではなく、テーブルをチャンク(範囲)に分割し、各チャンクを独立したSELECTクエリとして並列実行します。 チャンク分割は3段階のフォールバック構造になっています。

戦略

方式

概要

A(最優先)

TiKV Regionベース分割

TABLESAMPLE REGIONS() でRegion境界をチャンク境界にする

B(フォールバック)

数値インデックスベース分割

数値型PK/インデックスのMIN/MAXから均等分割

C(最終)

テーブル全体ダンプ

分割可能なフィールドがない場合は1クエリ

特に重要なのが戦略Aです。 TiDBではデータがTiKV上で Region(デフォルト96MB単位) に分散配置されます。 dumplingはこのRegion境界をそのままチャンク境界として利用するため、各チャンクが異なるTiKVノードへの読み取りリクエストに分散され、クラスタ全体のI/O帯域を引き出せます。

dumplingの並列実行の仕組み: Producer-Consumer

分割したチャンクを並列に読み出すために、dumplingは内部で Producer-Consumer という構造をとります。登場人物は次の3つです(いずれもdumplingの実装に出てくる用語です)。

  • Producer(プロデューサ): テーブルをチャンクに分割し、「このチャンクを読め」というタスクを作り続ける係。dumplingではメインのgoroutineが担当します。先ほどのRegion境界などをもとにタスクを生成します。
  • Writer(ライター): 生成されたタスクを受け取り、実際にSELECTを発行してデータを読み出す係。--threads で指定した数だけ並列に動き、それぞれが独立したDB接続を持ちます。タスクを消費するConsumer側にあたります。
  • infiniteChan(無制限チャネル): ProducerとWriterの間をつなぐ、容量に上限のないキュー(待ち行列)。Writerの処理が詰まってもProducerがブロックされず、生成済みのタスクをいくらでも貯めておけます。

このように、タスクを作成する人(Producer)とタスクを実行する人(Writer)を分離し、その間を待ち行列(infiniteChan)でつなぐことで、分割と読み取りを互いに待たせずに並列で回す基本構造です。後述のTiDBSourceは、この役割分担をそのままDataflowの分散モデルに置き換えています。

Snapshot読み取り

dumplingはTiDBのMVCC (Multi-Version Concurrency Control)機構を利用し、特定のTSO(Timestamp Oracle)時点の スナップショット から一貫したデータを読み取ります。

  • ロック不要: FLUSH TABLES WITH READ LOCK のような排他ロックが不要で、書き込みをブロックしない。
  • 一貫性保証: 全Writerが同一時点のデータを読むため整合性が保たれる。
  • 高スループット: ロック競合がないため並列度を上げられる。

加えてdumplingは、長時間のダンプ中にTiDBのGC(Garbage Collection)がスナップショット時点の古いバージョンを回収しないよう、PD(Placement Driver)に対してGC SafePointを登録します。

TiDBの機能を活用したDataflowでの実装

TiDBSource は、dumplingのこれらのアイデアを Apache Beam / Dataflowのモデルに移植 したものです。dumplingがgoroutineで実現していた並列性を、Dataflowの分散ワーカーによる並列性に置き換えています。対応関係は次の通りです。

dumpling

TiDBSource (Dataflow)

Producerがチャンクタスクを生成

パイプライン構築時にRangeのリストを生成

infiniteChan + 複数Writer goroutine

Reshuffle + ParDoによる分散ワーカー並列処理

各Writerが独立DB接続でSELECT

各ワーカーが@Setupで独自コネクションを確立

TSOスナップショット読み取り

TSOを一度取得し全ワーカーに配布

ステップ1: 分割キーの決定とRangeの列挙

パイプラインの初期起動時に1本のコネクションを張り、出力スキーマの確定・スナップショットTSOの取得・テーブルの分割を行います。 ここではメタデータと境界値だけを読み、実データのスキャンは行いません。

分割キーの自動解決 は次の優先順位で行われます。利用者がカラムを指定しなくても、テーブルのメタデータから自動的に決定されます。

  • 利用者が splitField を明示指定していればそれを使う
  • なければ単一カラムの主キー(PK)
  • それもなければ暗黙の行ID _tidb_rowid(クラスタードキーを持たないテーブル向け)

_tidb_rowid は、明示的な主キーを持たないテーブルでTiDBが内部的に振る暗黙の行IDです。 これを分割キーに使えるため、主キー設計に依存せず、どんなテーブルでも分割の足がかりを得られます。

Rangeの列挙 は、先述のdumplingの戦略A→B→Cと同じ3段階フォールバックで行います。

戦略Aは、次のSQLでTiKVのRegion境界を取り出します。

code
SELECT `pk` FROM table TABLESAMPLE REGIONS() ORDER BY `pk`

TABLESAMPLE REGIONS() は各Regionの先頭行を返すため、結果の各値が「次のチャンクの下限」になります。 境界値の列 b[1], b[2], …, b[n] から、隣り合う境界で挟まれた半開区間を生成します。 取りこぼしを防ぐため、最初の区間は下側を、最後の区間は上側を開いておきます。

code
chunk[ -∞,  b[1] ),  chunk[ b[1], b[2] ),  …,  chunk[ b[n], +∞ )

TABLESAMPLE REGIONS() はTiDB v5.0以降の構文です。 非TiDBのMySQLや古いTiDBではこのクエリが失敗するため、自動的に戦略B(数値MIN/MAX均等分割)へフォールバックします。 戦略Bは、SELECT MIN(pk), MAX(pk) で取得した範囲を、推定行数とチャンクあたりの目標行数 splitSize から決めた個数で等分します。

code
chunks = ⌈ 推定行数 / splitSize ⌉
step   = (max − min) / chunks + 1
区間   = [min, min+step), [min+step, min+2·step), … , [ …, max]

(stepの計算では厳密な切り上げ ⌈(max−min)/chunks⌉ ではなく+1 としています。半開区間 [cutoff, cutoff+step) で走査するため、割り切れるケースでもmax が最終チャンクに確実に含まれるようstep を 1 大きく取っており、実際のチャンク数は chunks 以下になります)

ステップ2: Rangeの再分配(Reshuffle)

範囲が決まったら次にワーカーに範囲ごとの処理を並列にさせる必要があります。列挙したRangeのリストを並列実行するよう明示的に指定するためにPCollection化したRange の後に、 Reshuffle.viaRandomKey() を挟みます。 Reshuffle には2つの重要な役割があります。

  • fusionの分断: Dataflowは連続する処理を一つの処理Stageとして結合してしまうことがあります。これがないとDataflowはCreateと後続の読み取りParDoを融合し、Rangeが1ワーカーに偏って並列性が出ません。
  • ランダムキーによる再分配: 全Rangeが利用可能なワーカーへ均等にばらまかれ、クラスタ全体に読み取り負荷が分散されます。

これがdumplingの「infiniteChanから複数Writerがタスクを取り出す」構造に相当します。

ステップ3: 各ワーカーでのスナップショット並列読み取り

各ワーカーは処理開始時(@Setup)に自前のコネクションを確立し、パイプライン構築時に取得・配布されたTSOで tidb_snapshot をセットします。 全ワーカーが同一TSOを使うことで、分散読み取りでも単一時点の一貫したスナップショットになります。

TSOの取得は、一貫スナップショットを開始してその時点の論理時刻を読むだけで完了します(トランザクション自体はすぐロールバックします)。

code
START TRANSACTION WITH CONSISTENT SNAPSHOT;SELECT @@tidb_current_ts;   -- ← この値(TSO)を全ワーカーに配布

各ワーカー側では、読み取り前にセッション変数を設定します。

code
SET @@tidb_snapshot = '';  -- ロックなしで同一MVCC版を読む
SET @@session.tidb_enable_paging = ON;  -- 大量スキャン時のメモリ使用量を抑制

tidb_enable_paging はCoprocessorリクエストのメモリ使用量を抑える設定です(TiDB v6.2.0以降はデフォルト有効。変数を知らないDBではスキップ)。

実際の読み取りでは、担当Rangeを分割キーへの値の範囲条件に変換してSELECTに組み込みます。

code
SELECT * FROM table WHERE `pk` >= 1000 AND `pk` < 2000

ここで重要なのは、この条件が主キー(インデックス)に対する値の比較である点です。 データベースはこの行値比較を使ってインデックスをシークし、該当範囲の先頭へ直接ジャンプして必要な行だけを読みます。 OFFSETのように先頭から数え直す必要がないため、どのチャンクも一定コストで読み出せます。

また、読み取りはMySQL Connector/Jの行単位ストリーミングモード(fetchSize = Integer.MIN_VALUE)で行います。 これは結果セット全体をワーカー側メモリにバッファせず1行ずつ取り出す特別な設定で、巨大チャンクでもワーカーのメモリ消費が一定に保たれます。 dumplingがgo-sql-driver/mysqlのストリーミング動作で実現しているのと同じ効果を、JDBC側で引き出している形です。

制約: GC SafePoint

dumplingはPDクライアント経由でGC SafePointを登録しますが、これは JDBCコネクションからは到達できません。 そのためTiDBSourceではSafePoint登録を行わず、長時間の読み取りに対しては運用側でクラスタの tidb_gc_life_time を引き上げ、読み取り中にGCがスナップショットのバージョンを回収しないようにする運用方針としています。

AlloyDB(PostgreSQL)テーブルからのデータ抽出

AlloyDBとPostgreSQL互換性

AlloyDBは、Google CloudがPostgreSQL互換で提供するフルマネージドのデータベースサービスです。 ワイヤプロトコルもSQLの方言もPostgreSQLと互換であるため、クライアントから見ればPostgreSQLそのものとして扱え、標準のPostgreSQL JDBCドライバがそのまま使えます。 さらに ctid(物理行位置)や COPY、pg_relation_size といったPostgreSQLの内部機能・システムカタログも利用できます。

したがって、AlloyDBからのデータ抽出は PostgreSQLの機能を前提に設計できます。 以下ではPostgresSourceがPostgreSQLのどの機能を使っているかを説明しますが、その内容は基本的にAlloyDBにも当てはまります。

通常のJDBCクエリ取得との違い

PostgresSource も「物理的な分割 → 並列読み取り」という骨格はTiDBと同じですが、データ転送経路とテーブル分割の方式 がPostgreSQL固有の機能に最適化されています。 まず、通常のJDBC経由のクエリ取得との違いを整理します。

観点

通常のJDBC (SELECT)

PostgresSource (COPY ... TO STDOUT BINARY)

転送経路

拡張クエリプロトコル

COPYプロトコル(バルク転送専用)

行ごとの処理

パース/プラン/結果整形が走りうる

クエリは1回プラン。あとはタプルを連続送出

データ形式

テキスト or バイナリのフィールド単位

バイナリのタプルストリームを直接デコード

並列分割

OFFSET/LIMIT等(大きいほど低速)

ctid物理ブロック範囲(フルスキャン不要)

PostgreSQLの COPY は、もともと大量データのインポート/エクスポートのために用意された専用経路で、通常のクエリ実行に伴う1行ごとのオーバーヘッドを回避できます。 PostgresSource はJDBCドライバの CopyManager API(PGCopyInputStream)を使い、COPY (SELECT …) TO STDOUT (FORMAT BINARY) のバイナリ出力ストリームを受け取って、Avroのレコードへ直接デコードします。 中間のテキスト変換を挟まないぶん、CPU負荷とアロケーションを抑えられます。

PostgreSQLの機能を活用したDataflowでの実装

ステップ1: ctidブロック範囲の列挙

PostgreSQLの全タプルには ctid(物理的な行ロケーション = (ブロック番号, ブロック内オフセット))が付与されています。 PostgresSource はテーブルを 物理ブロック範囲 で分割し、各範囲を TID range scan で読みます。 分割の計画には実データのスキャンが一切不要です。

ブロック数は、テーブルの実ディスクサイズをブロックサイズで割って求めます。

code
SELECT pg_relation_size('table'::regclass) / current_setting('block_size')::bigint

pg_relation_size は実ディスクサイズを返すため、統計情報の pg_class.relpages 推定値よりも正確で、しかもstat呼び出し1回で済みます。

1ブロックあたりの行密度は推定行数(pg_class.reltuples)から求め、目標行数 splitSize に対応するブロック幅を機械的に算出します。 [0, blockCount) をこの幅で割っていくだけなので、フルスキャンもOFFSETも不要です。

code
行密度 = 推定行数 / ブロック数1範囲のブロック幅 = max(1, round( splitSize / 行密度 ))
範囲   = [0, w), [w, 2w), … , [kw, blockCount)   ※最後の範囲は上限を開く

最後の範囲を上限なし(オープンエンド)にしておくことで、分割を計画した後に追記された行も読み取れます。各範囲は ctid の半開区間条件に変換されます。

code
WHERE ctid >= '(0,0)'::tid AND ctid < '(3,0)'::tid

この条件も、TiDBの主キー範囲と同様に物理位置の値による範囲比較です。 PostgreSQL 14以降ではこれが TID range scan として処理され、該当ブロックへ直接シークして必要な範囲だけを読みます。

注意点: ctid は物理的な行位置なので、読み取り中にINSERT/UPDATE(別ページへの移動)/VACUUMが起きると、同じ行を二重に読んだり取りこぼしたりする可能性があります。同期中に更新されないテーブルに対して使うか、バッチ読み取りで一般的なスナップショットのズレを許容する前提で利用します。また、TID range scanはPostgreSQL 14以降のサポートで、それ以前のバージョンでは各範囲がシーケンシャルスキャンにフォールバックします。

ステップ2: Rangeの再分配

TiDBと同様、RangeのリストをCreateで撒き、Reshuffle.viaRandomKey()でワーカーへ再分配します。fusion分断と負荷分散の狙いは同じです。

ステップ3: 各ワーカーでのバイナリCOPY並列読み取り

各ワーカーは、担当Rangeのctid条件を組み込んだSELECTを COPY (…) TO STDOUT (FORMAT BINARY) でラップし、バイナリストリームを受け取ります。

code
COPY (SELECT * FROM table WHERE ctid >= '(0,0)'::tid AND ctid < '(3,0)'::tid)TO STDOUT (FORMAT BINARY)

返ってくるのはPostgreSQLのCOPYバイナリフォーマットです。PostgresSourceはこれを直接パースします。 先頭の固定シグネチャ(PGCOPY\n\377\r\n\0)とヘッダを読み飛ばし、以降はタプルを1件ずつ読みます。 各タプルは「フィールド数」に続いて、フィールドごとに「長さ(-1はNULL)+値のバイト列」が並ぶ構造で、終端は番兵値(フィールド数 = -1)で示されます。

値のデコードは、PostgreSQLのバイナリ表現をAvroの値へ型ごとに変換します。たとえば次のような処理です。

  • 整数・浮動小数: ビッグエンディアンでそのまま読む
  • numeric: base-10000 のdigit配列から十進数を復元
  • date / timestamp: PostgreSQLエポック(2000-01-01)基準の値をUnixエポック基準へ補正
  • uuid / json / jsonb / bytea: それぞれの専用処理

これらをドライバのテキスト変換を介さず自前でデコードすることで、転送経路を最短化しています。

なお、IAM認証(Cloud SQL / AlloyDB)にも対応しており、user未指定時はワーカーのサービスアカウントをDBユーザとして使い、接続URLに enableIamAuth=true を自動付与します。

検証用の12つのカラムを持つ6億件のダミーテーブルデータをAvroファイルとしてGCSに出力するタスクで、6コア並列で8分で処理完了するようになりました。

制約: なぜTiDBSourceのようにワーカー間の一貫性を担保できないのか

TiDBSource では tidb_snapshot によって全ワーカーが同一時点を読み、ワーカー間の一貫性を担保していました。一方の PostgresSource では、各ctidレンジがそれぞれ独立したトランザクションで読まれるため、レンジ間(別接続・別時刻)での一貫性は保証されません。読み取り中にINSERT/UPDATE(ctidが別ページへ移動)/VACUUMが起きると、レンジをまたいで行の重複や欠落が起こりえます。

PostgreSQLにも一見すると同等の仕組みがあります。pg_dump の並列モードは、pg_export_snapshot() でスナップショットをエクスポートし、各ワーカーが SET TRANSACTION SNAPSHOT でそれを取り込むことで、ワーカー間の一貫性を担保しています。**

PostgresSource で同じことを今回実現できなかった理由は スナップショットの「寿命」の違い** にあります。

TiDB(tidb_snapshot)

PostgreSQL(pg_export_snapshot() )

実体

TSO(論理タイムスタンプ=ただの数値)

エクスポート元トランザクションに紐づくスナップショットID

寿命

永続的(GCされるまで。トランザクション非依存)

一時的(エクスポート元トランザクションが開いている間だけ有効)

共有方法

数値を渡し、各セッションが独立に SET

元トランザクションが生存中に各セッションが SET TRANSACTION SNAPSHOT で取り込む

PostgreSQLのエクスポートされたスナップショットは、それをエクスポートしたトランザクションが終了するまでしかインポートできません。pg_dump の並列モードが成立するのは、単一プロセスのリーダーがダンプ全体の間ずっとエクスポート元トランザクションを開いたまま保持し、自身でワーカーを起動するからです。 PostgresSource が動くCloud Dataflowの実行モデルでは、元トランザクションを並列読み取りの全期間にわたって保持する場所を確保するのが難しかったのです。TiDBSourceではランチャーでTSO(数値)がトランザクションと無関係に有効だったためワーカーに配るだけで済みました。

そのためPostgresSource では各レンジ独立読み取りを前提とし、「同期中に更新されないテーブルに対して使う/バッチ読み取りで一般的なスナップショットのズレを許容する」という運用方針にしています。

まとめ: 高スループットを支える設計要素

両モジュールでは、「テーブルの物理的なデータ配置に沿って分割し、その分割単位を分散ワーカーで並列に読み取る」 という共通する設計思想で実装しました。

以下 SpannerSource(SpannerのPartitionQuery等を活用したモジュール)も加えた各設計要素の比較表です。すでにいずれかのDBに親しんでいる人は別のDBの機能と比較することで関心・理解が深まるかもしれません。

要素

SpannerSource

TiDBSource

PostgresSource

物理分割の基準

Split境界(PartitionQuery)

TiKV Region境界(TABLESAMPLE REGIONS())

ctid物理ブロック範囲(pg_relation_size)

分割キーの自動抽出

Spannerが自動分割(指定不要)

PK / _tidb_rowid を自動解決

ctid(全テーブル共通)

分割計画のコスト

API呼び出しのみ(フルスキャン不要)

境界キーの列挙のみ(フルスキャン不要)

stat呼び出しのみ(フルスキャン・OFFSET不要)

範囲の読み取り

Partition Tokenを実行

PK値の範囲比較でインデックスをシーク

ctidの範囲比較でTID range scan

転送機構

gRPCストリーミング(executeStreamingSql)

ストリーミングResultSet(fetchSize=MIN_VALUE)

バイナリCOPY(COPY ... TO STDOUT BINARY)

一貫性

BatchReadOnlyTransaction(TimestampBound)

tidb_snapshotによるMVCCスナップショット

バッチ読み取り(ctidスナップショットのズレは許容)

並列化

Partition TokenをReshuffle + ParDoで分散

Reshuffle + ParDo(dumplingのWrite

この記事をシェア

関連記事

Vercel Blog★32026年6月16日 09:00

Workflow SDK、実行中のキャンセル機能をサポートへ

Vercel が公開した Workflow SDK のベータ版が、標準的な AbortController と AbortSignal API をワークフローおよびステップの境界全体で利用可能にし、実行中の操作を柔軟にキャンセルできるようになった。

TLDR AI★42026年6月12日 09:00

予測データデバッグ:モデル学習前にその挙動を明らかにし制御する(11 分読)

Silico プラットフォームに統合された予測データデバッグ手法は、トレーニング前の選好データ分析によりモデルの潜在的な挙動を特定します。これによりエンジニアは安全性やハルシネーションなどの問題を事前に対処し、パフォーマンスと安全性を向上させます。

KDnuggets★32026年6月2日 21:00

現代のデータベースシステムとツールのための GitHub リポジトリ 10 選

KDnuggets が、現代のデータベースシステムや開発ツールとして注目すべき GitHub リポジトリ 10 個を紹介している。

今日のまとめ

AI日報で今日の重要ニュースをまとめ読み

ニュース一覧に戻る元記事を読む