大規模データ更新・削除を「安全に分割実行」するための汎用ツール
メルカリのDBREチームは、大規模データの更新・削除操作を安全に分割実行するための汎用ツールを実装し、その設計思想と運用上の利点を紹介するとともに、ツールのREADMEを公開した。
キーポイント
大規模データ操作の課題
UPDATE/DELETEを一括実行すると、大きなトランザクションによりレプリケーション遅延やDB負荷増大、UNDOログ肥大化などのリスクが生じ、サービス影響の可能性がある。
従来の回避策とその限界
対象を小分けにしたSQLや使い捨てスクリプトで対応していたが、毎回の作成や安全なSQLの組み立てに手間がかかり、運用コストが積み上がっていた。
汎用ツールによる解決策
最終的に達成したい条件をSQLに近い形で記述しつつ、実行時には主キー単位で対象を取得し、バッチに分割して短いトランザクションを繰り返すことで安全に処理する。
運用性を高める制御機能
処理間隔やバッチサイズなどの設定を実行中に変更可能で、監視結果に応じて自動で一時停止する仕組みも組み込まれている。
実装と公開方針
生成AIを利用して実装し、社内で既に利用している。コード自体のOSS公開は行わなかったが、README.mdを公開し、同様の課題を持つ他者が自環境に合わせて実装する叩き台として提供する。
操作タイプとNULL操作のユースケース
ツールはUPDATE、DELETE、NULLの3つの操作タイプをサポートしており、NULL操作はbefore_sqlのみを実行し、データのアーカイブや変換に使用できます。
障害時の詳細な追跡と自動再開
失敗した更新を追跡し、バッチレベルの失敗では最初と最後のIDを記録し、詳細なレポートをファイルに書き出します。また、各バッチ後に進捗を保存し、再起動時に自動的に最後の成功位置から再開します。
影響分析・編集コメントを表示
影響分析
この記事は、大規模データベース運用における普遍的な課題に対する実用的な解決策を共有しており、特にデータ駆動型企業のエンジニアリングチームに直接的な価値を提供する。ツールの設計思想(安全性と運用性の分離、実行中制御)は、データベース運用ツールのベストプラクティスとして参考になる。ただし、完全なOSS公開ではなくREADMEのみの公開であるため、業界全体への直接的・即時的影響は限定的と言える。
編集コメント
データベース運用の地味だが重要な課題に焦点を当て、実用的な解決策を具体的に示した良質な技術記事。生成AIを開発に活用した点も興味深いが、完全なOSS公開ではなくREADMEのみの公開である点は、読者の期待と実際の提供物にギャップを生む可能性がある。
DBRE (DataBase Reliability Engineering) チームの taka-h です。
大規模なデータ更新や削除は、やりたいこと自体は SQL で表現できても、そのまま一度に実行すると運用上のリスクが高くなります。例えば大きなトランザクションが発生すると、レプリケーション遅延や DB 負荷の増大、UNDO ログの肥大化などにつながり、結果としてサービス影響を招く可能性があります。
そこで私たちは、UPDATE/DELETE のような「最終的にやりたい操作」を SQL に近い形で記述しつつ、実行時には安全な単位に分割して処理できる汎用ツールを実装しました。さらに、実行中に処理速度などの設定を変更できることや、監視結果に応じて自動で一時停止できることなど、実運用で必要になる制御も組み込んでいます。
本記事では、なぜこの問題が起きるのか、従来どのように回避してきたのか、そして今回のツールがどのように安全性と運用性を両立するのかを紹介します。最後に、ツールの README も公開するので、同様の課題を持つ方が自分たちの環境に合わせて実装する際の叩き台として使えるはずです。
なおこのツールは、社内の次のようなデータベース運用の支援を前提とします。
- データをアーカイブ/削除する
- データをバックフィルする
- データを一括で更新する
大規模データの更新/削除操作における課題
小規模なデータベースであれば、目的の SQL をそのまま実行しても問題にならないことがあります。一方で、一定以上の規模のデータを扱う場合は、同じ SQL でも“そのまま一括実行する”こと自体がリスクになります。
主な理由は、処理対象が多いと大きなトランザクションが発生しやすく、その副作用が DB 全体に波及するためです。具体的には、変更の伝播(レプリケーションなど)に遅延が発生したり、DB が高負荷になったり、UNDO ログが肥大化して回復や性能に影響が出たりします。
このような場合の従来の方針は、「対象を小分けにして処理する」でした。たとえば、対象の主キーをある程度の件数に分割し、短いトランザクションを繰り返すような SQL を作成してもらったり、専用の使い切りのスクリプトを都度用意して対応していました。
BEGIN;
-- 対象の主キーを少量ずつ指定して処理する
DELETE FROM items WHERE id IN (...);
COMMIT;
SLEEP ...;
ただし、毎回使い切りのスクリプトを作ったり、対象主キーを取り出して分割したりするのは手間です。依頼者側に“安全な形の SQL
また、実運用では「削除や更新の進捗」とは独立に、DB 全体が高負荷になったり、想定外の問題が発生したりします。そのため、状況に応じて処理速度や挙動を調整できること、そして必要なら自動的に一時停止できることが重要です。
この要件に対して本ツールでは、処理間隔やバッチサイズなどの設定を実行中に変更できる機能を持たせています。これは、MySQL のオンラインスキーマ変更ツールである gh-ost が「実行中に操作を制御できる」点で運用上便利なのと同じ発想です。さらに、監視結果に応じて自動で処理を一時停止する仕組みも組み込んでいます。

最終的なコンフィグ例は上図の通りです。実行したい条件(SQL に近い記述)と、どう安全に実行するか(運用上の関心事項)を分離して設定できます。また、processing に属する項目の多くは実行中に変更可能です。
このツールは主に生成 AI を利用して実装し、動作確認のうえ社内で既に利用しています。コード自体の OSS としての公開にはふみきれなかったのですが、次の章でこのツールの README.md を公開します。これをご利用の環境に合わせた要件の追加、修正をしていただいた上で、生成 AI を利用し同様のツールが利用できるようになることを期待しています。
もし試してみて有用だった点や改善アイデアがあれば、SNS などで議論いただけると嬉しいです。また、「メルカリの DBRE チームの公開した README.md で作ってみた」ということで宣伝していただけるとありがたいです。
最後に、現在メルカリでは、この記事の発行者の所属する DBRE チーム の EM(Engineering Manager) を募集しています。詳しくはこちらをご覧ください。
汎用データ更新ツールの README.md
data-updater
A tool for batch data operations (UPDATE, DELETE, or NULL) on database records using primary keys with configurable conditions.
- カスタマイズ可能なバッチサイズを持つカーソルベースのバッチ処理
- 3 つの操作タイプ: UPDATE(更新)、DELETE(削除)、および NULL(before_sql のみ)
- 並列実行: より高いパフォーマンスのために、SELECT 操作と UPDATE 操作が同時に実行されます
- レプリカサポート: プライマリの負荷を軽減するため、SELECT クエリをレプリカデータベースにルーティングします
- JOIN サポート: 複数のテーブルを含む複雑なクエリにより、対象レコードを特定できます
- Before SQL フック: 各バッチ実行前に SQL を実行(アーカイブ、監査ログの記録など)
- カスタム ORDER BY: カスタム順序でレコードを処理します
- ランタイム制御用のインタラクティブコマンド(gh-ost に類似)
- YAML ベースの設定: すべての設定を単一の設定ファイルに記述
- 推定完了時間 (ETA) を含むリアルタイムステータス監視
- 一時停止/再開機能
- 動的な設定更新
- ソケットベースの遠隔制御インターフェース
- 失敗した ID の追跡: 更新に失敗したレコードを記録し、終了時にサマリーを表示します
- バッチレベルでの失敗の場合: 失敗したバッチの最初の ID と最後の ID のみを記録
- 部分的な更新の場合: 不一致をログに記録しますが、個別の ID は追跡しません
- 100 件以上の失敗がある場合、詳細レポートをファイルに書き出します
- 自動再開: 各バッチ完了後にステータスファイルへ進捗を保存
- 再起動時に最後の成功した位置から自動的に再開
- 手動で進捗を追跡したり、再開ポイントを指定する必要はありません
- ステータスファイルはアダプター/テーブル固有であり、複数の並行ジョブに対応可能
インストール
go install github.com/xxx/cmd/data-updaterクイックスタート
- 設定ファイルを作成します:
# config.yaml
database:
host: localhost
port: 3306
user: myuser
password: mypassword
database: mydatabase
options:
charset: utf8mb4
parseTime: "true"
processing:
batch_size: 1000
interval: 1s
adapter:
table_name: users
pk_columns:
- user_id
update_sql: "status = 'processed', updated_at = NOW()"
where_clause: "status = 'pending'"- ツールを実行します:
# Normal mode - executes updates
data-updater --config config.yaml
# Debug mode - SELECT only, no updates
data-updater --config config.yaml --debug
# Resume from specific ID
data-updater --config config.yaml --resume-from "12345"
# Show version
data-updater -vオペレーションタイプ
本ツールは3つのオペレーションタイプをサポートしています:
UPDATE (デフォルト)
指定された条件に一致するレコードを更新します。
adapter:
table_name: users
pk_columns: ["user_id"]
operation: update # or omit (default)
update_sql: "status = 'processed', updated_at = NOW()"
where_clause: "status = 'pending'"DELETE
指定された条件に一致するレコードを削除します。
重要: DELETE オペレーションはデータを永久的に削除します。必ず --debug モードでテストしてから実行してください。
adapter:
table_name: old_logs
pk_columns: ["id"]
operation: delete
where_clause: "created_at before_sql without UPDATE or DELETE. Useful for archiving, copying, or transforming data.
```yaml
adapter:
table_name: items
pk_columns: ["id"]
operation: "null"
before_sql: |
INSERT INTO archived_items (id, name, created_at, archived_at)
SELECT id, name, created_at, NOW() FROM items WHERE id IN (?)
where_clause: "status = 'inactive'"Configuration
All settings are managed through a YAML configuration file:
Database Configuration
database:
host: localhost # データベースホスト(デフォルト:localhost)
port: 3306 # データポート(デフォルト:3306)
user: myuser # データベースユーザー(必須)
password: mypassword # データベースパスワード(必須)
database: mydatabase # データベース名(必須)
options: # MySQL 接続オプション(任意)
charset: utf8mb4
parseTime: "true"
loc: UTC
timeout: 30s
# レプリカ設定(任意)
replica_host: replica-db.example.com # SELECT クエリはここに送信されます
replica_port: 3306 # プライマリポートをデフォルトとします
replica_user: replica_user # プライマリユーザーをデフォルトとします
replica_password: replica_password # プライマリパスワードをデフォルトとしますWhen replica_host is configured:
- SELECT queries (fetching PKs, COUNT) are routed to replica
- UPDATE/DELETE operations always use primary
- SELECT FOR UPDATE (pessimistic locking) uses primary
Processing Configuration
processing:
batch_size: 1000 # バッチあたりの行数
interval: 1s # バッチ間の時間(例:1s, 500ms, 2m)
debug_mode: false # 更新を実行せずにクエリをログ出力するモード
pipeline_buffer: 1 # 並列 SELECT/UPDATE のバッファサイズ
pessimistic_locking: true # SELECT FOR UPDATE を使用(デフォルト:true)
lock_retry_count: 3 # ロック取得の再試行回数Adapter Configuration
adapter:
table_name: users # 対象テーブル(必須)
table_alias: u # メインテーブルのエイリアス(結合使用時に必須)
pk_columns: # 主キーカラム(必須)
- user_id
operation: update # "update"(デフォルト)、"delete"、または "null"
update_sql: "status = 'processed'" # SET クラース(更新時に必須)
before_sql: "..." # 操作前に実行する SQL(null 時に必須)
where_clause: "status = 'pending'" # 追加の WHERE 条件(任意)
join_clause: "..." # JOIN 文(任意)
order_by: "created_at" # カスタム ORDER BY(任意、デフォルトは主キー)Interactive Control
interactive:
enabled: true # ソケットベースの制御を有効化
socket_path: "/tmp/data-updater.sock" # Unix ソケットパスStatus File (Automatic Resume)
status_file:
enabled: true # 自動再開を有効化
path: "/var/lib/status" # カスタムパス(任意)Advanced Features
JOIN Support
Use JOINs for complex queries that need to reference multiple tables:
adapter:
table_name: items
table_alias: i
pk_columns: ["id"]
operation: delete
join_clause: |
LEFT JOIN transaction_evidences te ON te.item_id = i.id
where_clause: |
i.status = 'cancel'
AND te.id IS NULLHow it works:
- SELECT query uses JOINs + WHERE to fetch PKs
- DELETE/UPDATE query only uses primary keys (no JOINs)
Before SQL (Pre-operation Hook)
Execute SQL before each batch within the same transaction:
adapter:
table_name: items
pk_columns: ["id"]
operation: delete
before_sql: |
INSERT INTO deleted_item_ids (id, created, deleted)
SELECT id, created, NOW() FROM items WHERE id IN (?)
where_clause: "status = 'cancel'"Notes:
- Use IN (?) placeholder - expanded to all PKs in the batch
- For composite keys: (col1, col2) IN (?)
- Executed atomically with the main operation
- If before_sql fails, entire transaction is rolled back
Custom ORDER BY
Process records in a specific order:
adapter:
table_name: items
table_alias: i
pk_columns: ["id"]
order_by: "i.created, i.id"Understanding update_sql
The update_sql parameter specifies the SET clause. Do not include trailing semicolons.
# シンプルなステータス更新
update_sql: "status = 'processed'"
# 結果:UPDATE users SET status = 'processed' WHERE user_id IN (...)
# 複数カラム
update_sql: "status = 'archived', archived_at = NOW()"
# CASE 文の使用
update_sql: |
status = CASE
WHEN last_login --config, -c: YAML 設定ファイルへのパス(操作に必須)
- --debug, -d: デバッグモードを有効化(SELECT のみ実行、更新は行わない)
- --resume-from: 特定の主キーから手動で再開する
- --total-rows: 初期の COUNT クエリをスキップし、指定された値を使用する(例:--total-rows 1000000)。また、処理行数ではなく選択行数に基づいた停止条件としても機能する(rows_handled は選択行数、rows_processed は UPDATE による影響行数)
- --pk-source: テーブルから主キーを読み込む代わりにファイルまたはディレクトリから読み取る(ローカルパスまたは gs://bucket/path)
- --version, -v: バージョン情報を表示
- --help, -h: ヘルプメッセージを表示
## インタラクティブコマンド
Unix ソケットを介してツールを制御します:
```bash
# Show status
echo "status" | nc -U /tmp/data-updater.sock
# Pause/resume processing
echo "pause" | nc -U /tmp/data-updater.sock
echo "resume" | nc -U /tmp/data-updater.sock
# Change batch size
echo "batch-size 5000" | nc -U /tmp/data-updater.sock
# Change interval
echo "interval 500ms" | nc -U /tmp/data-updater.sock
# Show help
echo "help" | nc -U /tmp/data-updater.sock
# Auto-interval: show status / enable / disable / set min
echo "auto-interval" | nc -U /tmp/data-updater.sock
echo "auto-interval on" | nc -U /tmp/data-updater.sock
echo "auto-interval off" | nc -U /tmp/data-updater.sock
echo "auto-interval min 200ms" | nc -U /tmp/data-updater.sockデバッグモード
デバッグモードでは、更新を実行せずにクエリを検証できます:
data-updater --config config.yaml --debug出力例:
INFO DEBUG: UPDATE query that would be executed query="UPDATE users SET status = 'processed' WHERE user_id IN (?,?,?)" args_count=3 primary_keys_count=3再開機能
自動再開(デフォルト)
- 各バッチの成功後に進行状況が保存される
- 再起動時に、最後の位置から自動的に再開する
- ステータスファイル名:data-updater-{table}-{adapter}.status
手動再開
# Single primary key
data-updater --config config.yaml --resume-from "12345"
# Composite primary key
data-updater --config config.yaml --resume-from "tenant1,12345"再開優先順位
- 手動 --resume-from(最優先)
- ステータスファイル(存在する場合)
- アダプターの初期カーソル(デフォルト)
COUNT クエリのスキップ
--total-rows を使用して、初期の COUNT クエリをスキップします:
# Useful for large tables or retries where you know the total
data-updater --config config.yaml --total-rows 1000000これは特に、以下のような場合に有用です:
- 中断後の再試行(カウント数は既知)
- COUNT(*) の実行コストが高い大規模テーブル
- 正確なカウント数が重要でない場合の高速起動
停止条件: --total-rows オプションは、指定された行数を処理(選択)した後にセレクターを停止します。この停止チェックには rows_handled が使用され、rows_processed は使用されません。つまり、UPDATE ステートメントが 0 行に影響を与える場合(例えば、他のプロセスによって既に削除されているレコードや where_clause でフィルタリングされたレコードなど)でも正しく機能します。
PK ソース(ファイルから主キーを読み取る)
データベーステーブルではなく、ファイルから主キーを読み取ります。
重要: --pk-source を使用して正確な進捗状況/推定所要時間 (ETA) を計算するには、--total-rows オプションが必須です。
# Count lines first
wc -l failed-ids.txt
# 1500 failed-ids.txt
# From local file (--total-rows is required)
data-updater --config config.yaml --pk-source "./failed-ids.txt" --total-rows 1500
# From local directory (processes all files)
data-updater --config config.yaml --pk-source "./failed-ids/" --total-rows 5000
# From GCS file
data-updater --config config.yaml --pk-source "gs://bucket/failed-ids.txt" --total-rows 1500
# From GCS directory
data-updater --config config.yaml --pk-source "gs://bucket/failed-ids/" --total-rows 10000または YAML で設定します:
pk_source:
path: "gs://my-bucket/failed-ids/"
gcs_project: "my-gcp-project" # Required for GCS paths
skip_header: true # Skip first line (for BQ exports with header)
prefetch_buffer: 5 # Number of GCS files to prefetch ahead (default: 5)GCS 認証:
GCS アクセスには Application Default Credentials (ADC) を使用します。以下のように設定してください:
gcloud auth application-default login
gcloud auth application-default set-quota-project ファイル形式(CSV):
# Comments starting with # are ignored
12345
12346
tenant1,12345
"value,with,comma",12346ヘッダー行のスキップ(BigQuery エクスポート用):
BigQuery のエクスポートには列名を含むヘッダー行が含まれます。skip_header: true を使用してこれをスキップします:
id
12345
12346機能:
- メモリ使用量を最小限に抑えるため、ファイルは 1 行ずつストリーミング読み取りされます
- GCS ファイルはバックグラウンドでプリフェッチされ、ダウンロードの遅延を排除します(バッファサイズは設定可能、デフォルトは 5)
- ディレクトリサポート: ソートされた順序でディレクトリ内のすべてのファイルを処理します
- 再開機能: ファイルごとおよび行番号ごとの進捗状況を追跡します
- where_clause と組み合わせて、ファイルから主キーをフィルタリングすることも可能です
ステータス指標
ステータスログとステータス対話型コマンドは、2 つのカウンターを報告します:
- rows_processed: UPDATE/DELETE 操作によって実際に影響を受けた行数(つまり、データベースが行の変更を報告した数)
- rows_handled: パイプラインを通じて選択され送信された行数。UPDATE/DELETE が実際にその行を変更したかどうかは問わない。このカウンターは進行状況のパーセンテージと予想到着時間(ETA: Estimated Time of Arrival)の計算に使用される
rows_handled の値が rows_processed より大きい場合、通常は一部の行がすでに望ましい状態にあったことを意味する(例えば、既に削除済みか、以前のランで既に更新済みであるなど)。
Hibernate (ヘルスチェックベースの一時停止)
hibernate 機能により、プロセッサは定期的に外部のヘルスチェックスクリプトを実行できるようになる。そのスクリプトがゼロ以外の終了コード(問題を示す)を返した場合、プロセッサは設定された期間だけ一時停止し、その後自動的に再開する。
設定
processing:
hibernate_script_path: "/path/to/check.sh"
hibernate_pause_period: 30s
hibernate_check_interval: 15s- hibernate_script_path: 実行可能なスクリプトへのパス。このスクリプトは設定されたチェック間隔(デフォルト 15 秒)で実行される。終了コード 0 は正常を意味し、ゼロ以外の終了コードは一時停止(hibernation)トリガーとなる。
- hibernate_pause_period: スクリプトが問題を検知した際にプロセッサが一時停止する時間。hibernate_script_path が設定されている場合、必須項目である。
- hibernate_check_interval: ヘルスチェックスクリプトが実行される頻度。デフォルトは 15 秒。
動作
- プロセッサ実行中は、設定された間隔(デフォルトは 15 秒)ごとにヘルスチェックスクリプトが実行されます
- スクリプトが終了コード 0 で終了した場合、処理は通常通り継続されます
- スクリプトがゼロ以外の終了コードで終了した場合、プロセッサは hibernate_pause_period(hibernation 一時停止期間)の間一時停止し、その後自動的に再開します
- hibernation_count メトリクスは、一時停止がトリガーされた回数の合計を追跡します(status コマンドの出力および定期ログで確認可能)
ユースケース
- データベースレプリケーションの遅延がある閾値を超えた場合に一時停止する
- ディスク容量が少ない場合に一時停止する
- メンテナンスウィンドウ中に一時停止する
- 運用担当者が定義した任意のカスタムヘルスチェックを行う
時間別サマリーログ
長時間実行されるジョブの場合、JSON エントリを専用ファイルに書き出す時間別サマリーログを有効化できます。シャットダウン時には最終サマリーも書き出されるため、短時間のランでもレポートが生成されます。
processing:
hourly_log_path: "/var/log/data-updater/hourly.log"各 JSON 行には以下の情報が含まれます:
- rows_processed_total / rows_processed_delta — 合計処理レコード数と期間内の処理レコード数
- rows_failed_total / rows_failed_delta
- hibernation_count_total / hibernation_count_delta
- total_rows, rows_remaining, progress — 全体の進捗状況
- interactive_commands — 期間中にソケット経由で発行されたコマンド(タイムスタンプ付き)
- summary_type — "hourly"(時間別)または "final"(最終)
hourly_log_path が設定されていない場合、レポーターは起動されません。
オートインターバル調整
各時間ごとに観測されるヒバーネーション比率に基づいて処理間隔を自動的に調整します。多くのヒバーネーションチェックが失敗した場合(比率が高い場合)、間隔は増加し(速度低下)ます。比率が低い場合は、間隔は減少し(速度向上)ます。
processing:
auto_interval_enabled: true
auto_interval_high_ratio: 0.3 # ratio >= this → slow down (default: 0.3)
auto_interval_low_ratio: 0 # ratio <= this → speed up (default: 0)
auto_interval_increase_factor: 1.25 # multiply interval by this to slow down (default: 1.25)
auto_interval_decrease_factor: 0.8 # multiply interval by this to speed up (default: 0.8)
auto_interval_min: 200ms # floor for interval (default: initi原文を表示
DBRE (DataBase Reliability Engineering)チームの taka-h です。
大規模なデータ更新や削除は、やりたいこと自体はSQLで表現できても、そのまま一度に実行すると運用上のリスクが高くなります。例えば大きなトランザクションが発生すると、レプリケーション遅延やDB負荷の増大、UNDOログの肥大化などにつながり、結果としてサービス影響を招く可能性があります。
そこで私たちは、UPDATE/DELETEのような「最終的にやりたい操作」をSQLに近い形で記述しつつ、実行時には安全な単位に分割して処理できる汎用ツールを実装しました。さらに、実行中に処理速度などの設定を変更できることや、監視結果に応じて自動で一時停止できることなど、実運用で必要になる制御も組み込んでいます。
本記事では、なぜこの問題が起きるのか、従来どのように回避してきたのか、そして今回のツールがどのように安全性と運用性を両立するのかを紹介します。最後に、ツールのREADMEも公開するので、同様の課題を持つ方が自分たちの環境に合わせて実装する際の叩き台として使えるはずです。
なおこのツールは、社内の次のようなデータベース運用の支援を前提とします。
- データをアーカイブ/削除する
- データをバックフィルする
- データを一括で更新する
大規模データの更新/削除操作における課題
小規模なデータベースであれば、目的のSQLをそのまま実行しても問題にならないことがあります。一方で、一定以上の規模のデータを扱う場合は、同じSQLでも“そのまま一括実行する”こと自体がリスクになります。
主な理由は、処理対象が多いと大きなトランザクションが発生しやすく、その副作用がDB全体に波及するためです。具体的には、変更の伝播(レプリケーションなど)に遅延が発生したり、DBが高負荷になったり、UNDOログが肥大化して回復や性能に影響が出たりします。
このような場合の従来の方針は、「対象を小分けにして処理する」でした。たとえば、対象の主キーをある程度の件数に分割し、短いトランザクションを繰り返すようなSQLを作成してもらったり、専用の使い切りのスクリプトを都度用意して対応していました。
BEGIN;
-- 対象の主キーを少量ずつ指定して処理する
DELETE FROM items WHERE id IN (...);
COMMIT;
SLEEP ...;ただし、毎回使い切りのスクリプトを作ったり、対象主キーを取り出して分割したりするのは手間です。依頼者側に“安全な形のSQL”を組み立ててもらう必要が出るなど、運用コストが積み上がっていきます。
そこで、この問題に対して汎用的な解決策を提供するツールを実装しました。
解決策: 汎用化ツール
このツールでは、利用者は「最終的に達成したい条件」をSQLに近い形で記述します。一方で実行時には、その条件に合致する対象を主キー単位で取得し、バッチに分割して短いトランザクションを繰り返すことで、安全にUPDATE/DELETEを進められるようにしています。

また、実運用では「削除や更新の進捗」とは独立に、DB全体が高負荷になったり、想定外の問題が発生したりします。そのため、状況に応じて処理速度や挙動を調整できること、そして必要なら自動的に一時停止できることが重要です。
この要件に対して本ツールでは、処理間隔やバッチサイズなどの設定を実行中に変更できる機能を持たせています。これは、MySQLのオンラインスキーマ変更ツールである gh-ost が「実行中に操作を制御できる」点で運用上便利なのと同じ発想です。さらに、監視結果に応じて自動で処理を一時停止する仕組みも組み込んでいます。

最終的なコンフィグ例は上図の通りです。実行したい条件(SQLに近い記述)と、どう安全に実行するか(運用上の関心事項)を分離して設定できます。また、processingに属する項目の多くは実行中に変更可能です。
このツールは主に生成AIを利用して実装し、動作確認のうえ社内で既に利用しています。コード自体のOSSとしての公開にはふみきれなかったのですが、次の章でこのツールのREADME.mdを公開します。これをご利用の環境に合わせた要件の追加、修正をしていただいた上で、生成AIを利用し同様のツールが利用できるようになることを期待しています。
もし試してみて有用だった点や改善アイデアがあれば、SNSなどで議論いただけると嬉しいです。また、「メルカリのDBREチームの公開したREADME.mdで作ってみた」ということで宣伝いただけるとありがたいです。
最後に、現在メルカリでは、この記事の発行者の所属する DBREチーム の EM(Engineering Manager) を募集しています。詳しくはこちらをご覧ください。
汎用データ更新ツールのREADME.md
`
data-updater
A tool for batch data operations (UPDATE, DELETE, or NULL) on database records using primary keys with configurable conditions.
Features
- Cursor-based batch processing with configurable batch size
- Three operation types: UPDATE, DELETE, and NULL (before_sql only)
- Parallel execution: SELECT and UPDATE operations run concurrently for better performance
- Replica support: Route SELECT queries to replica database to reduce primary load
- JOIN support: Complex queries with multiple tables to identify target records
- Before SQL hooks: Execute SQL before each batch (archiving, audit logging)
- Custom ORDER BY: Process records in custom order
- Interactive commands for runtime control (similar to gh-ost)
- YAML-based configuration: All settings in a single configuration file
- Real-time status monitoring with ETA
- Pause/resume functionality
- Dynamic configuration updates
- Socket-based remote control interface
- Failed ID tracking: Records failed updates and displays summary on exit
- For batch-level failures: Records only first and last ID of the failed batch
- For partial updates: Logs the discrepancy but doesn't track individual IDs
- Writes detailed report to file if >100 failures
- Automatic resume: Saves progress to status file after each batch
- Automatically resumes from last successful position on restart
- No need to manually track progress or specify resume points
- Status files are adapter/table specific for multiple concurrent jobs
Install
go install github.com/xxx/cmd/data-updaterQuick Start
- Create a configuration file:
# config.yaml
database:
host: localhost
port: 3306
user: myuser
password: mypassword
database: mydatabase
options:
charset: utf8mb4
parseTime: "true"
processing:
batch_size: 1000
interval: 1s
adapter:
table_name: users
pk_columns:
- user_id
update_sql: "status = 'processed', updated_at = NOW()"
where_clause: "status = 'pending'"- Run the tool:
# Normal mode - executes updates
data-updater --config config.yaml
# Debug mode - SELECT only, no updates
data-updater --config config.yaml --debug
# Resume from specific ID
data-updater --config config.yaml --resume-from "12345"
# Show version
data-updater -vOperation Types
The tool supports three operation types:
UPDATE (default)
Updates records matching the specified conditions.
adapter:
table_name: users
pk_columns: ["user_id"]
operation: update # or omit (default)
update_sql: "status = 'processed', updated_at = NOW()"
where_clause: "status = 'pending'"DELETE
Deletes records matching the specified conditions.
Important: The DELETE operation permanently removes data. Always test with --debug` mode first.
adapter:
table_name: old_logs
pk_columns: ["id"]
operation: delete
where_clause: "created_at < '2023-01-01'"NULL
Executes only before_sql without UPDATE or DELETE. Useful for archiving, copying, or transforming data.
adapter:
table_name: items
pk_columns: ["id"]
operation: "null"
before_sql: |
INSERT INTO archived_items (id, name, created_at, archived_at)
SELECT id, name, created_at, NOW() FROM items WHERE id IN (?)
where_clause: "status = 'inactive'"Configuration
All settings are managed through a YAML configuration file:
Database Configuration
database:
host: localhost # Database host (default: localhost)
port: 3306 # Database port (default: 3306)
user: myuser # Database user (required)
password: mypassword # Database password (required)
database: mydatabase # Database name (required)
options: # MySQL connection options (optional)
charset: utf8mb4
parseTime: "true"
loc: UTC
timeout: 30s
# Replica configuration (optional)
replica_host: replica-db.example.com # SELECT queries go here
replica_port: 3306 # Defaults to primary port
replica_user: replica_user # Defaults to primary user
replica_password: replica_password # Defaults to primary passwordWhen replica_host is configured:
- SELECT queries (fetching PKs, COUNT) are routed to replica
- UPDATE/DELETE operations always use primary
- SELECT FOR UPDATE (pessimistic locking) uses primary
Processing Configuration
processing:
batch_size: 1000 # Number of rows per batch
interval: 1s # Time between batches (e.g., 1s, 500ms, 2m)
debug_mode: false # Log queries without executing updates
pipeline_buffer: 1 # Buffer size for parallel SELECT/UPDATE
pessimistic_locking: true # Use SELECT FOR UPDATE (default: true)
lock_retry_count: 3 # Number of lock acquisition retriesAdapter Configuration
adapter:
table_name: users # Target table (required)
table_alias: u # Alias for main table (required when using joins)
pk_columns: # Primary key column(s) (required)
- user_id
operation: update # "update" (default), "delete", or "null"
update_sql: "status = 'processed'" # SET clause (required for update)
before_sql: "..." # SQL to execute before operation (required for null)
where_clause: "status = 'pending'" # Additional WHERE (optional)
join_clause: "..." # JOIN statements (optional)
order_by: "created_at" # Custom ORDER BY (optional, defaults to PK)Interactive Control
interactive:
enabled: true # Enable socket-based control
socket_path: "/tmp/data-updater.sock" # Unix socket pathStatus File (Automatic Resume)
status_file:
enabled: true # Enable automatic resume
path: "/var/lib/status" # Custom path (optional)Advanced Features
JOIN Support
Use JOINs for complex queries that need to reference multiple tables:
adapter:
table_name: items
table_alias: i
pk_columns: ["id"]
operation: delete
join_clause: |
LEFT JOIN transaction_evidences te ON te.item_id = i.id
where_clause: |
i.status = 'cancel'
AND te.id IS NULLHow it works:
- SELECT query uses JOINs + WHERE to fetch PKs
- DELETE/UPDATE query only uses primary keys (no JOINs)
Before SQL (Pre-operation Hook)
Execute SQL before each batch within the same transaction:
adapter:
table_name: items
pk_columns: ["id"]
operation: delete
before_sql: |
INSERT INTO deleted_item_ids (id, created, deleted)
SELECT id, created, NOW() FROM items WHERE id IN (?)
where_clause: "status = 'cancel'"Notes:
- Use
IN (?)placeholder - expanded to all PKs in the batch - For composite keys:
(col1, col2) IN (?) - Executed atomically with the main operation
- If
before_sqlfails, entire transaction is rolled back
Custom ORDER BY
Process records in a specific order:
adapter:
table_name: items
table_alias: i
pk_columns: ["id"]
order_by: "i.created, i.id"Understanding update_sql
The update_sql parameter specifies the SET clause. Do not include trailing semicolons.
# Simple status update
update_sql: "status = 'processed'"
# Results in: UPDATE users SET status = 'processed' WHERE user_id IN (...)
# Multiple columns
update_sql: "status = 'archived', archived_at = NOW()"
# Using CASE statements
update_sql: |
status = CASE
WHEN last_login < NOW() - INTERVAL 30 DAY THEN 'inactive'
ELSE 'active'
ENDImportant:
- Do NOT include UPDATE keyword, table name, or WHERE clause
- The tool automatically adds WHERE pk IN (...) for batch updates
Using where_clause for Idempotent Operations
Make updates safe to run multiple times:
adapter:
update_sql: "status = 'processed', processed_at = NOW()"
where_clause: "status = 'pending'"
# Results in: UPDATE users SET ... WHERE user_id IN (...) AND status = 'pending'Command Line Options
--config, -c: Path to YAML configuration file (required for operation)--debug, -d: Enable debug mode (SELECT only, no updates)--resume-from: Manual resume from specific primary key(s)--total-rows: Skip initial COUNT query and use provided value (e.g.,--total-rows 1000000). Also used as a stop condition based onrows_handled(rows selected), notrows_processed(rows affected by UPDATE)--pk-source: Read PKs from file/directory instead of table (local path orgs://bucket/path)--version, -v: Show version information--help, -h: Show help message
Interactive Commands
Control the tool via Unix socket:
# Show status
echo "status" | nc -U /tmp/data-updater.sock
# Pause/resume processing
echo "pause" | nc -U /tmp/data-updater.sock
echo "resume" | nc -U /tmp/data-updater.sock
# Change batch size
echo "batch-size 5000" | nc -U /tmp/data-updater.sock
# Change interval
echo "interval 500ms" | nc -U /tmp/data-updater.sock
# Show help
echo "help" | nc -U /tmp/data-updater.sock
# Auto-interval: show status / enable / disable / set min
echo "auto-interval" | nc -U /tmp/data-updater.sock
echo "auto-interval on" | nc -U /tmp/data-updater.sock
echo "auto-interval off" | nc -U /tmp/data-updater.sock
echo "auto-interval min 200ms" | nc -U /tmp/data-updater.sockDebug Mode
Debug mode allows you to verify queries without executing updates:
data-updater --config config.yaml --debugExample output:
INFO DEBUG: UPDATE query that would be executed query="UPDATE users SET status = 'processed' WHERE user_id IN (?,?,?)" args_count=3 primary_keys_count=3Resume Feature
Automatic Resume (Default)
- Progress saved after each successful batch
- On restart, automatically resumes from last position
- Status files named:
data-updater-{table}-{adapter}.status
Manual Resume
# Single primary key
data-updater --config config.yaml --resume-from "12345"
# Composite primary key
data-updater --config config.yaml --resume-from "tenant1,12345"Resume Priority
- Manual
--resume-from(highest) - Status file (if exists)
- Adapter's initial cursor (default)
Skip COUNT Query
Use --total-rows to skip the initial COUNT query:
# Useful for large tables or retries where you know the total
data-updater --config config.yaml --total-rows 1000000This is particularly useful when:
- Retrying after interruption (you already know the count)
- Large tables where COUNT(*) is expensive
- Faster startup when exact count is not critical
Stop condition: --total-rows stops the selector after handling (selecting) that many rows. The stop check uses rows_handled, not rows_processed. This means it works correctly even when UPDATE affects 0 rows (e.g., records already deleted by another process or filtered out by where_clause).
PK Source (Read PKs from File)
Read primary keys from a file instead of the database table.
Important: --total-rows is required when using --pk-source for accurate progress/ETA calculation.
# Count lines first
wc -l failed-ids.txt
# 1500 failed-ids.txt
# From local file (--total-rows is required)
data-updater --config config.yaml --pk-source "./failed-ids.txt" --total-rows 1500
# From local directory (processes all files)
data-updater --config config.yaml --pk-source "./failed-ids/" --total-rows 5000
# From GCS file
data-updater --config config.yaml --pk-source "gs://bucket/failed-ids.txt" --total-rows 1500
# From GCS directory
data-updater --config config.yaml --pk-source "gs://bucket/failed-ids/" --total-rows 10000Or configure in YAML:
pk_source:
path: "gs://my-bucket/failed-ids/"
gcs_project: "my-gcp-project" # Required for GCS paths
skip_header: true # Skip first line (for BQ exports with header)
prefetch_buffer: 5 # Number of GCS files to prefetch ahead (default: 5)GCS Authentication:
GCS access uses Application Default Credentials (ADC). Set up with:
gcloud auth application-default login
gcloud auth application-default set-quota-project <project>File format (CSV):
# Comments starting with # are ignored
12345
12346
tenant1,12345
"value,with,comma",12346Skip header (for BigQuery exports):
BigQuery exports include a header row with column names. Use skip_header: true to skip it:
id
12345
12346Features:
- Files are read line by line (streaming) to minimize memory usage
- GCS files are prefetched in the background to eliminate download latency (configurable buffer, default 5)
- Directory support: processes all files in sorted order
- Resume support: tracks progress per file and line number
- Can be combined with
where_clauseto filter PKs from file
Status Metrics
Status logs and the status interactive command report two counters:
rows_processed: rows successfully affected by the UPDATE/DELETE operation (i.e., the database reported a row change)rows_handled: rows selected and sent through the pipeline, regardless of whether the UPDATE/DELETE actually modified the row. This counter is used for progress percentage and ETA calculations
When rows_handled is higher than rows_processed, it typically means some rows were already in the desired state (e.g., already deleted or already updated by a previous run).
Hibernate (Health-Check Based Pause)
The hibernate feature allows the processor to periodically run an external health-check script. If the script returns a non-zero exit code (indicating a problem), the processor pauses for a configurable period, then automatically resumes.
Configuration
processing:
hibernate_script_path: "/path/to/check.sh"
hibernate_pause_period: 30s
hibernate_check_interval: 15shibernate_script_path: Path to an executable script. The script is run at the configured check interval (default 15s). Exit code 0 means healthy; any non-zero exit code triggers hibernation.hibernate_pause_period: How long the processor pauses when the script signals a problem. Required whenhibernate_script_pathis set.hibernate_check_interval: How often the health-check script is executed. Defaults to15s.
Behavior
- The health-check script is executed at the configured interval (default 15s) while the processor is running
- If the script exits with code 0, processing continues normally
- If the script exits with a non-zero code, the processor pauses for
hibernate_pause_period, then automatically resumes - The
hibernation_countmetric tracks the total number of times hibernation was triggered (visible instatuscommand output and periodic logs)
Use Cases
- Pause when database replication lag exceeds a threshold
- Pause when disk space is low
- Pause during maintenance windows
- Any custom operator-defined health check
Hourly Summary Log
For long-running jobs, you can enable an hourly summary log that writes JSON entries to a dedicated file. A final summary is also written on shutdown, so short-lived runs still produce a report.
processing:
hourly_log_path: "/var/log/data-updater/hourly.log"Each JSON line includes:
rows_processed_total/rows_processed_delta— records processed in total and during the periodrows_failed_total/rows_failed_deltahibernation_count_total/hibernation_count_deltatotal_rows,rows_remaining,progress— overall progressinteractive_commands— commands issued via socket during the period (with timestamps)summary_type—"hourly"or"final"
If hourly_log_path is not set, the reporter is not started.
Auto-Interval Adjustment
Automatically adjusts the processing interval based on the hibernation ratio observed each hour. When many hibernate checks fail (high ratio), the interval increases (slows down). When the ratio is low, the interval decreases (speeds up).
processing:
auto_interval_enabled: true
auto_interval_high_ratio: 0.3 # ratio >= this → slow down (default: 0.3)
auto_interval_low_ratio: 0 # ratio <= this → speed up (default: 0)
auto_interval_increase_factor: 1.25 # multiply interval by this to slow down (default: 1.25)
auto_interval_decrease_factor: 0.8 # multiply interval by this to speed up (default: 0.8)
auto_interval_min: 200ms # floor for interval (default: initi関連記事
今日のまとめ
AI日報で今日の重要ニュースをまとめ読み